0.7 component configuration and Gemfile handling

This commit is contained in:
Colin Surprenant 2012-05-31 15:27:54 -04:00
parent 170da9b887
commit 31ad5849b7
13 changed files with 160 additions and 101 deletions

View File

@ -7,22 +7,27 @@ require 'examples/simple/exclamation_bolt'
module RedStorm
module Examples
class ExclamationTopology < RedStorm::SimpleTopology
spout TestWordSpout, :parallelism => 10
spout TestWordSpout, :parallelism => 5 do
debug true
end
bolt ExclamationBolt, :parallelism => 3 do
bolt ExclamationBolt, :parallelism => 2 do
source TestWordSpout, :shuffle
# max_task_parallelism 1
end
bolt ExclamationBolt, :id => :ExclamationBolt2, :parallelism => 2 do
source ExclamationBolt, :shuffle
# max_task_parallelism 1
debug true
end
configure do |env|
debug true
debug false
set "topology.worker.childopts", "-Djruby.compat.version=RUBY1_9"
case env
when :local
max_task_parallelism 3
max_task_parallelism 40
when :cluster
num_workers 20
max_spout_pending(1000);

View File

@ -9,13 +9,13 @@ module RedStorm
if JAR_CONTEXT
REDSTORM_HOME = LAUNCH_PATH
TARGET_PATH = LAUNCH_PATH
BUNDLE_GEMFILE = "#{TARGET_PATH}/Gemfile"
BUNDLE_GEMFILE = "#{TARGET_PATH}/bundler/Gemfile"
BUNDLE_PATH = "#{TARGET_PATH}/bundler/#{Gem.ruby_engine}/#{Gem::ConfigMap[:ruby_version]}/"
GEM_PATH = "#{TARGET_PATH}/gems/"
else
REDSTORM_HOME = File.expand_path(LAUNCH_PATH + '/..')
TARGET_PATH = Dir.pwd
BUNDLE_GEMFILE = "#{TARGET_PATH}/Gemfile"
BUNDLE_GEMFILE = "#{TARGET_PATH}/target/gems/bundler/Gemfile"
BUNDLE_PATH = "#{TARGET_PATH}/target/gems/bundler/#{Gem.ruby_engine}/#{Gem::ConfigMap[:ruby_version]}/"
GEM_PATH = "#{TARGET_PATH}/target/gems/gems"
end

View File

@ -0,0 +1,25 @@
module RedStorm
class Configurator
attr_reader :config
def initialize
@config = Backtype::Config.new
end
def set(attribute, value)
@config.put(attribute, value)
end
def method_missing(sym, *args)
config_method = "set#{self.class.camel_case(sym)}"
@config.send(config_method, *args)
end
private
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
end
end
end

13
lib/red_storm/loggable.rb Normal file
View File

@ -0,0 +1,13 @@
module RedStorm
module Loggable
def self.log
@log ||= Logger.getLogger(self.name)
end
def log
self.class.log
end
end
end

View File

@ -9,6 +9,9 @@ java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_import 'org.apache.log4j.Logger'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'

View File

@ -9,6 +9,9 @@ java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
java_import 'org.apache.log4j.Logger'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'

View File

@ -1,3 +1,5 @@
require 'red_storm/configurator'
module RedStorm
class SimpleBolt
@ -13,6 +15,10 @@ module RedStorm
@fields = fields.map(&:to_s)
end
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.on_receive(*args, &on_receive_block)
options = args.last.is_a?(Hash) ? args.pop : {}
method_name = args.first
@ -73,8 +79,9 @@ module RedStorm
end
def get_component_configuration
# TODO: dummy implemetation
Backtype::Config.new
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
@ -87,6 +94,10 @@ module RedStorm
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_receive_block
@on_receive_block ||= lambda {|tuple| self.send(:on_receive, tuple)}
end

View File

@ -1,3 +1,5 @@
require 'red_storm/configurator'
module RedStorm
class SimpleSpout
@ -5,8 +7,8 @@ module RedStorm
# DSL class methods
def self.set(options = {})
self.spout_options.merge!(options)
def self.configure(&configure_block)
@configure_block = block_given? ? configure_block : lambda {}
end
def self.log
@ -105,8 +107,9 @@ module RedStorm
end
def get_component_configuration
# TODO: dummy implemetation
Backtype::Config.new
configurator = Configurator.new
configurator.instance_exec(&self.class.configure_block)
configurator.config
end
private
@ -123,6 +126,10 @@ module RedStorm
@fields ||= []
end
def self.configure_block
@configure_block ||= lambda {}
end
def self.on_send_block
@on_send_block ||= lambda {self.send(:on_send)}
end
@ -155,11 +162,6 @@ module RedStorm
@send_options ||= {:emit => true}
end
def self.spout_options
# TODO remove is_distributed
@spout_options ||= {:is_distributed => false}
end
def self.emit?
!!self.send_options[:emit]
end

View File

@ -1,25 +1,37 @@
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class TopologyDefinitionError < StandardError; end
class SimpleTopology
attr_reader :cluster # LocalCluster reference usable in on_submit block, for example
DEFAULT_SPOUT_PARALLELISM = 1
DEFAULT_BOLT_PARALLELISM = 1
class ComponentDefinition
class ComponentDefinition < Configurator
attr_reader :clazz, :parallelism
attr_accessor :id # ids are forced to string
def initialize(component_class, id, parallelism)
super()
@clazz = component_class
@id = id.to_s
@parallelism = parallelism
end
def is_java?
@clazz.name.split('::').first.downcase == 'java'
end
end
class SpoutDefinition < ComponentDefinition; end
class SpoutDefinition < ComponentDefinition
def new_instance(base_class_path)
is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources
@ -55,28 +67,9 @@ module RedStorm
end
end
end
end
class Configurator
attr_reader :config
def initialize
@config = Backtype::Config.new
end
def set(attribute, value)
@config.put(attribute, value)
end
def method_missing(sym, *args)
config_method = "set#{self.class.camel_case(sym)}"
@config.send(config_method, *args)
end
private
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }
def new_instance(base_class_path)
is_java? ? @clazz.new : JRubyBolt.new(base_class_path, @clazz.name)
end
end
@ -84,16 +77,17 @@ module RedStorm
@log ||= org.apache.log4j.Logger.getLogger(self.name)
end
def self.spout(spout_class, options = {})
def self.spout(spout_class, options = {}, &spout_block)
spout_options = {:id => self.underscore(spout_class), :parallelism => DEFAULT_SPOUT_PARALLELISM}.merge(options)
spout = SpoutDefinition.new(spout_class, spout_options[:id], spout_options[:parallelism])
spout.instance_exec(&spout_block) if block_given?
self.components << spout
end
def self.bolt(bolt_class, options = {}, &bolt_block)
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
@ -115,12 +109,12 @@ module RedStorm
builder = TopologyBuilder.new
self.class.spouts.each do |spout|
is_java = spout.clazz.name.split('::').first == 'Java'
builder.setSpout(spout.id, is_java ? spout.clazz.new : JRubySpout.new(base_class_path, spout.clazz.name), spout.parallelism)
declarer = builder.setSpout(spout.id, spout.new_instance(base_class_path), spout.parallelism)
declarer.addConfigurations(spout.config)
end
self.class.bolts.each do |bolt|
is_java = bolt.clazz.name.split('::').first == 'Java'
declarer = builder.setBolt(bolt.id, is_java ? bolt.clazz.new : JRubyBolt.new(base_class_path, bolt.clazz.name), bolt.parallelism)
declarer = builder.setBolt(bolt.id, bolt.new_instance(base_class_path), bolt.parallelism)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end

View File

@ -91,10 +91,6 @@ task :jar, [:include_dir] => [:unpack, :clean_jar] do |t, args|
fileset :dir => TARGET_CLASSES_DIR
fileset :dir => TARGET_DEPENDENCY_UNPACKED_DIR
fileset :dir => TARGET_GEMS_DIR
fileset :dir => CWD do
include :name => "Gemfile"
include :name => "Gemfile.lock"
end
fileset :dir => JRUBY_SRC_DIR do
exclude :name => "tasks/**"
end
@ -145,12 +141,17 @@ task :build => :setup do
build_java_dir("#{TARGET_SRC_DIR}")
end
task :gems, [:gemfile] => :setup do |t, args|
bundler_options = args[:gemfile].split(":").join(" ")
task :gems, [:bundler_options] => :setup do |t, args|
bundler_options = args[:bundler_options].split(":").join(" ")
system("gem install bundler --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc")
system("gem install rake --version 0.9.2.2 --install-dir #{TARGET_GEMS_DIR}/gems --no-ri --no-rdoc")
system("jruby #{RedStorm::RUNTIME['RUBY_VERSION']} -S bundle install #{bundler_options} --path #{TARGET_GEMS_DIR}/bundler/")
if bundler_options =~ /--gemfile\s+([^\s]+)/
gemfile = $1
system("cp #{gemfile} #{TARGET_GEMS_DIR}/bundler/")
system("cp #{gemfile}.lock #{TARGET_GEMS_DIR}/bundler/")
end
end
def build_java_dir(source_folder)

View File

@ -16,9 +16,11 @@ describe RedStorm::SimpleBolt do
bolt.should respond_to :cleanup
bolt.should respond_to :prepare
bolt.should respond_to :declare_output_fields
end
bolt.should respond_to :get_component_configuration
end
it "should implement dsl class statements" do
RedStorm::SimpleBolt.should respond_to :configure
RedStorm::SimpleBolt.should respond_to :output_fields
RedStorm::SimpleBolt.should respond_to :on_init
RedStorm::SimpleBolt.should respond_to :on_close

View File

@ -15,17 +15,22 @@ describe RedStorm::SimpleSpout do
spout.should respond_to :next_tuple
spout.should respond_to :open
spout.should respond_to :close
spout.should respond_to :activate
spout.should respond_to :deactivate
spout.should respond_to :close
spout.should respond_to :get_component_configuration
spout.should respond_to :declare_output_fields
spout.should respond_to :is_distributed
spout.should respond_to :ack
spout.should respond_to :fail
end
it "should implement dsl class statement" do
RedStorm::SimpleSpout.should respond_to :set
RedStorm::SimpleSpout.should respond_to :configure
RedStorm::SimpleSpout.should respond_to :output_fields
RedStorm::SimpleSpout.should respond_to :on_init
RedStorm::SimpleSpout.should respond_to :on_close
RedStorm::SimpleSpout.should respond_to :on_activate
RedStorm::SimpleSpout.should respond_to :on_deactivate
RedStorm::SimpleSpout.should respond_to :on_send
RedStorm::SimpleSpout.should respond_to :on_ack
RedStorm::SimpleSpout.should respond_to :on_fail
@ -43,19 +48,15 @@ describe RedStorm::SimpleSpout do
describe "dsl" do
describe "set statement" do
DEFAULT_SPOUT_OPTIONS = {:is_distributed => false}
DEFAULT_SPOUT_OPTIONS = {}
it "should have default options" do
RedStorm::SimpleSpout.send(:is_distributed?).should be_false
end
it "should parse options" do
class IsDistributedClass < RedStorm::SimpleSpout
set :is_distributed => true
end
IsDistributedClass.send(:spout_options).should == DEFAULT_SPOUT_OPTIONS.merge(:is_distributed => true)
IsDistributedClass.send(:is_distributed?).should be_true
end
# it "should parse options" do
# class IsDistributedClass < RedStorm::SimpleSpout
# set :is_distributed => true
# end
# IsDistributedClass.send(:spout_options).should == DEFAULT_SPOUT_OPTIONS.merge(:is_distributed => true)
# IsDistributedClass.send(:is_distributed?).should be_true
# end
end
describe "output_field statement" do
@ -614,16 +615,16 @@ describe RedStorm::SimpleSpout do
end
end
describe "is_distributed" do
it "should report is_distributed" do
RedStorm::SimpleSpout.is_distributed?.should be_false
class Spout1 < RedStorm::SimpleSpout
set :is_distributed => true
end
spout = Spout1.new
spout.is_distributed.should be_true
end
end
# describe "is_distributed" do
# it "should report is_distributed" do
# RedStorm::SimpleSpout.is_distributed?.should be_false
# class Spout1 < RedStorm::SimpleSpout
# set :is_distributed => true
# end
# spout = Spout1.new
# spout.is_distributed.should be_true
# end
# end
describe "ack" do
it "should call ack block" do

View File

