Generator correctly handles Trident Function, created proxy class for Trident Functions
This commit is contained in:
parent
af0948a46e
commit
5eb7161841
10
generator.rb
10
generator.rb
|
@ -31,17 +31,21 @@ to_generate.each do |klass|
|
|||
|
||||
# Boil down functions to {:function_name => {:return_type => type, :args => {:arg_var_name => :arg_var_type, ...} } }
|
||||
functions = _functions.reduce({}) do |memo, f|
|
||||
before_serialization = %w{ }.include?(f.name.to_s)
|
||||
memoize = %w{ prepare execute }.include?(f.name.to_s)
|
||||
memo[:"#{f.name}"] = {
|
||||
:return_type => f.return_type ? f.return_type.name.split('.')[-1] : "void",
|
||||
:args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)}
|
||||
:args => f.argument_types.map {|at| {:"_#{at.name.split('.')[-1].camelize(:lower)}" => at.name.split('.')[-1]} }.reduce({}){|m,o| m.merge(o)},
|
||||
:before_serialization => before_serialization,
|
||||
:memoize => memoize
|
||||
}
|
||||
memo
|
||||
end
|
||||
|
||||
interface_name = klass.split(".")[-1]
|
||||
|
||||
# IBlah to Blah if IBlah
|
||||
ruby_class_name = interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name
|
||||
# IBlah to ProxyBlah if IBlah
|
||||
ruby_class_name = "Proxy#{interface_name.starts_with?('I') ? interface_name[1..-1] : interface_name}"
|
||||
|
||||
java_class_name = "JRuby#{ruby_class_name}"
|
||||
|
||||
|
|
|
@ -24,19 +24,20 @@ public class <%= java_class_name %> implements <%= interface_name %> {
|
|||
if (_fields.length > 0) {
|
||||
<%= params[:args].values[0] %>.declare(new Fields(_fields));
|
||||
} else {
|
||||
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>);
|
||||
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
|
||||
}
|
||||
<% elsif params[:before_serialization] %>
|
||||
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:function_args_and_types].values.join(', ') %>);
|
||||
newProxy(_baseClassPath, _realClassName).<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
|
||||
<% elsif params[:memoize] %>
|
||||
if(_proxy == null) {
|
||||
_proxy = newProxy(_baseClassPath, _realClassName);
|
||||
}
|
||||
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
|
||||
<% else %>
|
||||
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>)
|
||||
_proxy.<%= function_name %>(<%= params[:args].keys.flatten.join(', ') %>);
|
||||
<% end %>
|
||||
}
|
||||
<% end %>
|
||||
@Override
|
||||
public Map<String, Object> getComponentConfiguration() {
|
||||
newProxy(_baseClassPath, _realClassName).getComponentConfiguration();
|
||||
}
|
||||
|
||||
private static <%= interface_name %> newProxy(final String baseClassPath, final String realClassName) {
|
||||
try {
|
||||
|
|
|
@ -1,63 +1,71 @@
|
|||
require 'java'
|
||||
|
||||
|
||||
java_import 'java.util.Map'
|
||||
|
||||
java_import 'backtype.storm.task.TopologyContext'
|
||||
|
||||
java_import 'storm.trident.operation.TridentCollector'
|
||||
|
||||
java_import 'backtype.storm.tuple.Fields'
|
||||
|
||||
java_import 'backtype.storm.task.TopologyContext'
|
||||
java_import 'storm.trident.spout.IBatchSpout'
|
||||
|
||||
|
||||
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
|
||||
java_import 'backtype.storm.tuple.Tuple'
|
||||
java_import 'backtype.storm.tuple.Fields'
|
||||
java_import 'backtype.storm.tuple.Values'
|
||||
java_import 'java.util.Map'
|
||||
module Backtype
|
||||
java_import 'backtype.storm.Config'
|
||||
end
|
||||
|
||||
java_package 'redstorm.proxy'
|
||||
|
||||
# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
|
||||
# Java artifacts when creating a spout.
|
||||
#
|
||||
# The real batch spout class implementation must define these methods:
|
||||
# - open(conf, context, collector)
|
||||
# - emitBatch
|
||||
# - getOutputFields
|
||||
# - ack(batch_id)
|
||||
#
|
||||
# and optionnaly:
|
||||
# - close
|
||||
#
|
||||
|
||||
class BatchSpout
|
||||
java_implements IBatchSpout
|
||||
|
||||
java_signature 'IBatchSpout (String base_class_path, String real_class_name)'
|
||||
def initialize(base_class_path, real_class_name)
|
||||
@real = Object.module_eval(real_class_name).new
|
||||
java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
|
||||
def initialize(base_class_path, real_spout_class_name)
|
||||
@real_spout = Object.module_eval(real_spout_class_name).new
|
||||
rescue NameError
|
||||
require base_class_path
|
||||
@real = Object.module_eval(real_class_name).new
|
||||
@real_spout = Object.module_eval(real_spout_class_name).new
|
||||
end
|
||||
|
||||
java_signature 'void open(Map, TopologyContext)'
|
||||
def open(_map, _topology_context)
|
||||
@real.open(Map, TopologyContext)
|
||||
def open(conf, context)
|
||||
@real_spout.open(conf, context)
|
||||
end
|
||||
|
||||
java_signature 'void close()'
|
||||
def close()
|
||||
@real.close()
|
||||
def close
|
||||
@real_spout.close if @real_spout.respond_to?(:close)
|
||||
end
|
||||
|
||||
java_signature 'void emitBatch(long, TridentCollector)'
|
||||
def emitBatch(batch_id, collector)
|
||||
@real_spout.emit_batch(batch_id, collector)
|
||||
end
|
||||
|
||||
java_signature 'void ack(long)'
|
||||
def ack(_long)
|
||||
@real.ack(long)
|
||||
def ack(batch_id)
|
||||
@real_spout.ack(batch_id)
|
||||
end
|
||||
|
||||
java_signature 'void emit_batch(long, TridentCollector)'
|
||||
def emit_batch(_long, _trident_collector)
|
||||
@real.emit_batch(long, TridentCollector)
|
||||
java_signature 'Fields getOutputFields()'
|
||||
def getOutputFields
|
||||
@real_spout.get_output_fields()
|
||||
end
|
||||
|
||||
java_signature 'Map get_component_configuration()'
|
||||
def get_component_configuration()
|
||||
@real.get_component_configuration()
|
||||
java_signature 'Map<String, Object> getComponentConfiguration()'
|
||||
def getComponentConfiguration
|
||||
@real_spout.get_component_configuration
|
||||
end
|
||||
|
||||
java_signature 'Fields get_output_fields()'
|
||||
def get_output_fields()
|
||||
@real.get_output_fields()
|
||||
end
|
||||
|
||||
|
||||
end
|
||||
|
|
|
@ -0,0 +1,48 @@
|
|||
require 'java'
|
||||
|
||||
|
||||
java_import 'storm.trident.tuple.TridentTuple'
|
||||
|
||||
java_import 'storm.trident.operation.TridentCollector'
|
||||
|
||||
java_import 'java.util.Map'
|
||||
|
||||
java_import 'storm.trident.operation.TridentOperationContext'
|
||||
|
||||
java_import 'storm.trident.operation.Function'
|
||||
|
||||
|
||||
module Backtype
|
||||
java_import 'backtype.storm.Config'
|
||||
end
|
||||
|
||||
java_package 'redstorm.proxy'
|
||||
|
||||
class ProxyFunction
|
||||
java_implements Function
|
||||
|
||||
java_signature 'Function (String base_class_path, String real_class_name)'
|
||||
def initialize(base_class_path, real_class_name)
|
||||
@real = Object.module_eval(real_class_name).new
|
||||
rescue NameError
|
||||
require base_class_path
|
||||
@real = Object.module_eval(real_class_name).new
|
||||
end
|
||||
|
||||
java_signature 'void execute(TridentTuple, TridentCollector)'
|
||||
def execute(_trident_tuple, _trident_collector)
|
||||
@real.execute(_trident_tuple, _trident_collector)
|
||||
end
|
||||
|
||||
java_signature 'void cleanup()'
|
||||
def cleanup()
|
||||
@real.cleanup()
|
||||
end
|
||||
|
||||
java_signature 'void prepare(Map, TridentOperationContext)'
|
||||
def prepare(_map, _trident_operation_context)
|
||||
@real.prepare(_map, _trident_operation_context)
|
||||
end
|
||||
|
||||
|
||||
end
|
|
@ -1,75 +1,85 @@
|
|||
package redstorm.storm.jruby;
|
||||
|
||||
import java.util.Map;
|
||||
import backtype.storm.task.TopologyContext;
|
||||
import storm.trident.operation.TridentCollector;
|
||||
import backtype.storm.tuple.Fields;
|
||||
import backtype.storm.task.TopologyContext;
|
||||
import storm.trident.spout.IBatchSpout;
|
||||
import backtype.storm.topology.OutputFieldsDeclarer;
|
||||
import backtype.storm.tuple.Tuple;
|
||||
import backtype.storm.tuple.Fields;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
|
||||
* this proxy is required to bypass the serialization/deserialization process when dispatching
|
||||
* the spout to the workers. JRuby does not yet support serialization from Java
|
||||
* (Java serialization call on a JRuby class).
|
||||
*
|
||||
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
|
||||
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
|
||||
* are called once before serialization at topology creation.
|
||||
*/
|
||||
public class JRubyBatchSpout implements IBatchSpout {
|
||||
IBatchSpout _proxy;
|
||||
String _realClassName;
|
||||
IBatchSpout _proxySpout;
|
||||
String _realSpoutClassName;
|
||||
String _baseClassPath;
|
||||
String[] _fields;
|
||||
|
||||
public JRubyBatchSpout(final String baseClassPath, final String realClassName, final String[] fields) {
|
||||
/**
|
||||
* create a new JRubyBatchSpout
|
||||
*
|
||||
* @param baseClassPath the topology/project base JRuby class file path
|
||||
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
|
||||
*/
|
||||
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
|
||||
_baseClassPath = baseClassPath;
|
||||
_realClassName = realClassName;
|
||||
_realSpoutClassName = realSpoutClassName;
|
||||
_fields = fields;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(final Map conf, final TopologyContext context) {
|
||||
// create instance of the jruby proxy class here, after deserialization in the workers.
|
||||
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
|
||||
_proxySpout.open(conf, context);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void open(final Map _map, final TopologyContext _topologyContext) {
|
||||
|
||||
_proxy = newProxy(_baseClassPath, _realClassName);
|
||||
_proxy.open(_map, _topologyContext);
|
||||
|
||||
public void emitBatch(final long batchId, final TridentCollector collector) {
|
||||
_proxySpout.emitBatch(batchId, collector);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
|
||||
_proxy.close()
|
||||
|
||||
_proxySpout.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void ack(final long _long) {
|
||||
|
||||
_proxy.ack(_long)
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void emitBatch(final long _long, final TridentCollector _tridentCollector) {
|
||||
|
||||
_proxy.emitBatch(_long, _tridentCollector)
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map getComponentConfiguration() {
|
||||
|
||||
_proxy.getComponentConfiguration()
|
||||
|
||||
public void ack(final long batchId) {
|
||||
_proxySpout.ack(batchId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Fields getOutputFields() {
|
||||
|
||||
_proxy.getOutputFields()
|
||||
|
||||
// getOutputFields is executed in the topology creation time before serialisation.
|
||||
// do not set the _proxySpout instance variable here to avoid JRuby serialization
|
||||
// issues. Just create tmp spout instance to call declareOutputFields.
|
||||
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
|
||||
return spout.getOutputFields();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getComponentConfiguration() {
|
||||
newProxy(_baseClassPath, _realClassName).getComponentConfiguration();
|
||||
// getComponentConfiguration is executed in the topology creation time before serialisation.
|
||||
// do not set the _proxySpout instance variable here to avoid JRuby serialization
|
||||
// issues. Just create tmp spout instance to call declareOutputFields.
|
||||
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
|
||||
return spout.getComponentConfiguration();
|
||||
}
|
||||
|
||||
private static IBatchSpout newProxy(String baseClassPath, String realClassName) {
|
||||
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
|
||||
try {
|
||||
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realClassName);
|
||||
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
|
||||
return proxy;
|
||||
}
|
||||
catch (Exception e) {
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
package redstorm.storm.jruby;
|
||||
|
||||
import storm.trident.tuple.TridentTuple;
|
||||
import storm.trident.operation.TridentCollector;
|
||||
import java.util.Map;
|
||||
import storm.trident.operation.TridentOperationContext;
|
||||
import storm.trident.operation.Function;
|
||||
|
||||
public class JRubyProxyFunction implements Function {
|
||||
Function _proxy;
|
||||
String _realClassName;
|
||||
String _baseClassPath;
|
||||
String[] _fields;
|
||||
|
||||
public JRubyProxyFunction(final String baseClassPath, final String realClassName, final String[] fields) {
|
||||
_baseClassPath = baseClassPath;
|
||||
_realClassName = realClassName;
|
||||
_fields = fields;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void execute(final TridentTuple _tridentTuple, final TridentCollector _tridentCollector) {
|
||||
|
||||
if(_proxy == null) {
|
||||
_proxy = newProxy(_baseClassPath, _realClassName);
|
||||
}
|
||||
_proxy.execute(_tridentTuple, _tridentCollector);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cleanup() {
|
||||
|
||||
_proxy.cleanup();
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare(final Map _map, final TridentOperationContext _tridentOperationContext) {
|
||||
|
||||
if(_proxy == null) {
|
||||
_proxy = newProxy(_baseClassPath, _realClassName);
|
||||
}
|
||||
_proxy.prepare(_map, _tridentOperationContext);
|
||||
|
||||
}
|
||||
|
||||
|
||||
private static Function newProxy(final String baseClassPath, final String realClassName) {
|
||||
try {
|
||||
redstorm.proxy.ProxyFunction proxy = new redstorm.proxy.ProxyFunction(baseClassPath, realClassName);
|
||||
return proxy;
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue