examples & cosmetic cleanup

This commit is contained in:
Colin Surprenant 2011-11-07 15:43:46 -05:00
parent 59577a8cd1
commit 6aa739338b
15 changed files with 124 additions and 187 deletions

View File

@ -15,6 +15,7 @@ TARGET_SRC_DIR = "#{TARGET_DIR}/src"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency"
TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked"
TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar"
JAVA_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/src/main"
JRUBY_SRC_DIR = "#{RedStorm::REDSTORM_HOME}/lib/red_storm"
@ -46,14 +47,16 @@ task :setup do
end
end
task :install => [:deps, :build]
task :install => [:deps, :build] do
puts("\nRedStorm install completed. All dependencies installed in #{TARGET_DIR}")
end
task :unpack do
system("rmvn dependency:unpack -f #{RedStorm::REDSTORM_HOME}/pom.xml -DoutputDirectory=#{TARGET_DEPENDENCY_UNPACKED_DIR}")
end
task :jar => [:unpack, :clean_jar] do
ant.jar :destfile => "#{TARGET_DIR}/cluster-topology.jar" do
ant.jar :destfile => TARGET_CLUSTER_JAR do
fileset :dir => TARGET_CLASSES_DIR
fileset :dir => TARGET_DEPENDENCY_UNPACKED_DIR
fileset :dir => CWD do
@ -63,6 +66,7 @@ task :jar => [:unpack, :clean_jar] do
attribute :name => "Main-Class", :value => "redstorm.TopologyLauncher"
end
end
puts("\nRedStorm jar completed. Generated jar file #{TARGET_CLUSTER_JAR}")
end
task :examples do
@ -78,6 +82,7 @@ task :examples do
puts("copying examples into #{DST_EXAMPLES}")
system("mkdir #{DST_EXAMPLES}")
system("cp -r #{SRC_EXAMPLES}/* #{DST_EXAMPLES}")
puts("\nRedStorm examples completed. All examples copied in #{DST_EXAMPLES}")
end
task :deps do

View File

@ -1,71 +1,13 @@
class RubyRandomSentenceSpout
attr_reader :is_distributed
def initialize
@is_distributed = true
@sentences = [
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"
]
end
def open(conf, context, collector)
@collector = collector
end
def next_tuple
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubySplitSentence
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubyWordCount
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
word = tuple.getString(0)
@counts[word] += 1
@collector.emit(Values.new(word, @counts[word]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word", "count"))
end
end
require 'examples/random_sentence_spout'
require 'examples/split_sentence_bolt'
require 'examples/word_count_bolt'
class ClusterWordCountTopology
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 4).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 4).fieldsGrouping(2, Fields.new("word"))
builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping(2, Fields.new("word"))
conf = Config.new
conf.setDebug(true)

View File

@ -1,4 +1,4 @@
class RubyExclamationBolt
class ExclamationBolt
def prepare(conf, context, collector)
@collector = collector
end

View File

@ -1,13 +1,15 @@
java_import 'backtype.storm.testing.TestWordSpout'
require 'examples/ruby_exclamation_bolt'
require 'examples/exclamation_bolt'
class RubyExclamationTopology
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
class LocalExclamationTopology
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 2).shuffleGrouping(2)
builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)

View File

@ -1,6 +1,6 @@
java_import 'backtype.storm.testing.TestWordSpout'
class RubyExclamationBolt2
class ExclamationBolt2
def prepare(conf, context, collector)
@collector = collector
end
@ -15,13 +15,15 @@ class RubyExclamationBolt2
end
end
class RubyExclamationTopology2
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
class LocalExclamationTopology2
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 2).shuffleGrouping(2)
builder.setBolt(2, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)

View File

