Merge branch 'master' of git://github.com/adsummos/redstorm into transactional

This commit is contained in:
Colin Surprenant 2013-05-10 15:37:58 -04:00
commit c55e69b983
18 changed files with 818 additions and 2 deletions

View File

@ -1,7 +1,7 @@
PATH
remote: .
specs:
redstorm (0.6.4)
redstorm (0.6.4.1)
rake
GEM

View File

@ -6,3 +6,4 @@ require 'red_storm/configuration'
require 'red_storm/simple_bolt'
require 'red_storm/simple_spout'
require 'red_storm/simple_topology'
require 'red_storm/helpers'

26
lib/red_storm/helpers.rb Normal file
View File

@ -0,0 +1,26 @@
# convience classes to change java method signature to ruby
module RedStorm
module TransactionalEmitter
def emitBatch(tx, coordinatorMeta, collector)
emit_batch(tx, coordinatorMeta, collector)
end
def cleanupBefore(txid)
cleanup_before(txid) if respond_to?(:cleanup_before)
end
end
module TransactionalCoordinator
def initializeTransaction(txid, prevMetadata)
initialize_transaction(txid, prevMetadata) if respond_to?(:initialize_transaction)
end
def isReady
respond_to?(:ready?) ? ready? : true
end
end
end

View File

@ -0,0 +1,63 @@
require 'java'
java_import 'backtype.storm.coordination.BatchOutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.topology.IRichBolt'
java_import 'backtype.storm.coordination.IBatchBolt'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the Bolt class is a proxy to the real bolt to avoid having to deal with all the
# Java artifacts when creating a bolt.
#
# The real bolt class implementation must define these methods:
# - prepare(conf, context, collector)
# - execute(tuple)
# - declare_output_fields
#
# and optionnaly:
# - cleanup
#
class BatchBolt
java_implements IBatchBolt
java_signature 'IBatchBolt (String base_class_path, String real_bolt_class_name)'
def initialize(base_class_path, real_bolt_class_name)
@real_bolt = Object.module_eval(real_bolt_class_name).new
rescue NameError
require base_class_path
@real_bolt = Object.module_eval(real_bolt_class_name).new
end
java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)'
def prepare(conf, context, collector, id)
@real_bolt.prepare(conf, context, collector, id)
end
java_signature 'void execute(Tuple)'
def execute(tuple)
@real_bolt.execute(tuple)
end
java_signature 'void finishBatch()'
def finishBatch
@real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
@real_bolt.declare_output_fields(declarer)
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_bolt.get_component_configuration
end
end

View File

@ -0,0 +1,52 @@
require 'java'
java_import 'backtype.storm.coordination.BatchOutputCollector'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.coordination.IBatchBolt'
java_import 'backtype.storm.transactional.ICommitter'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'backtype.storm.tuple.Tuple'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class BatchCommitterBolt
java_implements 'ICommitter, IBatchBolt'
java_signature 'IBatchCommitterBolt (String base_class_path, String real_bolt_class_name)'
def initialize(base_class_path, real_bolt_class_name)
@real_bolt = Object.module_eval(real_bolt_class_name).new
rescue NameError
require base_class_path
@real_bolt = Object.module_eval(real_bolt_class_name).new
end
java_signature 'void prepare(Map, TopologyContext, BatchOutputCollector, Object)'
def prepare(conf, context, collector, id)
@real_bolt.prepare(conf, context, collector, id)
end
java_signature 'void execute(Tuple)'
def execute(tuple)
@real_bolt.execute(tuple)
end
java_signature 'void finishBatch()'
def finishBatch
@real_bolt.finish_batch if @real_bolt.respond_to?(:finish_batch)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
@real_bolt.declare_output_fields(declarer)
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_bolt.get_component_configuration
end
end

View File

@ -0,0 +1,59 @@
require 'java'
java_import 'backtype.storm.task.TopologyContext'
java_import 'storm.trident.operation.TridentCollector'
java_import 'storm.trident.spout.IBatchSpout'
java_import 'backtype.storm.tuple.Fields'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
# the Spout class is a proxy to the real spout to avoid having to deal with all the
# Java artifacts when creating a spout.
class BatchSpout
java_implements IBatchSpout
java_signature 'IBatchSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'void open(Map, TopologyContext)'
def open(conf, context)
@real_spout.open(conf, context) if @real_spout.respond_to?(:open)
end
java_signature 'void emitBatch(long, TridentCollector)'
def emitBatch(batch_id, collector)
@real_spout.emit_batch(batch_id, collector)
end
java_signature 'void close()'
def close
@real_spout.close if @real_spout.respond_to?(:close)
end
java_signature 'void ack(long)'
def ack(batch_id)
@real_spout.ack(batch_id) if @real_spout.respond_to?(:ack)
end
java_signature 'Fields getOutputFields()'
def getOutputFields()
@real_spout.get_output_fields
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end
end

