new dir structure, spout support, more examples

This commit is contained in:
Colin Surprenant 2011-10-26 18:48:02 -04:00
parent 865c914324
commit b1b27fd303
15 changed files with 441 additions and 94 deletions

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
storm/lib/
storm/classes/

View File

@ -9,7 +9,9 @@ This has been tested on OSX 10.6.8 using Storm 0.5.3 and JRuby 1.6.4
- edit Rakefile to ajust your `JRUBY_JAR`
``` sh
$ ./lein deps
rake build
rake storm class=storm.starter.RubyExclamationTopology
$ rake deps
$ rake build
$ rake storm class=RubyExclamationTopology
$ rake storm class=RubyExclamationTopology2
$ rake storm class=RubyWordCountTopology
```

View File

@ -1,23 +1,24 @@
require 'ant'
PROJECT_NAME = 'storm-jruby'
JAVA_SRC_DIR = 'src/jvm'
JRUBY_SRC_DIR = 'src/jruby'
RUNTIME_LIB_DIR = 'lib'
DEV_LIB_DIR = 'lib/dev'
require 'ant'
JRUBY_JAR = '/Users/colin/.rvm/rubies/jruby-1.6.4/lib/jruby.jar'
BUILD_DIR = 'build'
CLASSES_DIR = "classes"
STORM_DIR = './storm'
JAVA_SRC_DIR = "#{STORM_DIR}/src/jvm"
EXAMPLES_SRC_DIR = "./examples"
JRUBY_SRC_DIR = "./lib/red_storm"
RUNTIME_LIB_DIR = "#{STORM_DIR}/lib"
DEV_LIB_DIR = "#{STORM_DIR}/lib/dev"
CLASSES_DIR = "#{STORM_DIR}/classes"
task :default => [:clean, :build]
task :clean do
ant.delete :dir => BUILD_DIR
puts
end
task :clean_all => :clean do
ant.delete :dir => RUNTIME_LIB_DIR
end
task :clean do
ant.delete :dir => CLASSES_DIR
end
task :setup do
ant.mkdir :dir => CLASSES_DIR
@ -28,29 +29,40 @@ task :setup do
end
end
task :build => :setup do
build_java "#{JAVA_SRC_DIR}/backtype/storm/jruby"
build_jruby "#{JRUBY_SRC_DIR}/storm/starter"
end
def build_java(source_folder)
ant.javac :srcdir => source_folder, :destdir => CLASSES_DIR, :classpathref => 'classpath',
:source => "1.6", :target => "1.6", :debug => "yes", :includeantruntime => "no"
puts
end
def build_jruby(source_folder)
puts("compiling jruby")
exec("jrubyc -t #{CLASSES_DIR} --javac -c \"#{DEV_LIB_DIR}/storm-0.5.3.jar\" -c \"#{CLASSES_DIR}\" #{JRUBY_SRC_DIR}")
end
task :deps do
system("cd #{STORM_DIR}; ./lein deps")
end
task :build => :setup do
build_jruby("#{JRUBY_SRC_DIR}")
build_java("#{JAVA_SRC_DIR}/backtype/storm/jruby")
build_jruby("#{EXAMPLES_SRC_DIR}")
end
task :storm do
unless ENV['class']
puts("usage: rake storm class={fully qualified java class name}")
exit(1)
end
exec("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}")
system("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}")
end
def build_java(source_folder)
puts("\n--> Building Java:")
ant.javac(
:srcdir => source_folder,
:destdir => CLASSES_DIR,
:classpathref => 'classpath',
:source => "1.6",
:target => "1.6",
:debug => "yes",
:includeantruntime => "no",
:verbose => false,
:listfiles => true
)
end
def build_jruby(source_folder)
puts("\n--> Building JRuby #{source_folder}")
system("jrubyc -t #{CLASSES_DIR} --verbose --javac -c \"#{DEV_LIB_DIR}/storm-0.5.3.jar\" -c \"#{CLASSES_DIR}\" #{source_folder}")
end

View File

@ -0,0 +1,17 @@
class RubyExclamationBolt
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
@collector.ack(tuple)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end

View File

@ -15,9 +15,8 @@ java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'storm.starter.RubyExclamationBolt'
java_package 'storm.starter'
require 'examples/ruby_exclamation_bolt'
class RubyExclamationTopology
@ -26,15 +25,15 @@ class RubyExclamationTopology
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new("storm.starter.RubyExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("storm.starter.RubyExclamationBolt"), 2).shuffleGrouping(2)
builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)
cluster = LocalCluster.new
cluster.submitTopology("test", conf, builder.createTopology)
Utils.sleep(10000)
sleep(5)
cluster.killTopology("test")
cluster.shutdown
end

View File

@ -0,0 +1,56 @@
require 'java'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
class RubyExclamationBolt2
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
@collector.ack(tuple)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubyExclamationTopology2
java_signature 'void main(String[])'
def self.main(args)
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
builder.setBolt(2, JRubyBolt.new("RubyExclamationBolt2"), 3).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyExclamationBolt2"), 2).shuffleGrouping(2)
conf = Config.new
conf.setDebug(true)
cluster = LocalCluster.new
cluster.submitTopology("test", conf, builder.createTopology)
sleep(5)
cluster.killTopology("test")
cluster.shutdown
end
end

View File

@ -0,0 +1,102 @@
require 'java'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'backtype.storm.jruby.JRubySpout'
class RubyRandomSentenceSpout
def initialize
@sentences = [
"the cow jumped over the moon",
"an apple a day keeps the doctor away",
"four score and seven years ago",
"snow white and the seven dwarfs",
"i am at two with nature"
]
end
def is_distributed
true
end
def open(conf, context, collector)
@collector = collector
end
def next_tuple
@collector.emit(Values.new(@sentences[rand(@sentences.length)]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubySplitSentence
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
tuple.getString(0).split(" ").each {|w| @collector.emit(Values.new(w)) }
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubyWordCount
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
def prepare(conf, context, collector)
@collector = collector
end
def execute(tuple)
word = tuple.getString(0)
@counts[word] += 1
@collector.emit(Values.new(word, @counts[word]))
end
def declare_output_fields(declarer)
declarer.declare(Fields.new("word", "count"))
end
end
class RubyWordCountTopology
java_signature 'void main(String[])'
def self.main(args)
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1)
builder.setBolt(3, JRubyBolt.new("RubyWordCount"), 12).fieldsGrouping(2, Fields.new("word"))
conf = Config.new
conf.setDebug(true)
conf.setMaxTaskParallelism(3)
cluster = LocalCluster.new
cluster.submitTopology("word-count", conf, builder.createTopology)
sleep(5)
cluster.shutdown
end
end

2
lib/red_storm.rb Normal file
View File

@ -0,0 +1,2 @@
require 'lib/red_storm/proxy/bolt'
require 'lib/red_storm/proxy/spout'

View File

@ -0,0 +1,43 @@
require 'java'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_package 'redstorm.proxy'
class Bolt
java_implements IRichBolt
java_signature 'IRichBolt (String real_bolt_class_name)'
def initialize(real_bolt_class_name)
@real_bolt_class_name = real_bolt_class_name
end
java_signature 'void prepare(Map, TopologyContext, OutputCollector)'
def prepare(conf, context, collector)
@real_bolt = Object.module_eval(@real_bolt_class_name).new
@real_bolt.prepare(conf, context, collector)
end
java_signature 'void execute(Tuple)'
def execute(tuple)
@real_bolt.execute(tuple)
end
java_signature 'void cleanup()'
def cleanup
@real_bolt.cleanup if @real_bolt.respond_to?(:cleanup)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
Object.module_eval(@real_bolt_class_name).new.declare_output_fields(declarer)
end
end

View File

@ -0,0 +1,58 @@
require 'java'
java_import 'backtype.storm.spout.SpoutOutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_package 'redstorm.proxy'
class Spout
java_implements IRichSpout
java_signature 'IRichSpout (String real_spout_class_name)'
def initialize(real_spout_class_name)
@real_spout_class_name = real_spout_class_name
end
java_signature 'boolean isDistributed()'
def isDistributed
Object.module_eval(@real_spout_class_name).new.is_distributed
end
java_signature 'void open(Map, TopologyContext, SpoutOutputCollector)'
def open(conf, context, collector)
@real_spout = Object.module_eval(@real_spout_class_name).new
@real_spout.open(conf, context, collector)
end
java_signature 'void close()'
def close
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void nextTuple()'
def nextTuple
@real_spout.next_tuple
end
java_signature 'void ack(Object)'
def ack(msgId)
@real_spout.ack(msgId) if @real_spout.respond_to?(:close)
end
java_signature 'void fail(Object)'
def fail(msgId)
@real_spout.fail(msgId) if @real_spout.respond_to?(:close)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
Object.module_eval(@real_spout_class_name).new.declare_output_fields(declarer)
end
end

View File

@ -1,37 +0,0 @@
require 'java'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_package 'storm.starter'
class RubyExclamationBolt
java_implements IRichBolt
java_signature 'void prepare(Map, TopologyContext, OutputCollector)'
def prepare(conf, context, collector)
@collector = collector
end
java_signature 'void execute(Tuple)'
def execute(tuple)
@collector.emit(tuple, Values.new(tuple.getString(0) + "!!!"))
@collector.ack(tuple)
end
java_signature 'void cleanup()'
def cleanup
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
declarer.declare(Fields.new("word"))
end
end

View File

View File

@ -1,6 +1,9 @@
(defproject storm-jruby "0.0.1"
:source-path "src/clj"
:java-source-path "src/jvm"
:library-path "lib/"
:dev-library-path "lib/dev/"
:compile-path "build/classes"
:javac-options {:debug "true" :fork "true"}
:resources-path "multilang"
:aot :all

View File

@ -19,48 +19,50 @@ import java.util.Map;
* post-deserialization at the workers.
*/
public class JRubyBolt implements IRichBolt {
IRichBolt _bolt;
String _jrubyClassName;
IRichBolt _proxyBolt;
String _realBoltClassName;
/**
* create a new JRubyBolt
*
* @param jrubyClassName the fully qualified JRuby bolt implementation class name
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
*/
public JRubyBolt(String jrubyClassName) {
// create instance of the jruby class so its available for declareOutputFields
// which gets executed in the topology creation time, before the prepare method
_jrubyClassName = jrubyClassName;
_bolt = realJRubyBolt(_jrubyClassName);
public JRubyBolt(String realBoltClassName) {
_realBoltClassName = realBoltClassName;
}
@Override
public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
// create instance of the jruby class here, after deserialization for the workers.
_bolt = realJRubyBolt(_jrubyClassName);
_bolt.prepare(stormConf, context, collector);
_proxyBolt = newProxyBolt(_realBoltClassName);
_proxyBolt.prepare(stormConf, context, collector);
}
@Override
public void execute(Tuple input) {
_bolt.execute(input);
_proxyBolt.execute(input);
}
@Override
public void cleanup() {
_bolt.cleanup();
_proxyBolt.cleanup();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
_bolt.declareOutputFields(declarer);
// declareOutputFields is executed in the topology creation time, before the prepare method at
// pre serialisation. do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IRichBolt bolt = newProxyBolt(_realBoltClassName);
bolt.declareOutputFields(declarer);
}
private static IRichBolt realJRubyBolt(String realClassName) {
private static IRichBolt newProxyBolt(String realBoltClassName) {
try {
Class clazz = Class.forName(realClassName);
return (IRichBolt)clazz.newInstance();
} catch (Exception e) {
redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(realBoltClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

View File

@ -0,0 +1,86 @@
package backtype.storm.jruby;
import backtype.storm.generated.StreamInfo;
import backtype.storm.spout.ISpout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout class is instanciated twice, in the constructor and in the open
* method. The constructor instance is required to support the declareOutputFields at topology
* creation while the open instance is required for the actual spout execution,
* post-deserialization at the workers.
*/
public class JRubySpout implements IRichSpout {
IRichSpout _proxySpout;
String _realSpoutClassName;
/**
* create a new JRubySpout
*
* @param jrubyClassName the fully qualified JRuby spout implementation class name
*/
public JRubySpout(String realSpoutClassName) {
_realSpoutClassName = realSpoutClassName;
}
@Override
public boolean isDistributed() {
IRichSpout spout = newProxySpout(_realSpoutClassName);
return spout.isDistributed();
}
@Override
public void open(final Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_proxySpout = newProxySpout(_realSpoutClassName);
_proxySpout.open(conf, context, collector);
}
@Override
public void close() {
_proxySpout.close();
}
@Override
public void nextTuple() {
_proxySpout.nextTuple();
}
@Override
public void ack(Object msgId) {
_proxySpout.ack(msgId);
}
@Override
public void fail(Object msgId) {
_proxySpout.fail(msgId);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
IRichSpout spout = newProxySpout(_realSpoutClassName);
spout.declareOutputFields(declarer);
}
private static IRichSpout newProxySpout(String realSpoutClassName) {
try {
redstorm.proxy.Spout proxy = new redstorm.proxy.Spout(realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}