issue #84 - remove simple from DSL classes and add DSL namespace

This commit is contained in:
Colin Surprenant 2013-06-14 01:37:09 -04:00
parent e1c11816ae
commit 3d717383e9
12 changed files with 692 additions and 668 deletions

View File

@ -1,9 +1,10 @@
require 'rubygems'
require 'red_storm/environment'
require 'red_storm/version'
require 'red_storm/environment'
require 'red_storm/configuration'
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'
require 'red_storm/simple_topology'
require 'red_storm/simple_drpc_topology'
require 'red_storm/dsl/bolt'
require 'red_storm/dsl/spout'
require 'red_storm/dsl/topology'
require 'red_storm/dsl/drpc_topology'
require 'red_storm/dsl/tuple'

141
lib/red_storm/dsl/bolt.rb Normal file
View File

@ -0,0 +1,141 @@
require 'java'
require 'red_storm/configurator'
module RedStorm
module DSL
class Bolt
attr_reader :collector, :context, :config
# DSL class methods
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.on_receive(*args, &on_receive_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
self.receive_options.merge!(options)
@on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)}
end
def self.on_init(method_name = nil, &on_init_block)
@on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)}
end
def self.on_close(method_name = nil, &close_block)
@close_block = block_given? ? close_block : lambda {self.send(method_name || :on_close)}
end
# DSL instance methods
def log
self.class.log
end
def unanchored_emit(*values)
@collector.emit(Values.new(*values))
end
def anchored_emit(tuple, *values)
@collector.emit(tuple, Values.new(*values))
end
def ack(tuple)
@collector.ack(tuple)
end
def fail(tuple)
@collector.fail(tuple)
end
# Bolt proxy interface
def execute(tuple)
output = instance_exec(tuple, &self.class.on_receive_block)
if output && self.class.emit?
values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output
values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)}
@collector.ack(tuple) if self.class.ack?
end
end
def prepare(config, context, collector)
@collector = collector
@context = context
@config = config
instance_exec(&self.class.on_init_block)
end
def cleanup
instance_exec(&self.class.close_block)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
# default noop optional dsl callbacks
def on_init; end
def on_close; end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_receive_block
@on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)}
end
def self.on_init_block
@on_init_block ||= lambda {self.send(:on_init)}
end
def self.close_block
@close_block ||= lambda {self.send(:on_close)}
end
def self.receive_options
@receive_options ||= {:emit => true, :ack => false, :anchor => false}
end
def self.emit?
!!self.receive_options[:emit]
end
def self.ack?
!!self.receive_options[:ack]
end
def self.anchor?
!!self.receive_options[:anchor]
end
end
end
# for backward compatibility
SimpleBolt = DSL::Bolt
end

View File

