From 0128e292793a088c25819600a6774c22305a1b68 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 12 Dec 2011 17:16:49 -0500 Subject: [PATCH] all examples in RedStorm::Examples namespace and descriptive ids --- .../native/cluster_word_count_topology.rb | 29 +++--- examples/native/exclamation_bolt.rb | 24 +++-- examples/native/local_exclamation_topology.rb | 42 +++++---- .../native/local_exclamation_topology2.rb | 72 ++++++++------- .../native/local_redis_word_count_topology.rb | 91 ++++++++++--------- examples/native/local_word_count_topology.rb | 9 +- examples/native/random_sentence_spout.rb | 48 +++++----- examples/native/split_sentence_bolt.rb | 22 +++-- examples/native/word_count_bolt.rb | 32 ++++--- examples/simple/exclamation_bolt.rb | 10 +- examples/simple/exclamation_topology.rb | 53 ++++++----- examples/simple/exclamation_topology2.rb | 63 +++++++------ examples/simple/random_sentence_spout.rb | 30 +++--- examples/simple/redis_word_count_topology.rb | 80 ++++++++-------- examples/simple/split_sentence_bolt.rb | 48 +++++----- examples/simple/word_count_bolt.rb | 22 +++-- examples/simple/word_count_topology.rb | 51 ++++++----- 17 files changed, 396 insertions(+), 330 deletions(-) diff --git a/examples/native/cluster_word_count_topology.rb b/examples/native/cluster_word_count_topology.rb index 4ec25c9..16daff7 100644 --- a/examples/native/cluster_word_count_topology.rb +++ b/examples/native/cluster_word_count_topology.rb @@ -1,20 +1,25 @@ +require 'red_storm' require 'examples/native/random_sentence_spout' require 'examples/native/split_sentence_bolt' require 'examples/native/word_count_bolt' -class ClusterWordCountTopology - RedStorm::Configuration.topology_class = self +module RedStorm + module Examples + class ClusterWordCountTopology + RedStorm::Configuration.topology_class = self - def start(base_class_path, env) - builder = TopologyBuilder.new - builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) - builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping('1') - builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping('2', Fields.new("word")) + def start(base_class_path, env) + builder = TopologyBuilder.new + builder.setSpout('RandomSentenceSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RandomSentenceSpout"), 5) + builder.setBolt('SplitSentenceBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::SplitSentenceBolt"), 4).shuffleGrouping('RandomSentenceSpout') + builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 4).fieldsGrouping('SplitSentenceBolt', Fields.new("word")) - conf = Config.new - conf.setDebug(true) - conf.setNumWorkers(20); - conf.setMaxSpoutPending(1000); - StormSubmitter.submitTopology("word-count", conf, builder.createTopology); + conf = Config.new + conf.setDebug(true) + conf.setNumWorkers(20); + conf.setMaxSpoutPending(1000); + StormSubmitter.submitTopology("word_count", conf, builder.createTopology); + end + end end end \ No newline at end of file diff --git a/examples/native/exclamation_bolt.rb b/examples/native/exclamation_bolt.rb index abf4222..bb81213 100644 --- a/examples/native/exclamation_bolt.rb +++ b/examples/native/exclamation_bolt.rb @@ -1,14 +1,18 @@ -class ExclamationBolt - def prepare(conf, context, collector) - @collector = collector - end +module RedStorm + module Examples + class ExclamationBolt + def prepare(conf, context, collector) + @collector = collector + end - def execute(tuple) - @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) - @collector.ack(tuple) - end + def execute(tuple) + @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) + @collector.ack(tuple) + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + end end end diff --git a/examples/native/local_exclamation_topology.rb b/examples/native/local_exclamation_topology.rb index b27257a..065237d 100644 --- a/examples/native/local_exclamation_topology.rb +++ b/examples/native/local_exclamation_topology.rb @@ -1,25 +1,31 @@ java_import 'backtype.storm.testing.TestWordSpout' + +require 'lib/red_storm' require 'examples/native/exclamation_bolt' # this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt -class LocalExclamationTopology - RedStorm::Configuration.topology_class = self +module RedStorm + module Examples + class LocalExclamationTopology + RedStorm::Configuration.topology_class = self - def start(base_class_path, env) - builder = TopologyBuilder.new - - builder.setSpout('1', TestWordSpout.new, 10) - builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping('1') - builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping('2') - - conf = Config.new - conf.setDebug(true) - - cluster = LocalCluster.new - cluster.submitTopology("test", conf, builder.createTopology) - sleep(5) - cluster.killTopology("test") - cluster.shutdown + def start(base_class_path, env) + builder = TopologyBuilder.new + + builder.setSpout('TestWordSpout', TestWordSpout.new, 10) + builder.setBolt('ExclamationBolt1', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('TestWordSpout') + builder.setBolt('ExclamationBolt2', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('ExclamationBolt1') + + conf = Config.new + conf.setDebug(true) + + cluster = LocalCluster.new + cluster.submitTopology("exclamation", conf, builder.createTopology) + sleep(5) + cluster.killTopology("exclamation") + cluster.shutdown + end + end end -end +end \ No newline at end of file diff --git a/examples/native/local_exclamation_topology2.rb b/examples/native/local_exclamation_topology2.rb index 801a3d5..9f14795 100644 --- a/examples/native/local_exclamation_topology2.rb +++ b/examples/native/local_exclamation_topology2.rb @@ -1,39 +1,45 @@ java_import 'backtype.storm.testing.TestWordSpout' -class ExclamationBolt2 - def prepare(conf, context, collector) - @collector = collector - end +require 'lib/red_storm' - def execute(tuple) - @collector.emit(tuple, Values.new(tuple.getString(0) + "!!!")) - @collector.ack(tuple) - end +module RedStorm + module Examples + class ExclamationBolt2 + def prepare(conf, context, collector) + @collector = collector + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end -end - -# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt - -class LocalExclamationTopology2 - RedStorm::Configuration.topology_class = self - - def start(base_class_path, env) - builder = TopologyBuilder.new - - builder.setSpout('1', TestWordSpout.new, 10) - builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping('1') - builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping('2') - - conf = Config.new - conf.setDebug(true) - - cluster = LocalCluster.new - cluster.submitTopology("test", conf, builder.createTopology) - sleep(5) - cluster.killTopology("test") - cluster.shutdown + def execute(tuple) + @collector.emit(tuple, Values.new("!#{tuple.getString(0)}!")) + @collector.ack(tuple) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + end + + # this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + + class LocalExclamationTopology2 + RedStorm::Configuration.topology_class = self + + def start(base_class_path, env) + builder = TopologyBuilder.new + + builder.setSpout('TestWordSpout', TestWordSpout.new, 10) + builder.setBolt('ExclamationBolt21', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 3).shuffleGrouping('TestWordSpout') + builder.setBolt('ExclamationBolt22', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 2).shuffleGrouping('ExclamationBolt21') + + conf = Config.new + conf.setDebug(true) + + cluster = LocalCluster.new + cluster.submitTopology("exclamation", conf, builder.createTopology) + sleep(5) + cluster.killTopology("exclamation") + cluster.shutdown + end + end end end diff --git a/examples/native/local_redis_word_count_topology.rb b/examples/native/local_redis_word_count_topology.rb index 9062968..cc89480 100644 --- a/examples/native/local_redis_word_count_topology.rb +++ b/examples/native/local_redis_word_count_topology.rb @@ -1,60 +1,65 @@ require 'redis' require 'thread' +require 'lib/red_storm' require 'examples/native/word_count_bolt' -# RedisWordSpout reads the Redis queue "test" on localhost:6379 -# and emits each word items pop'ed from the queue. -class RedisWordSpout - def open(conf, context, collector) - @collector = collector - @q = Queue.new - @redis_reader = detach_redis_reader - end - - def next_tuple - # per doc nextTuple should not block, and sleep a bit when there's no data to process. - if @q.size > 0 - @collector.emit(Values.new(@q.pop)) - else - sleep(0.1) - end - end +module RedStorm + module Examples + # RedisWordSpout reads the Redis queue "test" on localhost:6379 + # and emits each word items pop'ed from the queue. + class RedisWordSpout + def open(conf, context, collector) + @collector = collector + @q = Queue.new + @redis_reader = detach_redis_reader + end + + def next_tuple + # per doc nextTuple should not block, and sleep a bit when there's no data to process. + if @q.size > 0 + @collector.emit(Values.new(@q.pop)) + else + sleep(0.1) + end + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end - private + private - def detach_redis_reader - Thread.new do - Thread.current.abort_on_exception = true + def detach_redis_reader + Thread.new do + Thread.current.abort_on_exception = true - redis = Redis.new(:host => "localhost", :port => 6379) - loop do - if data = redis.blpop("test", 0) - @q << data[1] + redis = Redis.new(:host => "localhost", :port => 6379) + loop do + if data = redis.blpop("test", 0) + @q << data[1] + end + end end end end - end -end -class LocalRedisWordCountTopology - RedStorm::Configuration.topology_class = self + class LocalRedisWordCountTopology + RedStorm::Configuration.topology_class = self - def start(base_class_path, env) - builder = TopologyBuilder.new - builder.setSpout('1', JRubySpout.new(base_class_path, "RedisWordSpout"), 1) - builder.setBolt('2', JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping('1', Fields.new("word")) + def start(base_class_path, env) + builder = TopologyBuilder.new + builder.setSpout('RedisWordSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RedisWordSpout"), 1) + builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 3).fieldsGrouping('RedisWordSpout', Fields.new("word")) - conf = Config.new - conf.setDebug(true) - conf.setMaxTaskParallelism(3) + conf = Config.new + conf.setDebug(true) + conf.setMaxTaskParallelism(3) - cluster = LocalCluster.new - cluster.submitTopology("redis-word-count", conf, builder.createTopology) - sleep(600) - cluster.shutdown + cluster = LocalCluster.new + cluster.submitTopology("redis_word_count", conf, builder.createTopology) + sleep(600) + cluster.shutdown + end + end end end \ No newline at end of file diff --git a/examples/native/local_word_count_topology.rb b/examples/native/local_word_count_topology.rb index 0584b63..9631df7 100644 --- a/examples/native/local_word_count_topology.rb +++ b/examples/native/local_word_count_topology.rb @@ -1,3 +1,4 @@ +require 'lib/red_storm' require 'examples/native/random_sentence_spout' require 'examples/native/split_sentence_bolt' require 'examples/native/word_count_bolt' @@ -9,16 +10,16 @@ module Examples def start(base_class_path, env) builder = TopologyBuilder.new - builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) - builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping('1') - builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping('2', Fields.new("word")) + builder.setSpout('RandomSentenceSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RandomSentenceSpout"), 5) + builder.setBolt('SplitSentenceBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::SplitSentenceBolt"), 8).shuffleGrouping('RandomSentenceSpout') + builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 12).fieldsGrouping('SplitSentenceBolt', Fields.new("word")) conf = Config.new conf.setDebug(true) conf.setMaxTaskParallelism(3) cluster = LocalCluster.new - cluster.submitTopology("word-count", conf, builder.createTopology) + cluster.submitTopology("word_count", conf, builder.createTopology) sleep(5) cluster.shutdown end diff --git a/examples/native/random_sentence_spout.rb b/examples/native/random_sentence_spout.rb index 0cf2c30..bd8c69a 100644 --- a/examples/native/random_sentence_spout.rb +++ b/examples/native/random_sentence_spout.rb @@ -1,26 +1,30 @@ -class RandomSentenceSpout - attr_reader :is_distributed +module RedStorm + module Examples + class RandomSentenceSpout + attr_reader :is_distributed - def initialize - @is_distributed = true - @sentences = [ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - ] - end + def initialize + @is_distributed = true + @sentences = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature" + ] + end - def open(conf, context, collector) - @collector = collector - end - - def next_tuple - @collector.emit(Values.new(@sentences[rand(@sentences.length)])) - end + def open(conf, context, collector) + @collector = collector + end + + def next_tuple + @collector.emit(Values.new(@sentences[rand(@sentences.length)])) + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + end end -end +end \ No newline at end of file diff --git a/examples/native/split_sentence_bolt.rb b/examples/native/split_sentence_bolt.rb index 56a995a..e4699ce 100644 --- a/examples/native/split_sentence_bolt.rb +++ b/examples/native/split_sentence_bolt.rb @@ -1,13 +1,17 @@ -class SplitSentenceBolt - def prepare(conf, context, collector) - @collector = collector - end +module RedStorm + module Examples + class SplitSentenceBolt + def prepare(conf, context, collector) + @collector = collector + end - def execute(tuple) - tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } - end + def execute(tuple) + tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end + end end end diff --git a/examples/native/word_count_bolt.rb b/examples/native/word_count_bolt.rb index f86498b..d3436ba 100644 --- a/examples/native/word_count_bolt.rb +++ b/examples/native/word_count_bolt.rb @@ -1,19 +1,23 @@ -class WordCountBolt - def initialize - @counts = Hash.new{|h, k| h[k] = 0} - end +module RedStorm + module Examples + class WordCountBolt + def initialize + @counts = Hash.new{|h, k| h[k] = 0} + end - def prepare(conf, context, collector) - @collector = collector - end + def prepare(conf, context, collector) + @collector = collector + end - def execute(tuple) - word = tuple.getString(0) - @counts[word] += 1 - @collector.emit(Values.new(word, @counts[word])) - end + def execute(tuple) + word = tuple.getString(0) + @counts[word] += 1 + @collector.emit(Values.new(word, @counts[word])) + end - def declare_output_fields(declarer) - declarer.declare(Fields.new("word", "count")) + def declare_output_fields(declarer) + declarer.declare(Fields.new("word", "count")) + end + end end end diff --git a/examples/simple/exclamation_bolt.rb b/examples/simple/exclamation_bolt.rb index a4fd652..f24035f 100644 --- a/examples/simple/exclamation_bolt.rb +++ b/examples/simple/exclamation_bolt.rb @@ -1,6 +1,10 @@ require 'red_storm' -class ExclamationBolt < RedStorm::SimpleBolt - output_fields :word - on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} +module RedStorm + module Examples + class ExclamationBolt < RedStorm::SimpleBolt + output_fields :word + on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} + end + end end diff --git a/examples/simple/exclamation_topology.rb b/examples/simple/exclamation_topology.rb index b992d42..3eaf40d 100644 --- a/examples/simple/exclamation_topology.rb +++ b/examples/simple/exclamation_topology.rb @@ -4,33 +4,36 @@ require 'examples/simple/exclamation_bolt' # this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt -class ExclamationTopology < RedStorm::SimpleTopology - spout TestWordSpout, :parallelism => 10 - - bolt ExclamationBolt, :parallelism => 3 do - source TestWordSpout, :shuffle - end - - bolt ExclamationBolt, :id => :ignore, :parallelism => 2 do - source ExclamationBolt, :shuffle - end +module RedStorm + module Examples + class ExclamationTopology < RedStorm::SimpleTopology + spout TestWordSpout, :parallelism => 10 + + bolt ExclamationBolt, :parallelism => 3 do + source TestWordSpout, :shuffle + end + + bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do + source ExclamationBolt, :shuffle + end - configure do |env| - case env - when :local - debug true - max_task_parallelism 3 - when :cluster - debug true - num_workers 20 - max_spout_pending(1000); - end - end + configure do |env| + debug true + case env + when :local + max_task_parallelism 3 + when :cluster + num_workers 20 + max_spout_pending(1000); + end + end - on_submit do |env| - if env == :local - sleep(5) - cluster.shutdown + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end + end end end end \ No newline at end of file diff --git a/examples/simple/exclamation_topology2.rb b/examples/simple/exclamation_topology2.rb index dc4cdf7..2a0765d 100644 --- a/examples/simple/exclamation_topology2.rb +++ b/examples/simple/exclamation_topology2.rb @@ -4,38 +4,41 @@ require 'red_storm' # this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt # and a locally defined ExclamationBolt -class ExclamationBolt < RedStorm::SimpleBolt - output_fields :word - on_receive(:ack => true, :anchor => true) {|tuple| "!#{tuple.getString(0)}!"} -end - -class ExclamationTopology2 < RedStorm::SimpleTopology - spout TestWordSpout, :parallelism => 10 - - bolt ExclamationBolt, :parallelism => 3 do - source TestWordSpout, :shuffle - end - - bolt ExclamationBolt, :id => :ignore, :parallelism => 2 do - source ExclamationBolt, :shuffle - end - - configure do |env| - case env - when :local - debug true - max_task_parallelism 3 - when :cluster - debug true - num_workers 20 - max_spout_pending(1000); +module RedStorm + module Examples + class ExclamationBolt < RedStorm::SimpleBolt + output_fields :word + on_receive(:ack => true, :anchor => true) {|tuple| "!#{tuple.getString(0)}!"} end - end - on_submit do |env| - if env == :local - sleep(5) - cluster.shutdown + class ExclamationTopology2 < RedStorm::SimpleTopology + spout TestWordSpout, :parallelism => 10 + + bolt ExclamationBolt, :parallelism => 3 do + source TestWordSpout, :shuffle + end + + bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do + source ExclamationBolt, :shuffle + end + + configure do |env| + debug true + case env + when :local + max_task_parallelism 3 + when :cluster + num_workers 20 + max_spout_pending(1000); + end + end + + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end + end end end end \ No newline at end of file diff --git a/examples/simple/random_sentence_spout.rb b/examples/simple/random_sentence_spout.rb index a0950ff..5e193aa 100644 --- a/examples/simple/random_sentence_spout.rb +++ b/examples/simple/random_sentence_spout.rb @@ -1,18 +1,22 @@ require 'red_storm' -class RandomSentenceSpout < RedStorm::SimpleSpout - set :is_distributed => true - output_fields :word +module RedStorm + module Examples + class RandomSentenceSpout < RedStorm::SimpleSpout + set :is_distributed => true + output_fields :word - on_send {@sentences[rand(@sentences.length)]} + on_send {@sentences[rand(@sentences.length)]} - on_init do - @sentences = [ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - ] + on_init do + @sentences = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature" + ] + end + end end -end +end \ No newline at end of file diff --git a/examples/simple/redis_word_count_topology.rb b/examples/simple/redis_word_count_topology.rb index a7471f4..48edfc6 100644 --- a/examples/simple/redis_word_count_topology.rb +++ b/examples/simple/redis_word_count_topology.rb @@ -5,51 +5,55 @@ require 'red_storm' require 'examples/simple/word_count_bolt' -# RedisWordSpout reads the Redis queue "test" on localhost:6379 -# and emits each word items pop'ed from the queue. +module RedStorm + module Examples -class RedisWordSpout < RedStorm::SimpleSpout - output_fields :word + # RedisWordSpout reads the Redis queue "test" on localhost:6379 + # and emits each word items pop'ed from the queue. - on_send {@q.pop if @q.size > 0} + class RedisWordSpout < RedStorm::SimpleSpout + output_fields :word - on_init do - @q = Queue.new - @redis_reader = detach_redis_reader - end - - private + on_send {@q.pop if @q.size > 0} - def detach_redis_reader - Thread.new do - Thread.current.abort_on_exception = true + on_init do + @q = Queue.new + @redis_reader = detach_redis_reader + end + + private - redis = Redis.new(:host => "localhost", :port => 6379) - loop do - if data = redis.blpop("test", 0) - @q << data[1] + def detach_redis_reader + Thread.new do + Thread.current.abort_on_exception = true + + redis = Redis.new(:host => "localhost", :port => 6379) + loop do + if data = redis.blpop("test", 0) + @q << data[1] + end + end + end + end + end + + class RedisWordCountTopology < RedStorm::SimpleTopology + spout RedisWordSpout + + bolt WordCountBolt, :parallelism => 3 do + source RedisWordSpout, :fields => ["word"] + end + + configure do |env| + debug true + case env + when :local + max_task_parallelism 3 + when :cluster + num_workers 20 + max_spout_pending(1000); end end end end -end - -class RedisWordCountTopology < RedStorm::SimpleTopology - spout RedisWordSpout - - bolt WordCountBolt, :parallelism => 3 do - source RedisWordSpout, :fields => ["word"] - end - - configure do |env| - case env - when :local - debug true - max_task_parallelism 3 - when :cluster - debug true - num_workers 20 - max_spout_pending(1000); - end - end end \ No newline at end of file diff --git a/examples/simple/split_sentence_bolt.rb b/examples/simple/split_sentence_bolt.rb index bc20033..13136fc 100644 --- a/examples/simple/split_sentence_bolt.rb +++ b/examples/simple/split_sentence_bolt.rb @@ -1,29 +1,33 @@ require 'red_storm' -class SplitSentenceBolt < RedStorm::SimpleBolt - output_fields :word +module RedStorm + module Examples + class SplitSentenceBolt < RedStorm::SimpleBolt + output_fields :word - # block declaration style using auto-emit (default) - # - on_receive {|tuple| tuple.getString(0).split(' ').map{|w| [w]}} + # block declaration style using auto-emit (default) + # + on_receive {|tuple| tuple.getString(0).split(' ').map{|w| [w]}} - # block declaration style no auto-emit - # - # on_receive :emit => false do |tuple| - # tuple.getString(0).split(' ').each{|w| unanchored_emit(w)} - # end + # block declaration style no auto-emit + # + # on_receive :emit => false do |tuple| + # tuple.getString(0).split(' ').each{|w| unanchored_emit(w)} + # end - # alternate declaration style using on_receive method - # - # on_receive :emit => true - # def on_receive(tuple) - # tuple.getString(0).split(' ').map{|w| [w]} - # end + # alternate declaration style using on_receive method + # + # on_receive :emit => true + # def on_receive(tuple) + # tuple.getString(0).split(' ').map{|w| [w]} + # end - # alternate declaration style using any specific method - # - # on_receive :my_method, :emit => true - # def my_method(tuple) - # tuple.getString(0).split(' ').map{|w| [w]} - # end + # alternate declaration style using any specific method + # + # on_receive :my_method, :emit => true + # def my_method(tuple) + # tuple.getString(0).split(' ').map{|w| [w]} + # end + end + end end diff --git a/examples/simple/word_count_bolt.rb b/examples/simple/word_count_bolt.rb index 94bc2cb..017567f 100644 --- a/examples/simple/word_count_bolt.rb +++ b/examples/simple/word_count_bolt.rb @@ -1,15 +1,19 @@ require 'red_storm' -class WordCountBolt < RedStorm::SimpleBolt - output_fields :word, :count - on_init {@counts = Hash.new{|h, k| h[k] = 0}} +module RedStorm + module Examples + class WordCountBolt < RedStorm::SimpleBolt + output_fields :word, :count + on_init {@counts = Hash.new{|h, k| h[k] = 0}} - # block declaration style using auto-emit (default) - # - on_receive do |tuple| - word = tuple.getString(0) - @counts[word] += 1 + # block declaration style using auto-emit (default) + # + on_receive do |tuple| + word = tuple.getString(0) + @counts[word] += 1 - [word, @counts[word]] + [word, @counts[word]] + end + end end end diff --git a/examples/simple/word_count_topology.rb b/examples/simple/word_count_topology.rb index 623642c..73b2188 100644 --- a/examples/simple/word_count_topology.rb +++ b/examples/simple/word_count_topology.rb @@ -3,33 +3,34 @@ require 'examples/simple/split_sentence_bolt' require 'examples/simple/word_count_bolt' module RedStorm - class WordCountTopology < SimpleTopology - spout RandomSentenceSpout, :parallelism => 5 - - bolt SplitSentenceBolt, :parallelism => 8 do - source RandomSentenceSpout, :shuffle - end - - bolt WordCountBolt, :parallelism => 12 do - source SplitSentenceBolt, :fields => ["word"] - end - - configure :word_count do |env| - case env - when :local - debug true - max_task_parallelism 3 - when :cluster - debug true - num_workers 20 - max_spout_pending(1000); + module Examples + class WordCountTopology < SimpleTopology + spout RandomSentenceSpout, :parallelism => 5 + + bolt SplitSentenceBolt, :parallelism => 8 do + source RandomSentenceSpout, :shuffle + end + + bolt WordCountBolt, :parallelism => 12 do + source SplitSentenceBolt, :fields => ["word"] end - end - on_submit do |env| - if env == :local - sleep(5) - cluster.shutdown + configure :word_count do |env| + debug true + case env + when :local + max_task_parallelism 3 + when :cluster + num_workers 20 + max_spout_pending(1000); + end + end + + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end end end end