added symbolic ids support

This commit is contained in:
Colin Surprenant 2011-11-14 22:56:27 -05:00
parent ea0106ca92
commit b29ec72695
2 changed files with 125 additions and 14 deletions

View File

@ -4,10 +4,11 @@ module RedStorm
attr_reader :cluster # LocalCluster reference usable in on_submit block for example
class BoltDefinition
attr_reader :bolt_class, :id, :parallelism
attr_reader :clazz, :parallelism
attr_accessor :id, :sources
def initialize(bolt_class, id, parallelism)
@bolt_class = bolt_class
@clazz = bolt_class
@id = id
@parallelism = parallelism
@sources = []
@ -34,10 +35,11 @@ module RedStorm
end
class SpoutDefinition
attr_reader :spout_class, :id, :parallelism
attr_reader :clazz, :parallelism
attr_accessor :id
def initialize(spout_class, id, parallelism)
@spout_class = spout_class
@clazz = spout_class
@id = id
@parallelism = parallelism
end
@ -88,12 +90,14 @@ module RedStorm
# topology proxy interface
def start(base_class_path, env)
self.class.resolve_ids!(self.class.spouts + self.class.bolts)
builder = TopologyBuilder.new
self.class.spouts.each do |spout|
builder.setSpout(spout.id, JRubySpout.new(base_class_path, spout.spout_class.name), spout.parallelism)
builder.setSpout(spout.id, JRubySpout.new(base_class_path, spout.clazz.name), spout.parallelism)
end
self.class.bolts.each do |bolt|
storm_bolt = builder.setBolt(bolt.id, JRubyBolt.new(base_class_path, bolt.bolt_class.name), bolt.parallelism)
storm_bolt = builder.setBolt(bolt.id, JRubyBolt.new(base_class_path, bolt.clazz.name), bolt.parallelism)
bolt.define_grouping(storm_bolt)
end
@ -115,6 +119,35 @@ module RedStorm
private
def self.resolve_ids!(components)
next_id = 1
resolved_names = {}
numeric_ids, symbolic_ids = components.map(&:id).partition{|id| id.is_a?(Fixnum)}
# map unused numeric ids to symbolic ids
symbolic_ids.map(&:to_s).uniq.each do |id|
unless resolved_names.has_key?(id)
next_id += 1 while numeric_ids.include?(next_id)
numeric_ids << next_id
resolved_names[id] = next_id
end
end
# reassign numeric ids in all components
components.each do |component|
unless component.id.is_a?(Fixnum)
component.id = resolved_names[component.id] || raise("cannot resolve #{component.clazz.name} id=#{component.id.inspect}")
end
if component.respond_to?(:sources)
component.sources.map! do |source_id, grouping|
id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.inspect}")
[id, grouping]
end
end
end
end
def self.spouts
@spouts ||= []
end

View File

@ -163,7 +163,7 @@ describe RedStorm::SimpleTopology do
it "should start in :local env" do
class TopologyStart1 < RedStorm::SimpleTopology; end
builder = mock(RedStorm::TopologyBuilder)
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
builder.should_receive(:createTopology).and_return("topology")
@ -209,8 +209,8 @@ describe RedStorm::SimpleTopology do
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2)
builder.should_receive("setSpout").with("spout_class1", jruby_spout1, 1)
builder.should_receive("setSpout").with("spout_class2", jruby_spout2, 1)
builder.should_receive("setSpout").with(1, jruby_spout1, 1)
builder.should_receive("setSpout").with(2, jruby_spout2, 1)
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start4", "config", "topology")
@ -249,12 +249,14 @@ describe RedStorm::SimpleTopology do
bolt_definition1.should_receive(:define_grouping).with("storm_bolt1")
bolt_definition2.should_receive(:define_grouping).with("storm_bolt2")
bolt_definition1.should_receive(:bolt_class).and_return(BoltClass1)
bolt_definition2.should_receive(:bolt_class).and_return(BoltClass2)
bolt_definition1.should_receive(:clazz).and_return(BoltClass1)
bolt_definition2.should_receive(:clazz).and_return(BoltClass2)
bolt_definition1.should_receive(:parallelism).and_return(2)
bolt_definition2.should_receive(:parallelism).and_return(3)
bolt_definition1.should_receive(:id).and_return("id1")
bolt_definition2.should_receive(:id).and_return("id2")
bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1")
bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2")
bolt_definition1.should_receive(:id=).with(1)
bolt_definition2.should_receive(:id=).with(2)
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
@ -285,7 +287,7 @@ describe RedStorm::SimpleTopology do
end
it "should provide local cluster reference" do
class TopologyStart7< RedStorm::SimpleTopology; end
class TopologyStart7 < RedStorm::SimpleTopology; end
builder = mock(RedStorm::TopologyBuilder)
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
@ -304,5 +306,81 @@ describe RedStorm::SimpleTopology do
topology.cluster.should == cluster
end
it "should keep numeric ids" do
class TopologyNumericIds1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1, :id => 2 do
source 1, :shuffle
end
end
TopologyNumericIds1.spouts.first.id.should == 1
TopologyNumericIds1.bolts.first.id.should == 2
TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
TopologyNumericIds1.resolve_ids!(TopologyNumericIds1.spouts + TopologyNumericIds1.bolts)
TopologyNumericIds1.spouts.first.id.should == 1
TopologyNumericIds1.bolts.first.id.should == 2
TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end
it "should resolve explicit symbolic ids" do
class TopologySymbolicIds1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => "id1"
bolt BoltClass1, :id => "id2" do
source "id1", :shuffle
end
end
TopologySymbolicIds1.spouts.first.id.should == "id1"
TopologySymbolicIds1.bolts.first.id.should == "id2"
TopologySymbolicIds1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]
TopologySymbolicIds1.resolve_ids!(TopologySymbolicIds1.spouts + TopologySymbolicIds1.bolts)
TopologySymbolicIds1.spouts.first.id.should == 1
TopologySymbolicIds1.bolts.first.id.should == 2
TopologySymbolicIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end
it "should resolve implicit symbolic ids" do
class TopologySymbolicIds2 < RedStorm::SimpleTopology
spout SpoutClass1
bolt BoltClass1 do
source "spout_class1", :shuffle
end
end
TopologySymbolicIds2.spouts.first.id.should == "spout_class1"
TopologySymbolicIds2.bolts.first.id.should == "bolt_class1"
TopologySymbolicIds2.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
TopologySymbolicIds2.resolve_ids!(TopologySymbolicIds2.spouts + TopologySymbolicIds2.bolts)
TopologySymbolicIds2.spouts.first.id.should == 1
TopologySymbolicIds2.bolts.first.id.should == 2
TopologySymbolicIds2.bolts.first.sources.first.should == [1, {:shuffle => nil}]
end
it "should raise on unresolvable" do
class TopologySymbolicIds3 < RedStorm::SimpleTopology
spout SpoutClass1
bolt BoltClass1 do
source "dummy", :shuffle
end
end
TopologySymbolicIds3.spouts.first.id.should == "spout_class1"
TopologySymbolicIds3.bolts.first.id.should == "bolt_class1"
TopologySymbolicIds3.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}]
lambda {TopologySymbolicIds3.resolve_ids!(TopologySymbolicIds3.spouts + TopologySymbolicIds3.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=\"dummy\""
end
end
end