removed generator, using already merged batch spout support, cosmetics

This commit is contained in:
Colin Surprenant 2013-05-13 09:41:09 -04:00
parent cff9c5bc5a
commit 096e37411b
10 changed files with 57 additions and 256 deletions

View File

@ -1,59 +0,0 @@
require 'erb'
require 'pry'
require 'java'
require 'active_support/core_ext'
Dir["../triggit-storm/target/dependency/storm/default/*"].each{|f| $CLASSPATH << File.expand_path(f) }
PROXY_JRUBY_TEMPLATE = File.read("./ruby_proxy.erb")
PROXY_JAVA_TEMPLATE = File.read("./java_proxy.erb")
to_generate = ["storm.trident.operation.Function"]
# Return all java functions of a java class
def get_functions(jlass)
jlass.declared_instance_methods.concat( jlass.interfaces.map{|i| get_functions(i) }.flatten )
end
# Return all java deps of a class
def get_java_deps(functions, klass)
functions.map{|f| [f.argument_types.map{|at| at.name}, f.return_type ? f.return_type.name : "void"] }.flatten.uniq.reject{|t| t.split('.').count == 1} << klass
end
to_generate.each do |klass|
_functions = get_functions(Object.const_get(java_import(klass)[0].to_s.split("::")[-1]).java_class)
java_deps = get_java_deps(_functions, klass)
# Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
functions = _functions.reduce({}) do |memo, f|
before_serialization = %w{ }.include?(f.name.to_s)
memoize = %w{ prepare execute }.include?(f.name.to_s)
memo[:"#{f.name}"] = {
:return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void",
:args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)},
:before_serialization => before_serialization,
:memoize => memoize
}
memo
end
interface_name = klass.split(".")[-1]
# IBlah to ProxyBlah if IBlah
ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}"
java_class_name = "JRuby#{ruby_class_name}"
# Rubyify java functions into {:method_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
methods = functions.map do |f_name, params|
{f_name.to_s.underscore.to_sym => {:return_type => params[:return_type], :args => params[:args].map{|name, type| {name.to_s.underscore.to_sym => type}}.reduce({}){|m,o| m.merge(o)} }}
end.reduce({}){|m,o| m.merge(o)}
File.open("./lib/red_storm/proxy/#{ruby_class_name.underscore}.rb", 'w') {|f| f.write(ERB.new(PROXY_JRUBY_TEMPLATE).result(binding)) }
File.open("./src/main/redstorm/storm/jruby/#{java_class_name}.java", 'w') {|f| f.write(ERB.new(PROXY_JAVA_TEMPLATE).result(binding)) }
end

View File

@ -1,51 +0,0 @@
package redstorm.storm.jruby;
<% java_deps.each do |dep| %>
import <%= dep %>;<% end %>
public class <%= java_class_name %> implements <%= interface_name %> {
<%= interface_name %> _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;
public <%= java_class_name %>(final String baseClassPath, final String realClassName, final String[] fields) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
_fields = fields;
}
<% functions.each do |function_name, params| %>
@Override
public <%= params[:return_type] %> <%= function_name %>(<%= params[:args].map{|n,t| ["final #{t}", n].join(' ') }.flatten.join(', ') %>) {
<% if function_name == :open %>
_proxy = newProxy(_baseClassPath, _realClassName);
_proxy.open(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif function_name == :declareOutputFields %>
if (_fields.length > 0) {
<%= params[:args].values[0] %>.declare(new Fields(_fields));
} else {
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
}
<% elsif params[:before_serialization] %>
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% elsif params[:memoize] %>
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% else %>
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
<% end %>
}
<% end %>
private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) {
try {
redstorm.proxy.<%= ruby_class_name %> proxy = new redstorm.proxy.<%= ruby_class_name %>(baseClassPath, realClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -20,15 +20,17 @@ DST_EXAMPLES = "#{CWD}/examples"
SRC_IVY_DIR = "#{RedStorm::REDSTORM_HOME}/ivy"
DST_IVY_DIR = "#{CWD}/ivy"
CUSTOM_DEPENDENCIES = "#{CWD}/Dependencies"
DEFAULT_IVY_SETTINGS = "#{SRC_IVY_DIR}/settings.xml"
CUSTOM_IVY_SETTINGS = "#{DST_IVY_DIR}/settings.xml"
DEFAULT_IVY_STORM_DEPENDENCIES = "#{SRC_IVY_DIR}/storm_dependencies.xml"
CUSTOM_IVY_STORM_DEPENDENCIES = "#{DST_IVY_DIR}/storm_dependencies.xml"
DEFAULT_IVY_TOPOLOGY_DEPENDENCIES = "#{SRC_IVY_DIR}/topology_dependencies.xml"
CUSTOM_IVY_TOPOLOGY_DEPENDENCIES = "#{DST_IVY_DIR}/topology_dependencies.xml"
module RedStorm
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"
class Application
TASKS_FILE = "#{RedStorm::REDSTORM_HOME}/lib/tasks/red_storm.rake"
def self.local_storm_command(class_file, ruby_mode = nil)
src_dir = File.expand_path(File.dirname(class_file))

View File

@ -1,31 +1,19 @@
require 'java'
java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.operation.TridentCollector'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
# the Spout class is a proxy to the real spout to avoid having to deal with all the
# Java artifacts when creating a spout.
#
# The real batch spout class implementation must define these methods:
# - open(conf, context, collector)
# - emitBatch
# - getOutputFields
# - ack(batch_id)
#
# and optionnaly:
# - close
#
class BatchSpout
java_implements IBatchSpout
@ -40,7 +28,12 @@ class BatchSpout
java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context)
@real_spout.open(conf, context) if @real_spout.respond_to?(:open)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end
java_signature 'void close()'
@ -48,19 +41,14 @@ class BatchSpout
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end
java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id)
@real_spout.ack(batch_id) if @real_spout.respond_to?(:ack)
end
java_signature 'Fields getOutputFields()'
def getOutputFields
@real_spout.get_output_fields()
def getOutputFields()
@real_spout.get_output_fields
end
java_signature 'Map<String, Object> getComponentConfiguration()'

View File

@ -1,16 +1,10 @@
require 'java'
java_import 'storm.trident.tuple.TridentTuple'
java_import 'storm.trident.operation.TridentCollector'
java_import 'java.util.Map'
java_import 'storm.trident.operation.TridentOperationContext'
java_import 'storm.trident.operation.Function'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
@ -43,6 +37,4 @@ class ProxyFunction
def prepare(_map, _trident_operation_context)
@real.prepare(_map, _trident_operation_context)
end
end

View File

@ -119,7 +119,7 @@ module RedStorm
def self.bolt(bolt_class, *args, &bolt_block)
options = args.last.is_a?(Hash) ? args.pop : {}
contructor_args = !args.empty? ? args.pop : []
bolt_options = {:id => options[:id] ? options[:id] : self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt_options = {:id => self.underscore(bolt_class), :parallelism => DEFAULT_BOLT_PARALLELISM}.merge(options)
bolt = BoltDefinition.new(bolt_class, contructor_args, bolt_options[:id], bolt_options[:parallelism])
raise(TopologyDefinitionError, "#{bolt.clazz.name}, #{bolt.id}, bolt definition body required") unless block_given?
@ -164,7 +164,7 @@ module RedStorm
configurator = Configurator.new(defaults)
configurator.instance_exec(env, &self.class.configure_block)
submitter = (env == :local) ? @cluster = LocalCluster.new : StormSubmitter
submitter.submitTopology(self.class.topology_name, configurator.config, builder.createTopology)
instance_exec(env, &self.class.submit_block)

View File

@ -12,8 +12,8 @@ require 'red_storm'
require 'red_storm/application'
DEP_STORM_VERSION = "0.8.2"
DEP_JRUBY_VERSION = "1.6.8"
INSTALL_IVY_VERSION = "2.2.0"
DEP_JRUBY_VERSION = "1.7.3"
INSTALL_IVY_VERSION = "2.3.0"
DEFAULT_DEPENDENCIES = {
:storm_artifacts => [
@ -27,7 +27,7 @@ DEFAULT_DEPENDENCIES = {
task :launch, :env, :ruby_mode, :class_file do |t, args|
# use ruby mode parameter or default to current interpreter version
version_token = RedStorm.jruby_mode_token(args[:ruby_mode])
command = case args[:env]
when "local"
RedStorm::Application.local_storm_command(args[:class_file], args[:ruby_mode])
@ -158,18 +158,13 @@ end
task :deps => "ivy:install" do
puts("\n--> Installing dependencies")
dependencies = File.exists?(CUSTOM_DEPENDENCIES) ? eval(File.read(CUSTOM_DEPENDENCIES)) : DEFAULT_DEPENDENCIES
ant.configure 'file' => File.exists?(CUSTOM_IVY_SETTINGS) ? CUSTOM_IVY_SETTINGS : DEFAULT_IVY_SETTINGS
dependencies[:storm_artifacts].each do |dependency|
artifact, transitive = dependency.split(/\s*,\s*/)
ivy_retrieve(*artifact.split(':').concat([transitive.split(/\s*=\s*/).last, "#{TARGET_DEPENDENCY_DIR}/storm", "default"]))
end
ant.resolve 'file' => File.exists?(CUSTOM_IVY_STORM_DEPENDENCIES) ? CUSTOM_IVY_STORM_DEPENDENCIES : DEFAULT_IVY_STORM_DEPENDENCIES
ant.retrieve 'pattern' => "#{TARGET_DEPENDENCY_DIR}/storm/[conf]/[artifact]-[revision].[ext]", 'sync' => "true"
dependencies[:topology_artifacts].each do |dependency|
artifact, transitive = dependency.split(/\s*,\s*/)
ivy_retrieve(*artifact.split(':').concat([transitive.split(/\s*=\s*/).last, "#{TARGET_DEPENDENCY_DIR}/topology", "default"]))
end
ant.resolve 'file' => File.exists?(CUSTOM_IVY_TOPOLOGY_DEPENDENCIES) ? CUSTOM_IVY_TOPOLOGY_DEPENDENCIES : DEFAULT_IVY_TOPOLOGY_DEPENDENCIES
ant.retrieve 'pattern' => "#{TARGET_DEPENDENCY_DIR}/topology/[conf]/[artifact]-[revision].[ext]", 'sync' => "true"
end
task :jar, [:include_dir] => [:clean_jar] do |t, args|
@ -230,8 +225,8 @@ def build_java_dir(source_folder)
'listfiles' => true
) do
# compilerarg :value => "-Xlint:unchecked"
end
end
end
end
def build_jruby(source_path)
puts("\n--> Compiling JRuby")
@ -248,30 +243,3 @@ def build_jruby(source_path)
status = JRuby::Compiler::compile_argv(argv)
end
end
def truefalse(s)
return true if s.to_s.downcase =~ /1|yes|true/
return false if s.to_s.downcase =~ /0|no|false/
nil
end
def ivy_retrieve(org, mod, rev, transitive, dir, conf)
ant.resolve({
'organisation' => org,
'module' => mod,
'revision' => rev,
'inline' => true,
'transitive' => truefalse(transitive),
'conf' => conf,
})
ant.retrieve({
'organisation' => org,
'module' => mod,
'revision' => rev,
'pattern' => "#{dir}/[conf]/[artifact]-[revision].[ext]",
'inline' => true,
'transitive' => truefalse(transitive),
'conf' => conf,
})
end

View File

@ -1,30 +0,0 @@
require 'java'
<% java_deps.each do |dep| %>
java_import '<%= dep %>'
<% end %>
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class <%= ruby_class_name %>
java_implements <%= interface_name %>
java_signature '<%= interface_name %> (String base_class_path, String real_class_name)'
def initialize(base_class_path, real_class_name)
@real = Object.module_eval(real_class_name).new
rescue NameError
require base_class_path
@real = Object.module_eval(real_class_name).new
end
<% methods.each do |method_name, params| %>
java_signature '<%= params[:return_type] %> <%= method_name %>(<%= params[:args].values.join(', ') %>)'
def <%= method_name %>(<%= params[:args].keys.join(', ') %>)
@real.<%= method_name %>(<%= params[:args].keys.join(', ') %>)
end
<% end %>
end

View File

@ -1,39 +1,35 @@
package redstorm.storm.jruby;
import storm.trident.operation.TridentCollector;
import backtype.storm.task.TopologyContext;
import storm.trident.spout.IBatchSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.task.TopologyContext;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import java.util.Map;
/**
* the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
* the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubyBatchSpout
*
* @param baseClassPath the topology/project base JRuby class file path
* create a new JRubySpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
_fields = fields;
}
@Override
@ -44,10 +40,9 @@ public class JRubyBatchSpout implements IBatchSpout {
}
@Override
public void emitBatch(final long batchId, final TridentCollector collector) {
public void emitBatch(long batchId, TridentCollector collector) {
_proxySpout.emitBatch(batchId, collector);
}
}
@Override
public void close() {
@ -55,17 +50,21 @@ public class JRubyBatchSpout implements IBatchSpout {
}
@Override
public void ack(final long batchId) {
public void ack(long batchId) {
_proxySpout.ack(batchId);
}
@Override
public Fields getOutputFields() {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
if (_proxySpout == null) {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call getOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
} else {
return _proxySpout.getOutputFields();
}
}
@Override
@ -76,7 +75,7 @@ public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
}
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);

View File

@ -10,40 +10,32 @@ public class JRubyProxyFunction implements Function {
Function _proxy;
String _realClassName;
String _baseClassPath;
String[] _fields;
public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) {
public JRubyProxyFunction(final String baseClassPath, final String realClassName) {
_baseClassPath = baseClassPath;
_realClassName = realClassName;
_fields = fields;
}
@Override
public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.execute(_tridentTuple, _tridentCollector);
}
@Override
public void cleanup() {
_proxy.cleanup();
}
@Override
public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) {
if(_proxy == null) {
_proxy = newProxy(_baseClassPath, _realClassName);
}
_proxy.prepare(_map, _tridentOperationContext);
}