View File

@ -0,0 +1,47 @@
require 'java'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.transactional.ITransactionalSpout'
java_import 'backtype.storm.transactional.ICommitterTransactionalSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class TransactionalCommitterSpout
java_implements 'ICommitterTransactionalSpout'
java_signature 'ICommitterTransactionalSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'ICommitterTransactionalSpout.Emitter getEmitter(Map, TopologyContext)'
def getEmitter(conf, context)
@real_spout.get_emitter(conf, context)
end
java_signature 'Coordinator getCoordinator(Map, TopologyContext)'
def getCoordinator(conf, context)
@real_spout.get_coordinator(conf, context)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
@real_spout.declare_output_fields(declarer)
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end
end

View File

@ -0,0 +1,46 @@
require 'java'
java_import 'backtype.storm.task.TopologyContext'
java_import 'backtype.storm.transactional.ITransactionalSpout'
java_import 'backtype.storm.topology.OutputFieldsDeclarer'
java_import 'java.util.Map'
module Backtype
java_import 'backtype.storm.Config'
end
java_package 'redstorm.proxy'
class TransactionalSpout
java_implements 'ITransactionalSpout'
java_signature 'ITransactionalSpout (String base_class_path, String real_spout_class_name)'
def initialize(base_class_path, real_spout_class_name)
@real_spout = Object.module_eval(real_spout_class_name).new
rescue NameError
require base_class_path
@real_spout = Object.module_eval(real_spout_class_name).new
end
java_signature 'Emitter getEmitter(Map, TopologyContext)'
def getEmitter(conf, context)
@real_spout.get_emitter(conf, context)
end
java_signature 'Coordinator getCoordinator(Map, TopologyContext)'
def getCoordinator(conf, context)
@real_spout.get_coordinator(conf, context)
end
java_signature 'void declareOutputFields(OutputFieldsDeclarer)'
def declareOutputFields(declarer)
@real_spout.declare_output_fields(declarer)
end
java_signature 'Map<String, Object> getComponentConfiguration()'
def getComponentConfiguration
@real_spout.get_component_configuration
end
end

View File

@ -8,12 +8,19 @@ end
java_import 'backtype.storm.LocalCluster'
java_import 'backtype.storm.StormSubmitter'
java_import 'backtype.storm.topology.TopologyBuilder'
java_import 'backtype.storm.coordination.BatchBoltExecutor'
java_import 'backtype.storm.tuple.Fields'
java_import 'backtype.storm.tuple.Tuple'
java_import 'backtype.storm.tuple.Values'
java_import 'redstorm.storm.jruby.JRubyBolt'
java_import 'redstorm.storm.jruby.JRubySpout'
java_import 'redstorm.storm.jruby.JRubyBatchBolt'
java_import 'redstorm.storm.jruby.JRubyBatchCommitterBolt'
java_import 'redstorm.storm.jruby.JRubyBatchSpout'
java_import 'redstorm.storm.jruby.JRubyTransactionalSpout'
java_import 'redstorm.storm.jruby.JRubyTransactionalBolt'
java_import 'redstorm.storm.jruby.JRubyTransactionalCommitterBolt'
java_package 'redstorm'

View File

@ -1,3 +1,3 @@
module RedStorm
VERSION = '0.6.4'
VERSION = '0.6.4.1'
end

95
src/main/MurmurHash.java Normal file
View File

@ -0,0 +1,95 @@
// Takes the field specified in the FIELD_TO_HASH pararmeter and create 32-bit Murmur hash out of it in a field called HashedId
// Using Murmurhash from http://murmurhash.googlepages.com/
public final class MurmurHash {
public static long hash64(final byte[] data, int length, int seed) {
final long m = 0xc6a4a7935bd1e995L;
final int r = 47;
long h = (seed & 0xffffffffl) ^ (length * m);
int length8 = length / 8;
for (int i = 0; i < length8; i++) {
final int i8 = i * 8;
long k = ((long) data[i8 + 0] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8)
+ (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24)
+ (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40)
+ (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56);
k *= m;
k ^= k >>> r;
k *= m;
h ^= k;
h *= m;
}
switch (length % 8) {
case 7:
h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48;
case 6:
h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40;
case 5:
h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32;
case 4:
h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24;
case 3:
h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16;
case 2:
h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8;
case 1:
h ^= (long) (data[length & ~7] & 0xff);
h *= m;
}
;
h ^= h >>> r;
h *= m;
h ^= h >>> r;
return h;
}
/**
* Generates 64 bit hash from byte array with default seed value.
*
* @param data
* byte array to hash
* @param length
* length of the array to hash
* @return 64 bit hash of the given string
*/
public static long hash64(final byte[] data, int length) {
return hash64(data, length, 0xe17a1465);
}
/**
* Generates 64 bit hash from a string.
*
* @param text
* string to hash
* @return 64 bit hash of the given string
*/
public static long hash64(final String text) {
final byte[] bytes = text.getBytes();
return hash64(bytes, bytes.length);
}
/**
* Generates 64 bit hash from a substring.
*
* @param text
* string to hash
* @param from
* starting index
* @param length
* length of the substring to hash
* @return 64 bit hash of the given array
*/
public static long hash64(final String text, int from, int length) {
return hash64(text.substring(from, from + length));
}
}

View File

