2012-09-18 15:41:32 +00:00
|
|
|
package redstorm.storm.jruby;
|
|
|
|
|
|
|
|
import backtype.storm.task.OutputCollector;
|
|
|
|
import backtype.storm.task.TopologyContext;
|
|
|
|
import backtype.storm.transactional.TransactionAttempt;
|
|
|
|
import backtype.storm.topology.base.BaseTransactionalBolt;
|
|
|
|
import backtype.storm.coordination.BatchOutputCollector;
|
|
|
|
import backtype.storm.coordination.IBatchBolt;
|
|
|
|
import backtype.storm.topology.OutputFieldsDeclarer;
|
|
|
|
import backtype.storm.tuple.Tuple;
|
2013-05-13 04:03:36 +00:00
|
|
|
import backtype.storm.tuple.Fields;
|
2012-09-18 15:41:32 +00:00
|
|
|
import java.util.Map;
|
|
|
|
|
2014-03-02 08:04:01 +00:00
|
|
|
import org.jruby.Ruby;
|
|
|
|
import org.jruby.RubyObject;
|
|
|
|
import org.jruby.runtime.Helpers;
|
|
|
|
import org.jruby.runtime.builtin.IRubyObject;
|
|
|
|
import org.jruby.javasupport.JavaUtil;
|
|
|
|
import org.jruby.RubyModule;
|
|
|
|
import org.jruby.exceptions.RaiseException;
|
|
|
|
|
2012-09-18 15:41:32 +00:00
|
|
|
/**
|
2014-03-02 08:04:01 +00:00
|
|
|
* the JRubyTransactionalBolt class is a proxy class to the actual transactional bolt implementation in JRuby.
|
|
|
|
* this proxy is required to bypass the serialization/deserialization issues of JRuby objects.
|
|
|
|
* JRuby classes do not support Java serialization.
|
2012-09-18 15:41:32 +00:00
|
|
|
*
|
2014-03-02 08:04:01 +00:00
|
|
|
* Note that the JRuby bolt class is instantiated in the prepare method which is called after
|
|
|
|
* deserialization at the worker and in the declareOutputFields & getComponentConfiguration
|
|
|
|
* methods which are called once before serialization at topology creation.
|
2012-09-18 15:41:32 +00:00
|
|
|
*/
|
|
|
|
public class JRubyTransactionalBolt extends BaseTransactionalBolt {
|
2014-03-02 08:04:01 +00:00
|
|
|
private final String _realBoltClassName;
|
|
|
|
private final String[] _fields;
|
|
|
|
private final String _bootstrap;
|
|
|
|
|
|
|
|
// transient to avoid serialization
|
|
|
|
protected transient IRubyObject _ruby_bolt;
|
|
|
|
protected transient Ruby __ruby__;
|
2013-05-13 04:03:36 +00:00
|
|
|
|
2014-03-02 08:04:01 +00:00
|
|
|
/**
|
2012-09-18 15:41:32 +00:00
|
|
|
* create a new JRubyBolt
|
2014-03-02 08:04:01 +00:00
|
|
|
*
|
|
|
|
* @param baseClassPath the topology/project base JRuby class file path
|
2012-09-18 15:41:32 +00:00
|
|
|
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
|
2014-03-02 08:04:01 +00:00
|
|
|
* @param fields the output fields names
|
2012-09-18 15:41:32 +00:00
|
|
|
*/
|
2013-05-13 04:03:36 +00:00
|
|
|
public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName, String[] fields) {
|
2012-09-18 15:41:32 +00:00
|
|
|
_realBoltClassName = realBoltClassName;
|
2013-05-13 04:03:36 +00:00
|
|
|
_fields = fields;
|
2014-03-02 08:04:01 +00:00
|
|
|
_bootstrap = "require '" + baseClassPath + "'";
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
2014-03-02 08:04:01 +00:00
|
|
|
public void prepare(final Map conf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) {
|
|
|
|
_ruby_bolt = initialize_ruby_bolt();
|
|
|
|
IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf);
|
|
|
|
IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context);
|
|
|
|
IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector);
|
|
|
|
IRubyObject ruby_id = JavaUtil.convertJavaToRuby(__ruby__, id);
|
|
|
|
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "prepare", ruby_conf, ruby_context, ruby_collector, ruby_id);
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void execute(Tuple input) {
|
2014-03-02 08:04:01 +00:00
|
|
|
IRubyObject ruby_input = JavaUtil.convertJavaToRuby(__ruby__, input);
|
|
|
|
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "execute", ruby_input);
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void finishBatch() {
|
2014-03-02 08:04:01 +00:00
|
|
|
Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "finish_batch");
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public void declareOutputFields(OutputFieldsDeclarer declarer) {
|
|
|
|
// declareOutputFields is executed in the topology creation time, before serialisation.
|
2014-03-02 08:04:01 +00:00
|
|
|
// just create tmp bolt instance to call declareOutputFields.
|
|
|
|
|
2013-05-13 04:03:36 +00:00
|
|
|
if (_fields.length > 0) {
|
|
|
|
declarer.declare(new Fields(_fields));
|
|
|
|
} else {
|
2014-03-02 08:04:01 +00:00
|
|
|
IRubyObject ruby_bolt = initialize_ruby_bolt();
|
|
|
|
IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer);
|
|
|
|
Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "declare_output_fields", ruby_declarer);
|
2013-05-13 04:03:36 +00:00
|
|
|
}
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
|
|
public Map<String, Object> getComponentConfiguration() {
|
|
|
|
// getComponentConfiguration is executed in the topology creation time, before serialisation.
|
2014-03-02 08:04:01 +00:00
|
|
|
// just create tmp bolt instance to call getComponentConfiguration.
|
|
|
|
|
|
|
|
IRubyObject ruby_bolt = initialize_ruby_bolt();
|
|
|
|
IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "get_component_configuration");
|
|
|
|
return (Map)ruby_result.toJava(Map.class);
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
2014-03-02 08:04:01 +00:00
|
|
|
|
|
|
|
protected IRubyObject initialize_ruby_bolt() {
|
|
|
|
__ruby__ = Ruby.getGlobalRuntime();
|
|
|
|
|
|
|
|
RubyModule ruby_class;
|
2012-09-18 15:41:32 +00:00
|
|
|
try {
|
2014-03-02 08:04:01 +00:00
|
|
|
ruby_class = __ruby__.getClassFromPath(_realBoltClassName);
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
2014-03-02 08:04:01 +00:00
|
|
|
catch (RaiseException e) {
|
|
|
|
// after deserialization we need to recreate ruby environment
|
|
|
|
__ruby__.evalScriptlet(_bootstrap);
|
|
|
|
ruby_class = __ruby__.getClassFromPath(_realBoltClassName);
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
2014-03-02 08:04:01 +00:00
|
|
|
return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new");
|
2012-09-18 15:41:32 +00:00
|
|
|
}
|
|
|
|
}
|