Moving rk (instance pointer for the RdKafka producer, consumer, etc.) into per instance config.

This commit is contained in:
Stan Campbell 2014-06-05 09:11:53 -07:00
parent 66c7cbdbea
commit 841e33ecb9
4 changed files with 15 additions and 15 deletions

Binary file not shown.

View File

@ -116,7 +116,7 @@ static void msg_consume (rd_kafka_message_t *rkmessage,
}
}
static void sig_usr1 (int sig) {
static void sig_usr1 (int sig, rd_kafka_t *rk) {
rd_kafka_dump(stdout, rk);
}
@ -175,7 +175,7 @@ void actAsConsumer(VALUE self) {
fprintf(stderr, "Create a Kafka handle\n");
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_CONSUMER, consumerConfig->conf,
if (!(consumerConfig->rk = rd_kafka_new(RD_KAFKA_CONSUMER, consumerConfig->conf,
consumerConfig->errstr, sizeof(consumerConfig->errstr)))) {
fprintf(stderr, "%% Failed to create new consumer: %s\n", consumerConfig->errstr);
exit(1);
@ -189,14 +189,14 @@ void actAsConsumer(VALUE self) {
/* Add brokers */
fprintf(stderr, "About to add brokers..");
if (rd_kafka_brokers_add(rk, consumerConfig->brokers) == 0) {
if (rd_kafka_brokers_add(consumerConfig->rk, consumerConfig->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
fprintf(stderr, "Brokers added\n");
/* Create topic */
consumerConfig->rkt = rd_kafka_topic_new(rk, consumerConfig->topic, consumerConfig->topic_conf);
consumerConfig->rkt = rd_kafka_topic_new(consumerConfig->rk, consumerConfig->topic, consumerConfig->topic_conf);
fprintf(stderr, "Topic created\n");
/* Start consuming */
@ -232,7 +232,7 @@ void actAsConsumer(VALUE self) {
rd_kafka_topic_destroy(consumerConfig->rkt);
rd_kafka_destroy(rk);
rd_kafka_destroy(consumerConfig->rk);
/* todo: may or may not be necessary depending on how underlying threads are handled */
/* Let background threads clean up and terminate cleanly. */
@ -283,23 +283,23 @@ static VALUE push(VALUE self, VALUE message) {
rd_kafka_conf_set_dr_cb(producerConfig->conf, msg_delivered);
/* Create Kafka handle */
if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, producerConfig->conf, producerConfig->errstr, sizeof(producerConfig->errstr)))) {
if (!(producerConfig->rk = rd_kafka_new(RD_KAFKA_PRODUCER, producerConfig->conf, producerConfig->errstr, sizeof(producerConfig->errstr)))) {
fprintf(stderr,
"%% Failed to create new producer: %s\n", producerConfig->errstr);
exit(1);
}
/* Set logger */
rd_kafka_set_logger(rk, logger);
rd_kafka_set_log_level(rk, LOG_DEBUG);
rd_kafka_set_logger(producerConfig->rk, logger);
rd_kafka_set_log_level(producerConfig->rk, LOG_DEBUG);
if(rd_kafka_brokers_add(rk, producerConfig->brokers) == 0) {
if(rd_kafka_brokers_add(producerConfig->rk, producerConfig->brokers) == 0) {
fprintf(stderr, "%% No valid brokers specified\n");
exit(1);
}
/* Create topic */
producerConfig->rkt = rd_kafka_topic_new(rk, producerConfig->topic, producerConfig->topic_conf);
producerConfig->rkt = rd_kafka_topic_new(producerConfig->rk, producerConfig->topic, producerConfig->topic_conf);
/* todo: copy message into buf */
@ -323,17 +323,17 @@ static VALUE push(VALUE self, VALUE message) {
rd_kafka_err2str(rd_kafka_errno2err(errno)));
/* Poll to handle delivery reports */
rd_kafka_poll(rk, 0);
rd_kafka_poll(producerConfig->rk, 0);
} else {
fprintf(stderr, "%% Sent %zd bytes to topic %s partition %i\n", len, rd_kafka_topic_name(producerConfig->rkt), producerConfig->partition);
}
/* Wait for messages to be delivered */
while (run && rd_kafka_outq_len(rk) > 0)
rd_kafka_poll(rk, 100);
while (run && rd_kafka_outq_len(producerConfig->rk) > 0)
rd_kafka_poll(producerConfig->rk, 100);
/* Destroy the handle */
rd_kafka_destroy(rk);
rd_kafka_destroy(producerConfig->rk);
/* todo: may or may not be necessary depending on how underlying threads are handled */
/* Let background threads clean up and terminate cleanly. */

View File

@ -19,7 +19,6 @@ static VALUE m_hermann;
// From rdkafka_example.c
static int run = 1;
static rd_kafka_t *rk; // todo: this is the instance of the Prod or Consumer, move into struct
static int exit_eof = 0;
static int quiet = 0;
static enum {
@ -32,6 +31,7 @@ typedef struct HermannInstanceConfig {
char* topic;
/* Kafka configuration */
rd_kafka_t *rk;
rd_kafka_topic_t *rkt;
char *brokers;
int partition;

Binary file not shown.