@ -0,0 +1,92 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
module DSL
class InputBoltDefinition < Topology::BoltDefinition
attr_accessor :grouping
def initialize(*args)
super
@grouping = :none
end
def grouping(grouping)
@grouping = grouping
end
def define_grouping(declarer)
case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
class DRPCTopology < Topology
def self.spout
raise TopologyDefinitionError, "DRPC spout is already defined"
end
def start(base_class_path, env)
builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name)
self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
drpc = nil
if env == :local
drpc = LocalDRPC.new
submitter = @cluster = LocalCluster.new
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc))
else
submitter = StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology)
end
instance_exec(env, drpc, &self.class.submit_block)
end
def self.input_bolt(bolt_class, *args, &bolt_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
end
end
# for backward compatibility
SimpleDRPCTopology = DSL::DRPCTopology
end

190
lib/red_storm/dsl/spout.rb Normal file
View File

@ -0,0 +1,190 @@
require 'java'
require 'red_storm/configurator'
module RedStorm
module DSL
class Spout
attr_reader :config, :context, :collector
# DSL class methods
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.on_send(*args, &on_send_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
self.send_options.merge!(options)
@on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)}
end
def self.on_init(method_name = nil, &on_init_block)
@on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)}
end
def self.on_close(method_name = nil, &on_close_block)
@on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)}
end
def self.on_activate(method_name = nil, &on_activate_block)
@on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)}
end
def self.on_deactivate(method_name = nil, &on_deactivate_block)
@on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)}
end
def self.on_ack(method_name = nil, &on_ack_block)
@on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)}
end
def self.on_fail(method_name = nil, &on_fail_block)
@on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)}
end
# DSL instance methods
def reliable_emit(message_id, *values)
@collector.emit(Values.new(*values), message_id)
end
def unreliable_emit(*values)
@collector.emit(Values.new(*values))
end
alias_method :emit, :unreliable_emit
def log
self.class.log
end
# Spout proxy interface
def next_tuple
output = instance_exec(&self.class.on_send_block)
if self.class.emit?
if output
values = [output].flatten
if self.class.reliable?
message_id = values.shift
reliable_emit(message_id, *values)
else
unreliable_emit(*values)
end
else
sleep(0.1)
end
end
end
def open(config, context, collector)
@collector = collector
@context = context
@config = config
instance_exec(&self.class.on_init_block)
end
def close
instance_exec(&self.class.on_close_block)
end
def activate
instance_exec(&self.class.on_activate_block)
end
def deactivate
instance_exec(&self.class.on_deactivate_block)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def ack(msg_id)
instance_exec(msg_id, &self.class.on_ack_block)
end
def fail(msg_id)
instance_exec(msg_id, &self.class.on_fail_block)
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
# default optional noop dsl methods/callbacks
def on_init; end
def on_close; end
def on_activate; end
def on_deactivate; end
def on_ack(msg_id); end
def on_fail(msg_id); end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_send_block
@on_send_block ||= lambda {self.send(:on_send)}
end
def self.on_init_block
@on_init_block ||= lambda {self.send(:on_init)}
end
def self.on_close_block
@on_close_block ||= lambda {self.send(:on_close)}
end
def self.on_activate_block
@on_activate_block ||= lambda {self.send(:on_activate)}
end
def self.on_deactivate_block
@on_deactivate_block ||= lambda {self.send(:on_deactivate)}
end
def self.on_ack_block
@on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)}
end
def self.on_fail_block
@on_fail_block ||= lambda {|msg_id| self.send(:on_fail, msg_id)}
end
def self.send_options
@send_options ||= {:emit => true, :reliable => false}
end
def self.emit?
!!self.send_options[:emit]
end
def self.reliable?
!!self.send_options[:reliable]
end
end
end
# for backward compatibility
SimpleSpout = DSL::Spout
end

View File