@ -1,9 +1,10 @@
require 'redis'
require 'thread'
require 'examples/word_count_bolt'
# RubyRedisWordSpout reads the Redis queue "test" on localhost:6379
# RedisWordSpout reads the Redis queue "test" on localhost:6379
# and emits each word items pop'ed from the queue.
class RubyRedisWordSpout
class RedisWordSpout
def open(conf, context, collector)
@collector = collector
@q = Queue.new
@ -39,31 +40,11 @@ class RubyRedisWordSpout
end
end
class RubyRedisWordCount
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
word = tuple.getString(0)
@counts[word] += 1
@collector.emit(Values.new(word, @counts[word]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word", "count"))
end
end
class RubyRedisWordCountTopology
class LocalRedisWordCountTopology
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRedisWordSpout"), 1)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word"))
builder.setSpout(1, JRubySpout.new(base_class_path, "RedisWordSpout"), 1)
builder.setBolt(2, JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping(1, Fields.new("word"))
conf = Config.new
conf.setDebug(true)

View File

@ -0,0 +1,21 @@
require 'examples/random_sentence_spout'
require 'examples/split_sentence_bolt'
require 'examples/word_count_bolt'
class LocalWordCountTopology
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping(2, Fields.new("word"))
conf = Config.new
conf.setDebug(true)
conf.setMaxTaskParallelism(3)
cluster = LocalCluster.new
cluster.submitTopology("word-count", conf, builder.createTopology)
sleep(5)
cluster.shutdown
end
end

View File

@ -0,0 +1,26 @@
class RandomSentenceSpout
attr_reader :is_distributed
def initialize
@is_distributed = true
@sentences = [
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"
]
end
def open(conf, context, collector)
@collector = collector
end
def next_tuple
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end

View File

@ -1,79 +0,0 @@
class RubyRandomSentenceSpout
attr_reader :is_distributed
def initialize
@is_distributed = true
@sentences = [
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"
]
end
def open(conf, context, collector)
@collector = collector
end
def next_tuple
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubySplitSentence
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubyWordCount
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
word = tuple.getString(0)
@counts[word] += 1
@collector.emit(Values.new(word, @counts[word]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word", "count"))
end
end
class RubyWordCountTopology
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 8).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word"))
conf = Config.new
conf.setDebug(true)
conf.setMaxTaskParallelism(3)
cluster = LocalCluster.new
cluster.submitTopology("word-count", conf, builder.createTopology)
sleep(5)
cluster.shutdown
end
end

View File

@ -0,0 +1,13 @@
class SplitSentenceBolt
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end

View File

@ -0,0 +1,19 @@
class WordCountBolt
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
word = tuple.getString(0)
@counts[word] += 1
@collector.emit(Values.new(word, @counts[word]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word", "count"))
end
end

View File

@ -3,13 +3,18 @@ require 'rake'
class RedStorm::Application
def run(args)
load("#{RedStorm::REDSTORM_HOME}/Rakefile")
if args.size == 1 && File.exist?(args.first)
load("#{RedStorm::REDSTORM_HOME}/Rakefile")
Rake::Task['launch'].invoke(args)
else
Rake::Task[args.shift].invoke(args)
task = args.shift
if ["install", "examples", "jar"].include?(task)
load("#{RedStorm::REDSTORM_HOME}/Rakefile")
Rake::Task[task].invoke(args)
else
puts("\nUsage: redstorm install|examples|jar|topology_class_file_name")
exit(1)
end
end
end
end

View File

@ -31,7 +31,7 @@ class TopologyLauncher
java_signature 'void main(String[])'
def self.main(args)
unless args.size > 0
puts("Usage: redstorm topology_class_file")
puts("Usage: redstorm topology_class_file_name")
exit(1)
end
class_path = args[0]

View File

@ -1,3 +1,3 @@
module RedStorm
VERSION = '0.1'
VERSION = '0.1.0'
end

View File

@ -4,8 +4,8 @@
<groupId>redstorm</groupId>
<artifactId>redstorm</artifactId>
<version>0.0.1</version>
<name>RedStorm Storm JRuby bindings</name>
<version>0.1.0</version>
<name>RedStorm JRuby on Storm</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>