diff --git a/examples/simple/cluster_word_count_topology.rb b/examples/simple/cluster_word_count_topology.rb deleted file mode 100644 index 54e148e..0000000 --- a/examples/simple/cluster_word_count_topology.rb +++ /dev/null @@ -1,19 +0,0 @@ -require 'red_storm' -require 'examples/simple/random_sentence_spout' -require 'examples/simple/split_sentence_bolt' -require 'examples/simple/word_count_bolt' - -class ClusterWordCountTopology - def start(base_class_path) - builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) - builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping(2, Fields.new("word")) - - conf = Config.new - conf.setDebug(true) - conf.setNumWorkers(20); - conf.setMaxSpoutPending(1000); - StormSubmitter.submitTopology("word-count", conf, builder.createTopology); - end -end \ No newline at end of file diff --git a/examples/simple/exclamation_bolt.rb b/examples/simple/exclamation_bolt.rb index 86dd616..36a1093 100644 --- a/examples/simple/exclamation_bolt.rb +++ b/examples/simple/exclamation_bolt.rb @@ -1,4 +1,4 @@ class ExclamationBolt < RedStorm::SimpleBolt output_fields :word - on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} + on_receive (:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} end diff --git a/examples/simple/exclamation_topology.rb b/examples/simple/exclamation_topology.rb new file mode 100644 index 0000000..6dc5517 --- /dev/null +++ b/examples/simple/exclamation_topology.rb @@ -0,0 +1,37 @@ +java_import 'backtype.storm.testing.TestWordSpout' + +require 'red_storm' +require 'examples/simple/exclamation_bolt' + +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + +class ExclamationTopology < RedStorm::SimpleTopology + spout TestWordSpout, :parallelism => 10 + + bolt ExclamationBolt, :parallelism => 3 do + source TestWordSpout, :shuffle + end + + bolt ExclamationBolt, :id => :exclamation_bolt2, :parallelism => 2 do + source ExclamationBolt, :shuffle + end + + configure do |env| + case env + when :local + debug true + max_task_parallelism 3 + when :cluster + debug true + num_workers 20 + max_spout_pending(1000); + end + end + + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end + end +end \ No newline at end of file diff --git a/examples/simple/exclamation_topology2.rb b/examples/simple/exclamation_topology2.rb new file mode 100644 index 0000000..f790e8c --- /dev/null +++ b/examples/simple/exclamation_topology2.rb @@ -0,0 +1,42 @@ +java_import 'backtype.storm.testing.TestWordSpout' + +require 'red_storm' + +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt +# and a locally defined ExclamationBolt + +class ExclamationBolt < RedStorm::SimpleBolt + output_fields :word + on_receive(:ack => true, :anchor => true) {|tuple| "!#{tuple.getString(0)}!"} +end + +class ExclamationTopology2 < RedStorm::SimpleTopology + spout TestWordSpout, :parallelism => 10 + + bolt ExclamationBolt, :parallelism => 3 do + source TestWordSpout, :shuffle + end + + bolt ExclamationBolt, :id => :ignore, :parallelism => 2 do + source ExclamationBolt, :shuffle + end + + configure do |env| + case env + when :local + debug true + max_task_parallelism 3 + when :cluster + debug true + num_workers 20 + max_spout_pending(1000); + end + end + + on_submit do |env| + if env == :local + sleep(5) + cluster.shutdown + end + end +end \ No newline at end of file diff --git a/examples/simple/local_exclamation_topology.rb b/examples/simple/local_exclamation_topology.rb deleted file mode 100644 index dcb1d98..0000000 --- a/examples/simple/local_exclamation_topology.rb +++ /dev/null @@ -1,25 +0,0 @@ -java_import 'backtype.storm.testing.TestWordSpout' - -require 'red_storm' -require 'examples/simple/exclamation_bolt' - -# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt - -class LocalExclamationTopology - def start(base_class_path) - builder = TopologyBuilder.new - - builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping(2) - - conf = Config.new - conf.setDebug(true) - - cluster = LocalCluster.new - cluster.submitTopology("test", conf, builder.createTopology) - sleep(5) - cluster.killTopology("test") - cluster.shutdown - end -end diff --git a/examples/simple/local_exclamation_topology2.rb b/examples/simple/local_exclamation_topology2.rb deleted file mode 100644 index 636e3c7..0000000 --- a/examples/simple/local_exclamation_topology2.rb +++ /dev/null @@ -1,29 +0,0 @@ -java_import 'backtype.storm.testing.TestWordSpout' - -require 'red_storm' - -class ExclamationBolt2 < RedStorm::SimpleBolt - output_fields :word - on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"} -end - -# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt - -class LocalExclamationTopology2 - def start(base_class_path) - builder = TopologyBuilder.new - - builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping(2) - - conf = Config.new - conf.setDebug(true) - - cluster = LocalCluster.new - cluster.submitTopology("test", conf, builder.createTopology) - sleep(5) - cluster.killTopology("test") - cluster.shutdown - end -end diff --git a/examples/simple/local_redis_word_count_topology.rb b/examples/simple/redis_word_count_topology.rb similarity index 54% rename from examples/simple/local_redis_word_count_topology.rb rename to examples/simple/redis_word_count_topology.rb index ceb1e41..3203a80 100644 --- a/examples/simple/local_redis_word_count_topology.rb +++ b/examples/simple/redis_word_count_topology.rb @@ -7,7 +7,7 @@ require 'examples/simple/word_count_bolt' # RedisWordSpout reads the Redis queue "test" on localhost:6379 # and emits each word items pop'ed from the queue. -class RedisWordSpout +class RedisWordSpout < RedStorm::SimpleSpout output_fields :word on_send {@q.pop if @q.size > 0} @@ -33,20 +33,22 @@ class RedisWordSpout end end +class RedisWordCountTopology < RedStorm::SimpleTopology + spout RedisWordSpout + + bolt WordCountBolt, :parallelism => 3 do + source RedisWordSpout, :fields => ["word"] + end -class LocalRedisWordCountTopology - def start(base_class_path) - builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RedisWordSpout"), 1) - builder.setBolt(2, JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping(1, Fields.new("word")) - - conf = Config.new - conf.setDebug(true) - conf.setMaxTaskParallelism(3) - - cluster = LocalCluster.new - cluster.submitTopology("redis-word-count", conf, builder.createTopology) - sleep(600) - cluster.shutdown + configure do |env| + case env + when :local + debug true + max_task_parallelism 3 + when :cluster + debug true + num_workers 20 + max_spout_pending(1000); + end end end \ No newline at end of file diff --git a/examples/simple/split_sentence_bolt.rb b/examples/simple/split_sentence_bolt.rb index f762264..d9d4abd 100644 --- a/examples/simple/split_sentence_bolt.rb +++ b/examples/simple/split_sentence_bolt.rb @@ -1,29 +1,27 @@ class SplitSentenceBolt < RedStorm::SimpleBolt output_fields :word + # block declaration style using auto-emit (default) + # + on_receive {|tuple| tuple.getString(0).split(' ').map{|w| [w]}} + # block declaration style no auto-emit # # on_receive :emit => false do |tuple| # tuple.getString(0).split(' ').each{|w| emit(w)} # end - # block declaration style with auto-emit - # - on_receive do |tuple| - tuple.getString(0).split(' ').map{|w| [w]} - end - # alternate declaration style using on_receive method # # on_receive :emit => false # def on_receive(tuple) - # tuple.getString(0).split(' ').each {|w| emit(w)} + # tuple.getString(0).split(' ').map{|w| [w]} # end # alternate declaration style using any specific method # # on_receive :my_method, :emit => false # def my_method(tuple) - # tuple.getString(0).split(' ').each {|w| emit(w)} + # tuple.getString(0).split(' ').map{|w| [w]} # end end diff --git a/examples/simple/word_count_bolt.rb b/examples/simple/word_count_bolt.rb index efc5ba3..44a1d53 100644 --- a/examples/simple/word_count_bolt.rb +++ b/examples/simple/word_count_bolt.rb @@ -1,8 +1,9 @@ class WordCountBolt < RedStorm::SimpleBolt output_fields :word, :count - on_init {@counts = Hash.new{|h, k| h[k] = 0}} + # block declaration style using auto-emit (default) + # on_receive do |tuple| word = tuple.getString(0) @counts[word] += 1 @@ -10,18 +11,3 @@ class WordCountBolt < RedStorm::SimpleBolt [word, @counts[word]] end end - -# below is the same bolt but passing a method name to on_tuple - -# class WordCountBolt < RedStorm::SimpleBolt -# output_fields :word, :count -# on_init {@counts = Hash.new{|h, k| h[k] = 0}} -# on_receive :count_word -# -# def count_word(tuple) -# word = tuple.getString(0) -# @counts[word] += 1 -# -# [word, @counts[word]] -# end -# end diff --git a/examples/simple/local_word_count_topology.rb b/examples/simple/word_count_topology.rb similarity index 70% rename from examples/simple/local_word_count_topology.rb rename to examples/simple/word_count_topology.rb index e0aedc7..402c86e 100644 --- a/examples/simple/local_word_count_topology.rb +++ b/examples/simple/word_count_topology.rb @@ -3,7 +3,7 @@ require 'examples/simple/random_sentence_spout' require 'examples/simple/split_sentence_bolt' require 'examples/simple/word_count_bolt' -class LocalWordCountTopology < RedStorm::SimpleTopology +class WordCountTopology < RedStorm::SimpleTopology spout RandomSentenceSpout, :parallelism => 5 bolt SplitSentenceBolt, :parallelism => 8 do @@ -15,8 +15,15 @@ class LocalWordCountTopology < RedStorm::SimpleTopology end configure :word_count do |env| - debug true - max_task_parallelism 3 + case env + when :local + debug true + max_task_parallelism 3 + when :cluster + debug true + num_workers 20 + max_spout_pending(1000); + end end on_submit do |env|