@ -0,0 +1,225 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
module DSL
class TopologyDefinitionError < StandardError; end
class Topology
attr_reader :cluster # LocalCluster reference usable in on_submit block, for example
DEFAULT_SPOUT_PARALLELISM = 1
DEFAULT_BOLT_PARALLELISM = 1
class ComponentDefinition < Configurator
attr_reader :clazz, :constructor_args, :parallelism
attr_accessor :id # ids are forced to string
def initialize(component_class, constructor_args, id, parallelism)
super()
@clazz = component_class
@constructor_args = constructor_args
@id = id.to_s
@parallelism = parallelism
@output_fields = []
end
def output_fields(*args)
args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
end
def is_java?
@clazz.name.split('::').first.downcase == 'java'
end
end
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubySpout.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources, :command
def initialize(*args)
super
@sources = []
end
def source(source_id, grouping)
@sources << [source_id.is_a?(Class) ? Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
end
def define_grouping(declarer)
@sources.each do |source_id, grouping|
grouper, params = grouping.first
# declarer.fieldsGrouping(source_id, Fields.new())
case grouper
when :fields
declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping(source_id)
when :shuffle
declarer.shuffleGrouping(source_id)
when :local_or_shuffle
declarer.localOrShuffleGrouping(source_id)
when :none
declarer.noneGrouping(source_id)
when :all
declarer.allGrouping(source_id)
when :direct
declarer.directGrouping(source_id)
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
def new_instance(base_class_path)
# WARNING non-dry see BoltDefinition#new_instance
if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubyBolt.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : @clazz.is_a?(Bolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new
end
end
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
# def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
def self.spout(spout_class, *args, &spout_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)
spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism])
spout.instance_exec(&spout_block) if block_given?
self.components << spout
end
# def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
def self.bolt(bolt_class, *args, &bolt_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
def self.configure(name = nil, &configure_block)
set_topology_class!
@topology_name = name.to_s if name
@configure_block = configure_block if block_given?
end
def self.on_submit(method_name = nil, &submit_block)
@submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)}
end
# topology proxy interface
def start(base_class_path, env)
self.class.resolve_ids!(self.class.components)
builder = TopologyBuilder.new
self.class.spouts.each do |spout|
declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java)
declarer.addConfigurations(spout.config)
end
self.class.bolts.each do |bolt|
declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
instance_exec(env, &self.class.submit_block)
end
private
# this is a quirk to figure out the topology class at load time when the topology file
# is required in the TopologyLauncher. Since we want to make the "configure" DSL statement
# optional we can hook into any/all the other DSL statements that will be called at load time
# and set it there. This is somewhat inelegant but it works.
def self.set_topology_class!
Configuration.topology_class = self
end
def self.resolve_ids!(components)
# verify duplicate implicit ids
ids = components.map(&:id)
components.reverse.each do |component|
raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1
# verify source_id references
if component.respond_to?(:sources)
component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)}
end
end
end
def self.spouts
self.components.select{|c| c.is_a?(SpoutDefinition)}
end
def self.bolts
self.components.select{|c| c.is_a?(BoltDefinition)}
end
def self.components
@components ||= []
end
def self.topology_name
@topology_name ||= self.underscore(self.name)
end
def self.configure_block
@configure_block ||= lambda{|env|}
end
def self.submit_block
@submit_block ||= lambda{|env|}
end
def self.underscore(camel_case)
camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase!
end
end
end
# for backward compatibility
SimpleTopology = DSL::Topology
end

View File

@ -1,135 +0,0 @@
require 'java'
require 'red_storm/configurator'
module RedStorm
class SimpleBolt
attr_reader :collector, :context, :config
# DSL class methods
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.on_receive(*args, &on_receive_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
self.receive_options.merge!(options)
@on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)}
end
def self.on_init(method_name = nil, &on_init_block)
@on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)}
end
def self.on_close(method_name = nil, &close_block)
@close_block = block_given? ? close_block : lambda {self.send(method_name || :on_close)}
end
# DSL instance methods
def log
self.class.log
end
def unanchored_emit(*values)
@collector.emit(Values.new(*values))
end
def anchored_emit(tuple, *values)
@collector.emit(tuple, Values.new(*values))
end
def ack(tuple)
@collector.ack(tuple)
end
def fail(tuple)
@collector.fail(tuple)
end
# Bolt proxy interface
def execute(tuple)
output = instance_exec(tuple, &self.class.on_receive_block)
if output && self.class.emit?
values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output
values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)}
@collector.ack(tuple) if self.class.ack?
end
end
def prepare(config, context, collector)
@collector = collector
@context = context
@config = config
instance_exec(&self.class.on_init_block)
end
def cleanup
instance_exec(&self.class.close_block)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
# default noop optional dsl callbacks
def on_init; end
def on_close; end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_receive_block
@on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)}
end
def self.on_init_block
@on_init_block ||= lambda {self.send(:on_init)}
end
def self.close_block
@close_block ||= lambda {self.send(:on_close)}
end
def self.receive_options
@receive_options ||= {:emit => true, :ack => false, :anchor => false}
end
def self.emit?
!!self.receive_options[:emit]
end
def self.ack?
!!self.receive_options[:ack]
end
def self.anchor?
!!self.receive_options[:anchor]
end
end
end

View File

@ -1,87 +0,0 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class InputBoltDefinition < SimpleTopology::BoltDefinition
attr_accessor :grouping
def initialize(*args)
super
@grouping = :none
end
def grouping(grouping)
@grouping = grouping
end
def define_grouping(declarer)
case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
class SimpleDRPCTopology < SimpleTopology
def self.spout
raise TopologyDefinitionError, "DRPC spout is already defined"
end
def start(base_class_path, env)
builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name)
self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
drpc = nil
if env == :local
drpc = LocalDRPC.new
submitter = @cluster = LocalCluster.new
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc))
else
submitter = StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology)
end
instance_exec(env, drpc, &self.class.submit_block)
end
def self.input_bolt(bolt_class, *args, &bolt_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
end
end

View File

@ -1,184 +0,0 @@
require 'java'
require 'red_storm/configurator'
module RedStorm
class SimpleSpout
attr_reader :config, :context, :collector
# DSL class methods
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
def self.output_fields(*fields)
@fields = fields.map(&:to_s)
end
def self.on_send(*args, &on_send_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
self.send_options.merge!(options)
@on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)}
end
def self.on_init(method_name = nil, &on_init_block)
@on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)}
end
def self.on_close(method_name = nil, &on_close_block)
@on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)}
end
def self.on_activate(method_name = nil, &on_activate_block)
@on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)}
end
def self.on_deactivate(method_name = nil, &on_deactivate_block)
@on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)}
end
def self.on_ack(method_name = nil, &on_ack_block)
@on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)}
end
def self.on_fail(method_name = nil, &on_fail_block)
@on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)}
end
# DSL instance methods
def reliable_emit(message_id, *values)
@collector.emit(Values.new(*values), message_id)
end
def unreliable_emit(*values)
@collector.emit(Values.new(*values))
end
alias_method :emit, :unreliable_emit
def log
self.class.log
end
# Spout proxy interface
def next_tuple
output = instance_exec(&self.class.on_send_block)
if self.class.emit?
if output
values = [output].flatten
if self.class.reliable?
message_id = values.shift
reliable_emit(message_id, *values)
else
unreliable_emit(*values)
end
else
sleep(0.1)
end
end
end
def open(config, context, collector)
@collector = collector
@context = context
@config = config
instance_exec(&self.class.on_init_block)
end
def close
instance_exec(&self.class.on_close_block)
end
def activate
instance_exec(&self.class.on_activate_block)
end
def deactivate
instance_exec(&self.class.on_deactivate_block)
end
def declare_output_fields(declarer)
declarer.declare(Fields.new(self.class.fields))
end
def ack(msg_id)
instance_exec(msg_id, &self.class.on_ack_block)
end
def fail(msg_id)
instance_exec(msg_id, &self.class.on_fail_block)
end
def get_component_configuration
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
# default optional noop dsl methods/callbacks
def on_init; end
def on_close; end
def on_activate; end
def on_deactivate; end
def on_ack(msg_id); end
def on_fail(msg_id); end
def self.fields
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_send_block
@on_send_block ||= lambda {self.send(:on_send)}
end
def self.on_init_block
@on_init_block ||= lambda {self.send(:on_init)}
end
def self.on_close_block
@on_close_block ||= lambda {self.send(:on_close)}
end
def self.on_activate_block
@on_activate_block ||= lambda {self.send(:on_activate)}
end
def self.on_deactivate_block
@on_deactivate_block ||= lambda {self.send(:on_deactivate)}
end
def self.on_ack_block
@on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)}
end
def self.on_fail_block
@on_fail_block ||= lambda {|msg_id| self.send(:on_fail, msg_id)}
end
def self.send_options
@send_options ||= {:emit => true, :reliable => false}
end
def self.emit?
!!self.send_options[:emit]
end
def self.reliable?
!!self.send_options[:reliable]
end
end
end

