Bolt/Spouts in topos need to support streams

This commit makes it possible to declare streams and fields for bolts
and streams inside of the Topology definition.
This commit is contained in:
Keith Walters 2015-03-31 19:56:04 +00:00
parent dbf7888bb7
commit 7b7f3f7c99
7 changed files with 125 additions and 78 deletions

View File

@ -1,6 +1,7 @@
require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'red_storm/dsl/output_fields'
require 'pathname'
java_import 'backtype.storm.tuple.Fields'
@ -14,6 +15,8 @@ module RedStorm
class Bolt
attr_reader :collector, :context, :config
include OutputFields
def self.java_proxy; "Java::RedstormStormJruby::JRubyBolt"; end
# DSL class methods
@ -22,19 +25,6 @@ module RedStorm
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields ||= []
fields.each do |field|
if field.kind_of? Hash
@fields << Hash[
field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] }
]
else
@fields << field.to_s
end
end
end
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
@ -67,10 +57,6 @@ module RedStorm
self.class.log
end
def stream
self.class.stream
end
def unanchored_emit(*values)
@collector.emit_tuple(Values.new(*values))
end
@ -132,21 +118,6 @@ module RedStorm
on_close
end
def declare_output_fields(declarer)
default_fields = []
self.class.fields.each do |field|
if field.kind_of? Hash
field.each do |stream, fields|
declarer.declareStream(stream, Fields.new(fields))
end
else
default_fields << field
end
end
declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty?
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
@ -159,10 +130,6 @@ module RedStorm
def on_init; end
def on_close; end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
@ -183,14 +150,6 @@ module RedStorm
!!self.receive_options[:anchor]
end
def self.stream?
self.receive_options[:stream] && !self.receive_options[:stream].empty?
end
def self.stream
self.receive_options[:stream]
end
# below non-dry see Spout class
def self.inherited(subclass)
path = (caller.first.to_s =~ /^(.+):\d+.*$/) ? $1 : raise(BoltError, "unable to extract base topology class path from #{caller.first.inspect}")

View File

@ -0,0 +1,58 @@
module RedStorm
module DSL
module OutputFields
def self.included(base)
base.extend ClassMethods
end
def declare_output_fields(declarer)
default_fields = []
self.class.fields.each do |field|
if field.kind_of? Hash
field.each do |stream, fields|
declarer.declareStream(stream, Fields.new(fields))
end
else
default_fields << field
end
end
declarer.declare(Fields.new(default_fields.flatten)) unless default_fields.empty?
end
def stream
self.class.stream
end
module ClassMethods
def output_fields(*fields)
@fields ||= []
fields.each do |field|
if field.kind_of? Hash
@fields << Hash[
field.map { |k, v| [k.to_s, v.kind_of?(Array) ? v.map(&:to_s) : v.to_s] }
]
else
@fields << field.to_s
end
end
end
def fields
@fields ||= []
end
def stream?
self.receive_options[:stream] && !self.receive_options[:stream].empty?
end
def stream
self.receive_options[:stream]
end
end
end
end
end

View File

@ -1,6 +1,7 @@
require 'java'
require 'red_storm/configurator'
require 'red_storm/environment'
require 'red_storm/dsl/output_fields'
require 'pathname'
module RedStorm
@ -11,6 +12,8 @@ module RedStorm
class Spout
attr_reader :config, :context, :collector
include OutputFields
def self.java_proxy; "Java::RedstormStormJruby::JRubySpout"; end
# DSL class methods
@ -23,10 +26,6 @@ module RedStorm
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.on_send(*args, &on_send_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
@ -126,10 +125,6 @@ module RedStorm
on_deactivate
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def ack(msg_id)
on_ack(msg_id)
end
@ -154,10 +149,6 @@ module RedStorm
def on_ack(msg_id); end
def on_fail(msg_id); end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end

View File

@ -26,16 +26,37 @@ module RedStorm
@constructor_args = constructor_args
@id = id.to_s
@parallelism = parallelism
@output_fields = []
@output_fields = Hash.new([])
end
def output_fields(*args)
args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
args.each do |field|
if field.kind_of? Hash
field.each { |k, v| merge_fields(k.to_s, v) }
else
merge_fields('default', field)
end
end
@output_fields
end
def is_java?
@clazz.name.split('::').first.downcase == 'java'
end
private
def java_safe_fields
java_hash = java.util.HashMap.new()
@output_fields.each do |k, v|
java_hash.put(k, v.to_java('java.lang.String'))
end
java_hash
end
def merge_fields(stream, fields)
@output_fields[stream] |= fields.kind_of?(Array) ? fields.map(&:to_s) : [fields.to_s]
end
end
class SpoutDefinition < ComponentDefinition
@ -47,7 +68,7 @@ module RedStorm
elsif is_java?
@clazz.new(*constructor_args)
else
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
end
end
end
@ -100,7 +121,7 @@ module RedStorm
elsif is_java?
@clazz.new(*constructor_args)
else
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, @output_fields)
Object.module_eval(@clazz.java_proxy).new(@clazz.base_class_path, @clazz.name, java_safe_fields)
end
end
end

View File

@ -1,6 +1,8 @@
require 'spec_helper'
require 'red_storm/dsl/topology'
require 'pry'
describe RedStorm::SimpleTopology do
# mock Storm imported classes
@ -112,8 +114,11 @@ describe RedStorm::SimpleTopology do
output_fields :f3
end
end
Topology1.spouts.first.output_fields.should == ["f1", "f2"]
Topology1.spouts.last.output_fields.should == [ "f3"]
# Pry.config.input = STDIN
# Pry.config.output = STDOUT
# binding.pry
Topology1.spouts.first.output_fields.should == { "default" => ["f1", "f2"] }
Topology1.spouts.last.output_fields.should == { "default" => ["f3"] }
end
end
@ -195,8 +200,8 @@ describe RedStorm::SimpleTopology do
output_fields :f3
end
end
Topology1.bolts.first.output_fields.should == ["f1", "f2"]
Topology1.bolts.last.output_fields.should == [ "f3"]
Topology1.bolts.first.output_fields.should == { "default" => ["f1", "f2"] }
Topology1.bolts.last.output_fields.should == { "default" => ["f3"] }
end
end
@ -307,8 +312,8 @@ describe RedStorm::SimpleTopology do
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", {}).and_return(jruby_spout2)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer)
@ -345,8 +350,8 @@ describe RedStorm::SimpleTopology do
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", {}).and_return(jruby_bolt2)
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
@ -377,8 +382,8 @@ describe RedStorm::SimpleTopology do
backtype_config = mock(Backtype::Config)
Backtype::Config.should_receive(:new).any_number_of_times.and_return(backtype_config)
backtype_config.should_receive(:put)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", {}).and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", {}).and_return(jruby_spout)
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
@declarer.should_receive("addConfigurations").twice

View File

@ -6,9 +6,11 @@ import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.Map;
import org.jruby.Ruby;
import org.jruby.RubyHash;
import org.jruby.RubyObject;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.builtin.IRubyObject;
@ -27,7 +29,7 @@ import org.jruby.exceptions.RaiseException;
*/
public class JRubyBolt implements IRichBolt {
private final String _realBoltClassName;
private final String[] _fields;
private final Map<String, String[]> _fields;
private final String _bootstrap;
// transient to avoid serialization
@ -41,7 +43,7 @@ public class JRubyBolt implements IRichBolt {
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
* @param fields the output fields names
*/
public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) {
public JRubyBolt(String baseClassPath, String realBoltClassName, Map<String, String[]> fields) {
_realBoltClassName = realBoltClassName;
_fields = fields;
_bootstrap = "require '" + baseClassPath + "'";
@ -72,8 +74,13 @@ public class JRubyBolt implements IRichBolt {
// declareOutputFields is executed in the topology creation time, before serialisation.
// just create tmp bolt instance to call declareOutputFields.
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
if (_fields.size() > 0) {
Iterator iterator = _fields.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String[]> field = (Map.Entry<String, String[]>)iterator.next();
declarer.declareStream(field.getKey(), new Fields(field.getValue()));
iterator.remove();
}
} else {
IRubyObject ruby_bolt = initialize_ruby_bolt();
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);

View File

@ -6,6 +6,7 @@ import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Iterator;
import java.util.Map;
import org.jruby.Ruby;
@ -27,7 +28,7 @@ import org.jruby.exceptions.RaiseException;
*/
public class JRubySpout implements IRichSpout {
private final String _realSpoutClassName;
private final String[] _fields;
private final Map<String, String[]> _fields;
private final String _bootstrap;
// transient to avoid serialization
@ -41,7 +42,7 @@ public class JRubySpout implements IRichSpout {
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
* @param fields the output fields names
*/
public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) {
public JRubySpout(String baseClassPath, String realSpoutClassName, Map<String, String[]> fields) {
_realSpoutClassName = realSpoutClassName;
_fields = fields;
_bootstrap = "require '" + baseClassPath + "'";
@ -93,8 +94,13 @@ public class JRubySpout implements IRichSpout {
// declareOutputFields is executed in the topology creation time, before serialisation.
// just create tmp spout instance to call declareOutputFields.
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
if (_fields.size() > 0) {
Iterator iterator = _fields.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<String, String[]> field = (Map.Entry<String, String[]>)iterator.next();
declarer.declareStream(field.getKey(), new Fields(field.getValue()));
iterator.remove();
}
} else {
IRubyObject ruby_spout = initialize_ruby_spout();
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);