support for separape jvm execution of real cluster

This commit is contained in:
Colin Surprenant 2011-11-03 13:52:41 -04:00
parent 66358e08a5
commit be6483c257
10 changed files with 59 additions and 52 deletions

View File

@ -2,12 +2,12 @@ java_import 'backtype.storm.testing.TestWordSpout'
require 'examples/ruby_exclamation_bolt'
class RubyExclamationTopology
def start
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt"), 2).shuffleGrouping(2)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)

View File

@ -16,12 +16,12 @@ class RubyExclamationBolt2
end
class RubyExclamationTopology2
def start
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt2"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt2"), 2).shuffleGrouping(2)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyExclamationBolt2"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)

View File

@ -4,13 +4,8 @@ require 'thread'
# RubyRedisWordSpout reads the Redis queue "test" on localhost:6379
# and emits each word items pop'ed from the queue.
class RubyRedisWordSpout
def is_distributed
false
end
def open(conf, context, collector)
@collector = collector
@q = Queue.new
@redis_reader = detach_redis_reader
end
@ -65,10 +60,10 @@ class RubyRedisWordCount
end
class RubyRedisWordCountTopology
def start
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new("RubyRedisWordSpout"), 1)
builder.setBolt(2, JRubyBolt.new("RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word"))
builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRedisWordSpout"), 1)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word"))
conf = Config.new
conf.setDebug(true)

View File

@ -1,5 +1,8 @@
class RubyRandomSentenceSpout
attr_reader :is_distributed
def initialize
@is_distributed = true
@sentences = [
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
@ -9,10 +12,6 @@ class RubyRandomSentenceSpout
]
end
def is_distributed
true
end
def open(conf, context, collector)
@collector = collector
end
@ -62,11 +61,11 @@ end
class RubyWordCountTopology
def start
def start(base_class_path)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word"))
builder.setSpout(1, JRubySpout.new(base_class_path, "RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new(base_class_path, "RubySplitSentence"), 8).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new(base_class_path, "RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word"))
conf = Config.new
conf.setDebug(true)

View File

@ -4,9 +4,7 @@ java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_package 'redstorm.proxy'
@ -25,8 +23,11 @@ java_package 'redstorm.proxy'
class Bolt
java_implements IRichBolt
java_signature 'IRichBolt (String real_bolt_class_name)'
def initialize(real_bolt_class_name)
java_signature 'IRichBolt (String base_class_path, String real_bolt_class_name)'
def initialize(base_class_path, real_bolt_class_name)
@real_bolt = Object.module_eval(real_bolt_class_name).new
rescue NameError
require base_class_path
@real_bolt = Object.module_eval(real_bolt_class_name).new
end
@ -49,5 +50,4 @@ class Bolt
def declareOutputFields(declarer)
@real_bolt.declare_output_fields(declarer)
end
end

View File

@ -4,9 +4,7 @@ java_import 'backtype.storm.spout.SpoutOutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_package 'redstorm.proxy'
@ -28,14 +26,17 @@ java_package 'redstorm.proxy'
class Spout
java_implements IRichSpout
java_signature 'IRichSpout (String real_spout_class_name)'
def initialize(real_spout_class_name)
java_signature 'IRichSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'boolean isDistributed()'
def isDistributed
@real_spout.is_distributed
@real_spout.respond_to?(:is_distributed) ? @real_spout.is_distributed : false
end
java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)'
@ -67,5 +68,4 @@ class Spout
def declareOutputFields(declarer)
@real_spout.declare_output_fields(declarer)
end
end

View File

@ -1,8 +1,11 @@
require 'java'
require 'rubygems'
require 'lib/red_storm/version'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.StormSubmitter'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
@ -11,19 +14,21 @@ java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'backtype.storm.jruby.JRubySpout'
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# call require on the specified Ruby topology/project class file path and call its start method
class TopologyLauncher
java_signature 'void main(String[])'
def self.main(args)
unless args.size > 0
puts("usage: redstorm {ruby topology class file path}")
puts("Usage: redstorm topology_class_file")
exit(1)
end
require args[0]
clazz = camel_case(args[0].split('/').last.split('.').first)
puts("redstorm launching #{clazz}")
Object.module_eval(clazz).new.start
class_path = args[0]
clazz = camel_case(class_path.split('/').last.split('.').first)
puts("redstorm v#{Redstorm::VERSION} launching #{clazz}")
require class_path
Object.module_eval(clazz).new.start(class_path)
end
private
@ -31,4 +36,4 @@ class TopologyLauncher
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
end
end
end

3
lib/red_storm/version.rb Normal file
View File

@ -0,0 +1,3 @@
module Redstorm
VERSION = '0.1'
end

View File

@ -20,20 +20,22 @@ import java.util.Map;
public class JRubyBolt implements IRichBolt {
IRichBolt _proxyBolt;
String _realBoltClassName;
String _baseClassPath;
/**
* create a new JRubyBolt
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
*/
public JRubyBolt(String realBoltClassName) {
public JRubyBolt(String baseClassPath, String realBoltClassName) {
_baseClassPath = baseClassPath;
_realBoltClassName = realBoltClassName;
}
@Override
public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
// create instance of the jruby class here, after deserialization in the workers.
_proxyBolt = newProxyBolt(_realBoltClassName);
_proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName);
_proxyBolt.prepare(stormConf, context, collector);
}
@ -52,13 +54,13 @@ public class JRubyBolt implements IRichBolt {
// declareOutputFields is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IRichBolt bolt = newProxyBolt(_realBoltClassName);
IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
}
private static IRichBolt newProxyBolt(String realBoltClassName) {
private static IRichBolt newProxyBolt(String baseClassPath, String realBoltClassName) {
try {
redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(realBoltClassName);
redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(baseClassPath, realBoltClassName);
return proxy;
}
catch (Exception e) {

View File

@ -20,13 +20,16 @@ import java.util.Map;
public class JRubySpout implements IRichSpout {
IRichSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
/**
* create a new JRubySpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubySpout(String realSpoutClassName) {
public JRubySpout(String baseClassPath, String realSpoutClassName) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
}
@ -35,14 +38,14 @@ public class JRubySpout implements IRichSpout {
// isDistributed is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call isDistributed.
IRichSpout spout = newProxySpout(_realSpoutClassName);
IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.isDistributed();
}
@Override
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
// create instance of the jruby proxy class here, after deserialization in the workers.
_proxySpout = newProxySpout(_realSpoutClassName);
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
_proxySpout.open(conf, context, collector);
}
@ -71,13 +74,13 @@ public class JRubySpout implements IRichSpout {
// declareOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IRichSpout spout = newProxySpout(_realSpoutClassName);
IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
spout.declareOutputFields(declarer);
}
private static IRichSpout newProxySpout(String realSpoutClassName) {
private static IRichSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(realSpoutClassName);
redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {