removed ruby bolt proxy class, JRubyBolt uses JRuby API directly, output collector optimization

This commit is contained in:
Colin Surprenant 2013-07-29 15:26:45 -04:00
parent 5147578e46
commit 498b16b2fe
5 changed files with 80 additions and 37 deletions

View File

@ -3,8 +3,10 @@ require 'rubygems'
require 'red_storm/version'
require 'red_storm/environment'
require 'red_storm/configuration'
require 'red_storm/configurator'
require 'red_storm/dsl/bolt'
require 'red_storm/dsl/spout'
require 'red_storm/dsl/topology'
require 'red_storm/dsl/drpc_topology'
require 'red_storm/dsl/tuple'
require 'red_storm/dsl/output_collector'

View File

@ -1,3 +1,12 @@
# This hack get rif of the "Use RbConfig instead of obsolete and deprecated Config"
# deprecation warning that is triggered by "java_import 'backtype.storm.Config'".
Object.send :remove_const, :Config
Config = RbConfig
module Backtype
java_import 'backtype.storm.Config'
end
module RedStorm
class Configurator

View File

@ -3,6 +3,9 @@ require 'red_storm/configurator'
require 'red_storm/environment'
require 'pathname'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
module RedStorm
module DSL
@ -54,11 +57,11 @@ module RedStorm
end
def unanchored_emit(*values)
@collector.emit(Values.new(*values))
@collector.emit_tuple(Values.new(*values))
end
def anchored_emit(tuple, *values)
@collector.emit(tuple, Values.new(*values))
@collector.emit_anchor_tuple(tuple, Values.new(*values))
end
def ack(tuple)

View File

@ -0,0 +1,8 @@
require 'java'
java_import 'backtype.storm.task.OutputCollector'
# make alias methods to specific signatures to avoid selection overhead for heavily overloaded method
class OutputCollector
java_alias :emit_tuple, :emit, [java.lang.Class.for_name("java.util.List")]
java_alias :emit_anchor_tuple, :emit, [java.lang.Class.for_name("backtype.storm.tuple.Tuple"), java.lang.Class.for_name("java.util.List")]
end

View File

@ -8,81 +8,102 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
import org.jruby.Ruby;
import org.jruby.RubyObject;
import org.jruby.runtime.Helpers;
import org.jruby.runtime.builtin.IRubyObject;
import org.jruby.javasupport.JavaUtil;
import org.jruby.RubyModule;
import org.jruby.exceptions.RaiseException;
/**
* the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the bolts to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
* the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization issues of JRuby objects.
* JRuby classes are not support Java serialization.
*
* Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields method which is called once before
* serialization at topology creation.
* Note that the JRuby bolt class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields & getComponentConfiguration
* methods which are called once before serialization at topology creation.
*/
public class JRubyBolt implements IRichBolt {
IRichBolt _proxyBolt;
String _realBoltClassName;
String _baseClassPath;
String[] _fields;
private final String _realBoltClassName;
private final String[] _fields;
private final String _bootstrap;
// transient to avoid serialization
private transient IRubyObject _ruby_bolt;
private transient Ruby __ruby__;
/**
* create a new JRubyBolt
*
* @param baseClassPath the topology/project base JRuby class file path
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
* @param fields the output fields names
*/
public JRubyBolt(String baseClassPath, String realBoltClassName, String[] fields) {
_baseClassPath = baseClassPath;
_realBoltClassName = realBoltClassName;
_fields = fields;
_bootstrap = "require '" + baseClassPath + "'";
}
@Override
public void prepare(final Map stormConf, final TopologyContext context, final OutputCollector collector) {
// create instance of the jruby class here, after deserialization in the workers.
_proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName);
_proxyBolt.prepare(stormConf, context, collector);
public void prepare(final Map conf, final TopologyContext context, final OutputCollector collector) {
_ruby_bolt = initialize_ruby_bolt();
IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf);
IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context);
IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector);
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "prepare", ruby_conf, ruby_context, ruby_collector);
}
@Override
public void execute(Tuple input) {
_proxyBolt.execute(input);
IRubyObject ruby_input = JavaUtil.convertJavaToRuby(__ruby__, input);
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "execute", ruby_input);
}
@Override
public void cleanup() {
_proxyBolt.cleanup();
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "cleanup");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declareOutputFields is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
// just create tmp bolt instance to call declareOutputFields.
if (_fields.length > 0) {
declarer.declare(new Fields(_fields));
} else {
IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
IRubyObject ruby_bolt = initialize_ruby_bolt();
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);
Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "declare_output_fields", ruby_declarer);
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IRichBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
return bolt.getComponentConfiguration();
}
// just create tmp bolt instance to call getComponentConfiguration.
private static IRichBolt newProxyBolt(String baseClassPath, String realBoltClassName) {
IRubyObject ruby_bolt = initialize_ruby_bolt();
IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "get_component_configuration");
return (Map)ruby_result.toJava(Map.class);
}
private IRubyObject initialize_ruby_bolt() {
__ruby__ = Ruby.getGlobalRuntime();
RubyModule ruby_class;
try {
redstorm.proxy.Bolt proxy = new redstorm.proxy.Bolt(baseClassPath, realBoltClassName);
return proxy;
ruby_class = __ruby__.getClassFromPath(_realBoltClassName);
}
catch (Exception e) {
throw new RuntimeException(e);
catch (RaiseException e) {
// after deserialization we need to recreate ruby environment
__ruby__.evalScriptlet(_bootstrap);
ruby_class = __ruby__.getClassFromPath(_realBoltClassName);
}
return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new");
}
}