diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8942e7f --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +storm/lib/ +storm/classes/ diff --git a/README.md b/README.md index 1172f5a..8a33298 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,9 @@ This has been tested on OSX 10.6.8 using Storm 0.5.3 and JRuby 1.6.4 - edit Rakefile to ajust your `JRUBY_JAR` ``` sh -$ ./lein deps -rake build -rake storm class=storm.starter.RubyExclamationTopology +$ rake deps +$ rake build +$ rake storm class=RubyExclamationTopology +$ rake storm class=RubyExclamationTopology2 +$ rake storm class=RubyWordCountTopology ``` diff --git a/Rakefile b/Rakefile index 5e62e82..3bd0b9e 100644 --- a/Rakefile +++ b/Rakefile @@ -1,23 +1,24 @@ -require 'ant' - -PROJECT_NAME = 'storm-jruby' - -JAVA_SRC_DIR = 'src/jvm' -JRUBY_SRC_DIR = 'src/jruby' - -RUNTIME_LIB_DIR = 'lib' -DEV_LIB_DIR = 'lib/dev' +require 'ant' + JRUBY_JAR = '/Users/colin/.rvm/rubies/jruby-1.6.4/lib/jruby.jar' - -BUILD_DIR = 'build' -CLASSES_DIR = "classes" + +STORM_DIR = './storm' +JAVA_SRC_DIR = "#{STORM_DIR}/src/jvm" +EXAMPLES_SRC_DIR = "./examples" +JRUBY_SRC_DIR = "./lib/red_storm" +RUNTIME_LIB_DIR = "#{STORM_DIR}/lib" +DEV_LIB_DIR = "#{STORM_DIR}/lib/dev" +CLASSES_DIR = "#{STORM_DIR}/classes" task :default => [:clean, :build] -task :clean do - ant.delete :dir => BUILD_DIR - puts -end +task :clean_all => :clean do + ant.delete :dir => RUNTIME_LIB_DIR +end + +task :clean do + ant.delete :dir => CLASSES_DIR +end task :setup do ant.mkdir :dir => CLASSES_DIR @@ -28,29 +29,40 @@ task :setup do end end -task :build => :setup do - build_java "#{JAVA_SRC_DIR}/backtype/storm/jruby" - build_jruby "#{JRUBY_SRC_DIR}/storm/starter" -end - -def build_java(source_folder) - ant.javac :srcdir => source_folder, :destdir => CLASSES_DIR, :classpathref => 'classpath', - :source => "1.6", :target => "1.6", :debug => "yes", :includeantruntime => "no" - puts -end - -def build_jruby(source_folder) - puts("compiling jruby") - exec("jrubyc -t #{CLASSES_DIR} --javac -c \"#{DEV_LIB_DIR}/storm-0.5.3.jar\" -c \"#{CLASSES_DIR}\" #{JRUBY_SRC_DIR}") -end +task :deps do + system("cd #{STORM_DIR}; ./lein deps") +end +task :build => :setup do + build_jruby("#{JRUBY_SRC_DIR}") + build_java("#{JAVA_SRC_DIR}/backtype/storm/jruby") + build_jruby("#{EXAMPLES_SRC_DIR}") +end task :storm do unless ENV['class'] puts("usage: rake storm class={fully qualified java class name}") exit(1) end - exec("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}") + system("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}") end +def build_java(source_folder) + puts("\n--> Building Java:") + ant.javac( + :srcdir => source_folder, + :destdir => CLASSES_DIR, + :classpathref => 'classpath', + :source => "1.6", + :target => "1.6", + :debug => "yes", + :includeantruntime => "no", + :verbose => false, + :listfiles => true + ) +end +def build_jruby(source_folder) + puts("\n--> Building JRuby #{source_folder}") + system("jrubyc -t #{CLASSES_DIR} --verbose --javac -c \"#{DEV_LIB_DIR}/storm-0.5.3.jar\" -c \"#{CLASSES_DIR}\" #{source_folder}") +end diff --git a/examples/ruby_exclamation_bolt.rb b/examples/ruby_exclamation_bolt.rb new file mode 100644 index 0000000..1c483a9 --- /dev/null +++ b/examples/ruby_exclamation_bolt.rb @@ -0,0 +1,17 @@ + +class RubyExclamationBolt + + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) + @collector.ack(tuple) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + +end diff --git a/src/jruby/storm/starter/ruby_exclamation_topology.rb b/examples/ruby_exclamation_topology.rb similarity index 78% rename from src/jruby/storm/starter/ruby_exclamation_topology.rb rename to examples/ruby_exclamation_topology.rb index 139eac0..102e5dd 100644 --- a/src/jruby/storm/starter/ruby_exclamation_topology.rb +++ b/examples/ruby_exclamation_topology.rb @@ -15,9 +15,8 @@ java_import 'backtype.storm.utils.Utils' java_import 'java.util.Map' java_import 'backtype.storm.jruby.JRubyBolt' -java_import 'storm.starter.RubyExclamationBolt' -java_package 'storm.starter' +require 'examples/ruby_exclamation_bolt' class RubyExclamationTopology @@ -26,15 +25,15 @@ class RubyExclamationTopology builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new("storm.starter.RubyExclamationBolt"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new("storm.starter.RubyExclamationBolt"), 2).shuffleGrouping(2) + builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt"), 2).shuffleGrouping(2) conf = Config.new conf.setDebug(true) cluster = LocalCluster.new cluster.submitTopology("test", conf, builder.createTopology) - Utils.sleep(10000) + sleep(5) cluster.killTopology("test") cluster.shutdown end diff --git a/examples/ruby_exclamation_topology2.rb b/examples/ruby_exclamation_topology2.rb new file mode 100644 index 0000000..ec79bd8 --- /dev/null +++ b/examples/ruby_exclamation_topology2.rb @@ -0,0 +1,56 @@ +require 'java' + +java_import 'backtype.storm.Config' +java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.task.OutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.testing.TestWordSpout' +java_import 'backtype.storm.topology.IRichBolt' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Values' +java_import 'backtype.storm.utils.Utils' +java_import 'java.util.Map' + +java_import 'backtype.storm.jruby.JRubyBolt' + +class RubyExclamationBolt2 + + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) + @collector.ack(tuple) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + +end + +class RubyExclamationTopology2 + + java_signature 'void main(String[])' + def self.main(args) + builder = TopologyBuilder.new + + builder.setSpout(1, TestWordSpout.new, 10) + builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt2"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt2"), 2).shuffleGrouping(2) + + conf = Config.new + conf.setDebug(true) + + cluster = LocalCluster.new + cluster.submitTopology("test", conf, builder.createTopology) + sleep(5) + cluster.killTopology("test") + cluster.shutdown + end + +end diff --git a/examples/ruby_word_count_topology.rb b/examples/ruby_word_count_topology.rb new file mode 100644 index 0000000..4e486b8 --- /dev/null +++ b/examples/ruby_word_count_topology.rb @@ -0,0 +1,102 @@ +require 'java' + +java_import 'backtype.storm.Config' +java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.task.OutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.testing.TestWordSpout' +java_import 'backtype.storm.topology.IRichBolt' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Values' +java_import 'backtype.storm.utils.Utils' +java_import 'java.util.Map' + +java_import 'backtype.storm.jruby.JRubyBolt' +java_import 'backtype.storm.jruby.JRubySpout' + + +class RubyRandomSentenceSpout + def initialize + @sentences = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature" + ] + end + + def is_distributed + true + end + + def open(conf, context, collector) + @collector = collector + end + + def next_tuple + @collector.emit(Values.new(@sentences[rand(@sentences.length)])) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end +end + +class RubySplitSentence + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end +end + +class RubyWordCount + def initialize + @counts = Hash.new{|h, k| h[k] = 0} + end + + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + word = tuple.getString(0) + @counts[word] += 1 + @collector.emit(Values.new(word, @counts[word])) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word", "count")) + end +end + + +class RubyWordCountTopology + + java_signature 'void main(String[])' + def self.main(args) + builder = TopologyBuilder.new + builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new("RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word")) + + conf = Config.new + conf.setDebug(true) + conf.setMaxTaskParallelism(3) + + cluster = LocalCluster.new + cluster.submitTopology("word-count", conf, builder.createTopology) + sleep(5) + cluster.shutdown + end +end \ No newline at end of file diff --git a/lib/red_storm.rb b/lib/red_storm.rb new file mode 100644 index 0000000..c3ebc96 --- /dev/null +++ b/lib/red_storm.rb @@ -0,0 +1,2 @@ +require 'lib/red_storm/proxy/bolt' +require 'lib/red_storm/proxy/spout' diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb new file mode 100644 index 0000000..8629ba2 --- /dev/null +++ b/lib/red_storm/proxy/bolt.rb @@ -0,0 +1,43 @@ +require 'java' + +java_import 'backtype.storm.task.OutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.topology.IRichBolt' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' + +java_package 'redstorm.proxy' + +class Bolt + java_implements IRichBolt + + java_signature 'IRichBolt (String real_bolt_class_name)' + def initialize(real_bolt_class_name) + @real_bolt_class_name = real_bolt_class_name + end + + java_signature 'void prepare(Map, TopologyContext, OutputCollector)' + def prepare(conf, context, collector) + @real_bolt = Object.module_eval(@real_bolt_class_name).new + @real_bolt.prepare(conf, context, collector) + end + + java_signature 'void execute(Tuple)' + def execute(tuple) + @real_bolt.execute(tuple) + end + + java_signature 'void cleanup()' + def cleanup + @real_bolt.cleanup if @real_bolt.respond_to?(:cleanup) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + Object.module_eval(@real_bolt_class_name).new.declare_output_fields(declarer) + end + +end diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb new file mode 100644 index 0000000..10c0d67 --- /dev/null +++ b/lib/red_storm/proxy/spout.rb @@ -0,0 +1,58 @@ +require 'java' + +java_import 'backtype.storm.spout.SpoutOutputCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'backtype.storm.topology.IRichSpout' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' + +java_package 'redstorm.proxy' + +class Spout + java_implements IRichSpout + + java_signature 'IRichSpout (String real_spout_class_name)' + def initialize(real_spout_class_name) + @real_spout_class_name = real_spout_class_name + end + + java_signature 'boolean isDistributed()' + def isDistributed + Object.module_eval(@real_spout_class_name).new.is_distributed + end + + java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)' + def open(conf, context, collector) + @real_spout = Object.module_eval(@real_spout_class_name).new + @real_spout.open(conf, context, collector) + end + + java_signature 'void close()' + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void nextTuple()' + def nextTuple + @real_spout.next_tuple + end + + java_signature 'void ack(Object)' + def ack(msgId) + @real_spout.ack(msgId) if @real_spout.respond_to?(:close) + end + + java_signature 'void fail(Object)' + def fail(msgId) + @real_spout.fail(msgId) if @real_spout.respond_to?(:close) + end + + java_signature 'void declareOutputFields(OutputFieldsDeclarer)' + def declareOutputFields(declarer) + Object.module_eval(@real_spout_class_name).new.declare_output_fields(declarer) + end + +end diff --git a/src/jruby/storm/starter/bolt/ruby_exclamation_bolt.rb b/src/jruby/storm/starter/bolt/ruby_exclamation_bolt.rb deleted file mode 100644 index 41161be..0000000 --- a/src/jruby/storm/starter/bolt/ruby_exclamation_bolt.rb +++ /dev/null @@ -1,37 +0,0 @@ -require 'java' - -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' -java_import 'java.util.Map' - -java_package 'storm.starter' - -class RubyExclamationBolt - java_implements IRichBolt - - java_signature 'void prepare(Map, TopologyContext, OutputCollector)' - def prepare(conf, context, collector) - @collector = collector - end - - java_signature 'void execute(Tuple)' - def execute(tuple) - @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) - @collector.ack(tuple) - end - - java_signature 'void cleanup()' - def cleanup - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - declarer.declare(Fields.new("word")) - end - - end diff --git a/lein b/storm/lein similarity index 100% rename from lein rename to storm/lein diff --git a/project.clj b/storm/project.clj similarity index 89% rename from project.clj rename to storm/project.clj index 59716db..c18a61b 100644 --- a/project.clj +++ b/storm/project.clj @@ -1,6 +1,9 @@ (defproject storm-jruby "0.0.1" :source-path "src/clj" :java-source-path "src/jvm" + :library-path "lib/" + :dev-library-path "lib/dev/" + :compile-path "build/classes" :javac-options {:debug "true" :fork "true"} :resources-path "multilang" :aot :all diff --git a/src/jvm/backtype/storm/jruby/JRubyBolt.java b/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java similarity index 60% rename from src/jvm/backtype/storm/jruby/JRubyBolt.java rename to storm/src/jvm/backtype/storm/jruby/JRubyBolt.java index f29288a..9e01e67 100644 --- a/src/jvm/backtype/storm/jruby/JRubyBolt.java +++ b/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java @@ -19,48 +19,50 @@ import java.util.Map; * post-deserialization at the workers. */ public class JRubyBolt implements IRichBolt { - IRichBolt _bolt; - String _jrubyClassName; + IRichBolt _proxyBolt; + String _realBoltClassName; /** * create a new JRubyBolt * - * @param jrubyClassName the fully qualified JRuby bolt implementation class name + * @param realBoltClassName the fully qualified JRuby bolt implementation class name */ - public JRubyBolt(String jrubyClassName) { - // create instance of the jruby class so its available for declareOutputFields - // which gets executed in the topology creation time, before the prepare method - _jrubyClassName = jrubyClassName; - _bolt = realJRubyBolt(_jrubyClassName); + public JRubyBolt(String realBoltClassName) { + _realBoltClassName = realBoltClassName; } @Override public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { // create instance of the jruby class here, after deserialization for the workers. - _bolt = realJRubyBolt(_jrubyClassName); - _bolt.prepare(stormConf, context, collector); + _proxyBolt = newProxyBolt(_realBoltClassName); + _proxyBolt.prepare(stormConf, context, collector); } @Override public void execute(Tuple input) { - _bolt.execute(input); + _proxyBolt.execute(input); } @Override public void cleanup() { - _bolt.cleanup(); + _proxyBolt.cleanup(); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - _bolt.declareOutputFields(declarer); + // declareOutputFields is executed in the topology creation time, before the prepare method at + // pre serialisation. do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IRichBolt bolt = newProxyBolt(_realBoltClassName); + bolt.declareOutputFields(declarer); } - private static IRichBolt realJRubyBolt(String realClassName) { + private static IRichBolt newProxyBolt(String realBoltClassName) { try { - Class clazz = Class.forName(realClassName); - return (IRichBolt)clazz.newInstance(); - } catch (Exception e) { + redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(realBoltClassName); + return proxy; + } + catch (Exception e) { throw new RuntimeException(e); } } diff --git a/storm/src/jvm/backtype/storm/jruby/JRubySpout.java b/storm/src/jvm/backtype/storm/jruby/JRubySpout.java new file mode 100644 index 0000000..953aee8 --- /dev/null +++ b/storm/src/jvm/backtype/storm/jruby/JRubySpout.java @@ -0,0 +1,86 @@ +package backtype.storm.jruby; + +import backtype.storm.generated.StreamInfo; +import backtype.storm.spout.ISpout; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.IRichSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Fields; +import backtype.storm.utils.Utils; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout class is instanciated twice, in the constructor and in the open + * method. The constructor instance is required to support the declareOutputFields at topology + * creation while the open instance is required for the actual spout execution, + * post-deserialization at the workers. + */ +public class JRubySpout implements IRichSpout { + IRichSpout _proxySpout; + String _realSpoutClassName; + + /** + * create a new JRubySpout + * + * @param jrubyClassName the fully qualified JRuby spout implementation class name + */ + public JRubySpout(String realSpoutClassName) { + _realSpoutClassName = realSpoutClassName; + } + + @Override + public boolean isDistributed() { + IRichSpout spout = newProxySpout(_realSpoutClassName); + return spout.isDistributed(); + } + + @Override + public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { + _proxySpout = newProxySpout(_realSpoutClassName); + _proxySpout.open(conf, context, collector); + } + + @Override + public void close() { + _proxySpout.close(); + } + + @Override + public void nextTuple() { + _proxySpout.nextTuple(); + } + + @Override + public void ack(Object msgId) { + _proxySpout.ack(msgId); + } + + @Override + public void fail(Object msgId) { + _proxySpout.fail(msgId); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + IRichSpout spout = newProxySpout(_realSpoutClassName); + spout.declareOutputFields(declarer); + } + + private static IRichSpout newProxySpout(String realSpoutClassName) { + try { + redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +}