Support more the one bolt in LinearDRPC, add support for grouping

This commit is contained in:
Paul Bergeron 2012-12-10 16:36:07 -08:00
parent 348ac62965
commit 94ccdce6ee
1 changed files with 28 additions and 9 deletions

View File

@ -5,14 +5,36 @@ require 'red_storm/configurator'
module RedStorm
class InputBoltDefinition < SimpleTopology::BoltDefinition
attr_accessor :grouping
def source(source_fields)
@sources << source_fields
def initialize(*args)
super
@grouping = :none
end
def grouping(grouping)
@grouping = @grouping
end
def define_grouping(declarer)
@sources.each do |source_fields|
declare.declare(Fields.new(*(Array.wrap(source_fields).map(&:to_s))))
case @grouping
when :fields
declarer.fieldsGrouping(Fields.new(*([params].flatten.map(&:to_s))))
when :global
declarer.globalGrouping()
when :shuffle
declarer.shuffleGrouping()
when :local_or_shuffle
declarer.localOrShuffleGrouping()
when :none
declarer.noneGrouping()
when :all
declarer.allGrouping()
when :direct
declarer.directGrouping()
else
raise("unknown grouper=#{grouper.inspect}")
end
end
end
@ -23,16 +45,13 @@ module RedStorm
raise TopologyDefinitionError, "DRPC spout is already defined"
end
def start(base_class_path, env)
# self.class.resolve_ids!(self.class.components)
builder = LinearDRPCTopologyBuilder.new(self.class.topology_name)
self.class.bolts.each do |bolt|
declarer = builder.addBolt(bolt.new_instance(base_class_path), bolt.parallelism.to_java)
#declarer.addConfigurations(bolt.config)
#bolt.define_grouping(declarer)
declarer.addConfigurations(bolt.config)
bolt.define_grouping(declarer)
end
# set the JRuby compatibility mode option for Storm workers, default to current JRuby mode