From 4d5c3b07143e33b0225e69a0baf82e75eb34c3b0 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Fri, 11 Nov 2011 18:32:02 -0500 Subject: [PATCH] work bolt & spout dsl and added topology dsl --- Rakefile | 7 +- examples/simple/exclamation_bolt.rb | 2 +- .../simple/local_exclamation_topology2.rb | 2 +- .../simple/local_redis_word_count_topology.rb | 2 +- examples/simple/local_word_count_topology.rb | 32 ++-- examples/simple/random_sentence_spout.rb | 2 +- examples/simple/split_sentence_bolt.rb | 2 +- examples/simple/word_count_bolt.rb | 4 +- lib/red_storm.rb | 1 + lib/red_storm/application.rb | 24 ++- lib/red_storm/simple_bolt.rb | 40 +++-- lib/red_storm/simple_spout.rb | 34 ++-- lib/red_storm/simple_topology.rb | 145 ++++++++++++++++++ lib/red_storm/topology_launcher.rb | 11 +- 14 files changed, 249 insertions(+), 59 deletions(-) create mode 100644 lib/red_storm/simple_topology.rb diff --git a/Rakefile b/Rakefile index 19c283d..d56f7cd 100644 --- a/Rakefile +++ b/Rakefile @@ -25,10 +25,11 @@ DST_EXAMPLES = "#{CWD}/examples" task :default => [:clean, :build] -task :launch, :class_file do |t, args| +task :launch, :env, :class_file do |t, args| gem_home = ENV["GEM_HOME"].to_s.empty? ? " -Djruby.gem.home=`gem env home`" : "" - puts("launching java -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:class_file]}") - system("java -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:class_file]}") + command = "java -cp \"#{TARGET_CLASSES_DIR}:#{TARGET_DEPENDENCY_DIR}/*\"#{gem_home} redstorm.TopologyLauncher #{args[:env]} #{args[:class_file]}" + puts("launching #{command}") + system(command) end task :clean do diff --git a/examples/simple/exclamation_bolt.rb b/examples/simple/exclamation_bolt.rb index faacc7d..86dd616 100644 --- a/examples/simple/exclamation_bolt.rb +++ b/examples/simple/exclamation_bolt.rb @@ -1,4 +1,4 @@ class ExclamationBolt < RedStorm::SimpleBolt output_fields :word - on_tuple(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} + on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} end diff --git a/examples/simple/local_exclamation_topology2.rb b/examples/simple/local_exclamation_topology2.rb index e9b99ff..636e3c7 100644 --- a/examples/simple/local_exclamation_topology2.rb +++ b/examples/simple/local_exclamation_topology2.rb @@ -4,7 +4,7 @@ require 'red_storm' class ExclamationBolt2 < RedStorm::SimpleBolt output_fields :word - on_tuple(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} + on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} end # this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt diff --git a/examples/simple/local_redis_word_count_topology.rb b/examples/simple/local_redis_word_count_topology.rb index b5286f7..ceb1e41 100644 --- a/examples/simple/local_redis_word_count_topology.rb +++ b/examples/simple/local_redis_word_count_topology.rb @@ -10,7 +10,7 @@ require 'examples/simple/word_count_bolt' class RedisWordSpout output_fields :word - on_next_tuple {@q.pop if @q.size > 0} + on_send {@q.pop if @q.size > 0} on_init do @q = Queue.new diff --git a/examples/simple/local_word_count_topology.rb b/examples/simple/local_word_count_topology.rb index e6cd029..0f37955 100644 --- a/examples/simple/local_word_count_topology.rb +++ b/examples/simple/local_word_count_topology.rb @@ -3,20 +3,26 @@ require 'examples/simple/random_sentence_spout' require 'examples/simple/split_sentence_bolt' require 'examples/simple/word_count_bolt' -class LocalWordCountTopology - def start(base_class_path) - builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) - builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping(2, Fields.new("word")) +class LocalWordCountTopology < RedStorm::SimpleTopology + spout RandomSentenceSpout, :id => 1, :parallelism => 5 + + bolt SplitSentenceBolt, :id => 2, :parallelism => 8 do + source 1, :shuffle + end + + bolt WordCountBolt, :id => 3, :parallelism => 12 do + source 2, :fields => ["word"] + end - conf = Config.new - conf.setDebug(true) - conf.setMaxTaskParallelism(3) + configure :word_count do |env| + debug true + max_task_parallelism 3 + end - cluster = LocalCluster.new - cluster.submitTopology("word-count", conf, builder.createTopology) - sleep(5) - cluster.shutdown + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end end end \ No newline at end of file diff --git a/examples/simple/random_sentence_spout.rb b/examples/simple/random_sentence_spout.rb index 546d8ee..9602a1d 100644 --- a/examples/simple/random_sentence_spout.rb +++ b/examples/simple/random_sentence_spout.rb @@ -2,7 +2,7 @@ class RandomSentenceSpout < RedStorm::SimpleSpout set :is_distributed => true output_fields :word - on_next_tuple {@sentences[rand(@sentences.length)]} + on_send {@sentences[rand(@sentences.length)]} on_init do @sentences = [ diff --git a/examples/simple/split_sentence_bolt.rb b/examples/simple/split_sentence_bolt.rb index 65da6a6..8ae9a3d 100644 --- a/examples/simple/split_sentence_bolt.rb +++ b/examples/simple/split_sentence_bolt.rb @@ -1,4 +1,4 @@ class SplitSentenceBolt < RedStorm::SimpleBolt output_fields :word - on_tuple(:emit => false) {|tuple| tuple.getString(0).split(' ').each {|w| emit(w)}} + on_receive(:emit => false) {|tuple| tuple.getString(0).split(' ').each {|w| emit(w)}} end diff --git a/examples/simple/word_count_bolt.rb b/examples/simple/word_count_bolt.rb index 5d659c7..efc5ba3 100644 --- a/examples/simple/word_count_bolt.rb +++ b/examples/simple/word_count_bolt.rb @@ -3,7 +3,7 @@ class WordCountBolt < RedStorm::SimpleBolt on_init {@counts = Hash.new{|h, k| h[k] = 0}} - on_tuple do |tuple| + on_receive do |tuple| word = tuple.getString(0) @counts[word] += 1 @@ -16,7 +16,7 @@ end # class WordCountBolt < RedStorm::SimpleBolt # output_fields :word, :count # on_init {@counts = Hash.new{|h, k| h[k] = 0}} -# on_tuple :count_word +# on_receive :count_word # # def count_word(tuple) # word = tuple.getString(0) diff --git a/lib/red_storm.rb b/lib/red_storm.rb index d3fc287..c01eb48 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -6,3 +6,4 @@ require 'red_storm/version' require 'red_storm/application' require 'red_storm/simple_bolt' require 'red_storm/simple_spout' +require 'red_storm/simple_topology' diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index fa128fb..8fda20a 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -4,19 +4,27 @@ module RedStorm class Application + def usage + puts("Usage: redstorm install|examples|jar") + puts(" redstorm local|cluster topology_class_file_name\n") + exit(1) + end + def run(args) - if args.size == 1 && File.exist?(args.first) - load("#{RedStorm::REDSTORM_HOME}/Rakefile") - Rake::Task['launch'].invoke(args) - else - task = args.shift - if ["install", "examples", "jar"].include?(task) + if args.size > 0 + + if ["install", "examples", "jar"].include?(args[0]) + task = args.shift load("#{RedStorm::REDSTORM_HOME}/Rakefile") Rake::Task[task].invoke(args) + elsif args.size == 2 && ["local", "cluster"].include?(args[0]) && File.exist?(args[1]) + load("#{RedStorm::REDSTORM_HOME}/Rakefile") + Rake::Task['launch'].invoke(*args) else - puts("\nUsage: redstorm install|examples|jar|topology_class_file_name") - exit(1) + usage end + else + usage end end end diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb index be7cdb8..b515140 100644 --- a/lib/red_storm/simple_bolt.rb +++ b/lib/red_storm/simple_bolt.rb @@ -2,24 +2,30 @@ module RedStorm class SimpleBolt - # DSL class mthods + # DSL class methods def self.output_fields(*fields) @fields = fields.map(&:to_s) end - def self.on_tuple(*args, &tuple_block) + def self.on_receive(*args, &receive_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first - self.execute_options.merge!(options) - @tuple_block = block_given? ? tuple_block : lambda {|tuple| self.send(method_name, tuple)} + self.receive_options.merge!(options) + @receive_block = block_given? ? receive_block : lambda {|tuple| self.send(method_name, tuple)} end def self.on_init(method_name = nil, &init_block) @init_block = block_given? ? init_block : lambda {self.send(method_name)} end + def self.on_close(method_name = nil, &close_block) + @close_block = block_given? ? close_block : lambda {self.send(method_name)} + end + + # DSL instance methods + def emit(*values) @collector.emit(Values.new(*values)) end @@ -28,10 +34,10 @@ module RedStorm @collector.ack(tuple) end - # Bolt interface + # Bolt proxy interface def execute(tuple) - if (output = instance_exec(tuple, &self.class.tuple_block)) && self.class.emit? + if (output = instance_exec(tuple, &self.class.receive_block)) && self.class.emit? values = [output].flatten self.class.anchor? ? @collector.emit(tuple, Values.new(*values)) : emit(*values) @collector.ack(tuple) if self.class.ack? @@ -45,6 +51,10 @@ module RedStorm instance_exec(&self.class.init_block) end + def cleanup + instance_exec(&self.class.close_block) + end + def declare_output_fields(declarer) declarer.declare(Fields.new(self.class.fields)) end @@ -55,28 +65,32 @@ module RedStorm @fields end - def self.tuple_block - @tuple_block ||= lambda {} + def self.receive_block + @receive_block ||= lambda {} end def self.init_block @init_block ||= lambda {} end - def self.execute_options - @execute_options ||= {:emit => true, :ack => false, :anchor => false} + def self.close_block + @close_block ||= lambda {} + end + + def self.receive_options + @receive_options ||= {:emit => true, :ack => false, :anchor => false} end def self.emit? - !!self.execute_options[:emit] + !!self.receive_options[:emit] end def self.ack? - !!self.execute_options[:ack] + !!self.receive_options[:ack] end def self.anchor? - !!self.execute_options[:anchor] + !!self.receive_options[:anchor] end end end diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index b6b3d04..41fffc5 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -9,18 +9,22 @@ module RedStorm @fields = fields.map(&:to_s) end - def self.on_next_tuple(*args, &next_tuple_block) + def self.on_send(*args, &send_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first - self.execute_options.merge!(options) - @next_tuple_block = block_given? ? next_tuple_block : lambda {self.send(method_name)} + self.send_options.merge!(options) + @send_block = block_given? ? send_block : lambda {self.send(method_name)} end def self.on_init(method_name = nil, &init_block) @init_block = block_given? ? init_block : lambda {self.send(method_name)} end + def self.on_close(method_name = nil, &close_block) + @close_block = block_given? ? close_block : lambda {self.send(method_name)} + end + def self.on_ack(method_name = nil, &ack_block) @ack_block = block_given? ? ack_block : lambda {|msg_id| self.send(method_name, msg_id)} end @@ -33,14 +37,16 @@ module RedStorm self.spout_options.merge!(options) end + # DSL instance methods + def emit(*values) @collector.emit(Values.new(*values)) end - # Spout interface + # Spout proxy interface def next_tuple - output = instance_exec(&self.class.next_tuple_block) + output = instance_exec(&self.class.send_block) if self.class.emit? if output values = [output].flatten @@ -58,6 +64,10 @@ module RedStorm instance_exec(&self.class.init_block) end + def close + instance_exec(&self.class.close_block) + end + def declare_output_fields(declarer) declarer.declare(Fields.new(self.class.fields)) end @@ -80,14 +90,18 @@ module RedStorm @fields end - def self.next_tuple_block - @next_tuple_block ||= lambda {} + def self.send_block + @send_block ||= lambda {} end def self.init_block @init_block ||= lambda {} end + def self.close_block + @close_block ||= lambda {} + end + def self.ack_block @ack_block ||= lambda {} end @@ -96,8 +110,8 @@ module RedStorm @fail_block ||= lambda {} end - def self.execute_options - @execute_options ||= {:emit => true} + def self.send_options + @send_options ||= {:emit => true} end def self.spout_options @@ -109,7 +123,7 @@ module RedStorm end def self.emit? - !!self.execute_options[:emit] + !!self.send_options[:emit] end end end diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb new file mode 100644 index 0000000..3531356 --- /dev/null +++ b/lib/red_storm/simple_topology.rb @@ -0,0 +1,145 @@ +module RedStorm + + class SimpleTopology + + class BoltDefinition + attr_reader :bolt_class, :id, :parallelism + + def initialize(bolt_class, id, parallelism) + @bolt_class = bolt_class + @id = id + @parallelism = parallelism + @sources = [] + end + + def source(source_id, grouping) + @sources << [source_id, grouping.is_a?(Hash) ? grouping : {grouping => nil}] + end + + def define_grouping(storm_bolt) + @sources.each do |source_id, grouping| + grouper, params = grouping.first + + case grouper + when :shuffle + storm_bolt.shuffleGrouping(source_id) + when :fields + storm_bolt.fieldsGrouping(source_id, Fields.new(*params)) + else + raise("unknown grouper=#{grouper.inspect}") + end + end + end + end + + class SpoutDefinition + attr_reader :spout_class, :id, :parallelism + + def initialize(spout_class, id, parallelism) + @spout_class = spout_class + @id = id + @parallelism = parallelism + end + end + + class Configurator + attr_reader :config + + def initialize + @config = Config.new + end + + def method_missing(sym, *args) + config_method = "set#{self.class.camel_case(sym)}" + @config.send(config_method, *args) + end + + private + + def self.camel_case(s) + s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } + end + end + + + def self.spout(spout_class, options = {}) + spout_options = {:id => self.underscore(spout_class), :parallelism => 1}.merge(options) + spout = SpoutDefinition.new(spout_class, spout_options[:id], spout_options[:parallelism]) + self.spouts << spout + end + + def self.bolt(bolt_class, options = {}, &bolt_block) + bolt_options = {:id => self.underscore(bolt_class), :parallelism => 1}.merge(options) + bolt = BoltDefinition.new(bolt_class, bolt_options[:id], bolt_options[:parallelism]) + bolt.instance_exec(&bolt_block) + self.bolts << bolt + end + + def self.configure(name = nil, &configure_block) + @topology_name = name if name + @configure_block = configure_block if block_given? + end + + def self.on_submit(method_name = nil, &submit_block) + @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} + end + + # topology proxy interface + + def start(base_class_path, env) + builder = TopologyBuilder.new + self.class.spouts.each do |spout| + builder.setSpout(spout.id, JRubySpout.new(base_class_path, spout.spout_class.name), spout.parallelism) + end + self.class.bolts.each do |bolt| + storm_bolt = builder.setBolt(bolt.id, JRubyBolt.new(base_class_path, bolt.bolt_class.name), bolt.parallelism) + bolt.define_grouping(storm_bolt) + end + + configurator = Configurator.new + configurator.instance_exec(env, &self.class.configure_block) + + case env + when :local + @cluster = LocalCluster.new + @cluster.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) + when :cluster + StormSubmitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology); + else + raise("unsupported env=#{env.inspect}, expecting :local or :cluster") + end + + instance_exec(env, &self.class.submit_block) + end + + def cluster + @cluster + end + + private + + def self.spouts + @spouts ||= [] + end + + def self.bolts + @bolts ||= [] + end + + def self.topology_name + @topology_name ||= self.underscore(self.class.name) + end + + def self.configure_block + @configure_block ||= lamda{|env|} + end + + def self.submit_block + @submit_block ||= lamda{|env|} + end + + def self.underscore(camel_case) + camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase! + end + end +end \ No newline at end of file diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 213cefa..f0e834c 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -29,15 +29,16 @@ class TopologyLauncher java_signature 'void main(String[])' def self.main(args) - unless args.size > 0 - puts("Usage: redstorm topology_class_file_name") + unless args.size > 1 + puts("Usage: redstorm local|cluster topology_class_file_name") exit(1) end - class_path = args[0] + env = args[0].to_sym + class_path = args[1] clazz = camel_case(class_path.split('/').last.split('.').first) - puts("RedStorm v#{RedStorm::VERSION} starting topology #{clazz}") + puts("RedStorm v#{RedStorm::VERSION} starting topology #{clazz} in #{env.to_s} environment") require class_path - Object.module_eval(clazz).new.start(class_path) + Object.module_eval(clazz).new.start(class_path, env) end private