diff --git a/lib/red_storm.rb b/lib/red_storm.rb index 4321e7e..5e5efe2 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -6,3 +6,4 @@ require 'red_storm/configuration' require 'red_storm/simple_bolt' require 'red_storm/simple_spout' require 'red_storm/simple_topology' +require 'red_storm/helpers' \ No newline at end of file diff --git a/lib/red_storm/helpers.rb b/lib/red_storm/helpers.rb new file mode 100644 index 0000000..75ff736 --- /dev/null +++ b/lib/red_storm/helpers.rb @@ -0,0 +1,26 @@ +# convience classes to change java method signature to ruby +module RedStorm + module TransactionalEmitter + + def emitBatch(tx, coordinatorMeta, collector) + emit_batch(tx, coordinatorMeta, collector) + end + + def cleanupBefore(txid) + cleanup_before(txid) if respond_to?(:cleanup_before) + end + + end + + module TransactionalCoordinator + + def initializeTransaction(txid, prevMetadata) + initialize_transaction(txid, prevMetadata) if respond_to?(:initialize_transaction) + end + + def isReady + respond_to?(:ready?) ? ready? : true + end + + end +end \ No newline at end of file diff --git a/lib/red_storm/proxy/batch_bolt.rb b/lib/red_storm/proxy/batch_bolt.rb new file mode 100644 index 0000000..99bfcf4 --- /dev/null +++ b/lib/red_storm/proxy/batch_bolt.rb @@ -0,0 +1,64 @@ +require 'java' + +java_import 'backtype.storm.coordination.BatchOutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.topology.IRichBolt' +java_import 'backtype.storm.coordination.IBatchBolt' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +# the Bolt class is a proxy to the real bolt to avoid having to deal with all the +# Java artifacts when creating a bolt. +# +# The real bolt class implementation must define these methods: +# - prepare(conf, context, collector) +# - execute(tuple) +# - declare_output_fields +# +# and optionnaly: +# - cleanup +# +class BatchBolt + java_implements IBatchBolt + + java_signature 'IBatchBolt (String base_class_path, String real_bolt_class_name)' + def initialize(base_class_path, real_bolt_class_name) + @real_bolt = Object.module_eval(real_bolt_class_name).new + rescue NameError + require base_class_path + @real_bolt = Object.module_eval(real_bolt_class_name).new + end + + java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)' + def prepare(conf, context, collector, id) + @real_bolt.prepare(conf, context, collector, id) + end + + java_signature 'void execute(Tuple)' + def execute(tuple) + @real_bolt.execute(tuple) + end + + java_signature 'void finishBatch()' + def finishBatch + @real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + @real_bolt.declare_output_fields(declarer) + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_bolt.get_component_configuration + end +end diff --git a/lib/red_storm/proxy/batch_committer_bolt.rb b/lib/red_storm/proxy/batch_committer_bolt.rb new file mode 100644 index 0000000..35d122a --- /dev/null +++ b/lib/red_storm/proxy/batch_committer_bolt.rb @@ -0,0 +1,53 @@ +require 'java' + +java_import 'backtype.storm.coordination.BatchOutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.coordination.IBatchBolt' +java_import 'backtype.storm.transactional.ICommitter' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class BatchCommitterBolt + java_implements 'ICommitter, IBatchBolt' + + java_signature 'IBatchCommitterBolt (String base_class_path, String real_bolt_class_name)' + def initialize(base_class_path, real_bolt_class_name) + @real_bolt = Object.module_eval(real_bolt_class_name).new + rescue NameError + require base_class_path + @real_bolt = Object.module_eval(real_bolt_class_name).new + end + + java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)' + def prepare(conf, context, collector, id) + @real_bolt.prepare(conf, context, collector, id) + end + + java_signature 'void execute(Tuple)' + def execute(tuple) + @real_bolt.execute(tuple) + end + + java_signature 'void finishBatch()' + def finishBatch + @real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + @real_bolt.declare_output_fields(declarer) + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_bolt.get_component_configuration + end +end diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb new file mode 100644 index 0000000..1899838 --- /dev/null +++ b/lib/red_storm/proxy/batch_spout.rb @@ -0,0 +1,59 @@ +require 'java' + +java_import 'backtype.storm.task.TopologyContext' +java_import 'storm.trident.operation.TridentCollector' +java_import 'storm.trident.spout.IBatchSpout' +java_import 'backtype.storm.tuple.Fields' +java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +# the Spout class is a proxy to the real spout to avoid having to deal with all the +# Java artifacts when creating a spout. + +class BatchSpout + java_implements IBatchSpout + + java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path + @real_spout = Object.module_eval(real_spout_class_name).new + end + + java_signature 'void open(Map, TopologyContext)' + def open(conf, context) + @real_spout.open(conf, context) if @real_spout.respond_to?(:open) + end + + java_signature 'void emitBatch(long, TridentCollector)' + def emitBatch(batch_id, collector) + @real_spout.emit_batch(batch_id, collector) + end + + java_signature 'void close()' + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void ack(long)' + def ack(batch_id) + @real_spout.ack(batch_id) if @real_spout.respond_to?(:ack) + end + + java_signature 'Fields getOutputFields()' + def getOutputFields() + @real_spout.get_output_fields + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + +end diff --git a/lib/red_storm/proxy/transactional_committer_spout.rb b/lib/red_storm/proxy/transactional_committer_spout.rb new file mode 100644 index 0000000..b0dae00 --- /dev/null +++ b/lib/red_storm/proxy/transactional_committer_spout.rb @@ -0,0 +1,47 @@ +require 'java' + +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.transactional.ITransactionalSpout' +java_import 'backtype.storm.transactional.ICommitterTransactionalSpout' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + + +class TransactionalCommitterSpout + java_implements 'ICommitterTransactionalSpout' + + java_signature 'ICommitterTransactionalSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path + @real_spout = Object.module_eval(real_spout_class_name).new + end + + java_signature 'ICommitterTransactionalSpout.Emitter getEmitter(Map, TopologyContext)' + def getEmitter(conf, context) + @real_spout.get_emitter(conf, context) + end + + java_signature 'Coordinator getCoordinator(Map, TopologyContext)' + def getCoordinator(conf, context) + @real_spout.get_coordinator(conf, context) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + @real_spout.declare_output_fields(declarer) + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + +end \ No newline at end of file diff --git a/lib/red_storm/proxy/transactional_spout.rb b/lib/red_storm/proxy/transactional_spout.rb new file mode 100644 index 0000000..d0024c4 --- /dev/null +++ b/lib/red_storm/proxy/transactional_spout.rb @@ -0,0 +1,46 @@ +require 'java' + +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.transactional.ITransactionalSpout' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'java.util.Map' +java_import 'org.apache.log4j.Logger' +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + + +class TransactionalSpout + java_implements 'ITransactionalSpout' + + java_signature 'ITransactionalSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path + @real_spout = Object.module_eval(real_spout_class_name).new + end + + java_signature 'Emitter getEmitter(Map, TopologyContext)' + def getEmitter(conf, context) + @real_spout.get_emitter(conf, context) + end + + java_signature 'Coordinator getCoordinator(Map, TopologyContext)' + def getCoordinator(conf, context) + @real_spout.get_coordinator(conf, context) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + @real_spout.declare_output_fields(declarer) + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + +end \ No newline at end of file diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 38c3930..2e13d9a 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -8,12 +8,19 @@ end java_import 'backtype.storm.LocalCluster' java_import 'backtype.storm.StormSubmitter' java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.coordination.BatchBoltExecutor' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Values' java_import 'redstorm.storm.jruby.JRubyBolt' java_import 'redstorm.storm.jruby.JRubySpout' +java_import 'redstorm.storm.jruby.JRubyBatchBolt' +java_import 'redstorm.storm.jruby.JRubyBatchCommitterBolt' +java_import 'redstorm.storm.jruby.JRubyBatchSpout' +java_import 'redstorm.storm.jruby.JRubyTransactionalSpout' +java_import 'redstorm.storm.jruby.JRubyTransactionalBolt' +java_import 'redstorm.storm.jruby.JRubyTransactionalCommitterBolt' java_package 'redstorm' diff --git a/src/main/MurmurHash.java b/src/main/MurmurHash.java new file mode 100644 index 0000000..1ead023 --- /dev/null +++ b/src/main/MurmurHash.java @@ -0,0 +1,95 @@ +// Takes the field specified in the FIELD_TO_HASH pararmeter and create 32-bit Murmur hash out of it in a field called HashedId +// Using Murmurhash from http://murmurhash.googlepages.com/ + +public final class MurmurHash { + + public static long hash64(final byte[] data, int length, int seed) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (seed & 0xffffffffl) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) + + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) + + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + + h ^= k; + h *= m; + } + + switch (length % 8) { + case 7: + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + case 6: + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + case 5: + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + case 4: + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + case 3: + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + case 2: + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; + case 1: + h ^= (long) (data[length & ~7] & 0xff); + h *= m; + } + ; + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h; + } + + /** + * Generates 64 bit hash from byte array with default seed value. + * + * @param data + * byte array to hash + * @param length + * length of the array to hash + * @return 64 bit hash of the given string + */ + public static long hash64(final byte[] data, int length) { + return hash64(data, length, 0xe17a1465); + } + + /** + * Generates 64 bit hash from a string. + * + * @param text + * string to hash + * @return 64 bit hash of the given string + */ + public static long hash64(final String text) { + final byte[] bytes = text.getBytes(); + return hash64(bytes, bytes.length); + } + + /** + * Generates 64 bit hash from a substring. + * + * @param text + * string to hash + * @param from + * starting index + * @param length + * length of the substring to hash + * @return 64 bit hash of the given array + */ + public static long hash64(final String text, int from, int length) { + return hash64(text.substring(from, from + length)); + } +} + diff --git a/src/main/redstorm/storm/jruby/JRubyBatchBolt.java b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java new file mode 100644 index 0000000..5d39277 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java @@ -0,0 +1,82 @@ +package redstorm.storm.jruby; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.base.BaseBatchBolt; +import backtype.storm.coordination.BatchOutputCollector; +import backtype.storm.coordination.IBatchBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import java.util.Map; + +/** + * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the bolts to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields method which is called once before + * serialization at topology creation. + */ +public class JRubyBatchBolt extends BaseBatchBolt { + IBatchBolt _proxyBolt; + String _realBoltClassName; + String _baseClassPath; + /** + * create a new JRubyBolt + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realBoltClassName the fully qualified JRuby bolt implementation class name + */ + public JRubyBatchBolt(String baseClassPath, String realBoltClassName) { + _baseClassPath = baseClassPath; + _realBoltClassName = realBoltClassName; + } + + @Override + public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final Object id) { + // create instance of the jruby class here, after deserialization in the workers. + _proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName); + _proxyBolt.prepare(stormConf, context, collector, id); + } + + @Override + public void execute(Tuple input) { + _proxyBolt.execute(input); + } + + @Override + public void finishBatch() { + _proxyBolt.finishBatch(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // declareOutputFields is executed in the topology creation time, before serialisation. + // do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + bolt.declareOutputFields(declarer); + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + return bolt.getComponentConfiguration(); + } + + + private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { + try { + redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java b/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java new file mode 100644 index 0000000..dce6628 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java @@ -0,0 +1,9 @@ +package redstorm.storm.jruby; + +import backtype.storm.transactional.ICommitter; + +public class JRubyBatchCommitterBolt extends JRubyBatchBolt implements ICommitter { + public JRubyBatchCommitterBolt(String baseClassPath, String realBoltClassName) { + super(baseClassPath, realBoltClassName); + } +} \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java new file mode 100644 index 0000000..5fca722 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -0,0 +1,88 @@ +package redstorm.storm.jruby; + +import backtype.storm.tuple.Fields; +import backtype.storm.task.TopologyContext; +import storm.trident.operation.TridentCollector; +import storm.trident.spout.IBatchSpout; +import java.util.Map; + +/** + * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ +public class JRubyBatchSpout implements IBatchSpout { + IBatchSpout _proxySpout; + String _realSpoutClassName; + String _baseClassPath; + + /** + * create a new JRubySpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName) { + _baseClassPath = baseClassPath; + _realSpoutClassName = realSpoutClassName; + } + + @Override + public void open(final Map conf, final TopologyContext context) { + // create instance of the jruby proxy class here, after deserialization in the workers. + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + _proxySpout.open(conf, context); + } + + @Override + public void emitBatch(long batchId, TridentCollector collector) { + _proxySpout.emitBatch(batchId, collector); + } + + @Override + public void close() { + _proxySpout.close(); + } + + @Override + public void ack(long batchId) { + _proxySpout.ack(batchId); + } + + @Override + public Fields getOutputFields() { + if (_proxySpout == null) { + // getOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call getOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getOutputFields(); + } else { + return _proxySpout.getOutputFields(); + } + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); + } + + private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + try { + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java new file mode 100644 index 0000000..2f58f97 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java @@ -0,0 +1,82 @@ +package redstorm.storm.jruby; + +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.transactional.TransactionAttempt; +import backtype.storm.topology.base.BaseTransactionalBolt; +import backtype.storm.coordination.BatchOutputCollector; +import backtype.storm.coordination.IBatchBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import java.util.Map; + +/** + * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the bolts to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields method which is called once before + * serialization at topology creation. + */ +public class JRubyTransactionalBolt extends BaseTransactionalBolt { + IBatchBolt _proxyBolt; + String _realBoltClassName; + String _baseClassPath; + /** + * create a new JRubyBolt + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realBoltClassName the fully qualified JRuby bolt implementation class name + */ + public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName) { + _baseClassPath = baseClassPath; + _realBoltClassName = realBoltClassName; + } + + @Override + public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) { + // create instance of the jruby class here, after deserialization in the workers. + _proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName); + _proxyBolt.prepare(stormConf, context, collector, id); + } + + @Override + public void execute(Tuple input) { + _proxyBolt.execute(input); + } + + @Override + public void finishBatch() { + _proxyBolt.finishBatch(); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // declareOutputFields is executed in the topology creation time, before serialisation. + // do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + bolt.declareOutputFields(declarer); + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + return bolt.getComponentConfiguration(); + } + + private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { + try { + redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java new file mode 100644 index 0000000..88c9684 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java @@ -0,0 +1,31 @@ +package redstorm.storm.jruby; + +import backtype.storm.coordination.IBatchBolt; +import backtype.storm.transactional.TransactionAttempt; +import backtype.storm.transactional.ICommitter; + +/** + * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the bolts to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields method which is called once before + * serialization at topology creation. + */ +public class JRubyTransactionalCommitterBolt extends JRubyTransactionalBolt implements ICommitter { + public JRubyTransactionalCommitterBolt(String baseClassPath, String realBoltClassName) { + super(baseClassPath, realBoltClassName); + } + + private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { + try { + redstorm.proxy.BatchCommitterBolt proxy = new redstorm.proxy.BatchCommitterBolt(baseClassPath, realBoltClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java new file mode 100644 index 0000000..076b6f8 --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java @@ -0,0 +1,44 @@ +package redstorm.storm.jruby; + +import backtype.storm.transactional.ICommitterTransactionalSpout; +import backtype.storm.transactional.ITransactionalSpout; +import backtype.storm.task.TopologyContext; +import java.util.Map; + +/** + * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the bolts to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields method which is called once before + * serialization at topology creation. + */ +public class JRubyTransactionalCommitterSpout extends JRubyTransactionalSpout implements ICommitterTransactionalSpout { + + ICommitterTransactionalSpout _proxySpout; + + public JRubyTransactionalCommitterSpout(String baseClassPath, String realSpoutClassName) { + super(baseClassPath, realSpoutClassName); + } + + @Override + public ICommitterTransactionalSpout.Emitter getEmitter(Map conf, TopologyContext context) { + // create instance of the jruby class here, after deserialization in the workers. + if (_proxySpout == null) { + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + } + return _proxySpout.getEmitter(conf, context); + } + + private static ICommitterTransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + try { + redstorm.proxy.TransactionalCommitterSpout proxy = new redstorm.proxy.TransactionalCommitterSpout(baseClassPath, realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java new file mode 100644 index 0000000..f2550ec --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java @@ -0,0 +1,84 @@ +package redstorm.storm.jruby; + +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; + +import backtype.storm.topology.base.BaseTransactionalSpout; +import backtype.storm.transactional.ITransactionalSpout; + +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import java.util.Map; + +/** + * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ +public class JRubyTransactionalSpout extends BaseTransactionalSpout { + ITransactionalSpout _proxySpout; + String _realSpoutClassName; + String _baseClassPath; + + /** + * create a new JRubySpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyTransactionalSpout(String baseClassPath, String realSpoutClassName) { + _baseClassPath = baseClassPath; + _realSpoutClassName = realSpoutClassName; + } + + @Override + public Coordinator getCoordinator(Map conf, TopologyContext context) { + // create instance of the jruby class here, after deserialization in the workers. + if (_proxySpout == null) { + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + } + return _proxySpout.getCoordinator(conf, context); + } + + @Override + public Emitter getEmitter(Map conf, TopologyContext context) { + // create instance of the jruby class here, after deserialization in the workers. + if (_proxySpout == null) { + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + } + return _proxySpout.getEmitter(conf, context); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // declareOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + spout.declareOutputFields(declarer); + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); + } + + private static ITransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + try { + redstorm.proxy.TransactionalSpout proxy = new redstorm.proxy.TransactionalSpout(baseClassPath, realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +}