Merge pull request #92 from zendesk/fix_gvl

use rb_thread_call_without_gvl
This commit is contained in:
R. Tyler Croy 2015-04-28 11:39:31 -07:00
commit e872906465
3 changed files with 74 additions and 41 deletions

View File

@ -146,4 +146,7 @@ dir_config('rdkafka', HEADER_DIRS, LIB_DIRS)
# <http://blog.zachallett.com/howto-ruby-c-extension-with-a-static-library>
$LOCAL_LIBS << File.join(librdkafka.path, 'lib', 'librdkafka.a')
have_header('ruby/thread.h')
have_func('rb_thread_call_without_gvl')
create_makefile('hermann/hermann_lib')

View File

@ -33,6 +33,11 @@
#include "hermann_lib.h"
/* how long to let librdkafka block on the socket before returning back to the interpreter.
* essentially defines how long we wait before consumer_consume_stop_callback() can fire */
#define CONSUMER_RECVMSG_TIMEOUT_MS 100
/**
* Convenience function
*
@ -204,22 +209,16 @@ static void hexdump(FILE *fp,
* @param rkmessage rd_kafka_message_t* the message
* @param opaque void* opaque context
*/
static void msg_consume(rd_kafka_message_t *rkmessage,
void *opaque) {
HermannInstanceConfig* cfg;
cfg = (HermannInstanceConfig*)opaque;
static void msg_consume(rd_kafka_message_t *rkmessage, HermannInstanceConfig *cfg) {
if (rkmessage->err) {
if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
fprintf(stderr,
"%% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
if (cfg->exit_eof) {
fprintf(stderr,
"%% Consumer reached end of %s [%"PRId32"] "
"message queue at offset %"PRId64"\n",
rd_kafka_topic_name(rkmessage->rkt),
rkmessage->partition, rkmessage->offset);
cfg->run = 0;
}
@ -341,7 +340,7 @@ void consumer_init_kafka(HermannInstanceConfig* config) {
// Ruby gem extensions
#ifdef RB_THREAD_BLOCKING_REGION
#if defined(RB_THREAD_BLOCKING_REGION) || defined(HAVE_RB_THREAD_CALL_WITHOUT_GVL)
/* NOTE: We only need this method defined if RB_THREAD_BLOCKING_REGION is
* defined, otherwise it's unused
*/
@ -360,21 +359,64 @@ static void consumer_consume_stop_callback(void *ptr) {
#endif
/**
* Loop on a timeout to receive messages from Kafka. When the consumer_consume_stop_callback is invoked by Ruby,
* we'll break out of our loop and return.
* consumer_recv_msg
*
* Consume a single message from the kafka stream. The only function that should be invoked
* without the GVL held.
*
* @param HermannInstanceConfig* The hermann configuration for this consumer
*
*/
void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
static void *consumer_recv_msg(void *ptr)
{
rd_kafka_message_t *ret;
HermannInstanceConfig *consumerConfig = (HermannInstanceConfig *) ptr;
ret = rd_kafka_consume(consumerConfig->rkt, consumerConfig->partition, CONSUMER_RECVMSG_TIMEOUT_MS);
if ( ret == NULL ) {
if ( errno != ETIMEDOUT )
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno)));
}
return (void *) ret;
}
/**
* consumer_consume_loop
*
* A timeout-interrupted loop in which we drop the GVL and attemptto receive
* messages from Kafka. We'll check every CONSUMER_RECVMSG_TIMEOUT_MS, or
* after every message, to see if the ruby interpreter wants us to exit the
* loop.
*
* @param HermannInstanceConfig* The hermann configuration for this consumer
*/
static void consumer_consume_loop(HermannInstanceConfig* consumerConfig) {
rd_kafka_message_t *msg;
TRACER("\n");
while (consumerConfig->run) {
if (rd_kafka_consume_callback(consumerConfig->rkt, consumerConfig->partition,
1000/*timeout*/,
msg_consume,
consumerConfig) < 0) {
fprintf(stderr, "%% Error: %s\n", rd_kafka_err2str( rd_kafka_errno2err(errno)));
}
#ifdef RB_THREAD_BLOCKING_REGION
msg = rb_thread_blocking_region(consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#elif HAVE_RB_THREAD_CALL_WITHOUT_GVL
msg = rb_thread_call_without_gvl(consumer_recv_msg,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#else
msg = consumer_recv_msg(consumerConfig);
#endif
if ( msg ) {
msg_consume(msg, consumerConfig);
rd_kafka_message_destroy(msg);
}
}
}
@ -412,23 +454,7 @@ static VALUE consumer_consume(VALUE self, VALUE topic) {
return Qnil;
}
#ifdef RB_THREAD_BLOCKING_REGION
/** The consumer will listen for incoming messages in a loop, timing out and checking the consumerConfig->run
* flag every second.
*
* Call rb_thread_blocking_region to release the GVM lock and allow Ruby to amuse itself while we wait on
* IO from Kafka.
*
* If Ruby needs to interrupt the consumer loop, the stop callback will be invoked and the loop should exit.
*/
rb_thread_blocking_region(consumer_consume_loop,
consumerConfig,
consumer_consume_stop_callback,
consumerConfig);
#else
consumer_consume_loop(consumerConfig);
#endif
consumer_consume_loop(consumerConfig);
/* Stop consuming */
rd_kafka_consume_stop(consumerConfig->rkt, consumerConfig->partition);
@ -669,7 +695,7 @@ static VALUE producer_connect(VALUE self, VALUE timeout) {
&data,
timeout_ms);
TRACER("err: %s (%i)\n", rd_kafka_err2str(err), err);
if (RD_KAFKA_RESP_ERR_NO_ERROR == err) {
TRACER("brokers: %i, topics: %i\n",
data->broker_cnt,

View File

@ -32,6 +32,10 @@
#include <ruby.h>
#ifdef HAVE_RUBY_THREAD_H
#include <ruby/thread.h>
#endif
#include <ctype.h>
#include <signal.h>
#include <string.h>