Merge pull request #14 from robbavey/master

Add DSL support for named streams.
This commit is contained in:
R. Tyler Croy 2015-08-27 13:47:24 -07:00
commit 954211ee86
11 changed files with 540 additions and 109 deletions

View File

@ -1,6 +1,7 @@
require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'red_storm/dsl/output_fields'
require 'red_storm/loggable'
require 'pathname'
@ -16,14 +17,12 @@ module RedStorm
include Loggable
attr_reader :collector, :context, :config
include OutputFields
def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end
# DSL class methods
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
# DSL macro: stores the component configuration block on the class.
# When called without a block, an empty lambda is stored instead.
#
# @return [Proc] the stored block
def self.configure(&configure_block)
  @configure_block = configure_block || lambda {}
end
@ -62,10 +61,18 @@ module RedStorm
@collector.emit_tuple(Values.new(*values))
end
# Emits the given values on the named stream without anchoring them to an
# input tuple.
#
# @param stream the stream identifier handed to the collector
# @param values the values wrapped into a Values instance
def unanchored_stream_emit(stream, *values)
  payload = Values.new(*values)
  @collector.emit_tuple_stream(stream, payload)
end
# Emits the given values anchored to the supplied input tuple.
#
# @param tuple the tuple the emitted values are anchored to
# @param values the values wrapped into a Values instance
def anchored_emit(tuple, *values)
  payload = Values.new(*values)
  @collector.emit_anchor_tuple(tuple, payload)
end
# Emits the given values on the named stream, anchored to the supplied
# input tuple.
#
# @param stream the stream identifier handed to the collector
# @param tuple the tuple the emitted values are anchored to
# @param values the values wrapped into a Values instance
def anchored_stream_emit(stream, tuple, *values)
  payload = Values.new(*values)
  @collector.emit_anchor_tuple_stream(stream, tuple, payload)
end
# Acknowledges the given tuple by delegating to the collector's ack.
def ack(tuple)
@collector.ack(tuple)
end
@ -80,7 +87,21 @@ module RedStorm
output = on_receive(tuple)
if output && self.class.emit?
values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output
values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)}
values_list.each do |values|
if self.class.anchor?
if self.class.stream?
anchored_stream_emit(self.stream, tuple, *values)
else
anchored_emit(tuple, *values)
end
else
if self.class.stream?
unanchored_stream_emit(self.stream, *values)
else
unanchored_emit(*values)
end
end
end
@collector.ack(tuple) if self.class.ack?
end
end
@ -97,10 +118,6 @@ module RedStorm
on_close
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
@ -113,10 +130,6 @@ module RedStorm
def on_init; end
def on_close; end
def self.fields
@fields ||= []
end
# Returns the stored configuration block, lazily defaulting to (and
# remembering) an empty lambda when none has been set.
def self.configure_block
  @configure_block = lambda {} unless @configure_block
  @configure_block
end

View File

@ -6,4 +6,13 @@ java_import 'backtype.storm.tuple.Tuple'
# Reopens OutputCollector to give each overload of the Java `emit` method its
# own Ruby method name. Every java_alias below names one overload by listing
# its Java parameter types.
class OutputCollector
# emit(List)
java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")]
# emit(Tuple, List)
java_alias :emit_anchor_tuple, :emit, [Tuple.java_class, java.lang.Class.for_name("java.util.List")]
# emit(String, List) -- the String is the first argument passed by
# unanchored_stream_emit, i.e. the stream
java_alias :emit_tuple_stream, :emit, [
java.lang.String,
java.lang.Class.for_name("java.util.List")
]
# emit(String, Tuple, List) -- stream first, then the anchoring tuple
java_alias :emit_anchor_tuple_stream, :emit, [
java.lang.String,
Tuple.java_class,
java.lang.Class.for_name("java.util.List")
]
end

View File

