Removed some extraneous logging. A little code cleanup.

This commit is contained in:
Stan Campbell 2014-06-06 12:25:03 -07:00
parent 6f0779d1b7
commit 874f02e673
3 changed files with 44 additions and 37 deletions

Binary file not shown.

View File

@ -144,6 +144,10 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
fprintf(stderr, "%% Failed to create new consumer: %s\n", config->errstr);
exit(1);
}
/* Set logger */
rd_kafka_set_logger(config->rk, logger);
rd_kafka_set_log_level(config->rk, LOG_DEBUG);
/* TODO: offset calculation */
config->start_offset = RD_KAFKA_OFFSET_END;
@ -243,8 +247,7 @@ void producer_init_kafka(HermannInstanceConfig* config) {
config->isInitialized = 1;
}
/* Hermann::Producer.push(msg) */
static VALUE producer_push(VALUE self, VALUE message) {
static VALUE producer_push_single(VALUE self, VALUE message) {
HermannInstanceConfig* producerConfig;
char buf[2048];
@ -253,7 +256,7 @@ static VALUE producer_push(VALUE self, VALUE message) {
if(producerConfig->topic==NULL) {
fprintf(stderr, "Topic is null!");
return;
return self;
}
if(!producerConfig->isInitialized) {
@ -288,36 +291,46 @@ static VALUE producer_push(VALUE self, VALUE message) {
/* Must poll to handle delivery reports */
rd_kafka_poll(producerConfig->rk, 0);
return self;
}
/** Hermann::Producer.close */
static VALUE producer_close(VALUE self) {
static VALUE producer_push_array(VALUE self, int length, VALUE array) {
HermannInstanceConfig* producerConfig;
int i;
VALUE message;
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
for(i=0;i<length;i++) {
message = RARRAY_PTR(array)[i];
producer_push_single(self, message);
}
/* Destroy the handle */
rd_kafka_destroy(producerConfig->rk);
/* Let background threads clean up and terminate cleanly. */
rd_kafka_wait_destroyed(2000);
return self;
}
/* Hermann::Producer.batch { block } */
static VALUE producer_batch(VALUE c) {
/* todo: not implemented */
/* Hermann::Producer.push(msg) */
static VALUE producer_push(VALUE self, VALUE message) {
VALUE arrayP = rb_check_array_type(message);
if(!NIL_P(arrayP)) {
return producer_push_array(self, RARRAY_LEN(arrayP), message);
} else {
return producer_push_single(self, message);
}
}
static void consumer_free(void * p) {
HermannInstanceConfig* config = (HermannInstanceConfig *)p;
// the p *should* contain a pointer to the consumerConfig which also must be freed
// rd_kafka_topic_destroy(consumerConfig->rkt);
rd_kafka_topic_destroy(config->rkt);
// rd_kafka_destroy(consumerConfig->rk);
rd_kafka_destroy(config->rk);
/* todo: may or may not be necessary depending on how underlying threads are handled */
/* Let background threads clean up and terminate cleanly. */
// rd_kafka_wait_destroyed(2000);
// clean up the struct
free(config);
}
static VALUE consumer_allocate(VALUE klass) {
@ -327,8 +340,6 @@ static VALUE consumer_allocate(VALUE klass) {
HermannInstanceConfig* consumerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, consumer_free, consumerConfig);
log_debug("Consumer allocated");
return obj;
}
@ -348,8 +359,6 @@ static VALUE consumer_initialize(VALUE self, VALUE topic) {
consumerConfig->exit_eof = 0;
consumerConfig->quiet = 0;
log_debug("Consumer initialized");
return self;
}
@ -375,10 +384,17 @@ static VALUE consumer_init_copy(VALUE copy, VALUE orig) {
}
static void producer_free(void * p) {
/* todo: not implemented */
/* Destroy the handle */
// Handle is in the config, the config is probably in that p pointer
// rd_kafka_destroy(producerConfig->rk);
HermannInstanceConfig* config = (HermannInstanceConfig *)p;
// Clean up the topic
rd_kafka_topic_destroy(config->rkt);
// Take care of the producer instance
rd_kafka_destroy(config->rk);
// Free the struct
free(config);
}
static VALUE producer_allocate(VALUE klass) {
@ -388,8 +404,6 @@ static VALUE producer_allocate(VALUE klass) {
HermannInstanceConfig* producerConfig = ALLOC(HermannInstanceConfig);
obj = Data_Wrap_Struct(klass, 0, producer_free, producerConfig);
log_debug("Producer allocated");
return obj;
}
@ -409,8 +423,6 @@ static VALUE producer_initialize(VALUE self, VALUE topic) {
producerConfig->exit_eof = 0;
producerConfig->quiet = 0;
log_debug("Producer initialized");
return self;
}
@ -466,9 +478,4 @@ void Init_hermann_lib() {
/* Producer.push(msg) */
rb_define_method( c_producer, "push", producer_push, 1 );
/* Producer.batch(array) */
rb_define_method( c_producer, "batch", producer_batch, 1 );
/* Producer.close() */
rb_define_method( c_producer, "close", producer_close, 0 );
}

Binary file not shown.