Merge branch 'drpc_trident' into v0.6.5

Conflicts:
	lib/red_storm/topology_launcher.rb
This commit is contained in:
Colin Surprenant 2013-05-13 09:44:05 -04:00
commit 135c6818ff
8 changed files with 193 additions and 12 deletions

View File

@ -6,3 +6,4 @@ require 'red_storm/configuration'
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'
require 'red_storm/simple_topology'
require 'red_storm/simple_drpc_topology'

View File

@ -7,7 +7,7 @@ TARGET_LIB_DIR = "#{TARGET_DIR}/lib"
TARGET_SRC_DIR = "#{TARGET_DIR}/src"
TARGET_GEM_DIR = "#{TARGET_DIR}/gems/gems"
TARGET_SPECS_DIR = "#{TARGET_DIR}/gems/specifications"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency"
TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked"
TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar"
@ -28,7 +28,7 @@ DEFAULT_IVY_TOPOLOGY_DEPENDENCIES = "#{SRC_IVY_DIR}/topology_dependencies.xml"
CUSTOM_IVY_TOPOLOGY_DEPENDENCIES = "#{DST_IVY_DIR}/topology_dependencies.xml"
module RedStorm
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"
@ -40,7 +40,7 @@ module RedStorm
def self.cluster_storm_command(class_file, ruby_mode = nil)
"storm jar #{TARGET_CLUSTER_JAR} -Djruby.compat.version=#{RedStorm.jruby_mode_token(ruby_mode)} redstorm.TopologyLauncher cluster #{class_file}"
end
def self.usage
puts("usage: redstorm version")
puts(" redstorm install")
@ -84,4 +84,4 @@ module RedStorm
end
end
end

View File

@ -0,0 +1,40 @@
require 'java'
java_import 'storm.trident.tuple.TridentTuple'
java_import 'storm.trident.operation.TridentCollector'
java_import 'storm.trident.operation.TridentOperationContext'
java_import 'storm.trident.operation.Function'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class ProxyFunction
java_implements Function
java_signature 'Function (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
rescue NameError
require base_class_path
@real = Object.module_eval(real_class_name).new
end
java_signature 'void execute(TridentTuple, TridentCollector)'
def execute(_trident_tuple, _trident_collector)
@real.execute(_trident_tuple, _trident_collector)
end
java_signature 'void cleanup()'
def cleanup()
@real.cleanup()
end
java_signature 'void prepare(Map, TridentOperationContext)'
def prepare(_map, _trident_operation_context)
@real.prepare(_map, _trident_operation_context)
end
end

View File

@ -0,0 +1,87 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class InputBoltDefinition < SimpleTopology::BoltDefinition
attr_accessor :grouping
def initialize(*args)
super
@grouping = :none
end
def grouping(grouping)
@grouping = @grouping
end
def define_grouping(declarer)
case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
class SimpleDRPCTopology < SimpleTopology
def self.spout
raise TopologyDefinitionError, "DRPC spout is already defined"
end
def start(base_class_path, env)
builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name)
self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
drpc = nil
if env == :local
drpc = LocalDRPC.new
submitter = @cluster = LocalCluster.new
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc))
else
submitter = StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology)
end
instance_exec(env, drpc, &self.class.submit_block)
end
def self.input_bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
end
end

View File

@ -36,7 +36,7 @@ module RedStorm
end
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@ -49,7 +49,7 @@ module RedStorm
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources, :command

View File

@ -6,9 +6,11 @@ module Backtype
end
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.LocalDRPC'
java_import 'backtype.storm.StormSubmitter'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.coordination.BatchBoltExecutor'
java_import 'backtype.storm.drpc.LinearDRPCTopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
@ -24,7 +26,7 @@ java_import 'redstorm.storm.jruby.JRubyTransactionalCommitterBolt'
java_package 'redstorm'
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# call require on the specified Ruby topology class file path and call its start method
class TopologyLauncher
@ -43,14 +45,14 @@ class TopologyLauncher
$:.unshift File.expand_path(launch_path + '/lib')
$:.unshift File.expand_path(launch_path + '/target/lib')
require "#{class_path}"
require "#{class_path}"
topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : ''
puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment")
RedStorm::Configuration.topology_class.new.start(class_path, env)
end
private
private
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }

View File

@ -11,9 +11,9 @@ require 'jruby/jrubyc'
require 'red_storm'
require 'red_storm/application'
DEP_STORM_VERSION = "0.8.1"
DEP_JRUBY_VERSION = "1.6.8"
INSTALL_IVY_VERSION = "2.2.0"
DEP_STORM_VERSION = "0.8.2"
DEP_JRUBY_VERSION = "1.7.3"
INSTALL_IVY_VERSION = "2.3.0"
DEFAULT_DEPENDENCIES = {
:storm_artifacts => [

View File

@ -0,0 +1,51 @@
package redstorm.storm.jruby;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.TridentCollector;
import java.util.Map;
import storm.trident.operation.TridentOperationContext;
import storm.trident.operation.Function;
public class JRubyProxyFunction implements Function {
Function _proxy;
String _realClassName;
String _baseClassPath;
public JRubyProxyFunction(final String baseClassPath, final String realClassName) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
}
@Override
public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.execute(_tridentTuple, _tridentCollector);
}
@Override
public void cleanup() {
_proxy.cleanup();
}
@Override
public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.prepare(_map, _tridentOperationContext);
}
private static Function newProxy(final String baseClassPath, final String realClassName) {
try {
redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}