WIP attempt to build a proxy class generator

This commit is contained in:
Paul Bergeron 2012-12-07 12:06:06 -08:00
parent 38b1280118
commit e19be8c463
5 changed files with 196 additions and 91 deletions

43
generator.rb Normal file
View File

@ -0,0 +1,43 @@
require 'erb'
require 'pry'
require 'java'
require 'active_support/core_ext'
# Proxy class generator (WIP).
#
# For every Java interface named in +to_generate+ this script reflects on the
# interface's declared instance methods and renders two ERB templates:
#   ./ruby_proxy.erb -> ./lib/red_storm/proxy/<ruby_class_name>.rb
#   ./java_proxy.erb -> ./src/main/redstorm/storm/jruby/<java_class_name>.java
#
# Both templates are evaluated with this script's binding, so the locals
# java_deps, functions, methods, interface_name, ruby_class_name and
# java_class_name assigned inside the loop are the templates' input and must
# keep their names.

# Put every file of the Storm dependency directory on the JRuby classpath so
# the interfaces below can be imported.
Dir["../triggit-storm/target/dependency/storm/default/*"].each { |f| $CLASSPATH << File.expand_path(f) }

PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb")
PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb")

# Fully qualified names of the Java interfaces to generate proxies for.
to_generate = ["storm.trident.spout.IBatchSpout"]

# Strips the package from a Java type name ("java.util.Map" => "Map").
def simple_java_name(type_name)
  type_name.split('.')[-1]
end

# Builds the ordered { :_argName => "SimpleType" } hash for one reflected
# method. Names are derived from the argument type; when a type occurs more
# than once the later occurrences get a numeric suffix (_string, _string2, ...)
# so that no argument is silently dropped from the generated signature.
def proxy_args(argument_types)
  seen = Hash.new(0)
  argument_types.each_with_object({}) do |arg_type, args|
    type = simple_java_name(arg_type.name)
    base = "_#{type.camelize(:lower)}"
    seen[base] += 1
    name = seen[base] == 1 ? base : "#{base}#{seen[base]}"
    args[name.to_sym] = type
  end
end

# Parse each template once; only the binding changes between interfaces.
jruby_template = ERB.new(PROXY_JRUBY_TEMPLATE)
java_template = ERB.new(PROXY_JAVA_TEMPLATE)

to_generate.each do |klass|
  # java_import returns the imported proxy classes; look the first one up by
  # its simple constant name and reflect on its declared instance methods.
  _functions = Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class.declared_instance_methods

  # Every fully qualified type mentioned in a signature, plus the interface
  # itself. Names without a package (primitives, "void") need no import.
  signature_types = _functions.map do |f|
    [f.argument_types.map { |at| at.name }, f.return_type ? f.return_type.name : "void"]
  end.flatten.uniq
  java_deps = signature_types.reject { |t| t.split('.').count == 1 } | [klass]

  # Java-side view: { :javaMethodName => { :return_type, :args } }.
  functions = _functions.each_with_object({}) do |f, memo|
    memo[:"#{f.name}"] = {
      :return_type => f.return_type ? simple_java_name(f.return_type.name) : "void",
      :args => proxy_args(f.argument_types)
    }
  end

  interface_name = klass.split(".")[-1]
  # Drops the first character of the interface name (the "I" of "IBatchSpout").
  ruby_class_name = interface_name[1..-1]
  java_class_name = "JRuby#{ruby_class_name}"

  # Ruby-side view: same data with method and argument names in snake_case.
  methods = functions.each_with_object({}) do |(f_name, params), memo|
    ruby_args = params[:args].each_with_object({}) do |(name, type), args|
      args[name.to_s.underscore.to_sym] = type
    end
    memo[f_name.to_s.underscore.to_sym] = { :return_type => params[:return_type], :args => ruby_args }
  end

  File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') { |f| f.write(jruby_template.result(binding)) }
  File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') { |f| f.write(java_template.result(binding)) }
end

50
java_proxy.erb Normal file
View File

