From e3e5fb957ab3fe932f2b3c1cd5c4507a8d654a3d Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Wed, 5 Dec 2012 13:57:12 -0800 Subject: [PATCH 01/10] Implementation of BatchSpout proxy class --- lib/red_storm/proxy/batch_spout.rb | 71 +++++++++++++++ .../redstorm/storm/jruby/JRubyBatchSpout.java | 89 +++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 lib/red_storm/proxy/batch_spout.rb create mode 100644 src/main/redstorm/storm/jruby/JRubyBatchSpout.java diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb new file mode 100644 index 0000000..21a4e9d --- /dev/null +++ b/lib/red_storm/proxy/batch_spout.rb @@ -0,0 +1,71 @@ +require 'java' + +java_import 'storm.trident.operation.TridentCollector' +java_import 'backtype.storm.task.TopologyContext' +java_import 'storm.trident.spout.IBatchSpout' +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the +# Java artifacts when creating a spout. +# +# The real batch spout class implementation must define these methods: +# - open(conf, context, collector) +# - emitBatch +# - getOutputFields +# - ack(batch_id) +# +# and optionnaly: +# - close +# + +class BatchSpout + java_implements IBatchSpout + + java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new + rescue NameError + require base_class_path + @real_spout = Object.module_eval(real_spout_class_name).new + end + + java_signature 'void open(Map, TopologyContext)' + def open(conf, context) + @real_spout.open(conf, context) + end + + java_signature 'void close()' + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void emitBatch(long, TridentCollector)' + def emitBatch(batch_id, collector) + @real_spout.emit_batch(batch_id, collector) + end + + java_signature 'void ack(long)' + def ack(batch_id) + @real_spout.ack(batch_id) + end + + java_signature 'Fields getOutputFields()' + def getOutputFields + @real_spout.get_output_fields() + end + + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration + end + +end diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java new file mode 100644 index 0000000..9d8d61f --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -0,0 +1,89 @@ +package redstorm.storm.jruby; + +import storm.trident.operation.TridentCollector; +import backtype.storm.task.TopologyContext; +import storm.trident.spout.IBatchSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; +import java.util.Map; + +/** + * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ +public class JRubyBatchSpout implements IBatchSpout { + IBatchSpout _proxySpout; + String _realSpoutClassName; + String _baseClassPath; + String[] _fields; + + /** + * create a new JRubyBatchSpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { + _baseClassPath = baseClassPath; + _realSpoutClassName = realSpoutClassName; + _fields = fields; + } + + @Override + public void open(final Map conf, final TopologyContext context) { + // create instance of the jruby proxy class here, after deserialization in the workers. + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + _proxySpout.open(conf, context); + } + + @Override + public void emitBatch(final long batchId, final TridentCollector collector) { + _proxySpout.emitBatch(batchId, collector); + } + + + @Override + public void close() { + _proxySpout.close(); + } + + @Override + public void ack(final long batchId) { + _proxySpout.ack(batchId); + } + + @Override + public Fields getOutputFields() { + // getOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getOutputFields(); + } + + @Override + public Map getComponentConfiguration() { + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); + } + + private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + try { + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} From 636cc896df983e66655f4ffb21f4a8517ffe26ff Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Wed, 5 Dec 2012 16:56:22 -0800 Subject: [PATCH 02/10] Add JRubyBatchSpout to Topology Launder --- lib/red_storm/topology_launcher.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 38c3930..189fc7f 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -14,6 +14,7 @@ java_import 'backtype.storm.tuple.Values' java_import 'redstorm.storm.jruby.JRubyBolt' java_import 'redstorm.storm.jruby.JRubySpout' +java_import 'redstorm.storm.jruby.JRubyBatchSpout' java_package 'redstorm' From 348ac629654cab337c0e3799d3abb13f4b2b7a15 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Mon, 10 Dec 2012 15:45:32 -0800 Subject: [PATCH 03/10] Add support for linear DRPC to redstorm --- lib/red_storm.rb | 1 + lib/red_storm/application.rb | 12 ++--- lib/red_storm/simple_drpc_topology.rb | 68 +++++++++++++++++++++++++++ lib/red_storm/topology_launcher.rb | 8 ++-- 4 files changed, 80 insertions(+), 9 deletions(-) create mode 100644 lib/red_storm/simple_drpc_topology.rb diff --git a/lib/red_storm.rb b/lib/red_storm.rb index 4321e7e..ea6b0cd 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -6,3 +6,4 @@ require 'red_storm/configuration' require 'red_storm/simple_bolt' require 'red_storm/simple_spout' require 'red_storm/simple_topology' +require 'red_storm/simple_drpc_topology' diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index 370ac04..76d167b 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -7,7 +7,7 @@ TARGET_LIB_DIR = "#{TARGET_DIR}/lib" TARGET_SRC_DIR = "#{TARGET_DIR}/src" TARGET_GEM_DIR = "#{TARGET_DIR}/gems/gems" TARGET_SPECS_DIR = "#{TARGET_DIR}/gems/specifications" -TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes" +TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes" TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency" TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked" TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar" @@ -26,9 +26,9 @@ CUSTOM_IVY_SETTINGS = "#{DST_IVY_DIR}/settings.xml" module RedStorm - - class Application - TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake" + + class Application + TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake" def self.local_storm_command(class_file, ruby_mode = nil) src_dir = File.expand_path(File.dirname(class_file)) @@ -38,7 +38,7 @@ module RedStorm def self.cluster_storm_command(class_file, ruby_mode = nil) "storm jar #{TARGET_CLUSTER_JAR} -Djruby.compat.version=#{RedStorm.jruby_mode_token(ruby_mode)} redstorm.TopologyLauncher cluster #{class_file}" end - + def self.usage puts("usage: redstorm version") puts(" redstorm install") @@ -82,4 +82,4 @@ module RedStorm end -end \ No newline at end of file +end diff --git a/lib/red_storm/simple_drpc_topology.rb b/lib/red_storm/simple_drpc_topology.rb new file mode 100644 index 0000000..d08345d --- /dev/null +++ b/lib/red_storm/simple_drpc_topology.rb @@ -0,0 +1,68 @@ +require 'java' +require 'red_storm/configuration' +require 'red_storm/configurator' + +module RedStorm + + class InputBoltDefinition < SimpleTopology::BoltDefinition + + def source(source_fields) + @sources << source_fields + end + + def define_grouping(declarer) + @sources.each do |source_fields| + declare.declare(Fields.new(*(Array.wrap(source_fields).map(&:to_s)))) + end + end + end + + class SimpleDRPCTopology < SimpleTopology + + def self.spout + raise TopologyDefinitionError, "DRPC spout is already defined" + end + + + def start(base_class_path, env) + # self.class.resolve_ids!(self.class.components) + + builder = LinearDRPCTopologyBuilder.new(self.class.topology_name) + + self.class.bolts.each do |bolt| + declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) + #declarer.addConfigurations(bolt.config) + #bolt.define_grouping(declarer) + end + + # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode + defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} + + configurator = Configurator.new(defaults) + configurator.instance_exec(env, &self.class.configure_block) + + drpc = nil + if env == :local + drpc = LocalDRPC.new + submitter = @cluster = LocalCluster.new + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc)) + else + submitter = StormSubmitter + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology) + end + instance_exec(env, drpc, &self.class.submit_block) + end + + def self.input_bolt(bolt_class, *args, &bolt_block) + options = args.last.is_a?(Hash) ? args.pop : {} + contructor_args = !args.empty? ? args.pop : [] + bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + + bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) + raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? + bolt.instance_exec(&bolt_block) + self.components << bolt + end + end + +end diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 189fc7f..d39e494 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -6,8 +6,10 @@ module Backtype end java_import 'backtype.storm.LocalCluster' +java_import 'backtype.storm.LocalDRPC' java_import 'backtype.storm.StormSubmitter' java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.drpc.LinearDRPCTopologyBuilder' java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Tuple' java_import 'backtype.storm.tuple.Values' @@ -18,7 +20,7 @@ java_import 'redstorm.storm.jruby.JRubyBatchSpout' java_package 'redstorm' -# TopologyLauncher is the application entry point when launching a topology. Basically it will +# TopologyLauncher is the application entry point when launching a topology. Basically it will # call require on the specified Ruby topology class file path and call its start method class TopologyLauncher @@ -37,14 +39,14 @@ class TopologyLauncher $:.unshift File.expand_path(launch_path + '/lib') $:.unshift File.expand_path(launch_path + '/target/lib') - require "#{class_path}" + require "#{class_path}" topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : '' puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment") RedStorm::Configuration.topology_class.new.start(class_path, env) end - private + private def self.camel_case(s) s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } From 94ccdce6ee265bd18d7966c00a6f5d797a3e9fa8 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Mon, 10 Dec 2012 16:36:07 -0800 Subject: [PATCH 04/10] Support more the one bolt in LinearDRPC, add support for grouping --- lib/red_storm/simple_drpc_topology.rb | 37 ++++++++++++++++++++------- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/lib/red_storm/simple_drpc_topology.rb b/lib/red_storm/simple_drpc_topology.rb index d08345d..682d310 100644 --- a/lib/red_storm/simple_drpc_topology.rb +++ b/lib/red_storm/simple_drpc_topology.rb @@ -5,14 +5,36 @@ require 'red_storm/configurator' module RedStorm class InputBoltDefinition < SimpleTopology::BoltDefinition + attr_accessor :grouping - def source(source_fields) - @sources << source_fields + def initialize(*args) + super + @grouping = :none + end + + def grouping(grouping) + @grouping = @grouping end def define_grouping(declarer) - @sources.each do |source_fields| - declare.declare(Fields.new(*(Array.wrap(source_fields).map(&:to_s)))) + + case @grouping + when :fields + declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s)))) + when :global + declarer.globalGrouping() + when :shuffle + declarer.shuffleGrouping() + when :local_or_shuffle + declarer.localOrShuffleGrouping() + when :none + declarer.noneGrouping() + when :all + declarer.allGrouping() + when :direct + declarer.directGrouping() + else + raise("unknown grouper=#{grouper.inspect}") end end end @@ -23,16 +45,13 @@ module RedStorm raise TopologyDefinitionError, "DRPC spout is already defined" end - def start(base_class_path, env) - # self.class.resolve_ids!(self.class.components) - builder = LinearDRPCTopologyBuilder.new(self.class.topology_name) self.class.bolts.each do |bolt| declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) - #declarer.addConfigurations(bolt.config) - #bolt.define_grouping(declarer) + declarer.addConfigurations(bolt.config) + bolt.define_grouping(declarer) end # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode From 38b12801180ebaeed2c303d82dfdfa1abd16c48c Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Mon, 10 Dec 2012 16:47:28 -0800 Subject: [PATCH 05/10] Refer to LinearDRPCTopologyBuilder by it's global namespace name --- lib/red_storm/simple_drpc_topology.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/red_storm/simple_drpc_topology.rb b/lib/red_storm/simple_drpc_topology.rb index 682d310..ab2a318 100644 --- a/lib/red_storm/simple_drpc_topology.rb +++ b/lib/red_storm/simple_drpc_topology.rb @@ -46,7 +46,7 @@ module RedStorm end def start(base_class_path, env) - builder = LinearDRPCTopologyBuilder.new(self.class.topology_name) + builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name) self.class.bolts.each do |bolt| declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) From e19be8c463e11bad93006aae11f7343cb8fbb5d1 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Fri, 7 Dec 2012 12:06:06 -0800 Subject: [PATCH 06/10] WIP attempt to build a proxy class generator --- generator.rb | 43 +++++++++ java_proxy.erb | 50 +++++++++++ lib/red_storm/proxy/batch_spout.rb | 74 +++++++-------- ruby_proxy.erb | 30 +++++++ .../redstorm/storm/jruby/JRubyBatchSpout.java | 90 +++++++++---------- 5 files changed, 196 insertions(+), 91 deletions(-) create mode 100644 generator.rb create mode 100644 java_proxy.erb create mode 100644 ruby_proxy.erb diff --git a/generator.rb b/generator.rb new file mode 100644 index 0000000..f1dd447 --- /dev/null +++ b/generator.rb @@ -0,0 +1,43 @@ +require 'erb' +require 'pry' + +require 'java' +require 'active_support/core_ext' + +Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) } + +to_generate = ["storm.trident.spout.IBatchSpout"] + + +PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb") +PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb") + +to_generate = ["storm.trident.spout.IBatchSpout"] + + +to_generate.each do |klass| + _functions = Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class.declared_instance_methods + + java_deps = _functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass + + functions = _functions.reduce({}) do |memo, f| + memo[:"#{f.name}"] = { + :return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void", + :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)} + } + memo + end + + interface_name = klass.split(".")[-1] + + ruby_class_name = interface_name[1..-1] + + java_class_name = "JRuby#{ruby_class_name}" + + methods = functions.map do |f_name, params| + {f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }} + end.reduce({}){|m,o| m.merge(o)} + + File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') {|f| f.write(ERB.new(PROXY_JRUBY_TEMPLATE).result(binding)) } + File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') {|f| f.write(ERB.new(PROXY_JAVA_TEMPLATE).result(binding)) } +end diff --git a/java_proxy.erb b/java_proxy.erb new file mode 100644 index 0000000..b078485 --- /dev/null +++ b/java_proxy.erb @@ -0,0 +1,50 @@ +package redstorm.storm.jruby; +<% java_deps.each do |dep| %> +import <%= dep %>;<% end %> + +public class <%= java_class_name %> implements <%= interface_name %> { + <%= interface_name %> _proxy; + String _realClassName; + String _baseClassPath; + String[] _fields; + + public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) { + _baseClassPath = baseClassPath; + _realClassName = realClassName; + _fields = fields; + } + +<% functions.each do |function_name, params| %> + @Override + public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map{|n,t| ["final #{t}", n].join(' ') }.flatten.join(', ') %>) { + <% if function_name == :open %> + _proxy = newProxy(_baseClassPath, _realClassName); + _proxy.open(<%= params[:args].keys.flatten.join(', ') %>); + <% elsif function_name == :declareOutputFields %> + if (_fields.length > 0) { + <%= params[:args].values[0] %>.declare(new Fields(_fields)); + } else { + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + } + <% elsif params[:before_serialization] %> + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + <% else %> + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>) + <% end %> + } +<% end %> + @Override + public Map getComponentConfiguration() { + newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); + } + + private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) { + try { + redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb index 21a4e9d..3bb71d6 100644 --- a/lib/red_storm/proxy/batch_spout.rb +++ b/lib/red_storm/proxy/batch_spout.rb @@ -1,71 +1,63 @@ require 'java' -java_import 'storm.trident.operation.TridentCollector' -java_import 'backtype.storm.task.TopologyContext' -java_import 'storm.trident.spout.IBatchSpout' -java_import 'backtype.storm.topology.OutputFieldsDeclarer' -java_import 'backtype.storm.tuple.Tuple' -java_import 'backtype.storm.tuple.Fields' -java_import 'backtype.storm.tuple.Values' + java_import 'java.util.Map' + +java_import 'backtype.storm.task.TopologyContext' + +java_import 'storm.trident.operation.TridentCollector' + +java_import 'backtype.storm.tuple.Fields' + +java_import 'storm.trident.spout.IBatchSpout' + + module Backtype java_import 'backtype.storm.Config' end java_package 'redstorm.proxy' -# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the -# Java artifacts when creating a spout. -# -# The real batch spout class implementation must define these methods: -# - open(conf, context, collector) -# - emitBatch -# - getOutputFields -# - ack(batch_id) -# -# and optionnaly: -# - close -# - class BatchSpout java_implements IBatchSpout - java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' - def initialize(base_class_path, real_spout_class_name) - @real_spout = Object.module_eval(real_spout_class_name).new + java_signature 'IBatchSpout (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new rescue NameError require base_class_path - @real_spout = Object.module_eval(real_spout_class_name).new + @real = Object.module_eval(real_class_name).new end java_signature 'void open(Map, TopologyContext)' - def open(conf, context) - @real_spout.open(conf, context) + def open(_map, _topology_context) + @real.open(Map, TopologyContext) end java_signature 'void close()' - def close - @real_spout.close if @real_spout.respond_to?(:close) - end - - java_signature 'void emitBatch(long, TridentCollector)' - def emitBatch(batch_id, collector) - @real_spout.emit_batch(batch_id, collector) + def close() + @real.close() end java_signature 'void ack(long)' - def ack(batch_id) - @real_spout.ack(batch_id) + def ack(_long) + @real.ack(long) end - java_signature 'Fields getOutputFields()' - def getOutputFields - @real_spout.get_output_fields() + java_signature 'void emit_batch(long, TridentCollector)' + def emit_batch(_long, _trident_collector) + @real.emit_batch(long, TridentCollector) end - java_signature 'Map getComponentConfiguration()' - def getComponentConfiguration - @real_spout.get_component_configuration + java_signature 'Map get_component_configuration()' + def get_component_configuration() + @real.get_component_configuration() end + java_signature 'Fields get_output_fields()' + def get_output_fields() + @real.get_output_fields() + end + + end diff --git a/ruby_proxy.erb b/ruby_proxy.erb new file mode 100644 index 0000000..3141baf --- /dev/null +++ b/ruby_proxy.erb @@ -0,0 +1,30 @@ +require 'java' + +<% java_deps.each do |dep| %> +java_import '<%= dep %>' +<% end %> + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class <%= ruby_class_name %> + java_implements <%= interface_name %> + + java_signature '<%= interface_name %> (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new + rescue NameError + require base_class_path + @real = Object.module_eval(real_class_name).new + end +<% methods.each do |method_name, params| %> + java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)' + def <%= method_name %>(<%= params[:args].keys.join(', ') %>) + @real.<%= method_name %>(<%= params[:args].values.join(', ') %>) + end +<% end %> + +end diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java index 9d8d61f..855750a 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -1,85 +1,75 @@ package redstorm.storm.jruby; -import storm.trident.operation.TridentCollector; -import backtype.storm.task.TopologyContext; -import storm.trident.spout.IBatchSpout; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.Fields; import java.util.Map; +import backtype.storm.task.TopologyContext; +import storm.trident.operation.TridentCollector; +import backtype.storm.tuple.Fields; +import storm.trident.spout.IBatchSpout; -/** - * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby. - * this proxy is required to bypass the serialization/deserialization process when dispatching - * the spout to the workers. JRuby does not yet support serialization from Java - * (Java serialization call on a JRuby class). - * - * Note that the JRuby spout proxy class is instanciated in the open method which is called after - * deserialization at the worker and in both the declareOutputFields and isDistributed methods which - * are called once before serialization at topology creation. - */ public class JRubyBatchSpout implements IBatchSpout { - IBatchSpout _proxySpout; - String _realSpoutClassName; + IBatchSpout _proxy; + String _realClassName; String _baseClassPath; String[] _fields; - /** - * create a new JRubyBatchSpout - * - * @param baseClassPath the topology/project base JRuby class file path - * @param realSpoutClassName the fully qualified JRuby spout implementation class name - */ - public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { + public JRubyBatchSpout(final String baseClassPath, final String realClassName, final String[] fields) { _baseClassPath = baseClassPath; - _realSpoutClassName = realSpoutClassName; + _realClassName = realClassName; _fields = fields; } - @Override - public void open(final Map conf, final TopologyContext context) { - // create instance of the jruby proxy class here, after deserialization in the workers. - _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); - _proxySpout.open(conf, context); - } @Override - public void emitBatch(final long batchId, final TridentCollector collector) { - _proxySpout.emitBatch(batchId, collector); + public void open(final Map _map, final TopologyContext _topologyContext) { + + _proxy = newProxy(_baseClassPath, _realClassName); + _proxy.open(_map, _topologyContext); + } - @Override public void close() { - _proxySpout.close(); + + _proxy.close() + } @Override - public void ack(final long batchId) { - _proxySpout.ack(batchId); + public void ack(final long _long) { + + _proxy.ack(_long) + + } + + @Override + public void emitBatch(final long _long, final TridentCollector _tridentCollector) { + + _proxy.emitBatch(_long, _tridentCollector) + + } + + @Override + public Map getComponentConfiguration() { + + _proxy.getComponentConfiguration() + } @Override public Fields getOutputFields() { - // getOutputFields is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. - IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getOutputFields(); + + _proxy.getOutputFields() + } @Override public Map getComponentConfiguration() { - // getComponentConfiguration is executed in the topology creation time before serialisation. - // do not set the _proxySpout instance variable here to avoid JRuby serialization - // issues. Just create tmp spout instance to call declareOutputFields. - IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - return spout.getComponentConfiguration(); + newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); } - private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { + private static IBatchSpout newProxy(String baseClassPath, String realClassName) { try { - redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realClassName); return proxy; } catch (Exception e) { From af0948a46e70ee04a6a2d23b39b4a3de7abaeafa Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 11 Dec 2012 21:41:27 -0800 Subject: [PATCH 07/10] Further updates to proxy class generator --- generator.rb | 24 ++++++++++++++++++------ ruby_proxy.erb | 2 +- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/generator.rb b/generator.rb index f1dd447..98ffbe1 100644 --- a/generator.rb +++ b/generator.rb @@ -6,20 +6,30 @@ require 'active_support/core_ext' Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) } -to_generate = ["storm.trident.spout.IBatchSpout"] - PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb") PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb") -to_generate = ["storm.trident.spout.IBatchSpout"] +to_generate = ["storm.trident.operation.Function"] +# Return all java functions of a java class +def get_functions(jlass) + jlass.declared_instance_methods.concat( jlass.interfaces.map{|i| get_functions(i) }.flatten ) +end + +# Return all java deps of a class +def get_java_deps(functions, klass) + functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass +end + to_generate.each do |klass| - _functions = Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class.declared_instance_methods + _functions = get_functions(Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class) - java_deps = _functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass + java_deps = get_java_deps(_functions, klass) + + # Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } functions = _functions.reduce({}) do |memo, f| memo[:"#{f.name}"] = { :return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void", @@ -30,10 +40,12 @@ to_generate.each do |klass| interface_name = klass.split(".")[-1] - ruby_class_name = interface_name[1..-1] + # IBlah to Blah if IBlah + ruby_class_name = interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name java_class_name = "JRuby#{ruby_class_name}" + # Rubyify java functions into {:method_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } methods = functions.map do |f_name, params| {f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }} end.reduce({}){|m,o| m.merge(o)} diff --git a/ruby_proxy.erb b/ruby_proxy.erb index 3141baf..d7c94eb 100644 --- a/ruby_proxy.erb +++ b/ruby_proxy.erb @@ -23,7 +23,7 @@ class <%= ruby_class_name %> <% methods.each do |method_name, params| %> java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)' def <%= method_name %>(<%= params[:args].keys.join(', ') %>) - @real.<%= method_name %>(<%= params[:args].values.join(', ') %>) + @real.<%= method_name %>(<%= params[:args].keys.join(', ') %>) end <% end %> From 5eb716184191559395356040fbc9618d6fec018a Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 11 Dec 2012 22:27:04 -0800 Subject: [PATCH 08/10] Generator correctly handles Trident Function, created proxy class for Trident Functions --- generator.rb | 10 ++- java_proxy.erb | 15 ++-- lib/red_storm/proxy/batch_spout.rb | 72 ++++++++------- lib/red_storm/proxy/proxy_function.rb | 48 ++++++++++ .../redstorm/storm/jruby/JRubyBatchSpout.java | 88 +++++++++++-------- .../storm/jruby/JRubyProxyFunction.java | 59 +++++++++++++ 6 files changed, 211 insertions(+), 81 deletions(-) create mode 100644 lib/red_storm/proxy/proxy_function.rb create mode 100644 src/main/redstorm/storm/jruby/JRubyProxyFunction.java diff --git a/generator.rb b/generator.rb index 98ffbe1..a66c30b 100644 --- a/generator.rb +++ b/generator.rb @@ -31,17 +31,21 @@ to_generate.each do |klass| # Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } } functions = _functions.reduce({}) do |memo, f| + before_serialization = %w{ }.include?(f.name.to_s) + memoize = %w{ prepare execute }.include?(f.name.to_s) memo[:"#{f.name}"] = { :return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void", - :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)} + :args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)}, + :before_serialization => before_serialization, + :memoize => memoize } memo end interface_name = klass.split(".")[-1] - # IBlah to Blah if IBlah - ruby_class_name = interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name + # IBlah to ProxyBlah if IBlah + ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}" java_class_name = "JRuby#{ruby_class_name}" diff --git a/java_proxy.erb b/java_proxy.erb index b078485..c154169 100644 --- a/java_proxy.erb +++ b/java_proxy.erb @@ -24,19 +24,20 @@ public class <%= java_class_name %> implements <%= interface_name %> { if (_fields.length > 0) { <%= params[:args].values[0] %>.declare(new Fields(_fields)); } else { - newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); } <% elsif params[:before_serialization] %> - newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>); + newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); + <% elsif params[:memoize] %> + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); <% else %> - _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>) + _proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>); <% end %> } <% end %> - @Override - public Map getComponentConfiguration() { - newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); - } private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) { try { diff --git a/lib/red_storm/proxy/batch_spout.rb b/lib/red_storm/proxy/batch_spout.rb index 3bb71d6..21a4e9d 100644 --- a/lib/red_storm/proxy/batch_spout.rb +++ b/lib/red_storm/proxy/batch_spout.rb @@ -1,63 +1,71 @@ require 'java' - -java_import 'java.util.Map' - -java_import 'backtype.storm.task.TopologyContext' - java_import 'storm.trident.operation.TridentCollector' - -java_import 'backtype.storm.tuple.Fields' - +java_import 'backtype.storm.task.TopologyContext' java_import 'storm.trident.spout.IBatchSpout' - - +java_import 'backtype.storm.topology.OutputFieldsDeclarer' +java_import 'backtype.storm.tuple.Tuple' +java_import 'backtype.storm.tuple.Fields' +java_import 'backtype.storm.tuple.Values' +java_import 'java.util.Map' module Backtype java_import 'backtype.storm.Config' end java_package 'redstorm.proxy' +# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the +# Java artifacts when creating a spout. +# +# The real batch spout class implementation must define these methods: +# - open(conf, context, collector) +# - emitBatch +# - getOutputFields +# - ack(batch_id) +# +# and optionnaly: +# - close +# + class BatchSpout java_implements IBatchSpout - java_signature 'IBatchSpout (String base_class_path, String real_class_name)' - def initialize(base_class_path, real_class_name) - @real = Object.module_eval(real_class_name).new + java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)' + def initialize(base_class_path, real_spout_class_name) + @real_spout = Object.module_eval(real_spout_class_name).new rescue NameError require base_class_path - @real = Object.module_eval(real_class_name).new + @real_spout = Object.module_eval(real_spout_class_name).new end java_signature 'void open(Map, TopologyContext)' - def open(_map, _topology_context) - @real.open(Map, TopologyContext) + def open(conf, context) + @real_spout.open(conf, context) end java_signature 'void close()' - def close() - @real.close() + def close + @real_spout.close if @real_spout.respond_to?(:close) + end + + java_signature 'void emitBatch(long, TridentCollector)' + def emitBatch(batch_id, collector) + @real_spout.emit_batch(batch_id, collector) end java_signature 'void ack(long)' - def ack(_long) - @real.ack(long) + def ack(batch_id) + @real_spout.ack(batch_id) end - java_signature 'void emit_batch(long, TridentCollector)' - def emit_batch(_long, _trident_collector) - @real.emit_batch(long, TridentCollector) + java_signature 'Fields getOutputFields()' + def getOutputFields + @real_spout.get_output_fields() end - java_signature 'Map get_component_configuration()' - def get_component_configuration() - @real.get_component_configuration() + java_signature 'Map getComponentConfiguration()' + def getComponentConfiguration + @real_spout.get_component_configuration end - java_signature 'Fields get_output_fields()' - def get_output_fields() - @real.get_output_fields() - end - - end diff --git a/lib/red_storm/proxy/proxy_function.rb b/lib/red_storm/proxy/proxy_function.rb new file mode 100644 index 0000000..d0e7944 --- /dev/null +++ b/lib/red_storm/proxy/proxy_function.rb @@ -0,0 +1,48 @@ +require 'java' + + +java_import 'storm.trident.tuple.TridentTuple' + +java_import 'storm.trident.operation.TridentCollector' + +java_import 'java.util.Map' + +java_import 'storm.trident.operation.TridentOperationContext' + +java_import 'storm.trident.operation.Function' + + +module Backtype + java_import 'backtype.storm.Config' +end + +java_package 'redstorm.proxy' + +class ProxyFunction + java_implements Function + + java_signature 'Function (String base_class_path, String real_class_name)' + def initialize(base_class_path, real_class_name) + @real = Object.module_eval(real_class_name).new + rescue NameError + require base_class_path + @real = Object.module_eval(real_class_name).new + end + + java_signature 'void execute(TridentTuple, TridentCollector)' + def execute(_trident_tuple, _trident_collector) + @real.execute(_trident_tuple, _trident_collector) + end + + java_signature 'void cleanup()' + def cleanup() + @real.cleanup() + end + + java_signature 'void prepare(Map, TridentOperationContext)' + def prepare(_map, _trident_operation_context) + @real.prepare(_map, _trident_operation_context) + end + + +end diff --git a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java index 855750a..9d8d61f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchSpout.java @@ -1,75 +1,85 @@ package redstorm.storm.jruby; -import java.util.Map; -import backtype.storm.task.TopologyContext; import storm.trident.operation.TridentCollector; -import backtype.storm.tuple.Fields; +import backtype.storm.task.TopologyContext; import storm.trident.spout.IBatchSpout; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; +import java.util.Map; +/** + * the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby. + * this proxy is required to bypass the serialization/deserialization process when dispatching + * the spout to the workers. JRuby does not yet support serialization from Java + * (Java serialization call on a JRuby class). + * + * Note that the JRuby spout proxy class is instanciated in the open method which is called after + * deserialization at the worker and in both the declareOutputFields and isDistributed methods which + * are called once before serialization at topology creation. + */ public class JRubyBatchSpout implements IBatchSpout { - IBatchSpout _proxy; - String _realClassName; + IBatchSpout _proxySpout; + String _realSpoutClassName; String _baseClassPath; String[] _fields; - public JRubyBatchSpout(final String baseClassPath, final String realClassName, final String[] fields) { + /** + * create a new JRubyBatchSpout + * + * @param baseClassPath the topology/project base JRuby class file path + * @param realSpoutClassName the fully qualified JRuby spout implementation class name + */ + public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) { _baseClassPath = baseClassPath; - _realClassName = realClassName; + _realSpoutClassName = realSpoutClassName; _fields = fields; } + @Override + public void open(final Map conf, final TopologyContext context) { + // create instance of the jruby proxy class here, after deserialization in the workers. + _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); + _proxySpout.open(conf, context); + } @Override - public void open(final Map _map, final TopologyContext _topologyContext) { - - _proxy = newProxy(_baseClassPath, _realClassName); - _proxy.open(_map, _topologyContext); - + public void emitBatch(final long batchId, final TridentCollector collector) { + _proxySpout.emitBatch(batchId, collector); } + @Override public void close() { - - _proxy.close() - + _proxySpout.close(); } @Override - public void ack(final long _long) { - - _proxy.ack(_long) - - } - - @Override - public void emitBatch(final long _long, final TridentCollector _tridentCollector) { - - _proxy.emitBatch(_long, _tridentCollector) - - } - - @Override - public Map getComponentConfiguration() { - - _proxy.getComponentConfiguration() - + public void ack(final long batchId) { + _proxySpout.ack(batchId); } @Override public Fields getOutputFields() { - - _proxy.getOutputFields() - + // getOutputFields is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getOutputFields(); } @Override public Map getComponentConfiguration() { - newProxy(_baseClassPath, _realClassName).getComponentConfiguration(); + // getComponentConfiguration is executed in the topology creation time before serialisation. + // do not set the _proxySpout instance variable here to avoid JRuby serialization + // issues. Just create tmp spout instance to call declareOutputFields. + IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + return spout.getComponentConfiguration(); } - private static IBatchSpout newProxy(String baseClassPath, String realClassName) { + private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) { try { - redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realClassName); + redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName); return proxy; } catch (Exception e) { diff --git a/src/main/redstorm/storm/jruby/JRubyProxyFunction.java b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java new file mode 100644 index 0000000..c6030ab --- /dev/null +++ b/src/main/redstorm/storm/jruby/JRubyProxyFunction.java @@ -0,0 +1,59 @@ +package redstorm.storm.jruby; + +import storm.trident.tuple.TridentTuple; +import storm.trident.operation.TridentCollector; +import java.util.Map; +import storm.trident.operation.TridentOperationContext; +import storm.trident.operation.Function; + +public class JRubyProxyFunction implements Function { + Function _proxy; + String _realClassName; + String _baseClassPath; + String[] _fields; + + public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) { + _baseClassPath = baseClassPath; + _realClassName = realClassName; + _fields = fields; + } + + + @Override + public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.execute(_tridentTuple, _tridentCollector); + + } + + @Override + public void cleanup() { + + _proxy.cleanup(); + + } + + @Override + public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) { + + if(_proxy == null) { + _proxy = newProxy(_baseClassPath, _realClassName); + } + _proxy.prepare(_map, _tridentOperationContext); + + } + + + private static Function newProxy(final String baseClassPath, final String realClassName) { + try { + redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName); + return proxy; + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} From 0fca9ffc954f792f15cfa15a778dd1d88534c2a8 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Fri, 1 Feb 2013 13:45:28 -0800 Subject: [PATCH 09/10] Add support for customizing spout id's --- lib/red_storm/simple_topology.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index 880365d..cb1134b 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -36,7 +36,7 @@ module RedStorm end class SpoutDefinition < ComponentDefinition - + # WARNING non-dry see BoltDefinition#new_instance def new_instance(base_class_path) if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout" @@ -49,7 +49,7 @@ module RedStorm # is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) end end - + class BoltDefinition < ComponentDefinition attr_accessor :sources, :command @@ -119,7 +119,7 @@ module RedStorm def self.bolt(bolt_class, *args, &bolt_block) options = args.last.is_a?(Hash) ? args.pop : {} contructor_args = !args.empty? ? args.pop : [] - bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + bolt_options = {:id => options[:id] ? options[:id] : self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? @@ -158,7 +158,7 @@ module RedStorm configurator = Configurator.new(defaults) configurator.instance_exec(env, &self.class.configure_block) - + submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) instance_exec(env, &self.class.submit_block) From a2ecfe0b9708c910f8814226f65f7bbaf988de5f Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 5 Feb 2013 10:24:46 -0800 Subject: [PATCH 10/10] Update storm to v0.8.2 --- lib/tasks/red_storm.rake | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/lib/tasks/red_storm.rake b/lib/tasks/red_storm.rake index 59c330e..c36ca79 100644 --- a/lib/tasks/red_storm.rake +++ b/lib/tasks/red_storm.rake @@ -10,7 +10,7 @@ require 'jruby/jrubyc' require 'red_storm' require 'red_storm/application' -DEP_STORM_VERSION = "0.8.1" +DEP_STORM_VERSION = "0.8.2" DEP_JRUBY_VERSION = "1.6.8" INSTALL_IVY_VERSION = "2.2.0" @@ -26,7 +26,7 @@ DEFAULT_DEPENDENCIES = { task :launch, :env, :ruby_mode, :class_file do |t, args| # use ruby mode parameter or default to current interpreter version version_token = RedStorm.jruby_mode_token(args[:ruby_mode]) - + command = case args[:env] when "local" RedStorm::Application.local_storm_command(args[:class_file], args[:ruby_mode]) @@ -54,16 +54,16 @@ end task :setup do puts("\n--> Setting up target directories") - ant.mkdir :dir => TARGET_DIR - ant.mkdir :dir => TARGET_CLASSES_DIR + ant.mkdir :dir => TARGET_DIR + ant.mkdir :dir => TARGET_CLASSES_DIR ant.mkdir :dir => TARGET_DEPENDENCY_DIR ant.mkdir :dir => TARGET_SRC_DIR ant.mkdir :dir => TARGET_GEM_DIR ant.mkdir :dir => TARGET_SPECS_DIR - ant.path :id => 'classpath' do - fileset :dir => TARGET_DEPENDENCY_DIR - fileset :dir => TARGET_CLASSES_DIR - end + ant.path :id => 'classpath' do + fileset :dir => TARGET_DEPENDENCY_DIR + fileset :dir => TARGET_CLASSES_DIR + end end task :install => [:deps, :build] do @@ -164,7 +164,7 @@ task :deps => "ivy:install" do artifact, transitive = dependency.split(/\s*,\s*/) ivy_retrieve(*artifact.split(':').concat([transitive.split(/\s*=\s*/).last, "#{TARGET_DEPENDENCY_DIR}/topology", "default"])) end -end +end task :jar, [:include_dir] => [:clean_jar] do |t, args| puts("\n--> Generating JAR file #{TARGET_CLUSTER_JAR}") @@ -215,7 +215,7 @@ def build_java_dir(source_folder) ant.javac( :srcdir => source_folder, :destdir => TARGET_CLASSES_DIR, - :classpathref => 'classpath', + :classpathref => 'classpath', :source => "1.6", :target => "1.6", :debug => "yes", @@ -224,8 +224,8 @@ def build_java_dir(source_folder) :listfiles => true ) do # compilerarg :value => "-Xlint:unchecked" - end -end + end +end def build_jruby(source_path) puts("\n--> Compiling JRuby")