diff --git a/examples/ruby_exclamation_topology.rb b/examples/ruby_exclamation_topology.rb index 3e665bc..25590d7 100644 --- a/examples/ruby_exclamation_topology.rb +++ b/examples/ruby_exclamation_topology.rb @@ -2,12 +2,12 @@ java_import 'backtype.storm.testing.TestWordSpout' require 'examples/ruby_exclamation_bolt' class RubyExclamationTopology - def start + def start(base_class_path) builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt"), 2).shuffleGrouping(2) + builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 2).shuffleGrouping(2) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_exclamation_topology2.rb b/examples/ruby_exclamation_topology2.rb index 5a87d1e..87a4ae0 100644 --- a/examples/ruby_exclamation_topology2.rb +++ b/examples/ruby_exclamation_topology2.rb @@ -16,12 +16,12 @@ class RubyExclamationBolt2 end class RubyExclamationTopology2 - def start + def start(base_class_path) builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt2"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt2"), 2).shuffleGrouping(2) + builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 2).shuffleGrouping(2) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_redis_word_count_topology.rb b/examples/ruby_redis_word_count_topology.rb index 689bf8a..b510a74 100644 --- a/examples/ruby_redis_word_count_topology.rb +++ b/examples/ruby_redis_word_count_topology.rb @@ -4,13 +4,8 @@ require 'thread' # RubyRedisWordSpout reads the Redis queue "test" on localhost:6379 # and emits each word items pop'ed from the queue. class RubyRedisWordSpout - def is_distributed - false - end - def open(conf, context, collector) @collector = collector - @q = Queue.new @redis_reader = detach_redis_reader end @@ -65,10 +60,10 @@ class RubyRedisWordCount end class RubyRedisWordCountTopology - def start + def start(base_class_path) builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new("RubyRedisWordSpout"), 1) - builder.setBolt(2, JRubyBolt.new("RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word")) + builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRedisWordSpout"), 1) + builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word")) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_word_count_topology.rb b/examples/ruby_word_count_topology.rb index c33ea0a..0b53a85 100644 --- a/examples/ruby_word_count_topology.rb +++ b/examples/ruby_word_count_topology.rb @@ -1,5 +1,8 @@ class RubyRandomSentenceSpout + attr_reader :is_distributed + def initialize + @is_distributed = true @sentences = [ "the cow jumped over the moon", "an apple a day keeps the doctor away", @@ -9,10 +12,6 @@ class RubyRandomSentenceSpout ] end - def is_distributed - true - end - def open(conf, context, collector) @collector = collector end @@ -62,11 +61,11 @@ end class RubyWordCountTopology - def start + def start(base_class_path) builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5) - builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new("RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word")) + builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 8).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word")) conf = Config.new conf.setDebug(true) diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb index 809946b..eb9434f 100644 --- a/lib/red_storm/proxy/bolt.rb +++ b/lib/red_storm/proxy/bolt.rb @@ -4,9 +4,7 @@ java_import 'backtype.storm.task.OutputCollector' java_import 'backtype.storm.task.TopologyContext' java_import 'backtype.storm.topology.IRichBolt' java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' java_package 'redstorm.proxy' @@ -25,8 +23,11 @@ java_package 'redstorm.proxy' class Bolt java_implements IRichBolt - java_signature 'IRichBolt (String real_bolt_class_name)' - def initialize(real_bolt_class_name) + java_signature 'IRichBolt (String base_class_path, String real_bolt_class_name)' + def initialize(base_class_path, real_bolt_class_name) + @real_bolt = Object.module_eval(real_bolt_class_name).new + rescue NameError + require base_class_path @real_bolt = Object.module_eval(real_bolt_class_name).new end @@ -49,5 +50,4 @@ class Bolt def declareOutputFields(declarer) @real_bolt.declare_output_fields(declarer) end - end diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb index 6ccdb54..b154b9c 100644 --- a/lib/red_storm/proxy/spout.rb +++ b/lib/red_storm/proxy/spout.rb @@ -4,9 +4,7 @@ java_import 'backtype.storm.spout.SpoutOutputCollector' java_import 'backtype.storm.task.TopologyContext' java_import 'backtype.storm.topology.IRichSpout' java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' java_package 'redstorm.proxy' @@ -28,14 +26,17 @@ java_package 'redstorm.proxy' class Spout java_implements IRichSpout - java_signature 'IRichSpout (String real_spout_class_name)' - def initialize(real_spout_class_name) + java_signature 'IRichSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path @real_spout = Object.module_eval(real_spout_class_name).new end java_signature 'boolean isDistributed()' def isDistributed - @real_spout.is_distributed + @real_spout.respond_to?(:is_distributed) ? @real_spout.is_distributed : false end java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)' @@ -67,5 +68,4 @@ class Spout def declareOutputFields(declarer) @real_spout.declare_output_fields(declarer) end - end diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 2053c35..aeb8a3d 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -1,8 +1,11 @@ require 'java' require 'rubygems' +require 'lib/red_storm/version' + java_import 'backtype.storm.Config' java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.StormSubmitter' java_import 'backtype.storm.topology.TopologyBuilder' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' @@ -11,19 +14,21 @@ java_import 'backtype.storm.tuple.Values' java_import 'backtype.storm.jruby.JRubyBolt' java_import 'backtype.storm.jruby.JRubySpout' - +# TopologyLauncher is the application entry point when launching a topology. Basically it will +# call require on the specified Ruby topology/project class file path and call its start method class TopologyLauncher java_signature 'void main(String[])' def self.main(args) unless args.size > 0 - puts("usage: redstorm {ruby topology class file path}") + puts("Usage: redstorm topology_class_file") exit(1) end - require args[0] - clazz = camel_case(args[0].split('/').last.split('.').first) - puts("redstorm launching #{clazz}") - Object.module_eval(clazz).new.start + class_path = args[0] + clazz = camel_case(class_path.split('/').last.split('.').first) + puts("redstorm v#{Redstorm::VERSION} launching #{clazz}") + require class_path + Object.module_eval(clazz).new.start(class_path) end private @@ -31,4 +36,4 @@ class TopologyLauncher def self.camel_case(s) s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } end -end \ No newline at end of file +end diff --git a/lib/red_storm/version.rb b/lib/red_storm/version.rb new file mode 100644 index 0000000..dc3fa18 --- /dev/null +++ b/lib/red_storm/version.rb @@ -0,0 +1,3 @@ +module Redstorm + VERSION = '0.1' +end \ No newline at end of file diff --git a/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java b/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java index ef1d075..b73099c 100644 --- a/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java +++ b/storm/src/jvm/backtype/storm/jruby/JRubyBolt.java @@ -20,20 +20,22 @@ import java.util.Map; public class JRubyBolt implements IRichBolt { IRichBolt _proxyBolt; String _realBoltClassName; - + String _baseClassPath; /** * create a new JRubyBolt * + * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name */ - public JRubyBolt(String realBoltClassName) { + public JRubyBolt(String baseClassPath, String realBoltClassName) { + _baseClassPath = baseClassPath; _realBoltClassName = realBoltClassName; } @Override public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) { // create instance of the jruby class here, after deserialization in the workers. - _proxyBolt = newProxyBolt(_realBoltClassName); + _proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName); _proxyBolt.prepare(stormConf, context, collector); } @@ -52,13 +54,13 @@ public class JRubyBolt implements IRichBolt { // declareOutputFields is executed in the topology creation time, before serialisation. // do not set the _proxyBolt instance variable here to avoid JRuby serialization // issues. Just create tmp bolt instance to call declareOutputFields. - IRichBolt bolt = newProxyBolt(_realBoltClassName); + IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); bolt.declareOutputFields(declarer); } - private static IRichBolt newProxyBolt(String realBoltClassName) { + private static IRichBolt newProxyBolt(String baseClassPath, String realBoltClassName) { try { - redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(realBoltClassName); + redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(baseClassPath, realBoltClassName); return proxy; } catch (Exception e) { diff --git a/storm/src/jvm/backtype/storm/jruby/JRubySpout.java b/storm/src/jvm/backtype/storm/jruby/JRubySpout.java index 631e9d6..4cd7a7d 100644 --- a/storm/src/jvm/backtype/storm/jruby/JRubySpout.java +++ b/storm/src/jvm/backtype/storm/jruby/JRubySpout.java @@ -20,13 +20,16 @@ import java.util.Map; public class JRubySpout implements IRichSpout { IRichSpout _proxySpout; String _realSpoutClassName; + String _baseClassPath; /** * create a new JRubySpout * + * @param baseClassPath the topology/project base JRuby class file path * @param realSpoutClassName the fully qualified JRuby spout implementation class name */ - public JRubySpout(String realSpoutClassName) { + public JRubySpout(String baseClassPath, String realSpoutClassName) { + _baseClassPath = baseClassPath; _realSpoutClassName = realSpoutClassName; } @@ -35,14 +38,14 @@ public class JRubySpout implements IRichSpout { // isDistributed is executed in the topology creation time before serialisation. // do not set the _proxySpout instance variable here to avoid JRuby serialization // issues. Just create tmp spout instance to call isDistributed. - IRichSpout spout = newProxySpout(_realSpoutClassName); + IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); return spout.isDistributed(); } @Override public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { // create instance of the jruby proxy class here, after deserialization in the workers. - _proxySpout = newProxySpout(_realSpoutClassName); + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); _proxySpout.open(conf, context, collector); } @@ -71,13 +74,13 @@ public class JRubySpout implements IRichSpout { // declareOutputFields is executed in the topology creation time before serialisation. // do not set the _proxySpout instance variable here to avoid JRuby serialization // issues. Just create tmp spout instance to call declareOutputFields. - IRichSpout spout = newProxySpout(_realSpoutClassName); + IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); spout.declareOutputFields(declarer); } - private static IRichSpout newProxySpout(String realSpoutClassName) { + private static IRichSpout newProxySpout(String baseClassPath, String realSpoutClassName) { try { - redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(realSpoutClassName); + redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(baseClassPath, realSpoutClassName); return proxy; } catch (Exception e) {