diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index 06ce001..c95b6f4 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -53,9 +53,14 @@ module RedStorm # DSL instance methods - def emit(*values) + def reliable_emit(message_id, *values) + @collector.emit(Values.new(*values), message_id) + end + + def unreliable_emit(*values) @collector.emit(Values.new(*values)) end + alias_method :emit, :unreliable_emit def log self.class.log @@ -68,7 +73,12 @@ module RedStorm if self.class.emit? if output values = [output].flatten - @collector.emit(Values.new(*values)) + if self.class.reliable? + message_id = values.shift + reliable_emit(message_id, *values) + else + unreliable_emit(*values) + end else sleep(0.1) end @@ -159,11 +169,15 @@ module RedStorm end def self.send_options - @send_options ||= {:emit => true} + @send_options ||= {:emit => true, :reliable => false} end def self.emit? !!self.send_options[:emit] end + + def self.reliable? + !!self.send_options[:reliable] + end end end diff --git a/spec/red_storm/simple_spout_spec.rb b/spec/red_storm/simple_spout_spec.rb index c9d0f25..ec12398 100644 --- a/spec/red_storm/simple_spout_spec.rb +++ b/spec/red_storm/simple_spout_spec.rb @@ -95,7 +95,7 @@ describe RedStorm::SimpleSpout do end describe "on_send statement" do - DEFAULT_SEND_OPTIONS = {:emit => true} + DEFAULT_SEND_OPTIONS = {:emit => true, :reliable => false} it "should emit by defaut" do RedStorm::SimpleSpout.send(:emit?).should be_true @@ -403,7 +403,7 @@ describe RedStorm::SimpleSpout do describe "next_tuple" do - it "should auto emit on single value output" do + it "should auto unreliable emit on single value output" do class Spout1 < RedStorm::SimpleSpout on_send {"output"} end @@ -432,7 +432,41 @@ describe RedStorm::SimpleSpout do spout.next_tuple end - it "should auto emit on multiple values output" do + it "should auto reliable emit on single value output" do + class Spout1 < RedStorm::SimpleSpout + on_send :reliable => true do + [1, "output"] + end + end + class Spout2 < RedStorm::SimpleSpout + on_send :my_method, :reliable => true + def my_method; [2, "output"]; end + end + class Spout3 < RedStorm::SimpleSpout + on_send :reliable => true + def on_send; [3, "output"] end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values", 1).once + collector.should_receive(:emit).with("values", 2).once + collector.should_receive(:emit).with("values", 3).once + + spout = Spout1.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout2.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new + spout.open(nil, nil, collector) + spout.next_tuple + end + + it "should auto unreliable emit on multiple values output" do class Spout1 < RedStorm::SimpleSpout on_send {["output1", "output2"]} end @@ -461,6 +495,40 @@ describe RedStorm::SimpleSpout do spout.next_tuple end + it "should auto reliable emit on multiple values output" do + class Spout1 < RedStorm::SimpleSpout + on_send :reliable => true do + [1, "output1", "output2"] + end + end + class Spout2 < RedStorm::SimpleSpout + on_send :my_method, :reliable => true + def my_method; [2, "output1", "output2"]; end + end + class Spout3 < RedStorm::SimpleSpout + on_send :reliable => true + def on_send; [3, "output1", "output2"] end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).with("output1", "output2").exactly(3).times.and_return("values") + collector.should_receive(:emit).with("values", 1).once + collector.should_receive(:emit).with("values", 2).once + collector.should_receive(:emit).with("values", 3).once + + spout = Spout1.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout2.new + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout3.new + spout.open(nil, nil, collector) + spout.next_tuple + end + it "should sleep on nil output" do class Spout1 < RedStorm::SimpleSpout on_send {nil} @@ -527,6 +595,35 @@ describe RedStorm::SimpleSpout do spout.open(nil, nil, collector) spout.next_tuple end + + it "should support manual emit" do + class Spout1 < RedStorm::SimpleSpout + on_send :emit => false do + reliable_emit 1, "reliable output" + end + end + class Spout2 < RedStorm::SimpleSpout + on_send :emit => false do + unreliable_emit "unreliable output" + end + end + + collector = mock("Collector") + RedStorm::Values.should_receive(:new).once.with("reliable output").and_return("reliable values") + RedStorm::Values.should_receive(:new).once.with("unreliable output").and_return("unreliable values") + collector.should_receive(:emit).with("unreliable values").once + collector.should_receive(:emit).with("reliable values", 1).once + + spout = Spout1.new + spout.should_receive(:sleep).never + spout.open(nil, nil, collector) + spout.next_tuple + + spout = Spout2.new + spout.should_receive(:sleep).never + spout.open(nil, nil, collector) + spout.next_tuple + end end describe "open" do