View File

@ -1,219 +0,0 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class TopologyDefinitionError < StandardError; end
class SimpleTopology
attr_reader :cluster # LocalCluster reference usable in on_submit block, for example
DEFAULT_SPOUT_PARALLELISM = 1
DEFAULT_BOLT_PARALLELISM = 1
class ComponentDefinition < Configurator
attr_reader :clazz, :constructor_args, :parallelism
attr_accessor :id # ids are forced to string
def initialize(component_class, constructor_args, id, parallelism)
super()
@clazz = component_class
@constructor_args = constructor_args
@id = id.to_s
@parallelism = parallelism
@output_fields = []
end
def output_fields(*args)
args.empty? ? @output_fields : @output_fields = args.map(&:to_s)
end
def is_java?
@clazz.name.split('::').first.downcase == 'java'
end
end
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubySpout.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources, :command
def initialize(*args)
super
@sources = []
end
def source(source_id, grouping)
@sources << [source_id.is_a?(Class) ? SimpleTopology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}]
end
def define_grouping(declarer)
@sources.each do |source_id, grouping|
grouper, params = grouping.first
# declarer.fieldsGrouping(source_id, Fields.new())
case grouper
when :fields
declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping(source_id)
when :shuffle
declarer.shuffleGrouping(source_id)
when :local_or_shuffle
declarer.localOrShuffleGrouping(source_id)
when :none
declarer.noneGrouping(source_id)
when :all
declarer.allGrouping(source_id)
when :direct
declarer.directGrouping(source_id)
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
def new_instance(base_class_path)
# WARNING non-dry see BoltDefinition#new_instance
if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt"
@clazz.new(constructor_args, @output_fields)
elsif is_java?
@clazz.new(*constructor_args)
else
JRubyBolt.new(base_class_path, @clazz.name, @output_fields)
end
# is_java? ? @clazz.new : @clazz.is_a?(SimpleBolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new
end
end
def self.log
@log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name)
end
# def self.spout(spout_class, contructor_args = [], options = {}, &spout_block)
def self.spout(spout_class, *args, &spout_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)
spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism])
spout.instance_exec(&spout_block) if block_given?
self.components << spout
end
# def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block)
def self.bolt(bolt_class, *args, &bolt_block)
set_topology_class!
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
def self.configure(name = nil, &configure_block)
set_topology_class!
@topology_name = name.to_s if name
@configure_block = configure_block if block_given?
end
def self.on_submit(method_name = nil, &submit_block)
@submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)}
end
# topology proxy interface
def start(base_class_path, env)
self.class.resolve_ids!(self.class.components)
builder = TopologyBuilder.new
self.class.spouts.each do |spout|
declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java)
declarer.addConfigurations(spout.config)
end
self.class.bolts.each do |bolt|
declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
instance_exec(env, &self.class.submit_block)
end
private
# this is a quirk to figure out the topology class at load time when the topology file
# is required in the TopologyLauncher. Since we want to make the "configure" DSL statement
# optional we can hook into any/all the other DSL statements that will be called at load time
# and set it there. This is somewhat inelegant but it works.
def self.set_topology_class!
Configuration.topology_class = self
end
def self.resolve_ids!(components)
# verify duplicate implicit ids
ids = components.map(&:id)
components.reverse.each do |component|
raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1
# verify source_id references
if component.respond_to?(:sources)
component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)}
end
end
end
def self.spouts
self.components.select{|c| c.is_a?(SpoutDefinition)}
end
def self.bolts
self.components.select{|c| c.is_a?(BoltDefinition)}
end
def self.components
@components ||= []
end
def self.topology_name
@topology_name ||= self.underscore(self.name)
end
def self.configure_block
@configure_block ||= lambda{|env|}
end
def self.submit_block
@submit_block ||= lambda{|env|}
end
def self.underscore(camel_case)
camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase!
end
end
end

View File

@ -1,5 +1,5 @@
require 'spec_helper'
require 'red_storm/simple_bolt'
require 'red_storm/dsl/bolt'
describe RedStorm::SimpleBolt do
@ -495,19 +495,19 @@ describe RedStorm::SimpleBolt do
it "should anchor on single value output" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :anchor => true do |tuple|
on_receive :anchor => true do |tuple|
"output"
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :anchor => true
def my_method(tuple)
on_receive :my_method, :anchor => true
def my_method(tuple)
"output"
end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :anchor => true
def on_receive(tuple)
on_receive :anchor => true
def on_receive(tuple)
"output"
end
end
@ -531,19 +531,19 @@ describe RedStorm::SimpleBolt do
it "should ack on single value output" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :anchor => true, :ack => true do |tuple|
on_receive :anchor => true, :ack => true do |tuple|
"output"
end
end
class Bolt2 < RedStorm::SimpleBolt
on_receive :my_method, :anchor => true, :ack => true
def my_method(tuple)
def my_method(tuple)
"output"
end
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :anchor => true, :ack => true
def on_receive(tuple)
on_receive :anchor => true, :ack => true
def on_receive(tuple)
"output"
end
end
@ -568,7 +568,7 @@ describe RedStorm::SimpleBolt do
it "should not emit" do
class Bolt1 < RedStorm::SimpleBolt
on_receive :emit => false do |tuple|
on_receive :emit => false do |tuple|
tuple
end
end
@ -580,7 +580,7 @@ describe RedStorm::SimpleBolt do
end
class Bolt3 < RedStorm::SimpleBolt
on_receive :emit => false
def on_receive(tuple)
def on_receive(tuple)
tuple
end
end

View File

@ -1,5 +1,5 @@
require 'spec_helper'
require 'red_storm/simple_spout'
require 'red_storm/dsl/spout'
describe RedStorm::SimpleSpout do
@ -33,8 +33,8 @@ describe RedStorm::SimpleSpout do
RedStorm::SimpleSpout.should respond_to :on_deactivate
RedStorm::SimpleSpout.should respond_to :on_send
RedStorm::SimpleSpout.should respond_to :on_ack
RedStorm::SimpleSpout.should respond_to :on_fail
RedStorm::SimpleSpout.should respond_to :log
RedStorm::SimpleSpout.should respond_to :on_fail
RedStorm::SimpleSpout.should respond_to :log
end
it "should implement dsl instance statements" do
@ -514,7 +514,7 @@ describe RedStorm::SimpleSpout do
it "should auto reliable emit on single value output" do
class Spout1 < RedStorm::SimpleSpout
on_send :reliable => true do
on_send :reliable => true do
[1, "output"]
end
end
@ -577,7 +577,7 @@ describe RedStorm::SimpleSpout do
it "should auto reliable emit on multiple values output" do
class Spout1 < RedStorm::SimpleSpout
on_send :reliable => true do
on_send :reliable => true do
[1, "output1", "output2"]
end
end
@ -643,7 +643,7 @@ describe RedStorm::SimpleSpout do
it "should respect :emit => false" do
class Spout1 < RedStorm::SimpleSpout
on_send :emit => false do
on_send :emit => false do
"output"
end
end
@ -678,12 +678,12 @@ describe RedStorm::SimpleSpout do
it "should support manual emit" do
class Spout1 < RedStorm::SimpleSpout
on_send :emit => false do
on_send :emit => false do
reliable_emit 1, "reliable output"
end
end
class Spout2 < RedStorm::SimpleSpout
on_send :emit => false do
on_send :emit => false do
unreliable_emit "unreliable output"
end
end
@ -777,7 +777,7 @@ describe RedStorm::SimpleSpout do
spout.close
end
end
describe "declare_output_fields" do
it "should declare fields" do
class Spout1 < RedStorm::SimpleSpout