@ -0,0 +1,50 @@
<%#
  Java half of a generated proxy pair.

  Locals read from the rendering binding:
    java_deps        fully qualified type names to import
    java_class_name  name of the class emitted here
    interface_name   simple name of the Java interface being implemented
    ruby_class_name  name of the JRuby proxy class in package redstorm.proxy
    functions        { :javaMethodName => { :return_type => "Type",
                                            :args => { :argName => "Type" } } }

  NOTE(review): the declareOutputFields branch uses Fields; confirm that
  backtype.storm.tuple.Fields is among java_deps for interfaces that reach it.
%>
package redstorm.storm.jruby;
<% java_deps.each do |dep| %>
import <%= dep %>;<% end %>

// Serializable-friendly wrapper: only strings are kept as state until open()
// creates the JRuby proxy instance.
public class <%= java_class_name %> implements <%= interface_name %> {
  <%= interface_name %> _proxy;
  String _realClassName;
  String _baseClassPath;
  String[] _fields;

  public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) {
    _baseClassPath = baseClassPath;
    _realClassName = realClassName;
    _fields = fields;
  }
<% functions.each do |function_name, params| %>
<%   # getComponentConfiguration is emitted once, after this loop. %>
<%   next if function_name == :getComponentConfiguration %>
<%   call_args = params[:args].keys.join(', ') %>
<%   ret = params[:return_type] == 'void' ? '' : 'return ' %>

  @Override
  public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map { |n, t| "final #{t} #{n}" }.join(', ') %>) {
<%   if function_name == :open %>
    // the proxy instance is created here and reused by the other methods
    _proxy = newProxy(_baseClassPath, _realClassName);
    _proxy.open(<%= call_args %>);
<%   elsif function_name == :declareOutputFields %>
    if (_fields.length > 0) {
      <%= params[:args].keys[0] %>.declare(new Fields(_fields));
    } else {
      newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= call_args %>);
    }
<%   elsif params[:before_serialization] %>
    // uses a temporary proxy instead of the _proxy field
    <%= ret %>newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= call_args %>);
<%   else %>
    <%= ret %>_proxy.<%= function_name %>(<%= call_args %>);
<%   end %>
  }
<% end %>
<% if functions.key?(:getComponentConfiguration) %>

  @Override
  public Map<String, Object> getComponentConfiguration() {
    return newProxy(_baseClassPath, _realClassName).getComponentConfiguration();
  }
<% end %>

  private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) {
    try {
      redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName);
      return proxy;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

View File

@ -1,71 +1,63 @@
require 'java'
java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.tuple.Fields'
java_import 'storm.trident.spout.IBatchSpout'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
# Java artifacts when creating a spout.
#
# The real batch spout class implementation must define these methods:
# - open(conf, context, collector)
# - emitBatch
# - getOutputFields
# - ack(batch_id)
#
# and optionally:
# - close
#
# Proxy class: instantiates the real implementation by evaluating its class
# name and forwards each IBatchSpout method to that instance.
#
# NOTE(review): this listing interleaves the removed and the added lines of a
# diff without change markers, so several methods appear twice (for example
# two `def initialize` and two `def open`) and old/new instance variable names
# (@real_spout / @real) are mixed. Read it as two revisions, not as one class.
class BatchSpout
java_implements IBatchSpout
java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
java_signature 'IBatchSpout (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
# If the class constant is not defined yet, load base_class_path and retry.
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
@real = Object.module_eval(real_class_name).new
end
java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context)
def open(_map, _topology_context)
# NOTE(review): forwards the constants Map and TopologyContext, not the
# _map/_topology_context parameters — looks unintended; confirm.
@real.open(Map, TopologyContext)
end
java_signature 'void close()'
def close
# close is only forwarded when the real spout defines it.
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
def close()
@real.close()
end
java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id)
def ack(_long)
# NOTE(review): `long` is not the parameter name (`_long` is) — confirm.
@real.ack(long)
end
java_signature 'Fields getOutputFields()'
def getOutputFields
@real_spout.get_output_fields()
java_signature 'void emit_batch(long, TridentCollector)'
def emit_batch(_long, _trident_collector)
# NOTE(review): forwards `long` and the constant TridentCollector instead of
# the _long/_trident_collector parameters — confirm.
@real.emit_batch(long, TridentCollector)
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
java_signature 'Map get_component_configuration()'
def get_component_configuration()
@real.get_component_configuration()
end
java_signature 'Fields get_output_fields()'
def get_output_fields()
@real.get_output_fields()
end
end

