diff --git a/ext/hermann/extconf.rb b/ext/hermann/extconf.rb index 919684f..9387e76 100644 --- a/ext/hermann/extconf.rb +++ b/ext/hermann/extconf.rb @@ -147,6 +147,7 @@ $LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a') have_header('ruby/thread.h') have_header('ruby/intern.h') +have_header('ruby/version.h') have_func('rb_thread_blocking_region') have_func('rb_thread_call_without_gvl') diff --git a/ext/hermann/hermann_lib.c b/ext/hermann/hermann_lib.c index e2f04f7..446f2fc 100644 --- a/ext/hermann/hermann_lib.c +++ b/ext/hermann/hermann_lib.c @@ -33,6 +33,9 @@ #include "hermann_lib.h" +#ifdef HAVE_RUBY_VERSION_H +#include +#endif /* how long to let librdkafka block on the socket before returning back to the interpreter. * essentially defines how long we wait before consumer_consume_stop_callback() can fire */ @@ -120,7 +123,7 @@ static void msg_delivered(rd_kafka_t *rk, /* call back into our Hermann::Result if it exists, discarding the * return value */ - if (NULL != push_ctx->result) { + if (NULL != (void *)push_ctx->result) { rb_funcall(push_ctx->result, hermann_result_fulfill_method, 2, @@ -153,12 +156,15 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt, void *msg_opaque) { /* Pick a random partition */ int retry = 0; + int32_t partition = RD_KAFKA_PARTITION_UA; + for (; retry < partition_cnt; retry++) { - int32_t partition = rand() % partition_cnt; + partition = rand() % partition_cnt; if (rd_kafka_topic_partition_available(rkt, partition)) { break; /* this one will do */ } } + return partition; } /** @@ -401,7 +407,7 @@ static VALUE consumer_consume_loop(VALUE self) { TRACER("\n"); while (consumerConfig->run) { -#ifdef HAVE_RB_THREAD_BLOCKING_REGION +#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2 msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg, consumerConfig, consumer_consume_stop_callback, @@ -434,6 +440,7 @@ static VALUE consumer_consume_loop_stop(VALUE self) { Data_Get_Struct(self, HermannInstanceConfig, consumerConfig); rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition); + return Qnil; } /** @@ -465,7 +472,7 @@ static VALUE consumer_consume(VALUE self, VALUE topic) { if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) { fprintf(stderr, "%% Failed to start consuming: %s\n", rd_kafka_err2str(rd_kafka_errno2err(errno))); - rb_raise(rb_eRuntimeError, + rb_raise(rb_eRuntimeError, "%s", rd_kafka_err2str(rd_kafka_errno2err(errno))); return Qnil; } @@ -589,7 +596,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE Data_Get_Struct(self, HermannInstanceConfig, producerConfig); delivery_ctx->producer = producerConfig; - delivery_ctx->result = NULL; + delivery_ctx->result = (VALUE) NULL; TRACER("producerConfig: %p\n", producerConfig); @@ -680,7 +687,7 @@ static VALUE producer_tick(VALUE self, VALUE timeout) { events = rd_kafka_poll(conf->rk, timeout_ms); if (conf->isErrored) { - rb_raise(rb_eStandardError, conf->error); + rb_raise(rb_eStandardError, "%s", conf->error); } return rb_int_new(events); @@ -704,7 +711,7 @@ static void *producer_metadata_request_nogvl(void *ptr) return (void *) rd_kafka_metadata(ctx->rk, ctx->topic ? 0 : 1, ctx->topic, - &(ctx->data), + (const struct rd_kafka_metadata **) &(ctx->data), ctx->timeout_ms); } @@ -713,7 +720,7 @@ static int producer_metadata_request(hermann_metadata_ctx_t *ctx) { int err; -#ifdef HAVE_RB_THREAD_BLOCKING_REGION +#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2 err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx, NULL, NULL); #elif HAVE_RB_THREAD_CALL_WITHOUT_GVL