diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb
new file mode 100644
index 0000000..21a4e9d
--- /dev/null
+++ b/lib/red_storm/proxy/batch_spout.rb
@@ -0,0 +1,82 @@
+require 'java'
+
+java_import 'storm.trident.operation.TridentCollector'
+java_import 'backtype.storm.task.TopologyContext'
+java_import 'storm.trident.spout.IBatchSpout'
+java_import 'backtype.storm.topology.OutputFieldsDeclarer'
+java_import 'backtype.storm.tuple.Tuple'
+java_import 'backtype.storm.tuple.Fields'
+java_import 'backtype.storm.tuple.Values'
+java_import 'java.util.Map'
+module Backtype
+  java_import 'backtype.storm.Config'
+end
+
+java_package 'redstorm.proxy'
+
+# The BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
+# Java artifacts when creating a spout.
+#
+# The real batch spout class implementation must define these methods:
+# - open(conf, context)
+# - emit_batch(batch_id, collector)
+# - get_output_fields
+# - ack(batch_id)
+#
+# and optionally:
+# - close
+# - get_component_configuration
+#
+
+class BatchSpout
+  java_implements IBatchSpout
+
+  # Instantiates the real spout from its class name. If that raises NameError,
+  # base_class_path is required and the instantiation is retried once.
+  java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
+  def initialize(base_class_path, real_spout_class_name)
+    @real_spout = Object.module_eval(real_spout_class_name).new
+  rescue NameError
+    require base_class_path
+    @real_spout = Object.module_eval(real_spout_class_name).new
+  end
+
+  # Forwards open to the real spout.
+  java_signature 'void open(Map, TopologyContext)'
+  def open(conf, context)
+    @real_spout.open(conf, context)
+  end
+
+  # Forwards close to the real spout only when it defines close.
+  java_signature 'void close()'
+  def close
+    @real_spout.close if @real_spout.respond_to?(:close)
+  end
+
+  # Forwards the batch request to the real spout's emit_batch.
+  java_signature 'void emitBatch(long, TridentCollector)'
+  def emitBatch(batch_id, collector)
+    @real_spout.emit_batch(batch_id, collector)
+  end
+
+  # Forwards the batch acknowledgement to the real spout.
+  java_signature 'void ack(long)'
+  def ack(batch_id)
+    @real_spout.ack(batch_id)
+  end
+
+  # Returns whatever the real spout's get_output_fields returns.
+  java_signature 'Fields getOutputFields()'
+  def getOutputFields
+    @real_spout.get_output_fields()
+  end
+
+  # Returns the real spout's component configuration, or nil when the real spout
+  # does not define get_component_configuration (same optional handling as close).
+  java_signature 'Map getComponentConfiguration()'
+  def getComponentConfiguration
+    return nil unless @real_spout.respond_to?(:get_component_configuration)
+    @real_spout.get_component_configuration
+  end
+
+end
diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java
new file mode 100644
index 0000000..9d8d61f
--- /dev/null
+++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java
@@ -0,0 +1,126 @@
+package redstorm.storm.jruby;
+
+import storm.trident.operation.TridentCollector;
+import backtype.storm.task.TopologyContext;
+import storm.trident.spout.IBatchSpout;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Fields;
+import java.util.Map;
+
+/**
+ * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
+ * this proxy is required to bypass the serialization/deserialization process when dispatching
+ * the spout to the workers. JRuby does not yet support serialization from Java
+ * (Java serialization call on a JRuby class).
+ *
+ * Note that the JRuby spout proxy class is instantiated in the open method which is called after
+ * deserialization at the worker and in both the getOutputFields and getComponentConfiguration
+ * methods which are called once before serialization at topology creation.
+ */
+public class JRubyBatchSpout implements IBatchSpout {
+  // transient: the JRuby proxy must never become part of the serialized form of this class.
+  transient IBatchSpout _proxySpout;
+  String _realSpoutClassName;
+  String _baseClassPath;
+  String[] _fields;
+
+  /**
+   * create a new JRubyBatchSpout
+   *
+   * @param baseClassPath the topology/project base JRuby class file path
+   * @param realSpoutClassName the fully qualified JRuby spout implementation class name
+   * @param fields stored in _fields; not read by any method of this class
+   */
+  public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
+    _baseClassPath = baseClassPath;
+    _realSpoutClassName = realSpoutClassName;
+    _fields = fields;
+  }
+
+  /**
+   * create the JRuby proxy spout and open it.
+   *
+   * @param conf the configuration map forwarded to the proxy
+   * @param context the topology context forwarded to the proxy
+   */
+  @Override
+  public void open(final Map conf, final TopologyContext context) {
+    // create instance of the jruby proxy class here, after deserialization in the workers.
+    _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
+    _proxySpout.open(conf, context);
+  }
+
+  /**
+   * forward the batch request to the JRuby proxy spout.
+   *
+   * @param batchId the batch id forwarded to the proxy
+   * @param collector the collector forwarded to the proxy
+   */
+  @Override
+  public void emitBatch(final long batchId, final TridentCollector collector) {
+    _proxySpout.emitBatch(batchId, collector);
+  }
+
+  /**
+   * close the JRuby proxy spout, if one was created by open.
+   */
+  @Override
+  public void close() {
+    // the proxy only exists once open has run; skip the call instead of failing with a NPE.
+    if (_proxySpout != null) {
+      _proxySpout.close();
+    }
+  }
+
+  /**
+   * forward the batch acknowledgement to the JRuby proxy spout.
+   *
+   * @param batchId the batch id forwarded to the proxy
+   */
+  @Override
+  public void ack(final long batchId) {
+    _proxySpout.ack(batchId);
+  }
+
+  /**
+   * @return the output fields reported by a temporary JRuby proxy spout
+   */
+  @Override
+  public Fields getOutputFields() {
+    // getOutputFields is executed in the topology creation time before serialisation.
+    // do not set the _proxySpout instance variable here to avoid JRuby serialization
+    // issues. Just create tmp spout instance to call getOutputFields.
+    IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
+    return spout.getOutputFields();
+  }
+
+  /**
+   * @return the component configuration reported by a temporary JRuby proxy spout
+   */
+  @Override
+  public Map getComponentConfiguration() {
+    // getComponentConfiguration is executed in the topology creation time before serialisation.
+    // do not set the _proxySpout instance variable here to avoid JRuby serialization
+    // issues. Just create tmp spout instance to call getComponentConfiguration.
+    IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
+    return spout.getComponentConfiguration();
+  }
+
+  /**
+   * instantiate the JRuby proxy spout class.
+   *
+   * @param baseClassPath the topology/project base JRuby class file path
+   * @param realSpoutClassName the fully qualified JRuby spout implementation class name
+   * @return a new proxy spout instance
+   * @throws RuntimeException wrapping any exception raised while creating the proxy
+   */
+  private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
+    try {
+      return new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}