@ -0,0 +1,82 @@
package redstorm.storm.jruby;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseBatchBolt;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.coordination.IBatchBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.Map;
/**
* the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the bolts to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields method which is called once before
* serialization at topology creation.
*/
public class JRubyBatchBolt extends BaseBatchBolt {
IBatchBolt _proxyBolt;
String _realBoltClassName;
String _baseClassPath;
/**
* create a new JRubyBolt
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
*/
public JRubyBatchBolt(String baseClassPath, String realBoltClassName) {
_baseClassPath = baseClassPath;
_realBoltClassName = realBoltClassName;
}
@Override
public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final Object id) {
// create instance of the jruby class here, after deserialization in the workers.
_proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName);
_proxyBolt.prepare(stormConf, context, collector, id);
}
@Override
public void execute(Tuple input) {
_proxyBolt.execute(input);
}
@Override
public void finishBatch() {
_proxyBolt.finishBatch();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declareOutputFields is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
return bolt.getComponentConfiguration();
}
private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) {
try {
redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,9 @@
package redstorm.storm.jruby;
import backtype.storm.transactional.ICommitter;
public class JRubyBatchCommitterBolt extends JRubyBatchBolt implements ICommitter {
public JRubyBatchCommitterBolt(String baseClassPath, String realBoltClassName) {
super(baseClassPath, realBoltClassName);
}
}

View File

@ -0,0 +1,88 @@
package redstorm.storm.jruby;
import backtype.storm.tuple.Fields;
import backtype.storm.task.TopologyContext;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import java.util.Map;
/**
* the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyBatchSpout implements IBatchSpout {
IBatchSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
/**
* create a new JRubySpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyBatchSpout(String baseClassPath, String realSpoutClassName) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
}
@Override
public void open(final Map conf, final TopologyContext context) {
// create instance of the jruby proxy class here, after deserialization in the workers.
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
_proxySpout.open(conf, context);
}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
_proxySpout.emitBatch(batchId, collector);
}
@Override
public void close() {
_proxySpout.close();
}
@Override
public void ack(long batchId) {
_proxySpout.ack(batchId);
}
@Override
public Fields getOutputFields() {
if (_proxySpout == null) {
// getOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call getOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getOutputFields();
} else {
return _proxySpout.getOutputFields();
}
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
IBatchSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
}
private static IBatchSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.BatchSpout proxy = new redstorm.proxy.BatchSpout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,82 @@
package redstorm.storm.jruby;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.coordination.IBatchBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.Map;
/**
* the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the bolts to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields method which is called once before
* serialization at topology creation.
*/
public class JRubyTransactionalBolt extends BaseTransactionalBolt {
IBatchBolt _proxyBolt;
String _realBoltClassName;
String _baseClassPath;
/**
* create a new JRubyBolt
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realBoltClassName the fully qualified JRuby bolt implementation class name
*/
public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName) {
_baseClassPath = baseClassPath;
_realBoltClassName = realBoltClassName;
}
@Override
public void prepare(final Map stormConf, final TopologyContext context, final BatchOutputCollector collector, final TransactionAttempt id) {
// create instance of the jruby class here, after deserialization in the workers.
_proxyBolt = newProxyBolt(_baseClassPath, _realBoltClassName);
_proxyBolt.prepare(stormConf, context, collector, id);
}
@Override
public void execute(Tuple input) {
_proxyBolt.execute(input);
}
@Override
public void finishBatch() {
_proxyBolt.finishBatch();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declareOutputFields is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
bolt.declareOutputFields(declarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time, before serialisation.
// do not set the _proxyBolt instance variable here to avoid JRuby serialization
// issues. Just create tmp bolt instance to call declareOutputFields.
IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName);
return bolt.getComponentConfiguration();
}
private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) {
try {
redstorm.proxy.BatchBolt proxy = new redstorm.proxy.BatchBolt(baseClassPath, realBoltClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,31 @@
package redstorm.storm.jruby;
import backtype.storm.coordination.IBatchBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.transactional.ICommitter;
/**
* the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the bolts to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields method which is called once before
* serialization at topology creation.
*/
public class JRubyTransactionalCommitterBolt extends JRubyTransactionalBolt implements ICommitter {
public JRubyTransactionalCommitterBolt(String baseClassPath, String realBoltClassName) {
super(baseClassPath, realBoltClassName);
}
private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) {
try {
redstorm.proxy.BatchCommitterBolt proxy = new redstorm.proxy.BatchCommitterBolt(baseClassPath, realBoltClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,44 @@
package redstorm.storm.jruby;
import backtype.storm.transactional.ICommitterTransactionalSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.task.TopologyContext;
import java.util.Map;
/**
* the JRubyBolt class is a simple proxy class to the actual bolt implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the bolts to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby bolt proxy class is instanciated in the prepare method which is called after
* deserialization at the worker and in the declareOutputFields method which is called once before
* serialization at topology creation.
*/
public class JRubyTransactionalCommitterSpout extends JRubyTransactionalSpout implements ICommitterTransactionalSpout {
ICommitterTransactionalSpout _proxySpout;
public JRubyTransactionalCommitterSpout(String baseClassPath, String realSpoutClassName) {
super(baseClassPath, realSpoutClassName);
}
@Override
public ICommitterTransactionalSpout.Emitter getEmitter(Map conf, TopologyContext context) {
// create instance of the jruby class here, after deserialization in the workers.
if (_proxySpout == null) {
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
}
return _proxySpout.getEmitter(conf, context);
}
private static ICommitterTransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.TransactionalCommitterSpout proxy = new redstorm.proxy.TransactionalCommitterSpout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,84 @@
package redstorm.storm.jruby;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseTransactionalSpout;
import backtype.storm.transactional.ITransactionalSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import java.util.Map;
/**
* the JRubySpout class is a simple proxy class to the actual spout implementation in JRuby.
* this proxy is required to bypass the serialization/deserialization process when dispatching
* the spout to the workers. JRuby does not yet support serialization from Java
* (Java serialization call on a JRuby class).
*
* Note that the JRuby spout proxy class is instanciated in the open method which is called after
* deserialization at the worker and in both the declareOutputFields and isDistributed methods which
* are called once before serialization at topology creation.
*/
public class JRubyTransactionalSpout extends BaseTransactionalSpout {
ITransactionalSpout _proxySpout;
String _realSpoutClassName;
String _baseClassPath;
/**
* create a new JRubySpout
*
* @param baseClassPath the topology/project base JRuby class file path
* @param realSpoutClassName the fully qualified JRuby spout implementation class name
*/
public JRubyTransactionalSpout(String baseClassPath, String realSpoutClassName) {
_baseClassPath = baseClassPath;
_realSpoutClassName = realSpoutClassName;
}
@Override
public Coordinator getCoordinator(Map conf, TopologyContext context) {
// create instance of the jruby class here, after deserialization in the workers.
if (_proxySpout == null) {
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
}
return _proxySpout.getCoordinator(conf, context);
}
@Override
public Emitter getEmitter(Map conf, TopologyContext context) {
// create instance of the jruby class here, after deserialization in the workers.
if (_proxySpout == null) {
_proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName);
}
return _proxySpout.getEmitter(conf, context);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// declareOutputFields is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
spout.declareOutputFields(declarer);
}
@Override
public Map<String, Object> getComponentConfiguration() {
// getComponentConfiguration is executed in the topology creation time before serialisation.
// do not set the _proxySpout instance variable here to avoid JRuby serialization
// issues. Just create tmp spout instance to call declareOutputFields.
ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName);
return spout.getComponentConfiguration();
}
private static ITransactionalSpout newProxySpout(String baseClassPath, String realSpoutClassName) {
try {
redstorm.proxy.TransactionalSpout proxy = new redstorm.proxy.TransactionalSpout(baseClassPath, realSpoutClassName);
return proxy;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
}