diff --git a/.gitignore b/.gitignore index 8942e7f..ac26b82 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ storm/lib/ storm/classes/ +tmp/ diff --git a/Rakefile b/Rakefile index b428064..49e6635 100644 --- a/Rakefile +++ b/Rakefile @@ -37,28 +37,15 @@ end task :build => :setup do # first compile the JRuby proxy classes, required by the Java bindings - build_jruby("#{JRUBY_SRC_DIR}") + build_jruby("#{JRUBY_SRC_DIR}/proxy") # compile the Storm Java->JRuby bindings build_java("#{JAVA_SRC_DIR}/backtype/storm/jruby") - # compile the Ruby examples - build_jruby("#{EXAMPLES_SRC_DIR}") - - # compile the Ruby user-created topologies - unless Dir["#{TOPOLOGIES_SRC_DIR}/*"].empty? - build_jruby("#{TOPOLOGIES_SRC_DIR}") - end + # first compile the JRuby proxy classes, required by the Java bindings + build_jruby("#{JRUBY_SRC_DIR}/topology_launcher.rb") end -task :storm do - unless ENV['class'] - puts("usage: rake storm class={fully qualified java class name}") - exit(1) - end - system("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}") -end - def build_java(source_folder) puts("\n--> Building Java:") ant.javac( diff --git a/bin/redstorm b/bin/redstorm new file mode 100755 index 0000000..8346de8 --- /dev/null +++ b/bin/redstorm @@ -0,0 +1,7 @@ +JRUBY_JAR="${HOME}/.rvm/rubies/jruby-1.6.5/lib/jruby-complete.jar" +STORM_DIR="./storm" +RUNTIME_LIB_DIR="${STORM_DIR}/lib" +DEV_LIB_DIR="${STORM_DIR}/lib/dev" +CLASSES_DIR="${STORM_DIR}/classes" + +java -cp "./${CLASSES_DIR}:./${RUNTIME_LIB_DIR}/*:./${DEV_LIB_DIR}/*:${JRUBY_JAR}" TopologyLauncher $@ \ No newline at end of file diff --git a/examples/ruby_exclamation_bolt.rb b/examples/ruby_exclamation_bolt.rb index 1c483a9..72d1071 100644 --- a/examples/ruby_exclamation_bolt.rb +++ b/examples/ruby_exclamation_bolt.rb @@ -1,6 +1,4 @@ - class RubyExclamationBolt - def prepare(conf, context, collector) @collector = collector end @@ -13,5 +11,4 @@ class RubyExclamationBolt def declare_output_fields(declarer) declarer.declare(Fields.new("word")) end - end diff --git a/examples/ruby_exclamation_topology.rb b/examples/ruby_exclamation_topology.rb index 102e5dd..3e665bc 100644 --- a/examples/ruby_exclamation_topology.rb +++ b/examples/ruby_exclamation_topology.rb @@ -1,27 +1,8 @@ -require 'java' - -java_import 'backtype.storm.Config' -java_import 'backtype.storm.LocalCluster' -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' java_import 'backtype.storm.testing.TestWordSpout' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.topology.TopologyBuilder' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' -java_import 'backtype.storm.utils.Utils' -java_import 'java.util.Map' - -java_import 'backtype.storm.jruby.JRubyBolt' - require 'examples/ruby_exclamation_bolt' class RubyExclamationTopology - - java_signature 'void main(String[])' - def self.main(args) + def start builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) @@ -37,5 +18,4 @@ class RubyExclamationTopology cluster.killTopology("test") cluster.shutdown end - end diff --git a/examples/ruby_exclamation_topology2.rb b/examples/ruby_exclamation_topology2.rb index ec79bd8..5a87d1e 100644 --- a/examples/ruby_exclamation_topology2.rb +++ b/examples/ruby_exclamation_topology2.rb @@ -1,23 +1,6 @@ -require 'java' - -java_import 'backtype.storm.Config' -java_import 'backtype.storm.LocalCluster' -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' java_import 'backtype.storm.testing.TestWordSpout' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.topology.TopologyBuilder' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' -java_import 'backtype.storm.utils.Utils' -java_import 'java.util.Map' - -java_import 'backtype.storm.jruby.JRubyBolt' class RubyExclamationBolt2 - def prepare(conf, context, collector) @collector = collector end @@ -30,13 +13,10 @@ class RubyExclamationBolt2 def declare_output_fields(declarer) declarer.declare(Fields.new("word")) end - end class RubyExclamationTopology2 - - java_signature 'void main(String[])' - def self.main(args) + def start builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) @@ -52,5 +32,4 @@ class RubyExclamationTopology2 cluster.killTopology("test") cluster.shutdown end - end diff --git a/examples/ruby_redis_word_count.rb b/examples/ruby_redis_word_count_topology.rb similarity index 68% rename from examples/ruby_redis_word_count.rb rename to examples/ruby_redis_word_count_topology.rb index 200ae50..689bf8a 100644 --- a/examples/ruby_redis_word_count.rb +++ b/examples/ruby_redis_word_count_topology.rb @@ -1,29 +1,9 @@ -require 'java' -require 'rubygems' require 'redis' require 'thread' -java_import 'backtype.storm.Config' -java_import 'backtype.storm.LocalCluster' -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.testing.TestWordSpout' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.topology.TopologyBuilder' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' -java_import 'backtype.storm.utils.Utils' -java_import 'java.util.Map' - -java_import 'backtype.storm.jruby.JRubyBolt' -java_import 'backtype.storm.jruby.JRubySpout' - # RubyRedisWordSpout reads the Redis queue "test" on localhost:6379 # and emits each word items pop'ed from the queue. class RubyRedisWordSpout - def is_distributed false end @@ -36,6 +16,7 @@ class RubyRedisWordSpout end def next_tuple + # per doc nextTuple should not block, and sleep a bit when there's no data to process. if @q.size > 0 @collector.emit(Values.new(@q.pop)) else @@ -64,7 +45,6 @@ class RubyRedisWordSpout end class RubyRedisWordCount - def initialize @counts = Hash.new{|h, k| h[k] = 0} end @@ -85,9 +65,7 @@ class RubyRedisWordCount end class RubyRedisWordCountTopology - - java_signature 'void main(String[])' - def self.main(args) + def start builder = TopologyBuilder.new builder.setSpout(1, JRubySpout.new("RubyRedisWordSpout"), 1) builder.setBolt(2, JRubyBolt.new("RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word")) diff --git a/examples/ruby_word_count_topology.rb b/examples/ruby_word_count_topology.rb index 4e486b8..c33ea0a 100644 --- a/examples/ruby_word_count_topology.rb +++ b/examples/ruby_word_count_topology.rb @@ -1,23 +1,3 @@ -require 'java' - -java_import 'backtype.storm.Config' -java_import 'backtype.storm.LocalCluster' -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.testing.TestWordSpout' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.topology.TopologyBuilder' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Values' -java_import 'backtype.storm.utils.Utils' -java_import 'java.util.Map' - -java_import 'backtype.storm.jruby.JRubyBolt' -java_import 'backtype.storm.jruby.JRubySpout' - - class RubyRandomSentenceSpout def initialize @sentences = [ @@ -82,9 +62,7 @@ end class RubyWordCountTopology - - java_signature 'void main(String[])' - def self.main(args) + def start builder = TopologyBuilder.new builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5) builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1) diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb new file mode 100644 index 0000000..2053c35 --- /dev/null +++ b/lib/red_storm/topology_launcher.rb @@ -0,0 +1,34 @@ +require 'java' +require 'rubygems' + +java_import 'backtype.storm.Config' +java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Values' + +java_import 'backtype.storm.jruby.JRubyBolt' +java_import 'backtype.storm.jruby.JRubySpout' + + +class TopologyLauncher + + java_signature 'void main(String[])' + def self.main(args) + unless args.size > 0 + puts("usage: redstorm {ruby topology class file path}") + exit(1) + end + require args[0] + clazz = camel_case(args[0].split('/').last.split('.').first) + puts("redstorm launching #{clazz}") + Object.module_eval(clazz).new.start + end + + private + + def self.camel_case(s) + s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } + end +end \ No newline at end of file