Add support for a partition key for producers.

Fixes lookout/Hermann#88
This commit is contained in:
Lee Butterman 2015-04-02 18:35:10 -07:00
parent 911b527b55
commit f059643efc
4 changed files with 30 additions and 6 deletions

View File

@ -50,6 +50,7 @@ module Hermann
# @param [Object] value A single object to push
# @param [Hash] opts to pass to push method
# @option opts [String] :topic The topic to push messages to
# :partition_key The string to partition by
#
# @return [Hermann::Result] A future-like object which will store the
# result from the broker
@ -62,7 +63,8 @@ module Hermann
end
if Hermann.jruby?
result = @internal.push_single(value, topic, nil)
key = opts.has_key?(:partition_key) ? opts[:partition_key].to_java : nil
result = @internal.push_single(value, topic, key)
unless result.nil?
@children << result
end

View File

@ -44,9 +44,9 @@ module Hermann
# @return +Concurrent::Promise+ Representa a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg, topic, unused)
def push_single(msg, topic, key)
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, msg.to_java_bytes)
data = ProducerUtil::KeyedMessage.new(topic, nil, key, msg.to_java_bytes)
begin
@producer.send(data)
rescue Java::KafkaCommon::FailedToSendMessageException => jexc

View File

@ -38,12 +38,20 @@ describe Hermann::Producer do
describe '#push' do
let(:msg) { 'foo' }
let(:passed_topic) { 'bar' }
let(:partition_key) { 'syncml_shard_68_master' }
context 'without topic passed' do
it 'uses initialized topic' do
expect(producer.internal).to receive(:push_single).with(msg, topic, anything)
producer.push(msg)
end
end
context 'without topic passed', :platform => :java do
it 'uses initialized topic and does not have a partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, nil)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect(producer.internal).to receive(:push_single).with(msg, passed_topic, anything)
@ -59,6 +67,13 @@ describe Hermann::Producer do
end
end
context 'with explicit partition key', :platform => :java do
it 'uses the partition key' do
expect(producer.internal).to receive(:push_single).with(msg, topic, partition_key.to_java)
producer.push(msg, :partition_key => partition_key)
end
end
context 'when reaping children', :platform => :java do
subject { producer.push('f', :topic => passed_topic) }

View File

@ -8,9 +8,11 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { 'rspec' }
let(:brokers) { '0:1337'}
let(:opts) { {} }
let(:part_key) { "key".to_java }
let(:msg) { "bar" }
describe '#push_single' do
subject(:result) { producer.push_single('foo', topic, nil) }
subject(:result) { producer.push_single(msg, topic, nil) }
let(:passed_topic) { 'foo' }
@ -19,8 +21,13 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
end
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, anything)
producer.push_single('bar', passed_topic, nil).wait(1)
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, nil, anything)
producer.push_single(msg, passed_topic, nil).wait(1)
end
it 'can change partition key' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, nil, part_key, anything)
producer.push_single(msg, passed_topic, part_key).wait(1)
end
context 'error conditions' do