Merge pull request #33 from rtyler/issues/25-producer-errors

Propagate systemic errors from librdkafka up into Herman::Result objects
This commit is contained in:
Stan Campbell 2014-09-10 12:48:07 -07:00
commit 838904a035
6 changed files with 102 additions and 22 deletions

View File

@ -447,6 +447,20 @@ static void producer_error_callback(rd_kafka_t *rk,
TRACER("error (%i): %s\n", error, reason);
conf->isErrored = error;
if (error) {
/* If we have an old error string in here we need to make sure to
* free() it before we allocate a new string
*/
if (NULL != conf->error) {
free(conf->error);
}
/* Grab the length of the string plus the null character */
size_t error_length = strnlen(reason, HERMANN_MAX_ERRSTR_LEN) + 1;
conf->error = (char *)malloc((sizeof(char) * error_length));
(void)strncpy(conf->error, reason, error_length);
}
}
@ -589,7 +603,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE result) {
* @param message VALUE A Ruby FixNum of how many ms we should wait on librdkafka
*/
static VALUE producer_tick(VALUE self, VALUE timeout) {
HermannInstanceConfig *producerConfig;
hermann_conf_t *conf = NULL;
long timeout_ms = 0;
int events = 0;
@ -600,17 +614,21 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
rb_raise(rb_eArgError, "Cannot call `tick` with a nil timeout!\n");
}
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
Data_Get_Struct(self, hermann_conf_t, conf);
/*
* if the producerConfig is not initialized then we never properly called
* producer_push_single, so why are we ticking?
*/
if (!producerConfig->isInitialized) {
if (!conf->isInitialized) {
rb_raise(rb_eRuntimeError, "Cannot call `tick` without having ever sent a message\n");
}
events = rd_kafka_poll(producerConfig->rk, timeout_ms);
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, conf->error);
}
return rb_int_new(events);
}
@ -880,6 +898,7 @@ static VALUE producer_allocate(VALUE klass) {
producerConfig->isInitialized = 0;
producerConfig->isConnected = 0;
producerConfig->isErrored = 0;
producerConfig->error = NULL;
obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig);

View File

@ -56,6 +56,8 @@
// Holds the defined Ruby module for Hermann
static VALUE hermann_module;
#define HERMANN_MAX_ERRSTR_LEN 512
static int DEBUG = 0;
// Should we expect rb_thread_blocking_region to be present?
@ -88,7 +90,9 @@ typedef struct HermannInstanceConfig {
int isInitialized;
int isConnected;
int isErrored;
char *error;
} HermannInstanceConfig;
typedef HermannInstanceConfig hermann_conf_t;

View File

@ -1,6 +1,5 @@
require 'hermann'
require 'hermann/result'
require 'hermann/timeout'
require 'hermann_lib'
module Hermann
@ -65,21 +64,20 @@ module Hermann
# @param [FixNum] timeout Seconds to block on the internal reactor
# @return [FixNum] Number of +Hermann::Result+ children reaped
def tick_reactor(timeout=0)
timeout = rounded_timeout(timeout)
if timeout == 0
@internal.tick(0)
else
(timeout * 2).times do
# We're going to Thread#sleep in Ruby to avoid a
# pthread_cond_timedwait(3) inside of librdkafka
events = @internal.tick(0)
# If we find events, break out early
break if events > 0
sleep 0.5
begin
execute_tick(rounded_timeout(timeout))
rescue StandardError => ex
@children.each do |child|
# Skip over any children that should already be reaped for other
# reasons
next if child.reap?
# Propagate errors to the remaining children
child.internal_set_error(ex)
end
end
# Reaping the children at this point will also reap any children marked
# as errored by an exception out of #execute_tick
return reap_children
end
@ -88,7 +86,8 @@ module Hermann
# Filter all children who are no longer pending/fulfilled
total_children = @children.size
@children = @children.reject { |c| c.reap? }
reaped = total_children - children.size
return (total_children - children.size)
end
@ -102,5 +101,22 @@ module Hermann
return timeout.round if timeout.kind_of?(Float)
return timeout
end
# Perform the actual reactor tick
# @raises [StandardError[ in case of underlying failures in librdkafka
def execute_tick(timeout)
if timeout == 0
@internal.tick(0)
else
(timeout * 2).times do
# We're going to Thread#sleep in Ruby to avoid a
# pthread_cond_timedwait(3) inside of librdkafka
events = @internal.tick(0)
# If we find events, break out early
break if events > 0
sleep 0.5
end
end
end
end
end

View File

@ -57,5 +57,18 @@ module Hermann
@state = :fulfilled
end
end
# INTERNAL METHOD ONLY. Do not use
#
# This method will set our internal #reason with the details from the
# exception
#
# @param [Exception] exception
def internal_set_error(exception)
return if exception.nil?
@reason = exception
@state = :rejected
end
end
end

View File

@ -39,7 +39,11 @@ describe Hermann::Lib::Producer do
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, nil)
producer.tick(timeout)
begin
producer.tick(timeout)
rescue StandardError => ex
# swallow exceptions, since we're just testing #errored?
end
expect(producer).to be_errored
end
end
@ -54,13 +58,24 @@ describe Hermann::Lib::Producer do
end
describe '#push_single', :type => :integration do
subject(:push) { |example| producer.push_single(example.full_description, nil) }
let(:message) { |example| example.full_description }
subject(:push) { producer.push_single(message, nil) }
it 'should return' do
expect(push).not_to be_nil
producer.tick(timeout)
expect(producer).to be_connected
end
context 'with binary data' do
let(:message) { "\n+AuOzetQrTrdwSY14ig7I_1oUwjp3DvTx3YWhSTGD4Fo\022\0312014-09-10T00:18:47-07:00\032,\n\006scream\022\016missing_device\032\022app0\"\t\n\astarted*(\b\000\022$009f0305-b50a-455d-b137-e52b45f674aa*(\b\001\022$53c0d817-d94b-4b7a-9a58-95fe8cec4333" }
it 'should return' do
expect(push).not_to be_nil
producer.tick(timeout)
expect(producer).to be_connected
end
end
end
describe '#tick' do

View File

@ -29,6 +29,8 @@ describe Hermann::Producer do
end
describe '#push' do
subject(:result) { producer.push(value) }
context 'error conditions' do
shared_examples 'an error condition' do
it 'should raise an exception' do
@ -41,14 +43,25 @@ describe Hermann::Producer do
it_behaves_like 'an error condition'
end
context 'with a non-existing broker' do
let(:brokers) { 'localhost:13337' }
let(:timeout) { 2 }
let(:value) { 'rspec' }
it 'should reject' do
future = result
expect(future).not_to be_nil
producer.tick_reactor(timeout)
expect(future).to be_rejected
end
end
context 'with a bad topic' do
let(:topic) { '' }
it_behaves_like 'an error condition'
end
end
subject(:result) { producer.push(value) }
context 'with a single value' do
let(:value) { 'hello' }