@ -0,0 +1,48 @@
module RedStorm
  module DSL
    # Mixin providing the `output_fields` DSL and stream helpers.
    #
    # Including it extends the host class with ClassMethods, so the host can
    # declare its output fields per stream at class level, and gives instances
    # `declare_output_fields` and `stream`.
    module OutputFields
      # Hook: adds the class-level DSL to any class that includes this module.
      def self.included(base)
        base.extend ClassMethods
      end

      # Declares every stream recorded on the class to the given declarer,
      # one `declareStream(stream, Fields)` call per entry.
      #
      # @param declarer an object responding to `declareStream(stream, fields)`
      def declare_output_fields(declarer)
        self.class.fields.each do |stream, fields|
          declarer.declareStream(stream, Fields.new(fields))
        end
      end

      # @return the stream configured on the class (see ClassMethods#stream)
      def stream
        self.class.stream
      end

      module ClassMethods
        # Records output fields for this class.
        #
        # Hash arguments map a stream name to its field(s) and replace any
        # previous entry for that stream. Every other argument (a single name
        # or an array of names) is unioned, without duplicates, into the
        # 'default' stream. All stream and field names are stored as strings.
        #
        # @param fields [Array<Symbol, String, Array, Hash>] field declarations
        def output_fields(*fields)
          declared = self.fields
          fields.each do |field|
            case field
            when Hash
              field.each { |k, v| declared[k.to_s] = stringify_fields(v) }
            else
              # `|=` assigns a fresh array, so the frozen default is never mutated.
              declared['default'] |= stringify_fields(field)
            end
          end
        end

        # Returns the stream name => field names hash for this class.
        #
        # A missing key still reads as an empty array, but that array is
        # frozen: a plain `Hash.new([])` shares one mutable array across all
        # missing keys, so an accidental `fields[key] << name` would leak into
        # every other lookup.
        #
        # @return [Hash{String => Array<String>}]
        def fields
          @output_fields ||= Hash.new([].freeze)
        end

        # @return [Boolean] true when a non-empty :stream receive option is set
        def stream?
          name = stream
          name ? !name.empty? : false
        end

        # @return the :stream entry of the class's receive options, or nil
        def stream
          self.receive_options[:stream]
        end

        private

        # Normalizes a single field name or an array of names into an array
        # of strings.
        def stringify_fields(value)
          value.kind_of?(Array) ? value.map(&:to_s) : [value.to_s]
        end
      end
    end
  end
end

View File

@ -2,6 +2,7 @@ require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'red_storm/loggable'
require 'red_storm/dsl/output_fields'
require 'pathname'
module RedStorm
@ -13,6 +14,8 @@ module RedStorm
include Loggable
attr_reader :config, :context, :collector
include OutputFields
def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end
# DSL class methods
@ -21,10 +24,6 @@ module RedStorm
@configure_block = block_given? ? configure_block : lambda {}
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.on_send(*args, &on_send_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
@ -120,10 +119,6 @@ module RedStorm
on_deactivate
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def ack(msg_id)
on_ack(msg_id)
end
@ -148,10 +143,6 @@ module RedStorm
def on_ack(msg_id); end
def on_fail(msg_id); end
def self.fields
@fields ||= []
end
# Returns the stored configuration block, lazily defaulting to (and
# remembering) an empty lambda when none has been set.
def self.configure_block
  @configure_block = lambda {} unless @configure_block
  @configure_block
end
@ -171,7 +162,7 @@ module RedStorm
# NOTE: the code below is not DRY; see the Bolt class for the equivalent logic.
def self.inherited(subclass)
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(SpoutError, "unable to extract base topology class path from #{caller.first.inspect}")
subclass.base_class_path = File.expand_path(path)
subclass.base_class_path = Pathname.new(path).relative_path_from(Pathname.new(RedStorm::BASE_PATH)).to_s
end
def self.base_class_path=(path)

View File

@ -5,6 +5,7 @@ require 'red_storm/loggable'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.generated.SubmitOptions'
java_import 'backtype.storm.utils.Utils'
module RedStorm
module DSL
@ -27,16 +28,45 @@ module RedStorm
@constructor_args = constructor_args
@id = id.to_s
@parallelism = parallelism
@output_fields = []
@output_fields = Hash.new([])
initialize_output_fields
end
def output_fields(*args)
args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
def output_fields(*fields)
default_fields = []
fields.each do |field|
case field
when Hash
field.each { |k, v| @output_fields[k.to_s] = v.kind_of?(Array) ? v.map(&:to_s) : [v.to_s] }
else
default_fields |= field.kind_of?(Array) ? field.map(&:to_s) : [field.to_s]
end
end
@output_fields[Utils::DEFAULT_STREAM_ID] = default_fields unless default_fields.empty?
@output_fields
end
# Tells whether the component class lives under a top-level namespace named
# "java" (compared case-insensitively against the first `::` segment of the
# class name).
def is_java?
  top_namespace = @clazz.name.split('::').first
  top_namespace.downcase == 'java'
end
private
# Seeds @output_fields with a (shallow) copy of the fields declared on the
# component class, when that class mixes in RedStorm::DSL::OutputFields.
# Leaves @output_fields untouched otherwise.
def initialize_output_fields
  return unless @clazz.ancestors.include?(RedStorm::DSL::OutputFields)

  @output_fields = @clazz.fields.clone
end
# Builds a java.util.HashMap from @output_fields for handing to the Java
# proxy: each stream maps to its field names converted with
# to_java('java.lang.String'). Streams with an empty field list are omitted.
def java_safe_fields
  @output_fields.each_with_object(java.util.HashMap.new) do |(stream, names), java_map|
    next if names.empty?

    java_map.put(stream, names.to_java('java.lang.String'))
  end
end
end
class SpoutDefinition < ComponentDefinition
@ -48,7 +78,7 @@ module RedStorm
elsif is_java?
@clazz.new(*constructor_args)
else
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
end
end
end
@ -61,29 +91,33 @@ module RedStorm
@sources = []
end
def source(source_id, grouping)
@sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
def source(source_id, grouping, stream = Utils::DEFAULT_STREAM_ID)
@sources << [
source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s,
grouping.is_a?(Hash) ? grouping : {grouping => nil},
stream.to_s
]
end
def define_grouping(declarer)
@sources.each do |source_id, grouping|
@sources.each do |source_id, grouping, stream|
grouper, params = grouping.first
# declarer.fieldsGrouping(source_id, Fields.new())
case grouper
when :fields
declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
declarer.fieldsGrouping(source_id, stream, Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping(source_id)
declarer.globalGrouping(source_id, stream)
when :shuffle
declarer.shuffleGrouping(source_id)
declarer.shuffleGrouping(source_id, stream)
when :local_or_shuffle
declarer.localOrShuffleGrouping(source_id)
declarer.localOrShuffleGrouping(source_id, stream)
when :none
declarer.noneGrouping(source_id)
declarer.noneGrouping(source_id, stream)
when :all
declarer.allGrouping(source_id)
declarer.allGrouping(source_id, stream)
when :direct
declarer.directGrouping(source_id)
declarer.directGrouping(source_id, stream)
else
raise("unknown grouper=#{grouper.inspect}")
end
@ -97,7 +131,7 @@ module RedStorm
elsif is_java?
@clazz.new(*constructor_args)
else
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
end
end
end

View File

@ -45,21 +45,42 @@ describe RedStorm::SimpleBolt do
output_fields :f1
end
bolt = Bolt1.new
Bolt1.send(:fields).should == ["f1"]
Bolt1.send(:fields).should == {"default" => ["f1"]}
end
it "should parse multiple arguments" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :f1, :f2
end
Bolt1.send(:fields).should == ["f1", "f2"]
Bolt1.send(:fields).should == {"default" => ["f1", "f2"]}
end
it "should parse string and symbol arguments" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :f1, "f2"
end
Bolt1.send(:fields).should == ["f1", "f2"]
Bolt1.send(:fields).should == {"default" => ["f1", "f2"]}
end
it "should parse single hash argument" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :stream => :f1
end
Bolt1.send(:fields).should == {"stream" => ["f1"]}
end
it "should parse hash of string and symbols" do
class Bolt1 < RedStorm::SimpleBolt
output_fields "stream" => [:f1, :f2]
end
Bolt1.send(:fields).should == {"stream" => ["f1", "f2"]}
end
it "should parse string and hash arguments" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :f1, :stream => :f2
end
Bolt1.send(:fields).should == {"default" => ["f1"], "stream" => ["f2"]}
end
it "should not share state over mutiple classes" do
@ -69,9 +90,9 @@ describe RedStorm::SimpleBolt do
class Bolt2 < RedStorm::SimpleBolt
output_fields :f2
end
RedStorm::SimpleBolt.send(:fields).should == []
Bolt1.send(:fields).should == ["f1"]
Bolt2.send(:fields).should == ["f2"]
RedStorm::SimpleBolt.send(:fields).should == {}
Bolt1.send(:fields).should == {"default" => ["f1"]}
Bolt2.send(:fields).should == {"default" => ["f2"]}
end
end
@ -122,6 +143,7 @@ describe RedStorm::SimpleBolt do
Bolt1.send(:emit?).should be_true
Bolt1.send(:ack?).should be_false
Bolt1.send(:anchor?).should be_false
Bolt1.send(:stream?).should be_false
end
it "should parse :emit option" do
@ -154,16 +176,27 @@ describe RedStorm::SimpleBolt do
Bolt1.send(:anchor?).should be_true
end
it "should parse multiple option" do
it "should parse :stream option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false, :ack =>true, :anchor => true do
on_receive :stream => "test" do
end
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true)
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test")
Bolt1.send(:stream?).should be_true
end
it "should parse multiple option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test" do
end
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test")
Bolt1.send(:emit?).should be_false
Bolt1.send(:ack?).should be_true
Bolt1.send(:anchor?).should be_true
Bolt1.send(:stream?).should be_true
end
end
@ -173,13 +206,13 @@ describe RedStorm::SimpleBolt do
class Bolt1 < RedStorm::SimpleBolt
def test_method; end
on_receive :test_method
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS
Bolt1.send(:emit?).should be_true
Bolt1.send(:ack?).should be_false
Bolt1.send(:anchor?).should be_false
Bolt1.send(:stream?).should be_false
end
it "should parse :emit option" do
@ -193,8 +226,7 @@ describe RedStorm::SimpleBolt do
it "should parse :ack option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :ack => true do
end
on_receive :test_method, :ack => true
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:ack => true)
@ -203,24 +235,32 @@ describe RedStorm::SimpleBolt do
it "should parse :anchor option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :anchor => true do
end
on_receive :test_method, :anchor => true
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:anchor => true)
Bolt1.send(:anchor?).should be_true
end
it "should parse multiple option" do
it "should parse :stream option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false, :ack =>true, :anchor => true do
end
on_receive :test_method, :stream => "test"
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true)
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test")
Bolt1.send(:stream?).should be_true
end
it "should parse multiple option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :test_method, :emit => false, :ack =>true, :anchor => true, :stream => "test"
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test")
Bolt1.send(:emit?).should be_false
Bolt1.send(:ack?).should be_true
Bolt1.send(:anchor?).should be_true
Bolt1.send(:stream?).should be_true
end
end
@ -246,6 +286,7 @@ describe RedStorm::SimpleBolt do
Bolt1.send(:emit?).should be_true
Bolt1.send(:ack?).should be_false
Bolt1.send(:anchor?).should be_false
Bolt1.send(:stream?).should be_false
end
it "should parse :emit option" do
@ -275,15 +316,25 @@ describe RedStorm::SimpleBolt do
Bolt1.send(:anchor?).should be_true
end
it "should parse multiple option" do
it "should parse :stream option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false, :ack =>true, :anchor => true
on_receive :stream => "test"
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true)
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:stream => "test")
Bolt1.send(:stream?).should be_true
end
it "should parse multiple option" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false, :ack =>true, :anchor => true, :stream => "test"
end
Bolt1.receive_options.should == DEFAULT_RECEIVE_OPTIONS.merge(:emit =>false, :ack => true, :anchor => true, :stream => "test")
Bolt1.send(:emit?).should be_false
Bolt1.send(:ack?).should be_true
Bolt1.send(:anchor?).should be_true
Bolt1.send(:stream?).should be_true
end
end
end
@ -658,6 +709,74 @@ describe RedStorm::SimpleBolt do
bolt.prepare(nil, nil, collector)
bolt.execute("output")
end
it "should emit tuple on a stream" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :stream => :custom_stream do |tuple|
tuple
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :stream => :custom_stream
def my_method(tuple); tuple; end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :stream => :custom_stream
def on_receive(tuple); tuple; end
end
collector = mock("Collector")
RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values")
collector.should_receive(:emit_tuple_stream).with(:custom_stream, "values").exactly(3).times
bolt = Bolt1.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
bolt = Bolt2.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
bolt = Bolt3.new
bolt.prepare(nil, nil, collector)
bolt.execute("output")
end
it "should emit anchored tuple on a stream" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :anchor => true, :stream => :custom_stream do |tuple|
"output"
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :anchor => true, :stream => :custom_stream
def my_method(tuple)
"output"
end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :anchor => true, :stream => :custom_stream
def on_receive(tuple)
"output"
end
end
collector = mock("Collector")
RedStorm::Values.should_receive(:new).with("output").exactly(3).times.and_return("values")
collector.should_receive(:emit_anchor_tuple_stream).with(:custom_stream, "tuple", "values").exactly(3).times
bolt = Bolt1.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
bolt = Bolt2.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
bolt = Bolt3.new
bolt.prepare(nil, nil, collector)
bolt.execute("tuple")
end
end
describe "prepare" do
@ -740,10 +859,36 @@ describe RedStorm::SimpleBolt do
bolt = Bolt1.new
class RedStorm::Fields; end
declarer = mock("Declarer")
declarer.should_receive(:declare).with("fields")
declarer.should_receive(:declareStream).with("default", "fields")
RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields")
bolt.declare_output_fields(declarer)
end
it "should declare stream with fields" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :stream => [:f1, :f2]
end
bolt = Bolt1.new
class RedStorm::Fields; end
declarer = mock("Declarer")
declarer.should_receive(:declareStream).with("stream", "fields")
RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields")
bolt.declare_output_fields(declarer)
end
it "should declare default stream fields and custom stream fields" do
class Bolt1 < RedStorm::SimpleBolt
output_fields :f1, :f2, :stream => [:f3, :f4]
end
bolt = Bolt1.new
class RedStorm::Fields; end
declarer = mock("Declarer")
declarer.should_receive(:declareStream).with("stream", "stream_fields")
declarer.should_receive(:declareStream).with("default", "default_fields")
RedStorm::Fields.should_receive(:new).with(["f3", "f4"]).and_return("stream_fields")
RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("default_fields")
bolt.declare_output_fields(declarer)
end
end
describe "get_component_configuration" do

View File

@ -10,5 +10,11 @@ describe OutputCollector do
# We should have an alias for #emit_anchor_tuple
it { should respond_to :emit_anchor_tuple }
# We should have an alias for #emit_tuple_stream
it { should respond_to :emit_tuple_stream }
# We should have an alias for #emit_anchor_tuple_stream
it { should respond_to :emit_anchor_tuple_stream }
end
end

View File

@ -65,21 +65,21 @@ describe RedStorm::SimpleSpout do
class Spout1 < RedStorm::SimpleSpout
output_fields :f1
end
Spout1.send(:fields).should == ["f1"]
Spout1.send(:fields).should == {"default" => ["f1"]}
end
it "should parse multiple arguments" do
class Spout1 < RedStorm::SimpleSpout
output_fields :f1, :f2
end
Spout1.send(:fields).should == ["f1", "f2"]
Spout1.send(:fields).should == {"default" => ["f1", "f2"]}
end
it "should parse string and symbol arguments" do
class Spout1 < RedStorm::SimpleSpout
output_fields :f1, "f2"
end
Spout1.send(:fields).should == ["f1", "f2"]
Spout1.send(:fields).should == {"default" => ["f1", "f2"]}
end
it "should not share state over mutiple classes" do
@ -89,9 +89,9 @@ describe RedStorm::SimpleSpout do
class Spout2 < RedStorm::SimpleSpout
output_fields :f2
end
RedStorm::SimpleSpout.send(:fields).should == []
Spout1.send(:fields).should == ["f1"]
Spout2.send(:fields).should == ["f2"]
RedStorm::SimpleSpout.send(:fields).should == {}
Spout1.send(:fields).should == {"default" => ["f1"]}
Spout2.send(:fields).should == {"default" => ["f2"]}
end
end
@ -804,7 +804,7 @@ describe RedStorm::SimpleSpout do
spout = Spout1.new
class RedStorm::Fields; end
declarer = mock("Declarer")
declarer.should_receive(:declare).with("fields")
declarer.should_receive(:declareStream).with("default", "fields")
RedStorm::Fields.should_receive(:new).with(["f1", "f2"]).and_return("fields")
spout.declare_output_fields(declarer)
end

View File

@ -1,5 +1,7 @@
require 'spec_helper'
require 'red_storm/dsl/topology'
require 'red_storm/dsl/spout'
require 'red_storm/dsl/bolt'
describe RedStorm::SimpleTopology do
@ -21,10 +23,10 @@ describe RedStorm::SimpleTopology do
Object.send(:remove_const, "SpoutClass2") if Object.const_defined?("SpoutClass2")
Object.send(:remove_const, "BoltClass1") if Object.const_defined?("BoltClass1")
Object.send(:remove_const, "BoltClass2") if Object.const_defined?("BoltClass2")
class SpoutClass1; end
class SpoutClass2; end
class BoltClass1; end
class BoltClass2; end
class SpoutClass1 < RedStorm::DSL::Spout; end
class SpoutClass2 < RedStorm::DSL::Spout; end
class BoltClass1 < RedStorm::DSL::Bolt; end
class BoltClass2 < RedStorm::DSL::Bolt; end
SpoutClass1.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
SpoutClass2.should_receive(:base_class_path).at_least(0).times.and_return("base_path")
SpoutClass1.should_receive(:java_proxy).at_least(0).times.and_return("RedStorm::JRubySpout")
@ -125,8 +127,30 @@ describe RedStorm::SimpleTopology do
output_fields :f3
end
end
Topology1.spouts.first.output_fields.should == ["f1", "f2"]
Topology1.spouts.last.output_fields.should == [ "f3"]
Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] }
Topology1.spouts.last.output_fields.should == { "default" => ["f3"] }
end
it "should default output_fields to the class defined fields" do
class SpoutClass1
output_fields :f1, :f2
end
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1
end
Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] }
end
it "should override class defined fields with topology output fields" do
class SpoutClass1
output_fields :f1, :f2
end
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1 do
output_fields :f3, :f4
end
end
Topology1.spouts.first.output_fields.should == { "default" => ["f3", "f4"] }
end
end
@ -208,8 +232,31 @@ describe RedStorm::SimpleTopology do
output_fields :f3
end
end
Topology1.bolts.first.output_fields.should == ["f1", "f2"]
Topology1.bolts.last.output_fields.should == [ "f3"]
Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] }
Topology1.bolts.last.output_fields.should == { "default" => ["f3"] }
end
it "should default output_fields to the class defined fields" do
class BoltClass1
output_fields :f1, :f2
end
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1 do
end
end
Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] }
end
it "should override class defined fields with topology output fields" do
class BoltClass1
output_fields :f1, :f2
end
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1 do
output_fields :f3, :f4
end
end
Topology1.bolts.first.output_fields.should == { "default" => ["f3", "f4"] }
end
end
@ -320,8 +367,8 @@ describe RedStorm::SimpleTopology do
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer)
@ -353,12 +400,13 @@ describe RedStorm::SimpleTopology do
configurator = mock(RedStorm::Configurator)
jruby_bolt1 = mock(RedStorm::JRubyBolt)
jruby_bolt2 = mock(RedStorm::JRubyBolt)
jruby_bolt3 = mock(RedStorm::JRubyBolt)
declarer = mock("Declarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2)
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
@ -389,8 +437,8 @@ describe RedStorm::SimpleTopology do
backtype_config = mock(Backtype::Config)
Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config)
backtype_config.should_receive(:put)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout)
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
@declarer.should_receive("addConfigurations").twice
@ -407,7 +455,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support single string fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => "f1" }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("f1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -420,7 +481,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("s1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support single symbolic fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => :s1 }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("s1").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -433,7 +507,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support string array fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => ["f1", "f2"] }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("f1", "f2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -446,7 +533,20 @@ describe RedStorm::SimpleTopology do
end
RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', "fields")
@declarer.should_receive("fieldsGrouping").with('1', 'default', "fields")
Topology1.new.start(:cluster)
end
it "should support symbolic array fields with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, { :fields => [:s1, :s2] }, 'custom_stream'
end
end
RedStorm::Fields.should_receive(:new).with("s1", "s2").and_return("fields")
@declarer.should_receive("fieldsGrouping").with('1', 'custom_stream', "fields")
Topology1.new.start(:cluster)
end
@ -458,7 +558,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("shuffleGrouping").with('1')
@declarer.should_receive("shuffleGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support shuffle with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :shuffle, 'custom_stream'
end
end
@declarer.should_receive("shuffleGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -470,7 +582,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("localOrShuffleGrouping").with('1')
@declarer.should_receive("localOrShuffleGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support local_or_shuffle with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :local_or_shuffle, 'custom_stream'
end
end
@declarer.should_receive("localOrShuffleGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -482,7 +606,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("noneGrouping").with('1')
@declarer.should_receive("noneGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
# Stream-aware variant of the :none grouping example. The description now
# says so, matching its sibling "... with a stream" examples instead of
# duplicating the name of the non-stream example before it.
it "should support none with a stream" do
  class Topology1 < RedStorm::SimpleTopology
    spout SpoutClass1, :id => 1
    bolt BoltClass1 do
      source 1, :none, 'custom_stream'
    end
  end

  @declarer.should_receive("noneGrouping").with('1', 'custom_stream')
  Topology1.new.start(:cluster)
end
@ -494,7 +630,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("globalGrouping").with('1')
@declarer.should_receive("globalGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support global with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :global, 'custom_stream'
end
end
@declarer.should_receive("globalGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -506,7 +654,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("allGrouping").with('1')
@declarer.should_receive("allGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support all with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :all, 'custom_stream'
end
end
@declarer.should_receive("allGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
@ -518,7 +678,19 @@ describe RedStorm::SimpleTopology do
end
end
@declarer.should_receive("directGrouping").with('1')
@declarer.should_receive("directGrouping").with('1', 'default')
Topology1.new.start(:cluster)
end
it "should support direct with a stream" do
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => 1
bolt BoltClass1 do
source 1, :direct, 'custom_stream'
end
end
@declarer.should_receive("directGrouping").with('1', 'custom_stream')
Topology1.new.start(:cluster)
end
end
@ -576,7 +748,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == '1'
Topology1.bolts.first.id.should == '2'
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ['1', {:shuffle => nil}, 'default']
end
it "should support explicit string ids" do
@ -590,7 +762,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "id1"
Topology1.bolts.first.id.should == "id2"
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}, 'default']
end
it "should support implicit string ids" do
@ -604,7 +776,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default']
end
it "should support implicit symbol ids" do
@ -618,7 +790,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}, 'default']
end
it "should support implicit class ids" do
@ -632,7 +804,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}, 'default']
end
it "should raise on unresolvable" do
@ -646,7 +818,7 @@ describe RedStorm::SimpleTopology do
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}]
Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}, 'default']
lambda {Topology1.resolve_ids!(Topology1.spouts + Topology1.bolts)}.should raise_error RuntimeError, "cannot resolve BoltClass1 source id=dummy"
end

View File

@ -6,9 +6,11 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.Map;
import org.jruby.Ruby;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.builtin.IRubyObject;
@ -27,7 +29,7 @@ import org.jruby.exceptions.RaiseException;
*/
public class JRubyBolt implements IRichBolt {
private final String _realBoltClassName;
private final String[] _fields;
private final Map<String, String[]> _fields;
private final String _bootstrap;
// transient to avoid serialization
@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt {
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
* @param fields the output field names of each stream, keyed by stream id
*/
public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) {
public JRubyBolt(String baseClassPath, String realBoltClassName, Map<String, String[]> fields) {
_realBoltClassName = realBoltClassName;
_fields = fields;
_bootstrap = "require '" + baseClassPath + "'";
@ -72,8 +74,13 @@ public class JRubyBolt implements IRichBolt {
// declareOutputFields is executed in the topology creation time, before serialisation.
// just create tmp bolt instance to call declareOutputFields.
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
if (_fields.size() > 0) {
Iterator iterator = _fields.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String[]> field = (Map.Entry<String, String[]>)iterator.next();
declarer.declareStream(field.getKey(), new Fields(field.getValue()));
iterator.remove();
}
} else {
IRubyObject ruby_bolt = initialize_ruby_bolt();
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);

View File

@ -6,6 +6,7 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.Map;
import org.jruby.Ruby;
@ -27,7 +28,7 @@ import org.jruby.exceptions.RaiseException;
*/
public class JRubySpout implements IRichSpout {
private final String _realSpoutClassName;
private final String[] _fields;
private final Map<String, String[]> _fields;
private final String _bootstrap;
// transient to avoid serialization
@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout {
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
* @param fields the output field names of each stream, keyed by stream id
*/
public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) {
public JRubySpout(String baseClassPath, String realSpoutClassName, Map<String, String[]> fields) {
_realSpoutClassName = realSpoutClassName;
_fields = fields;
_bootstrap = "require '" + baseClassPath + "'";
@ -93,8 +94,13 @@ public class JRubySpout implements IRichSpout {
// declareOutputFields is executed in the topology creation time, before serialisation.
// just create tmp spout instance to call declareOutputFields.
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
if (_fields.size() > 0) {
Iterator iterator = _fields.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String[]> field = (Map.Entry<String, String[]>)iterator.next();
declarer.declareStream(field.getKey(), new Fields(field.getValue()));
iterator.remove();
}
} else {
IRubyObject ruby_spout = initialize_ruby_spout();
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);