diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb index 923a323..121bca2 100644 --- a/lib/red_storm/dsl/topology.rb +++ b/lib/red_storm/dsl/topology.rb @@ -3,6 +3,7 @@ require 'red_storm/configuration' require 'red_storm/configurator' java_import 'backtype.storm.topology.TopologyBuilder' +java_import 'backtype.storm.generated.SubmitOptions' module RedStorm module DSL @@ -135,6 +136,10 @@ module RedStorm @configure_block = configure_block if block_given? end + def self.submit_options(&submit_options_block) + @submit_options_block = submit_options_block if block_given? + end + def self.on_submit(method_name = nil, &submit_block) @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} end @@ -165,7 +170,16 @@ module RedStorm configurator.instance_exec(env, &self.class.configure_block) submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter - submitter.submitTopology(self.class.topology_name, configurator.config, topology) + + if self.class.submit_options_block && env != :local + submit_options = SubmitOptions.new + submit_options.instance_exec(env, &self.class.submit_options_block) + + submitter.submitTopology(self.class.topology_name, configurator.config, topology, submit_options) + else + submitter.submitTopology(self.class.topology_name, configurator.config, topology) + end + instance_exec(env, &self.class.submit_block) end @@ -211,6 +225,10 @@ module RedStorm @configure_block ||= lambda{|env|} end + def self.submit_options_block + @submit_options_block + end + def self.submit_block @submit_block ||= lambda{|env|} end