From 89396e1ad07f18840e2c04341f555ce42291ffe9 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Tue, 15 Nov 2011 16:37:48 -0500 Subject: [PATCH] more specs + cleanups --- lib/red_storm/simple_bolt.rb | 29 +- lib/red_storm/simple_spout.rb | 57 ++-- lib/red_storm/simple_topology.rb | 47 ++-- spec/red_storm/simple_bolt_spec.rb | 367 +++++++++++++++++++------ spec/red_storm/simple_spout_spec.rb | 363 +++++++++++++++++++----- spec/red_storm/simple_topology_spec.rb | 273 ++++++++++++------ 6 files changed, 834 insertions(+), 302 deletions(-) diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb index f747fd1..19ffaa8 100644 --- a/lib/red_storm/simple_bolt.rb +++ b/lib/red_storm/simple_bolt.rb @@ -9,20 +9,20 @@ module RedStorm @fields = fields.map(&:to_s) end - def self.on_receive(*args, &receive_block) + def self.on_receive(*args, &on_receive_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first self.receive_options.merge!(options) - @receive_block = block_given? ? receive_block : lambda {|tuple| self.send(method_name, tuple)} + @on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)} end - def self.on_init(method_name = nil, &init_block) - @init_block = block_given? ? init_block : lambda {self.send(method_name)} + def self.on_init(method_name = nil, &on_init_block) + @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} end def self.on_close(method_name = nil, &close_block) - @close_block = block_given? ? close_block : lambda {self.send(method_name)} + @close_block = block_given? ? close_block : lambda {self.send(method_name || :on_close)} end # DSL instance methods @@ -38,7 +38,7 @@ module RedStorm # Bolt proxy interface def execute(tuple) - if (output = instance_exec(tuple, &self.class.receive_block)) && self.class.emit? + if (output = instance_exec(tuple, &self.class.on_receive_block)) && self.class.emit? values = [output].flatten self.class.anchor? ? @collector.emit(tuple, Values.new(*values)) : emit(*values) @collector.ack(tuple) if self.class.ack? @@ -49,7 +49,7 @@ module RedStorm @collector = collector @context = context @config = config - instance_exec(&self.class.init_block) + instance_exec(&self.class.on_init_block) end def cleanup @@ -60,22 +60,27 @@ module RedStorm declarer.declare(Fields.new(self.class.fields)) end + # default optional dsl methods/callbacks + + def on_init; end + def on_close; end + private def self.fields @fields ||= [] end - def self.receive_block - @receive_block ||= lambda {} + def self.on_receive_block + @on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)} end - def self.init_block - @init_block ||= lambda {} + def self.on_init_block + @on_init_block ||= lambda {self.send(:on_init)} end def self.close_block - @close_block ||= lambda {} + @close_block ||= lambda {self.send(:on_close)} end def self.receive_options diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index ebced89..bfe17b4 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -13,28 +13,28 @@ module RedStorm @fields = fields.map(&:to_s) end - def self.on_send(*args, &send_block) + def self.on_send(*args, &on_send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first self.send_options.merge!(options) - @send_block = block_given? ? send_block : lambda {self.send(method_name)} + @on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)} end - def self.on_init(method_name = nil, &init_block) - @init_block = block_given? ? init_block : lambda {self.send(method_name)} + def self.on_init(method_name = nil, &on_init_block) + @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} end - def self.on_close(method_name = nil, &close_block) - @close_block = block_given? ? close_block : lambda {self.send(method_name)} + def self.on_close(method_name = nil, &on_close_block) + @on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)} end - def self.on_ack(method_name = nil, &ack_block) - @ack_block = block_given? ? ack_block : lambda {|msg_id| self.send(method_name, msg_id)} + def self.on_ack(method_name = nil, &on_ack_block) + @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)} end - def self.on_fail(method_name = nil, &fail_block) - @fail_block = block_given? ? fail_block : lambda {|msg_id| self.send(method_name, msg_id)} + def self.on_fail(method_name = nil, &on_fail_block) + @on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)} end # DSL instance methods @@ -46,7 +46,7 @@ module RedStorm # Spout proxy interface def next_tuple - output = instance_exec(&self.class.send_block) + output = instance_exec(&self.class.on_send_block) if self.class.emit? if output values = [output].flatten @@ -61,11 +61,11 @@ module RedStorm @collector = collector @context = context @config = config - instance_exec(&self.class.init_block) + instance_exec(&self.class.on_init_block) end def close - instance_exec(&self.class.close_block) + instance_exec(&self.class.on_close_block) end def declare_output_fields(declarer) @@ -77,37 +77,44 @@ module RedStorm end def ack(msg_id) - instance_exec(msg_id, &self.class.ack_block) + instance_exec(msg_id, &self.class.on_ack_block) end def fail(msg_id) - instance_exec(msg_id, &self.class.fail_block) + instance_exec(msg_id, &self.class.on_fail_block) end + # default optional dsl methods/callbacks + + def on_init; end + def on_close; end + def on_ack(msg_id); end + def on_fail(msg_id); end + private def self.fields @fields ||= [] end - def self.send_block - @send_block ||= lambda {} + def self.on_send_block + @on_send_block ||= lambda {self.send(:on_send)} end - def self.init_block - @init_block ||= lambda {} + def self.on_init_block + @on_init_block ||= lambda {self.send(:on_init)} end - def self.close_block - @close_block ||= lambda {} + def self.on_close_block + @on_close_block ||= lambda {self.send(:on_close)} end - def self.ack_block - @ack_block ||= lambda {} + def self.on_ack_block + @on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)} end - def self.fail_block - @fail_block ||= lambda {} + def self.on_fail_block + @on_fail_block ||= lambda {|msg_id| self.send(:on_fail, msg_id)} end def self.send_options diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index b516cef..b03b0e7 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -23,10 +23,18 @@ module RedStorm grouper, params = grouping.first case grouper - when :shuffle - storm_bolt.shuffleGrouping(source_id) when :fields storm_bolt.fieldsGrouping(source_id, Fields.new(*params)) + when :global + storm_bolt.globalGrouping(source_id) + when :shuffle + storm_bolt.shuffleGrouping(source_id) + when :none + storm_bolt.noneGrouping(source_id) + when :all + storm_bolt.allGrouping(source_id) + when :direct + storm_bolt.directGrouping(source_id) else raise("unknown grouper=#{grouper.inspect}") end @@ -68,14 +76,14 @@ module RedStorm def self.spout(spout_class, options = {}) spout_options = {:id => self.underscore(spout_class), :parallelism => 1}.merge(options) spout = SpoutDefinition.new(spout_class, spout_options[:id], spout_options[:parallelism]) - self.spouts << spout + self.components << spout end def self.bolt(bolt_class, options = {}, &bolt_block) bolt_options = {:id => self.underscore(bolt_class), :parallelism => 1}.merge(options) bolt = BoltDefinition.new(bolt_class, bolt_options[:id], bolt_options[:parallelism]) bolt.instance_exec(&bolt_block) - self.bolts << bolt + self.components << bolt end def self.configure(name = nil, &configure_block) @@ -90,7 +98,7 @@ module RedStorm # topology proxy interface def start(base_class_path, env) - self.class.resolve_ids!(self.class.spouts + self.class.bolts) + self.class.resolve_ids!(self.class.components) builder = TopologyBuilder.new self.class.spouts.each do |spout| @@ -120,28 +128,29 @@ module RedStorm private def self.resolve_ids!(components) - next_id = 1 + next_numeric_id = 1 resolved_names = {} - numeric_ids, symbolic_ids = components.map(&:id).partition{|id| id.is_a?(Fixnum)} + numeric_components, symbolic_components = components.partition{|c| c.id.is_a?(Fixnum)} + numeric_ids = numeric_components.map(&:id) # map unused numeric ids to symbolic ids - symbolic_ids.map(&:to_s).uniq.each do |id| - unless resolved_names.has_key?(id) - next_id += 1 while numeric_ids.include?(next_id) - numeric_ids << next_id - resolved_names[id] = next_id - end + symbolic_components.each do |component| + id = component.id.to_s + raise("duplicate symbolic id in #{component.clazz.name} on id=#{id}") if resolved_names.has_key?(id) + next_numeric_id += 1 while numeric_ids.include?(next_numeric_id) + numeric_ids << next_numeric_id + resolved_names[id] = next_numeric_id end # reassign numeric ids in all components components.each do |component| unless component.id.is_a?(Fixnum) - component.id = resolved_names[component.id] || raise("cannot resolve #{component.clazz.name} id=#{component.id.inspect}") + component.id = resolved_names[component.id.to_s] || raise("cannot resolve #{component.clazz.name} id=#{component.id.to_s}") end if component.respond_to?(:sources) component.sources.map! do |source_id, grouping| - id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.inspect}") + id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id.to_s] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.to_s}") [id, grouping] end end @@ -149,11 +158,15 @@ module RedStorm end def self.spouts - @spouts ||= [] + self.components.select{|c| c.is_a?(SpoutDefinition)} end def self.bolts - @bolts ||= [] + self.components.select{|c| c.is_a?(BoltDefinition)} + end + + def self.components + @components ||= [] end def self.topology_name diff --git a/spec/red_storm/simple_bolt_spec.rb b/spec/red_storm/simple_bolt_spec.rb index 57d873f..5559bee 100644 --- a/spec/red_storm/simple_bolt_spec.rb +++ b/spec/red_storm/simple_bolt_spec.rb @@ -3,6 +3,12 @@ require 'red_storm/simple_bolt' describe RedStorm::SimpleBolt do + before(:each) do + Object.send(:remove_const, "Bolt1") if Object.const_defined?("Bolt1") + Object.send(:remove_const, "Bolt2") if Object.const_defined?("Bolt2") + Object.send(:remove_const, "Bolt3") if Object.const_defined?("Bolt3") + end + describe "interface" do it "should implement bolt proxy" do spout = RedStorm::SimpleBolt.new @@ -24,37 +30,37 @@ describe RedStorm::SimpleBolt do describe "output_field statement" do it "should parse single argument" do - class BoltOutputField1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt output_fields :f1 end - bolt = BoltOutputField1.new - BoltOutputField1.send(:fields).should == ["f1"] + bolt = Bolt1.new + Bolt1.send(:fields).should == ["f1"] end it "should parse multiple arguments" do - class BoltOutputField2 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :f2 end - BoltOutputField2.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == ["f1", "f2"] end it "should parse string and symbol arguments" do - class BoltOutputField3 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt output_fields :f1, "f2" end - BoltOutputField3.send(:fields).should == ["f1", "f2"] + Bolt1.send(:fields).should == ["f1", "f2"] end it "should not share state over mutiple classes" do - class BoltOutputField4 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt output_fields :f1 end - class BoltOutputField5 < RedStorm::SimpleBolt + class Bolt2 < RedStorm::SimpleBolt output_fields :f2 end RedStorm::SimpleBolt.send(:fields).should == [] - BoltOutputField4.send(:fields).should == ["f1"] - BoltOutputField5.send(:fields).should == ["f2"] + Bolt1.send(:fields).should == ["f1"] + Bolt2.send(:fields).should == ["f2"] end end @@ -76,111 +82,163 @@ describe RedStorm::SimpleBolt do describe "with block argument" do it "should parse without options" do - class BoltBlockArgument1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive {} end - BoltBlockArgument1.receive_options.should == DEFAULT_RECEIVE_OPTIONS - BoltBlockArgument1.send(:emit?).should be_true - BoltBlockArgument1.send(:ack?).should be_false - BoltBlockArgument1.send(:anchor?).should be_false + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS + Bolt1.send(:emit?).should be_true + Bolt1.send(:ack?).should be_false + Bolt1.send(:anchor?).should be_false end it "should parse :emit option" do - class BoltBlockArgument2 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :emit => false do end end - BoltBlockArgument2.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit => false) - BoltBlockArgument2.send(:emit?).should be_false + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit => false) + Bolt1.send(:emit?).should be_false end it "should parse :ack option" do - class BoltBlockArgument3 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :ack => true do end end - BoltBlockArgument3.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) - BoltBlockArgument3.send(:ack?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) + Bolt1.send(:ack?).should be_true end it "should parse :anchor option" do - class BoltBlockArgument4 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :anchor => true do end end - BoltBlockArgument4.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) - BoltBlockArgument4.send(:anchor?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) + Bolt1.send(:anchor?).should be_true end it "should parse multiple option" do - class BoltBlockArgument5 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :emit => false, :ack =>true, :anchor => true do end end - BoltBlockArgument5.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) - BoltBlockArgument5.send(:emit?).should be_false - BoltBlockArgument5.send(:ack?).should be_true - BoltBlockArgument5.send(:anchor?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.send(:emit?).should be_false + Bolt1.send(:ack?).should be_true + Bolt1.send(:anchor?).should be_true end end describe "with method name" do it "should parse without options" do - class BoltMethodName1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :test_method end - BoltMethodName1.receive_options.should == DEFAULT_RECEIVE_OPTIONS - BoltMethodName1.send(:emit?).should be_true - BoltMethodName1.send(:ack?).should be_false - BoltMethodName1.send(:anchor?).should be_false + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS + Bolt1.send(:emit?).should be_true + Bolt1.send(:ack?).should be_false + Bolt1.send(:anchor?).should be_false end it "should parse :emit option" do - class BoltMethodName2 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :test_method, :emit => false end - BoltMethodName2.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit => false) - BoltMethodName2.send(:emit?).should be_false + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit => false) + Bolt1.send(:emit?).should be_false end it "should parse :ack option" do - class BoltMethodName3 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :ack => true do end end - BoltMethodName3.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) - BoltMethodName3.send(:ack?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) + Bolt1.send(:ack?).should be_true end it "should parse :anchor option" do - class BoltMethodName4 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :anchor => true do end end - BoltMethodName4.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) - BoltMethodName4.send(:anchor?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) + Bolt1.send(:anchor?).should be_true end it "should parse multiple option" do - class BoltMethodName5 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :emit => false, :ack =>true, :anchor => true do end end - BoltMethodName5.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) - BoltMethodName5.send(:emit?).should be_false - BoltMethodName5.send(:ack?).should be_true - BoltMethodName5.send(:anchor?).should be_true + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.send(:emit?).should be_false + Bolt1.send(:ack?).should be_true + Bolt1.send(:anchor?).should be_true + end + + end + + describe "with default method" do + + it "should parse without options" do + class Bolt1 < RedStorm::SimpleBolt + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS + Bolt1.send(:emit?).should be_true + Bolt1.send(:ack?).should be_false + Bolt1.send(:anchor?).should be_false + end + + it "should parse :emit option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :emit => false + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit => false) + Bolt1.send(:emit?).should be_false + end + + it "should parse :ack option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :ack => true + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true) + Bolt1.send(:ack?).should be_true + end + + it "should parse :anchor option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :anchor => true + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true) + Bolt1.send(:anchor?).should be_true + end + + it "should parse multiple option" do + class Bolt1 < RedStorm::SimpleBolt + on_receive :emit => false, :ack =>true, :anchor => true + end + + Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true) + Bolt1.send(:emit?).should be_false + Bolt1.send(:ack?).should be_true + Bolt1.send(:anchor?).should be_true end end @@ -189,21 +247,21 @@ describe RedStorm::SimpleBolt do describe "on_init statement" do it "should parse block argument" do - class BoltOnInitBlockArgument1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_init {self.test_block_call} end - bolt = BoltOnInitBlockArgument1.new + bolt = Bolt1.new bolt.should_receive(:test_block_call) bolt.prepare(nil, nil, nil) end it "should parse method name" do - class BoltOnInitMethodName1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_init :test_method end - bolt = BoltOnInitMethodName1.new + bolt = Bolt1.new bolt.should_receive(:test_method) bolt.prepare(nil, nil, nil) end @@ -212,21 +270,21 @@ describe RedStorm::SimpleBolt do describe "on_close statement" do it "should parse block argument" do - class BoltOnCloseBlockArgument1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_close {self.test_block_call} end - bolt = BoltOnCloseBlockArgument1.new + bolt = Bolt1.new bolt.should_receive(:test_block_call) bolt.cleanup end it "should parse method name" do - class BoltOnCloseMethodName1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_close :test_method end - bolt = BoltOnCloseMethodName1.new + bolt = Bolt1.new bolt.should_receive(:test_method) bolt.cleanup end @@ -237,84 +295,171 @@ describe RedStorm::SimpleBolt do describe "execute" do + class RedStorm::Values; end + it "should auto emit on single value output" do - class BoltNextTuple1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive {|tuple| tuple} end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method + def my_method(tuple); tuple; end + end + class Bolt3 < RedStorm::SimpleBolt + def on_receive(tuple); tuple; end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output").and_return("values") - collector.should_receive(:emit).with("values") + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") - bolt = BoltNextTuple1.new + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt3.new bolt.prepare(nil, nil, collector) bolt.execute("output") end it "should auto emit on multiple value output" do - class BoltNextTuple2 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive {|tuple| tuple} end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method + def my_method(tuple); tuple; end + end + class Bolt3 < RedStorm::SimpleBolt + def on_receive(tuple); tuple; end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output1", "output2").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output1", "output2").and_return("values") - collector.should_receive(:emit).with("values") + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute(["output1", "output2"]) - bolt = BoltNextTuple2.new + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute(["output1", "output2"]) + + bolt = Bolt3.new bolt.prepare(nil, nil, collector) bolt.execute(["output1", "output2"]) end it "should anchor on single value output" do - class BoltNextTuple3 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :anchor => true do |tuple| "output" end end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :anchor => true + def my_method(tuple) + "output" + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :anchor => true + def on_receive(tuple) + "output" + end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("tuple", "values").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output").and_return("values") - collector.should_receive(:emit).with("tuple", "values") + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") - bolt = BoltNextTuple3.new + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt3.new bolt.prepare(nil, nil, collector) bolt.execute("tuple") end it "should ack on single value output" do - class BoltNextTuple4 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :anchor => true, :ack => true do |tuple| "output" end end + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :anchor => true, :ack => true + def my_method(tuple) + "output" + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :anchor => true, :ack => true + def on_receive(tuple) + "output" + end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("tuple", "values").exactly(3).times + collector.should_receive(:ack).with("tuple").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output").and_return("values") - collector.should_receive(:emit).with("tuple", "values") - collector.should_receive(:ack).with("tuple") + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") - bolt = BoltNextTuple4.new + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("tuple") + + bolt = Bolt3.new bolt.prepare(nil, nil, collector) bolt.execute("tuple") end it "should not emit" do - class BoltNextTuple5 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_receive :emit => false do |tuple| tuple end end - collector = mock("Collector") + class Bolt2 < RedStorm::SimpleBolt + on_receive :my_method, :emit => false + def my_method(tuple) + tuple + end + end + class Bolt3 < RedStorm::SimpleBolt + on_receive :emit => false + def on_receive(tuple) + tuple + end + end - class RedStorm::Values; end + collector = mock("Collector") RedStorm::Values.should_receive(:new).never collector.should_receive(:emit).never - bolt = BoltNextTuple5.new + bolt = Bolt1.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt2.new + bolt.prepare(nil, nil, collector) + bolt.execute("output") + + bolt = Bolt3.new bolt.prepare(nil, nil, collector) bolt.execute("output") end @@ -322,12 +467,39 @@ describe RedStorm::SimpleBolt do describe "prepare" do it "should assing collector, context, config and call init block" do - class BoltPrepare1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_init {trigger} end - bolt = BoltPrepare1.new - bolt.should_receive(:trigger).once + class Bolt2 < RedStorm::SimpleBolt + on_init :my_method + def my_method; trigger; end + end + class Bolt3 < RedStorm::SimpleBolt + def on_init; trigger; end + end + bolt = Bolt1.new + bolt.should_receive(:trigger).once + bolt.config.should be_nil + bolt.context.should be_nil + bolt.collector.should be_nil + bolt.prepare("config", "context", "collector") + bolt.config.should == "config" + bolt.context.should == "context" + bolt.collector.should == "collector" + + bolt = Bolt2.new + bolt.should_receive(:trigger).once + bolt.config.should be_nil + bolt.context.should be_nil + bolt.collector.should be_nil + bolt.prepare("config", "context", "collector") + bolt.config.should == "config" + bolt.context.should == "context" + bolt.collector.should == "collector" + + bolt = Bolt3.new + bolt.should_receive(:trigger).once bolt.config.should be_nil bolt.context.should be_nil bolt.collector.should be_nil @@ -340,22 +512,37 @@ describe RedStorm::SimpleBolt do describe "cleanup" do it "should call close block" do - class BoltClose1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt on_close {trigger} end - bolt = BoltClose1.new - bolt.should_receive(:trigger).once + class Bolt2 < RedStorm::SimpleBolt + on_close :my_method + def my_method; trigger; end + end + class Bolt3 < RedStorm::SimpleBolt + def on_close; trigger; end + end + bolt = Bolt1.new + bolt.should_receive(:trigger).once + bolt.cleanup + + bolt = Bolt2.new + bolt.should_receive(:trigger).once + bolt.cleanup + + bolt = Bolt3.new + bolt.should_receive(:trigger).once bolt.cleanup end end describe "declare_output_fields" do it "should declare fields" do - class BoltDeclare1 < RedStorm::SimpleBolt + class Bolt1 < RedStorm::SimpleBolt output_fields :f1, :f2 end - bolt = BoltDeclare1.new + bolt = Bolt1.new class RedStorm::Fields; end declarer = mock("Declarer") declarer.should_receive(:declare).with("fields") diff --git a/spec/red_storm/simple_spout_spec.rb b/spec/red_storm/simple_spout_spec.rb index c9d2dd3..e57c576 100644 --- a/spec/red_storm/simple_spout_spec.rb +++ b/spec/red_storm/simple_spout_spec.rb @@ -3,6 +3,12 @@ require 'red_storm/simple_spout' describe RedStorm::SimpleSpout do + before(:each) do + Object.send(:remove_const, "Spout1") if Object.const_defined?("Spout1") + Object.send(:remove_const, "Spout2") if Object.const_defined?("Spout2") + Object.send(:remove_const, "Spout3") if Object.const_defined?("Spout3") + end + describe "interface" do it "should implement spout proxy" do spout = RedStorm::SimpleSpout.new @@ -46,37 +52,36 @@ describe RedStorm::SimpleSpout do describe "output_field statement" do it "should parse single argument" do - class Test1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout output_fields :f1 end - test1 = Test1.new - Test1.send(:fields).should == ["f1"] + Spout1.send(:fields).should == ["f1"] end it "should parse multiple arguments" do - class Test2 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout output_fields :f1, :f2 end - Test2.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == ["f1", "f2"] end it "should parse string and symbol arguments" do - class Test3 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout output_fields :f1, "f2" end - Test3.send(:fields).should == ["f1", "f2"] + Spout1.send(:fields).should == ["f1", "f2"] end it "should not share state over mutiple classes" do - class Test4 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout output_fields :f1 end - class Test5 < RedStorm::SimpleSpout + class Spout2 < RedStorm::SimpleSpout output_fields :f2 end RedStorm::SimpleSpout.send(:fields).should == [] - Test4.send(:fields).should == ["f1"] - Test5.send(:fields).should == ["f2"] + Spout1.send(:fields).should == ["f1"] + Spout2.send(:fields).should == ["f2"] end end @@ -90,29 +95,29 @@ describe RedStorm::SimpleSpout do describe "with block argument" do it "should parse without options" do - class BlockArgument1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send {self.test_method} end - BlockArgument1.send_options.should == DEFAULT_SEND_OPTIONS + Spout1.send_options.should == DEFAULT_SEND_OPTIONS - spout = BlockArgument1.new + spout = Spout1.new spout.should_receive(:test_method) - BlockArgument1.should_receive(:emit?).and_return(false) + Spout1.should_receive(:emit?).and_return(false) spout.next_tuple end it "should parse :emit option" do - class BlockArgument2 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send :emit => false do self.test_method end end - BlockArgument2.send_options.should == DEFAULT_SEND_OPTIONS.merge(:emit => false) - BlockArgument2.send(:emit?).should be_false + Spout1.send_options.should == DEFAULT_SEND_OPTIONS.merge(:emit => false) + Spout1.send(:emit?).should be_false - spout = BlockArgument2.new + spout = Spout1.new spout.should_receive(:test_method) spout.next_tuple end @@ -121,51 +126,92 @@ describe RedStorm::SimpleSpout do describe "with method name" do it "should parse without options" do - class MethodName1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send :test_method end - MethodName1.send_options.should == DEFAULT_SEND_OPTIONS + Spout1.send_options.should == DEFAULT_SEND_OPTIONS - spout = MethodName1.new + spout = Spout1.new spout.should_receive(:test_method) - MethodName1.should_receive(:emit?).and_return(false) + Spout1.should_receive(:emit?).and_return(false) spout.next_tuple end it "should parse :emit option" do - class MethodName2 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send :test_method, :emit => false end - MethodName2.send_options.should == DEFAULT_SEND_OPTIONS.merge(:emit => false) - MethodName2.send(:emit?).should be_false + Spout1.send_options.should == DEFAULT_SEND_OPTIONS.merge(:emit => false) + Spout1.send(:emit?).should be_false - spout = MethodName2.new + spout = Spout1.new + spout.should_receive(:test_method) + spout.next_tuple + end + end + + describe "with method" do + + it "should parse without options" do + class Spout1 < RedStorm::SimpleSpout + def on_send; test_method; end + end + + Spout1.send_options.should == DEFAULT_SEND_OPTIONS + + spout = Spout1.new + spout.should_receive(:test_method) + Spout1.should_receive(:emit?).and_return(false) + spout.next_tuple + end + + it "should parse :emit option" do + class Spout1 < RedStorm::SimpleSpout + on_send :emit => false + def on_send; test_method; end + end + + Spout1.send_options.should == DEFAULT_SEND_OPTIONS.merge(:emit => false) + Spout1.send(:emit?).should be_false + + spout = Spout1.new spout.should_receive(:test_method) spout.next_tuple end end end + describe "on_init statement" do it "should parse block argument" do - class OnInitBlockArgument1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_init {self.test_block_call} end - spout = OnInitBlockArgument1.new + spout = Spout1.new spout.should_receive(:test_block_call) spout.open(nil, nil, nil) end it "should parse method name" do - class OnInitMethodName1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_init :test_method end - spout = OnInitMethodName1.new + spout = Spout1.new + spout.should_receive(:test_method) + spout.open(nil, nil, nil) + end + + it "should call method" do + class Spout1 < RedStorm::SimpleSpout + def on_init; test_method; end + end + + spout = Spout1.new spout.should_receive(:test_method) spout.open(nil, nil, nil) end @@ -174,21 +220,31 @@ describe RedStorm::SimpleSpout do describe "on_close statement" do it "should parse block argument" do - class OnCloseBlockArgument1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_close {self.test_block_call} end - spout = OnCloseBlockArgument1.new + spout = Spout1.new spout.should_receive(:test_block_call) spout.close end it "should parse method name" do - class OnCloseMethodName1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_close :test_method end - spout = OnCloseMethodName1.new + spout = Spout1.new + spout.should_receive(:test_method) + spout.close + end + + it "should call method" do + class Spout1 < RedStorm::SimpleSpout + def on_close; test_method; end + end + + spout = Spout1.new spout.should_receive(:test_method) spout.close end @@ -197,21 +253,31 @@ describe RedStorm::SimpleSpout do describe "on_ack statement" do it "should parse block argument" do - class OnAckBlockArgument1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_ack {|msg_id| self.test_block_call(msg_id)} end - spout = OnAckBlockArgument1.new + spout = Spout1.new spout.should_receive(:test_block_call).with("test") spout.ack("test") end it "should parse method name" do - class OnAckMethodName1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_ack :test_method end - spout = OnAckMethodName1.new + spout = Spout1.new + spout.should_receive(:test_method).with("test") + spout.ack("test") + end + + it "should call method " do + class Spout1 < RedStorm::SimpleSpout + def on_ack(msg_id); test_method(msg_id); end + end + + spout = Spout1.new spout.should_receive(:test_method).with("test") spout.ack("test") end @@ -220,21 +286,31 @@ describe RedStorm::SimpleSpout do describe "on_fail statement" do it "should parse block argument" do - class OnFailBlockArgument1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_fail {|msg_id| self.test_block_call(msg_id)} end - spout = OnFailBlockArgument1.new + spout = Spout1.new spout.should_receive(:test_block_call).with("test") spout.fail("test") end it "should parse method name" do - class OnFailMethodName1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_fail :test_method end - spout = OnFailMethodName1.new + spout = Spout1.new + spout.should_receive(:test_method).with("test") + spout.fail("test") + end + + it "should parse method name" do + class Spout1 < RedStorm::SimpleSpout + def on_fail(msg_id); test_method(msg_id); end + end + + spout = Spout1.new spout.should_receive(:test_method).with("test") spout.fail("test") end @@ -244,67 +320,130 @@ describe RedStorm::SimpleSpout do describe "spout" do + class RedStorm::Values; end + describe "next_tuple" do it "should auto emit on single value output" do - class SpoutNextTuple1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send {"output"} end + class Spout2 < RedStorm::SimpleSpout + on_send :my_method + def my_method; "output"; end + end + class Spout3 < RedStorm::SimpleSpout + def on_send; "output"; end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output").and_return("values") - collector.should_receive(:emit).with("values") + spout = Spout1.new + spout.open(nil, nil, collector) + spout.next_tuple - spout = SpoutNextTuple1.new + spout = Spout2.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new spout.open(nil, nil, collector) spout.next_tuple end it "should auto emit on multiple values output" do - class SpoutNextTuple2 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send {["output1", "output2"]} end + class Spout2 < RedStorm::SimpleSpout + on_send :my_method + def my_method; ["output1", "output2"]; end + end + class Spout3 < RedStorm::SimpleSpout + def on_send; ["output1", "output2"]; end + end + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output1", "output2").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values").exactly(3).times - class RedStorm::Values; end - RedStorm::Values.should_receive(:new).with("output1", "output2").and_return("values") - collector.should_receive(:emit).with("values") + spout = Spout1.new + spout.open(nil, nil, collector) + spout.next_tuple - spout = SpoutNextTuple2.new + spout = Spout2.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new spout.open(nil, nil, collector) spout.next_tuple end it "should sleep on nil output" do - class SpoutNextTuple2 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send {nil} end - collector = mock("Collector") + class Spout2 < RedStorm::SimpleSpout + on_send :my_method + def my_method; nil; end + end + class Spout3 < RedStorm::SimpleSpout + def on_send; nil; end + end - class RedStorm::Values; end + collector = mock("Collector") RedStorm::Values.should_receive(:new).never collector.should_receive(:emit).never - spout = SpoutNextTuple2.new + spout = Spout1.new + spout.should_receive(:sleep) + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout2.new + spout.should_receive(:sleep) + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new spout.should_receive(:sleep) spout.open(nil, nil, collector) spout.next_tuple end it "should respect :emit => false" do - class SpoutNextTuple3 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_send :emit => false do "output" end end - collector = mock("Collector") + class Spout2 < RedStorm::SimpleSpout + on_send :my_method, :emit => false + def my_method; "output"; end + end + class Spout3 < RedStorm::SimpleSpout + on_send :emit => false + def on_send; "output" end + end - class RedStorm::Values; end + collector = mock("Collector") RedStorm::Values.should_receive(:new).never collector.should_receive(:emit).never - spout = SpoutNextTuple3.new + spout = Spout1.new + spout.should_receive(:sleep).never + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout2.new + spout.should_receive(:sleep).never + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new spout.should_receive(:sleep).never spout.open(nil, nil, collector) spout.next_tuple @@ -313,12 +452,39 @@ describe RedStorm::SimpleSpout do describe "open" do it "should assing collector, context, config and call init block" do - class SpoutPrepare1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_init {trigger} end - spout = SpoutPrepare1.new - spout.should_receive(:trigger).once + class Spout2 < RedStorm::SimpleSpout + on_init :my_method + def my_method; trigger; end + end + class Spout3 < RedStorm::SimpleSpout + def on_init; trigger; end + end + spout = Spout1.new + spout.should_receive(:trigger).once + spout.config.should be_nil + spout.context.should be_nil + spout.collector.should be_nil + spout.open("config", "context", "collector") + spout.config.should == "config" + spout.context.should == "context" + spout.collector.should == "collector" + + spout = Spout2.new + spout.should_receive(:trigger).once + spout.config.should be_nil + spout.context.should be_nil + spout.collector.should be_nil + spout.open("config", "context", "collector") + spout.config.should == "config" + spout.context.should == "context" + spout.collector.should == "collector" + + spout = Spout3.new + spout.should_receive(:trigger).once spout.config.should be_nil spout.context.should be_nil spout.collector.should be_nil @@ -331,22 +497,37 @@ describe RedStorm::SimpleSpout do describe "close" do it "should call close block" do - class SpoutClose1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_close {trigger} end - spout = SpoutClose1.new - spout.should_receive(:trigger).once + class Spout2 < RedStorm::SimpleSpout + on_close :my_method + def my_method; trigger; end + end + class Spout3 < RedStorm::SimpleSpout + def on_close; trigger; end + end + spout = Spout1.new + spout.should_receive(:trigger).once + spout.close + + spout = Spout2.new + spout.should_receive(:trigger).once + spout.close + + spout = Spout2.new + spout.should_receive(:trigger).once spout.close end end describe "declare_output_fields" do it "should declare fields" do - class SpoutDeclare1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout output_fields :f1, :f2 end - spout = SpoutDeclare1.new + spout = Spout1.new class RedStorm::Fields; end declarer = mock("Declarer") declarer.should_receive(:declare).with("fields") @@ -358,20 +539,36 @@ describe RedStorm::SimpleSpout do describe "is_distributed" do it "should report is_distributed" do RedStorm::SimpleSpout.is_distributed?.should be_false - class SpoutIsDistributed1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout set :is_distributed => true end - spout = SpoutIsDistributed1.new + spout = Spout1.new spout.is_distributed.should be_true end end describe "ack" do it "should call ack block" do - class SpoutAck1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_ack {|msg_id| trigger(msg_id)} end - spout = SpoutAck1.new + class Spout2 < RedStorm::SimpleSpout + on_ack :my_method + def my_method(msg_id) trigger(msg_id); end + end + class Spout3 < RedStorm::SimpleSpout + def on_ack(msg_id) trigger(msg_id); end + end + + spout = Spout1.new + spout.should_receive(:trigger).once.with("test") + spout.ack("test") + + spout = Spout2.new + spout.should_receive(:trigger).once.with("test") + spout.ack("test") + + spout = Spout3.new spout.should_receive(:trigger).once.with("test") spout.ack("test") end @@ -379,10 +576,26 @@ describe RedStorm::SimpleSpout do describe "fail" do it "should call fail block" do - class SpoutFail1 < RedStorm::SimpleSpout + class Spout1 < RedStorm::SimpleSpout on_fail {|msg_id| trigger(msg_id)} end - spout = SpoutFail1.new + class Spout2 < RedStorm::SimpleSpout + on_fail :my_method + def my_method(msg_id) trigger(msg_id); end + end + class Spout3 < RedStorm::SimpleSpout + def on_fail(msg_id) trigger(msg_id); end + end + + spout = Spout1.new + spout.should_receive(:trigger).once.with("test") + spout.fail("test") + + spout = Spout2.new + spout.should_receive(:trigger).once.with("test") + spout.fail("test") + + spout = Spout3.new spout.should_receive(:trigger).once.with("test") spout.fail("test") end diff --git a/spec/red_storm/simple_topology_spec.rb b/spec/red_storm/simple_topology_spec.rb index 0357314..cc8c450 100644 --- a/spec/red_storm/simple_topology_spec.rb +++ b/spec/red_storm/simple_topology_spec.rb @@ -3,9 +3,15 @@ require 'red_storm/simple_topology' describe RedStorm::SimpleTopology do + before(:each) do + Object.send(:remove_const, "Topology1") if Object.const_defined?("Topology1") + Object.send(:remove_const, "Topology2") if Object.const_defined?("Topology2") + Object.send(:remove_const, "Topology3") if Object.const_defined?("Topology3") + end + it "should set default topology name" do - class DefaultTopologyName < RedStorm::SimpleTopology; end - DefaultTopologyName.topology_name.should == "default_topology_name" + class Topology1 < RedStorm::SimpleTopology; end + Topology1.topology_name.should == "topology1" end @@ -31,21 +37,24 @@ describe RedStorm::SimpleTopology do class SpoutClass2; end it "should parse single spout without options" do - RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "spout_class1", 1).and_return("spout_definition") - class TopologySpout1 < RedStorm::SimpleTopology + spout = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, "spout_class1", 1) + RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "spout_class1", 1).and_return(spout) + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1 end - TopologySpout1.spouts.should == ["spout_definition"] + Topology1.spouts.should == [spout] end it "should parse multiple spouts with options" do - RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "id1", 2).and_return("spout_definition1") - RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass2, "id2", 3).and_return("spout_definition2") - class TopologySpout2 < RedStorm::SimpleTopology + spout1 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, "id1", 2) + spout2 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass2, "id2", 3) + RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "id1", 2).and_return(spout1) + RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass2, "id2", 3).and_return(spout2) + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1, :id => "id1", :parallelism => 2 spout SpoutClass2, :id => "id2", :parallelism => 3 end - TopologySpout2.spouts.should == ["spout_definition1", "spout_definition2"] + Topology1.spouts.should == [spout1, spout2] end end @@ -56,7 +65,7 @@ describe RedStorm::SimpleTopology do class BoltClass2; end it "should parse single bolt without options" do - bolt_definition = mock("BoltDefinition") + bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "bolt_class1", 1) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "bolt_class1", 1).and_return(bolt_definition) bolt_definition.should_receive(:source).with(1, {:fields => ["f1"]}) class TopologyBolt1 < RedStorm::SimpleTopology @@ -68,25 +77,25 @@ describe RedStorm::SimpleTopology do end it "should parse single bolt with options" do - bolt_definition = mock("BoltDefinition") + bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id", 2) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id", 2).and_return(bolt_definition) bolt_definition.should_receive(:source).with(1, :shuffle) - class TopologyBolt2 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology bolt BoltClass1, :id => "id", :parallelism => 2 do source 1, :shuffle end end - TopologyBolt2.bolts.should == [bolt_definition] + Topology1.bolts.should == [bolt_definition] end it "should parse multiple bolt with options" do - bolt_definition1 = mock("BoltDefinition") - bolt_definition2 = mock("BoltDefinition") + bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id1", 2) + bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, "id2", 3) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id1", 2).and_return(bolt_definition1) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, "id2", 3).and_return(bolt_definition2) bolt_definition1.should_receive(:source).with(1, :shuffle) bolt_definition2.should_receive(:source).with(2, {:fields => ["f1"]}) - class TopologyBolt3 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology bolt BoltClass1, :id => "id1", :parallelism => 2 do source 1, :shuffle end @@ -94,7 +103,7 @@ describe RedStorm::SimpleTopology do source 2, :fields => ["f1"] end end - TopologyBolt3.bolts.should == [bolt_definition1, bolt_definition2] + Topology1.bolts.should == [bolt_definition1, bolt_definition2] end end @@ -102,52 +111,52 @@ describe RedStorm::SimpleTopology do describe "configure statement" do it "should parse name options only" do - class TopologyConfigure1 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology configure "name" end - TopologyConfigure1.topology_name.should == "name" + Topology1.topology_name.should == "name" end it "should parse configuration block only" do - class TopologyConfigure2 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology configure {trigger} end - topology = TopologyConfigure2.new + topology = Topology1.new topology.should_receive(:trigger) - topology.instance_exec(&TopologyConfigure2.configure_block) + topology.instance_exec(&Topology1.configure_block) end it "should parse name and configuration block" do - class TopologyConfigure3 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology configure "name" do trigger end end - TopologyConfigure3.topology_name.should == "name" - topology = TopologyConfigure3.new + Topology1.topology_name.should == "name" + topology = Topology1.new topology.should_receive(:trigger) - topology.instance_exec(&TopologyConfigure3.configure_block) + topology.instance_exec(&Topology1.configure_block) end end define "on_submit statement" do it "should parse block param only" do - class TopologyOnsubmit1 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology on_submit {|env| trigger(env)} end - topology = TopologyOnsubmit1.new + topology = Topology1.new topology.should_receive(:trigger).with("env") - topology.instance_exec("env", &TopologyOnsubmit1.submit_block) + topology.instance_exec("env", &Topology1.submit_block) end it "should method name param only" do - class TopologyOnsubmit1 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology on_submit :my_method end - topology = TopologyOnsubmit1.new + topology = Topology1.new topology.should_receive(:my_method).with("env") - topology.instance_exec("env", &TopologyOnsubmit1.submit_block) + topology.instance_exec("env", &Topology1.submit_block) end end end @@ -160,9 +169,10 @@ describe RedStorm::SimpleTopology do class RedStorm::JRubySpout; end class RedStorm::JRubyBolt; end class RedStorm::Config; end + class RedStorm::Fields; end it "should start in :local env" do - class TopologyStart1 < RedStorm::SimpleTopology; end + class Topology1 < RedStorm::SimpleTopology; end builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) @@ -172,29 +182,29 @@ describe RedStorm::SimpleTopology do configurator.should_receive(:config).and_return("config") cluster = mock(RedStorm::LocalCluster) RedStorm::LocalCluster.should_receive(:new).and_return(cluster) - cluster.should_receive(:submitTopology).with("topology_start1", "config", "topology") - TopologyStart1.new.start("base_path", :local) + cluster.should_receive(:submitTopology).with("topology1", "config", "topology") + Topology1.new.start("base_path", :local) end it "should start in :cluster env" do - class TopologyStart2 < RedStorm::SimpleTopology; end + class Topology1 < RedStorm::SimpleTopology; end builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") configurator = mock(RedStorm::SimpleTopology::Configurator) RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) configurator.should_receive(:config).and_return("config") - RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start2", "config", "topology") - TopologyStart2.new.start("base_path", :cluster) + RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") + Topology1.new.start("base_path", :cluster) end it "should raise for invalid env" do - class TopologyStart3 < RedStorm::SimpleTopology; end - lambda {TopologyStart3.new.start("base_path", :toto)}.should raise_error + class Topology1 < RedStorm::SimpleTopology; end + lambda {Topology1.new.start("base_path", :toto)}.should raise_error end it "should build spouts" do - class TopologyStart4 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1 spout SpoutClass2 end @@ -213,19 +223,19 @@ describe RedStorm::SimpleTopology do builder.should_receive("setSpout").with(2, jruby_spout2, 1) configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") - RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start4", "config", "topology") - TopologyStart4.new.start("base_path", :cluster) + RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") + Topology1.new.start("base_path", :cluster) end it "should build bolts" do - bolt_definition1 = mock("BoltDefinition") - bolt_definition2 = mock("BoltDefinition") + bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id1", 2) + bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, "id2", 3) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id1", 2).and_return(bolt_definition1) RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, "id2", 3).and_return(bolt_definition2) bolt_definition1.should_receive(:source).with(1, :shuffle) bolt_definition2.should_receive(:source).with(2, {:fields => ["f1"]}) - class TopologyStart5 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology bolt BoltClass1, :id => "id1", :parallelism => 2 do source 1, :shuffle end @@ -260,13 +270,98 @@ describe RedStorm::SimpleTopology do configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") - RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start5", "config", "topology") + RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") - TopologyStart5.new.start("base_path", :cluster) + Topology1.new.start("base_path", :cluster) + end + + + describe "grouping" do + + before(:each) do + builder = mock(RedStorm::TopologyBuilder) + configurator = mock(RedStorm::SimpleTopology::Configurator) + jruby_bolt = mock(RedStorm::JRubyBolt) + @declarer = mock("InputDeclarer") + RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) + RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt) + builder.should_receive("setBolt").with(1, jruby_bolt, 1).and_return(@declarer) + configurator.should_receive(:config).and_return("config") + builder.should_receive(:createTopology).and_return("topology") + RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") + end + + it "should support fields" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :fields => "f1" + end + end + + RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") + @declarer.should_receive("fieldsGrouping").with(1, "fields") + Topology1.new.start("base_path", :cluster) + end + + it "should support shuffle" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :shuffle + end + end + + @declarer.should_receive("shuffleGrouping").with(1) + Topology1.new.start("base_path", :cluster) + end + + it "should support none" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :none + end + end + + @declarer.should_receive("noneGrouping").with(1) + Topology1.new.start("base_path", :cluster) + end + + it "should support global" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :global + end + end + + @declarer.should_receive("globalGrouping").with(1) + Topology1.new.start("base_path", :cluster) + end + + it "should support all" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :all + end + end + + @declarer.should_receive("allGrouping").with(1) + Topology1.new.start("base_path", :cluster) + end + + it "should support direct" do + class Topology1 < RedStorm::SimpleTopology + bolt BoltClass1 do + source 1, :direct + end + end + + @declarer.should_receive("directGrouping").with(1) + Topology1.new.start("base_path", :cluster) + end end it "should configure" do - class TopologyStart6 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology configure do debug true max_task_parallelism 3 @@ -281,13 +376,13 @@ describe RedStorm::SimpleTopology do builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") - RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start6", config, "topology") + RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", config, "topology") - TopologyStart6.new.start("base_path", :cluster) + Topology1.new.start("base_path", :cluster) end it "should provide local cluster reference" do - class TopologyStart7 < RedStorm::SimpleTopology; end + class Topology1 < RedStorm::SimpleTopology; end builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) @@ -298,16 +393,16 @@ describe RedStorm::SimpleTopology do cluster = mock(RedStorm::LocalCluster) RedStorm::LocalCluster.should_receive(:new).and_return(cluster) - cluster.should_receive(:submitTopology).with("topology_start7", "config", "topology").and_return("cluster") + cluster.should_receive(:submitTopology).with("topology1", "config", "topology").and_return("cluster") - topology = TopologyStart7.new + topology = Topology1.new topology.start("base_path", :local) topology.cluster.should == cluster end it "should keep numeric ids" do - class TopologyNumericIds1 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1, :id => 1 bolt BoltClass1, :id => 2 do @@ -315,19 +410,19 @@ describe RedStorm::SimpleTopology do end end - TopologyNumericIds1.spouts.first.id.should == 1 - TopologyNumericIds1.bolts.first.id.should == 2 - TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + Topology1.spouts.first.id.should == 1 + Topology1.bolts.first.id.should == 2 + Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}] - TopologyNumericIds1.resolve_ids!(TopologyNumericIds1.spouts + TopologyNumericIds1.bolts) + Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts) - TopologyNumericIds1.spouts.first.id.should == 1 - TopologyNumericIds1.bolts.first.id.should == 2 - TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + Topology1.spouts.first.id.should == 1 + Topology1.bolts.first.id.should == 2 + Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}] end it "should resolve explicit symbolic ids" do - class TopologySymbolicIds1 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1, :id => "id1" bolt BoltClass1, :id => "id2" do @@ -335,19 +430,19 @@ describe RedStorm::SimpleTopology do end end - TopologySymbolicIds1.spouts.first.id.should == "id1" - TopologySymbolicIds1.bolts.first.id.should == "id2" - TopologySymbolicIds1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] + Topology1.spouts.first.id.should == "id1" + Topology1.bolts.first.id.should == "id2" + Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] - TopologySymbolicIds1.resolve_ids!(TopologySymbolicIds1.spouts + TopologySymbolicIds1.bolts) + Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts) - TopologySymbolicIds1.spouts.first.id.should == 1 - TopologySymbolicIds1.bolts.first.id.should == 2 - TopologySymbolicIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + Topology1.spouts.first.id.should == 1 + Topology1.bolts.first.id.should == 2 + Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}] end it "should resolve implicit symbolic ids" do - class TopologySymbolicIds2 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1 bolt BoltClass1 do @@ -355,19 +450,19 @@ describe RedStorm::SimpleTopology do end end - TopologySymbolicIds2.spouts.first.id.should == "spout_class1" - TopologySymbolicIds2.bolts.first.id.should == "bolt_class1" - TopologySymbolicIds2.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + Topology1.spouts.first.id.should == "spout_class1" + Topology1.bolts.first.id.should == "bolt_class1" + Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] - TopologySymbolicIds2.resolve_ids!(TopologySymbolicIds2.spouts + TopologySymbolicIds2.bolts) + Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts) - TopologySymbolicIds2.spouts.first.id.should == 1 - TopologySymbolicIds2.bolts.first.id.should == 2 - TopologySymbolicIds2.bolts.first.sources.first.should == [1, {:shuffle => nil}] + Topology1.spouts.first.id.should == 1 + Topology1.bolts.first.id.should == 2 + Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}] end it "should raise on unresolvable" do - class TopologySymbolicIds3 < RedStorm::SimpleTopology + class Topology1 < RedStorm::SimpleTopology spout SpoutClass1 bolt BoltClass1 do @@ -375,11 +470,23 @@ describe RedStorm::SimpleTopology do end end - TopologySymbolicIds3.spouts.first.id.should == "spout_class1" - TopologySymbolicIds3.bolts.first.id.should == "bolt_class1" - TopologySymbolicIds3.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] + Topology1.spouts.first.id.should == "spout_class1" + Topology1.bolts.first.id.should == "bolt_class1" + Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] - lambda {TopologySymbolicIds3.resolve_ids!(TopologySymbolicIds3.spouts + TopologySymbolicIds3.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=\"dummy\"" + lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy" + end + + it "should raise on duplicate conflict" do + class Topology1 < RedStorm::SimpleTopology + spout SpoutClass1 + spout SpoutClass1 + end + + Topology1.spouts.first.id.should == "spout_class1" + Topology1.spouts.last.id.should == "spout_class1" + + lambda {Topology1.resolve_ids!(Topology1.spouts)}.should raise_error RuntimeError, "duplicate symbolic id in SpoutClass1 on id=spout_class1" end end