Merge branch 'master' of git://github.com/dinedal/redstorm into drpc_trident

Conflicts:
	lib/tasks/red_storm.rake
This commit is contained in:
Colin Surprenant 2013-05-10 15:50:17 -04:00
commit cff9c5bc5a
13 changed files with 516 additions and 18 deletions

59
generator.rb Normal file
View File

@ -0,0 +1,59 @@
require 'erb'
require 'pry'
require 'java'
require 'active_support/core_ext'
Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) }
PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb")
PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb")
to_generate = ["storm.trident.operation.Function"]
# Return all java functions of a java class
def get_functions(jlass)
jlass.declared_instance_methods.concat( jlass.interfaces.map{|i| get_functions(i) }.flatten )
end
# Return all java deps of a class
def get_java_deps(functions, klass)
functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass
end
to_generate.each do |klass|
_functions = get_functions(Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class)
java_deps = get_java_deps(_functions, klass)
# Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
functions = _functions.reduce({}) do |memo, f|
before_serialization = %w{ }.include?(f.name.to_s)
memoize = %w{ prepare execute }.include?(f.name.to_s)
memo[:"#{f.name}"] = {
:return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void",
:args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)},
:before_serialization => before_serialization,
:memoize => memoize
}
memo
end
interface_name = klass.split(".")[-1]
# IBlah to ProxyBlah if IBlah
ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}"
java_class_name = "JRuby#{ruby_class_name}"
# Rubyify java functions into {:method_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
methods = functions.map do |f_name, params|
{f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }}
end.reduce({}){|m,o| m.merge(o)}
File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') {|f| f.write(ERB.new(PROXY_JRUBY_TEMPLATE).result(binding)) }
File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') {|f| f.write(ERB.new(PROXY_JAVA_TEMPLATE).result(binding)) }
end

51
java_proxy.erb Normal file
View File

@ -0,0 +1,51 @@
package redstorm.storm.jruby;
<% java_deps.each do |dep| %>
import <%= dep %>;<% end %>
public class <%= java_class_name %> implements <%= interface_name %> {
<%= interface_name %> _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;
public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
_fields = fields;
}
<% functions.each do |function_name, params| %>
@Override
public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map{|n,t| ["final #{t}", n].join(' ') }.flatten.join(', ') %>) {
<% if function_name == :open %>
_proxy = newProxy(_baseClassPath, _realClassName);
_proxy.open(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif function_name == :declareOutputFields %>
if (_fields.length > 0) {
<%= params[:args].values[0] %>.declare(new Fields(_fields));
} else {
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
}
<% elsif params[:before_serialization] %>
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif params[:memoize] %>
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% else %>
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% end %>
}
<% end %>
private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) {
try {
redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -6,3 +6,4 @@ require 'red_storm/configuration'
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'
require 'red_storm/simple_topology'
require 'red_storm/simple_drpc_topology'

View File

@ -7,7 +7,7 @@ TARGET_LIB_DIR = "#{TARGET_DIR}/lib"
TARGET_SRC_DIR = "#{TARGET_DIR}/src"
TARGET_GEM_DIR = "#{TARGET_DIR}/gems/gems"
TARGET_SPECS_DIR = "#{TARGET_DIR}/gems/specifications"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_CLASSES_DIR = "#{TARGET_DIR}/classes"
TARGET_DEPENDENCY_DIR = "#{TARGET_DIR}/dependency"
TARGET_DEPENDENCY_UNPACKED_DIR = "#{TARGET_DIR}/dependency-unpacked"
TARGET_CLUSTER_JAR = "#{TARGET_DIR}/cluster-topology.jar"
@ -26,9 +26,9 @@ CUSTOM_IVY_SETTINGS = "#{DST_IVY_DIR}/settings.xml"
module RedStorm
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"
def self.local_storm_command(class_file, ruby_mode = nil)
src_dir = File.expand_path(File.dirname(class_file))
@ -38,7 +38,7 @@ module RedStorm
def self.cluster_storm_command(class_file, ruby_mode = nil)
"storm jar #{TARGET_CLUSTER_JAR} -Djruby.compat.version=#{RedStorm.jruby_mode_token(ruby_mode)} redstorm.TopologyLauncher cluster #{class_file}"
end
def self.usage
puts("usage: redstorm version")
puts(" redstorm install")
@ -82,4 +82,4 @@ module RedStorm
end
end
end

View File

@ -0,0 +1,71 @@
require 'java'
java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
# Java artifacts when creating a spout.
#
# The real batch spout class implementation must define these methods:
# - open(conf, context, collector)
# - emitBatch
# - getOutputFields
# - ack(batch_id)
#
# and optionnaly:
# - close
#
class BatchSpout
java_implements IBatchSpout
java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context)
end
java_signature 'void close()'
def close
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end
java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id)
end
java_signature 'Fields getOutputFields()'
def getOutputFields
@real_spout.get_output_fields()
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end
end

View File

@ -0,0 +1,48 @@
require 'java'
java_import 'storm.trident.tuple.TridentTuple'
java_import 'storm.trident.operation.TridentCollector'
java_import 'java.util.Map'
java_import 'storm.trident.operation.TridentOperationContext'
java_import 'storm.trident.operation.Function'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class ProxyFunction
java_implements Function
java_signature 'Function (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
rescue NameError
require base_class_path
@real = Object.module_eval(real_class_name).new
end
java_signature 'void execute(TridentTuple, TridentCollector)'
def execute(_trident_tuple, _trident_collector)
@real.execute(_trident_tuple, _trident_collector)
end
java_signature 'void cleanup()'
def cleanup()
@real.cleanup()
end
java_signature 'void prepare(Map, TridentOperationContext)'
def prepare(_map, _trident_operation_context)
@real.prepare(_map, _trident_operation_context)
end
end

View File

@ -0,0 +1,87 @@
require 'java'
require 'red_storm/configuration'
require 'red_storm/configurator'
module RedStorm
class InputBoltDefinition < SimpleTopology::BoltDefinition
attr_accessor :grouping
def initialize(*args)
super
@grouping = :none
end
def grouping(grouping)
@grouping = @grouping
end
def define_grouping(declarer)
case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
class SimpleDRPCTopology < SimpleTopology
def self.spout
raise TopologyDefinitionError, "DRPC spout is already defined"
end
def start(base_class_path, env)
builder = Java::BacktypeStormDrpc::LinearDRPCTopologyBuilder.new(self.class.topology_name)
self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode
defaults = {"topology.worker.childopts" => "-Djruby.compat.version=#{RedStorm.jruby_mode_token}"}
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
drpc = nil
if env == :local
drpc = LocalDRPC.new
submitter = @cluster = LocalCluster.new
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createLocalTopology(drpc))
else
submitter = StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createRemoteTopology)
end
instance_exec(env, drpc, &self.class.submit_block)
end
def self.input_bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = InputBoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
bolt.instance_exec(&bolt_block)
self.components << bolt
end
end
end

View File

@ -36,7 +36,7 @@ module RedStorm
end
class SpoutDefinition < ComponentDefinition
# WARNING non-dry see BoltDefinition#new_instance
def new_instance(base_class_path)
if @clazz.name == "Java::RedstormStormJruby::JRubyShellSpout"
@ -49,7 +49,7 @@ module RedStorm
# is_java? ? @clazz.new : JRubySpout.new(base_class_path, @clazz.name)
end
end
class BoltDefinition < ComponentDefinition
attr_accessor :sources, :command
@ -119,7 +119,7 @@ module RedStorm
def self.bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt_options = {:id => options[:id] ? options[:id] : self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
@ -164,7 +164,7 @@ module RedStorm
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
instance_exec(env, &self.class.submit_block)

View File

@ -6,18 +6,21 @@ module Backtype
end
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.LocalDRPC'
java_import 'backtype.storm.StormSubmitter'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.drpc.LinearDRPCTopologyBuilder'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'redstorm.storm.jruby.JRubyBolt'
java_import 'redstorm.storm.jruby.JRubySpout'
java_import 'redstorm.storm.jruby.JRubyBatchSpout'
java_package 'redstorm'
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# TopologyLauncher is the application entry point when launching a topology. Basically it will
# call require on the specified Ruby topology class file path and call its start method
class TopologyLauncher
@ -36,14 +39,14 @@ class TopologyLauncher
$:.unshift File.expand_path(launch_path + '/lib')
$:.unshift File.expand_path(launch_path + '/target/lib')
require "#{class_path}"
require "#{class_path}"
topology_name = RedStorm::Configuration.topology_class.respond_to?(:topology_name) ? "/#{RedStorm::Configuration.topology_class.topology_name}" : ''
puts("RedStorm v#{RedStorm::VERSION} starting topology #{RedStorm::Configuration.topology_class.name}#{topology_name} in #{env.to_s} environment")
RedStorm::Configuration.topology_class.new.start(class_path, env)
end
private
private
def self.camel_case(s)
s.to_s.gsub(/\/(.?)/) { "::#{$1.upcase}" }.gsub(/(?:^|_)(.)/) { $1.upcase }

View File

@ -11,7 +11,7 @@ require 'jruby/jrubyc'
require 'red_storm'
require 'red_storm/application'
DEP_STORM_VERSION = "0.8.1"
DEP_STORM_VERSION = "0.8.2"
DEP_JRUBY_VERSION = "1.6.8"
INSTALL_IVY_VERSION = "2.2.0"
@ -27,7 +27,7 @@ DEFAULT_DEPENDENCIES = {
task :launch, :env, :ruby_mode, :class_file do |t, args|
# use ruby mode parameter or default to current interpreter version
version_token = RedStorm.jruby_mode_token(args[:ruby_mode])
command = case args[:env]
when "local"
RedStorm::Application.local_storm_command(args[:class_file], args[:ruby_mode])
@ -170,7 +170,7 @@ task :deps => "ivy:install" do
artifact, transitive = dependency.split(/\s*,\s*/)
ivy_retrieve(*artifact.split(':').concat([transitive.split(/\s*=\s*/).last, "#{TARGET_DEPENDENCY_DIR}/topology", "default"]))
end
end
end
task :jar, [:include_dir] => [:clean_jar] do |t, args|
puts("\n--> Generating JAR file #{TARGET_CLUSTER_JAR}")
@ -230,8 +230,8 @@ def build_java_dir(source_folder)
'listfiles' => true
) do
# compilerarg :value => "-Xlint:unchecked"
end
end
end
end
def build_jruby(source_path)
puts("\n--> Compiling JRuby")

30
ruby_proxy.erb Normal file
View File

@ -0,0 +1,30 @@
require 'java'
<% java_deps.each do |dep| %>
java_import '<%= dep %>'
<% end %>
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class <%= ruby_class_name %>
java_implements <%= interface_name %>
java_signature '<%= interface_name %> (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
rescue NameError
require base_class_path
@real = Object.module_eval(real_class_name).new
end
<% methods.each do |method_name, params| %>
java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)'
def <%= method_name %>(<%= params[:args].keys.join(', ') %>)
@real.<%= method_name %>(<%= params[:args].keys.join(', ') %>)
end
<% end %>
end

View File

@ -0,0 +1,89 @@
package redstorm.storm.jruby;
import storm.trident.operation.TridentCollector;
import backtype.storm.task.TopologyContext;
import storm.trident.spout.IBatchSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
/**
* the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubyBatchSpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
_fields = fields;
}
@Override
public void open(final Map conf, final TopologyContext context) {
// create instance of the jruby proxy class here, after deserialization in the workers.
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
_proxySpout.open(conf, context);
}
@Override
public void emitBatch(final long batchId, final TridentCollector collector) {
_proxySpout.emitBatch(batchId, collector);
}
@Override
public void close() {
_proxySpout.close();
}
@Override
public void ack(final long batchId) {
_proxySpout.ack(batchId);
}
@Override
public Fields getOutputFields() {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
}
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,59 @@
package redstorm.storm.jruby;
import storm.trident.tuple.TridentTuple;
import storm.trident.operation.TridentCollector;
import java.util.Map;
import storm.trident.operation.TridentOperationContext;
import storm.trident.operation.Function;
public class JRubyProxyFunction implements Function {
Function _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;
public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
_fields = fields;
}
@Override
public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.execute(_tridentTuple, _tridentCollector);
}
@Override
public void cleanup() {
_proxy.cleanup();
}
@Override
public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.prepare(_map, _tridentOperationContext);
}
private static Function newProxy(final String baseClassPath, final String realClassName) {
try {
redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}