storm ids are now strings
This commit is contained in:
parent
52f8628709
commit
851c412836
|
@ -8,11 +8,11 @@ module RedStorm
|
|||
|
||||
class ComponentDefinition
|
||||
attr_reader :clazz, :parallelism
|
||||
attr_accessor :id
|
||||
attr_accessor :id # ids are forced to string
|
||||
|
||||
def initialize(component_class, id, parallelism)
|
||||
@clazz = component_class
|
||||
@id = id
|
||||
@id = id.to_s
|
||||
@parallelism = parallelism
|
||||
end
|
||||
end
|
||||
|
@ -28,7 +28,7 @@ module RedStorm
|
|||
end
|
||||
|
||||
def source(source_id, grouping)
|
||||
@sources << [source_id.is_a?(Class) ? SimpleTopology.underscore(source_id) : source_id, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
|
||||
@sources << [source_id.is_a?(Class) ? SimpleTopology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
|
||||
end
|
||||
|
||||
def define_grouping(declarer)
|
||||
|
@ -131,33 +131,44 @@ module RedStorm
|
|||
private
|
||||
|
||||
def self.resolve_ids!(components)
|
||||
next_numeric_id = 1
|
||||
resolved_names = {}
|
||||
|
||||
numeric_components, symbolic_components = components.partition{|c| c.id.is_a?(Fixnum)}
|
||||
numeric_ids = numeric_components.map(&:id)
|
||||
|
||||
# assign numeric ids to symbolic ids
|
||||
symbolic_components.each do |component|
|
||||
id = component.id.to_s
|
||||
raise("duplicate symbolic id in #{component.clazz.name} on id=#{id}") if resolved_names.has_key?(id)
|
||||
next_numeric_id += 1 while numeric_ids.include?(next_numeric_id)
|
||||
numeric_ids << next_numeric_id
|
||||
resolved_names[id] = next_numeric_id
|
||||
end
|
||||
|
||||
# reassign numeric ids to all components
|
||||
components.each do |component|
|
||||
unless component.id.is_a?(Fixnum)
|
||||
component.id = resolved_names[component.id.to_s] || raise("cannot resolve #{component.clazz.name} id=#{component.id.to_s}")
|
||||
end
|
||||
# verify duplicate implicit ids
|
||||
ids = components.map(&:id)
|
||||
components.reverse.each do |component|
|
||||
raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1
|
||||
if component.respond_to?(:sources)
|
||||
component.sources.map! do |source_id, grouping|
|
||||
id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id.to_s] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.to_s}")
|
||||
[id, grouping]
|
||||
end
|
||||
component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)}
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# next_numeric_id = 1
|
||||
# resolved_names = {}
|
||||
|
||||
|
||||
# numeric_components, symbolic_components = components.partition{|c| c.id.is_a?(Fixnum) || c.id.to_s =~ /^\d+$/ }
|
||||
# numeric_ids = numeric_components.map(&:id).map(&:to_i)
|
||||
|
||||
# symbolic_components.each do |component|
|
||||
# id = component.id.to_s
|
||||
# raise("duplicate symbolic id in #{component.clazz.name} on id=#{id}") if resolved_names.has_key?(id)
|
||||
# # next_numeric_id += 1 while numeric_ids.include?(next_numeric_id)
|
||||
# # numeric_ids << next_numeric_id
|
||||
# # resolved_names[id] = next_numeric_id
|
||||
# resolved_names[id] = true
|
||||
# end
|
||||
|
||||
# # reassign numeric ids to all components
|
||||
# components.each do |component|
|
||||
# unless component.id.is_a?(Fixnum)
|
||||
# component.id = resolved_names[component.id.to_s] || raise("cannot resolve #{component.clazz.name} id=#{component.id.to_s}")
|
||||
# end
|
||||
# if component.respond_to?(:sources)
|
||||
# component.sources.map! do |source_id, grouping|
|
||||
# id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id.to_s] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.to_s}")
|
||||
# [id, grouping]
|
||||
# end
|
||||
# end
|
||||
# end
|
||||
end
|
||||
|
||||
def self.spouts
|
||||
|
|
|
@ -219,8 +219,8 @@ describe RedStorm::SimpleTopology do
|
|||
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1)
|
||||
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2)
|
||||
|
||||
builder.should_receive("setSpout").with(1, jruby_spout1, 1)
|
||||
builder.should_receive("setSpout").with(2, jruby_spout2, 1)
|
||||
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1)
|
||||
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1)
|
||||
configurator.should_receive(:config).and_return("config")
|
||||
builder.should_receive(:createTopology).and_return("topology")
|
||||
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
|
||||
|
@ -265,8 +265,8 @@ describe RedStorm::SimpleTopology do
|
|||
bolt_definition2.should_receive(:parallelism).and_return(3)
|
||||
bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1")
|
||||
bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2")
|
||||
bolt_definition1.should_receive(:id=).with(1)
|
||||
bolt_definition2.should_receive(:id=).with(2)
|
||||
# bolt_definition1.should_receive(:id=).with('1')
|
||||
# bolt_definition2.should_receive(:id=).with('2')
|
||||
|
||||
configurator.should_receive(:config).and_return("config")
|
||||
builder.should_receive(:createTopology).and_return("topology")
|
||||
|
@ -282,11 +282,14 @@ describe RedStorm::SimpleTopology do
|
|||
builder = mock(RedStorm::TopologyBuilder)
|
||||
configurator = mock(RedStorm::SimpleTopology::Configurator)
|
||||
jruby_bolt = mock(RedStorm::JRubyBolt)
|
||||
jruby_spout = mock(RedStorm::JRubySpout)
|
||||
@declarer = mock("InputDeclarer")
|
||||
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
|
||||
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
|
||||
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt)
|
||||
builder.should_receive("setBolt").with(1, jruby_bolt, 1).and_return(@declarer)
|
||||
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout)
|
||||
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
|
||||
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
|
||||
configurator.should_receive(:config).and_return("config")
|
||||
builder.should_receive(:createTopology).and_return("topology")
|
||||
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
|
||||
|
@ -294,68 +297,74 @@ describe RedStorm::SimpleTopology do
|
|||
|
||||
it "should support fields" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :fields => "f1"
|
||||
end
|
||||
end
|
||||
|
||||
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
|
||||
@declarer.should_receive("fieldsGrouping").with(1, "fields")
|
||||
@declarer.should_receive("fieldsGrouping").with('1', "fields")
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
|
||||
it "should support shuffle" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :shuffle
|
||||
end
|
||||
end
|
||||
|
||||
@declarer.should_receive("shuffleGrouping").with(1)
|
||||
@declarer.should_receive("shuffleGrouping").with('1')
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
|
||||
it "should support none" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :none
|
||||
end
|
||||
end
|
||||
|
||||
@declarer.should_receive("noneGrouping").with(1)
|
||||
@declarer.should_receive("noneGrouping").with('1')
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
|
||||
it "should support global" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :global
|
||||
end
|
||||
end
|
||||
|
||||
@declarer.should_receive("globalGrouping").with(1)
|
||||
@declarer.should_receive("globalGrouping").with('1')
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
|
||||
it "should support all" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :all
|
||||
end
|
||||
end
|
||||
|
||||
@declarer.should_receive("allGrouping").with(1)
|
||||
@declarer.should_receive("allGrouping").with('1')
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
|
||||
it "should support direct" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
bolt BoltClass1 do
|
||||
source 1, :direct
|
||||
end
|
||||
end
|
||||
|
||||
@declarer.should_receive("directGrouping").with(1)
|
||||
@declarer.should_receive("directGrouping").with('1')
|
||||
Topology1.new.start("base_path", :cluster)
|
||||
end
|
||||
end
|
||||
|
@ -401,7 +410,7 @@ describe RedStorm::SimpleTopology do
|
|||
topology.cluster.should == cluster
|
||||
end
|
||||
|
||||
it "should keep numeric ids" do
|
||||
it "should support explicit numeric ids" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => 1
|
||||
|
||||
|
@ -410,18 +419,12 @@ describe RedStorm::SimpleTopology do
|
|||
end
|
||||
end
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
|
||||
Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
Topology1.spouts.first.id.should == '1'
|
||||
Topology1.bolts.first.id.should == '2'
|
||||
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}]
|
||||
end
|
||||
|
||||
it "should resolve explicit symbolic ids" do
|
||||
it "should support explicit string ids" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1, :id => "id1"
|
||||
|
||||
|
@ -433,15 +436,9 @@ describe RedStorm::SimpleTopology do
|
|||
Topology1.spouts.first.id.should == "id1"
|
||||
Topology1.bolts.first.id.should == "id2"
|
||||
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]
|
||||
|
||||
Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
end
|
||||
|
||||
it "should resolve implicit string ids" do
|
||||
it "should support implicit string ids" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1
|
||||
|
||||
|
@ -453,15 +450,9 @@ describe RedStorm::SimpleTopology do
|
|||
Topology1.spouts.first.id.should == "spout_class1"
|
||||
Topology1.bolts.first.id.should == "bolt_class1"
|
||||
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
|
||||
|
||||
Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
end
|
||||
|
||||
it "should resolve implicit symbol ids" do
|
||||
it "should support implicit symbol ids" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1
|
||||
|
||||
|
@ -472,16 +463,10 @@ describe RedStorm::SimpleTopology do
|
|||
|
||||
Topology1.spouts.first.id.should == "spout_class1"
|
||||
Topology1.bolts.first.id.should == "bolt_class1"
|
||||
Topology1.bolts.first.sources.first.should == [:spout_class1, {:shuffle => nil}]
|
||||
|
||||
Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}]
|
||||
end
|
||||
|
||||
it "should resolve implicit class ids" do
|
||||
it "should support implicit class ids" do
|
||||
class Topology1 < RedStorm::SimpleTopology
|
||||
spout SpoutClass1
|
||||
|
||||
|
@ -493,12 +478,6 @@ describe RedStorm::SimpleTopology do
|
|||
Topology1.spouts.first.id.should == "spout_class1"
|
||||
Topology1.bolts.first.id.should == "bolt_class1"
|
||||
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
|
||||
|
||||
Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)
|
||||
|
||||
Topology1.spouts.first.id.should == 1
|
||||
Topology1.bolts.first.id.should == 2
|
||||
Topology1.bolts.first.sources.first.should == [1, {:shuffle => nil}]
|
||||
end
|
||||
|
||||
it "should raise on unresolvable" do
|
||||
|
@ -526,7 +505,7 @@ describe RedStorm::SimpleTopology do
|
|||
Topology1.spouts.first.id.should == "spout_class1"
|
||||
Topology1.spouts.last.id.should == "spout_class1"
|
||||
|
||||
lambda {Topology1.resolve_ids!(Topology1.spouts)}.should raise_error RuntimeError, "duplicate symbolic id in SpoutClass1 on id=spout_class1"
|
||||
lambda {Topology1.resolve_ids!(Topology1.spouts)}.should raise_error RuntimeError, "duplicate id in SpoutClass1 on id=spout_class1"
|
||||
end
|
||||
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue