Implementation of BatchSpout proxy class

This commit is contained in:
Paul Bergeron 2012-12-05 13:57:12 -08:00
parent 3b4e4d6f27
commit e3e5fb957a
2 changed files with 160 additions and 0 deletions

View File

@ -0,0 +1,71 @@
require 'java'
java_import 'storm.trident.operation.TridentCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Values'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the BatchSpout class is a proxy to the real batch spout to avoid having to deal with all the
# Java artifacts when creating a spout.
#
# The real batch spout class implementation must define these methods:
# - open(conf, context, collector)
# - emitBatch
# - getOutputFields
# - ack(batch_id)
#
# and optionnaly:
# - close
#
class BatchSpout
java_implements IBatchSpout
java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context)
end
java_signature 'void close()'
def close
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end
java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id)
end
java_signature 'Fields getOutputFields()'
def getOutputFields
@real_spout.get_output_fields()
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end
end

View File

@ -0,0 +1,89 @@
package redstorm.storm.jruby;
import storm.trident.operation.TridentCollector;
import backtype.storm.task.TopologyContext;
import storm.trident.spout.IBatchSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import java.util.Map;
/**
* the JRubyBatchSpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
String[] _fields;
/**
* create a new JRubyBatchSpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName, String[] fields) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
_fields = fields;
}
@Override
public void open(final Map conf, final TopologyContext context) {
// create instance of the jruby proxy class here, after deserialization in the workers.
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
_proxySpout.open(conf, context);
}
@Override
public void emitBatch(final long batchId, final TridentCollector collector) {
_proxySpout.emitBatch(batchId, collector);
}
@Override
public void close() {
_proxySpout.close();
}
@Override
public void ack(final long batchId) {
_proxySpout.ack(batchId);
}
@Override
public Fields getOutputFields() {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
}
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}