Merge pull request #47 from lookout/topic_push

Allow java producer push to change topics
This commit is contained in:
R. Tyler Croy 2014-10-13 15:48:48 -07:00
commit 4bb3d12f5d
4 changed files with 39 additions and 11 deletions

View File

@ -16,7 +16,7 @@ module Hermann
@topic = topic
@brokers = brokers
if RUBY_PLATFORM == "java"
@internal = Hermann::Provider::JavaProducer.new(topic, brokers)
@internal = Hermann::Provider::JavaProducer.new(brokers)
else
@internal = Hermann::Lib::Producer.new(topic, brokers)
end
@ -45,16 +45,21 @@ module Hermann
# @param [Array] value An array of values to push, will push each one
# separately
# @param [Object] value A single object to push
#
# @param [Hash] opts to pass to push method
# @params opts [String] :topic The topic to push messages to
#
# @return [Hermann::Result] A future-like object which will store the
# result from the broker
def push(value)
def push(value, opts={})
topic = opts[:topic] || @topic
result = create_result
if value.kind_of? Array
return value.map { |e| self.push(e) }
else
if RUBY_PLATFORM == "java"
result = @internal.push_single(value)
result = @internal.push_single(value, topic)
@children << result
else
@internal.push_single(value, result)

View File

@ -10,10 +10,9 @@ module Hermann
# will raise the exception and the rejected flag will be set to true
#
class JavaProducer
attr_accessor :topic, :producer
attr_accessor :producer
def initialize(topic, brokers)
@topic = topic
def initialize(brokers)
properties = create_properties(:brokers => brokers)
config = create_config(properties)
@producer = JavaApiUtil::Producer.new(config)
@ -28,13 +27,14 @@ module Hermann
# Push a value onto the Kafka topic passed to this +Producer+
#
# @param [Object] value A single object to push
# @param [String] topic to push message to
#
# @return +Concurrent::Promise+ Representa a promise to send the
# data to the kafka broker. Upon execution the Promise's status
# will be set
def push_single(msg)
def push_single(msg, topic)
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(@topic, msg)
data = ProducerUtil::KeyedMessage.new(topic, msg)
@producer.send(data)
}
end

View File

@ -19,6 +19,23 @@ describe Hermann::Producer do
expect(producer.children).to_not be_empty
end
end
describe '#push' do
let(:msg) { 'foo' }
let(:passed_topic) { 'bar' }
context 'without topic passed' do
it 'uses initialized topic' do
expect_any_instance_of(Hermann::Provider::JavaProducer).to receive(:push_single).with(msg, topic)
producer.push(msg)
end
end
context 'with topic passed' do
it 'can change topic' do
expect_any_instance_of(Hermann::Provider::JavaProducer).to receive(:push_single).with(msg, passed_topic)
producer.push(msg, :topic => passed_topic)
end
end
end
end
context "not java", :platform => :mri do

View File

@ -2,7 +2,7 @@ require 'spec_helper'
require 'hermann/provider/java_producer'
describe Hermann::Provider::JavaProducer, :platform => :java do
subject(:producer) { described_class.new(topic, zookeepers) }
subject(:producer) { described_class.new(zookeepers) }
let(:topic) { 'rspec' }
let(:zookeepers) { 'localhost:2181' }
@ -10,8 +10,9 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
describe '#push_single' do
subject(:result) { producer.push_single('foo') }
subject(:result) { producer.push_single('foo', topic) }
let(:passed_topic) { 'foo' }
before do
allow_any_instance_of(described_class).to receive(:broker_list) { brokers }
end
@ -20,10 +21,15 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
expect(result.wait(1).pending?).to eq false
end
it 'can change topic' do
expect(Hermann::ProducerUtil::KeyedMessage).to receive(:new).with(passed_topic, 'bar')
producer.push_single('bar', passed_topic).wait(1)
end
context 'error conditions' do
shared_examples 'an error condition' do
it 'should be rejected' do
promise = producer.push_single('rspec').wait(1)
promise = producer.push_single('rspec', topic).wait(1)
expect(promise).to be_rejected
expect { promise.value! }.to raise_error
end