From 5eb716184191559395356040fbc9618d6fec018a Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 11 Dec 2012 22:27:04 -0800 Subject: [PATCH] Generator correctly handles Trident Function, created proxy class for Trident Functions --- generator.rb | 10 ++- java_proxy.erb | 15 ++-- lib/red_storm/proxy/batch_spout.rb | 72 ++++++++------- lib/red_storm/proxy/proxy_function.rb | 48 ++++++++++ .../redstorm/storm/jruby/JRubyBatchSpout.java | 88 +++++++++++-------- .../storm/jruby/JRubyProxyFunction.java | 59 +++++++++++++ 6 files changed, 211 insertions(+), 81 deletions(-) create mode 100644 lib/red_storm/proxy/proxy_function.rb create mode 100644 src/main/redstorm/storm/jruby/JRubyProxyFunction.java diff --git a/generator.rb b/generator.rb index 98ffbe1..a66c30b 100644 --- a/generator.rb +++ b/generator.rb @@ -31,17 +31,21 @@ to_generate.each do |klass| # Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } functions = _functions.reduce({}) do |memo, f| + before_serialization = %w{ }.include?(f.name.to_s) + memoize = %w{ prepare execute }.include?(f.name.to_s) memo[:"#{f.name}"] = { :return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void", - :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)} + :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)}, + :before_serialization => before_serialization, + :memoize => memoize } memo end interface_name = klass.split(".")[-1] - # IBlah to Blah if IBlah - ruby_class_name = interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name + # IBlah to ProxyBlah if IBlah + ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}" java_class_name = "JRuby#{ruby_class_name}" diff --git a/java_proxy.erb b/java_proxy.erb index b078485..c154169 100644 --- a/java_proxy.erb +++ b/java_proxy.erb @@ -24,19 +24,20 @@ public class <%= java_class_name %> implements <%= interface_name %> { if (_fields.length > 0) { <%= params[:args].values[0] %>.declare(new Fields(_fields)); } else { - newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); } <% elsif params[:before_serialization] %> - newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + <% elsif params[:memoize] %> + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); <% else %> - _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>) + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); <% end %> } <% end %> - @Override - public Map getComponentConfiguration() { - newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); - } private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) { try { diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb index 3bb71d6..21a4e9d 100644 --- a/lib/red_storm/proxy/batch_spout.rb +++ b/lib/red_storm/proxy/batch_spout.rb @@ -1,63 +1,71 @@ require 'java' - -java_import 'java.util.Map' - -java_import 'backtype.storm.task.TopologyContext' - java_import 'storm.trident.operation.TridentCollector' - -java_import 'backtype.storm.tuple.Fields' - +java_import 'backtype.storm.task.TopologyContext' java_import 'storm.trident.spout.IBatchSpout' - - +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' module Backtype java_import 'backtype.storm.Config' end java_package 'redstorm.proxy' +# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the +# Java artifacts when creating a spout. +# +# The real batch spout class implementation must define these methods: +# - open(conf, context, collector) +# - emitBatch +# - getOutputFields +# - ack(batch_id) +# +# and optionnaly: +# - close +# + class BatchSpout java_implements IBatchSpout - java_signature 'IBatchSpout (String base_class_path, String real_class_name)' - def initialize(base_class_path, real_class_name) - @real = Object.module_eval(real_class_name).new + java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new rescue NameError require base_class_path - @real = Object.module_eval(real_class_name).new + @real_spout = Object.module_eval(real_spout_class_name).new end java_signature 'void open(Map, TopologyContext)' - def open(_map, _topology_context) - @real.open(Map, TopologyContext) + def open(conf, context) + @real_spout.open(conf, context) end java_signature 'void close()' - def close() - @real.close() + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void emitBatch(long, TridentCollector)' + def emitBatch(batch_id, collector) + @real_spout.emit_batch(batch_id, collector) end java_signature 'void ack(long)' - def ack(_long) - @real.ack(long) + def ack(batch_id) + @real_spout.ack(batch_id) end - java_signature 'void emit_batch(long, TridentCollector)' - def emit_batch(_long, _trident_collector) - @real.emit_batch(long, TridentCollector) + java_signature 'Fields getOutputFields()' + def getOutputFields + @real_spout.get_output_fields() end - java_signature 'Map get_component_configuration()' - def get_component_configuration() - @real.get_component_configuration() + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration end - java_signature 'Fields get_output_fields()' - def get_output_fields() - @real.get_output_fields() - end - - end diff --git a/lib/red_storm/proxy/proxy_function.rb b/lib/red_storm/proxy/proxy_function.rb new file mode 100644 index 0000000..d0e7944 --- /dev/null +++ b/lib/red_storm/proxy/proxy_function.rb @@ -0,0 +1,48 @@ +require 'java' + + +java_import 'storm.trident.tuple.TridentTuple' + +java_import 'storm.trident.operation.TridentCollector' + +java_import 'java.util.Map' + +java_import 'storm.trident.operation.TridentOperationContext' + +java_import 'storm.trident.operation.Function' + + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class ProxyFunction + java_implements Function + + java_signature 'Function (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new + rescue NameError + require base_class_path + @real = Object.module_eval(real_class_name).new + end + + java_signature 'void execute(TridentTuple, TridentCollector)' + def execute(_trident_tuple, _trident_collector) + @real.execute(_trident_tuple, _trident_collector) + end + + java_signature 'void cleanup()' + def cleanup() + @real.cleanup() + end + + java_signature 'void prepare(Map, TridentOperationContext)' + def prepare(_map, _trident_operation_context) + @real.prepare(_map, _trident_operation_context) + end + + +end diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java index 855750a..9d8d61f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -1,75 +1,85 @@ package redstorm.storm.jruby; -import java.util.Map; -import backtype.storm.task.TopologyContext; import storm.trident.operation.TridentCollector; -import backtype.storm.tuple.Fields; +import backtype.storm.task.TopologyContext; import storm.trident.spout.IBatchSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; +import java.util.Map; +/** + * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ public class JRubyBatchSpout implements IBatchSpout { - IBatchSpout _proxy; - String _realClassName; + IBatchSpout _proxySpout; + String _realSpoutClassName; String _baseClassPath; String[] _fields; - public JRubyBatchSpout(final String baseClassPath, final String realClassName, final String[] fields) { + /** + * create a new JRubyBatchSpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { _baseClassPath = baseClassPath; - _realClassName = realClassName; + _realSpoutClassName = realSpoutClassName; _fields = fields; } + @Override + public void open(final Map conf, final TopologyContext context) { + // create instance of the jruby proxy class here, after deserialization in the workers. + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + _proxySpout.open(conf, context); + } @Override - public void open(final Map _map, final TopologyContext _topologyContext) { - - _proxy = newProxy(_baseClassPath, _realClassName); - _proxy.open(_map, _topologyContext); - + public void emitBatch(final long batchId, final TridentCollector collector) { + _proxySpout.emitBatch(batchId, collector); } + @Override public void close() { - - _proxy.close() - + _proxySpout.close(); } @Override - public void ack(final long _long) { - - _proxy.ack(_long) - - } - - @Override - public void emitBatch(final long _long, final TridentCollector _tridentCollector) { - - _proxy.emitBatch(_long, _tridentCollector) - - } - - @Override - public Map getComponentConfiguration() { - - _proxy.getComponentConfiguration() - + public void ack(final long batchId) { + _proxySpout.ack(batchId); } @Override public Fields getOutputFields() { - - _proxy.getOutputFields() - + // getOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getOutputFields(); } @Override public Map getComponentConfiguration() { - newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); } - private static IBatchSpout newProxy(String baseClassPath, String realClassName) { + private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { try { - redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realClassName); + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); return proxy; } catch (Exception e) { diff --git a/src/main/redstorm/storm/jruby/JRubyProxyFunction.java b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java new file mode 100644 index 0000000..c6030ab --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java @@ -0,0 +1,59 @@ +package redstorm.storm.jruby; + +import storm.trident.tuple.TridentTuple; +import storm.trident.operation.TridentCollector; +import java.util.Map; +import storm.trident.operation.TridentOperationContext; +import storm.trident.operation.Function; + +public class JRubyProxyFunction implements Function { + Function _proxy; + String _realClassName; + String _baseClassPath; + String[] _fields; + + public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) { + _baseClassPath = baseClassPath; + _realClassName = realClassName; + _fields = fields; + } + + + @Override + public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.execute(_tridentTuple, _tridentCollector); + + } + + @Override + public void cleanup() { + + _proxy.cleanup(); + + } + + @Override + public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.prepare(_map, _tridentOperationContext); + + } + + + private static Function newProxy(final String baseClassPath, final String realClassName) { + try { + redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +}