From 72fbac4e92c878e72154a257850d87f0b948eb52 Mon Sep 17 00:00:00 2001 From: Colin Surprenant Date: Mon, 13 May 2013 00:03:36 -0400 Subject: [PATCH] added fields constructor arg for future dsl integration --- .../redstorm/storm/jruby/JRubyBatchBolt.java | 14 +++++++++++--- .../storm/jruby/JRubyBatchCommitterBolt.java | 4 ++-- .../storm/jruby/JRubyTransactionalBolt.java | 16 ++++++++++++---- .../JRubyTransactionalCommitterBolt.java | 4 ++-- .../JRubyTransactionalCommitterSpout.java | 4 ++-- .../storm/jruby/JRubyTransactionalSpout.java | 19 ++++++++++++------- 6 files changed, 41 insertions(+), 20 deletions(-) diff --git a/src/main/redstorm/storm/jruby/JRubyBatchBolt.java b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java index 5d39277..f46e57c 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchBolt.java @@ -7,6 +7,7 @@ import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.coordination.IBatchBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; import java.util.Map; /** @@ -23,15 +24,18 @@ public class JRubyBatchBolt extends BaseBatchBolt { IBatchBolt _proxyBolt; String _realBoltClassName; String _baseClassPath; + String[] _fields; + /** * create a new JRubyBolt * * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name */ - public JRubyBatchBolt(String baseClassPath, String realBoltClassName) { + public JRubyBatchBolt(String baseClassPath, String realBoltClassName, String[] fields) { _baseClassPath = baseClassPath; _realBoltClassName = realBoltClassName; + _fields = fields; } @Override @@ -56,8 +60,12 @@ public class JRubyBatchBolt extends BaseBatchBolt { // declareOutputFields is executed in the topology creation time, before serialisation. // do not set the _proxyBolt instance variable here to avoid JRuby serialization // issues. Just create tmp bolt instance to call declareOutputFields. - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - bolt.declareOutputFields(declarer); + if (_fields.length > 0) { + declarer.declare(new Fields(_fields)); + } else { + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + bolt.declareOutputFields(declarer); + } } @Override diff --git a/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java b/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java index dce6628..657d276 100644 --- a/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyBatchCommitterBolt.java @@ -3,7 +3,7 @@ package redstorm.storm.jruby; import backtype.storm.transactional.ICommitter; public class JRubyBatchCommitterBolt extends JRubyBatchBolt implements ICommitter { - public JRubyBatchCommitterBolt(String baseClassPath, String realBoltClassName) { - super(baseClassPath, realBoltClassName); + public JRubyBatchCommitterBolt(String baseClassPath, String realBoltClassName, String[] fields) { + super(baseClassPath, realBoltClassName, fields); } } \ No newline at end of file diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java index 2f58f97..7f162e5 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalBolt.java @@ -8,6 +8,7 @@ import backtype.storm.coordination.BatchOutputCollector; import backtype.storm.coordination.IBatchBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; import java.util.Map; /** @@ -24,15 +25,18 @@ public class JRubyTransactionalBolt extends BaseTransactionalBolt { IBatchBolt _proxyBolt; String _realBoltClassName; String _baseClassPath; - /** + String[] _fields; + + /** * create a new JRubyBolt * * @param baseClassPath the topology/project base JRuby class file path * @param realBoltClassName the fully qualified JRuby bolt implementation class name */ - public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName) { + public JRubyTransactionalBolt(String baseClassPath, String realBoltClassName, String[] fields) { _baseClassPath = baseClassPath; _realBoltClassName = realBoltClassName; + _fields = fields; } @Override @@ -57,8 +61,12 @@ public class JRubyTransactionalBolt extends BaseTransactionalBolt { // declareOutputFields is executed in the topology creation time, before serialisation. // do not set the _proxyBolt instance variable here to avoid JRuby serialization // issues. Just create tmp bolt instance to call declareOutputFields. - IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); - bolt.declareOutputFields(declarer); + if (_fields.length > 0) { + declarer.declare(new Fields(_fields)); + } else { + IBatchBolt bolt = newProxyBolt(_baseClassPath, _realBoltClassName); + bolt.declareOutputFields(declarer); + } } @Override diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java index 88c9684..008f722 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterBolt.java @@ -15,8 +15,8 @@ import backtype.storm.transactional.ICommitter; * serialization at topology creation. */ public class JRubyTransactionalCommitterBolt extends JRubyTransactionalBolt implements ICommitter { - public JRubyTransactionalCommitterBolt(String baseClassPath, String realBoltClassName) { - super(baseClassPath, realBoltClassName); + public JRubyTransactionalCommitterBolt(String baseClassPath, String realBoltClassName, String[] fields) { + super(baseClassPath, realBoltClassName, fields); } private static IBatchBolt newProxyBolt(String baseClassPath, String realBoltClassName) { diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java index 076b6f8..633c1f9 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalCommitterSpout.java @@ -19,8 +19,8 @@ public class JRubyTransactionalCommitterSpout extends JRubyTransactionalSpout im ICommitterTransactionalSpout _proxySpout; - public JRubyTransactionalCommitterSpout(String baseClassPath, String realSpoutClassName) { - super(baseClassPath, realSpoutClassName); + public JRubyTransactionalCommitterSpout(String baseClassPath, String realSpoutClassName, String[] fields) { + super(baseClassPath, realSpoutClassName, fields); } @Override diff --git a/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java index f2550ec..60768a6 100644 --- a/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java +++ b/src/main/redstorm/storm/jruby/JRubyTransactionalSpout.java @@ -2,12 +2,11 @@ package redstorm.storm.jruby; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; - import backtype.storm.topology.base.BaseTransactionalSpout; -import backtype.storm.transactional.ITransactionalSpout; - import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.transactional.ITransactionalSpout; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Fields; import java.util.Map; /** @@ -24,6 +23,7 @@ public class JRubyTransactionalSpout extends BaseTransactionalSpout { ITransactionalSpout _proxySpout; String _realSpoutClassName; String _baseClassPath; + String[] _fields; /** * create a new JRubySpout @@ -31,9 +31,10 @@ public class JRubyTransactionalSpout extends BaseTransactionalSpout { * @param baseClassPath the topology/project base JRuby class file path * @param realSpoutClassName the fully qualified JRuby spout implementation class name */ - public JRubyTransactionalSpout(String baseClassPath, String realSpoutClassName) { + public JRubyTransactionalSpout(String baseClassPath, String realSpoutClassName, String[] fields) { _baseClassPath = baseClassPath; _realSpoutClassName = realSpoutClassName; + _fields = fields; } @Override @@ -46,7 +47,7 @@ public class JRubyTransactionalSpout extends BaseTransactionalSpout { } @Override - public Emitter getEmitter(Map conf, TopologyContext context) { + public ITransactionalSpout.Emitter getEmitter(Map conf, TopologyContext context) { // create instance of the jruby class here, after deserialization in the workers. if (_proxySpout == null) { _proxySpout = newProxySpout(_baseClassPath, _realSpoutClassName); @@ -59,8 +60,12 @@ public class JRubyTransactionalSpout extends BaseTransactionalSpout { // declareOutputFields is executed in the topology creation time before serialisation. // do not set the _proxySpout instance variable here to avoid JRuby serialization // issues. Just create tmp spout instance to call declareOutputFields. - ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); - spout.declareOutputFields(declarer); + if (_fields.length > 0) { + declarer.declare(new Fields(_fields)); + } else { + ITransactionalSpout spout = newProxySpout(_baseClassPath, _realSpoutClassName); + spout.declareOutputFields(declarer); + } } @Override