issue #38, added support for spout reliable emit
This commit is contained in:
parent
b7bb6e4091
commit
35c30e6ab0
|
@ -53,9 +53,14 @@ module RedStorm
|
|||
|
||||
# DSL instance methods
|
||||
|
||||
def emit(*values)
|
||||
def reliable_emit(message_id, *values)
|
||||
@collector.emit(Values.new(*values), message_id)
|
||||
end
|
||||
|
||||
def unreliable_emit(*values)
|
||||
@collector.emit(Values.new(*values))
|
||||
end
|
||||
alias_method :emit, :unreliable_emit
|
||||
|
||||
def log
|
||||
self.class.log
|
||||
|
@ -68,7 +73,12 @@ module RedStorm
|
|||
if self.class.emit?
|
||||
if output
|
||||
values = [output].flatten
|
||||
@collector.emit(Values.new(*values))
|
||||
if self.class.reliable?
|
||||
message_id = values.shift
|
||||
reliable_emit(message_id, *values)
|
||||
else
|
||||
unreliable_emit(*values)
|
||||
end
|
||||
else
|
||||
sleep(0.1)
|
||||
end
|
||||
|
@ -159,11 +169,15 @@ module RedStorm
|
|||
end
|
||||
|
||||
def self.send_options
|
||||
@send_options ||= {:emit => true}
|
||||
@send_options ||= {:emit => true, :reliable => false}
|
||||
end
|
||||
|
||||
def self.emit?
|
||||
!!self.send_options[:emit]
|
||||
end
|
||||
|
||||
def self.reliable?
|
||||
!!self.send_options[:reliable]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -95,7 +95,7 @@ describe RedStorm::SimpleSpout do
|
|||
end
|
||||
|
||||
describe "on_send statement" do
|
||||
DEFAULT_SEND_OPTIONS = {:emit => true}
|
||||
DEFAULT_SEND_OPTIONS = {:emit => true, :reliable => false}
|
||||
|
||||
it "should emit by defaut" do
|
||||
RedStorm::SimpleSpout.send(:emit?).should be_true
|
||||
|
@ -403,7 +403,7 @@ describe RedStorm::SimpleSpout do
|
|||
|
||||
describe "next_tuple" do
|
||||
|
||||
it "should auto emit on single value output" do
|
||||
it "should auto unreliable emit on single value output" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send {"output"}
|
||||
end
|
||||
|
@ -432,7 +432,41 @@ describe RedStorm::SimpleSpout do
|
|||
spout.next_tuple
|
||||
end
|
||||
|
||||
it "should auto emit on multiple values output" do
|
||||
it "should auto reliable emit on single value output" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send :reliable => true do
|
||||
[1, "output"]
|
||||
end
|
||||
end
|
||||
class Spout2 < RedStorm::SimpleSpout
|
||||
on_send :my_method, :reliable => true
|
||||
def my_method; [2, "output"]; end
|
||||
end
|
||||
class Spout3 < RedStorm::SimpleSpout
|
||||
on_send :reliable => true
|
||||
def on_send; [3, "output"] end
|
||||
end
|
||||
|
||||
collector = mock("Collector")
|
||||
RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values")
|
||||
collector.should_receive(:emit).with("values", 1).once
|
||||
collector.should_receive(:emit).with("values", 2).once
|
||||
collector.should_receive(:emit).with("values", 3).once
|
||||
|
||||
spout = Spout1.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
|
||||
spout = Spout2.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
|
||||
spout = Spout3.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
end
|
||||
|
||||
it "should auto unreliable emit on multiple values output" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send {["output1", "output2"]}
|
||||
end
|
||||
|
@ -461,6 +495,40 @@ describe RedStorm::SimpleSpout do
|
|||
spout.next_tuple
|
||||
end
|
||||
|
||||
it "should auto reliable emit on multiple values output" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send :reliable => true do
|
||||
[1, "output1", "output2"]
|
||||
end
|
||||
end
|
||||
class Spout2 < RedStorm::SimpleSpout
|
||||
on_send :my_method, :reliable => true
|
||||
def my_method; [2, "output1", "output2"]; end
|
||||
end
|
||||
class Spout3 < RedStorm::SimpleSpout
|
||||
on_send :reliable => true
|
||||
def on_send; [3, "output1", "output2"] end
|
||||
end
|
||||
|
||||
collector = mock("Collector")
|
||||
RedStorm::Values.should_receive(:new).with("output1", "output2").exactly(3).times.and_return("values")
|
||||
collector.should_receive(:emit).with("values", 1).once
|
||||
collector.should_receive(:emit).with("values", 2).once
|
||||
collector.should_receive(:emit).with("values", 3).once
|
||||
|
||||
spout = Spout1.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
|
||||
spout = Spout2.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
|
||||
spout = Spout3.new
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
end
|
||||
|
||||
it "should sleep on nil output" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send {nil}
|
||||
|
@ -527,6 +595,35 @@ describe RedStorm::SimpleSpout do
|
|||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
end
|
||||
|
||||
it "should support manual emit" do
|
||||
class Spout1 < RedStorm::SimpleSpout
|
||||
on_send :emit => false do
|
||||
reliable_emit 1, "reliable output"
|
||||
end
|
||||
end
|
||||
class Spout2 < RedStorm::SimpleSpout
|
||||
on_send :emit => false do
|
||||
unreliable_emit "unreliable output"
|
||||
end
|
||||
end
|
||||
|
||||
collector = mock("Collector")
|
||||
RedStorm::Values.should_receive(:new).once.with("reliable output").and_return("reliable values")
|
||||
RedStorm::Values.should_receive(:new).once.with("unreliable output").and_return("unreliable values")
|
||||
collector.should_receive(:emit).with("unreliable values").once
|
||||
collector.should_receive(:emit).with("reliable values", 1).once
|
||||
|
||||
spout = Spout1.new
|
||||
spout.should_receive(:sleep).never
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
|
||||
spout = Spout2.new
|
||||
spout.should_receive(:sleep).never
|
||||
spout.open(nil, nil, collector)
|
||||
spout.next_tuple
|
||||
end
|
||||
end
|
||||
|
||||
describe "open" do
|
||||
|
|
Loading…
Reference in New Issue