diff --git a/lib/red_storm.rb b/lib/red_storm.rb index ea6b0cd..2bf089b 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -1,9 +1,10 @@ require 'rubygems' -require 'red_storm/environment' require 'red_storm/version' +require 'red_storm/environment' require 'red_storm/configuration' -require 'red_storm/simple_bolt' -require 'red_storm/simple_spout' -require 'red_storm/simple_topology' -require 'red_storm/simple_drpc_topology' +require 'red_storm/dsl/bolt' +require 'red_storm/dsl/spout' +require 'red_storm/dsl/topology' +require 'red_storm/dsl/drpc_topology' +require 'red_storm/dsl/tuple' diff --git a/lib/red_storm/dsl/bolt.rb b/lib/red_storm/dsl/bolt.rb new file mode 100644 index 0000000..c632c01 --- /dev/null +++ b/lib/red_storm/dsl/bolt.rb @@ -0,0 +1,141 @@ +require 'java' +require 'red_storm/configurator' + +module RedStorm + module DSL + + class Bolt + attr_reader :collector, :context, :config + + # DSL class methods + + def self.log + @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) + end + + def self.output_fields(*fields) + @fields = fields.map(&:to_s) + end + + def self.configure(&configure_block) + @configure_block = block_given? ? configure_block : lambda {} + end + + def self.on_receive(*args, &on_receive_block) + options = args.last.is_a?(Hash) ? args.pop : {} + method_name = args.first + + self.receive_options.merge!(options) + @on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)} + end + + def self.on_init(method_name = nil, &on_init_block) + @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} + end + + def self.on_close(method_name = nil, &close_block) + @close_block = block_given? ? 
close_block : lambda {self.send(method_name || :on_close)} + end + + # DSL instance methods + + def log + self.class.log + end + + def unanchored_emit(*values) + @collector.emit(Values.new(*values)) + end + + def anchored_emit(tuple, *values) + @collector.emit(tuple, Values.new(*values)) + end + + def ack(tuple) + @collector.ack(tuple) + end + + def fail(tuple) + @collector.fail(tuple) + end + + # Bolt proxy interface + + def execute(tuple) + output = instance_exec(tuple, &self.class.on_receive_block) + if output && self.class.emit? + values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output + values_list.each{|values| self.class.anchor? ? anchored_emit(tuple, *values) : unanchored_emit(*values)} + @collector.ack(tuple) if self.class.ack? + end + end + + def prepare(config, context, collector) + @collector = collector + @context = context + @config = config + instance_exec(&self.class.on_init_block) + end + + def cleanup + instance_exec(&self.class.close_block) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new(self.class.fields)) + end + + def get_component_configuration + configurator = Configurator.new + configurator.instance_exec(&self.class.configure_block) + configurator.config + end + + private + + # default noop optional dsl callbacks + def on_init; end + def on_close; end + + def self.fields + @fields ||= [] + end + + def self.configure_block + @configure_block ||= lambda {} + end + + def self.on_receive_block + @on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)} + end + + def self.on_init_block + @on_init_block ||= lambda {self.send(:on_init)} + end + + def self.close_block + @close_block ||= lambda {self.send(:on_close)} + end + + def self.receive_options + @receive_options ||= {:emit => true, :ack => false, :anchor => false} + end + + def self.emit? + !!self.receive_options[:emit] + end + + def self.ack? + !!self.receive_options[:ack] + end + + def self.anchor? 
+ !!self.receive_options[:anchor] + end + end + end + + # for backward compatibility + SimpleBolt = DSL::Bolt + +end diff --git a/lib/red_storm/dsl/drpc_topology.rb b/lib/red_storm/dsl/drpc_topology.rb new file mode 100644 index 0000000..9f2f72b --- /dev/null +++ b/lib/red_storm/dsl/drpc_topology.rb @@ -0,0 +1,94 @@ +require 'java' +require 'red_storm/configuration' +require 'red_storm/configurator' + +module RedStorm + module DSL + + class InputBoltDefinition < Topology::BoltDefinition + attr_accessor :grouping + + def initialize(*args) + super + @grouping = :none + end + + def grouping(grouping) + @grouping = grouping + end + + def define_grouping(declarer) + # @grouping is either a bare grouper symbol or a {grouper => params} hash, mirroring BoltDefinition#source + grouper, params = @grouping.is_a?(Hash) ? @grouping.first : [@grouping, nil] + case grouper + when :fields + declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s)))) + when :global + declarer.globalGrouping() + when :shuffle + declarer.shuffleGrouping() + when :local_or_shuffle + declarer.localOrShuffleGrouping() + when :none + declarer.noneGrouping() + when :all + declarer.allGrouping() + when :direct + declarer.directGrouping() + else + raise("unknown grouper=#{grouper.inspect}") + end + end + end + + class DRPCTopology < Topology + + def self.spout + raise TopologyDefinitionError, "DRPC spout is already defined" + end + + def start(base_class_path, env) + builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name) + + self.class.bolts.each do |bolt| + declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) + declarer.addConfigurations(bolt.config) + bolt.define_grouping(declarer) + end + + # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode + defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} + + configurator = Configurator.new(defaults) + configurator.instance_exec(env, &self.class.configure_block) + + drpc = nil + if env == :local + drpc = LocalDRPC.new + submitter = @cluster = LocalCluster.new +
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc)) + else + submitter = StormSubmitter + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology) + end + instance_exec(env, drpc, &self.class.submit_block) + end + + def self.input_bolt(bolt_class, *args, &bolt_block) + set_topology_class! + options = args.last.is_a?(Hash) ? args.pop : {} + contructor_args = !args.empty? ? args.pop : [] + bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + + bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) + raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? + bolt.instance_exec(&bolt_block) + self.components << bolt + end + end + end + + # for backward compatibility + SimpleDRPCTopology = DSL::DRPCTopology + +end diff --git a/lib/red_storm/dsl/spout.rb b/lib/red_storm/dsl/spout.rb new file mode 100644 index 0000000..f4b4228 --- /dev/null +++ b/lib/red_storm/dsl/spout.rb @@ -0,0 +1,190 @@ +require 'java' +require 'red_storm/configurator' + +module RedStorm + module DSL + + class Spout + attr_reader :config, :context, :collector + + # DSL class methods + + def self.configure(&configure_block) + @configure_block = block_given? ? configure_block : lambda {} + end + + def self.log + @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) + end + + def self.output_fields(*fields) + @fields = fields.map(&:to_s) + end + + def self.on_send(*args, &on_send_block) + options = args.last.is_a?(Hash) ? args.pop : {} + method_name = args.first + + self.send_options.merge!(options) + @on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)} + end + + def self.on_init(method_name = nil, &on_init_block) + @on_init_block = block_given? ? 
on_init_block : lambda {self.send(method_name || :on_init)} + end + + def self.on_close(method_name = nil, &on_close_block) + @on_close_block = block_given? ? on_close_block : lambda {self.send(method_name || :on_close)} + end + + def self.on_activate(method_name = nil, &on_activate_block) + @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)} + end + + def self.on_deactivate(method_name = nil, &on_deactivate_block) + @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)} + end + + def self.on_ack(method_name = nil, &on_ack_block) + @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)} + end + + def self.on_fail(method_name = nil, &on_fail_block) + @on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)} + end + + # DSL instance methods + + def reliable_emit(message_id, *values) + @collector.emit(Values.new(*values), message_id) + end + + def unreliable_emit(*values) + @collector.emit(Values.new(*values)) + end + alias_method :emit, :unreliable_emit + + def log + self.class.log + end + + # Spout proxy interface + + def next_tuple + output = instance_exec(&self.class.on_send_block) + if self.class.emit? + if output + values = [output].flatten + if self.class.reliable? 
+ message_id = values.shift + reliable_emit(message_id, *values) + else + unreliable_emit(*values) + end + else + sleep(0.1) + end + end + end + + def open(config, context, collector) + @collector = collector + @context = context + @config = config + instance_exec(&self.class.on_init_block) + end + + def close + instance_exec(&self.class.on_close_block) + end + + def activate + instance_exec(&self.class.on_activate_block) + end + + def deactivate + instance_exec(&self.class.on_deactivate_block) + end + + def declare_output_fields(declarer) + declarer.declare(Fields.new(self.class.fields)) + end + + def ack(msg_id) + instance_exec(msg_id, &self.class.on_ack_block) + end + + def fail(msg_id) + instance_exec(msg_id, &self.class.on_fail_block) + end + + def get_component_configuration + configurator = Configurator.new + configurator.instance_exec(&self.class.configure_block) + configurator.config + end + + private + + # default optional noop dsl methods/callbacks + def on_init; end + def on_close; end + def on_activate; end + def on_deactivate; end + def on_ack(msg_id); end + def on_fail(msg_id); end + + def self.fields + @fields ||= [] + end + + def self.configure_block + @configure_block ||= lambda {} + end + + def self.on_send_block + @on_send_block ||= lambda {self.send(:on_send)} + end + + def self.on_init_block + @on_init_block ||= lambda {self.send(:on_init)} + end + + def self.on_close_block + @on_close_block ||= lambda {self.send(:on_close)} + end + + def self.on_activate_block + @on_activate_block ||= lambda {self.send(:on_activate)} + end + + def self.on_deactivate_block + @on_deactivate_block ||= lambda {self.send(:on_deactivate)} + end + + def self.on_ack_block + @on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)} + end + + def self.on_fail_block + @on_fail_block ||= lambda {|msg_id| self.send(:on_fail, msg_id)} + end + + def self.send_options + @send_options ||= {:emit => true, :reliable => false} + end + + def self.emit? 
+ !!self.send_options[:emit] + end + + def self.reliable? + !!self.send_options[:reliable] + end + end + end + + # for backward compatibility + SimpleSpout = DSL::Spout + +end diff --git a/lib/red_storm/dsl/topology.rb b/lib/red_storm/dsl/topology.rb new file mode 100644 index 0000000..d3d0ab5 --- /dev/null +++ b/lib/red_storm/dsl/topology.rb @@ -0,0 +1,225 @@ +require 'java' +require 'red_storm/configuration' +require 'red_storm/configurator' + + +module RedStorm + module DSL + + class TopologyDefinitionError < StandardError; end + + class Topology + attr_reader :cluster # LocalCluster reference usable in on_submit block, for example + + DEFAULT_SPOUT_PARALLELISM = 1 + DEFAULT_BOLT_PARALLELISM = 1 + + class ComponentDefinition < Configurator + attr_reader :clazz, :constructor_args, :parallelism + attr_accessor :id # ids are forced to string + + def initialize(component_class, constructor_args, id, parallelism) + super() + @clazz = component_class + @constructor_args = constructor_args + @id = id.to_s + @parallelism = parallelism + @output_fields = [] + end + + def output_fields(*args) + args.empty? ? @output_fields : @output_fields = args.map(&:to_s) + end + + def is_java? + @clazz.name.split('::').first.downcase == 'java' + end + end + + class SpoutDefinition < ComponentDefinition + + # WARNING non-dry see BoltDefinition#new_instance + def new_instance(base_class_path) + if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout" + @clazz.new(constructor_args, @output_fields) + elsif is_java? + @clazz.new(*constructor_args) + else + JRubySpout.new(base_class_path, @clazz.name, @output_fields) + end + # is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) + end + end + + class BoltDefinition < ComponentDefinition + attr_accessor :sources, :command + + def initialize(*args) + super + @sources = [] + end + + def source(source_id, grouping) + @sources << [source_id.is_a?(Class) ? 
Topology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? grouping : {grouping => nil}] + end + + def define_grouping(declarer) + @sources.each do |source_id, grouping| + grouper, params = grouping.first + # declarer.fieldsGrouping(source_id, Fields.new()) + case grouper + when :fields + declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s)))) + when :global + declarer.globalGrouping(source_id) + when :shuffle + declarer.shuffleGrouping(source_id) + when :local_or_shuffle + declarer.localOrShuffleGrouping(source_id) + when :none + declarer.noneGrouping(source_id) + when :all + declarer.allGrouping(source_id) + when :direct + declarer.directGrouping(source_id) + else + raise("unknown grouper=#{grouper.inspect}") + end + end + end + + def new_instance(base_class_path) + # WARNING non-dry see BoltDefinition#new_instance + if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt" + @clazz.new(constructor_args, @output_fields) + elsif is_java? + @clazz.new(*constructor_args) + else + JRubyBolt.new(base_class_path, @clazz.name, @output_fields) + end + # is_java? ? @clazz.new : @clazz.is_a?(Bolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new + end + end + + def self.log + @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) + end + + # def self.spout(spout_class, contructor_args = [], options = {}, &spout_block) + def self.spout(spout_class, *args, &spout_block) + set_topology_class! + options = args.last.is_a?(Hash) ? args.pop : {} + contructor_args = !args.empty? ? args.pop : [] + spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options) + + spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism]) + spout.instance_exec(&spout_block) if block_given? 
+ self.components << spout + end + + # def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block) + def self.bolt(bolt_class, *args, &bolt_block) + set_topology_class! + options = args.last.is_a?(Hash) ? args.pop : {} + contructor_args = !args.empty? ? args.pop : [] + bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) + + bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) + raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? + bolt.instance_exec(&bolt_block) + self.components << bolt + end + + def self.configure(name = nil, &configure_block) + set_topology_class! + @topology_name = name.to_s if name + @configure_block = configure_block if block_given? + end + + def self.on_submit(method_name = nil, &submit_block) + @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} + end + + # topology proxy interface + + def start(base_class_path, env) + self.class.resolve_ids!(self.class.components) + + builder = TopologyBuilder.new + self.class.spouts.each do |spout| + declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java) + declarer.addConfigurations(spout.config) + end + self.class.bolts.each do |bolt| + declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java) + declarer.addConfigurations(bolt.config) + bolt.define_grouping(declarer) + end + + # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode + defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} + + configurator = Configurator.new(defaults) + configurator.instance_exec(env, &self.class.configure_block) + + submitter = (env == :local) ? 
@cluster = LocalCluster.new : StormSubmitter + submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) + instance_exec(env, &self.class.submit_block) + end + + private + + # this is a quirk to figure out the topology class at load time when the topology file + # is required in the TopologyLauncher. Since we want to make the "configure" DSL statement + # optional we can hook into any/all the other DSL statements that will be called at load time + # and set it there. This is somewhat inelegant but it works. + def self.set_topology_class! + Configuration.topology_class = self + end + + def self.resolve_ids!(components) + # verify duplicate implicit ids + ids = components.map(&:id) + components.reverse.each do |component| + raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1 + # verify source_id references + if component.respond_to?(:sources) + component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)} + end + end + end + + def self.spouts + self.components.select{|c| c.is_a?(SpoutDefinition)} + end + + def self.bolts + self.components.select{|c| c.is_a?(BoltDefinition)} + end + + def self.components + @components ||= [] + end + + def self.topology_name + @topology_name ||= self.underscore(self.name) + end + + def self.configure_block + @configure_block ||= lambda{|env|} + end + + def self.submit_block + @submit_block ||= lambda{|env|} + end + + def self.underscore(camel_case) + camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase
+ end + end + end + + # for backward compatibility + SimpleTopology = DSL::Topology + +end diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb deleted file mode 100644 index 5ee297e..0000000 --- a/lib/red_storm/simple_bolt.rb +++ /dev/null @@ -1,135 +0,0 @@ -require 'java' -require 'red_storm/configurator' - -module RedStorm - - class SimpleBolt - attr_reader :collector, :context, :config - - # DSL class methods - - def self.log - @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) - end - - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - - def self.configure(&configure_block) - @configure_block = block_given? ? configure_block : lambda {} - end - - def self.on_receive(*args, &on_receive_block) - options = args.last.is_a?(Hash) ? args.pop : {} - method_name = args.first - - self.receive_options.merge!(options) - @on_receive_block = block_given? ? on_receive_block : lambda {|tuple| self.send(method_name || :on_receive, tuple)} - end - - def self.on_init(method_name = nil, &on_init_block) - @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} - end - - def self.on_close(method_name = nil, &close_block) - @close_block = block_given? ? close_block : lambda {self.send(method_name || :on_close)} - end - - # DSL instance methods - - def log - self.class.log - end - - def unanchored_emit(*values) - @collector.emit(Values.new(*values)) - end - - def anchored_emit(tuple, *values) - @collector.emit(tuple, Values.new(*values)) - end - - def ack(tuple) - @collector.ack(tuple) - end - - def fail(tuple) - @collector.fail(tuple) - end - - # Bolt proxy interface - - def execute(tuple) - output = instance_exec(tuple, &self.class.on_receive_block) - if output && self.class.emit? - values_list = !output.is_a?(Array) ? [[output]] : !output.first.is_a?(Array) ? [output] : output - values_list.each{|values| self.class.anchor? ? 
anchored_emit(tuple, *values) : unanchored_emit(*values)} - @collector.ack(tuple) if self.class.ack? - end - end - - def prepare(config, context, collector) - @collector = collector - @context = context - @config = config - instance_exec(&self.class.on_init_block) - end - - def cleanup - instance_exec(&self.class.close_block) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - - def get_component_configuration - configurator = Configurator.new - configurator.instance_exec(&self.class.configure_block) - configurator.config - end - - private - - # default noop optional dsl callbacks - def on_init; end - def on_close; end - - def self.fields - @fields ||= [] - end - - def self.configure_block - @configure_block ||= lambda {} - end - - def self.on_receive_block - @on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)} - end - - def self.on_init_block - @on_init_block ||= lambda {self.send(:on_init)} - end - - def self.close_block - @close_block ||= lambda {self.send(:on_close)} - end - - def self.receive_options - @receive_options ||= {:emit => true, :ack => false, :anchor => false} - end - - def self.emit? - !!self.receive_options[:emit] - end - - def self.ack? - !!self.receive_options[:ack] - end - - def self.anchor? 
- !!self.receive_options[:anchor] - end - end -end diff --git a/lib/red_storm/simple_drpc_topology.rb b/lib/red_storm/simple_drpc_topology.rb deleted file mode 100644 index b50de33..0000000 --- a/lib/red_storm/simple_drpc_topology.rb +++ /dev/null @@ -1,87 +0,0 @@ -require 'java' -require 'red_storm/configuration' -require 'red_storm/configurator' - -module RedStorm - - class InputBoltDefinition < SimpleTopology::BoltDefinition - attr_accessor :grouping - - def initialize(*args) - super - @grouping = :none - end - - def grouping(grouping) - @grouping = grouping - end - - def define_grouping(declarer) - case @grouping - when :fields - declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s)))) - when :global - declarer.globalGrouping() - when :shuffle - declarer.shuffleGrouping() - when :local_or_shuffle - declarer.localOrShuffleGrouping() - when :none - declarer.noneGrouping() - when :all - declarer.allGrouping() - when :direct - declarer.directGrouping() - else - raise("unknown grouper=#{grouper.inspect}") - end - end - end - - class SimpleDRPCTopology < SimpleTopology - - def self.spout - raise TopologyDefinitionError, "DRPC spout is already defined" - end - - def start(base_class_path, env) - builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name) - - self.class.bolts.each do |bolt| - declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java) - declarer.addConfigurations(bolt.config) - bolt.define_grouping(declarer) - end - - # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode - defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} - - configurator = Configurator.new(defaults) - configurator.instance_exec(env, &self.class.configure_block) - - drpc = nil - if env == :local - drpc = LocalDRPC.new - submitter = @cluster = LocalCluster.new - submitter.submitTopology(self.class.topology_name, configurator.config, 
builder.createLocalTopology(drpc)) - else - submitter = StormSubmitter - submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology) - end - instance_exec(env, drpc, &self.class.submit_block) - end - - def self.input_bolt(bolt_class, *args, &bolt_block) - set_topology_class! - options = args.last.is_a?(Hash) ? args.pop : {} - contructor_args = !args.empty? ? args.pop : [] - bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) - - bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) - raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? - bolt.instance_exec(&bolt_block) - self.components << bolt - end - end - -end diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb deleted file mode 100644 index efe36a3..0000000 --- a/lib/red_storm/simple_spout.rb +++ /dev/null @@ -1,184 +0,0 @@ -require 'java' -require 'red_storm/configurator' - -module RedStorm - - class SimpleSpout - attr_reader :config, :context, :collector - - # DSL class methods - - def self.configure(&configure_block) - @configure_block = block_given? ? configure_block : lambda {} - end - - def self.log - @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) - end - - def self.output_fields(*fields) - @fields = fields.map(&:to_s) - end - - def self.on_send(*args, &on_send_block) - options = args.last.is_a?(Hash) ? args.pop : {} - method_name = args.first - - self.send_options.merge!(options) - @on_send_block = block_given? ? on_send_block : lambda {self.send(method_name || :on_send)} - end - - def self.on_init(method_name = nil, &on_init_block) - @on_init_block = block_given? ? on_init_block : lambda {self.send(method_name || :on_init)} - end - - def self.on_close(method_name = nil, &on_close_block) - @on_close_block = block_given? ? 
on_close_block : lambda {self.send(method_name || :on_close)} - end - - def self.on_activate(method_name = nil, &on_activate_block) - @on_activate_block = block_given? ? on_activate_block : lambda {self.send(method_name || :on_activate)} - end - - def self.on_deactivate(method_name = nil, &on_deactivate_block) - @on_deactivate_block = block_given? ? on_deactivate_block : lambda {self.send(method_name || :on_deactivate)} - end - - def self.on_ack(method_name = nil, &on_ack_block) - @on_ack_block = block_given? ? on_ack_block : lambda {|msg_id| self.send(method_name || :on_ack, msg_id)} - end - - def self.on_fail(method_name = nil, &on_fail_block) - @on_fail_block = block_given? ? on_fail_block : lambda {|msg_id| self.send(method_name || :on_fail, msg_id)} - end - - # DSL instance methods - - def reliable_emit(message_id, *values) - @collector.emit(Values.new(*values), message_id) - end - - def unreliable_emit(*values) - @collector.emit(Values.new(*values)) - end - alias_method :emit, :unreliable_emit - - def log - self.class.log - end - - # Spout proxy interface - - def next_tuple - output = instance_exec(&self.class.on_send_block) - if self.class.emit? - if output - values = [output].flatten - if self.class.reliable? 
- message_id = values.shift - reliable_emit(message_id, *values) - else - unreliable_emit(*values) - end - else - sleep(0.1) - end - end - end - - def open(config, context, collector) - @collector = collector - @context = context - @config = config - instance_exec(&self.class.on_init_block) - end - - def close - instance_exec(&self.class.on_close_block) - end - - def activate - instance_exec(&self.class.on_activate_block) - end - - def deactivate - instance_exec(&self.class.on_deactivate_block) - end - - def declare_output_fields(declarer) - declarer.declare(Fields.new(self.class.fields)) - end - - def ack(msg_id) - instance_exec(msg_id, &self.class.on_ack_block) - end - - def fail(msg_id) - instance_exec(msg_id, &self.class.on_fail_block) - end - - def get_component_configuration - configurator = Configurator.new - configurator.instance_exec(&self.class.configure_block) - configurator.config - end - - private - - # default optional noop dsl methods/callbacks - def on_init; end - def on_close; end - def on_activate; end - def on_deactivate; end - def on_ack(msg_id); end - def on_fail(msg_id); end - - def self.fields - @fields ||= [] - end - - def self.configure_block - @configure_block ||= lambda {} - end - - def self.on_send_block - @on_send_block ||= lambda {self.send(:on_send)} - end - - def self.on_init_block - @on_init_block ||= lambda {self.send(:on_init)} - end - - def self.on_close_block - @on_close_block ||= lambda {self.send(:on_close)} - end - - def self.on_activate_block - @on_activate_block ||= lambda {self.send(:on_activate)} - end - - def self.on_deactivate_block - @on_deactivate_block ||= lambda {self.send(:on_deactivate)} - end - - def self.on_ack_block - @on_ack_block ||= lambda {|msg_id| self.send(:on_ack, msg_id)} - end - - def self.on_fail_block - @on_fail_block ||= lambda {|msg_id| self.send(:on_fail, msg_id)} - end - - def self.send_options - @send_options ||= {:emit => true, :reliable => false} - end - - def self.emit? 
- !!self.send_options[:emit] - end - - def self.reliable? - !!self.send_options[:reliable] - end - end -end diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb deleted file mode 100644 index beaf849..0000000 --- a/lib/red_storm/simple_topology.rb +++ /dev/null @@ -1,219 +0,0 @@ -require 'java' -require 'red_storm/configuration' -require 'red_storm/configurator' - - -module RedStorm - - class TopologyDefinitionError < StandardError; end - - class SimpleTopology - attr_reader :cluster # LocalCluster reference usable in on_submit block, for example - - DEFAULT_SPOUT_PARALLELISM = 1 - DEFAULT_BOLT_PARALLELISM = 1 - - class ComponentDefinition < Configurator - attr_reader :clazz, :constructor_args, :parallelism - attr_accessor :id # ids are forced to string - - def initialize(component_class, constructor_args, id, parallelism) - super() - @clazz = component_class - @constructor_args = constructor_args - @id = id.to_s - @parallelism = parallelism - @output_fields = [] - end - - def output_fields(*args) - args.empty? ? @output_fields : @output_fields = args.map(&:to_s) - end - - def is_java? - @clazz.name.split('::').first.downcase == 'java' - end - end - - class SpoutDefinition < ComponentDefinition - - # WARNING non-dry see BoltDefinition#new_instance - def new_instance(base_class_path) - if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout" - @clazz.new(constructor_args, @output_fields) - elsif is_java? - @clazz.new(*constructor_args) - else - JRubySpout.new(base_class_path, @clazz.name, @output_fields) - end - # is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) - end - end - - class BoltDefinition < ComponentDefinition - attr_accessor :sources, :command - - def initialize(*args) - super - @sources = [] - end - - def source(source_id, grouping) - @sources << [source_id.is_a?(Class) ? SimpleTopology.underscore(source_id) : source_id.to_s, grouping.is_a?(Hash) ? 
grouping : {grouping => nil}] - end - - def define_grouping(declarer) - @sources.each do |source_id, grouping| - grouper, params = grouping.first - # declarer.fieldsGrouping(source_id, Fields.new()) - case grouper - when :fields - declarer.fieldsGrouping(source_id, Fields.new(*([params].flatten.map(&:to_s)))) - when :global - declarer.globalGrouping(source_id) - when :shuffle - declarer.shuffleGrouping(source_id) - when :local_or_shuffle - declarer.localOrShuffleGrouping(source_id) - when :none - declarer.noneGrouping(source_id) - when :all - declarer.allGrouping(source_id) - when :direct - declarer.directGrouping(source_id) - else - raise("unknown grouper=#{grouper.inspect}") - end - end - end - - def new_instance(base_class_path) - # WARNING non-dry see BoltDefinition#new_instance - if @clazz.name == "Java::RedstormStormJruby::JRubyShellBolt" - @clazz.new(constructor_args, @output_fields) - elsif is_java? - @clazz.new(*constructor_args) - else - JRubyBolt.new(base_class_path, @clazz.name, @output_fields) - end - # is_java? ? @clazz.new : @clazz.is_a?(SimpleBolt) ? JRubyBolt.new(base_class_path, @clazz.name) : @clazz.new - end - end - - def self.log - @log ||= Java::OrgApacheLog4j::Logger.getLogger(self.name) - end - - # def self.spout(spout_class, contructor_args = [], options = {}, &spout_block) - def self.spout(spout_class, *args, &spout_block) - set_topology_class! - options = args.last.is_a?(Hash) ? args.pop : {} - contructor_args = !args.empty? ? args.pop : [] - spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options) - - spout = SpoutDefinition.new(spout_class, contructor_args, spout_options[:id], spout_options[:parallelism]) - spout.instance_exec(&spout_block) if block_given? - self.components << spout - end - - # def self.bolt(bolt_class, contructor_args = [], options = {}, &bolt_block) - def self.bolt(bolt_class, *args, &bolt_block) - set_topology_class! - options = args.last.is_a?(Hash) ? 
args.pop : {} - contructor_args = !args.empty? ? args.pop : [] - bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) - - bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism]) - raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? - bolt.instance_exec(&bolt_block) - self.components << bolt - end - - def self.configure(name = nil, &configure_block) - set_topology_class! - @topology_name = name.to_s if name - @configure_block = configure_block if block_given? - end - - def self.on_submit(method_name = nil, &submit_block) - @submit_block = block_given? ? submit_block : lambda {|env| self.send(method_name, env)} - end - - # topology proxy interface - - def start(base_class_path, env) - self.class.resolve_ids!(self.class.components) - - builder = TopologyBuilder.new - self.class.spouts.each do |spout| - declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism.to_java) - declarer.addConfigurations(spout.config) - end - self.class.bolts.each do |bolt| - declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism.to_java) - declarer.addConfigurations(bolt.config) - bolt.define_grouping(declarer) - end - - # set the JRuby compatibility mode option for Storm workers, default to current JRuby mode - defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"} - - configurator = Configurator.new(defaults) - configurator.instance_exec(env, &self.class.configure_block) - - submitter = (env == :local) ? 
@cluster = LocalCluster.new : StormSubmitter - submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology) - instance_exec(env, &self.class.submit_block) - end - - private - - # this is a quirk to figure out the topology class at load time when the topology file - # is required in the TopologyLauncher. Since we want to make the "configure" DSL statement - # optional we can hook into any/all the other DSL statements that will be called at load time - # and set it there. This is somewhat inelegant but it works. - def self.set_topology_class! - Configuration.topology_class = self - end - - def self.resolve_ids!(components) - # verify duplicate implicit ids - ids = components.map(&:id) - components.reverse.each do |component| - raise("duplicate id in #{component.clazz.name} on id=#{component.id}") if ids.select{|id| id == component.id}.size > 1 - # verify source_id references - if component.respond_to?(:sources) - component.sources.each{|source_id, grouping| raise("cannot resolve #{component.clazz.name} source id=#{source_id}") unless ids.include?(source_id)} - end - end - end - - def self.spouts - self.components.select{|c| c.is_a?(SpoutDefinition)} - end - - def self.bolts - self.components.select{|c| c.is_a?(BoltDefinition)} - end - - def self.components - @components ||= [] - end - - def self.topology_name - @topology_name ||= self.underscore(self.name) - end - - def self.configure_block - @configure_block ||= lambda{|env|} - end - - def self.submit_block - @submit_block ||= lambda{|env|} - end - - def self.underscore(camel_case) - camel_case.to_s.split('::').last.gsub(/(.)([A-Z])/,'\1_\2').downcase! 
- end - end -end diff --git a/spec/red_storm/simple_bolt_spec.rb b/spec/red_storm/dsl/bolt_spec.rb similarity index 97% rename from spec/red_storm/simple_bolt_spec.rb rename to spec/red_storm/dsl/bolt_spec.rb index 3496457..4c84296 100644 --- a/spec/red_storm/simple_bolt_spec.rb +++ b/spec/red_storm/dsl/bolt_spec.rb @@ -1,5 +1,5 @@ require 'spec_helper' -require 'red_storm/simple_bolt' +require 'red_storm/dsl/bolt' describe RedStorm::SimpleBolt do @@ -495,19 +495,19 @@ describe RedStorm::SimpleBolt do it "should anchor on single value output" do class Bolt1 < RedStorm::SimpleBolt - on_receive :anchor => true do |tuple| + on_receive :anchor => true do |tuple| "output" end end class Bolt2 < RedStorm::SimpleBolt - on_receive :my_method, :anchor => true - def my_method(tuple) + on_receive :my_method, :anchor => true + def my_method(tuple) "output" end end class Bolt3 < RedStorm::SimpleBolt - on_receive :anchor => true - def on_receive(tuple) + on_receive :anchor => true + def on_receive(tuple) "output" end end @@ -531,19 +531,19 @@ describe RedStorm::SimpleBolt do it "should ack on single value output" do class Bolt1 < RedStorm::SimpleBolt - on_receive :anchor => true, :ack => true do |tuple| + on_receive :anchor => true, :ack => true do |tuple| "output" end end class Bolt2 < RedStorm::SimpleBolt on_receive :my_method, :anchor => true, :ack => true - def my_method(tuple) + def my_method(tuple) "output" end end class Bolt3 < RedStorm::SimpleBolt - on_receive :anchor => true, :ack => true - def on_receive(tuple) + on_receive :anchor => true, :ack => true + def on_receive(tuple) "output" end end @@ -568,7 +568,7 @@ describe RedStorm::SimpleBolt do it "should not emit" do class Bolt1 < RedStorm::SimpleBolt - on_receive :emit => false do |tuple| + on_receive :emit => false do |tuple| tuple end end @@ -580,7 +580,7 @@ describe RedStorm::SimpleBolt do end class Bolt3 < RedStorm::SimpleBolt on_receive :emit => false - def on_receive(tuple) + def on_receive(tuple) tuple end end 
diff --git a/spec/red_storm/simple_spout_spec.rb b/spec/red_storm/dsl/spout_spec.rb similarity index 98% rename from spec/red_storm/simple_spout_spec.rb rename to spec/red_storm/dsl/spout_spec.rb index a50f6ac..9f13cae 100644 --- a/spec/red_storm/simple_spout_spec.rb +++ b/spec/red_storm/dsl/spout_spec.rb @@ -1,5 +1,5 @@ require 'spec_helper' -require 'red_storm/simple_spout' +require 'red_storm/dsl/spout' describe RedStorm::SimpleSpout do @@ -33,8 +33,8 @@ describe RedStorm::SimpleSpout do RedStorm::SimpleSpout.should respond_to :on_deactivate RedStorm::SimpleSpout.should respond_to :on_send RedStorm::SimpleSpout.should respond_to :on_ack - RedStorm::SimpleSpout.should respond_to :on_fail - RedStorm::SimpleSpout.should respond_to :log + RedStorm::SimpleSpout.should respond_to :on_fail + RedStorm::SimpleSpout.should respond_to :log end it "should implement dsl instance statements" do @@ -514,7 +514,7 @@ describe RedStorm::SimpleSpout do it "should auto reliable emit on single value output" do class Spout1 < RedStorm::SimpleSpout - on_send :reliable => true do + on_send :reliable => true do [1, "output"] end end @@ -577,7 +577,7 @@ describe RedStorm::SimpleSpout do it "should auto reliable emit on multiple values output" do class Spout1 < RedStorm::SimpleSpout - on_send :reliable => true do + on_send :reliable => true do [1, "output1", "output2"] end end @@ -643,7 +643,7 @@ describe RedStorm::SimpleSpout do it "should respect :emit => false" do class Spout1 < RedStorm::SimpleSpout - on_send :emit => false do + on_send :emit => false do "output" end end @@ -678,12 +678,12 @@ describe RedStorm::SimpleSpout do it "should support manual emit" do class Spout1 < RedStorm::SimpleSpout - on_send :emit => false do + on_send :emit => false do reliable_emit 1, "reliable output" end end class Spout2 < RedStorm::SimpleSpout - on_send :emit => false do + on_send :emit => false do unreliable_emit "unreliable output" end end @@ -777,7 +777,7 @@ describe RedStorm::SimpleSpout do 
spout.close end end - + describe "declare_output_fields" do it "should declare fields" do class Spout1 < RedStorm::SimpleSpout diff --git a/spec/red_storm/simple_topology_spec.rb b/spec/red_storm/dsl/topology_spec.rb similarity index 99% rename from spec/red_storm/simple_topology_spec.rb rename to spec/red_storm/dsl/topology_spec.rb index f568cb6..7694adc 100644 --- a/spec/red_storm/simple_topology_spec.rb +++ b/spec/red_storm/dsl/topology_spec.rb @@ -1,5 +1,5 @@ require 'spec_helper' -require 'red_storm/simple_topology' +require 'red_storm/dsl/topology' describe RedStorm::SimpleTopology do @@ -266,7 +266,7 @@ describe RedStorm::SimpleTopology do RedStorm::LocalCluster.should_receive(:new).and_return(cluster) cluster.should_receive(:submitTopology).with("topology1", "config", "topology") Topology1.new.start("base_path", :local) - end + end it "should start in :cluster env" do class Topology1 < RedStorm::SimpleTopology; end @@ -278,7 +278,7 @@ describe RedStorm::SimpleTopology do configurator.should_receive(:config).and_return("config") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") Topology1.new.start("base_path", :cluster) - end + end it "should raise for invalid env" do class Topology1 < RedStorm::SimpleTopology; end @@ -290,7 +290,7 @@ describe RedStorm::SimpleTopology do spout SpoutClass1 spout SpoutClass2 end - + builder = mock(RedStorm::TopologyBuilder) configurator = mock(RedStorm::Configurator) jruby_spout1 = mock(RedStorm::JRubySpout) @@ -327,7 +327,7 @@ describe RedStorm::SimpleTopology do source 2, :fields => ["f1"] end end - + builder = mock(RedStorm::TopologyBuilder) configurator = mock(RedStorm::Configurator) jruby_bolt1 = mock(RedStorm::JRubyBolt) @@ -340,7 +340,7 @@ describe RedStorm::SimpleTopology do RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2", []).and_return(jruby_bolt2) builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) - 
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) + builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) declarer.should_receive("addConfigurations").twice bolt_definition1.should_receive(:define_grouping).with(declarer) @@ -436,7 +436,7 @@ describe RedStorm::SimpleTopology do source 1, :shuffle end end - + @declarer.should_receive("shuffleGrouping").with('1') Topology1.new.start("base_path", :cluster) end @@ -460,7 +460,7 @@ describe RedStorm::SimpleTopology do source 1, :none end end - + @declarer.should_receive("noneGrouping").with('1') Topology1.new.start("base_path", :cluster) end @@ -472,7 +472,7 @@ describe RedStorm::SimpleTopology do source 1, :global end end - + @declarer.should_receive("globalGrouping").with('1') Topology1.new.start("base_path", :cluster) end @@ -484,7 +484,7 @@ describe RedStorm::SimpleTopology do source 1, :all end end - + @declarer.should_receive("allGrouping").with('1') Topology1.new.start("base_path", :cluster) end @@ -496,7 +496,7 @@ describe RedStorm::SimpleTopology do source 1, :direct end end - + @declarer.should_receive("directGrouping").with('1') Topology1.new.start("base_path", :cluster) end @@ -566,7 +566,7 @@ describe RedStorm::SimpleTopology do source "id1", :shuffle end end - + Topology1.spouts.first.id.should == "id1" Topology1.bolts.first.id.should == "id2" Topology1.bolts.first.sources.first.should == ["id1", {:shuffle => nil}] @@ -580,7 +580,7 @@ describe RedStorm::SimpleTopology do source "spout_class1", :shuffle end end - + Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] @@ -594,7 +594,7 @@ describe RedStorm::SimpleTopology do source :spout_class1, :shuffle end end - + Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" Topology1.bolts.first.sources.first.should == 
['spout_class1', {:shuffle => nil}] @@ -608,7 +608,7 @@ describe RedStorm::SimpleTopology do source SpoutClass1, :shuffle end end - + Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" Topology1.bolts.first.sources.first.should == ["spout_class1", {:shuffle => nil}] @@ -622,7 +622,7 @@ describe RedStorm::SimpleTopology do source "dummy", :shuffle end end - + Topology1.spouts.first.id.should == "spout_class1" Topology1.bolts.first.id.should == "bolt_class1" Topology1.bolts.first.sources.first.should == ["dummy", {:shuffle => nil}] @@ -635,7 +635,7 @@ describe RedStorm::SimpleTopology do spout SpoutClass1 spout SpoutClass1 end - + Topology1.spouts.first.id.should == "spout_class1" Topology1.spouts.last.id.should == "spout_class1"