diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index e12a34b..b516cef 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -4,10 +4,11 @@ module RedStorm attr_reader :cluster # LocalCluster reference usable in on_submit block for example class BoltDefinition - attr_reader :bolt_class, :id, :parallelism + attr_reader :clazz, :parallelism + attr_accessor :id, :sources def initialize(bolt_class, id, parallelism) - @bolt_class = bolt_class + @clazz = bolt_class @id = id @parallelism = parallelism @sources = [] @@ -34,10 +35,11 @@ module RedStorm end class SpoutDefinition - attr_reader :spout_class, :id, :parallelism + attr_reader :clazz, :parallelism + attr_accessor :id def initialize(spout_class, id, parallelism) - @spout_class = spout_class + @clazz = spout_class @id = id @parallelism = parallelism end @@ -88,12 +90,14 @@ module RedStorm # topology proxy interface def start(base_class_path, env) + self.class.resolve_ids!(self.class.spouts + self.class.bolts) + builder = TopologyBuilder.new self.class.spouts.each do |spout| - builder.setSpout(spout.id, JRubySpout.new(base_class_path, spout.spout_class.name), spout.parallelism) + builder.setSpout(spout.id, JRubySpout.new(base_class_path, spout.clazz.name), spout.parallelism) end self.class.bolts.each do |bolt| - storm_bolt = builder.setBolt(bolt.id, JRubyBolt.new(base_class_path, bolt.bolt_class.name), bolt.parallelism) + storm_bolt = builder.setBolt(bolt.id, JRubyBolt.new(base_class_path, bolt.clazz.name), bolt.parallelism) bolt.define_grouping(storm_bolt) end @@ -115,6 +119,35 @@ module RedStorm private + def self.resolve_ids!(components) + next_id = 1 + resolved_names = {} + + numeric_ids, symbolic_ids = components.map(&:id).partition{|id| id.is_a?(Fixnum)} + + # map unused numeric ids to symbolic ids + symbolic_ids.map(&:to_s).uniq.each do |id| + unless resolved_names.has_key?(id) + next_id += 1 while numeric_ids.include?(next_id) + numeric_ids << next_id + resolved_names[id] = next_id + end + end + + # reassign numeric ids in all components + components.each do |component| + unless component.id.is_a?(Fixnum) + component.id = resolved_names[component.id] || raise("cannot resolve #{component.clazz.name} id=#{component.id.inspect}") + end + if component.respond_to?(:sources) + component.sources.map! do |source_id, grouping| + id = source_id.is_a?(Fixnum) ? source_id : resolved_names[source_id] || raise("cannot resolve #{component.clazz.name} source id=#{source_id.inspect}") + [id, grouping] + end + end + end + end + def self.spouts @spouts ||= [] end diff --git a/spec/red_storm/simple_topology_spec.rb b/spec/red_storm/simple_topology_spec.rb index 249c30c..0357314 100644 --- a/spec/red_storm/simple_topology_spec.rb +++ b/spec/red_storm/simple_topology_spec.rb @@ -163,7 +163,7 @@ describe RedStorm::SimpleTopology do it "should start in :local env" do class TopologyStart1 < RedStorm::SimpleTopology; end - + builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") @@ -209,8 +209,8 @@ describe RedStorm::SimpleTopology do RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1) RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2) - builder.should_receive("setSpout").with("spout_class1", jruby_spout1, 1) - builder.should_receive("setSpout").with("spout_class2", jruby_spout2, 1) + builder.should_receive("setSpout").with(1, jruby_spout1, 1) + builder.should_receive("setSpout").with(2, jruby_spout2, 1) configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology_start4", "config", "topology") @@ -249,12 +249,14 @@ describe RedStorm::SimpleTopology do bolt_definition1.should_receive(:define_grouping).with("storm_bolt1") bolt_definition2.should_receive(:define_grouping).with("storm_bolt2") - bolt_definition1.should_receive(:bolt_class).and_return(BoltClass1) - bolt_definition2.should_receive(:bolt_class).and_return(BoltClass2) + bolt_definition1.should_receive(:clazz).and_return(BoltClass1) + bolt_definition2.should_receive(:clazz).and_return(BoltClass2) bolt_definition1.should_receive(:parallelism).and_return(2) bolt_definition2.should_receive(:parallelism).and_return(3) - bolt_definition1.should_receive(:id).and_return("id1") - bolt_definition2.should_receive(:id).and_return("id2") + bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1") + bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2") + bolt_definition1.should_receive(:id=).with(1) + bolt_definition2.should_receive(:id=).with(2) configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") @@ -285,7 +287,7 @@ describe RedStorm::SimpleTopology do end it "should provide local cluster reference" do - class TopologyStart7< RedStorm::SimpleTopology; end + class TopologyStart7 < RedStorm::SimpleTopology; end builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) @@ -304,5 +306,81 @@ describe RedStorm::SimpleTopology do topology.cluster.should == cluster end + it "should keep numeric ids" do + class TopologyNumericIds1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => 1 + + bolt BoltClass1, :id => 2 do + source 1, :shuffle + end + end + + TopologyNumericIds1.spouts.first.id.should == 1 + TopologyNumericIds1.bolts.first.id.should == 2 + TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + + TopologyNumericIds1.resolve_ids!(TopologyNumericIds1.spouts + TopologyNumericIds1.bolts) + + TopologyNumericIds1.spouts.first.id.should == 1 + TopologyNumericIds1.bolts.first.id.should == 2 + TopologyNumericIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + end + + it "should resolve explicit symbolic ids" do + class TopologySymbolicIds1 < RedStorm::SimpleTopology + spout SpoutClass1, :id => "id1" + + bolt BoltClass1, :id => "id2" do + source "id1", :shuffle + end + end + + TopologySymbolicIds1.spouts.first.id.should == "id1" + TopologySymbolicIds1.bolts.first.id.should == "id2" + TopologySymbolicIds1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] + + TopologySymbolicIds1.resolve_ids!(TopologySymbolicIds1.spouts + TopologySymbolicIds1.bolts) + + TopologySymbolicIds1.spouts.first.id.should == 1 + TopologySymbolicIds1.bolts.first.id.should == 2 + TopologySymbolicIds1.bolts.first.sources.first.should == [1, {:shuffle => nil}] + end + + it "should resolve implicit symbolic ids" do + class TopologySymbolicIds2 < RedStorm::SimpleTopology + spout SpoutClass1 + + bolt BoltClass1 do + source "spout_class1", :shuffle + end + end + + TopologySymbolicIds2.spouts.first.id.should == "spout_class1" + TopologySymbolicIds2.bolts.first.id.should == "bolt_class1" + TopologySymbolicIds2.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] + + TopologySymbolicIds2.resolve_ids!(TopologySymbolicIds2.spouts + TopologySymbolicIds2.bolts) + + TopologySymbolicIds2.spouts.first.id.should == 1 + TopologySymbolicIds2.bolts.first.id.should == 2 + TopologySymbolicIds2.bolts.first.sources.first.should == [1, {:shuffle => nil}] + end + + it "should raise on unresolvable" do + class TopologySymbolicIds3 < RedStorm::SimpleTopology + spout SpoutClass1 + + bolt BoltClass1 do + source "dummy", :shuffle + end + end + + TopologySymbolicIds3.spouts.first.id.should == "spout_class1" + TopologySymbolicIds3.bolts.first.id.should == "bolt_class1" + TopologySymbolicIds3.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] + + lambda {TopologySymbolicIds3.resolve_ids!(TopologySymbolicIds3.spouts + TopologySymbolicIds3.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=\"dummy\"" + end + end end \ No newline at end of file