added shell topology example

This commit is contained in:
Colin Surprenant 2012-07-12 17:03:55 -04:00
parent df9795a9ac
commit bbbc8c3982
3 changed files with 256 additions and 0 deletions

View File

@ -0,0 +1,9 @@
import storm
class SplitSentenceBolt(storm.BasicBolt):
def process(self, tup):
words = tup.values[0].split(" ")
for word in words:
storm.emit([word])
SplitSentenceBolt().run()

View File

@ -0,0 +1,206 @@
import sys
import os
import traceback
from collections import deque
try:
import simplejson as json
except ImportError:
import json
json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)
#reads lines and reconstructs newlines appropriately
def readMsg():
msg = ""
while True:
line = sys.stdin.readline()[0:-1]
if line == "end":
break
msg = msg + line + "\n"
return json_decode(msg[0:-1])
MODE = None
ANCHOR_TUPLE = None
#queue up commands we read while trying to read taskids
pending_commands = deque()
def readTaskIds():
if pending_taskids:
return pending_taskids.popleft()
else:
msg = readMsg()
while type(msg) is not list:
pending_commands.append(msg)
msg = readMsg()
return msg
#queue up taskids we read while trying to read commands/tuples
pending_taskids = deque()
def readCommand():
if pending_commands:
return pending_commands.popleft()
else:
msg = readMsg()
while type(msg) is list:
pending_taskids.append(msg)
msg = readMsg()
return msg
def readTuple():
cmd = readCommand()
return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
def sendMsgToParent(msg):
print json_encode(msg)
print "end"
sys.stdout.flush()
def sync():
sendMsgToParent({'command':'sync'})
def sendpid(heartbeatdir):
pid = os.getpid()
sendMsgToParent({'pid':pid})
open(heartbeatdir + "/" + str(pid), "w").close()
def emit(*args, **kwargs):
__emit(*args, **kwargs)
return readTaskIds()
def emitDirect(task, *args, **kwargs):
kwargs[directTask] = task
__emit(*args, **kwargs)
def __emit(*args, **kwargs):
global MODE
if MODE == Bolt:
emitBolt(*args, **kwargs)
elif MODE == Spout:
emitSpout(*args, **kwargs)
def emitBolt(tup, stream=None, anchors = [], directTask=None):
global ANCHOR_TUPLE
if ANCHOR_TUPLE is not None:
anchors = [ANCHOR_TUPLE]
m = {"command": "emit"}
if stream is not None:
m["stream"] = stream
m["anchors"] = map(lambda a: a.id, anchors)
if directTask is not None:
m["task"] = directTask
m["tuple"] = tup
sendMsgToParent(m)
def emitSpout(tup, stream=None, id=None, directTask=None):
m = {"command": "emit"}
if id is not None:
m["id"] = id
if stream is not None:
m["stream"] = stream
if directTask is not None:
m["task"] = directTask
m["tuple"] = tup
sendMsgToParent(m)
def ack(tup):
sendMsgToParent({"command": "ack", "id": tup.id})
def fail(tup):
sendMsgToParent({"command": "fail", "id": tup.id})
def log(msg):
sendMsgToParent({"command": "log", "msg": msg})
def initComponent():
setupInfo = readMsg()
sendpid(setupInfo['pidDir'])
return [setupInfo['conf'], setupInfo['context']]
class Tuple:
def __init__(self, id, component, stream, task, values):
self.id = id
self.component = component
self.stream = stream
self.task = task
self.values = values
def __repr__(self):
return '<%s%s>' % (
self.__class__.__name__,
''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
class Bolt:
def initialize(self, stormconf, context):
pass
def process(self, tuple):
pass
def run(self):
global MODE
MODE = Bolt
conf, context = initComponent()
self.initialize(conf, context)
try:
while True:
tup = readTuple()
self.process(tup)
except Exception, e:
log(traceback.format_exc(e))
class BasicBolt:
def initialize(self, stormconf, context):
pass
def process(self, tuple):
pass
def run(self):
global MODE
MODE = Bolt
global ANCHOR_TUPLE
conf, context = initComponent()
self.initialize(conf, context)
try:
while True:
tup = readTuple()
ANCHOR_TUPLE = tup
self.process(tup)
ack(tup)
except Exception, e:
log(traceback.format_exc(e))
class Spout:
def initialize(self, conf, context):
pass
def ack(self, id):
pass
def fail(self, id):
pass
def nextTuple(self):
pass
def run(self):
global MODE
MODE = Spout
conf, context = initComponent()
self.initialize(conf, context)
try:
while True:
msg = readCommand()
if msg["command"] == "next":
self.nextTuple()
if msg["command"] == "ack":
self.ack(msg["id"])
if msg["command"] == "fail":
self.fail(msg["id"])
sync()
except Exception, e:
log(traceback.format_exc(e))

View File

@ -0,0 +1,41 @@
require 'red_storm'
require 'thread'
java_import 'redstorm.storm.jruby.JRubyShellBolt'
class SimpleSpout < RedStorm::SimpleSpout
on_init do
@q = Queue.new
@q << "the quick red fox"
end
on_send do
# avoid putting the thread to sleep endlessly on @q.pop which will prevent local cluster.shutdown
sleep(1)
@q.pop unless @q.empty?
end
end
class ShellTopology < RedStorm::SimpleTopology
spout SimpleSpout do
output_fields :string
end
bolt JRubyShellBolt, ["python", "splitsentence.py"] do
output_fields "word"
source SimpleSpout, :shuffle
end
configure do |env|
debug true
end
on_submit do |env|
case env
when :local
sleep(10)
cluster.shutdown
end
end
end