From ba19200e6ba73f8a545befb6b37099d7f867e829 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Sun, 2 Mar 2014 03:04:01 -0500 Subject: [PATCH] removed remaining ruby proxy classes, java proxies now use JRuby API directly --- lib/red_storm/dsl/batch_bolt.rb | 34 +++++ lib/red_storm/dsl/batch_committer_bolt.rb | 9 ++ lib/red_storm/dsl/batch_spout.rb | 53 ++++++++ lib/red_storm/dsl/bolt.rb | 2 + lib/red_storm/dsl/spout.rb | 4 +- lib/red_storm/dsl/topology.rb | 4 +- lib/red_storm/dsl/tuple.rb | 2 + lib/red_storm/proxy/batch_bolt.rb | 59 --------- lib/red_storm/proxy/batch_committer_bolt.rb | 48 ------- lib/red_storm/proxy/batch_spout.rb | 55 -------- lib/red_storm/proxy/bolt.rb | 60 --------- lib/red_storm/proxy/proxy_function.rb | 36 ------ lib/red_storm/proxy/spout.rb | 84 ------------- .../proxy/transactional_committer_spout.rb | 43 ------- lib/red_storm/proxy/transactional_spout.rb | 41 ------ lib/red_storm/topology_launcher.rb | 16 +-- .../redstorm/storm/jruby/JRubyBatchBolt.java | 90 +++++++------ .../redstorm/storm/jruby/JRubyBatchSpout.java | 119 +++++++++++------- src/main/redstorm/storm/jruby/JRubyBolt.java | 3 +- .../storm/jruby/JRubyProxyFunction.java | 52 +++++--- src/main/redstorm/storm/jruby/JRubySpout.java | 102 +++++++++------ .../storm/jruby/JRubyTransactionalBolt.java | 92 ++++++++------ .../JRubyTransactionalCommitterBolt.java | 23 +--- .../JRubyTransactionalCommitterSpout.java | 40 +++--- .../storm/jruby/JRubyTransactionalSpout.java | 97 ++++++++------ 25 files changed, 479 insertions(+), 689 deletions(-) create mode 100644 lib/red_storm/dsl/batch_bolt.rb create mode 100644 lib/red_storm/dsl/batch_committer_bolt.rb create mode 100644 lib/red_storm/dsl/batch_spout.rb delete mode 100644 lib/red_storm/proxy/batch_bolt.rb delete mode 100644 lib/red_storm/proxy/batch_committer_bolt.rb delete mode 100644 lib/red_storm/proxy/batch_spout.rb delete mode 100644 lib/red_storm/proxy/bolt.rb delete mode 100644 lib/red_storm/proxy/proxy_function.rb delete mode 100644 lib/red_storm/proxy/spout.rb delete mode 100644 lib/red_storm/proxy/transactional_committer_spout.rb delete mode 100644 lib/red_storm/proxy/transactional_spout.rb diff --git a/lib/red_storm/dsl/batch_bolt.rb b/lib/red_storm/dsl/batch_bolt.rb new file mode 100644 index 0000000..262fe32 --- /dev/null +++ b/lib/red_storm/dsl/batch_bolt.rb @@ -0,0 +1,34 @@ +module RedStorm + module DSL + + class BatchBolt < Bolt + attr_reader :id + + def self.java_proxy; "Java::RedstormStormJruby::JRubyBatchBolt"; end + + def self.on_finish_batch(method_name = nil, &on_finish_batch_block) + body = block_given? ? on_finish_batch_block : lambda {self.send((method_name || :on_finish_batch).to_sym)} + define_method(:on_finish_batch, body) + end + + def prepare(config, context, collector, id) + @collector = collector + @context = context + @config = config + @id = id + + on_init + end + + def finish_batch + on_finish_batch + end + + private + + # default noop optional dsl callbacks + def on_finish_batch; end + + end + end +end \ No newline at end of file diff --git a/lib/red_storm/dsl/batch_committer_bolt.rb b/lib/red_storm/dsl/batch_committer_bolt.rb new file mode 100644 index 0000000..aaf8898 --- /dev/null +++ b/lib/red_storm/dsl/batch_committer_bolt.rb @@ -0,0 +1,9 @@ +module RedStorm + module DSL + + class BatchCommitterBolt < BatchBolt + + def self.java_proxy; "Java::RedstormStormJruby::JRubyBatchCommitterBolt"; end + end + end +end \ No newline at end of file diff --git a/lib/red_storm/dsl/batch_spout.rb b/lib/red_storm/dsl/batch_spout.rb new file mode 100644 index 0000000..26586cd --- /dev/null +++ b/lib/red_storm/dsl/batch_spout.rb @@ -0,0 +1,53 @@ +module RedStorm + module DSL + + class BatchSpout < Spout + + def self.java_proxy; "Java::RedstormStormJruby::JRubyBatchSpout"; end + + def get_output_fields + Fields.new(self.class.fields) + end + + def self.on_emit_batch(*args, &on_emit_batch_block) + options = args.last.is_a?(Hash) ? args.pop : {} + method_name = args.first + + self.on_emit_batch_options.merge!(options) + + # indirecting through a lambda defers the method lookup at invocation time + # and the performance penalty is negligible + body = block_given? ? on_emit_batch_block : lambda{|batch_id, collector| self.send((method_name || :on_emit_batch).to_sym)} + define_method(:on_emit_batch, body) + end + + # Spout proxy interface + + # + # note that in batch spout, ack is for the batch id and not the message id as in the base spout. + # TODO maybe rename msg_id to just id in the base spout + # + + def emit_batch(batch_id, collector) + # TODO this is a TridentCollector, emit should just work by setting @collector + # report_error need to be hooked? + @collector = collector + on_emit_batch(batch_id, collector) + end + + def open(config, context) + @context = context + @config = config + + on_init + end + + private + + def self.on_emit_batch_options + @on_emit_batch_options ||= {} + end + + end + end +end diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index 0905694..c3895ce 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -14,6 +14,8 @@ module RedStorm class Bolt attr_reader :collector, :context, :config + def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end + # DSL class methods def self.log diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index 7c435c9..4846742 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -11,6 +11,8 @@ module RedStorm class Spout attr_reader :config, :context, :collector + def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end + # DSL class methods def self.configure(&configure_block) @@ -99,7 +101,7 @@ module RedStorm unreliable_emit(*values) end else - sleep(0.1) + sleep(0.1) # see https://twitter.com/colinsurprenant/status/406445541904494592 end end end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index d34562e..923a323 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -46,7 +46,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - JRubySpout.new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) end end end @@ -95,7 +95,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - JRubyBolt.new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) end end end diff --git a/lib/red_storm/dsl/tuple.rb b/lib/red_storm/dsl/tuple.rb index 8df68ab..f8bbe25 100644 --- a/lib/red_storm/dsl/tuple.rb +++ b/lib/red_storm/dsl/tuple.rb @@ -23,6 +23,8 @@ class TupleImpl end alias_method :[], :value + alias_method :values, :getValues # wire directly to avoid method missing indirection + def field_index(field) fieldIndex(field.to_s) end diff --git a/lib/red_storm/proxy/batch_bolt.rb b/lib/red_storm/proxy/batch_bolt.rb deleted file mode 100644 index 0593722..0000000 --- a/lib/red_storm/proxy/batch_bolt.rb +++ /dev/null @@ -1,59 +0,0 @@ -require 'java' - -java_import 'backtype.storm.coordination.BatchOutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.coordination.IBatchBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Tuple' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -# the Bolt class is a proxy to the real bolt to avoid having to deal with all the -# Java artifacts when creating a bolt. -# -# The real bolt class implementation must define these methods: -# - prepare(conf, context, collector) -# - execute(tuple) -# - declare_output_fields -# -# and optionnaly: -# - cleanup -# -class BatchBolt - java_implements IBatchBolt - - java_signature 'IBatchBolt (String base_class_path, String real_bolt_class_name)' - def initialize(base_class_path, real_bolt_class_name) - @real_bolt = Object.module_eval(real_bolt_class_name).new - rescue NameError - require base_class_path - @real_bolt = Object.module_eval(real_bolt_class_name).new - end - - java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)' - def prepare(conf, context, collector, id) - @real_bolt.prepare(conf, context, collector, id) - end - - java_signature 'void execute(Tuple)' - def execute(tuple) - @real_bolt.execute(tuple) - end - - java_signature 'void finishBatch()' - def finishBatch - @real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_bolt.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_bolt.get_component_configuration - end -end diff --git a/lib/red_storm/proxy/batch_committer_bolt.rb b/lib/red_storm/proxy/batch_committer_bolt.rb deleted file mode 100644 index b84e0f8..0000000 --- a/lib/red_storm/proxy/batch_committer_bolt.rb +++ /dev/null @@ -1,48 +0,0 @@ -require 'java' - -java_import 'backtype.storm.coordination.BatchOutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.coordination.IBatchBolt' -java_import 'backtype.storm.transactional.ICommitter' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Tuple' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -class BatchCommitterBolt - java_implements 'ICommitter, IBatchBolt' - - java_signature 'IBatchCommitterBolt (String base_class_path, String real_bolt_class_name)' - def initialize(base_class_path, real_bolt_class_name) - @real_bolt = Object.module_eval(real_bolt_class_name).new - rescue NameError - require base_class_path - @real_bolt = Object.module_eval(real_bolt_class_name).new - end - - java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)' - def prepare(conf, context, collector, id) - @real_bolt.prepare(conf, context, collector, id) - end - - java_signature 'void execute(Tuple)' - def execute(tuple) - @real_bolt.execute(tuple) - end - - java_signature 'void finishBatch()' - def finishBatch - @real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_bolt.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_bolt.get_component_configuration - end -end diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb deleted file mode 100644 index 53a7fca..0000000 --- a/lib/red_storm/proxy/batch_spout.rb +++ /dev/null @@ -1,55 +0,0 @@ -require 'java' - -java_import 'backtype.storm.task.TopologyContext' -java_import 'storm.trident.operation.TridentCollector' -java_import 'storm.trident.spout.IBatchSpout' -java_import 'backtype.storm.tuple.Fields' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -# the Spout class is a proxy to the real spout to avoid having to deal with all the -# Java artifacts when creating a spout. - -class BatchSpout - java_implements IBatchSpout - - java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' - def initialize(base_class_path, real_spout_class_name) - @real_spout = Object.module_eval(real_spout_class_name).new - rescue NameError - require base_class_path - @real_spout = Object.module_eval(real_spout_class_name).new - end - - java_signature 'void open(Map, TopologyContext)' - def open(conf, context) - @real_spout.open(conf, context) if @real_spout.respond_to?(:open) - end - - java_signature 'void emitBatch(long, TridentCollector)' - def emitBatch(batch_id, collector) - @real_spout.emit_batch(batch_id, collector) - end - - java_signature 'void close()' - def close - @real_spout.close if @real_spout.respond_to?(:close) - end - - java_signature 'void ack(long)' - def ack(batch_id) - @real_spout.ack(batch_id) if @real_spout.respond_to?(:ack) - end - - java_signature 'Fields getOutputFields()' - def getOutputFields() - @real_spout.get_output_fields - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_spout.get_component_configuration - end - -end diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb deleted file mode 100644 index af97581..0000000 --- a/lib/red_storm/proxy/bolt.rb +++ /dev/null @@ -1,60 +0,0 @@ -require 'java' - -java_import 'backtype.storm.task.OutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.topology.IRichBolt' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Values' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -# the Bolt class is a proxy to the real bolt to avoid having to deal with all the -# Java artifacts when creating a bolt. -# -# The real bolt class implementation must define these methods: -# - prepare(conf, context, collector) -# - execute(tuple) -# - declare_output_fields -# -# and optionnaly: -# - cleanup -# -class Bolt - java_implements IRichBolt - - java_signature 'IRichBolt (String base_class_path, String real_bolt_class_name)' - def initialize(base_class_path, real_bolt_class_name) - @real_bolt = Object.module_eval(real_bolt_class_name).new - rescue NameError - require base_class_path - @real_bolt = Object.module_eval(real_bolt_class_name).new - end - - java_signature 'void prepare(Map, TopologyContext, OutputCollector)' - def prepare(conf, context, collector) - @real_bolt.prepare(conf, context, collector) - end - - java_signature 'void execute(Tuple)' - def execute(tuple) - @real_bolt.execute(tuple) - end - - java_signature 'void cleanup()' - def cleanup - @real_bolt.cleanup if @real_bolt.respond_to?(:cleanup) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_bolt.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_bolt.get_component_configuration - end -end diff --git a/lib/red_storm/proxy/proxy_function.rb b/lib/red_storm/proxy/proxy_function.rb deleted file mode 100644 index 5072e31..0000000 --- a/lib/red_storm/proxy/proxy_function.rb +++ /dev/null @@ -1,36 +0,0 @@ -require 'java' - -java_import 'storm.trident.tuple.TridentTuple' -java_import 'storm.trident.operation.TridentCollector' -java_import 'storm.trident.operation.TridentOperationContext' -java_import 'storm.trident.operation.Function' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -class ProxyFunction - java_implements Function - - java_signature 'Function (String base_class_path, String real_class_name)' - def initialize(base_class_path, real_class_name) - @real = Object.module_eval(real_class_name).new - rescue NameError - require base_class_path - @real = Object.module_eval(real_class_name).new - end - - java_signature 'void execute(TridentTuple, TridentCollector)' - def execute(_trident_tuple, _trident_collector) - @real.execute(_trident_tuple, _trident_collector) - end - - java_signature 'void cleanup()' - def cleanup() - @real.cleanup() - end - - java_signature 'void prepare(Map, TridentOperationContext)' - def prepare(_map, _trident_operation_context) - @real.prepare(_map, _trident_operation_context) - end -end diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb deleted file mode 100644 index 670eaac..0000000 --- a/lib/red_storm/proxy/spout.rb +++ /dev/null @@ -1,84 +0,0 @@ -require 'java' - -java_import 'backtype.storm.spout.SpoutOutputCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.topology.IRichSpout' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Values' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -# the Spout class is a proxy to the real spout to avoid having to deal with all the -# Java artifacts when creating a spout. -# -# The real spout class implementation must define these methods: -# - open(conf, context, collector) -# - next_tuple -# - declare_output_fields -# -# and optionnaly: -# - ack(msg_id) -# - fail(msg_id) -# - close -# - -class Spout - java_implements IRichSpout - - java_signature 'IRichSpout (String base_class_path, String real_spout_class_name)' - def initialize(base_class_path, real_spout_class_name) - @real_spout = Object.module_eval(real_spout_class_name).new - rescue NameError - require base_class_path - @real_spout = Object.module_eval(real_spout_class_name).new - end - - java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)' - def open(conf, context, collector) - @real_spout.open(conf, context, collector) - end - - java_signature 'void close()' - def close - @real_spout.close if @real_spout.respond_to?(:close) - end - - java_signature 'void activate()' - def activate - @real_spout.activate if @real_spout.respond_to?(:activate) - end - - java_signature 'void deactivate()' - def deactivate - @real_spout.deactivate if @real_spout.respond_to?(:deactivate) - end - - java_signature 'void nextTuple()' - def nextTuple - @real_spout.next_tuple - end - - java_signature 'void ack(Object)' - def ack(msg_id) - @real_spout.ack(msg_id) if @real_spout.respond_to?(:ack) - end - - java_signature 'void fail(Object)' - def fail(msg_id) - @real_spout.fail(msg_id) if @real_spout.respond_to?(:fail) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_spout.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_spout.get_component_configuration - end - -end diff --git a/lib/red_storm/proxy/transactional_committer_spout.rb b/lib/red_storm/proxy/transactional_committer_spout.rb deleted file mode 100644 index 1ce1be8..0000000 --- a/lib/red_storm/proxy/transactional_committer_spout.rb +++ /dev/null @@ -1,43 +0,0 @@ -require 'java' - -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.transactional.ITransactionalSpout' -java_import 'backtype.storm.transactional.ICommitterTransactionalSpout' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - - -class TransactionalCommitterSpout - java_implements 'ICommitterTransactionalSpout' - - java_signature 'ICommitterTransactionalSpout (String base_class_path, String real_spout_class_name)' - def initialize(base_class_path, real_spout_class_name) - @real_spout = Object.module_eval(real_spout_class_name).new - rescue NameError - require base_class_path - @real_spout = Object.module_eval(real_spout_class_name).new - end - - java_signature 'ICommitterTransactionalSpout.Emitter getEmitter(Map, TopologyContext)' - def getEmitter(conf, context) - @real_spout.get_emitter(conf, context) - end - - java_signature 'ITransactionalSpout.Coordinator getCoordinator(Map, TopologyContext)' - def getCoordinator(conf, context) - @real_spout.get_coordinator(conf, context) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_spout.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_spout.get_component_configuration - end - -end \ No newline at end of file diff --git a/lib/red_storm/proxy/transactional_spout.rb b/lib/red_storm/proxy/transactional_spout.rb deleted file mode 100644 index 7b38229..0000000 --- a/lib/red_storm/proxy/transactional_spout.rb +++ /dev/null @@ -1,41 +0,0 @@ -require 'java' - -java_import 'backtype.storm.task.TopologyContext' -java_import 'backtype.storm.transactional.ITransactionalSpout' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'java.util.Map' - -java_package 'redstorm.proxy' - -class TransactionalSpout - java_implements 'ITransactionalSpout' - - java_signature 'ITransactionalSpout (String base_class_path, String real_spout_class_name)' - def initialize(base_class_path, real_spout_class_name) - @real_spout = Object.module_eval(real_spout_class_name).new - rescue NameError - require base_class_path - @real_spout = Object.module_eval(real_spout_class_name).new - end - - java_signature 'ITransactionalSpout.Emitter getEmitter(Map, TopologyContext)' - def getEmitter(conf, context) - @real_spout.get_emitter(conf, context) - end - - java_signature 'ITransactionalSpout.Coordinator getCoordinator(Map, TopologyContext)' - def getCoordinator(conf, context) - @real_spout.get_coordinator(conf, context) - end - - java_signature 'void declareOutputFields(OutputFieldsDeclarer)' - def declareOutputFields(declarer) - @real_spout.declare_output_fields(declarer) - end - - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_spout.get_component_configuration - end - -end \ No newline at end of file diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 62870e4..88ea7d3 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -20,14 +20,14 @@ java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Values' -java_import 'redstorm.storm.jruby.JRubyBolt' -java_import 'redstorm.storm.jruby.JRubySpout' -java_import 'redstorm.storm.jruby.JRubyBatchBolt' -java_import 'redstorm.storm.jruby.JRubyBatchCommitterBolt' -java_import 'redstorm.storm.jruby.JRubyBatchSpout' -java_import 'redstorm.storm.jruby.JRubyTransactionalSpout' -java_import 'redstorm.storm.jruby.JRubyTransactionalBolt' -java_import 'redstorm.storm.jruby.JRubyTransactionalCommitterBolt' +# java_import 'redstorm.storm.jruby.JRubyBolt' +# java_import 'redstorm.storm.jruby.JRubySpout' +# java_import 'redstorm.storm.jruby.JRubyBatchBolt' +# java_import 'redstorm.storm.jruby.JRubyBatchCommitterBolt' +# java_import 'redstorm.storm.jruby.JRubyBatchSpout' +# java_import 'redstorm.storm.jruby.JRubyTransactionalSpout' +# java_import 'redstorm.storm.jruby.JRubyTransactionalBolt' +# java_import 'redstorm.storm.jruby.JRubyTransactionalCommitterBolt' java_package 'redstorm' diff --git a/src/main/redstorm/storm/jruby/JRubyBatchBolt.java b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java index f46e57c..91e79e6 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java @@ -10,81 +10,99 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import java.util.Map; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + /** - * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the bolts to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. + * JRuby classes do not support Java serialization. * - * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after - * deserialization at the worker and in the declareOutputFields method which is called once before - * serialization at topology creation. + * Note that the JRuby bolt class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields & getComponentConfiguration + * methods which are called once before serialization at topology creation. */ public class JRubyBatchBolt extends BaseBatchBolt { - IBatchBolt _proxyBolt; - String _realBoltClassName; - String _baseClassPath; - String[] _fields; + private final String _realBoltClassName; + private final String[] _fields; + private final String _bootstrap; + + // transient to avoid serialization + private transient IRubyObject _ruby_bolt; + private transient Ruby __ruby__; /** * create a new JRubyBolt - * - * @param baseClassPath the topology/project base JRuby class file path + * + * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name + * @param fields the output fields names */ public JRubyBatchBolt(String baseClassPath, String realBoltClassName, String[] fields) { - _baseClassPath = baseClassPath; _realBoltClassName = realBoltClassName; _fields = fields; + _bootstrap = "require '" + baseClassPath + "'"; } @Override - public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final Object id) { - // create instance of the jruby class here, after deserialization in the workers. - _proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName); - _proxyBolt.prepare(stormConf, context, collector, id); + public void prepare(final Map conf, final TopologyContext context, final BatchOutputCollector collector, final Object id) { + _ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); + IRubyObject ruby_id = JavaUtil.convertJavaToRuby(__ruby__, id); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "prepare", ruby_conf, ruby_context, ruby_collector, ruby_id); } @Override public void execute(Tuple input) { - _proxyBolt.execute(input); + IRubyObject ruby_input = JavaUtil.convertJavaToRuby(__ruby__, input); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "execute", ruby_input); } @Override public void finishBatch() { - _proxyBolt.finishBatch(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "finish_batch"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - // declareOutputFields is executed in the topology creation time, before serialisation. - // do not set the _proxyBolt instance variable here to avoid JRuby serialization - // issues. Just create tmp bolt instance to call declareOutputFields. if (_fields.length > 0) { declarer.declare(new Fields(_fields)); } else { - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - bolt.declareOutputFields(declarer); + IRubyObject ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); + Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "declare_output_fields", ruby_declarer); } } @Override public Map getComponentConfiguration() { // getComponentConfiguration is executed in the topology creation time, before serialisation. - // do not set the _proxyBolt instance variable here to avoid JRuby serialization - // issues. Just create tmp bolt instance to call declareOutputFields. - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - return bolt.getComponentConfiguration(); - } - + // just create tmp bolt instance to call getComponentConfiguration. - private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { + IRubyObject ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "get_component_configuration"); + return (Map)ruby_result.toJava(Map.class); + } + + private IRubyObject initialize_ruby_bolt() { + __ruby__ = Ruby.getGlobalRuntime(); + + RubyModule ruby_class; try { - redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } } diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java index 5fca722..40aea23 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -6,83 +6,118 @@ import storm.trident.operation.TridentCollector; import storm.trident.spout.IBatchSpout; import java.util.Map; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + /** - * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the spout to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * the JRubySpout class is a proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. + * JRuby classes do not support Java serialization. * - * Note that the JRuby spout proxy class is instanciated in the open method which is called after - * deserialization at the worker and in both the declareOutputFields and isDistributed methods which - * are called once before serialization at topology creation. + * Note that the JRuby spout class is instanciated in the open method which is called after + * deserialization at the worker and in the declareOutputFields & getComponentConfiguration + * methods which are called once before serialization at topology creation. */ public class JRubyBatchSpout implements IBatchSpout { - IBatchSpout _proxySpout; - String _realSpoutClassName; - String _baseClassPath; - + private final String _realSpoutClassName; + private final String[] _fields; + private final String _bootstrap; + + // transient to avoid serialization + private transient IRubyObject _ruby_spout; + private transient Ruby __ruby__; + /** - * create a new JRubySpout - * - * @param baseClassPath the topology/project base JRuby class file path + * create a new JRubyBatchSpout + * + * @param baseClassPath the topology/project base JRuby class file path * @param realSpoutClassName the fully qualified JRuby spout implementation class name + * @param fields the output fields names */ public JRubyBatchSpout(String baseClassPath, String realSpoutClassName) { - _baseClassPath = baseClassPath; _realSpoutClassName = realSpoutClassName; + _fields = null; + _bootstrap = "require '" + baseClassPath + "'"; + } + + /* constructor for compatibility with JRubySpout signature */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { + _realSpoutClassName = realSpoutClassName; + _fields = fields; + _bootstrap = "require '" + baseClassPath + "'"; } @Override public void open(final Map conf, final TopologyContext context) { - // create instance of the jruby proxy class here, after deserialization in the workers. - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); - _proxySpout.open(conf, context); + // // create instance of the jruby proxy class here, after deserialization in the workers. + // _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + // _proxySpout.open(conf, context); + + + _ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "open", ruby_conf, ruby_context); } @Override public void emitBatch(long batchId, TridentCollector collector) { - _proxySpout.emitBatch(batchId, collector); - } + // _proxySpout.emitBatch(batchId, collector); + + IRubyObject ruby_batch_id = JavaUtil.convertJavaToRuby(__ruby__, batchId); + IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "emit_batch", ruby_batch_id, ruby_collector); + } @Override public void close() { - _proxySpout.close(); + // _proxySpout.close(); + + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "close"); } @Override public void ack(long batchId) { - _proxySpout.ack(batchId); + // _proxySpout.ack(batchId); + + IRubyObject ruby_batch_id = JavaUtil.convertJavaToRuby(__ruby__, batchId); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "ack", ruby_batch_id); } @Override public Fields getOutputFields() { - if (_proxySpout == null) { - // getOutputFields is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call getOutputFields. - IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getOutputFields(); - } else { - return _proxySpout.getOutputFields(); - } + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "get_output_fields"); + return (Fields)ruby_result.toJava(Fields.class); } @Override public Map getComponentConfiguration() { - // getComponentConfiguration is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. - IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getComponentConfiguration(); + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // just create tmp spout instance to call getComponentConfiguration. + + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "get_component_configuration"); + return (Map)ruby_result.toJava(Map.class); } - - private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + + private IRubyObject initialize_ruby_spout() { + __ruby__ = Ruby.getGlobalRuntime(); + + RubyModule ruby_class; try { - redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } } diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index 7a9b0c7..a4106e5 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -19,13 +19,12 @@ import org.jruby.exceptions.RaiseException; /** * the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby. * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. - * JRuby classes are not support Java serialization. + * JRuby classes do not support Java serialization. * * Note that the JRuby bolt class is instanciated in the prepare method which is called after * deserialization at the worker and in the declareOutputFields & getComponentConfiguration * methods which are called once before serialization at topology creation. */ - public class JRubyBolt implements IRichBolt { private final String _realBoltClassName; private final String[] _fields; diff --git a/src/main/redstorm/storm/jruby/JRubyProxyFunction.java b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java index e646eed..65c3041 100644 --- a/src/main/redstorm/storm/jruby/JRubyProxyFunction.java +++ b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java @@ -6,46 +6,64 @@ import java.util.Map; import storm.trident.operation.TridentOperationContext; import storm.trident.operation.Function; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + public class JRubyProxyFunction implements Function { - Function _proxy; - String _realClassName; - String _baseClassPath; + private final String _realClassName; + private final String _bootstrap; + + // transient to avoid serialization + private transient IRubyObject _proxy; + private transient Ruby __ruby__; public JRubyProxyFunction(final String baseClassPath, final String realClassName) { - _baseClassPath = baseClassPath; _realClassName = realClassName; + _bootstrap = "require '" + baseClassPath + "'"; } - @Override - public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) { + public void execute(final TridentTuple tuple, final TridentCollector collector) { if(_proxy == null) { - _proxy = newProxy(_baseClassPath, _realClassName); + _proxy = initialize_proxy(); } - _proxy.execute(_tridentTuple, _tridentCollector); + IRubyObject ruby_tuple = JavaUtil.convertJavaToRuby(__ruby__, tuple); + IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); + Helpers.invoke(__ruby__.getCurrentContext(), _proxy, "execute", ruby_tuple, ruby_collector); } @Override public void cleanup() { - _proxy.cleanup(); + Helpers.invoke(__ruby__.getCurrentContext(), _proxy, "cleanup"); } @Override - public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) { + public void prepare(final Map conf, final TridentOperationContext context) { if(_proxy == null) { - _proxy = newProxy(_baseClassPath, _realClassName); + _proxy = initialize_proxy(); } - _proxy.prepare(_map, _tridentOperationContext); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + Helpers.invoke(__ruby__.getCurrentContext(), _proxy, "prepare", ruby_conf, ruby_context); } + private IRubyObject initialize_proxy() { + __ruby__ = Ruby.getGlobalRuntime(); - private static Function newProxy(final String baseClassPath, final String realClassName) { + RubyModule ruby_class; try { - redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } } diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index 828d3f9..b30d292 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -8,100 +8,122 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import java.util.Map; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + /** - * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the spout to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * the JRubySpout class is a proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. + * JRuby classes do not support Java serialization. * - * Note that the JRuby spout proxy class is instanciated in the open method which is called after - * deserialization at the worker and in both the declareOutputFields and isDistributed methods which - * are called once before serialization at topology creation. + * Note that the JRuby spout class is instanciated in the open method which is called after + * deserialization at the worker and in the declareOutputFields & getComponentConfiguration + * methods which are called once before serialization at topology creation. */ public class JRubySpout implements IRichSpout { - IRichSpout _proxySpout; - String _realSpoutClassName; - String _baseClassPath; - String[] _fields; + private final String _realSpoutClassName; + private final String[] _fields; + private final String _bootstrap; + + // transient to avoid serialization + private transient IRubyObject _ruby_spout; + private transient Ruby __ruby__; /** * create a new JRubySpout - * - * @param baseClassPath the topology/project base JRuby class file path + * + * @param baseClassPath the topology/project base JRuby class file path * @param realSpoutClassName the fully qualified JRuby spout implementation class name + * @param fields the output fields names */ public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) { - _baseClassPath = baseClassPath; _realSpoutClassName = realSpoutClassName; _fields = fields; + _bootstrap = "require '" + baseClassPath + "'"; } @Override public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) { - // create instance of the jruby proxy class here, after deserialization in the workers. - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); - _proxySpout.open(conf, context, collector); + _ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "open", ruby_conf, ruby_context, ruby_collector); } @Override public void close() { - _proxySpout.close(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "close"); } @Override public void activate() { - _proxySpout.activate(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "activate"); } @Override public void deactivate() { - _proxySpout.deactivate(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "deactivate"); } @Override public void nextTuple() { - _proxySpout.nextTuple(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "next_tuple"); } @Override public void ack(Object msgId) { - _proxySpout.ack(msgId); + IRubyObject ruby_msg_id = JavaUtil.convertJavaToRuby(__ruby__, msgId); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "ack", ruby_msg_id); } @Override public void fail(Object msgId) { - _proxySpout.fail(msgId); + IRubyObject ruby_msg_id = JavaUtil.convertJavaToRuby(__ruby__, msgId); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "fail", ruby_msg_id); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - // declareOutputFields is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. + // declareOutputFields is executed in the topology creation time, before serialisation. + // just create tmp spout instance to call declareOutputFields. + if (_fields.length > 0) { declarer.declare(new Fields(_fields)); } else { - IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - spout.declareOutputFields(declarer); + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); + Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "declare_output_fields", ruby_declarer); } - } + } @Override public Map getComponentConfiguration() { - // getComponentConfiguration is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. - IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getComponentConfiguration(); + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // just create tmp spout instance to call getComponentConfiguration. + + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "get_component_configuration"); + return (Map)ruby_result.toJava(Map.class); } - - private static IRichSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + + private IRubyObject initialize_ruby_spout() { + __ruby__ = Ruby.getGlobalRuntime(); + + RubyModule ruby_class; try { - redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(baseClassPath, realSpoutClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } } diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java index 7f162e5..fce0091 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java @@ -11,80 +11,102 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import java.util.Map; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + /** - * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the bolts to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. + * JRuby classes do not support Java serialization. * - * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after - * deserialization at the worker and in the declareOutputFields method which is called once before - * serialization at topology creation. + * Note that the JRuby bolt class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields & getComponentConfiguration + * methods which are called once before serialization at topology creation. */ public class JRubyTransactionalBolt extends BaseTransactionalBolt { - IBatchBolt _proxyBolt; - String _realBoltClassName; - String _baseClassPath; - String[] _fields; + private final String _realBoltClassName; + private final String[] _fields; + private final String _bootstrap; - /** + // transient to avoid serialization + protected transient IRubyObject _ruby_bolt; + protected transient Ruby __ruby__; + + /** * create a new JRubyBolt - * - * @param baseClassPath the topology/project base JRuby class file path + * + * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name + * @param fields the output fields names */ public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName, String[] fields) { - _baseClassPath = baseClassPath; _realBoltClassName = realBoltClassName; _fields = fields; + _bootstrap = "require '" + baseClassPath + "'"; } @Override - public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) { - // create instance of the jruby class here, after deserialization in the workers. - _proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName); - _proxyBolt.prepare(stormConf, context, collector, id); + public void prepare(final Map conf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) { + _ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); + IRubyObject ruby_id = JavaUtil.convertJavaToRuby(__ruby__, id); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "prepare", ruby_conf, ruby_context, ruby_collector, ruby_id); } @Override public void execute(Tuple input) { - _proxyBolt.execute(input); + IRubyObject ruby_input = JavaUtil.convertJavaToRuby(__ruby__, input); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "execute", ruby_input); } @Override public void finishBatch() { - _proxyBolt.finishBatch(); + Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "finish_batch"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. - // do not set the _proxyBolt instance variable here to avoid JRuby serialization - // issues. Just create tmp bolt instance to call declareOutputFields. + // just create tmp bolt instance to call declareOutputFields. + if (_fields.length > 0) { declarer.declare(new Fields(_fields)); } else { - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - bolt.declareOutputFields(declarer); + IRubyObject ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); + Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "declare_output_fields", ruby_declarer); } } @Override public Map getComponentConfiguration() { // getComponentConfiguration is executed in the topology creation time, before serialisation. - // do not set the _proxyBolt instance variable here to avoid JRuby serialization - // issues. Just create tmp bolt instance to call declareOutputFields. - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - return bolt.getComponentConfiguration(); + // just create tmp bolt instance to call getComponentConfiguration. + + IRubyObject ruby_bolt = initialize_ruby_bolt(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "get_component_configuration"); + return (Map)ruby_result.toJava(Map.class); } - - private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { + + protected IRubyObject initialize_ruby_bolt() { + __ruby__ = Ruby.getGlobalRuntime(); + + RubyModule ruby_class; try { - redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } } diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java index 008f722..dd6733e 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java @@ -5,27 +5,16 @@ import backtype.storm.transactional.TransactionAttempt; import backtype.storm.transactional.ICommitter; /** - * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the bolts to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. + * JRuby classes do not support Java serialization. * - * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after - * deserialization at the worker and in the declareOutputFields method which is called once before - * serialization at topology creation. + * Note that the JRuby bolt class is instanciated in the prepare method which is called after + * deserialization at the worker and in the declareOutputFields & getComponentConfiguration + * methods which are called once before serialization at topology creation. */ public class JRubyTransactionalCommitterBolt extends JRubyTransactionalBolt implements ICommitter { public JRubyTransactionalCommitterBolt(String baseClassPath, String realBoltClassName, String[] fields) { super(baseClassPath, realBoltClassName, fields); } - - private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { - try { - redstorm.proxy.BatchCommitterBolt proxy = new redstorm.proxy.BatchCommitterBolt(baseClassPath, realBoltClassName); - return proxy; - } - catch (Exception e) { - throw new RuntimeException(e); - } - } } \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java index 633c1f9..55fa876 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java @@ -5,40 +5,28 @@ import backtype.storm.transactional.ITransactionalSpout; import backtype.storm.task.TopologyContext; import java.util.Map; -/** - * the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the bolts to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). - * - * Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after - * deserialization at the worker and in the declareOutputFields method which is called once before - * serialization at topology creation. - */ +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + public class JRubyTransactionalCommitterSpout extends JRubyTransactionalSpout implements ICommitterTransactionalSpout { - ICommitterTransactionalSpout _proxySpout; - public JRubyTransactionalCommitterSpout(String baseClassPath, String realSpoutClassName, String[] fields) { super(baseClassPath, realSpoutClassName, fields); } @Override public ICommitterTransactionalSpout.Emitter getEmitter(Map conf, TopologyContext context) { - // create instance of the jruby class here, after deserialization in the workers. - if (_proxySpout == null) { - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); - } - return _proxySpout.getEmitter(conf, context); - } - - private static ICommitterTransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) { - try { - redstorm.proxy.TransactionalCommitterSpout proxy = new redstorm.proxy.TransactionalCommitterSpout(baseClassPath, realSpoutClassName); - return proxy; - } - catch (Exception e) { - throw new RuntimeException(e); + if (_ruby_spout == null) { + IRubyObject _ruby_spout = initialize_ruby_spout(); } + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "get_emitter", ruby_conf, ruby_context); + return (ICommitterTransactionalSpout.Emitter)ruby_result.toJava(ICommitterTransactionalSpout.Emitter.class); } } \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java index c1fef76..072136b 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java @@ -9,81 +9,104 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import java.util.Map; +import org.jruby.Ruby; +import org.jruby.RubyObject; +import org.jruby.runtime.Helpers; +import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.javasupport.JavaUtil; +import org.jruby.RubyModule; +import org.jruby.exceptions.RaiseException; + /** * the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby. * this proxy is required to bypass the serialization/deserialization process when dispatching * the spout to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). + * (Java serialization call on a JRuby class). * - * Note that the JRuby spout proxy class is instanciated in the open method which is called after - * deserialization at the worker and in both the declareOutputFields and isDistributed methods which - * are called once before serialization at topology creation. + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. */ public class JRubyTransactionalSpout extends BaseTransactionalSpout { - ITransactionalSpout _proxySpout; - String _realSpoutClassName; - String _baseClassPath; - String[] _fields; - + private final String _realSpoutClassName; + private final String[] _fields; + private final String _bootstrap; + + // transient to avoid serialization + protected transient IRubyObject _ruby_spout; + protected transient Ruby __ruby__; + /** * create a new JRubySpout - * - * @param baseClassPath the topology/project base JRuby class file path + * + * @param baseClassPath the topology/project base JRuby class file path * @param realSpoutClassName the fully qualified JRuby spout implementation class name + * @param fields the output fields names */ public JRubyTransactionalSpout(String baseClassPath, String realSpoutClassName, String[] fields) { - _baseClassPath = baseClassPath; _realSpoutClassName = realSpoutClassName; _fields = fields; + _bootstrap = "require '" + baseClassPath + "'"; } @Override public ITransactionalSpout.Coordinator getCoordinator(Map conf, TopologyContext context) { - // create instance of the jruby class here, after deserialization in the workers. - if (_proxySpout == null) { - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + if (_ruby_spout == null) { + IRubyObject _ruby_spout = initialize_ruby_spout(); } - return _proxySpout.getCoordinator(conf, context); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "get_coordinator", ruby_conf, ruby_context); + return (ITransactionalSpout.Coordinator)ruby_result.toJava(ITransactionalSpout.Coordinator.class); } @Override public ITransactionalSpout.Emitter getEmitter(Map conf, TopologyContext context) { - // create instance of the jruby class here, after deserialization in the workers. - if (_proxySpout == null) { - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + if (_ruby_spout == null) { + IRubyObject _ruby_spout = initialize_ruby_spout(); } - return _proxySpout.getEmitter(conf, context); + IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); + IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), _ruby_spout, "get_emitter", ruby_conf, ruby_context); + return (ITransactionalSpout.Emitter)ruby_result.toJava(ITransactionalSpout.Emitter.class); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - // declareOutputFields is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. + // declareOutputFields is executed in the topology creation time, before serialisation. + // just create tmp spout instance to call declareOutputFields. + if (_fields.length > 0) { declarer.declare(new Fields(_fields)); } else { - ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - spout.declareOutputFields(declarer); + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); + Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "declare_output_fields", ruby_declarer); } - } + } @Override public Map getComponentConfiguration() { - // getComponentConfiguration is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. - ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getComponentConfiguration(); + // getComponentConfiguration is executed in the topology creation time, before serialisation. + // just create tmp spout instance to call getComponentConfiguration. + + IRubyObject ruby_spout = initialize_ruby_spout(); + IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_spout, "get_component_configuration"); + return (Map)ruby_result.toJava(Map.class); } - - private static ITransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + + protected IRubyObject initialize_ruby_spout() { + __ruby__ = Ruby.getGlobalRuntime(); + + RubyModule ruby_class; try { - redstorm.proxy.TransactionalSpout proxy = new redstorm.proxy.TransactionalSpout(baseClassPath, realSpoutClassName); - return proxy; + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } - catch (Exception e) { - throw new RuntimeException(e); + catch (RaiseException e) { + // after deserialization we need to recreate ruby environment + __ruby__.evalScriptlet(_bootstrap); + ruby_class = __ruby__.getClassFromPath(_realSpoutClassName); } + return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } }