From 54e43e39dc0b110b09fe8e0a6af1ff9f67afbcff Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 3 Mar 2015 22:47:37 +0000 Subject: [PATCH 1/7] Select stream when setting source for bolt --- lib/red_storm/dsl/bolt.rb | 36 ++++- lib/red_storm/dsl/output_collector.rb | 9 ++ lib/red_storm/dsl/topology.rb | 24 +-- spec/red_storm/dsl/bolt_spec.rb | 70 ++++++++- spec/red_storm/dsl/output_collector_spec.rb | 6 + spec/red_storm/dsl/topology_spec.rb | 159 +++++++++++++++++--- 6 files changed, 275 insertions(+), 29 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index c3895ce..8db2f2c 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -58,14 +58,26 @@ module RedStorm self.class.log end + def stream + self.class.stream + end + def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end + def unanchored_stream_emit(stream, *values) + @collector.emit_tuple_stream(stream, Values.new(*values)) + end + def anchored_emit(tuple, *values) @collector.emit_anchor_tuple(tuple, Values.new(*values)) end + def anchored_stream_emit(stream, tuple, *values) + @collector.emit_anchor_tuple_stream(stream, tuple, Values.new(*values)) + end + def ack(tuple) @collector.ack(tuple) end @@ -80,7 +92,21 @@ module RedStorm output = on_receive(tuple) if output && self.class.emit? values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output - values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)} + values_list.each do |values| + if self.class.anchor? + if self.class.stream? + anchored_stream_emit(self.stream, tuple, *values) + else + anchored_emit(tuple, *values) + end + else + if self.class.stream? + unanchored_stream_emit(self.stream, *values) + else + unanchored_emit(*values) + end + end + end @collector.ack(tuple) if self.class.ack? end end @@ -137,6 +163,14 @@ module RedStorm !!self.receive_options[:anchor] end + def self.stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def self.stream + self.receive_options[:stream] + end + # below non-dry see Spout class def self.inherited(subclass) path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") diff --git a/lib/red_storm/dsl/output_collector.rb b/lib/red_storm/dsl/output_collector.rb index d7a56e2..1cca8d1 100644 --- a/lib/red_storm/dsl/output_collector.rb +++ b/lib/red_storm/dsl/output_collector.rb @@ -6,4 +6,13 @@ java_import 'backtype.storm.tuple.Tuple' class OutputCollector java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")] java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")] + java_alias :emit_tuple_stream, :emit, [ + java.lang.String, + java.lang.Class.for_name("java.util.List") + ] + java_alias :emit_anchor_tuple_stream, :emit, [ + java.lang.String, + Tuple.java_class, + java.lang.Class.for_name("java.util.List") + ] end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index 121bca2..b09429e 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -60,29 +60,33 @@ module RedStorm @sources = [] end - def source(source_id, grouping) - @sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}] + def source(source_id, grouping, stream = 'default') + @sources << [ + source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, + grouping.is_a?(Hash) ? grouping : {grouping => nil}, + stream.to_s + ] end def define_grouping(declarer) - @sources.each do |source_id, grouping| + @sources.each do |source_id, grouping, stream| grouper, params = grouping.first # declarer.fieldsGrouping(source_id, Fields.new()) case grouper when :fields - declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s)))) + declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s)))) when :global - declarer.globalGrouping(source_id) + declarer.globalGrouping(source_id, stream) when :shuffle - declarer.shuffleGrouping(source_id) + declarer.shuffleGrouping(source_id, stream) when :local_or_shuffle - declarer.localOrShuffleGrouping(source_id) + declarer.localOrShuffleGrouping(source_id, stream) when :none - declarer.noneGrouping(source_id) + declarer.noneGrouping(source_id, stream) when :all - declarer.allGrouping(source_id) + declarer.allGrouping(source_id, stream) when :direct - declarer.directGrouping(source_id) + declarer.directGrouping(source_id, stream) else raise("unknown grouper=#{grouper.inspect}") end diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 52b05a1..83cdcd4 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -603,6 +603,74 @@ describe RedStorm::SimpleBolt do bolt.prepare(nil, nil, collector) bolt.execute("output") end + + it "should emit tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream do |tuple| + tuple + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :stream => :custom_stream + def my_method(tuple); tuple; end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :stream => :custom_stream + def on_receive(tuple); tuple; end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_tuple_stream).with(:custom_stream, "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + end + + it "should emit anchored tuple on a stream" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream do |tuple| + "output" + end + end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :anchor => true, :stream => :custom_stream + def my_method(tuple) + "output" + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :anchor => true, :stream => :custom_stream + def on_receive(tuple) + "output" + end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit_anchor_tuple_stream).with(:custom_stream, "tuple", "values").exactly(3).times + + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt3.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + end end describe "prepare" do @@ -701,4 +769,4 @@ describe RedStorm::SimpleBolt do end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/output_collector_spec.rb b/spec/red_storm/dsl/output_collector_spec.rb index d604337..82ddeb7 100644 --- a/spec/red_storm/dsl/output_collector_spec.rb +++ b/spec/red_storm/dsl/output_collector_spec.rb @@ -10,5 +10,11 @@ describe OutputCollector do # We should have an alias for #emit_anchor_tuple it { should respond_to :emit_anchor_tuple } + + # We should have an alias for #emit_tuple_stream + it { should respond_to :emit_tuple_stream } + + # We should have an alias for #emit_anchor_tuple_stream + it { should respond_to :emit_anchor_tuple_stream } end end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 9500679..64866ea 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -340,6 +340,7 @@ describe RedStorm::SimpleTopology do configurator = mock(RedStorm::Configurator) jruby_bolt1 = mock(RedStorm::JRubyBolt) jruby_bolt2 = mock(RedStorm::JRubyBolt) + jruby_bolt3 = mock(RedStorm::JRubyBolt) declarer = mock("Declarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) @@ -394,7 +395,20 @@ describe RedStorm::SimpleTopology do end RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single string fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => "f1" }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -407,7 +421,20 @@ describe RedStorm::SimpleTopology do end RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support single symbolic fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => :s1 }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -420,7 +447,20 @@ describe RedStorm::SimpleTopology do end RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support string array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => ["f1", "f2"] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -433,7 +473,20 @@ describe RedStorm::SimpleTopology do end RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") - @declarer.should_receive("fieldsGrouping").with('1', "fields") + @declarer.should_receive("fieldsGrouping").with('1', 'default', "fields") + Topology1.new.start(:cluster) + end + + it "should support symbolic array fields with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, { :fields => [:s1, :s2] }, 'custom_stream' + end + end + + RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") + @declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields") Topology1.new.start(:cluster) end @@ -445,7 +498,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("shuffleGrouping").with('1') + @declarer.should_receive("shuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :shuffle, 'custom_stream' + end + end + + @declarer.should_receive("shuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -457,7 +522,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("localOrShuffleGrouping").with('1') + @declarer.should_receive("localOrShuffleGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support local_or_shuffle with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :local_or_shuffle, 'custom_stream' + end + end + + @declarer.should_receive("localOrShuffleGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -469,7 +546,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("noneGrouping").with('1') + @declarer.should_receive("noneGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support none" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :none, 'custom_stream' + end + end + + @declarer.should_receive("noneGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -481,7 +570,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("globalGrouping").with('1') + @declarer.should_receive("globalGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support global with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :global, 'custom_stream' + end + end + + @declarer.should_receive("globalGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -493,7 +594,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("allGrouping").with('1') + @declarer.should_receive("allGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support all with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :all, 'custom_stream' + end + end + + @declarer.should_receive("allGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end @@ -505,7 +618,19 @@ describe RedStorm::SimpleTopology do end end - @declarer.should_receive("directGrouping").with('1') + @declarer.should_receive("directGrouping").with('1', 'default') + Topology1.new.start(:cluster) + end + + it "should support direct with a stream" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + bolt BoltClass1 do + source 1, :direct, 'custom_stream' + end + end + + @declarer.should_receive("directGrouping").with('1', 'custom_stream') Topology1.new.start(:cluster) end end @@ -563,7 +688,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == '1' Topology1.bolts.first.id.should == '2' - Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}, 'default'] end it "should support explicit string ids" do @@ -577,7 +702,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == "id1" Topology1.bolts.first.id.should == "id2" - Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}, 'default'] end it "should support implicit string ids" do @@ -591,7 +716,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should support implicit symbol ids" do @@ -605,7 +730,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}, 'default'] end it "should support implicit class ids" do @@ -619,7 +744,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default'] end it "should raise on unresolvable" do @@ -633,7 +758,7 @@ describe RedStorm::SimpleTopology do Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" - Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] + Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}, 'default'] lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy" end @@ -651,4 +776,4 @@ describe RedStorm::SimpleTopology do end end -end \ No newline at end of file +end From 32cceb724ea4b1514e0d2a642c2ad51b89489cc4 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 10 Mar 2015 16:09:21 +0000 Subject: [PATCH 2/7] Bolts support declaring output streams --- lib/red_storm/dsl/bolt.rb | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index 8db2f2c..b37537a 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -23,7 +23,14 @@ module RedStorm end def self.output_fields(*fields) - @fields = fields.map(&:to_s) + @fields ||= [] + fields.each do |field| + if field.kind_of? Hash + @fields << Hash[ field.map { |k, v| [k.to_s, v.to_s] } ] + else + @fields << field.to_s + end + end end def self.configure(&configure_block) @@ -124,7 +131,18 @@ module RedStorm end def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) + default_fields = [] + self.class.fields.each do |field| + if field.kind_of? Hash + field.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + else + default_fields << field + end + end + + declarer.declare(Fields.new(default_fields.flatten)) end def get_component_configuration From dbf7888bb79fa81c782170fcb37cc55d2bd786f0 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 10 Mar 2015 21:37:22 +0000 Subject: [PATCH 3/7] New test/specs for bolt stream support --- lib/red_storm/dsl/bolt.rb | 6 +- spec/red_storm/dsl/bolt_spec.rb | 107 +++++++++++++++++++++++++++----- 2 files changed, 96 insertions(+), 17 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index b37537a..f0f851c 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -26,7 +26,9 @@ module RedStorm @fields ||= [] fields.each do |field| if field.kind_of? Hash - @fields << Hash[ field.map { |k, v| [k.to_s, v.to_s] } ] + @fields << Hash[ + field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } + ] else @fields << field.to_s end @@ -142,7 +144,7 @@ module RedStorm end end - declarer.declare(Fields.new(default_fields.flatten)) + declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? end def get_component_configuration diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 83cdcd4..97872e6 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -62,6 +62,27 @@ describe RedStorm::SimpleBolt do Bolt1.send(:fields).should == ["f1", "f2"] end + it "should parse single hash argument" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => :f1 + end + Bolt1.send(:fields).should == [{"stream" => "f1"}] + end + + it "should parse hash of string and symbols" do + class Bolt1 < RedStorm::SimpleBolt + output_fields "stream" => [:f1, :f2] + end + Bolt1.send(:fields).should == [{"stream" => ["f1", "f2"]}] + end + + it "should parse string and hash arguments" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :stream => :f2 + end + Bolt1.send(:fields).should == ["f1", {"stream" => "f2"}] + end + it "should not share state over mutiple classes" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1 @@ -115,6 +136,7 @@ describe RedStorm::SimpleBolt do Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -147,16 +169,27 @@ describe RedStorm::SimpleBolt do Bolt1.send(:anchor?).should be_true end - it "should parse multiple option" do + it "should parse :stream option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do + on_receive :stream => "test" do end end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + + it "should parse multiple option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" do + end + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -166,13 +199,13 @@ describe RedStorm::SimpleBolt do class Bolt1 < RedStorm::SimpleBolt def test_method; end on_receive :test_method - end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -186,8 +219,7 @@ describe RedStorm::SimpleBolt do it "should parse :ack option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :ack => true do - end + on_receive :test_method, :ack => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) @@ -196,24 +228,32 @@ describe RedStorm::SimpleBolt do it "should parse :anchor option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :anchor => true do - end + on_receive :test_method, :anchor => true end Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) Bolt1.send(:anchor?).should be_true end - it "should parse multiple option" do + it "should parse :stream option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true do - end + on_receive :test_method, :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + + it "should parse multiple option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :test_method, :emit => false, :ack =>true, :anchor => true, :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end @@ -227,6 +267,7 @@ describe RedStorm::SimpleBolt do Bolt1.send(:emit?).should be_true Bolt1.send(:ack?).should be_false Bolt1.send(:anchor?).should be_false + Bolt1.send(:stream?).should be_false end it "should parse :emit option" do @@ -256,15 +297,25 @@ describe RedStorm::SimpleBolt do Bolt1.send(:anchor?).should be_true end - it "should parse multiple option" do + it "should parse :stream option" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false, :ack =>true, :anchor => true + on_receive :stream => "test" end - Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test") + Bolt1.send(:stream?).should be_true + end + + it "should parse multiple option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test") Bolt1.send(:emit?).should be_false Bolt1.send(:ack?).should be_true Bolt1.send(:anchor?).should be_true + Bolt1.send(:stream?).should be_true end end end @@ -757,6 +808,32 @@ describe RedStorm::SimpleBolt do RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") bolt.declare_output_fields(declarer) end + + it "should declare stream with fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :stream => [:f1, :f2] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") + bolt.declare_output_fields(declarer) + end + + it "should declare default stream fields and custom stream fields" do + class Bolt1 < RedStorm::SimpleBolt + output_fields :f1, :f2, :stream => [:f3, :f4] + end + bolt = Bolt1.new + class RedStorm::Fields; end + declarer = mock("Declarer") + declarer.should_receive(:declareStream).with("stream", "stream_fields") + declarer.should_receive(:declare).with("default_fields") + RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields") + RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields") + bolt.declare_output_fields(declarer) + end end describe "get_component_configuration" do From 7b7f3f7c993cb208f40e613bb32ce504626d944f Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Tue, 31 Mar 2015 19:56:04 +0000 Subject: [PATCH 4/7] Bolt/Spouts in topos need to support streams This commit makes it possible to declare streams and fields for bolts and streams inside of the Topology definition. --- lib/red_storm/dsl/bolt.rb | 47 +-------------- lib/red_storm/dsl/output_fields.rb | 58 +++++++++++++++++++ lib/red_storm/dsl/spout.rb | 15 +---- lib/red_storm/dsl/topology.rb | 29 ++++++++-- spec/red_storm/dsl/topology_spec.rb | 25 ++++---- src/main/redstorm/storm/jruby/JRubyBolt.java | 15 +++-- src/main/redstorm/storm/jruby/JRubySpout.java | 14 +++-- 7 files changed, 125 insertions(+), 78 deletions(-) create mode 100644 lib/red_storm/dsl/output_fields.rb diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index f0f851c..a0c6e9b 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' java_import 'backtype.storm.tuple.Fields' @@ -14,6 +15,8 @@ module RedStorm class Bolt attr_reader :collector, :context, :config + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end # DSL class methods @@ -22,19 +25,6 @@ module RedStorm @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields ||= [] - fields.each do |field| - if field.kind_of? Hash - @fields << Hash[ - field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } - ] - else - @fields << field.to_s - end - end - end - def self.configure(&configure_block) @configure_block = block_given? ? configure_block : lambda {} end @@ -67,10 +57,6 @@ module RedStorm self.class.log end - def stream - self.class.stream - end - def unanchored_emit(*values) @collector.emit_tuple(Values.new(*values)) end @@ -132,21 +118,6 @@ module RedStorm on_close end - def declare_output_fields(declarer) - default_fields = [] - self.class.fields.each do |field| - if field.kind_of? Hash - field.each do |stream, fields| - declarer.declareStream(stream, Fields.new(fields)) - end - else - default_fields << field - end - end - - declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? - end - def get_component_configuration configurator = Configurator.new configurator.instance_exec(&self.class.configure_block) @@ -159,10 +130,6 @@ module RedStorm def on_init; end def on_close; end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end @@ -183,14 +150,6 @@ module RedStorm !!self.receive_options[:anchor] end - def self.stream? - self.receive_options[:stream] && !self.receive_options[:stream].empty? - end - - def self.stream - self.receive_options[:stream] - end - # below non-dry see Spout class def self.inherited(subclass) path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb new file mode 100644 index 0000000..a506711 --- /dev/null +++ b/lib/red_storm/dsl/output_fields.rb @@ -0,0 +1,58 @@ + +module RedStorm + module DSL + module OutputFields + + def self.included(base) + base.extend ClassMethods + end + + def declare_output_fields(declarer) + default_fields = [] + self.class.fields.each do |field| + if field.kind_of? Hash + field.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) + end + else + default_fields << field + end + end + + declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? + end + + def stream + self.class.stream + end + + module ClassMethods + def output_fields(*fields) + @fields ||= [] + fields.each do |field| + if field.kind_of? Hash + @fields << Hash[ + field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } + ] + else + @fields << field.to_s + end + end + end + + def fields + @fields ||= [] + end + + def stream? + self.receive_options[:stream] && !self.receive_options[:stream].empty? + end + + def stream + self.receive_options[:stream] + end + end + end + end +end + diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index 4846742..14ab5e8 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -1,6 +1,7 @@ require 'java' require 'red_storm/configurator' require 'red_storm/environment' +require 'red_storm/dsl/output_fields' require 'pathname' module RedStorm @@ -11,6 +12,8 @@ module RedStorm class Spout attr_reader :config, :context, :collector + include OutputFields + def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end # DSL class methods @@ -23,10 +26,6 @@ module RedStorm @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) end - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - def self.on_send(*args, &on_send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first @@ -126,10 +125,6 @@ module RedStorm on_deactivate end - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - def ack(msg_id) on_ack(msg_id) end @@ -154,10 +149,6 @@ module RedStorm def on_ack(msg_id); end def on_fail(msg_id); end - def self.fields - @fields ||= [] - end - def self.configure_block @configure_block ||= lambda {} end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index b09429e..ecb6dcb 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -26,16 +26,37 @@ module RedStorm @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = [] + @output_fields = Hash.new([]) end def output_fields(*args) - args.empty? ? @output_fields : @output_fields = args.map(&:to_s) + args.each do |field| + if field.kind_of? Hash + field.each { |k, v| merge_fields(k.to_s, v) } + else + merge_fields('default', field) + end + end + @output_fields end def is_java? @clazz.name.split('::').first.downcase == 'java' end + + private + + def java_safe_fields + java_hash = java.util.HashMap.new() + @output_fields.each do |k, v| + java_hash.put(k, v.to_java('java.lang.String')) + end + java_hash + end + + def merge_fields(stream, fields) + @output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s] + end end class SpoutDefinition < ComponentDefinition @@ -47,7 +68,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end @@ -100,7 +121,7 @@ module RedStorm elsif is_java? @clazz.new(*constructor_args) else - Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields) + Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields) end end end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 64866ea..c1b25ce 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,6 +1,8 @@ require 'spec_helper' require 'red_storm/dsl/topology' +require 'pry' + describe RedStorm::SimpleTopology do # mock Storm imported classes @@ -112,8 +114,11 @@ describe RedStorm::SimpleTopology do output_fields :f3 end end - Topology1.spouts.first.output_fields.should == ["f1", "f2"] - Topology1.spouts.last.output_fields.should == [ "f3"] + # Pry.config.input = STDIN + # Pry.config.output = STDOUT + # binding.pry + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end end @@ -195,8 +200,8 @@ describe RedStorm::SimpleTopology do output_fields :f3 end end - Topology1.bolts.first.output_fields.should == ["f1", "f2"] - Topology1.bolts.last.output_fields.should == [ "f3"] + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } end end @@ -307,8 +312,8 @@ describe RedStorm::SimpleTopology do RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2) builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer) builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer) @@ -345,8 +350,8 @@ describe RedStorm::SimpleTopology do RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) RedStorm::Configurator.should_receive(:new).and_return(configurator) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2) builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) @@ -377,8 +382,8 @@ describe RedStorm::SimpleTopology do backtype_config = mock(Backtype::Config) Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config) backtype_config.should_receive(:put) - RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt) - RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt) + RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout) builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer) builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer) @declarer.should_receive("addConfigurations").twice diff --git a/src/main/redstorm/storm/jruby/JRubyBolt.java b/src/main/redstorm/storm/jruby/JRubyBolt.java index a4106e5..abbb49f 100644 --- a/src/main/redstorm/storm/jruby/JRubyBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBolt.java @@ -6,9 +6,11 @@ import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; +import org.jruby.RubyHash; import org.jruby.RubyObject; import org.jruby.runtime.Helpers; import org.jruby.runtime.builtin.IRubyObject; @@ -27,7 +29,7 @@ import org.jruby.exceptions.RaiseException; */ public class JRubyBolt implements IRichBolt { private final String _realBoltClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt { * @param realBoltClassName the fully qualified JRuby bolt implementation class name * @param fields the output fields names */ - public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) { + public JRubyBolt(String baseClassPath, String realBoltClassName, Map fields) { _realBoltClassName = realBoltClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -72,8 +74,13 @@ public class JRubyBolt implements IRichBolt { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); diff --git a/src/main/redstorm/storm/jruby/JRubySpout.java b/src/main/redstorm/storm/jruby/JRubySpout.java index b30d292..a6d81f2 100644 --- a/src/main/redstorm/storm/jruby/JRubySpout.java +++ b/src/main/redstorm/storm/jruby/JRubySpout.java @@ -6,6 +6,7 @@ import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; +import java.util.Iterator; import java.util.Map; import org.jruby.Ruby; @@ -27,7 +28,7 @@ import org.jruby.exceptions.RaiseException; */ public class JRubySpout implements IRichSpout { private final String _realSpoutClassName; - private final String[] _fields; + private final Map _fields; private final String _bootstrap; // transient to avoid serialization @@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout { * @param realSpoutClassName the fully qualified JRuby spout implementation class name * @param fields the output fields names */ - public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) { + public JRubySpout(String baseClassPath, String realSpoutClassName, Map fields) { _realSpoutClassName = realSpoutClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; @@ -93,8 +94,13 @@ public class JRubySpout implements IRichSpout { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp spout instance to call declareOutputFields. - if (_fields.length > 0) { - declarer.declare(new Fields(_fields)); + if (_fields.size() > 0) { + Iterator iterator = _fields.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry field = (Map.Entry)iterator.next(); + declarer.declareStream(field.getKey(), new Fields(field.getValue())); + iterator.remove(); + } } else { IRubyObject ruby_spout = initialize_ruby_spout(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); From db8547ebd681ebaebe9510ce594b104686aa06e3 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 21:16:33 +0000 Subject: [PATCH 5/7] Output fields in ruby classes are declared in java Fixes an issue where outfields that are defined in the Spout or Bolt are not recognized by the Nimbus server when it validates the existence of streams for all bolts in the topology. --- lib/red_storm/dsl/output_fields.rb | 28 +++++++++------------------- lib/red_storm/dsl/topology.rb | 23 +++++++++++------------ spec/red_storm/dsl/bolt_spec.rb | 22 +++++++++++----------- spec/red_storm/dsl/spout_spec.rb | 16 ++++++++-------- spec/red_storm/dsl/topology_spec.rb | 15 ++++++--------- 5 files changed, 45 insertions(+), 59 deletions(-) diff --git a/lib/red_storm/dsl/output_fields.rb b/lib/red_storm/dsl/output_fields.rb index a506711..fb09d9e 100644 --- a/lib/red_storm/dsl/output_fields.rb +++ b/lib/red_storm/dsl/output_fields.rb @@ -1,4 +1,3 @@ - module RedStorm module DSL module OutputFields @@ -8,18 +7,9 @@ module RedStorm end def declare_output_fields(declarer) - default_fields = [] - self.class.fields.each do |field| - if field.kind_of? Hash - field.each do |stream, fields| - declarer.declareStream(stream, Fields.new(fields)) - end - else - default_fields << field - end + self.class.fields.each do |stream, fields| + declarer.declareStream(stream, Fields.new(fields)) end - - declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty? end def stream @@ -27,21 +17,21 @@ module RedStorm end module ClassMethods + def output_fields(*fields) - @fields ||= [] + @output_fields ||= Hash.new([]) fields.each do |field| - if field.kind_of? Hash - @fields << Hash[ - field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] } - ] + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - @fields << field.to_s + @output_fields['default'] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end end def fields - @fields ||= [] + @output_fields ||= Hash.new([]) end def stream? diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index ecb6dcb..de1efdb 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -4,6 +4,7 @@ require 'red_storm/configurator' java_import 'backtype.storm.topology.TopologyBuilder' java_import 'backtype.storm.generated.SubmitOptions' +java_import 'backtype.storm.utils.Utils' module RedStorm module DSL @@ -26,17 +27,19 @@ module RedStorm @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = Hash.new([]) + @output_fields = @clazz.fields.clone end - def output_fields(*args) - args.each do |field| - if field.kind_of? Hash - field.each { |k, v| merge_fields(k.to_s, v) } + def output_fields(*fields) + fields.each do |field| + case field + when Hash + field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - merge_fields('default', field) + @output_fields[Utils::DEFAULT_STREAM_ID] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end + @output_fields end @@ -49,14 +52,10 @@ module RedStorm def java_safe_fields java_hash = java.util.HashMap.new() @output_fields.each do |k, v| - java_hash.put(k, v.to_java('java.lang.String')) + java_hash.put(k, v.to_java('java.lang.String')) unless v.empty? end java_hash end - - def merge_fields(stream, fields) - @output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s] - end end class SpoutDefinition < ComponentDefinition @@ -81,7 +80,7 @@ module RedStorm @sources = [] end - def source(source_id, grouping, stream = 'default') + def source(source_id, grouping, stream = Utils::DEFAULT_STREAM_ID) @sources << [ source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}, diff --git a/spec/red_storm/dsl/bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb index 97872e6..fc7ada6 100644 --- a/spec/red_storm/dsl/bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -45,42 +45,42 @@ describe RedStorm::SimpleBolt do output_fields :f1 end bolt = Bolt1.new - Bolt1.send(:fields).should == ["f1"] + Bolt1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :f2 end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, "f2" end - Bolt1.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse single hash argument" do class Bolt1 < RedStorm::SimpleBolt output_fields :stream => :f1 end - Bolt1.send(:fields).should == [{"stream" => "f1"}] + Bolt1.send(:fields).should == {"stream" => ["f1"]} end it "should parse hash of string and symbols" do class Bolt1 < RedStorm::SimpleBolt output_fields "stream" => [:f1, :f2] end - Bolt1.send(:fields).should == [{"stream" => ["f1", "f2"]}] + Bolt1.send(:fields).should == {"stream" => ["f1", "f2"]} end it "should parse string and hash arguments" do class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :stream => :f2 end - Bolt1.send(:fields).should == ["f1", {"stream" => "f2"}] + Bolt1.send(:fields).should == {"default" => ["f1"], "stream" => ["f2"]} end it "should not share state over mutiple classes" do @@ -90,9 +90,9 @@ describe RedStorm::SimpleBolt do class Bolt2 < RedStorm::SimpleBolt output_fields :f2 end - RedStorm::SimpleBolt.send(:fields).should == [] - Bolt1.send(:fields).should == ["f1"] - Bolt2.send(:fields).should == ["f2"] + RedStorm::SimpleBolt.send(:fields).should == {} + Bolt1.send(:fields).should == {"default" => ["f1"]} + Bolt2.send(:fields).should == {"default" => ["f2"]} end end @@ -804,7 +804,7 @@ describe RedStorm::SimpleBolt do bolt = Bolt1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") bolt.declare_output_fields(declarer) end @@ -829,7 +829,7 @@ describe RedStorm::SimpleBolt do class RedStorm::Fields; end declarer = mock("Declarer") declarer.should_receive(:declareStream).with("stream", "stream_fields") - declarer.should_receive(:declare).with("default_fields") + declarer.should_receive(:declareStream).with("default", "default_fields") RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields") bolt.declare_output_fields(declarer) diff --git a/spec/red_storm/dsl/spout_spec.rb b/spec/red_storm/dsl/spout_spec.rb index ece2a8d..e04d677 100644 --- a/spec/red_storm/dsl/spout_spec.rb +++ b/spec/red_storm/dsl/spout_spec.rb @@ -65,21 +65,21 @@ describe RedStorm::SimpleSpout do class Spout1 < RedStorm::SimpleSpout output_fields :f1 end - Spout1.send(:fields).should == ["f1"] + Spout1.send(:fields).should == {"default" => ["f1"]} end it "should parse multiple arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, :f2 end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should parse string and symbol arguments" do class Spout1 < RedStorm::SimpleSpout output_fields :f1, "f2" end - Spout1.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == {"default" => ["f1", "f2"]} end it "should not share state over mutiple classes" do @@ -89,9 +89,9 @@ describe RedStorm::SimpleSpout do class Spout2 < RedStorm::SimpleSpout output_fields :f2 end - RedStorm::SimpleSpout.send(:fields).should == [] - Spout1.send(:fields).should == ["f1"] - Spout2.send(:fields).should == ["f2"] + RedStorm::SimpleSpout.send(:fields).should == {} + Spout1.send(:fields).should == {"default" => ["f1"]} + Spout2.send(:fields).should == {"default" => ["f2"]} end end @@ -787,7 +787,7 @@ describe RedStorm::SimpleSpout do spout = Spout1.new class RedStorm::Fields; end declarer = mock("Declarer") - declarer.should_receive(:declare).with("fields") + declarer.should_receive(:declareStream).with("default", "fields") RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields") spout.declare_output_fields(declarer) end @@ -879,4 +879,4 @@ describe RedStorm::SimpleSpout do end end end -end \ No newline at end of file +end diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index c1b25ce..2e52208 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,7 +1,7 @@ require 'spec_helper' require 'red_storm/dsl/topology' - -require 'pry' +require 'red_storm/dsl/spout' +require 'red_storm/dsl/bolt' describe RedStorm::SimpleTopology do @@ -23,10 +23,10 @@ describe RedStorm::SimpleTopology do Object.send(:remove_const, "SpoutClass2") if Object.const_defined?("SpoutClass2") Object.send(:remove_const, "BoltClass1") if Object.const_defined?("BoltClass1") Object.send(:remove_const, "BoltClass2") if Object.const_defined?("BoltClass2") - class SpoutClass1; end - class SpoutClass2; end - class BoltClass1; end - class BoltClass2; end + class SpoutClass1 < RedStorm::DSL::Spout; end + class SpoutClass2 < RedStorm::DSL::Spout; end + class BoltClass1 < RedStorm::DSL::Bolt; end + class BoltClass2 < RedStorm::DSL::Bolt; end SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path") SpoutClass1.should_receive(:java_proxy).at_least(0).times.and_return("RedStorm::JRubySpout") @@ -114,9 +114,6 @@ describe RedStorm::SimpleTopology do output_fields :f3 end end - # Pry.config.input = STDIN - # Pry.config.output = STDOUT - # binding.pry Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end From 67fd211d4d77b792ba2682df610f878dfbf04bc6 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 22:58:34 +0000 Subject: [PATCH 6/7] Wrongly assumed topo bolts inherit from redstorm Not all bolts defined in a topology inherit from the RedStorm::DSL. This ensures the output fields in the ComponentDefinition are only copied if the class uses the OutputFields module. Also, redefining field names for the default stream will replace any previously defined default field names. --- lib/red_storm/dsl/topology.rb | 14 +++++++-- spec/red_storm/dsl/topology_spec.rb | 45 +++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 2 deletions(-) diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index de1efdb..f259522 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -27,18 +27,22 @@ module RedStorm @constructor_args = constructor_args @id = id.to_s @parallelism = parallelism - @output_fields = @clazz.fields.clone + @output_fields = Hash.new([]) + + initialize_output_fields end def output_fields(*fields) + default_fields = [] fields.each do |field| case field when Hash field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] } else - @output_fields[Utils::DEFAULT_STREAM_ID] |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] + default_fields |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s] end end + @output_fields[Utils::DEFAULT_STREAM_ID] = default_fields unless default_fields.empty? @output_fields end @@ -49,6 +53,12 @@ module RedStorm private + def initialize_output_fields + if @clazz.ancestors.include?(RedStorm::DSL::OutputFields) + @output_fields = @clazz.fields.clone + end + end + def java_safe_fields java_hash = java.util.HashMap.new() @output_fields.each do |k, v| diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 2e52208..1c24b25 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -118,6 +118,28 @@ describe RedStorm::SimpleTopology do Topology1.spouts.last.output_fields.should == { "default" => ["f3"] } end + it "should default output_fields to the class defined fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 + end + Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class SpoutClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 do + output_fields :f3, :f4 + end + end + Topology1.spouts.first.output_fields.should == { "default" => ["f3", "f4"] } + end + end describe "bolt statement" do @@ -201,6 +223,29 @@ describe RedStorm::SimpleTopology do Topology1.bolts.last.output_fields.should == { "default" => ["f3"] } end + it "should default output_fields to the class defined fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] } + end + + it "should override class defined fields with topology output fields" do + class BoltClass1 + output_fields :f1, :f2 + end + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + output_fields :f3, :f4 + end + end + Topology1.bolts.first.output_fields.should == { "default" => ["f3", "f4"] } + end + end describe "configure statement" do From f593b216ca628ef6d489f95343ae64f3991be095 Mon Sep 17 00:00:00 2001 From: Keith Walters Date: Thu, 16 Apr 2015 23:23:08 +0000 Subject: [PATCH 7/7] Use Pry for helpful debugging --- Gemfile.lock | 12 ++++++++++++ redstorm.gemspec | 1 + 2 files changed, 13 insertions(+) diff --git a/Gemfile.lock b/Gemfile.lock index feaa583..124deed 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -7,6 +7,7 @@ PATH GEM remote: https://rubygems.org/ specs: + coderay (1.1.0) colorize (0.5.8) coveralls (0.6.7) colorize @@ -15,8 +16,15 @@ GEM simplecov (>= 0.7) thor diff-lcs (1.2.4) + ffi (1.9.8-java) + method_source (0.8.2) mime-types (1.23) multi_json (1.7.7) + pry (0.10.1-java) + coderay (~> 1.1.0) + method_source (~> 0.8.1) + slop (~> 3.4) + spoon (~> 0.0) rake (10.0.4) redis (3.0.4) rest-client (1.6.7) @@ -33,6 +41,9 @@ GEM multi_json (~> 1.0) simplecov-html (~> 0.7.1) simplecov-html (0.7.1) + slop (3.6.0) + spoon (0.0.4) + ffi thor (0.18.1) PLATFORMS @@ -40,6 +51,7 @@ PLATFORMS DEPENDENCIES coveralls + pry redis redstorm! rspec (~> 2.13) diff --git a/redstorm.gemspec b/redstorm.gemspec index fe760b3..7c4ef68 100644 --- a/redstorm.gemspec +++ b/redstorm.gemspec @@ -21,5 +21,6 @@ Gem::Specification.new do |s| s.executables = ['redstorm'] s.add_development_dependency 'rspec', '~> 2.13' + s.add_development_dependency 'pry' s.add_runtime_dependency 'rake' end