mirror of https://github.com/reiseburo/hermann
commit
c5707f5515
|
@ -147,6 +147,7 @@ $LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a')
|
|||
|
||||
have_header('ruby/thread.h')
|
||||
have_header('ruby/intern.h')
|
||||
have_header('ruby/version.h')
|
||||
have_func('rb_thread_blocking_region')
|
||||
have_func('rb_thread_call_without_gvl')
|
||||
|
||||
|
|
|
@ -33,6 +33,9 @@
|
|||
|
||||
#include "hermann_lib.h"
|
||||
|
||||
#ifdef HAVE_RUBY_VERSION_H
|
||||
#include <ruby/version.h>
|
||||
#endif
|
||||
|
||||
/* how long to let librdkafka block on the socket before returning back to the interpreter.
|
||||
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
|
||||
|
@ -120,7 +123,7 @@ static void msg_delivered(rd_kafka_t *rk,
|
|||
/* call back into our Hermann::Result if it exists, discarding the
|
||||
* return value
|
||||
*/
|
||||
if (NULL != push_ctx->result) {
|
||||
if (NULL != (void *)push_ctx->result) {
|
||||
rb_funcall(push_ctx->result,
|
||||
hermann_result_fulfill_method,
|
||||
2,
|
||||
|
@ -153,12 +156,15 @@ static int32_t producer_partitioner_callback(const rd_kafka_topic_t *rkt,
|
|||
void *msg_opaque) {
|
||||
/* Pick a random partition */
|
||||
int retry = 0;
|
||||
int32_t partition = RD_KAFKA_PARTITION_UA;
|
||||
|
||||
for (; retry < partition_cnt; retry++) {
|
||||
int32_t partition = rand() % partition_cnt;
|
||||
partition = rand() % partition_cnt;
|
||||
if (rd_kafka_topic_partition_available(rkt, partition)) {
|
||||
break; /* this one will do */
|
||||
}
|
||||
}
|
||||
return partition;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -401,7 +407,7 @@ static VALUE consumer_consume_loop(VALUE self) {
|
|||
TRACER("\n");
|
||||
|
||||
while (consumerConfig->run) {
|
||||
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
|
||||
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
|
||||
msg = (rd_kafka_message_t *) rb_thread_blocking_region((rb_blocking_function_t *) consumer_recv_msg,
|
||||
consumerConfig,
|
||||
consumer_consume_stop_callback,
|
||||
|
@ -434,6 +440,7 @@ static VALUE consumer_consume_loop_stop(VALUE self) {
|
|||
Data_Get_Struct(self, HermannInstanceConfig, consumerConfig);
|
||||
|
||||
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -465,7 +472,7 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
|
|||
if (rd_kafka_consume_start(consumerConfig->rkt, consumerConfig->partition, consumerConfig->start_offset) == -1) {
|
||||
fprintf(stderr, "%% Failed to start consuming: %s\n",
|
||||
rd_kafka_err2str(rd_kafka_errno2err(errno)));
|
||||
rb_raise(rb_eRuntimeError,
|
||||
rb_raise(rb_eRuntimeError, "%s",
|
||||
rd_kafka_err2str(rd_kafka_errno2err(errno)));
|
||||
return Qnil;
|
||||
}
|
||||
|
@ -589,7 +596,7 @@ static VALUE producer_push_single(VALUE self, VALUE message, VALUE topic, VALUE
|
|||
Data_Get_Struct(self, HermannInstanceConfig, producerConfig);
|
||||
|
||||
delivery_ctx->producer = producerConfig;
|
||||
delivery_ctx->result = NULL;
|
||||
delivery_ctx->result = (VALUE) NULL;
|
||||
|
||||
TRACER("producerConfig: %p\n", producerConfig);
|
||||
|
||||
|
@ -680,7 +687,7 @@ static VALUE producer_tick(VALUE self, VALUE timeout) {
|
|||
events = rd_kafka_poll(conf->rk, timeout_ms);
|
||||
|
||||
if (conf->isErrored) {
|
||||
rb_raise(rb_eStandardError, conf->error);
|
||||
rb_raise(rb_eStandardError, "%s", conf->error);
|
||||
}
|
||||
|
||||
return rb_int_new(events);
|
||||
|
@ -704,7 +711,7 @@ static void *producer_metadata_request_nogvl(void *ptr)
|
|||
return (void *) rd_kafka_metadata(ctx->rk,
|
||||
ctx->topic ? 0 : 1,
|
||||
ctx->topic,
|
||||
&(ctx->data),
|
||||
(const struct rd_kafka_metadata **) &(ctx->data),
|
||||
ctx->timeout_ms);
|
||||
}
|
||||
|
||||
|
@ -713,7 +720,7 @@ static int producer_metadata_request(hermann_metadata_ctx_t *ctx)
|
|||
{
|
||||
int err;
|
||||
|
||||
#ifdef HAVE_RB_THREAD_BLOCKING_REGION
|
||||
#if HAVE_RB_THREAD_BLOCKING_REGION && RUBY_API_VERSION_MAJOR < 2
|
||||
err = (int) rb_thread_blocking_region((rb_blocking_function_t *) producer_metadata_request_nogvl, ctx,
|
||||
NULL, NULL);
|
||||
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
|
||||
|
|
Loading…
Reference in New Issue