Expose an #errored? method which gets set if the general error callback is invoked

Fixes #25
This commit is contained in:
R. Tyler Croy 2014-09-09 17:36:27 -07:00
parent 288653845a
commit 218c035793
4 changed files with 49 additions and 3 deletions

View File

@ -442,7 +442,11 @@ static void producer_error_callback(rd_kafka_t *rk,
int error,
const char *reason,
void *opaque) {
hermann_conf_t *conf = (hermann_conf_t *)rd_kafka_opaque(rk);
TRACER("error (%i): %s\n", error, reason);
conf->isErrored = error;
}
@ -466,7 +470,7 @@ void producer_init_kafka(VALUE self, HermannInstanceConfig* config) {
/* Add our `self` to the opaque pointer for error and logging callbacks
*/
rd_kafka_conf_set_opaque(config->conf, (void*)self);
rd_kafka_conf_set_opaque(config->conf, (void*)config);
rd_kafka_conf_set_error_cb(config->conf, producer_error_callback);
/* Topic configuration */
@ -639,6 +643,9 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
producerConfig->isConnected = 1;
result = Qtrue;
}
else {
producerConfig->isErrored = err;
}
rd_kafka_metadata_destroy(data);
@ -661,6 +668,18 @@ static VALUE producer_is_connected(VALUE self) {
return Qtrue;
}
static VALUE producer_is_errored(VALUE self) {
HermannInstanceConfig *producerConfig;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
if (producerConfig->isErrored) {
return Qtrue;
}
return Qfalse;
}
/**
* consumer_free
@ -842,7 +861,7 @@ static void producer_free(void *p) {
static VALUE producer_allocate(VALUE klass) {
VALUE obj;
HermannInstanceConfig* producerConfig =ALLOC(HermannInstanceConfig);
HermannInstanceConfig* producerConfig = ALLOC(HermannInstanceConfig);
producerConfig->topic = NULL;
producerConfig->rk = NULL;
@ -860,6 +879,7 @@ static VALUE producer_allocate(VALUE klass) {
producerConfig->quiet = 0;
producerConfig->isInitialized = 0;
producerConfig->isConnected = 0;
producerConfig->isErrored = 0;
obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig);
@ -982,6 +1002,9 @@ void Init_hermann_lib() {
/* Producer.connected? */
rb_define_method(c_producer, "connected?", producer_is_connected, 0);
/* Producer.errored? */
rb_define_method(c_producer, "errored?", producer_is_errored, 0);
/* Producer.connect */
rb_define_method(c_producer, "connect", producer_connect, 1);
}

View File

@ -88,7 +88,7 @@ typedef struct HermannInstanceConfig {
int isInitialized;
int isConnected;
int isErrored;
} HermannInstanceConfig;
typedef HermannInstanceConfig hermann_conf_t;

View File

@ -22,6 +22,11 @@ module Hermann
return @internal.connected?
end
# @return [Boolean] True if the underlying producer object has errored
def errored?
return @internal.errored?
end
def connect(timeout=0)
return @internal.connect(timeout * 1000)
end

View File

@ -27,6 +27,24 @@ describe Hermann::Lib::Producer do
end
end
describe '#errored?' do
subject { producer.errored? }
context 'by default' do
it { should be false }
end
context 'with an non-existing broker' do
let(:brokers) { 'localhost:13337' }
it 'should error after attempting to connect' do |example|
producer.push_single(example.full_description, nil)
producer.tick(timeout)
expect(producer).to be_errored
end
end
end
describe '#connected?' do
subject { producer.connected? }