mirror of https://github.com/reiseburo/hermann
unify jruby/mri Consumer#initialize
While we do require different options between jruby & mri, there is a common "topic" option. The rest we pass in the options hash.
This commit is contained in:
parent
0690634e4c
commit
3792ee206b
|
@ -10,30 +10,27 @@ module Hermann
|
|||
# Hermann::Consumer provides a simple consumer API which is only safe to be
|
||||
# executed in a single thread
|
||||
class Consumer
|
||||
attr_reader :topic, :brokers, :partition, :internal
|
||||
attr_reader :topic, :internal
|
||||
|
||||
|
||||
# Instantiate Consumer
|
||||
#
|
||||
# @params [String] kafka topic
|
||||
#
|
||||
# @params [String] group ID
|
||||
#
|
||||
# @params [String] comma separated zookeeper list
|
||||
#
|
||||
# @params [Hash] options for Consumer
|
||||
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
|
||||
# @option opts [Integer] :partition (for MRI) The kafka partition
|
||||
def initialize(topic, groupId, zookeepers, opts={})
|
||||
# @option opts [String] :brokers (for MRI) Comma separated list of brokers
|
||||
# @option opts [Integer] :partition (for MRI) The kafka partition
|
||||
# @option opts [Integer] :zookeepers (for jruby) list of zookeeper servers
|
||||
# @option opts [Integer] :group_id (for jruby) client group_id
|
||||
#
|
||||
def initialize(topic, opts = {})
|
||||
@topic = topic
|
||||
@brokers = brokers
|
||||
@partition = partition
|
||||
|
||||
if Hermann.jruby?
|
||||
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, groupId, topic, opts)
|
||||
zookeepers, group_id = require_values_at(opts, :zookeepers, :group_id)
|
||||
|
||||
@internal = Hermann::Provider::JavaSimpleConsumer.new(zookeepers, group_id, topic, opts)
|
||||
else
|
||||
brokers = opts.delete(:brokers)
|
||||
partition = opts.delete(:partition)
|
||||
brokers, partition = require_values_at(opts, :brokers, :partition)
|
||||
|
||||
@internal = Hermann::Lib::Consumer.new(topic, brokers, partition)
|
||||
end
|
||||
end
|
||||
|
@ -51,5 +48,12 @@ module Hermann
|
|||
#no op
|
||||
end
|
||||
end
|
||||
|
||||
def require_values_at(opts, *args)
|
||||
args.map do |a|
|
||||
raise "Please provide :#{a} option!" unless opts[a]
|
||||
opts[a]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,7 +2,7 @@ require 'rubygems'
|
|||
require 'lib/hermann'
|
||||
require 'lib/hermann/consumer'
|
||||
|
||||
c = Hermann::Consumer.new( "lms_messages", "localhost:9092", 0 )
|
||||
c = Hermann::Consumer.new( "lms_messages", :zookeepers => "localhost:2181", :group_id => "lms_message_consumer" )
|
||||
t1 = 0
|
||||
c.consume() do
|
||||
|msg| puts("Received: #{msg}")
|
||||
|
|
|
@ -4,7 +4,7 @@ require 'hermann/consumer'
|
|||
# XXX: Hermann::Consumer isn't really supported anywhere, MRI included right
|
||||
# now
|
||||
describe Hermann::Consumer do
|
||||
subject(:consumer) { described_class.new(topic, nil, nil, opts) }
|
||||
subject(:consumer) { described_class.new(topic, opts) }
|
||||
|
||||
let(:topic) { 'rspec' }
|
||||
let(:brokers) { 'localhost:1337' }
|
||||
|
@ -46,7 +46,7 @@ describe Hermann::Consumer do
|
|||
end
|
||||
|
||||
context 'on Jruby', :platform => :java do
|
||||
subject(:consumer) { described_class.new(topic, groupId, zookeepers) }
|
||||
subject(:consumer) { described_class.new(topic, group_id: groupId, zookeepers: zookeepers) }
|
||||
|
||||
let(:zookeepers) { 'localhost:2181' }
|
||||
let(:groupId) { 'groupId' }
|
||||
|
|
Loading…
Reference in New Issue