From c76fa6eef172d516083cc0148c4c7c425e586776 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Thu, 20 Jun 2013 00:50:03 -0400 Subject: [PATCH] refactor and expose topology builder --- lib/red_storm/dsl/bolt.rb | 19 +++++++++++++++ lib/red_storm/dsl/spout.rb | 18 +++++++++++++++ lib/red_storm/dsl/topology.rb | 32 +++++++++++++------------ lib/red_storm/topology_launcher.rb | 4 ++-- spec/red_storm/dsl/topology_spec.rb | 36 ++++++++++++++++------------- 5 files changed, 76 insertions(+), 33 deletions(-) diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb index c632c01..9957595 100644 --- a/lib/red_storm/dsl/bolt.rb +++ b/lib/red_storm/dsl/bolt.rb @@ -1,9 +1,13 @@ require 'java' require 'red_storm/configurator' +require 'red_storm/environment' +require 'pathname' module RedStorm module DSL + class BoltError < StandardError; end + class Bolt attr_reader :collector, :context, :config @@ -132,6 +136,21 @@ module RedStorm def self.anchor? !!self.receive_options[:anchor] end + + # below non-dry see Spout class + def self.inherited(subclass) + path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}") + subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s + end + + def self.base_class_path=(path) + @base_class_path = path + end + + def self.base_class_path + @base_class_path + end + end end diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb index f4b4228..545755a 100644 --- a/lib/red_storm/dsl/spout.rb +++ b/lib/red_storm/dsl/spout.rb @@ -1,9 +1,13 @@ require 'java' require 'red_storm/configurator' +require 'red_storm/environment' +require 'pathname' module RedStorm module DSL + class SpoutError < StandardError; end + class Spout attr_reader :config, :context, :collector @@ -181,6 +185,20 @@ module RedStorm def self.reliable? !!self.send_options[:reliable] end + + # below non-dry see Bolt class + def self.inherited(subclass) + path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(SpoutError, "unable to extract base topology class path from #{caller.first.inspect}") + subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s + end + + def self.base_class_path=(path) + @base_class_path = path + end + + def self.base_class_path + @base_class_path + end end end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index d3d0ab5..d34562e 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -2,6 +2,7 @@ require 'java' require 'red_storm/configuration' require 'red_storm/configurator' +java_import 'backtype.storm.topology.TopologyBuilder' module RedStorm module DSL @@ -39,15 +40,14 @@ module RedStorm class SpoutDefinition < ComponentDefinition # WARNING non-dry see BoltDefinition#new_instance - def new_instance(base_class_path) + def new_instance if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout" @clazz.new(constructor_args, @output_fields) elsif is_java? @clazz.new(*constructor_args) else - JRubySpout.new(base_class_path, @clazz.name, @output_fields) + JRubySpout.new(@clazz.base_class_path, @clazz.name, @output_fields) end - # is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) end end @@ -88,16 +88,15 @@ module RedStorm end end - def new_instance(base_class_path) + def new_instance # WARNING non-dry see BoltDefinition#new_instance if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt" @clazz.new(constructor_args, @output_fields) elsif is_java? @clazz.new(*constructor_args) else - JRubyBolt.new(base_class_path, @clazz.name, @output_fields) + JRubyBolt.new(@clazz.base_class_path, @clazz.name, @output_fields) end - # is_java? ? @clazz.new : @clazz.is_a?(Bolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new end end @@ -140,21 +139,24 @@ module RedStorm @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} end - # topology proxy interface - - def start(base_class_path, env) - self.class.resolve_ids!(self.class.components) + def self.build_topology + resolve_ids!(components) builder = TopologyBuilder.new - self.class.spouts.each do |spout| - declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java) + spouts.each do |spout| + declarer = builder.setSpout(spout.id, spout.new_instance, spout.parallelism.to_java) declarer.addConfigurations(spout.config) end - self.class.bolts.each do |bolt| - declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java) + bolts.each do |bolt| + declarer = builder.setBolt(bolt.id, bolt.new_instance, bolt.parallelism.to_java) declarer.addConfigurations(bolt.config) bolt.define_grouping(declarer) end + builder.createTopology + end + + def start(env) + topology = self.class.build_topology # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} @@ -163,7 +165,7 @@ module RedStorm configurator.instance_exec(env, &self.class.configure_block) submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter - submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) + submitter.submitTopology(self.class.topology_name, configurator.config, topology) instance_exec(env, &self.class.submit_block) end diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 7c4ca5d..62870e4 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -1,7 +1,7 @@ require 'java' # This hack get rif of the "Use RbConfig instead of obsolete and deprecated Config" -# deprecation warning that is triggered by "java_import 'backtype.storm.Config'". +# deprecation warning that is triggered by "java_import 'backtype.storm.Config'". Object.send :remove_const, :Config Config = RbConfig @@ -59,7 +59,7 @@ class TopologyLauncher topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : '' puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment") - RedStorm::Configuration.topology_class.new.start(class_path, env) + RedStorm::Configuration.topology_class.new.start(env) end private diff --git a/spec/red_storm/dsl/topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb index 7694adc..2d6b5b9 100644 --- a/spec/red_storm/dsl/topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -25,6 +25,10 @@ describe RedStorm::SimpleTopology do class SpoutClass2; end class BoltClass1; end class BoltClass2; end + SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path") + SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path") + BoltClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path") + BoltClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path") end it "should set default topology name" do @@ -265,7 +269,7 @@ describe RedStorm::SimpleTopology do cluster = mock(RedStorm::LocalCluster) RedStorm::LocalCluster.should_receive(:new).and_return(cluster) cluster.should_receive(:submitTopology).with("topology1", "config", "topology") - Topology1.new.start("base_path", :local) + Topology1.new.start(:local) end it "should start in :cluster env" do @@ -277,7 +281,7 @@ describe RedStorm::SimpleTopology do RedStorm::Configurator.should_receive(:new).and_return(configurator) configurator.should_receive(:config).and_return("config") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should raise for invalid env" do @@ -308,7 +312,7 @@ describe RedStorm::SimpleTopology do configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should build bolts" do @@ -352,7 +356,7 @@ describe RedStorm::SimpleTopology do builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end @@ -387,7 +391,7 @@ describe RedStorm::SimpleTopology do RedStorm::Fields.should_receive(:new).with("f1").and_return("fields") @declarer.should_receive("fieldsGrouping").with('1', "fields") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support single symbolic fields" do @@ -400,7 +404,7 @@ describe RedStorm::SimpleTopology do RedStorm::Fields.should_receive(:new).with("s1").and_return("fields") @declarer.should_receive("fieldsGrouping").with('1', "fields") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support string array fields" do @@ -413,7 +417,7 @@ describe RedStorm::SimpleTopology do RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields") @declarer.should_receive("fieldsGrouping").with('1', "fields") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support symbolic array fields" do @@ -426,7 +430,7 @@ describe RedStorm::SimpleTopology do RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields") @declarer.should_receive("fieldsGrouping").with('1', "fields") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support shuffle" do @@ -438,7 +442,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("shuffleGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support local_or_shuffle" do @@ -450,7 +454,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("localOrShuffleGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support none" do @@ -462,7 +466,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("noneGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support global" do @@ -474,7 +478,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("globalGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support all" do @@ -486,7 +490,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("allGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should support direct" do @@ -498,7 +502,7 @@ describe RedStorm::SimpleTopology do end @declarer.should_receive("directGrouping").with('1') - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end end @@ -521,7 +525,7 @@ describe RedStorm::SimpleTopology do builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", config, "topology") - Topology1.new.start("base_path", :cluster) + Topology1.new.start(:cluster) end it "should provide local cluster reference" do @@ -539,7 +543,7 @@ describe RedStorm::SimpleTopology do cluster.should_receive(:submitTopology).with("topology1", "config", "topology").and_return("cluster") topology = Topology1.new - topology.start("base_path", :local) + topology.start(:local) topology.cluster.should == cluster end