all examples in RedStorm::Examples namespace and descriptive ids
This commit is contained in:
parent
f482f3124a
commit
0128e29279
|
@ -1,20 +1,25 @@
|
|||
require 'red_storm'
|
||||
require 'examples/native/random_sentence_spout'
|
||||
require 'examples/native/split_sentence_bolt'
|
||||
require 'examples/native/word_count_bolt'
|
||||
|
||||
class ClusterWordCountTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ClusterWordCountTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
|
||||
builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 4).shuffleGrouping('1')
|
||||
builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 4).fieldsGrouping('2', Fields.new("word"))
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
builder.setSpout('RandomSentenceSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RandomSentenceSpout"), 5)
|
||||
builder.setBolt('SplitSentenceBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::SplitSentenceBolt"), 4).shuffleGrouping('RandomSentenceSpout')
|
||||
builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 4).fieldsGrouping('SplitSentenceBolt', Fields.new("word"))
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
conf.setNumWorkers(20);
|
||||
conf.setMaxSpoutPending(1000);
|
||||
StormSubmitter.submitTopology("word-count", conf, builder.createTopology);
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
conf.setNumWorkers(20);
|
||||
conf.setMaxSpoutPending(1000);
|
||||
StormSubmitter.submitTopology("word_count", conf, builder.createTopology);
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,14 +1,18 @@
|
|||
class ExclamationBolt
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ExclamationBolt
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def execute(tuple)
|
||||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
|
||||
@collector.ack(tuple)
|
||||
end
|
||||
def execute(tuple)
|
||||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
|
||||
@collector.ack(tuple)
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,25 +1,31 @@
|
|||
java_import 'backtype.storm.testing.TestWordSpout'
|
||||
|
||||
require 'lib/red_storm'
|
||||
require 'examples/native/exclamation_bolt'
|
||||
|
||||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
|
||||
|
||||
class LocalExclamationTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
module RedStorm
|
||||
module Examples
|
||||
class LocalExclamationTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
|
||||
builder.setSpout('1', TestWordSpout.new, 10)
|
||||
builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt"), 3).shuffleGrouping('1')
|
||||
builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt"), 2).shuffleGrouping('2')
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("test", conf, builder.createTopology)
|
||||
sleep(5)
|
||||
cluster.killTopology("test")
|
||||
cluster.shutdown
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
|
||||
builder.setSpout('TestWordSpout', TestWordSpout.new, 10)
|
||||
builder.setBolt('ExclamationBolt1', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('TestWordSpout')
|
||||
builder.setBolt('ExclamationBolt2', JRubyBolt.new(base_class_path, 'RedStorm::Examples::ExclamationBolt'), 3).shuffleGrouping('ExclamationBolt1')
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("exclamation", conf, builder.createTopology)
|
||||
sleep(5)
|
||||
cluster.killTopology("exclamation")
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,39 +1,45 @@
|
|||
java_import 'backtype.storm.testing.TestWordSpout'
|
||||
|
||||
class ExclamationBolt2
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
require 'lib/red_storm'
|
||||
|
||||
def execute(tuple)
|
||||
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
|
||||
@collector.ack(tuple)
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ExclamationBolt2
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
end
|
||||
|
||||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
|
||||
|
||||
class LocalExclamationTopology2
|
||||
RedStorm::Configuration.topology_class = self
|
||||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
|
||||
builder.setSpout('1', TestWordSpout.new, 10)
|
||||
builder.setBolt('2', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 3).shuffleGrouping('1')
|
||||
builder.setBolt('3', JRubyBolt.new(base_class_path, "ExclamationBolt2"), 2).shuffleGrouping('2')
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("test", conf, builder.createTopology)
|
||||
sleep(5)
|
||||
cluster.killTopology("test")
|
||||
cluster.shutdown
|
||||
def execute(tuple)
|
||||
@collector.emit(tuple, Values.new("!#{tuple.getString(0)}!"))
|
||||
@collector.ack(tuple)
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
end
|
||||
|
||||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
|
||||
|
||||
class LocalExclamationTopology2
|
||||
RedStorm::Configuration.topology_class = self
|
||||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
|
||||
builder.setSpout('TestWordSpout', TestWordSpout.new, 10)
|
||||
builder.setBolt('ExclamationBolt21', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 3).shuffleGrouping('TestWordSpout')
|
||||
builder.setBolt('ExclamationBolt22', JRubyBolt.new(base_class_path, "RedStorm::Examples::ExclamationBolt2"), 2).shuffleGrouping('ExclamationBolt21')
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("exclamation", conf, builder.createTopology)
|
||||
sleep(5)
|
||||
cluster.killTopology("exclamation")
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,60 +1,65 @@
|
|||
require 'redis'
|
||||
require 'thread'
|
||||
require 'lib/red_storm'
|
||||
require 'examples/native/word_count_bolt'
|
||||
|
||||
# RedisWordSpout reads the Redis queue "test" on localhost:6379
|
||||
# and emits each word items pop'ed from the queue.
|
||||
class RedisWordSpout
|
||||
def open(conf, context, collector)
|
||||
@collector = collector
|
||||
@q = Queue.new
|
||||
@redis_reader = detach_redis_reader
|
||||
end
|
||||
|
||||
def next_tuple
|
||||
# per doc nextTuple should not block, and sleep a bit when there's no data to process.
|
||||
if @q.size > 0
|
||||
@collector.emit(Values.new(@q.pop))
|
||||
else
|
||||
sleep(0.1)
|
||||
end
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
# RedisWordSpout reads the Redis queue "test" on localhost:6379
|
||||
# and emits each word items pop'ed from the queue.
|
||||
class RedisWordSpout
|
||||
def open(conf, context, collector)
|
||||
@collector = collector
|
||||
@q = Queue.new
|
||||
@redis_reader = detach_redis_reader
|
||||
end
|
||||
|
||||
def next_tuple
|
||||
# per doc nextTuple should not block, and sleep a bit when there's no data to process.
|
||||
if @q.size > 0
|
||||
@collector.emit(Values.new(@q.pop))
|
||||
else
|
||||
sleep(0.1)
|
||||
end
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
|
||||
private
|
||||
private
|
||||
|
||||
def detach_redis_reader
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
def detach_redis_reader
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
|
||||
redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
loop do
|
||||
if data = redis.blpop("test", 0)
|
||||
@q << data[1]
|
||||
redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
loop do
|
||||
if data = redis.blpop("test", 0)
|
||||
@q << data[1]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class LocalRedisWordCountTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
class LocalRedisWordCountTopology
|
||||
RedStorm::Configuration.topology_class = self
|
||||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
builder.setSpout('1', JRubySpout.new(base_class_path, "RedisWordSpout"), 1)
|
||||
builder.setBolt('2', JRubyBolt.new(base_class_path, "WordCountBolt"), 3).fieldsGrouping('1', Fields.new("word"))
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
builder.setSpout('RedisWordSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RedisWordSpout"), 1)
|
||||
builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 3).fieldsGrouping('RedisWordSpout', Fields.new("word"))
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
conf.setMaxTaskParallelism(3)
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
conf.setMaxTaskParallelism(3)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("redis-word-count", conf, builder.createTopology)
|
||||
sleep(600)
|
||||
cluster.shutdown
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("redis_word_count", conf, builder.createTopology)
|
||||
sleep(600)
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,3 +1,4 @@
|
|||
require 'lib/red_storm'
|
||||
require 'examples/native/random_sentence_spout'
|
||||
require 'examples/native/split_sentence_bolt'
|
||||
require 'examples/native/word_count_bolt'
|
||||
|
@ -9,16 +10,16 @@ module Examples
|
|||
|
||||
def start(base_class_path, env)
|
||||
builder = TopologyBuilder.new
|
||||
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
|
||||
builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping('1')
|
||||
builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping('2', Fields.new("word"))
|
||||
builder.setSpout('RandomSentenceSpout', JRubySpout.new(base_class_path, "RedStorm::Examples::RandomSentenceSpout"), 5)
|
||||
builder.setBolt('SplitSentenceBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::SplitSentenceBolt"), 8).shuffleGrouping('RandomSentenceSpout')
|
||||
builder.setBolt('WordCountBolt', JRubyBolt.new(base_class_path, "RedStorm::Examples::WordCountBolt"), 12).fieldsGrouping('SplitSentenceBolt', Fields.new("word"))
|
||||
|
||||
conf = Config.new
|
||||
conf.setDebug(true)
|
||||
conf.setMaxTaskParallelism(3)
|
||||
|
||||
cluster = LocalCluster.new
|
||||
cluster.submitTopology("word-count", conf, builder.createTopology)
|
||||
cluster.submitTopology("word_count", conf, builder.createTopology)
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
end
|
||||
|
|
|
@ -1,26 +1,30 @@
|
|||
class RandomSentenceSpout
|
||||
attr_reader :is_distributed
|
||||
module RedStorm
|
||||
module Examples
|
||||
class RandomSentenceSpout
|
||||
attr_reader :is_distributed
|
||||
|
||||
def initialize
|
||||
@is_distributed = true
|
||||
@sentences = [
|
||||
"the cow jumped over the moon",
|
||||
"an apple a day keeps the doctor away",
|
||||
"four score and seven years ago",
|
||||
"snow white and the seven dwarfs",
|
||||
"i am at two with nature"
|
||||
]
|
||||
end
|
||||
def initialize
|
||||
@is_distributed = true
|
||||
@sentences = [
|
||||
"the cow jumped over the moon",
|
||||
"an apple a day keeps the doctor away",
|
||||
"four score and seven years ago",
|
||||
"snow white and the seven dwarfs",
|
||||
"i am at two with nature"
|
||||
]
|
||||
end
|
||||
|
||||
def open(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def next_tuple
|
||||
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
|
||||
end
|
||||
def open(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def next_tuple
|
||||
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,13 +1,17 @@
|
|||
class SplitSentenceBolt
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
class SplitSentenceBolt
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def execute(tuple)
|
||||
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
|
||||
end
|
||||
def execute(tuple)
|
||||
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,19 +1,23 @@
|
|||
class WordCountBolt
|
||||
def initialize
|
||||
@counts = Hash.new{|h, k| h[k] = 0}
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
class WordCountBolt
|
||||
def initialize
|
||||
@counts = Hash.new{|h, k| h[k] = 0}
|
||||
end
|
||||
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
def prepare(conf, context, collector)
|
||||
@collector = collector
|
||||
end
|
||||
|
||||
def execute(tuple)
|
||||
word = tuple.getString(0)
|
||||
@counts[word] += 1
|
||||
@collector.emit(Values.new(word, @counts[word]))
|
||||
end
|
||||
def execute(tuple)
|
||||
word = tuple.getString(0)
|
||||
@counts[word] += 1
|
||||
@collector.emit(Values.new(word, @counts[word]))
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word", "count"))
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word", "count"))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
require 'red_storm'
|
||||
|
||||
class ExclamationBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"}
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ExclamationBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
on_receive(:ack => true, :anchor => true) {|tuple| tuple.getString(0) + "!!!"}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -4,33 +4,36 @@ require 'examples/simple/exclamation_bolt'
|
|||
|
||||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
|
||||
|
||||
class ExclamationTopology < RedStorm::SimpleTopology
|
||||
spout TestWordSpout, :parallelism => 10
|
||||
|
||||
bolt ExclamationBolt, :parallelism => 3 do
|
||||
source TestWordSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt ExclamationBolt, :id => :ignore, :parallelism => 2 do
|
||||
source ExclamationBolt, :shuffle
|
||||
end
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ExclamationTopology < RedStorm::SimpleTopology
|
||||
spout TestWordSpout, :parallelism => 10
|
||||
|
||||
bolt ExclamationBolt, :parallelism => 3 do
|
||||
source TestWordSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do
|
||||
source ExclamationBolt, :shuffle
|
||||
end
|
||||
|
||||
configure do |env|
|
||||
case env
|
||||
when :local
|
||||
debug true
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
debug true
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
configure do |env|
|
||||
debug true
|
||||
case env
|
||||
when :local
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -4,38 +4,41 @@ require 'red_storm'
|
|||
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
|
||||
# and a locally defined ExclamationBolt
|
||||
|
||||
class ExclamationBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
on_receive(:ack => true, :anchor => true) {|tuple| "!#{tuple.getString(0)}!"}
|
||||
end
|
||||
|
||||
class ExclamationTopology2 < RedStorm::SimpleTopology
|
||||
spout TestWordSpout, :parallelism => 10
|
||||
|
||||
bolt ExclamationBolt, :parallelism => 3 do
|
||||
source TestWordSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt ExclamationBolt, :id => :ignore, :parallelism => 2 do
|
||||
source ExclamationBolt, :shuffle
|
||||
end
|
||||
|
||||
configure do |env|
|
||||
case env
|
||||
when :local
|
||||
debug true
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
debug true
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
module RedStorm
|
||||
module Examples
|
||||
class ExclamationBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
on_receive(:ack => true, :anchor => true) {|tuple| "!#{tuple.getString(0)}!"}
|
||||
end
|
||||
end
|
||||
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
class ExclamationTopology2 < RedStorm::SimpleTopology
|
||||
spout TestWordSpout, :parallelism => 10
|
||||
|
||||
bolt ExclamationBolt, :parallelism => 3 do
|
||||
source TestWordSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do
|
||||
source ExclamationBolt, :shuffle
|
||||
end
|
||||
|
||||
configure do |env|
|
||||
debug true
|
||||
case env
|
||||
when :local
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,18 +1,22 @@
|
|||
require 'red_storm'
|
||||
|
||||
class RandomSentenceSpout < RedStorm::SimpleSpout
|
||||
set :is_distributed => true
|
||||
output_fields :word
|
||||
module RedStorm
|
||||
module Examples
|
||||
class RandomSentenceSpout < RedStorm::SimpleSpout
|
||||
set :is_distributed => true
|
||||
output_fields :word
|
||||
|
||||
on_send {@sentences[rand(@sentences.length)]}
|
||||
on_send {@sentences[rand(@sentences.length)]}
|
||||
|
||||
on_init do
|
||||
@sentences = [
|
||||
"the cow jumped over the moon",
|
||||
"an apple a day keeps the doctor away",
|
||||
"four score and seven years ago",
|
||||
"snow white and the seven dwarfs",
|
||||
"i am at two with nature"
|
||||
]
|
||||
on_init do
|
||||
@sentences = [
|
||||
"the cow jumped over the moon",
|
||||
"an apple a day keeps the doctor away",
|
||||
"four score and seven years ago",
|
||||
"snow white and the seven dwarfs",
|
||||
"i am at two with nature"
|
||||
]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -5,51 +5,55 @@ require 'red_storm'
|
|||
|
||||
require 'examples/simple/word_count_bolt'
|
||||
|
||||
# RedisWordSpout reads the Redis queue "test" on localhost:6379
|
||||
# and emits each word items pop'ed from the queue.
|
||||
module RedStorm
|
||||
module Examples
|
||||
|
||||
class RedisWordSpout < RedStorm::SimpleSpout
|
||||
output_fields :word
|
||||
# RedisWordSpout reads the Redis queue "test" on localhost:6379
|
||||
# and emits each word items pop'ed from the queue.
|
||||
|
||||
on_send {@q.pop if @q.size > 0}
|
||||
class RedisWordSpout < RedStorm::SimpleSpout
|
||||
output_fields :word
|
||||
|
||||
on_init do
|
||||
@q = Queue.new
|
||||
@redis_reader = detach_redis_reader
|
||||
end
|
||||
|
||||
private
|
||||
on_send {@q.pop if @q.size > 0}
|
||||
|
||||
def detach_redis_reader
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
on_init do
|
||||
@q = Queue.new
|
||||
@redis_reader = detach_redis_reader
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
loop do
|
||||
if data = redis.blpop("test", 0)
|
||||
@q << data[1]
|
||||
def detach_redis_reader
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
|
||||
redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
loop do
|
||||
if data = redis.blpop("test", 0)
|
||||
@q << data[1]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class RedisWordCountTopology < RedStorm::SimpleTopology
|
||||
spout RedisWordSpout
|
||||
|
||||
bolt WordCountBolt, :parallelism => 3 do
|
||||
source RedisWordSpout, :fields => ["word"]
|
||||
end
|
||||
|
||||
configure do |env|
|
||||
debug true
|
||||
case env
|
||||
when :local
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class RedisWordCountTopology < RedStorm::SimpleTopology
|
||||
spout RedisWordSpout
|
||||
|
||||
bolt WordCountBolt, :parallelism => 3 do
|
||||
source RedisWordSpout, :fields => ["word"]
|
||||
end
|
||||
|
||||
configure do |env|
|
||||
case env
|
||||
when :local
|
||||
debug true
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
debug true
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,29 +1,33 @@
|
|||
require 'red_storm'
|
||||
|
||||
class SplitSentenceBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
module RedStorm
|
||||
module Examples
|
||||
class SplitSentenceBolt < RedStorm::SimpleBolt
|
||||
output_fields :word
|
||||
|
||||
# block declaration style using auto-emit (default)
|
||||
#
|
||||
on_receive {|tuple| tuple.getString(0).split(' ').map{|w| [w]}}
|
||||
# block declaration style using auto-emit (default)
|
||||
#
|
||||
on_receive {|tuple| tuple.getString(0).split(' ').map{|w| [w]}}
|
||||
|
||||
# block declaration style no auto-emit
|
||||
#
|
||||
# on_receive :emit => false do |tuple|
|
||||
# tuple.getString(0).split(' ').each{|w| unanchored_emit(w)}
|
||||
# end
|
||||
# block declaration style no auto-emit
|
||||
#
|
||||
# on_receive :emit => false do |tuple|
|
||||
# tuple.getString(0).split(' ').each{|w| unanchored_emit(w)}
|
||||
# end
|
||||
|
||||
# alternate declaration style using on_receive method
|
||||
#
|
||||
# on_receive :emit => true
|
||||
# def on_receive(tuple)
|
||||
# tuple.getString(0).split(' ').map{|w| [w]}
|
||||
# end
|
||||
# alternate declaration style using on_receive method
|
||||
#
|
||||
# on_receive :emit => true
|
||||
# def on_receive(tuple)
|
||||
# tuple.getString(0).split(' ').map{|w| [w]}
|
||||
# end
|
||||
|
||||
# alternate declaration style using any specific method
|
||||
#
|
||||
# on_receive :my_method, :emit => true
|
||||
# def my_method(tuple)
|
||||
# tuple.getString(0).split(' ').map{|w| [w]}
|
||||
# end
|
||||
# alternate declaration style using any specific method
|
||||
#
|
||||
# on_receive :my_method, :emit => true
|
||||
# def my_method(tuple)
|
||||
# tuple.getString(0).split(' ').map{|w| [w]}
|
||||
# end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,15 +1,19 @@
|
|||
require 'red_storm'
|
||||
|
||||
class WordCountBolt < RedStorm::SimpleBolt
|
||||
output_fields :word, :count
|
||||
on_init {@counts = Hash.new{|h, k| h[k] = 0}}
|
||||
module RedStorm
|
||||
module Examples
|
||||
class WordCountBolt < RedStorm::SimpleBolt
|
||||
output_fields :word, :count
|
||||
on_init {@counts = Hash.new{|h, k| h[k] = 0}}
|
||||
|
||||
# block declaration style using auto-emit (default)
|
||||
#
|
||||
on_receive do |tuple|
|
||||
word = tuple.getString(0)
|
||||
@counts[word] += 1
|
||||
# block declaration style using auto-emit (default)
|
||||
#
|
||||
on_receive do |tuple|
|
||||
word = tuple.getString(0)
|
||||
@counts[word] += 1
|
||||
|
||||
[word, @counts[word]]
|
||||
[word, @counts[word]]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,33 +3,34 @@ require 'examples/simple/split_sentence_bolt'
|
|||
require 'examples/simple/word_count_bolt'
|
||||
|
||||
module RedStorm
|
||||
class WordCountTopology < SimpleTopology
|
||||
spout RandomSentenceSpout, :parallelism => 5
|
||||
|
||||
bolt SplitSentenceBolt, :parallelism => 8 do
|
||||
source RandomSentenceSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt WordCountBolt, :parallelism => 12 do
|
||||
source SplitSentenceBolt, :fields => ["word"]
|
||||
end
|
||||
|
||||
configure :word_count do |env|
|
||||
case env
|
||||
when :local
|
||||
debug true
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
debug true
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
module Examples
|
||||
class WordCountTopology < SimpleTopology
|
||||
spout RandomSentenceSpout, :parallelism => 5
|
||||
|
||||
bolt SplitSentenceBolt, :parallelism => 8 do
|
||||
source RandomSentenceSpout, :shuffle
|
||||
end
|
||||
|
||||
bolt WordCountBolt, :parallelism => 12 do
|
||||
source SplitSentenceBolt, :fields => ["word"]
|
||||
end
|
||||
end
|
||||
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
configure :word_count do |env|
|
||||
debug true
|
||||
case env
|
||||
when :local
|
||||
max_task_parallelism 3
|
||||
when :cluster
|
||||
num_workers 20
|
||||
max_spout_pending(1000);
|
||||
end
|
||||
end
|
||||
|
||||
on_submit do |env|
|
||||
if env == :local
|
||||
sleep(5)
|
||||
cluster.shutdown
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue