diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index f0f851c..a0c6e9b 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' java_import 'backtype.storm.tuple.Fields' @@ -14,6 +15,8 @@ module RedStorm class Bolt attr_reader :collector, :context, :config + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end # DSL class methods @@ -22,19 +25,6 @@ module RedStorm @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields ||= [] - fields.each do |field| - if field.kind_of? Hash - @fields << Hash[ - field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } - ] - else - @fields << field.to_s - end - end - end - def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end @@ -67,10 +57,6 @@ module RedStorm self.class.log end - def stream - self.class.stream - end - def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end @@ -132,21 +118,6 @@ module RedStorm on_close end - def declare_output_fields(declarer) - default_fields = [] - self.class.fields.each do |field| - if field.kind_of? Hash - field.each do |stream, fields| - declarer.declareStream(stream, Fields.new(fields)) - end - else - default_fields << field - end - end - - declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? - end - def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) @@ -159,10 +130,6 @@ module RedStorm def on_init; end def on_close; end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end @@ -183,14 +150,6 @@ module RedStorm !!self.receive_options[:anchor] end - def self.stream? - self.receive_options[:stream] && !self.receive_options[:stream].empty? - end - - def self.stream - self.receive_options[:stream] - end - # below non-dry see Spout class def self.inherited(subclass) path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb new file mode 100644 index 0000000..a506711 --- /dev/null +++ b/lib/red_storm/dsl/output_fields.rb @@ -0,0 +1,58 @@ + +module RedStorm + module DSL + module OutputFields + + def self.included(base) + base.extend ClassMethods + end + + def declare_output_fields(declarer) + default_fields = [] + self.class.fields.each do |field| + if field.kind_of? Hash + field.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + else + default_fields << field + end + end + + declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? + end + + def stream + self.class.stream + end + + module ClassMethods + def output_fields(*fields) + @fields ||= [] + fields.each do |field| + if field.kind_of? Hash + @fields << Hash[ + field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } + ] + else + @fields << field.to_s + end + end + end + + def fields + @fields ||= [] + end + + def stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def stream + self.receive_options[:stream] + end + end + end + end +end + diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index 4846742..14ab5e8 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' module RedStorm @@ -11,6 +12,8 @@ module RedStorm class Spout attr_reader :config, :context, :collector + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end # DSL class methods @@ -23,10 +26,6 @@ module RedStorm @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - def self.on_send(*args, &on_send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first @@ -126,10 +125,6 @@ module RedStorm on_deactivate end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - def ack(msg_id) on_ack(msg_id) end @@ -154,10 +149,6 @@ module RedStorm def on_ack(msg_id); end def on_fail(msg_id); end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index b09429e..ecb6dcb 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -26,16 +26,37 @@ module RedStorm @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = [] + @output_fields = Hash.new([]) end def output_fields(*args) - args.empty? ? @output_fields : @output_fields = args.map(&:to_s) + args.each do |field| + if field.kind_of? Hash + field.each { |k, v| merge_fields(k.to_s, v) } + else + merge_fields('default', field) + end + end + @output_fields end def is_java? @clazz.name.split('::').first.downcase == 'java' end + + private + + def java_safe_fields + java_hash = java.util.HashMap.new() + @output_fields.each do |k, v| + java_hash.put(k, v.to_java('java.lang.String')) + end + java_hash + end + + def merge_fields(stream, fields) + @output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s] + end end class SpoutDefinition < ComponentDefinition @@ -47,7 +68,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end @@ -100,7 +121,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 64866ea..c1b25ce 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,6 +1,8 @@ require 'spec_helper' require 'red_storm/dsl/topology' +require 'pry' + describe RedStorm::SimpleTopology do # mock Storm imported classes @@ -112,8 +114,11 @@ describe RedStorm::SimpleTopology do output_fields :f3 end end - Topology1.spouts.first.output_fields.should == ["f1", "f2"] - Topology1.spouts.last.output_fields.should == [ "f3"] + # Pry.config.input = STDIN + # Pry.config.output = STDOUT + # binding.pry + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end end @@ -195,8 +200,8 @@ describe RedStorm::SimpleTopology do output_fields :f3 end end - Topology1.bolts.first.output_fields.should == ["f1", "f2"] - Topology1.bolts.last.output_fields.should == [ "f3"] + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } end end @@ -307,8 +312,8 @@ describe RedStorm::SimpleTopology do RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2) builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer) builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer) @@ -345,8 +350,8 @@ describe RedStorm::SimpleTopology do RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2) builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) @@ -377,8 +382,8 @@ describe RedStorm::SimpleTopology do backtype_config = mock(Backtype::Config) Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config) backtype_config.should_receive(:put) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout) builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer) builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer) @declarer.should_receive("addConfigurations").twice diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index a4106e5..abbb49f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -6,9 +6,11 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; +import org.jruby.RubyHash; import org.jruby.RubyObject; import org.jruby.runtime.Helpers; import org.jruby.runtime.builtin.IRubyObject; @@ -27,7 +29,7 @@ import org.jruby.exceptions.RaiseException; */ public class JRubyBolt implements IRichBolt { private final String _realBoltClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt { * @param realBoltClassName the fully qualified JRuby bolt implementation class name * @param fields the output fields names */ - public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) { + public JRubyBolt(String baseClassPath, String realBoltClassName, Map fields) { _realBoltClassName = realBoltClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -72,8 +74,13 @@ public class JRubyBolt implements IRichBolt { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index b30d292..a6d81f2 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -6,6 +6,7 @@ import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; @@ -27,7 +28,7 @@ import org.jruby.exceptions.RaiseException; */ public class JRubySpout implements IRichSpout { private final String _realSpoutClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout { * @param realSpoutClassName the fully qualified JRuby spout implementation class name * @param fields the output fields names */ - public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) { + public JRubySpout(String baseClassPath, String realSpoutClassName, Map fields) { _realSpoutClassName = realSpoutClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -93,8 +94,13 @@ public class JRubySpout implements IRichSpout { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp spout instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_spout = initialize_ruby_spout(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);