Added consumer queue rerouting

This allows the application to consume from multiple topic+partitions
with a single rd_kafka_consume*() call, instead of one such
call per topic+partition.
This commit is contained in:
Magnus Edenhill 2014-07-30 23:07:47 +02:00
parent a923791bf1
commit 2e769153fe
2 changed files with 24 additions and 12 deletions

View File

@ -284,23 +284,33 @@ void rd_kafka_q_fwd_set (rd_kafka_q_t *rkq, rd_kafka_q_t *fwdq) {
*/
void rd_kafka_q_purge (rd_kafka_q_t *rkq) {
rd_kafka_op_t *rko, *next;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
pthread_mutex_lock(&rkq->rkq_lock);
if (!rkq->rkq_fwdq) {
next = TAILQ_FIRST(&rkq->rkq_q);
while ((rko = next)) {
next = TAILQ_NEXT(next, rko_link);
rd_kafka_op_destroy(rko);
}
TAILQ_INIT(&rkq->rkq_q);
rkq->rkq_qlen = 0;
rkq->rkq_qsize = 0;
} else
if (rkq->rkq_fwdq) {
rd_kafka_q_purge(rkq->rkq_fwdq);
pthread_mutex_unlock(&rkq->rkq_lock);
return;
}
/* Move ops queue to tmpq to avoid lock-order issue
* by locks taken from rd_kafka_op_destroy(). */
TAILQ_MOVE(&tmpq, &rkq->rkq_q, rko_link);
/* Zero out queue */
TAILQ_INIT(&rkq->rkq_q);
rkq->rkq_qlen = 0;
rkq->rkq_qsize = 0;
pthread_mutex_unlock(&rkq->rkq_lock);
/* Destroy the ops */
next = TAILQ_FIRST(&tmpq);
while ((rko = next)) {
next = TAILQ_NEXT(next, rko_link);
rd_kafka_op_destroy(rko);
}
}
@ -1671,7 +1681,8 @@ static void rd_kafka_dump0 (FILE *fp, rd_kafka_t *rk, int locks) {
rd_kafka_rdlock(rk);
fprintf(fp, "rd_kafka_t %p: %s\n", rk, rk->rk_name);
fprintf(fp, " refcnt %i\n", rk->rk_refcnt);
fprintf(fp, " refcnt %i, producer.msg_cnt %i\n",
rk->rk_refcnt, rk->rk_producer.msg_cnt);
fprintf(fp, " rk_rep reply queue: %i ops\n",
rd_kafka_q_len(&rk->rk_rep));

View File

@ -789,6 +789,7 @@ int rd_kafka_consume_callback (rd_kafka_topic_t *rkt, int32_t partition,
/**
* Queue consumers
*
* The following `..._queue()` functions are analogue to the functions above
* but reads messages from the provided queue `rkqu` instead.