package redstorm.storm.jruby; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.transactional.TransactionAttempt; import backtype.storm.topology.base.BaseTransactionalBolt; import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.coordination.IBatchBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import java.util.Map; import org.jruby.Ruby; import org.jruby.RubyObject; import org.jruby.runtime.Helpers; import org.jruby.runtime.builtin.IRubyObject; import org.jruby.javasupport.JavaUtil; import org.jruby.RubyModule; import org.jruby.exceptions.RaiseException; /** * the JRubyBolt class is a proxy class to the actual bolt implementation in JRuby. * this proxy is required to bypass the serialization/deserialization issues of JRuby objects. * JRuby classes do not support Java serialization. * * Note that the JRuby bolt class is instanciated in the prepare method which is called after * deserialization at the worker and in the declareOutputFields & getComponentConfiguration * methods which are called once before serialization at topology creation. */ public class JRubyTransactionalBolt extends BaseTransactionalBolt { private final String _realBoltClassName; private final String[] _fields; private final String _bootstrap; // transient to avoid serialization protected transient IRubyObject _ruby_bolt; protected transient Ruby __ruby__; /** * create a new JRubyBolt * * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name * @param fields the output fields names */ public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName, String[] fields) { _realBoltClassName = realBoltClassName; _fields = fields; _bootstrap = "require '" + baseClassPath + "'"; } @Override public void prepare(final Map conf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) { _ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_conf = JavaUtil.convertJavaToRuby(__ruby__, conf); IRubyObject ruby_context = JavaUtil.convertJavaToRuby(__ruby__, context); IRubyObject ruby_collector = JavaUtil.convertJavaToRuby(__ruby__, collector); IRubyObject ruby_id = JavaUtil.convertJavaToRuby(__ruby__, id); Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "prepare", ruby_conf, ruby_context, ruby_collector, ruby_id); } @Override public void execute(Tuple input) { IRubyObject ruby_input = JavaUtil.convertJavaToRuby(__ruby__, input); Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "execute", ruby_input); } @Override public void finishBatch() { Helpers.invoke(__ruby__.getCurrentContext(), _ruby_bolt, "finish_batch"); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // declareOutputFields is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call declareOutputFields. if (_fields.length > 0) { declarer.declare(new Fields(_fields)); } else { IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_declarer = JavaUtil.convertJavaToRuby(__ruby__, declarer); Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "declare_output_fields", ruby_declarer); } } @Override public Map getComponentConfiguration() { // getComponentConfiguration is executed in the topology creation time, before serialisation. // just create tmp bolt instance to call getComponentConfiguration. IRubyObject ruby_bolt = initialize_ruby_bolt(); IRubyObject ruby_result = Helpers.invoke(__ruby__.getCurrentContext(), ruby_bolt, "get_component_configuration"); return (Map)ruby_result.toJava(Map.class); } protected IRubyObject initialize_ruby_bolt() { __ruby__ = Ruby.getGlobalRuntime(); RubyModule ruby_class; try { ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } catch (RaiseException e) { // after deserialization we need to recreate ruby environment __ruby__.evalScriptlet(_bootstrap); ruby_class = __ruby__.getClassFromPath(_realBoltClassName); } return Helpers.invoke(__ruby__.getCurrentContext(), ruby_class, "new"); } }