no more compiling topologies/bolts/spouts, added launcher, cleanups

This commit is contained in:
Colin Surprenant 2011-10-28 16:24:22 -04:00
parent 7f85fca4cb
commit 04f6f26d22
9 changed files with 50 additions and 109 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
storm/lib/
storm/classes/
tmp/

View File

@ -37,28 +37,15 @@ end
task :build => :setup do
# first compile the JRuby proxy classes, required by the Java bindings
build_jruby("#{JRUBY_SRC_DIR}")
build_jruby("#{JRUBY_SRC_DIR}/proxy")
# compile the Storm Java->JRuby bindings
build_java("#{JAVA_SRC_DIR}/backtype/storm/jruby")
# compile the Ruby examples
build_jruby("#{EXAMPLES_SRC_DIR}")
# compile the Ruby user-created topologies
unless Dir["#{TOPOLOGIES_SRC_DIR}/*"].empty?
build_jruby("#{TOPOLOGIES_SRC_DIR}")
end
# first compile the JRuby proxy classes, required by the Java bindings
build_jruby("#{JRUBY_SRC_DIR}/topology_launcher.rb")
end
task :storm do
unless ENV['class']
puts("usage: rake storm class={fully qualified java class name}")
exit(1)
end
system("java -cp \"./#{CLASSES_DIR}:./#{RUNTIME_LIB_DIR}/*:./#{DEV_LIB_DIR}/*:#{JRUBY_JAR}\" #{ENV['class']}")
end
def build_java(source_folder)
puts("\n--> Building Java:")
ant.javac(

7
bin/redstorm Executable file
View File

@ -0,0 +1,7 @@
JRUBY_JAR="${HOME}/.rvm/rubies/jruby-1.6.5/lib/jruby-complete.jar"
STORM_DIR="./storm"
RUNTIME_LIB_DIR="${STORM_DIR}/lib"
DEV_LIB_DIR="${STORM_DIR}/lib/dev"
CLASSES_DIR="${STORM_DIR}/classes"
java -cp "./${CLASSES_DIR}:./${RUNTIME_LIB_DIR}/*:./${DEV_LIB_DIR}/*:${JRUBY_JAR}" TopologyLauncher $@

View File

@ -1,6 +1,4 @@
class RubyExclamationBolt
def prepare(conf, context, collector)
@collector = collector
end
@ -13,5 +11,4 @@ class RubyExclamationBolt
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end

View File

@ -1,27 +1,8 @@
require 'java'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
require 'examples/ruby_exclamation_bolt'
class RubyExclamationTopology
java_signature 'void main(String[])'
def self.main(args)
def start
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
@ -37,5 +18,4 @@ class RubyExclamationTopology
cluster.killTopology("test")
cluster.shutdown
end
end

View File

@ -1,23 +1,6 @@
require 'java'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
class RubyExclamationBolt2
def prepare(conf, context, collector)
@collector = collector
end
@ -30,13 +13,10 @@ class RubyExclamationBolt2
def declare_output_fields(declarer)
declarer.declare(Fields.new("word"))
end
end
class RubyExclamationTopology2
java_signature 'void main(String[])'
def self.main(args)
def start
builder = TopologyBuilder.new
builder.setSpout(1, TestWordSpout.new, 10)
@ -52,5 +32,4 @@ class RubyExclamationTopology2
cluster.killTopology("test")
cluster.shutdown
end
end

View File

@ -1,29 +1,9 @@
require 'java'
require 'rubygems'
require 'redis'
require 'thread'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'backtype.storm.jruby.JRubySpout'
# RubyRedisWordSpout reads the Redis queue "test" on localhost:6379
# and emits each word items pop'ed from the queue.
class RubyRedisWordSpout
def is_distributed
false
end
@ -36,6 +16,7 @@ class RubyRedisWordSpout
end
def next_tuple
# per doc nextTuple should not block, and sleep a bit when there's no data to process.
if @q.size > 0
@collector.emit(Values.new(@q.pop))
else
@ -64,7 +45,6 @@ class RubyRedisWordSpout
end
class RubyRedisWordCount
def initialize
@counts = Hash.new{|h, k| h[k] = 0}
end
@ -85,9 +65,7 @@ class RubyRedisWordCount
end
class RubyRedisWordCountTopology
java_signature 'void main(String[])'
def self.main(args)
def start
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new("RubyRedisWordSpout"), 1)
builder.setBolt(2, JRubyBolt.new("RubyRedisWordCount"), 3).fieldsGrouping(1, Fields.new("word"))

View File

@ -1,23 +1,3 @@
require 'java'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.task.OutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.testing.TestWordSpout'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.utils.Utils'
java_import 'java.util.Map'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'backtype.storm.jruby.JRubySpout'
class RubyRandomSentenceSpout
def initialize
@sentences = [
@ -82,9 +62,7 @@ end
class RubyWordCountTopology
java_signature 'void main(String[])'
def self.main(args)
def start
builder = TopologyBuilder.new
builder.setSpout(1, JRubySpout.new("RubyRandomSentenceSpout"), 5)
builder.setBolt(2, JRubyBolt.new("RubySplitSentence"), 8).shuffleGrouping(1)

View File

@ -0,0 +1,34 @@
require 'java'
require 'rubygems'
java_import 'backtype.storm.Config'
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'backtype.storm.jruby.JRubyBolt'
java_import 'backtype.storm.jruby.JRubySpout'
class TopologyLauncher
java_signature 'void main(String[])'
def self.main(args)
unless args.size > 0
puts("usage: redstorm {ruby topology class file path}")
exit(1)
end
require args[0]
clazz = camel_case(args[0].split('/').last.split('.').first)
puts("redstorm launching #{clazz}")
Object.module_eval(clazz).new.start
end
private
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
end
end