View File

@ -1,5 +1,5 @@
require 'spec_helper'
require 'red_storm/simple_topology'
require 'red_storm/dsl/topology'
describe RedStorm::SimpleTopology do
@ -266,7 +266,7 @@ describe RedStorm::SimpleTopology do
RedStorm::LocalCluster.should_receive(:new).and_return(cluster)
cluster.should_receive(:submitTopology).with("topology1", "config", "topology")
Topology1.new.start("base_path", :local)
end
end
it "should start in :cluster env" do
class Topology1 < RedStorm::SimpleTopology; end
@ -278,7 +278,7 @@ describe RedStorm::SimpleTopology do
configurator.should_receive(:config).and_return("config")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Topology1.new.start("base_path", :cluster)
end
end
it "should raise for invalid env" do
class Topology1 < RedStorm::SimpleTopology; end
@ -290,7 +290,7 @@ describe RedStorm::SimpleTopology do
spout SpoutClass1
spout SpoutClass2
end
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::Configurator)
jruby_spout1 = mock(RedStorm::JRubySpout)
@ -327,7 +327,7 @@ describe RedStorm::SimpleTopology do
source 2, :fields => ["f1"]
end
end
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::Configurator)
jruby_bolt1 = mock(RedStorm::JRubyBolt)
@ -340,7 +340,7 @@ describe RedStorm::SimpleTopology do
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2)
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
declarer.should_receive("addConfigurations").twice
bolt_definition1.should_receive(:define_grouping).with(declarer)
@ -436,7 +436,7 @@ describe RedStorm::SimpleTopology do
source 1, :shuffle
end
end
@declarer.should_receive("shuffleGrouping").with('1')
Topology1.new.start("base_path", :cluster)
end
@ -460,7 +460,7 @@ describe RedStorm::SimpleTopology do
source 1, :none
end
end
@declarer.should_receive("noneGrouping").with('1')
Topology1.new.start("base_path", :cluster)
end
@ -472,7 +472,7 @@ describe RedStorm::SimpleTopology do
source 1, :global
end
end
@declarer.should_receive("globalGrouping").with('1')
Topology1.new.start("base_path", :cluster)
end
@ -484,7 +484,7 @@ describe RedStorm::SimpleTopology do
source 1, :all
end
end
@declarer.should_receive("allGrouping").with('1')
Topology1.new.start("base_path", :cluster)
end
@ -496,7 +496,7 @@ describe RedStorm::SimpleTopology do
source 1, :direct
end
end
@declarer.should_receive("directGrouping").with('1')
Topology1.new.start("base_path", :cluster)
end
@ -566,7 +566,7 @@ describe RedStorm::SimpleTopology do
source "id1", :shuffle
end
end
Topology1.spouts.first.id.should == "id1"
Topology1.bolts.first.id.should == "id2"
Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}]
@ -580,7 +580,7 @@ describe RedStorm::SimpleTopology do
source "spout_class1", :shuffle
end
end
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
@ -594,7 +594,7 @@ describe RedStorm::SimpleTopology do
source :spout_class1, :shuffle
end
end
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ['spout_class1', {:shuffle => nil}]
@ -608,7 +608,7 @@ describe RedStorm::SimpleTopology do
source SpoutClass1, :shuffle
end
end
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}]
@ -622,7 +622,7 @@ describe RedStorm::SimpleTopology do
source "dummy", :shuffle
end
end
Topology1.spouts.first.id.should == "spout_class1"
Topology1.bolts.first.id.should == "bolt_class1"
Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}]
@ -635,7 +635,7 @@ describe RedStorm::SimpleTopology do
spout SpoutClass1
spout SpoutClass1
end
Topology1.spouts.first.id.should == "spout_class1"
Topology1.spouts.last.id.should == "spout_class1"