issue #5 - support for namespaced topology class

This commit is contained in:
Colin Surprenant 2011-12-09 15:28:23 -05:00
parent 0051ef6fec
commit f482f3124a
12 changed files with 80 additions and 45 deletions

View File

@ -234,7 +234,7 @@ configure topology_name do |env|
end
```
The `configure` statement is **optional**.
The `configure` statement is **required**.
- `topology_name` — alternate topology name (**default** is topology class name)
- `env` — is set to `:local` or `:cluster` for you to set enviroment specific configurations

View File

@ -3,6 +3,8 @@ require 'examples/native/split_sentence_bolt'
require 'examples/native/word_count_bolt'
class ClusterWordCountTopology
RedStorm::Configuration.topology_class = self
def start(base_class_path, env)
builder = TopologyBuilder.new
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)

View File

@ -4,6 +4,8 @@ require 'examples/native/exclamation_bolt'
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
class LocalExclamationTopology
RedStorm::Configuration.topology_class = self
def start(base_class_path, env)
builder = TopologyBuilder.new

View File

@ -18,6 +18,8 @@ end
# this example topology uses the Storm TestWordSpout and our own JRuby ExclamationBolt
class LocalExclamationTopology2
RedStorm::Configuration.topology_class = self
def start(base_class_path, env)
builder = TopologyBuilder.new

View File

@ -41,6 +41,8 @@ class RedisWordSpout
end
class LocalRedisWordCountTopology
RedStorm::Configuration.topology_class = self
def start(base_class_path, env)
builder = TopologyBuilder.new
builder.setSpout('1', JRubySpout.new(base_class_path, "RedisWordSpout"), 1)

View File

@ -2,20 +2,25 @@ require 'examples/native/random_sentence_spout'
require 'examples/native/split_sentence_bolt'
require 'examples/native/word_count_bolt'
class LocalWordCountTopology
def start(base_class_path, env)
builder = TopologyBuilder.new
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping('1')
builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping('2', Fields.new("word"))
conf = Config.new
conf.setDebug(true)
conf.setMaxTaskParallelism(3)
module Examples
class LocalWordCountTopology
RedStorm::Configuration.topology_class = self
cluster = LocalCluster.new
cluster.submitTopology("word-count", conf, builder.createTopology)
sleep(5)
cluster.shutdown
def start(base_class_path, env)
builder = TopologyBuilder.new
builder.setSpout('1', JRubySpout.new(base_class_path, "RandomSentenceSpout"), 5)
builder.setBolt('2', JRubyBolt.new(base_class_path, "SplitSentenceBolt"), 8).shuffleGrouping('1')
builder.setBolt('3', JRubyBolt.new(base_class_path, "WordCountBolt"), 12).fieldsGrouping('2', Fields.new("word"))
conf = Config.new
conf.setDebug(true)
conf.setMaxTaskParallelism(3)
cluster = LocalCluster.new
cluster.submitTopology("word-count", conf, builder.createTopology)
sleep(5)
cluster.shutdown
end
end
end

View File

@ -2,33 +2,35 @@ require 'examples/simple/random_sentence_spout'
require 'examples/simple/split_sentence_bolt'
require 'examples/simple/word_count_bolt'
class WordCountTopology < RedStorm::SimpleTopology
spout RandomSentenceSpout, :parallelism => 5
bolt SplitSentenceBolt, :parallelism => 8 do
source RandomSentenceSpout, :shuffle
end
bolt WordCountBolt, :parallelism => 12 do
source SplitSentenceBolt, :fields => ["word"]
end
configure :word_count do |env|
case env
when :local
debug true
max_task_parallelism 3
when :cluster
debug true
num_workers 20
max_spout_pending(1000);
module RedStorm
class WordCountTopology < SimpleTopology
spout RandomSentenceSpout, :parallelism => 5
bolt SplitSentenceBolt, :parallelism => 8 do
source RandomSentenceSpout, :shuffle
end
bolt WordCountBolt, :parallelism => 12 do
source SplitSentenceBolt, :fields => ["word"]
end
end
on_submit do |env|
if env == :local
sleep(5)
cluster.shutdown
configure :word_count do |env|
case env
when :local
debug true
max_task_parallelism 3
when :cluster
debug true
num_workers 20
max_spout_pending(1000);
end
end
on_submit do |env|
if env == :local
sleep(5)
cluster.shutdown
end
end
end
end

View File

@ -3,6 +3,7 @@ module RedStorm
end
require 'red_storm/version'
require 'red_storm/configuration'
require 'red_storm/application'
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'

View File

@ -0,0 +1,16 @@
module RedStorm
module Configuration
extend self
@topology_class = nil
def topology_class=(clazz)
@topology_class = clazz
end
def topology_class
@topology_class
end
end
end

View File

@ -1,3 +1,5 @@
require 'red_storm/configuration'
module RedStorm
class SimpleTopology
@ -88,6 +90,7 @@ module RedStorm
end
def self.configure(name = nil, &configure_block)
Configuration.topology_class = self
@topology_name = name if name
@configure_block = configure_block if block_given?
end

View File

@ -24,7 +24,7 @@ java_import 'redstorm.storm.jruby.JRubySpout'
java_package 'redstorm'
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# call require on the specified Ruby topology/project class file path and call its start method
# call require on the specified Ruby topology class file path and call its start method
class TopologyLauncher
java_signature 'void main(String[])'
@ -35,12 +35,12 @@ class TopologyLauncher
end
env = args[0].to_sym
class_path = args[1]
clazz = camel_case(class_path.split('/').last.split('.').first)
puts("RedStorm v#{RedStorm::VERSION} starting topology #{clazz} in #{env.to_s} environment")
require class_path
Object.module_eval(clazz).new.start(class_path, env)
topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : ''
puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment")
RedStorm::Configuration.topology_class.new.start(class_path, env)
end
private

View File

@ -1,3 +1,3 @@
module RedStorm
VERSION = '0.2.1'
VERSION = '0.3.0'
end