refactor and expose topology builder

This commit is contained in:
Colin Surprenant 2013-06-20 00:50:03 -04:00
parent 28578a5369
commit c76fa6eef1
5 changed files with 76 additions and 33 deletions

View File

@ -1,9 +1,13 @@
require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'pathname'
module RedStorm
module DSL
class BoltError < StandardError; end
class Bolt
attr_reader :collector, :context, :config
@ -132,6 +136,21 @@ module RedStorm
def self.anchor?
!!self.receive_options[:anchor]
end
# below non-dry see Spout class
def self.inherited(subclass)
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}")
subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s
end
def self.base_class_path=(path)
@base_class_path = path
end
def self.base_class_path
@base_class_path
end
end
end

View File

@ -1,9 +1,13 @@
require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'pathname'
module RedStorm
module DSL
class SpoutError < StandardError; end
class Spout
attr_reader :config, :context, :collector
@ -181,6 +185,20 @@ module RedStorm
def self.reliable?
!!self.send_options[:reliable]
end
# below non-dry see Bolt class
def self.inherited(subclass)
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(SpoutError, "unable to extract base topology class path from #{caller.first.inspect}")
subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s
end
def self.base_class_path=(path)
@base_class_path = path
end
def self.base_class_path
@base_class_path
end
end
end

View File

@ -2,6 +2,7 @@ require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
java_import 'backtype.storm.topology.TopologyBuilder'
module RedStorm
module DSL
@ -39,15 +40,14 @@ module RedStorm
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
def new_instance
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubySpout.new(base_class_path, @clazz.name, @output_fields)
JRubySpout.new(@clazz.base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
@ -88,16 +88,15 @@ module RedStorm
end
end
def new_instance(base_class_path)
def new_instance
# WARNING non-dry see BoltDefinition#new_instance
if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubyBolt.new(base_class_path, @clazz.name, @output_fields)
JRubyBolt.new(@clazz.base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : @clazz.is_a?(Bolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new
end
end
@ -140,21 +139,24 @@ module RedStorm
@submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)}
end
# topology proxy interface
def start(base_class_path, env)
self.class.resolve_ids!(self.class.components)
def self.build_topology
resolve_ids!(components)
builder = TopologyBuilder.new
self.class.spouts.each do |spout|
declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java)
spouts.each do |spout|
declarer = builder.setSpout(spout.id, spout.new_instance, spout.parallelism.to_java)
declarer.addConfigurations(spout.config)
end
self.class.bolts.each do |bolt|
declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java)
bolts.each do |bolt|
declarer = builder.setBolt(bolt.id, bolt.new_instance, bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
builder.createTopology
end
def start(env)
topology = self.class.build_topology
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
@ -163,7 +165,7 @@ module RedStorm
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
submitter.submitTopology(self.class.topology_name, configurator.config, topology)
instance_exec(env, &self.class.submit_block)
end

View File

@ -1,7 +1,7 @@
require 'java'
# This hack get rif of the "Use RbConfig instead of obsolete and deprecated Config"
# deprecation warning that is triggered by "java_import 'backtype.storm.Config'".
# deprecation warning that is triggered by "java_import 'backtype.storm.Config'".
Object.send :remove_const, :Config
Config = RbConfig
@ -59,7 +59,7 @@ class TopologyLauncher
topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : ''
puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment")
RedStorm::Configuration.topology_class.new.start(class_path, env)
RedStorm::Configuration.topology_class.new.start(env)
end
private

View File

@ -25,6 +25,10 @@ describe RedStorm::SimpleTopology do
class SpoutClass2; end
class BoltClass1; end
class BoltClass2; end
SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
BoltClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
BoltClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
end
it "should set default topology name" do
@ -265,7 +269,7 @@ describe RedStorm::SimpleTopology do
cluster = mock(RedStorm::LocalCluster)
RedStorm::LocalCluster.should_receive(:new).and_return(cluster)
cluster.should_receive(:submitTopology).with("topology1", "config", "topology")
Topology1.new.start("base_path", :local)
Topology1.new.start(:local)
end
it "should start in :cluster env" do
@ -277,7 +281,7 @@ describe RedStorm::SimpleTopology do
RedStorm::Configurator.should_receive(:new).and_return(configurator)
configurator.should_receive(:config).and_return("config")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should raise for invalid env" do
@ -308,7 +312,7 @@ describe RedStorm::SimpleTopology do
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should build bolts" do
@ -352,7 +356,7 @@ describe RedStorm::SimpleTopology do
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
@ -387,7 +391,7 @@ describe RedStorm::SimpleTopology do
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support single symbolic fields" do
@ -400,7 +404,7 @@ describe RedStorm::SimpleTopology do
RedStorm::Fields.should_receive(:new).with("s1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support string array fields" do
@ -413,7 +417,7 @@ describe RedStorm::SimpleTopology do
RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support symbolic array fields" do
@ -426,7 +430,7 @@ describe RedStorm::SimpleTopology do
RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support shuffle" do
@ -438,7 +442,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("shuffleGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support local_or_shuffle" do
@ -450,7 +454,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("localOrShuffleGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support none" do
@ -462,7 +466,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("noneGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support global" do
@ -474,7 +478,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("globalGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support all" do
@ -486,7 +490,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("allGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should support direct" do
@ -498,7 +502,7 @@ describe RedStorm::SimpleTopology do
end
@declarer.should_receive("directGrouping").with('1')
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
end
@ -521,7 +525,7 @@ describe RedStorm::SimpleTopology do
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", config, "topology")
Topology1.new.start("base_path", :cluster)
Topology1.new.start(:cluster)
end
it "should provide local cluster reference" do
@ -539,7 +543,7 @@ describe RedStorm::SimpleTopology do
cluster.should_receive(:submitTopology).with("topology1", "config", "topology").and_return("cluster")
topology = Topology1.new
topology.start("base_path", :local)
topology.start(:local)
topology.cluster.should == cluster
end