@ -203,8 +203,8 @@ describe RedStorm::SimpleTopology do
builder = mock(RedStorm::TopologyBuilder)
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
builder.should_receive(:createTopology).and_return("topology")
configurator = mock(RedStorm::SimpleTopology::Configurator)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
configurator = mock(RedStorm::Configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
configurator.should_receive(:config).and_return("config")
cluster = mock(RedStorm::LocalCluster)
RedStorm::LocalCluster.should_receive(:new).and_return(cluster)
@ -217,8 +217,8 @@ describe RedStorm::SimpleTopology do
builder = mock(RedStorm::TopologyBuilder)
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
builder.should_receive(:createTopology).and_return("topology")
configurator = mock(RedStorm::SimpleTopology::Configurator)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
configurator = mock(RedStorm::Configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
configurator.should_receive(:config).and_return("config")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
Topology1.new.start("base_path", :cluster)
@ -236,17 +236,19 @@ describe RedStorm::SimpleTopology do
end
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::SimpleTopology::Configurator)
configurator = mock(RedStorm::Configurator)
jruby_spout1 = mock(RedStorm::JRubySpout)
jruby_spout2 = mock(RedStorm::JRubySpout)
declarer = mock("Declarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout1)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass2").and_return(jruby_spout2)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1)
builder.should_receive("setSpout").with('spout_class1', jruby_spout1, 1).and_return(declarer)
builder.should_receive("setSpout").with('spout_class2', jruby_spout2, 1).and_return(declarer)
declarer.should_receive("addConfigurations").twice
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
@ -271,28 +273,24 @@ describe RedStorm::SimpleTopology do
end
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::SimpleTopology::Configurator)
configurator = mock(RedStorm::Configurator)
jruby_bolt1 = mock(RedStorm::JRubyBolt)
jruby_bolt2 = mock(RedStorm::JRubyBolt)
declarer = mock("Declarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt1)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass2").and_return(jruby_bolt2)
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return("storm_bolt1")
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return("storm_bolt2")
builder.should_receive("setBolt").with("id1", jruby_bolt1, 2).and_return(declarer)
builder.should_receive("setBolt").with("id2", jruby_bolt2, 3).and_return(declarer)
declarer.should_receive("addConfigurations").twice
bolt_definition1.should_receive(:define_grouping).with("storm_bolt1")
bolt_definition2.should_receive(:define_grouping).with("storm_bolt2")
bolt_definition1.should_receive(:clazz).twice.and_return(BoltClass1)
bolt_definition2.should_receive(:clazz).twice.and_return(BoltClass2)
bolt_definition1.should_receive(:define_grouping).with(declarer)
bolt_definition2.should_receive(:define_grouping).with(declarer)
bolt_definition1.should_receive(:parallelism).and_return(2)
bolt_definition2.should_receive(:parallelism).and_return(3)
bolt_definition1.should_receive(:id).any_number_of_times.and_return("id1")
bolt_definition2.should_receive(:id).any_number_of_times.and_return("id2")
# bolt_definition1.should_receive(:id=).with('1')
# bolt_definition2.should_receive(:id=).with('2')
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
@ -306,16 +304,17 @@ describe RedStorm::SimpleTopology do
before(:each) do
builder = mock(RedStorm::TopologyBuilder)
configurator = mock(RedStorm::SimpleTopology::Configurator)
configurator = mock(RedStorm::Configurator)
jruby_bolt = mock(RedStorm::JRubyBolt)
jruby_spout = mock(RedStorm::JRubySpout)
@declarer = mock("InputDeclarer")
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
RedStorm::JRubyBolt.should_receive(:new).with("base_path", "BoltClass1").and_return(jruby_bolt)
RedStorm::JRubySpout.should_receive(:new).with("base_path", "SpoutClass1").and_return(jruby_spout)
builder.should_receive("setBolt").with('bolt_class1', jruby_bolt, 1).and_return(@declarer)
builder.should_receive("setSpout").with('1', jruby_spout, 1).and_return(@declarer)
@declarer.should_receive("addConfigurations").twice
configurator.should_receive(:config).and_return("config")
builder.should_receive(:createTopology).and_return("topology")
RedStorm::StormSubmitter.should_receive("submitTopology").with("topology1", "config", "topology")
@ -461,8 +460,8 @@ describe RedStorm::SimpleTopology do
builder = mock(RedStorm::TopologyBuilder)
RedStorm::TopologyBuilder.should_receive(:new).and_return(builder)
builder.should_receive(:createTopology).and_return("topology")
configurator = mock(RedStorm::SimpleTopology::Configurator)
RedStorm::SimpleTopology::Configurator.should_receive(:new).and_return(configurator)
configurator = mock(RedStorm::Configurator)
RedStorm::Configurator.should_receive(:new).and_return(configurator)
configurator.should_receive(:config).and_return("config")
cluster = mock(RedStorm::LocalCluster)