cleanup warnings

- fix partition selection function
- get ruby >= 2 calling the correct no-gvl function
This commit is contained in:
Ben Osheroff 2015-06-13 19:47:07 -07:00
parent a362ce153e
commit 5c898144f2
1 changed files with 11 additions and 8 deletions

View File

@ -32,7 +32,7 @@
/* Much of the librdkafka library calls were lifted from rdkafka_example.c */
#include "hermann_lib.h"
#include <ruby/version.h>
/* how long to let librdkafka block on the socket before returning back to the interpreter.
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
@ -120,7 +120,7 @@ static void msg_delivered(rd_kafka_t *rk,
/* call back into our Hermann::Result if it exists, discarding the
* return value
*/
if (NULL != push_ctx->result) {
if (NULL != (void *)push_ctx->result) {
rb_funcall(push_ctx->result,
hermann_result_fulfill_method,
2,
@ -153,12 +153,15 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
void *msg_opaque) {
/* Pick a random partition */
int retry = 0;
int32_t partition = RD_KAFKA_PARTITION_UA;
for (; retry < partition_cnt; retry++) {
int32_t partition = rand() % partition_cnt;
partition = rand() % partition_cnt;
if (rd_kafka_topic_partition_available(rkt, partition)) {
break; /* this one will do */
}
}
return partition;
}
/**
@ -396,7 +399,7 @@ static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
TRACER("\n");
while (consumerConfig->run) {
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
@ -446,7 +449,7 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
fprintf(stderr, "%% Failed to start consuming: %s\n",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
rb_raise(rb_eRuntimeError,
rb_raise(rb_eRuntimeError, "%s",
rd_kafka_err2str(rd_kafka_errno2err(errno)));
return Qnil;
}
@ -575,7 +578,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
delivery_ctx->producer = producerConfig;
delivery_ctx->result = NULL;
delivery_ctx->result = (VALUE) NULL;
TRACER("producerConfig: %p\n", producerConfig);
@ -666,7 +669,7 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
events = rd_kafka_poll(conf->rk, timeout_ms);
if (conf->isErrored) {
rb_raise(rb_eStandardError, conf->error);
rb_raise(rb_eStandardError, "%s", conf->error);
}
return rb_int_new(events);
@ -689,7 +692,7 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
err = rd_kafka_metadata(producerConfig->rk,
0,
producerConfig->rkt,
&data,
(const struct rd_kafka_metadata **)&data,
timeout_ms);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);