Merge pull request #61 from rtyler/issues/58-java-exception

Wrap calls to the underlying Java producer with Ruby-based exceptions
This commit is contained in:
R. Tyler Croy 2014-10-24 12:32:06 -07:00
commit 0cc02f40b7
4 changed files with 33 additions and 6 deletions

View File

@ -2,7 +2,20 @@
module Hermann
module Errors
# Error for connectivity problems with the Kafka brokers
class ConnectivityError; end;
class ConnectivityError < StandardError
attr_reader :java_exception
# Initialize a connectivity error
#
# @param [String] message Exception's message
# @param [Hash[ options
# @option options [Java::Lang::RuntimeException] :java_exception An
# underlying Java exception
def initialize(message, options={})
super(message)
@java_exception = options[:java_exception]
end
end
# For passing incorrect config and options to kafka
class ConfigurationError < StandardError; end

View File

@ -17,4 +17,4 @@ module Hermann
module KafkaUtil
include_package "kafka.util"
end
end
end

View File

@ -47,7 +47,12 @@ module Hermann
def push_single(msg, topic, unused)
Concurrent::Promise.execute {
data = ProducerUtil::KeyedMessage.new(topic, msg)
@producer.send(data)
begin
@producer.send(data)
rescue Java::KafkaCommon::FailedToSendMessageException => jexc
raise Hermann::Errors::ConnectivityError.new(jexc.message,
:java_exception => jexc)
end
}
end

View File

@ -13,9 +13,6 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
subject(:result) { producer.push_single('foo', topic, nil) }
let(:passed_topic) { 'foo' }
before do
allow_any_instance_of(described_class).to receive(:broker_list) { brokers }
end
it 'returns an executing Promise' do
expect(result.wait(1).pending?).to eq false
@ -51,6 +48,18 @@ describe Hermann::Provider::JavaProducer, :platform => :java do
let(:topic) { '' }
it_behaves_like 'an error condition'
end
context 'when the broker is down' do
before :each do
expect(producer.producer).to receive(:send).and_raise(::Java::KafkaCommon::FailedToSendMessageException.new('rspec', nil))
end
it 'should raise a ConnectivityError' do
expect {
result.value!
}.to raise_error(Hermann::Errors::ConnectivityError)
end
end
end
end