use a Redis reader thread
This commit is contained in:
parent
eb1c3254ef
commit
7f85fca4cb
|
@ -1,6 +1,7 @@
|
|||
require 'java'
|
||||
require 'rubygems'
|
||||
require 'redis'
|
||||
require 'thread'
|
||||
|
||||
java_import 'backtype.storm.Config'
|
||||
java_import 'backtype.storm.LocalCluster'
|
||||
|
@ -29,18 +30,37 @@ class RubyRedisWordSpout
|
|||
|
||||
def open(conf, context, collector)
|
||||
@collector = collector
|
||||
@redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
|
||||
@q = Queue.new
|
||||
@redis_reader = detach_redis_reader
|
||||
end
|
||||
|
||||
def next_tuple
|
||||
if w = @redis.blpop("test", 0)
|
||||
@collector.emit(Values.new(w[1]))
|
||||
if @q.size > 0
|
||||
@collector.emit(Values.new(@q.pop))
|
||||
else
|
||||
sleep(0.1)
|
||||
end
|
||||
end
|
||||
|
||||
def declare_output_fields(declarer)
|
||||
declarer.declare(Fields.new("word"))
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def detach_redis_reader
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
|
||||
redis = Redis.new(:host => "localhost", :port => 6379)
|
||||
loop do
|
||||
if data = redis.blpop("test", 0)
|
||||
@q << data[1]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class RubyRedisWordCount
|
||||
|
|
Loading…
Reference in New Issue