From cac572d2f34a7ac309bd5e5680825ca53ac0e0bd Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Wed, 9 Nov 2011 17:40:54 -0500 Subject: [PATCH] adding simple bolt and spout and woring examples --- .../cluster_word_count_topology.rb | 0 examples/{ => native}/exclamation_bolt.rb | 0 .../local_exclamation_topology.rb | 0 .../local_exclamation_topology2.rb | 0 .../local_redis_word_count_topology.rb | 0 .../{ => native}/local_word_count_topology.rb | 0 .../{ => native}/random_sentence_spout.rb | 0 examples/{ => native}/split_sentence_bolt.rb | 0 examples/{ => native}/word_count_bolt.rb | 0 .../simple/cluster_word_count_topology.rb | 19 +++ examples/simple/exclamation_bolt.rb | 4 + examples/simple/local_exclamation_topology.rb | 25 ++++ .../simple/local_exclamation_topology2.rb | 29 +++++ .../simple/local_redis_word_count_topology.rb | 52 ++++++++ examples/simple/local_word_count_topology.rb | 22 ++++ examples/simple/random_sentence_spout.rb | 16 +++ examples/simple/split_sentence_bolt.rb | 4 + examples/simple/word_count_bolt.rb | 32 +++++ lib/red_storm.rb | 2 + lib/red_storm/application.rb | 25 ++-- lib/red_storm/simple_bolt.rb | 81 +++++++++++++ lib/red_storm/simple_spout.rb | 113 ++++++++++++++++++ 22 files changed, 413 insertions(+), 11 deletions(-) rename examples/{ => native}/cluster_word_count_topology.rb (100%) rename examples/{ => native}/exclamation_bolt.rb (100%) rename examples/{ => native}/local_exclamation_topology.rb (100%) rename examples/{ => native}/local_exclamation_topology2.rb (100%) rename examples/{ => native}/local_redis_word_count_topology.rb (100%) rename examples/{ => native}/local_word_count_topology.rb (100%) rename examples/{ => native}/random_sentence_spout.rb (100%) rename examples/{ => native}/split_sentence_bolt.rb (100%) rename examples/{ => native}/word_count_bolt.rb (100%) create mode 100644 examples/simple/cluster_word_count_topology.rb create mode 100644 examples/simple/exclamation_bolt.rb create mode 100644 examples/simple/local_exclamation_topology.rb create mode 100644 examples/simple/local_exclamation_topology2.rb create mode 100644 examples/simple/local_redis_word_count_topology.rb create mode 100644 examples/simple/local_word_count_topology.rb create mode 100644 examples/simple/random_sentence_spout.rb create mode 100644 examples/simple/split_sentence_bolt.rb create mode 100644 examples/simple/word_count_bolt.rb create mode 100644 lib/red_storm/simple_bolt.rb create mode 100644 lib/red_storm/simple_spout.rb diff --git a/examples/cluster_word_count_topology.rb b/examples/native/cluster_word_count_topology.rb similarity index 100% rename from examples/cluster_word_count_topology.rb rename to examples/native/cluster_word_count_topology.rb diff --git a/examples/exclamation_bolt.rb b/examples/native/exclamation_bolt.rb similarity index 100% rename from examples/exclamation_bolt.rb rename to examples/native/exclamation_bolt.rb diff --git a/examples/local_exclamation_topology.rb b/examples/native/local_exclamation_topology.rb similarity index 100% rename from examples/local_exclamation_topology.rb rename to examples/native/local_exclamation_topology.rb diff --git a/examples/local_exclamation_topology2.rb b/examples/native/local_exclamation_topology2.rb similarity index 100% rename from examples/local_exclamation_topology2.rb rename to examples/native/local_exclamation_topology2.rb diff --git a/examples/local_redis_word_count_topology.rb b/examples/native/local_redis_word_count_topology.rb similarity index 100% rename from examples/local_redis_word_count_topology.rb rename to examples/native/local_redis_word_count_topology.rb diff --git a/examples/local_word_count_topology.rb b/examples/native/local_word_count_topology.rb similarity index 100% rename from examples/local_word_count_topology.rb rename to examples/native/local_word_count_topology.rb diff --git a/examples/random_sentence_spout.rb b/examples/native/random_sentence_spout.rb similarity index 100% rename from examples/random_sentence_spout.rb rename to examples/native/random_sentence_spout.rb diff --git a/examples/split_sentence_bolt.rb b/examples/native/split_sentence_bolt.rb similarity index 100% rename from examples/split_sentence_bolt.rb rename to examples/native/split_sentence_bolt.rb diff --git a/examples/word_count_bolt.rb b/examples/native/word_count_bolt.rb similarity index 100% rename from examples/word_count_bolt.rb rename to examples/native/word_count_bolt.rb diff --git a/examples/simple/cluster_word_count_topology.rb b/examples/simple/cluster_word_count_topology.rb new file mode 100644 index 0000000..54e148e --- /dev/null +++ b/examples/simple/cluster_word_count_topology.rb @@ -0,0 +1,19 @@ +require 'red_storm' +require 'examples/simple/random_sentence_spout' +require 'examples/simple/split_sentence_bolt' +require 'examples/simple/word_count_bolt' + +class ClusterWordCountTopology + def start(base_class_path) + builder = TopologyBuilder.new + builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping(2, Fields.new("word")) + + conf = Config.new + conf.setDebug(true) + conf.setNumWorkers(20); + conf.setMaxSpoutPending(1000); + StormSubmitter.submitTopology("word-count", conf, builder.createTopology); + end +end \ No newline at end of file diff --git a/examples/simple/exclamation_bolt.rb b/examples/simple/exclamation_bolt.rb new file mode 100644 index 0000000..faacc7d --- /dev/null +++ b/examples/simple/exclamation_bolt.rb @@ -0,0 +1,4 @@ +class ExclamationBolt < RedStorm::SimpleBolt + output_fields :word + on_tuple(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} +end diff --git a/examples/simple/local_exclamation_topology.rb b/examples/simple/local_exclamation_topology.rb new file mode 100644 index 0000000..dcb1d98 --- /dev/null +++ b/examples/simple/local_exclamation_topology.rb @@ -0,0 +1,25 @@ +java_import 'backtype.storm.testing.TestWordSpout' + +require 'red_storm' +require 'examples/simple/exclamation_bolt' + +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + +class LocalExclamationTopology + def start(base_class_path) + builder = TopologyBuilder.new + + builder.setSpout(1, TestWordSpout.new, 10) + builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping(2) + + conf = Config.new + conf.setDebug(true) + + cluster = LocalCluster.new + cluster.submitTopology("test", conf, builder.createTopology) + sleep(5) + cluster.killTopology("test") + cluster.shutdown + end +end diff --git a/examples/simple/local_exclamation_topology2.rb b/examples/simple/local_exclamation_topology2.rb new file mode 100644 index 0000000..e9b99ff --- /dev/null +++ b/examples/simple/local_exclamation_topology2.rb @@ -0,0 +1,29 @@ +java_import 'backtype.storm.testing.TestWordSpout' + +require 'red_storm' + +class ExclamationBolt2 < RedStorm::SimpleBolt + output_fields :word + on_tuple(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} +end + +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + +class LocalExclamationTopology2 + def start(base_class_path) + builder = TopologyBuilder.new + + builder.setSpout(1, TestWordSpout.new, 10) + builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping(2) + + conf = Config.new + conf.setDebug(true) + + cluster = LocalCluster.new + cluster.submitTopology("test", conf, builder.createTopology) + sleep(5) + cluster.killTopology("test") + cluster.shutdown + end +end diff --git a/examples/simple/local_redis_word_count_topology.rb b/examples/simple/local_redis_word_count_topology.rb new file mode 100644 index 0000000..b5286f7 --- /dev/null +++ b/examples/simple/local_redis_word_count_topology.rb @@ -0,0 +1,52 @@ +require 'redis' +require 'thread' + +require 'red_storm' +require 'examples/simple/word_count_bolt' + +# RedisWordSpout reads the Redis queue "test" on localhost:6379 +# and emits each word items pop'ed from the queue. + +class RedisWordSpout + output_fields :word + + on_next_tuple {@q.pop if @q.size > 0} + + on_init do + @q = Queue.new + @redis_reader = detach_redis_reader + end + + private + + def detach_redis_reader + Thread.new do + Thread.current.abort_on_exception = true + + redis = Redis.new(:host => "localhost", :port => 6379) + loop do + if data = redis.blpop("test", 0) + @q << data[1] + end + end + end + end +end + + +class LocalRedisWordCountTopology + def start(base_class_path) + builder = TopologyBuilder.new + builder.setSpout(1, JRubySpout.new(base_class_path, "RedisWordSpout"), 1) + builder.setBolt(2, JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping(1, Fields.new("word")) + + conf = Config.new + conf.setDebug(true) + conf.setMaxTaskParallelism(3) + + cluster = LocalCluster.new + cluster.submitTopology("redis-word-count", conf, builder.createTopology) + sleep(600) + cluster.shutdown + end +end \ No newline at end of file diff --git a/examples/simple/local_word_count_topology.rb b/examples/simple/local_word_count_topology.rb new file mode 100644 index 0000000..e6cd029 --- /dev/null +++ b/examples/simple/local_word_count_topology.rb @@ -0,0 +1,22 @@ +require 'red_storm' +require 'examples/simple/random_sentence_spout' +require 'examples/simple/split_sentence_bolt' +require 'examples/simple/word_count_bolt' + +class LocalWordCountTopology + def start(base_class_path) + builder = TopologyBuilder.new + builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping(2, Fields.new("word")) + + conf = Config.new + conf.setDebug(true) + conf.setMaxTaskParallelism(3) + + cluster = LocalCluster.new + cluster.submitTopology("word-count", conf, builder.createTopology) + sleep(5) + cluster.shutdown + end +end \ No newline at end of file diff --git a/examples/simple/random_sentence_spout.rb b/examples/simple/random_sentence_spout.rb new file mode 100644 index 0000000..546d8ee --- /dev/null +++ b/examples/simple/random_sentence_spout.rb @@ -0,0 +1,16 @@ +class RandomSentenceSpout < RedStorm::SimpleSpout + set :is_distributed => true + output_fields :word + + on_next_tuple {@sentences[rand(@sentences.length)]} + + on_init do + @sentences = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature" + ] + end +end diff --git a/examples/simple/split_sentence_bolt.rb b/examples/simple/split_sentence_bolt.rb new file mode 100644 index 0000000..65da6a6 --- /dev/null +++ b/examples/simple/split_sentence_bolt.rb @@ -0,0 +1,4 @@ +class SplitSentenceBolt < RedStorm::SimpleBolt + output_fields :word + on_tuple(:emit => false) {|tuple| tuple.getString(0).split(' ').each {|w| emit(w)}} +end diff --git a/examples/simple/word_count_bolt.rb b/examples/simple/word_count_bolt.rb new file mode 100644 index 0000000..0a3fd09 --- /dev/null +++ b/examples/simple/word_count_bolt.rb @@ -0,0 +1,32 @@ +class WordCountBolt < RedStorm::SimpleBolt + output_fields :word, :count + + on_tuple do |tuple| + word = tuple.getString(0) + @counts[word] += 1 + + [word, @counts[word]] + end + + def initialize + @counts = Hash.new{|h, k| h[k] = 0} + end +end + + +# class WordCountBolt < RedStorm::SimpleBolt + +# output_fields :word, :count +# on_tuple :count_word, :ack => true, :anchor => true + +# def count_word(tuple) +# word = tuple.getString(0) +# @counts[word] += 1 + +# [word, @counts[word]] +# end + +# def initialize +# @counts = Hash.new{|h, k| h[k] = 0} +# end +# end diff --git a/lib/red_storm.rb b/lib/red_storm.rb index 4b9d984..d3fc287 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -4,3 +4,5 @@ end require 'red_storm/version' require 'red_storm/application' +require 'red_storm/simple_bolt' +require 'red_storm/simple_spout' diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index 36b24c0..fa128fb 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -1,19 +1,22 @@ require 'rake' -class RedStorm::Application +module RedStorm + + class Application - def run(args) - if args.size == 1 && File.exist?(args.first) - load("#{RedStorm::REDSTORM_HOME}/Rakefile") - Rake::Task['launch'].invoke(args) - else - task = args.shift - if ["install", "examples", "jar"].include?(task) + def run(args) + if args.size == 1 && File.exist?(args.first) load("#{RedStorm::REDSTORM_HOME}/Rakefile") - Rake::Task[task].invoke(args) + Rake::Task['launch'].invoke(args) else - puts("\nUsage: redstorm install|examples|jar|topology_class_file_name") - exit(1) + task = args.shift + if ["install", "examples", "jar"].include?(task) + load("#{RedStorm::REDSTORM_HOME}/Rakefile") + Rake::Task[task].invoke(args) + else + puts("\nUsage: redstorm install|examples|jar|topology_class_file_name") + exit(1) + end end end end diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb new file mode 100644 index 0000000..3c1dddd --- /dev/null +++ b/lib/red_storm/simple_bolt.rb @@ -0,0 +1,81 @@ +module RedStorm + + class SimpleBolt + + # DSL class mthods + + def self.output_fields(*fields) + @fields = fields.map(&:to_s) + end + + def self.on_tuple(method_name = nil, options = {}, &execute_block) + self.execute_options.merge!(options) + @execute_block = block_given? ? execute_block : lambda {|tuple| self.send(method_name, tuple)} + end + + def self.on_init(method_name = nil, &init_block) + @init_block = block_given? ? init_block : lambda {self.send(method_name)} + end + + def emit(*values) + @collector.emit(Values.new(*values)) + end + + def ack(tuple) + @collector.ack(tuple) + end + + # Bolt interface + + def execute(tuple) + if (output = instance_exec(tuple, &self.class.execute_block)) && self.class.emit? + values = [output].flatten + self.class.anchor? ? @collector.emit(tuple, Values.new(*values)) : @collector.emit(Values.new(*values)) + @collector.ack(tuple) if self.class.ack? + end + end + + def prepare(config, context, collector) + @collector = collector + @context = context + @config = config + instance_exec(&self.class.init_block) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new(self.class.fields)) + end + + private + + def self.fields + @fields + end + + def self.execute_block + @execute_block ||= lambda {} + end + + def self.init_block + @init_block ||= lambda {} + end + + def self.emit? + !!self.execute_options[:emit] + end + + def self.ack? + !!self.execute_options[:ack] + end + + def self.anchor? + !!self.execute_options[:anchor] + end + + def self.execute_options + @execute_options ||= {:emit => true, :ack => false, :anchor => false} + end + + end + +end diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb new file mode 100644 index 0000000..4fbba4d --- /dev/null +++ b/lib/red_storm/simple_spout.rb @@ -0,0 +1,113 @@ +module RedStorm + + class SimpleSpout + + + # DSL class methods + + def self.output_fields(*fields) + @fields = fields.map(&:to_s) + end + + def self.on_next_tuple(method_name = nil, options = {}, &execute_block) + self.execute_options.merge!(options) + @execute_block = block_given? ? execute_block : lambda {self.send(method_name)} + end + + def self.on_init(method_name = nil, &init_block) + @init_block = block_given? ? init_block : lambda {self.send(method_name)} + end + + def self.on_ack(method_name = nil, &ack_block) + @ack_block = block_given? ? ack_block : lambda {|msg_id| self.send(method_name, msg_id)} + end + + def self.on_fail(method_name = nil, &fail_block) + @fail_block = block_given? ? fail_block : lambda {|msg_id| self.send(method_name, msg_id)} + end + + def self.set(options = {}) + self.global_options.merge!(options) + end + + def emit(*values) + @collector.emit(Values.new(*values)) + end + + # Spout interface + + def next_tuple + output = instance_exec(&self.class.execute_block) + if self.class.emit? + if output + values = [output].flatten + @collector.emit(Values.new(*values)) + else + sleep(0.1) + end + end + end + + def open(config, context, collector) + @collector = collector + @context = context + @config = config + instance_exec(&self.class.init_block) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new(self.class.fields)) + end + + def is_distributed + self.class.is_distributed? + end + + def ack(msg_id) + instance_exec(msg_id, &self.class.ack_block) + end + + def fail(msg_id) + instance_exec(msg_id, &self.class.fail_block) + end + + private + + def self.fields + @fields + end + + def self.execute_block + @execute_block ||= lambda {} + end + + def self.init_block + @init_block ||= lambda {} + end + + def self.ack_block + @ack_block ||= lambda {} + end + + def self.fail_block + @fail_block ||= lambda {} + end + + def self.is_distributed? + !!@global_options[:is_distributed] + end + + def self.execute_options + @execute_options ||= {:emit => true} + end + + def self.global_options + @global_options ||= {:is_distributed => false} + end + + def self.emit? + !!self.execute_options[:emit] + end + end + +end