Select stream when setting source for bolt

This commit is contained in:
Keith Walters 2015-03-03 22:47:37 +00:00
parent cbb62efc52
commit 54e43e39dc
6 changed files with 275 additions and 29 deletions

View File

@ -58,14 +58,26 @@ module RedStorm
self.class.log
end
def stream
self.class.stream
end
def unanchored_emit(*values)
@collector.emit_tuple(Values.new(*values))
end
def unanchored_stream_emit(stream, *values)
@collector.emit_tuple_stream(stream, Values.new(*values))
end
def anchored_emit(tuple, *values)
@collector.emit_anchor_tuple(tuple, Values.new(*values))
end
def anchored_stream_emit(stream, tuple, *values)
@collector.emit_anchor_tuple_stream(stream, tuple, Values.new(*values))
end
def ack(tuple)
@collector.ack(tuple)
end
@ -80,7 +92,21 @@ module RedStorm
output = on_receive(tuple)
if output && self.class.emit?
values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output
values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)}
values_list.each do |values|
if self.class.anchor?
if self.class.stream?
anchored_stream_emit(self.stream, tuple, *values)
else
anchored_emit(tuple, *values)
end
else
if self.class.stream?
unanchored_stream_emit(self.stream, *values)
else
unanchored_emit(*values)
end
end
end
@collector.ack(tuple) if self.class.ack?
end
end
@ -137,6 +163,14 @@ module RedStorm
!!self.receive_options[:anchor]
end
def self.stream?
self.receive_options[:stream] && !self.receive_options[:stream].empty?
end
def self.stream
self.receive_options[:stream]
end
# below non-dry see Spout class
def self.inherited(subclass)
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}")

View File

@ -6,4 +6,13 @@ java_import 'backtype.storm.tuple.Tuple'
class OutputCollector
java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")]
java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")]
java_alias :emit_tuple_stream, :emit, [
java.lang.String,
java.lang.Class.for_name("java.util.List")
]
java_alias :emit_anchor_tuple_stream, :emit, [
java.lang.String,
Tuple.java_class,
java.lang.Class.for_name("java.util.List")
]
end

View File

@ -60,29 +60,33 @@ module RedStorm
@sources = []
end
def source(source_id, grouping)
@sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
def source(source_id, grouping, stream = 'default')
@sources << [
source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s,
grouping.is_a?(Hash) ? grouping : {grouping => nil},
stream.to_s
]
end
def define_grouping(declarer)
@sources.each do |source_id, grouping|
@sources.each do |source_id, grouping, stream|
grouper, params = grouping.first
# declarer.fieldsGrouping(source_id, Fields.new())
case grouper
when :fields
declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping(source_id)
declarer.globalGrouping(source_id, stream)
when :shuffle
declarer.shuffleGrouping(source_id)
declarer.shuffleGrouping(source_id, stream)
when :local_or_shuffle
declarer.localOrShuffleGrouping(source_id)
declarer.localOrShuffleGrouping(source_id, stream)
when :none
declarer.noneGrouping(source_id)
declarer.noneGrouping(source_id, stream)
when :all
declarer.allGrouping(source_id)
declarer.allGrouping(source_id, stream)
when :direct
declarer.directGrouping(source_id)
declarer.directGrouping(source_id, stream)
else
raise("unknown grouper=#{grouper.inspect}")
end

View File

@ -603,6 +603,74 @@ describe RedStorm::SimpleBolt do
bolt.prepare(nil, nil, collector)
bolt.execute("output")
end
it "should emit tuple on a stream" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :stream => :custom_stream do |tuple|
tuple
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :stream => :custom_stream
def my_method(tuple); tuple; end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :stream => :custom_stream
def on_receive(tuple); tuple; end
end
collector = mock("Collector")
RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values")
collector.should_receive(:emit_tuple_stream).with(:custom_stream, "values").exactly(3).times
bolt = Bolt1.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
bolt = Bolt2.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
bolt = Bolt3.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
end
it "should emit anchored tuple on a stream" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :anchor => true, :stream => :custom_stream do |tuple|
"output"
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :anchor => true, :stream => :custom_stream
def my_method(tuple)
"output"
end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :anchor => true, :stream => :custom_stream
def on_receive(tuple)
"output"
end
end
collector = mock("Collector")
RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values")
collector.should_receive(:emit_anchor_tuple_stream).with(:custom_stream, "tuple", "values").exactly(3).times
bolt = Bolt1.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
bolt = Bolt2.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
bolt = Bolt3.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
end
end
describe "prepare" do
@ -701,4 +769,4 @@ describe RedStorm::SimpleBolt do
end
end
end
end

View File

@ -10,5 +10,11 @@ describe OutputCollector do
# We should have an alias for #emit_anchor_tuple
it { should respond_to :emit_anchor_tuple }
# We should have an alias for #emit_tuple_stream
it { should respond_to :emit_tuple_stream }
# We should have an alias for #emit_anchor_tuple_stream
it { should respond_to :emit_anchor_tuple_stream }
end
end

View File

@ -340,6 +340,7 @@ describe RedStorm::SimpleTopology do
configurator = mock(RedStorm::Configurator)
jruby_bolt1 = mock(RedStorm::JRubyBolt)
jruby_bolt2 = mock(RedStorm::JRubyBolt)
jruby_bolt3 = mock(RedStorm::JRubyBolt)
declarer = mock("Declarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
@ -394,7 +395,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support single string fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => "f1" }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -407,7 +421,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("s1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support single symbolic fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => :s1 }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("s1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -420,7 +447,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support string array fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => ["f1", "f2"] }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -433,7 +473,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support symbolic array fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => [:s1, :s2] }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -445,7 +498,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("shuffleGrouping").with('1')
@declarer.should_receive("shuffleGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support shuffle with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :shuffle, 'custom_stream'
end
end
@declarer.should_receive("shuffleGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -457,7 +522,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("localOrShuffleGrouping").with('1')
@declarer.should_receive("localOrShuffleGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support local_or_shuffle with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :local_or_shuffle, 'custom_stream'
end
end
@declarer.should_receive("localOrShuffleGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -469,7 +546,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("noneGrouping").with('1')
@declarer.should_receive("noneGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support none" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :none, 'custom_stream'
end
end
@declarer.should_receive("noneGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -481,7 +570,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("globalGrouping").with('1')
@declarer.should_receive("globalGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support global with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :global, 'custom_stream'
end
end
@declarer.should_receive("globalGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -493,7 +594,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("allGrouping").with('1')
@declarer.should_receive("allGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support all with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :all, 'custom_stream'
end
end
@declarer.should_receive("allGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -505,7 +618,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("directGrouping").with('1')
@declarer.should_receive("directGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support direct with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :direct, 'custom_stream'
end
end
@declarer.should_receive("directGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
end
@ -563,7 +688,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == '1'
Topology1.bolts.first.id.should == '2'
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}, 'default']
end
it "should support explicit string ids" do
@ -577,7 +702,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "id1"
Topology1.bolts.first.id.should == "id2"
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}, 'default']
end
it "should support implicit string ids" do
@ -591,7 +716,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default']
end
it "should support implicit symbol ids" do
@ -605,7 +730,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}, 'default']
end
it "should support implicit class ids" do
@ -619,7 +744,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default']
end
it "should raise on unresolvable" do
@ -633,7 +758,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}, 'default']
lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy"
end
@ -651,4 +776,4 @@ describe RedStorm::SimpleTopology do
end
end
end
end