diff --git a/README.md b/README.md index 2ab2ff5..e04d9ac 100644 --- a/README.md +++ b/README.md @@ -69,9 +69,11 @@ $ gem install redstorm-x.y.z.gem RedStorm now support [Bundler](http://gembundler.com/) for using gems in your topology. Basically supply a `Gemfile` in the root of your project directory and execute this command to install the gems into the `target/gems` directory. **Note that if you change the Gemfile you must rerun this command**. ``` sh - $ redstorm --1.9 gems + $ redstorm --1.9 gems [--gemfile=GEMFILE] ``` +All `bundle install` command options can be passed as options to `redstorm --1.9 gem` like `--gemfile=GEMFILE` to specify a Gemfile in an alternate path. + Basically, the `redstorm --1.9 gems` command installs the *Bundler* and *Rake* gems and all the gems specified in the Gemfile into the `target/gems` directory. The idea is that in order for the topology to run in a Storm cluster, everything, including the fully *installed* gems, must be packaged and self-contained into a single JAR file. This has an important consequence: the gems will not be *installed* on the cluster target machines, they are already *installed* in the JAR file. This could possibly lead to problems if the machine used to *install* the gems is of a different architecture than the cluster target machines **and** some of these gems have *native* C/FFI extensions. ### Run in local mode @@ -118,12 +120,10 @@ $ redstorm --1.9 local examples/simple/exclamation_topology2.rb $ redstorm --1.9 local examples/simple/word_count_topology.rb ``` -This next example requires the use of the [Redis Gem](https://github.com/ezmobius/redis-rb) and a [Redis][redis] server running on `localhost:6379` - -Fist create a Gemfile with the Redis gem in it and run +To run `examples/simple/redis_word_count_topology.rb` you need a [Redis][redis] server running on `localhost:6379` ``` sh -$ redstorm --1.9 gems +$ redstorm --1.9 gems --gemfile examples/simple/Gemfile ``` Run the topology in local mode @@ -152,15 +152,13 @@ All examples using the [simple DSL](https://github.com/colinsurprenant/redstorm/ Note the **-Djruby.compat.version=RUBY1_9** parameter. -- to run `examples/simple/redis_word_count_topology.rb` you need a [Redis][redis] server running on `localhost:6379` and a Gemfile with the required Redis gem +- to run `examples/simple/redis_word_count_topology.rb` you need a [Redis][redis] server running on `localhost:6379` - - install gems, generate jar and submit: - - ``` sh - $ redstorm --1.9 gems - $ redstorm --1.9 jar examples - $ storm jar ./target/cluster-topology.jar -Djruby.compat.version=RUBY1_9 redstorm.TopologyLauncher cluster examples/simple/redis_word_count_topology.rb - ``` + ``` sh + $ redstorm --1.9 gems --gemfile examples/simple/Gemfile + $ redstorm --1.9 jar examples + $ storm jar ./target/cluster-topology.jar -Djruby.compat.version=RUBY1_9 redstorm.TopologyLauncher cluster examples/simple/redis_word_count_topology.rb + ``` - using `redis-cli`, push words into the `test` list and watch Storm pick them up diff --git a/examples/simple/Gemfile b/examples/simple/Gemfile new file mode 100644 index 0000000..c646e60 --- /dev/null +++ b/examples/simple/Gemfile @@ -0,0 +1,2 @@ +source :rubygems +gem 'redis' \ No newline at end of file diff --git a/examples/simple/random_sentence_spout.rb b/examples/simple/random_sentence_spout.rb index 5e193aa..84bff0e 100644 --- a/examples/simple/random_sentence_spout.rb +++ b/examples/simple/random_sentence_spout.rb @@ -3,7 +3,6 @@ require 'red_storm' module RedStorm module Examples class RandomSentenceSpout < RedStorm::SimpleSpout - set :is_distributed => true output_fields :word on_send {@sentences[rand(@sentences.length)]} diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb index c739da6..04b403d 100644 --- a/lib/red_storm/proxy/bolt.rb +++ b/lib/red_storm/proxy/bolt.rb @@ -53,4 +53,9 @@ class Bolt def declareOutputFields(declarer) @real_bolt.declare_output_fields(declarer) end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_bolt.get_component_configuration + end end diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb index f84c2d0..74d034e 100644 --- a/lib/red_storm/proxy/spout.rb +++ b/lib/red_storm/proxy/spout.rb @@ -18,7 +18,6 @@ java_package 'redstorm.proxy' # The real spout class implementation must define these methods: # - open(conf, context, collector) # - next_tuple -# - is_distributed # - declare_output_fields # # and optionnaly: @@ -38,11 +37,6 @@ class Spout @real_spout = Object.module_eval(real_spout_class_name).new end - java_signature 'boolean isDistributed()' - def isDistributed - @real_spout.respond_to?(:is_distributed) ? @real_spout.is_distributed : false - end - java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)' def open(conf, context, collector) @real_spout.open(conf, context, collector) @@ -53,6 +47,16 @@ class Spout @real_spout.close if @real_spout.respond_to?(:close) end + java_signature 'void activate()' + def activate + @real_spout.activate if @real_spout.respond_to?(:activate) + end + + java_signature 'void deactivate()' + def deactivate + @real_spout.deactivate if @real_spout.respond_to?(:deactivate) + end + java_signature 'void nextTuple()' def nextTuple @real_spout.next_tuple @@ -72,4 +76,10 @@ class Spout def declareOutputFields(declarer) @real_spout.declare_output_fields(declarer) end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + end diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb index cdbf81d..d67089f 100644 --- a/lib/red_storm/simple_bolt.rb +++ b/lib/red_storm/simple_bolt.rb @@ -72,6 +72,11 @@ module RedStorm declarer.declare(Fields.new(self.class.fields)) end + def get_component_configuration + # TODO: dummy implemetation + Backtype::Config.new + end + private # default noop optional dsl callbacks diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index 588abe9..bf2f338 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -33,6 +33,14 @@ module RedStorm @on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)} end + def self.on_activate(method_name = nil, &on_activate_block) + @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)} + end + + def self.on_deactivate(method_name = nil, &on_deactivate_block) + @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)} + end + def self.on_ack(method_name = nil, &on_ack_block) @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)} end @@ -76,12 +84,16 @@ module RedStorm instance_exec(&self.class.on_close_block) end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) + def activate + instance_exec(&self.class.on_activate_block) end - def is_distributed - self.class.is_distributed? + def deactivate + instance_exec(&self.class.on_deactivate_block) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new(self.class.fields)) end def ack(msg_id) @@ -92,11 +104,18 @@ module RedStorm instance_exec(msg_id, &self.class.on_fail_block) end + def get_component_configuration + # TODO: dummy implemetation + Backtype::Config.new + end + private # default optional noop dsl methods/callbacks def on_init; end def on_close; end + def on_activate; end + def on_deactivate; end def on_ack(msg_id); end def on_fail(msg_id); end @@ -116,6 +135,14 @@ module RedStorm @on_close_block ||= lambda {self.send(:on_close)} end + def self.on_activate_block + @on_activate_block ||= lambda {self.send(:on_activate)} + end + + def self.on_deactivate_block + @on_deactivate_block ||= lambda {self.send(:on_deactivate)} + end + def self.on_ack_block @on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)} end @@ -129,13 +156,10 @@ module RedStorm end def self.spout_options + # TODO remove is_distributed @spout_options ||= {:is_distributed => false} end - def self.is_distributed? - !!self.spout_options[:is_distributed] - end - def self.emit? !!self.send_options[:emit] end diff --git a/lib/tasks/red_storm.rake b/lib/tasks/red_storm.rake index 5387138..73b3003 100644 --- a/lib/tasks/red_storm.rake +++ b/lib/tasks/red_storm.rake @@ -9,6 +9,8 @@ rescue LoadError require 'red_storm' end +STORM_VERSION = "0.7.1" + CWD = Dir.pwd TARGET_DIR = "#{CWD}/target" TARGET_SRC_DIR = "#{TARGET_DIR}/src" @@ -143,10 +145,12 @@ task :build => :setup do build_java_dir("#{TARGET_SRC_DIR}") end -task :gems => :setup do +task :gems, [:gemfile] => :setup do |t, args| + bundler_options = args[:gemfile].split(":").join(" ") + system("gem install bundler --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc") system("gem install rake --version 0.9.2.2 --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc") - system("jruby #{RedStorm::RUNTIME['RUBY_VERSION']} -S bundle install --path #{TARGET_GEMS_DIR}/bundler/") + system("jruby #{RedStorm::RUNTIME['RUBY_VERSION']} -S bundle install #{bundler_options} --path #{TARGET_GEMS_DIR}/bundler/") end def build_java_dir(source_folder) @@ -161,10 +165,12 @@ def build_java_dir(source_folder) :includeantruntime => "no", :verbose => false, :listfiles => true - ) + ) do + # compilerarg :value => "-Xlint:unchecked" + end end def build_jruby(source_path) puts("\n--> Compiling JRuby") - system("cd #{RedStorm::REDSTORM_HOME}; jrubyc -t #{TARGET_SRC_DIR} --verbose --java -c \"#{TARGET_DEPENDENCY_DIR}/storm-0.6.2.jar\" -c \"#{TARGET_CLASSES_DIR}\" #{source_path}") + system("cd #{RedStorm::REDSTORM_HOME}; jrubyc -t #{TARGET_SRC_DIR} --verbose --java -c \"#{TARGET_DEPENDENCY_DIR}/storm-#{STORM_VERSION}.jar\" -c \"#{TARGET_CLASSES_DIR}\" #{source_path}") end diff --git a/pom.xml b/pom.xml index 4c32462..aaa4f2f 100644 --- a/pom.xml +++ b/pom.xml @@ -27,7 +27,7 @@ storm storm - 0.6.2 + 0.7.1 diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index 1e9ed5c..4073868 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -58,6 +58,16 @@ public class JRubyBolt implements IRichBolt { bolt.declareOutputFields(declarer); } + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // do not set the _proxyBolt instance variable here to avoid JRuby serialization + // issues. Just create tmp bolt instance to call declareOutputFields. + IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + return bolt.getComponentConfiguration(); + } + + private static IRichBolt newProxyBolt(String baseClassPath, String realBoltClassName) { try { redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(baseClassPath, realBoltClassName); diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index 30b8696..e767abd 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -33,15 +33,6 @@ public class JRubySpout implements IRichSpout { _realSpoutClassName = realSpoutClassName; } - @Override - public boolean isDistributed() { - // isDistributed is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call isDistributed. - IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.isDistributed(); - } - @Override public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { // create instance of the jruby proxy class here, after deserialization in the workers. @@ -54,6 +45,16 @@ public class JRubySpout implements IRichSpout { _proxySpout.close(); } + @Override + public void activate() { + _proxySpout.activate(); + } + + @Override + public void deactivate() { + _proxySpout.deactivate(); + } + @Override public void nextTuple() { _proxySpout.nextTuple(); @@ -78,6 +79,15 @@ public class JRubySpout implements IRichSpout { spout.declareOutputFields(declarer); } + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); + } + private static IRichSpout newProxySpout(String baseClassPath, String realSpoutClassName) { try { redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(baseClassPath, realSpoutClassName);