30
ruby_proxy.erb Normal file
View File

@ -0,0 +1,30 @@
<%#
  Ruby half of a generated proxy pair.

  Locals read from the rendering binding:
    java_deps        fully qualified Java type names to java_import
    ruby_class_name  name of the class emitted here
    interface_name   simple name of the Java interface being implemented
    methods          { :method_name => { :return_type => "Type",
                                         :args => { :arg_name => "Type" } } }

  NOTE(review): the java_signature below uses the snake_case method name;
  confirm whether the Java-visible name has to match the interface's
  camelCase method name for the class to implement the interface.
%>
require 'java'
<% java_deps.each do |dep| %>
java_import '<%= dep %>'
<% end %>

module Backtype
  java_import 'backtype.storm.Config'
end

java_package 'redstorm.proxy'

# Generated proxy: instantiates the class named by real_class_name and
# forwards every <%= interface_name %> method to that instance.
class <%= ruby_class_name %>
  java_implements <%= interface_name %>

  java_signature '<%= interface_name %> (String base_class_path, String real_class_name)'
  def initialize(base_class_path, real_class_name)
    @real = Object.module_eval(real_class_name).new
  rescue NameError
    # the class is not loaded yet: load the base file, then look it up again
    require base_class_path
    @real = Object.module_eval(real_class_name).new
  end
<% methods.each do |method_name, params| %>
<%   arg_names = params[:args].keys.join(', ') %>

  java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)'
  def <%= method_name %>(<%= arg_names %>)
    @real.<%= method_name %>(<%= arg_names %>)
  end
<% end %>
end

View File

@ -1,85 +1,75 @@
package redstorm.storm.jruby;
import storm.trident.operation.TridentCollector;
import backtype.storm.task.TopologyContext;
import storm.trident.spout.IBatchSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import storm.trident.operation.TridentCollector;
import backtype.storm.tuple.Fields;
import storm.trident.spout.IBatchSpout;
/**
* the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instantiated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout _proxySpout;
String _realSpoutClassName;
IBatchSpout _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubyBatchSpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
public JRubyBatchSpout(final String baseClassPath, final String realClassName, final String[] fields) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
_realClassName = realClassName;
_fields = fields;
}
@Override
public void open(final Map conf, final TopologyContext context) {
// create instance of the jruby proxy class here, after deserialization in the workers.
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
_proxySpout.open(conf, context);
}
@Override
public void emitBatch(final long batchId, final TridentCollector collector) {
_proxySpout.emitBatch(batchId, collector);
public void open(final Map _map, final TopologyContext _topologyContext) {
_proxy = newProxy(_baseClassPath, _realClassName);
_proxy.open(_map, _topologyContext);
}
@Override
public void close() {
_proxySpout.close();
_proxy.close()
}
@Override
public void ack(final long batchId) {
_proxySpout.ack(batchId);
public void ack(final long _long) {
_proxy.ack(_long)
}
@Override
public void emitBatch(final long _long, final TridentCollector _tridentCollector) {
_proxy.emitBatch(_long, _tridentCollector)
}
@Override
public Map getComponentConfiguration() {
_proxy.getComponentConfiguration()
}
@Override
public Fields getOutputFields() {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
_proxy.getOutputFields()
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
newProxy(_baseClassPath, _realClassName).getComponentConfiguration();
}
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
private static IBatchSpout newProxy(String baseClassPath, String realClassName) {
try {
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {