issue #41 - suppport shell bolts and spouts, issue #28 - support output_fields definition in topology DSL

This commit is contained in:
Colin Surprenant 2012-07-12 16:18:43 -04:00
parent a87735eef8
commit df9795a9ac
4 changed files with 93 additions and 46 deletions

View File

@ -2,6 +2,7 @@ require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class TopologyDefinitionError < StandardError; end
@ -13,14 +14,20 @@ module RedStorm
DEFAULT_BOLT_PARALLELISM = 1
class ComponentDefinition < Configurator
attr_reader :clazz, :parallelism
attr_accessor :id # ids are forced to string
attr_reader :clazz, :constructor_args, :parallelism
attr_accessor :output_fields, :id # ids are forced to string
def initialize(component_class, id, parallelism)
def initialize(component_class, constructor_args, id, parallelism)
super()
@clazz = component_class
@constructor_args = constructor_args
@id = id.to_s
@parallelism = parallelism
@output_fields = []
end
def output_fields(*args)
@output_fields = *args
end
def is_java?
@ -29,13 +36,22 @@ module RedStorm
end
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubySpout.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources
attr_accessor :sources, :command
def initialize(*args)
super
@ -72,7 +88,15 @@ module RedStorm
end
def new_instance(base_class_path)
is_java? ? @clazz.new : JRubyBolt.new(base_class_path, @clazz.name)
# WARNING non-dry see BoltDefinition#new_instance
if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubyBolt.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : @clazz.is_a?(SimpleBolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new
end
end
@ -80,16 +104,24 @@ module RedStorm
@log ||= org.apache.log4j.Logger.getLogger(self.name)
end
def self.spout(spout_class, options = {}, &spout_block)
# def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
def self.spout(spout_class, *args, &spout_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)
spout = SpoutDefinition.new(spout_class, spout_options[:id], spout_options[:parallelism])
spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism])
spout.instance_exec(&spout_block) if block_given?
self.components << spout
end
def self.bolt(bolt_class, options = {}, &bolt_block)
# def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
def self.bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, bolt_options[:id], bolt_options[:parallelism])
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt

View File

@ -37,8 +37,8 @@ describe RedStorm::SimpleTopology do
class SpoutClass2; end
it "should parse single spout without options" do
spout = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, "spout_class1", 1)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "spout_class1", 1).and_return(spout)
spout = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, [], "spout_class1", 1)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, [], "spout_class1", 1).and_return(spout)
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1
end
@ -46,10 +46,10 @@ describe RedStorm::SimpleTopology do
end
it "should parse multiple spouts with options" do
spout1 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, "id1", 2)
spout2 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass2, "id2", 3)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, "id1", 2).and_return(spout1)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass2, "id2", 3).and_return(spout2)
spout1 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass1, [], "id1", 2)
spout2 = RedStorm::SimpleTopology::SpoutDefinition.new(SpoutClass2, [], "id2", 3)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass1, [], "id1", 2).and_return(spout1)
RedStorm::SimpleTopology::SpoutDefinition.should_receive(:new).with(SpoutClass2, [], "id2", 3).and_return(spout2)
class Topology1 < RedStorm::SimpleTopology
spout SpoutClass1, :id => "id1", :parallelism => 2
spout SpoutClass2, :id => "id2", :parallelism => 3
@ -65,8 +65,8 @@ describe RedStorm::SimpleTopology do
class BoltClass2; end
it "should parse single bolt without options" do
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "bolt_class1", 1).and_return(bolt_definition)
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "bolt_class1", 1).and_return(bolt_definition)
bolt_definition.should_receive(:source).with(1, {:fields => ["f1"]})
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1 do
@ -77,8 +77,8 @@ describe RedStorm::SimpleTopology do
end
it "should parse single bolt with options" do
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id", 2)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id", 2).and_return(bolt_definition)
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "id", 2)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "id", 2).and_return(bolt_definition)
bolt_definition.should_receive(:source).with(1, :shuffle)
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1, :id => "id", :parallelism => 2 do
@ -89,10 +89,10 @@ describe RedStorm::SimpleTopology do
end
it "should parse multiple bolt with options" do
bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id1", 2)
bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, "id2", 3)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id1", 2).and_return(bolt_definition1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, "id2", 3).and_return(bolt_definition2)
bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "id1", 2)
bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, [], "id2", 3)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "id1", 2).and_return(bolt_definition1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, [], "id2", 3).and_return(bolt_definition2)
bolt_definition1.should_receive(:source).with(1, :shuffle)
bolt_definition2.should_receive(:source).with(2, {:fields => ["f1"]})
class Topology1 < RedStorm::SimpleTopology
@ -107,8 +107,8 @@ describe RedStorm::SimpleTopology do
end
it "should parse single symbolic fields" do
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "bolt_class1", 1).and_return(bolt_definition)
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "bolt_class1", 1).and_return(bolt_definition)
bolt_definition.should_receive(:source).with(1, {:fields => :g1})
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1 do
@ -119,8 +119,8 @@ describe RedStorm::SimpleTopology do
end
it "should parse symbolic fields array" do
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "bolt_class1", 1).and_return(bolt_definition)
bolt_definition = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "bolt_class1", 1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "bolt_class1", 1).and_return(bolt_definition)
bolt_definition.should_receive(:source).with(1, {:fields => [:g1, :g2]})
class Topology1 < RedStorm::SimpleTopology
bolt BoltClass1 do
@ -243,8 +243,8 @@ describe RedStorm::SimpleTopology do
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2", []).and_return(jruby_spout2)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer)
@ -256,10 +256,10 @@ describe RedStorm::SimpleTopology do
end
it "should build bolts" do
bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, "id1", 2)
bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, "id2", 3)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, "id1", 2).and_return(bolt_definition1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, "id2", 3).and_return(bolt_definition2)
bolt_definition1 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass1, [], "id1", 2)
bolt_definition2 = RedStorm::SimpleTopology::BoltDefinition.new(BoltClass2, [], "id2", 3)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass1, [], "id1", 2).and_return(bolt_definition1)
RedStorm::SimpleTopology::BoltDefinition.should_receive(:new).with(BoltClass2, [], "id2", 3).and_return(bolt_definition2)
bolt_definition1.should_receive(:source).with(1, :shuffle)
bolt_definition2.should_receive(:source).with(2, {:fields => ["f1"]})
@ -280,8 +280,8 @@ describe RedStorm::SimpleTopology do
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2").and_return(jruby_bolt2)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2)
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
@ -310,8 +310,8 @@ describe RedStorm::SimpleTopology do
@declarer = mock("InputDeclarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1", []).and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1", []).and_return(jruby_spout)
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
@declarer.should_receive("addConfigurations").twice

View File

@ -5,6 +5,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
/**
@ -20,16 +21,19 @@ import java.util.Map;
public class JRubyBolt implements IRichBolt {
IRichBolt _proxyBolt;
String _realBoltClassName;
String _baseClassPath;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubyBolt
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
*/
public JRubyBolt(String baseClassPath, String realBoltClassName) {
public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) {
_baseClassPath = baseClassPath;
_realBoltClassName = realBoltClassName;
_fields = fields;
}
@Override
@ -54,8 +58,12 @@ public class JRubyBolt implements IRichBolt {
// declareOutputFields is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
} else {
IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
}
}
@Override

View File

@ -5,6 +5,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
/**
@ -21,16 +22,18 @@ public class JRubySpout implements IRichSpout {
IRichSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubySpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubySpout(String baseClassPath, String realSpoutClassName) {
public JRubySpout(String baseClassPath, String realSpoutClassName, String[] fields) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
_fields = fields;
}
@Override
@ -75,8 +78,12 @@ public class JRubySpout implements IRichSpout {
// declareOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
spout.declareOutputFields(declarer);
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
} else {
IRichSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
spout.declareOutputFields(declarer);
}
}
@Override