From 6aa739338b2da1a13540c2ca631aafb46d473379 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 7 Nov 2011 15:43:46 -0500 Subject: [PATCH] examples & cosmetic cleanup --- Rakefile | 9 ++- examples/cluster_word_count_topology.rb | 70 ++-------------- ...xclamation_bolt.rb => exclamation_bolt.rb} | 2 +- ...ology.rb => local_exclamation_topology.rb} | 10 ++- ...ogy2.rb => local_exclamation_topology2.rb} | 10 ++- ....rb => local_redis_word_count_topology.rb} | 31 ++------ examples/local_word_count_topology.rb | 21 +++++ examples/random_sentence_spout.rb | 26 ++++++ examples/ruby_word_count_topology.rb | 79 ------------------- examples/split_sentence_bolt.rb | 13 +++ examples/word_count_bolt.rb | 19 +++++ lib/red_storm/application.rb | 13 ++- lib/red_storm/topology_launcher.rb | 2 +- lib/red_storm/version.rb | 2 +- pom.xml | 4 +- 15 files changed, 124 insertions(+), 187 deletions(-) rename examples/{ruby_exclamation_bolt.rb => exclamation_bolt.rb} (91%) rename examples/{ruby_exclamation_topology.rb => local_exclamation_topology.rb} (54%) rename examples/{ruby_exclamation_topology2.rb => local_exclamation_topology2.rb} (67%) rename examples/{ruby_redis_word_count_topology.rb => local_redis_word_count_topology.rb} (60%) create mode 100644 examples/local_word_count_topology.rb create mode 100644 examples/random_sentence_spout.rb delete mode 100644 examples/ruby_word_count_topology.rb create mode 100644 examples/split_sentence_bolt.rb create mode 100644 examples/word_count_bolt.rb diff --git a/Rakefile b/Rakefile index 10f3eae..62c45ba 100644 --- a/Rakefile +++ b/Rakefile @@ -15,6 +15,7 @@ TARGET_SRC_DIR = "#{TARGET_DIR}/src" TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes" TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency" TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked" +TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar" JAVA_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/src/main" JRUBY_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/lib/red_storm" @@ -46,14 +47,16 @@ task :setup do end end -task :install => [:deps, :build] +task :install => [:deps, :build] do + puts("\nRedStorm install completed. All dependencies installed in #{TARGET_DIR}") +end task :unpack do system("rmvn dependency:unpack -f #{RedStorm::REDSTORM_HOME}/pom.xml -DoutputDirectory=#{TARGET_DEPENDENCY_UNPACKED_DIR}") end task :jar => [:unpack, :clean_jar] do - ant.jar :destfile => "#{TARGET_DIR}/cluster-topology.jar" do + ant.jar :destfile => TARGET_CLUSTER_JAR do fileset :dir => TARGET_CLASSES_DIR fileset :dir => TARGET_DEPENDENCY_UNPACKED_DIR fileset :dir => CWD do @@ -63,6 +66,7 @@ task :jar => [:unpack, :clean_jar] do attribute :name => "Main-Class", :value => "redstorm.TopologyLauncher" end end + puts("\nRedStorm jar completed. Generated jar file #{TARGET_CLUSTER_JAR}") end task :examples do @@ -78,6 +82,7 @@ task :examples do puts("copying examples into #{DST_EXAMPLES}") system("mkdir #{DST_EXAMPLES}") system("cp -r #{SRC_EXAMPLES}/* #{DST_EXAMPLES}") + puts("\nRedStorm examples completed. All examples copied in #{DST_EXAMPLES}") end task :deps do diff --git a/examples/cluster_word_count_topology.rb b/examples/cluster_word_count_topology.rb index 778583b..c40ab05 100644 --- a/examples/cluster_word_count_topology.rb +++ b/examples/cluster_word_count_topology.rb @@ -1,71 +1,13 @@ -class RubyRandomSentenceSpout - attr_reader :is_distributed - - def initialize - @is_distributed = true - @sentences = [ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - ] - end - - def open(conf, context, collector) - @collector = collector - end - - def next_tuple - @collector.emit(Values.new(@sentences[rand(@sentences.length)])) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end -end - -class RubySplitSentence - def prepare(conf, context, collector) - @collector = collector - end - - def execute(tuple) - tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end -end - -class RubyWordCount - def initialize - @counts = Hash.new{|h, k| h[k] = 0} - end - - def prepare(conf, context, collector) - @collector = collector - end - - def execute(tuple) - word = tuple.getString(0) - @counts[word] += 1 - @collector.emit(Values.new(word, @counts[word])) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word", "count")) - end -end - +require 'examples/random_sentence_spout' +require 'examples/split_sentence_bolt' +require 'examples/word_count_bolt' class ClusterWordCountTopology def start(base_class_path) builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5) - builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 4).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 4).fieldsGrouping(2, Fields.new("word")) + builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping(2, Fields.new("word")) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_exclamation_bolt.rb b/examples/exclamation_bolt.rb similarity index 91% rename from examples/ruby_exclamation_bolt.rb rename to examples/exclamation_bolt.rb index 72d1071..abf4222 100644 --- a/examples/ruby_exclamation_bolt.rb +++ b/examples/exclamation_bolt.rb @@ -1,4 +1,4 @@ -class RubyExclamationBolt +class ExclamationBolt def prepare(conf, context, collector) @collector = collector end diff --git a/examples/ruby_exclamation_topology.rb b/examples/local_exclamation_topology.rb similarity index 54% rename from examples/ruby_exclamation_topology.rb rename to examples/local_exclamation_topology.rb index 25590d7..17cda37 100644 --- a/examples/ruby_exclamation_topology.rb +++ b/examples/local_exclamation_topology.rb @@ -1,13 +1,15 @@ java_import 'backtype.storm.testing.TestWordSpout' -require 'examples/ruby_exclamation_bolt' +require 'examples/exclamation_bolt' -class RubyExclamationTopology +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + +class LocalExclamationTopology def start(base_class_path) builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 2).shuffleGrouping(2) + builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping(2) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_exclamation_topology2.rb b/examples/local_exclamation_topology2.rb similarity index 67% rename from examples/ruby_exclamation_topology2.rb rename to examples/local_exclamation_topology2.rb index 87a4ae0..8c97b0c 100644 --- a/examples/ruby_exclamation_topology2.rb +++ b/examples/local_exclamation_topology2.rb @@ -1,6 +1,6 @@ java_import 'backtype.storm.testing.TestWordSpout' -class RubyExclamationBolt2 +class ExclamationBolt2 def prepare(conf, context, collector) @collector = collector end @@ -15,13 +15,15 @@ class RubyExclamationBolt2 end end -class RubyExclamationTopology2 +# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt + +class LocalExclamationTopology2 def start(base_class_path) builder = TopologyBuilder.new builder.setSpout(1, TestWordSpout.new, 10) - builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 3).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 2).shuffleGrouping(2) + builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping(2) conf = Config.new conf.setDebug(true) diff --git a/examples/ruby_redis_word_count_topology.rb b/examples/local_redis_word_count_topology.rb similarity index 60% rename from examples/ruby_redis_word_count_topology.rb rename to examples/local_redis_word_count_topology.rb index b510a74..1755393 100644 --- a/examples/ruby_redis_word_count_topology.rb +++ b/examples/local_redis_word_count_topology.rb @@ -1,9 +1,10 @@ require 'redis' require 'thread' +require 'examples/word_count_bolt' -# RubyRedisWordSpout reads the Redis queue "test" on localhost:6379 +# RedisWordSpout reads the Redis queue "test" on localhost:6379 # and emits each word items pop'ed from the queue. -class RubyRedisWordSpout +class RedisWordSpout def open(conf, context, collector) @collector = collector @q = Queue.new @@ -39,31 +40,11 @@ class RubyRedisWordSpout end end -class RubyRedisWordCount - def initialize - @counts = Hash.new{|h, k| h[k] = 0} - end - - def prepare(conf, context, collector) - @collector = collector - end - - def execute(tuple) - word = tuple.getString(0) - @counts[word] += 1 - @collector.emit(Values.new(word, @counts[word])) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word", "count")) - end -end - -class RubyRedisWordCountTopology +class LocalRedisWordCountTopology def start(base_class_path) builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRedisWordSpout"), 1) - builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word")) + builder.setSpout(1, JRubySpout.new(base_class_path, "RedisWordSpout"), 1) + builder.setBolt(2, JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping(1, Fields.new("word")) conf = Config.new conf.setDebug(true) diff --git a/examples/local_word_count_topology.rb b/examples/local_word_count_topology.rb new file mode 100644 index 0000000..65af1c7 --- /dev/null +++ b/examples/local_word_count_topology.rb @@ -0,0 +1,21 @@ +require 'examples/random_sentence_spout' +require 'examples/split_sentence_bolt' +require 'examples/word_count_bolt' + +class LocalWordCountTopology + def start(base_class_path) + builder = TopologyBuilder.new + builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5) + builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping(1) + builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping(2, Fields.new("word")) + + conf = Config.new + conf.setDebug(true) + conf.setMaxTaskParallelism(3) + + cluster = LocalCluster.new + cluster.submitTopology("word-count", conf, builder.createTopology) + sleep(5) + cluster.shutdown + end +end \ No newline at end of file diff --git a/examples/random_sentence_spout.rb b/examples/random_sentence_spout.rb new file mode 100644 index 0000000..0cf2c30 --- /dev/null +++ b/examples/random_sentence_spout.rb @@ -0,0 +1,26 @@ +class RandomSentenceSpout + attr_reader :is_distributed + + def initialize + @is_distributed = true + @sentences = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature" + ] + end + + def open(conf, context, collector) + @collector = collector + end + + def next_tuple + @collector.emit(Values.new(@sentences[rand(@sentences.length)])) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end +end diff --git a/examples/ruby_word_count_topology.rb b/examples/ruby_word_count_topology.rb deleted file mode 100644 index 0b53a85..0000000 --- a/examples/ruby_word_count_topology.rb +++ /dev/null @@ -1,79 +0,0 @@ -class RubyRandomSentenceSpout - attr_reader :is_distributed - - def initialize - @is_distributed = true - @sentences = [ - "the cow jumped over the moon", - "an apple a day keeps the doctor away", - "four score and seven years ago", - "snow white and the seven dwarfs", - "i am at two with nature" - ] - end - - def open(conf, context, collector) - @collector = collector - end - - def next_tuple - @collector.emit(Values.new(@sentences[rand(@sentences.length)])) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end -end - -class RubySplitSentence - def prepare(conf, context, collector) - @collector = collector - end - - def execute(tuple) - tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word")) - end -end - -class RubyWordCount - def initialize - @counts = Hash.new{|h, k| h[k] = 0} - end - - def prepare(conf, context, collector) - @collector = collector - end - - def execute(tuple) - word = tuple.getString(0) - @counts[word] += 1 - @collector.emit(Values.new(word, @counts[word])) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new("word", "count")) - end -end - - -class RubyWordCountTopology - def start(base_class_path) - builder = TopologyBuilder.new - builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5) - builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 8).shuffleGrouping(1) - builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word")) - - conf = Config.new - conf.setDebug(true) - conf.setMaxTaskParallelism(3) - - cluster = LocalCluster.new - cluster.submitTopology("word-count", conf, builder.createTopology) - sleep(5) - cluster.shutdown - end -end \ No newline at end of file diff --git a/examples/split_sentence_bolt.rb b/examples/split_sentence_bolt.rb new file mode 100644 index 0000000..56a995a --- /dev/null +++ b/examples/split_sentence_bolt.rb @@ -0,0 +1,13 @@ +class SplitSentenceBolt + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) } + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word")) + end +end diff --git a/examples/word_count_bolt.rb b/examples/word_count_bolt.rb new file mode 100644 index 0000000..f86498b --- /dev/null +++ b/examples/word_count_bolt.rb @@ -0,0 +1,19 @@ +class WordCountBolt + def initialize + @counts = Hash.new{|h, k| h[k] = 0} + end + + def prepare(conf, context, collector) + @collector = collector + end + + def execute(tuple) + word = tuple.getString(0) + @counts[word] += 1 + @collector.emit(Values.new(word, @counts[word])) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new("word", "count")) + end +end diff --git a/lib/red_storm/application.rb b/lib/red_storm/application.rb index ede0d97..36b24c0 100644 --- a/lib/red_storm/application.rb +++ b/lib/red_storm/application.rb @@ -3,13 +3,18 @@ require 'rake' class RedStorm::Application def run(args) - load("#{RedStorm::REDSTORM_HOME}/Rakefile") - if args.size == 1 && File.exist?(args.first) + load("#{RedStorm::REDSTORM_HOME}/Rakefile") Rake::Task['launch'].invoke(args) else - Rake::Task[args.shift].invoke(args) + task = args.shift + if ["install", "examples", "jar"].include?(task) + load("#{RedStorm::REDSTORM_HOME}/Rakefile") + Rake::Task[task].invoke(args) + else + puts("\nUsage: redstorm install|examples|jar|topology_class_file_name") + exit(1) + end end end - end \ No newline at end of file diff --git a/lib/red_storm/topology_launcher.rb b/lib/red_storm/topology_launcher.rb index 7d749c8..68df2bf 100644 --- a/lib/red_storm/topology_launcher.rb +++ b/lib/red_storm/topology_launcher.rb @@ -31,7 +31,7 @@ class TopologyLauncher java_signature 'void main(String[])' def self.main(args) unless args.size > 0 - puts("Usage: redstorm topology_class_file") + puts("Usage: redstorm topology_class_file_name") exit(1) end class_path = args[0] diff --git a/lib/red_storm/version.rb b/lib/red_storm/version.rb index b69bcaa..674d98f 100644 --- a/lib/red_storm/version.rb +++ b/lib/red_storm/version.rb @@ -1,3 +1,3 @@ module RedStorm - VERSION = '0.1' + VERSION = '0.1.0' end \ No newline at end of file diff --git a/pom.xml b/pom.xml index 899638b..5dc6589 100644 --- a/pom.xml +++ b/pom.xml @@ -4,8 +4,8 @@ redstorm redstorm - 0.0.1 - RedStorm Storm JRuby bindings + 0.1.0 + RedStorm JRuby on Storm UTF-8