diff --git a/examples/simple/exclamation_topology.rb b/examples/simple/exclamation_topology.rb index 37fc86b..4706a38 100644 --- a/examples/simple/exclamation_topology.rb +++ b/examples/simple/exclamation_topology.rb @@ -7,22 +7,27 @@ require 'examples/simple/exclamation_bolt' module RedStorm module Examples class ExclamationTopology < RedStorm::SimpleTopology - spout TestWordSpout, :parallelism => 10 + spout TestWordSpout, :parallelism => 5 do + debug true + end - bolt ExclamationBolt, :parallelism => 3 do + bolt ExclamationBolt, :parallelism => 2 do source TestWordSpout, :shuffle + # max_task_parallelism 1 end bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do source ExclamationBolt, :shuffle + # max_task_parallelism 1 + debug true end configure do |env| - debug true + debug false set "topology.worker.childopts", "-Djruby.compat.version=RUBY1_9" case env when :local - max_task_parallelism 3 + max_task_parallelism 40 when :cluster num_workers 20 max_spout_pending(1000); diff --git a/lib/red_storm.rb b/lib/red_storm.rb index 1d315d7..c9f795a 100644 --- a/lib/red_storm.rb +++ b/lib/red_storm.rb @@ -9,13 +9,13 @@ module RedStorm if JAR_CONTEXT REDSTORM_HOME = LAUNCH_PATH TARGET_PATH = LAUNCH_PATH - BUNDLE_GEMFILE = "#{TARGET_PATH}/Gemfile" + BUNDLE_GEMFILE = "#{TARGET_PATH}/bundler/Gemfile" BUNDLE_PATH = "#{TARGET_PATH}/bundler/#{Gem.ruby_engine}/#{Gem::ConfigMap[:ruby_version]}/" GEM_PATH = "#{TARGET_PATH}/gems/" else REDSTORM_HOME = File.expand_path(LAUNCH_PATH + '/..') TARGET_PATH = Dir.pwd - BUNDLE_GEMFILE = "#{TARGET_PATH}/Gemfile" + BUNDLE_GEMFILE = "#{TARGET_PATH}/target/gems/bundler/Gemfile" BUNDLE_PATH = "#{TARGET_PATH}/target/gems/bundler/#{Gem.ruby_engine}/#{Gem::ConfigMap[:ruby_version]}/" GEM_PATH = "#{TARGET_PATH}/target/gems/gems" end diff --git a/lib/red_storm/configurator.rb b/lib/red_storm/configurator.rb new file mode 100644 index 0000000..f7a4e27 --- /dev/null +++ b/lib/red_storm/configurator.rb @@ -0,0 +1,25 @@ +module RedStorm + + class Configurator + attr_reader :config + + def initialize + @config = Backtype::Config.new + end + + def set(attribute, value) + @config.put(attribute, value) + end + + def method_missing(sym, *args) + config_method = "set#{self.class.camel_case(sym)}" + @config.send(config_method, *args) + end + + private + + def self.camel_case(s) + s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } + end + end +end \ No newline at end of file diff --git a/lib/red_storm/loggable.rb b/lib/red_storm/loggable.rb new file mode 100644 index 0000000..5dd0153 --- /dev/null +++ b/lib/red_storm/loggable.rb @@ -0,0 +1,13 @@ +module RedStorm + module Loggable + + def self.log + @log ||= Logger.getLogger(self.name) + end + + def log + self.class.log + end + + end +end diff --git a/lib/red_storm/proxy/bolt.rb b/lib/red_storm/proxy/bolt.rb index 04b403d..d8400c1 100644 --- a/lib/red_storm/proxy/bolt.rb +++ b/lib/red_storm/proxy/bolt.rb @@ -9,6 +9,9 @@ java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' java_import 'org.apache.log4j.Logger' +module Backtype + java_import 'backtype.storm.Config' +end java_package 'redstorm.proxy' diff --git a/lib/red_storm/proxy/spout.rb b/lib/red_storm/proxy/spout.rb index 74d034e..826a182 100644 --- a/lib/red_storm/proxy/spout.rb +++ b/lib/red_storm/proxy/spout.rb @@ -9,6 +9,9 @@ java_import 'backtype.storm.tuple.Fields' java_import 'backtype.storm.tuple.Values' java_import 'java.util.Map' java_import 'org.apache.log4j.Logger' +module Backtype + java_import 'backtype.storm.Config' +end java_package 'redstorm.proxy' diff --git a/lib/red_storm/simple_bolt.rb b/lib/red_storm/simple_bolt.rb index d67089f..2a9acd7 100644 --- a/lib/red_storm/simple_bolt.rb +++ b/lib/red_storm/simple_bolt.rb @@ -1,3 +1,5 @@ +require 'red_storm/configurator' + module RedStorm class SimpleBolt @@ -13,6 +15,10 @@ module RedStorm @fields = fields.map(&:to_s) end + def self.configure(&configure_block) + @configure_block = block_given? ? configure_block : lambda {} + end + def self.on_receive(*args, &on_receive_block) options = args.last.is_a?(Hash) ? args.pop : {} method_name = args.first @@ -73,8 +79,9 @@ module RedStorm end def get_component_configuration - # TODO: dummy implemetation - Backtype::Config.new + configurator = Configurator.new + configurator.instance_exec(&self.class.configure_block) + configurator.config end private @@ -87,6 +94,10 @@ module RedStorm @fields ||= [] end + def self.configure_block + @configure_block ||= lambda {} + end + def self.on_receive_block @on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)} end diff --git a/lib/red_storm/simple_spout.rb b/lib/red_storm/simple_spout.rb index bf2f338..06ce001 100644 --- a/lib/red_storm/simple_spout.rb +++ b/lib/red_storm/simple_spout.rb @@ -1,3 +1,5 @@ +require 'red_storm/configurator' + module RedStorm class SimpleSpout @@ -5,8 +7,8 @@ module RedStorm # DSL class methods - def self.set(options = {}) - self.spout_options.merge!(options) + def self.configure(&configure_block) + @configure_block = block_given? ? configure_block : lambda {} end def self.log @@ -105,8 +107,9 @@ module RedStorm end def get_component_configuration - # TODO: dummy implemetation - Backtype::Config.new + configurator = Configurator.new + configurator.instance_exec(&self.class.configure_block) + configurator.config end private @@ -123,6 +126,10 @@ module RedStorm @fields ||= [] end + def self.configure_block + @configure_block ||= lambda {} + end + def self.on_send_block @on_send_block ||= lambda {self.send(:on_send)} end @@ -155,11 +162,6 @@ module RedStorm @send_options ||= {:emit => true} end - def self.spout_options - # TODO remove is_distributed - @spout_options ||= {:is_distributed => false} - end - def self.emit? !!self.send_options[:emit] end diff --git a/lib/red_storm/simple_topology.rb b/lib/red_storm/simple_topology.rb index 1a4d71d..b2d1278 100644 --- a/lib/red_storm/simple_topology.rb +++ b/lib/red_storm/simple_topology.rb @@ -1,25 +1,37 @@ require 'red_storm/configuration' +require 'red_storm/configurator' module RedStorm + class TopologyDefinitionError < StandardError; end + class SimpleTopology attr_reader :cluster # LocalCluster reference usable in on_submit block, for example DEFAULT_SPOUT_PARALLELISM = 1 DEFAULT_BOLT_PARALLELISM = 1 - class ComponentDefinition + class ComponentDefinition < Configurator attr_reader :clazz, :parallelism attr_accessor :id # ids are forced to string def initialize(component_class, id, parallelism) + super() @clazz = component_class @id = id.to_s @parallelism = parallelism end + + def is_java? + @clazz.name.split('::').first.downcase == 'java' + end end - class SpoutDefinition < ComponentDefinition; end + class SpoutDefinition < ComponentDefinition + def new_instance(base_class_path) + is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name) + end + end class BoltDefinition < ComponentDefinition attr_accessor :sources @@ -55,28 +67,9 @@ module RedStorm end end end - end - class Configurator - attr_reader :config - - def initialize - @config = Backtype::Config.new - end - - def set(attribute, value) - @config.put(attribute, value) - end - - def method_missing(sym, *args) - config_method = "set#{self.class.camel_case(sym)}" - @config.send(config_method, *args) - end - - private - - def self.camel_case(s) - s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase } + def new_instance(base_class_path) + is_java? ? @clazz.new : JRubyBolt.new(base_class_path, @clazz.name) end end @@ -84,16 +77,17 @@ module RedStorm @log ||= org.apache.log4j.Logger.getLogger(self.name) end - - def self.spout(spout_class, options = {}) + def self.spout(spout_class, options = {}, &spout_block) spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options) spout = SpoutDefinition.new(spout_class, spout_options[:id], spout_options[:parallelism]) + spout.instance_exec(&spout_block) if block_given? self.components << spout end def self.bolt(bolt_class, options = {}, &bolt_block) bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options) bolt = BoltDefinition.new(bolt_class, bolt_options[:id], bolt_options[:parallelism]) + raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given? bolt.instance_exec(&bolt_block) self.components << bolt end @@ -115,12 +109,12 @@ module RedStorm builder = TopologyBuilder.new self.class.spouts.each do |spout| - is_java = spout.clazz.name.split('::').first == 'Java' - builder.setSpout(spout.id, is_java ? spout.clazz.new : JRubySpout.new(base_class_path, spout.clazz.name), spout.parallelism) + declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism) + declarer.addConfigurations(spout.config) end self.class.bolts.each do |bolt| - is_java = bolt.clazz.name.split('::').first == 'Java' - declarer = builder.setBolt(bolt.id, is_java ? bolt.clazz.new : JRubyBolt.new(base_class_path, bolt.clazz.name), bolt.parallelism) + declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism) + declarer.addConfigurations(bolt.config) bolt.define_grouping(declarer) end diff --git a/lib/tasks/red_storm.rake b/lib/tasks/red_storm.rake index 73b3003..a0120db 100644 --- a/lib/tasks/red_storm.rake +++ b/lib/tasks/red_storm.rake @@ -91,10 +91,6 @@ task :jar, [:include_dir] => [:unpack, :clean_jar] do |t, args| fileset :dir => TARGET_CLASSES_DIR fileset :dir => TARGET_DEPENDENCY_UNPACKED_DIR fileset :dir => TARGET_GEMS_DIR - fileset :dir => CWD do - include :name => "Gemfile" - include :name => "Gemfile.lock" - end fileset :dir => JRUBY_SRC_DIR do exclude :name => "tasks/**" end @@ -145,12 +141,17 @@ task :build => :setup do build_java_dir("#{TARGET_SRC_DIR}") end -task :gems, [:gemfile] => :setup do |t, args| - bundler_options = args[:gemfile].split(":").join(" ") +task :gems, [:bundler_options] => :setup do |t, args| + bundler_options = args[:bundler_options].split(":").join(" ") system("gem install bundler --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc") system("gem install rake --version 0.9.2.2 --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc") system("jruby #{RedStorm::RUNTIME['RUBY_VERSION']} -S bundle install #{bundler_options} --path #{TARGET_GEMS_DIR}/bundler/") + if bundler_options =~ /--gemfile\s+([^\s]+)/ + gemfile = $1 + system("cp #{gemfile} #{TARGET_GEMS_DIR}/bundler/") + system("cp #{gemfile}.lock #{TARGET_GEMS_DIR}/bundler/") + end end def build_java_dir(source_folder) diff --git a/spec/red_storm/simple_bolt_spec.rb b/spec/red_storm/simple_bolt_spec.rb index add2e87..7439491 100644 --- a/spec/red_storm/simple_bolt_spec.rb +++ b/spec/red_storm/simple_bolt_spec.rb @@ -16,9 +16,11 @@ describe RedStorm::SimpleBolt do bolt.should respond_to :cleanup bolt.should respond_to :prepare bolt.should respond_to :declare_output_fields - end + bolt.should respond_to :get_component_configuration + end it "should implement dsl class statements" do + RedStorm::SimpleBolt.should respond_to :configure RedStorm::SimpleBolt.should respond_to :output_fields RedStorm::SimpleBolt.should respond_to :on_init RedStorm::SimpleBolt.should respond_to :on_close diff --git a/spec/red_storm/simple_spout_spec.rb b/spec/red_storm/simple_spout_spec.rb index f5d287f..c9d0f25 100644 --- a/spec/red_storm/simple_spout_spec.rb +++ b/spec/red_storm/simple_spout_spec.rb @@ -15,17 +15,22 @@ describe RedStorm::SimpleSpout do spout.should respond_to :next_tuple spout.should respond_to :open spout.should respond_to :close + spout.should respond_to :activate + spout.should respond_to :deactivate + spout.should respond_to :close + spout.should respond_to :get_component_configuration spout.should respond_to :declare_output_fields - spout.should respond_to :is_distributed spout.should respond_to :ack spout.should respond_to :fail end it "should implement dsl class statement" do - RedStorm::SimpleSpout.should respond_to :set + RedStorm::SimpleSpout.should respond_to :configure RedStorm::SimpleSpout.should respond_to :output_fields RedStorm::SimpleSpout.should respond_to :on_init RedStorm::SimpleSpout.should respond_to :on_close + RedStorm::SimpleSpout.should respond_to :on_activate + RedStorm::SimpleSpout.should respond_to :on_deactivate RedStorm::SimpleSpout.should respond_to :on_send RedStorm::SimpleSpout.should respond_to :on_ack RedStorm::SimpleSpout.should respond_to :on_fail @@ -43,19 +48,15 @@ describe RedStorm::SimpleSpout do describe "dsl" do describe "set statement" do - DEFAULT_SPOUT_OPTIONS = {:is_distributed => false} + DEFAULT_SPOUT_OPTIONS = {} - it "should have default options" do - RedStorm::SimpleSpout.send(:is_distributed?).should be_false - end - - it "should parse options" do - class IsDistributedClass < RedStorm::SimpleSpout - set :is_distributed => true - end - IsDistributedClass.send(:spout_options).should == DEFAULT_SPOUT_OPTIONS.merge(:is_distributed => true) - IsDistributedClass.send(:is_distributed?).should be_true - end + # it "should parse options" do + # class IsDistributedClass < RedStorm::SimpleSpout + # set :is_distributed => true + # end + # IsDistributedClass.send(:spout_options).should == DEFAULT_SPOUT_OPTIONS.merge(:is_distributed => true) + # IsDistributedClass.send(:is_distributed?).should be_true + # end end describe "output_field statement" do @@ -614,16 +615,16 @@ describe RedStorm::SimpleSpout do end end - describe "is_distributed" do - it "should report is_distributed" do - RedStorm::SimpleSpout.is_distributed?.should be_false - class Spout1 < RedStorm::SimpleSpout - set :is_distributed => true - end - spout = Spout1.new - spout.is_distributed.should be_true - end - end + # describe "is_distributed" do + # it "should report is_distributed" do + # RedStorm::SimpleSpout.is_distributed?.should be_false + # class Spout1 < RedStorm::SimpleSpout + # set :is_distributed => true + # end + # spout = Spout1.new + # spout.is_distributed.should be_true + # end + # end describe "ack" do it "should call ack block" do diff --git a/spec/red_storm/simple_topology_spec.rb b/spec/red_storm/simple_topology_spec.rb index 3ce216e..0b50c3e 100644 --- a/spec/red_storm/simple_topology_spec.rb +++ b/spec/red_storm/simple_topology_spec.rb @@ -203,8 +203,8 @@ describe RedStorm::SimpleTopology do builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") - configurator = mock(RedStorm::SimpleTopology::Configurator) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + configurator = mock(RedStorm::Configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) configurator.should_receive(:config).and_return("config") cluster = mock(RedStorm::LocalCluster) RedStorm::LocalCluster.should_receive(:new).and_return(cluster) @@ -217,8 +217,8 @@ describe RedStorm::SimpleTopology do builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") - configurator = mock(RedStorm::SimpleTopology::Configurator) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + configurator = mock(RedStorm::Configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) configurator.should_receive(:config).and_return("config") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") Topology1.new.start("base_path", :cluster) @@ -236,17 +236,19 @@ describe RedStorm::SimpleTopology do end builder = mock(RedStorm::TopologyBuilder) - configurator = mock(RedStorm::SimpleTopology::Configurator) + configurator = mock(RedStorm::Configurator) jruby_spout1 = mock(RedStorm::JRubySpout) jruby_spout2 = mock(RedStorm::JRubySpout) + declarer = mock("Declarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1) RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2) - builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1) - builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1) + builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer) + builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer) + declarer.should_receive("addConfigurations").twice configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") @@ -271,28 +273,24 @@ describe RedStorm::SimpleTopology do end builder = mock(RedStorm::TopologyBuilder) - configurator = mock(RedStorm::SimpleTopology::Configurator) + configurator = mock(RedStorm::Configurator) jruby_bolt1 = mock(RedStorm::JRubyBolt) jruby_bolt2 = mock(RedStorm::JRubyBolt) + declarer = mock("Declarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt1) RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2").and_return(jruby_bolt2) - builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return("storm_bolt1") - builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return("storm_bolt2") + builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer) + builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer) + declarer.should_receive("addConfigurations").twice - bolt_definition1.should_receive(:define_grouping).with("storm_bolt1") - bolt_definition2.should_receive(:define_grouping).with("storm_bolt2") - bolt_definition1.should_receive(:clazz).twice.and_return(BoltClass1) - bolt_definition2.should_receive(:clazz).twice.and_return(BoltClass2) + bolt_definition1.should_receive(:define_grouping).with(declarer) + bolt_definition2.should_receive(:define_grouping).with(declarer) bolt_definition1.should_receive(:parallelism).and_return(2) bolt_definition2.should_receive(:parallelism).and_return(3) - bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1") - bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2") - # bolt_definition1.should_receive(:id=).with('1') - # bolt_definition2.should_receive(:id=).with('2') configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") @@ -306,16 +304,17 @@ describe RedStorm::SimpleTopology do before(:each) do builder = mock(RedStorm::TopologyBuilder) - configurator = mock(RedStorm::SimpleTopology::Configurator) + configurator = mock(RedStorm::Configurator) jruby_bolt = mock(RedStorm::JRubyBolt) jruby_spout = mock(RedStorm::JRubySpout) @declarer = mock("InputDeclarer") RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt) RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout) builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer) builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer) + @declarer.should_receive("addConfigurations").twice configurator.should_receive(:config).and_return("config") builder.should_receive(:createTopology).and_return("topology") RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology") @@ -461,8 +460,8 @@ describe RedStorm::SimpleTopology do builder = mock(RedStorm::TopologyBuilder) RedStorm::TopologyBuilder.should_receive(:new).and_return(builder) builder.should_receive(:createTopology).and_return("topology") - configurator = mock(RedStorm::SimpleTopology::Configurator) - RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator) + configurator = mock(RedStorm::Configurator) + RedStorm::Configurator.should_receive(:new).and_return(configurator) configurator.should_receive(:config).and_return("config") cluster = mock(RedStorm::LocalCluster)