diff --git a/examples/rdkafka_example.c b/examples/rdkafka_example.c
index f108d2d..5667696 100755
--- a/examples/rdkafka_example.c
+++ b/examples/rdkafka_example.c
@@ -298,8 +298,12 @@ int main (int argc, char **argv) {
                                 start_offset = RD_KAFKA_OFFSET_STORED;
                         else if (!strcmp(optarg, "report"))
                                 report_offsets = 1;
-                        else
+                        else {
                                 start_offset = strtoll(optarg, NULL, 10);
+
+                                if (start_offset < 0)
+                                        start_offset = RD_KAFKA_OFFSET_TAIL(-start_offset);
+                        }
                         break;
                 case 'e':
                         exit_eof = 1;
@@ -414,7 +418,8 @@ int main (int argc, char **argv) {
                         " -b      Broker address (localhost:9092)\n"
                         " -z      Enable compression:\n"
                         "         none|gzip|snappy\n"
-                        " -o      Start offset (consumer)\n"
+                        " -o      Start offset (consumer):\n"
+                        "         beginning, end, NNNNN or -NNNNN\n"
                         " -o report Report message offsets (producer)\n"
                         " -e      Exit consumer when last message\n"
                         "         in partition has been received.\n"
diff --git a/examples/rdkafka_performance.c b/examples/rdkafka_performance.c
index 72d0666..3fde1b1 100644
--- a/examples/rdkafka_performance.c
+++ b/examples/rdkafka_performance.c
@@ -546,8 +546,13 @@ int main (int argc, char **argv) {
                                 start_offset = RD_KAFKA_OFFSET_BEGINNING;
                         else if (!strcmp(optarg, "stored"))
                                 start_offset = RD_KAFKA_OFFSET_STORED;
-                        else
+                        else {
                                 start_offset = strtoll(optarg, NULL, 10);
+
+                                if (start_offset < 0)
+                                        start_offset = RD_KAFKA_OFFSET_TAIL(-start_offset);
+                        }
+
                         break;
                 case 'e':
                         exit_eof = 1;
@@ -667,6 +672,7 @@ int main (int argc, char **argv) {
                         " -z      Enable compression:\n"
                         "         none|gzip|snappy\n"
                         " -o      Start offset (consumer)\n"
+                        "         beginning, end, NNNNN or -NNNNN\n"
                         " -d [facs..] Enable debugging contexts:\n"
                         "         %s\n"
                         " -X      Set arbitrary librdkafka "
diff --git a/src/rdkafka.c b/src/rdkafka.c
index db6663c..e7d05c8 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -1273,14 +1273,15 @@ static int rd_kafka_consume_start0 (rd_kafka_topic_t *rkt, int32_t partition,
         rd_kafka_topic_unlock(rkt);
 
         rd_kafka_toppar_lock(rktp);
-        switch (offset)
-        {
-        case RD_KAFKA_OFFSET_BEGINNING:
-        case RD_KAFKA_OFFSET_END:
+
+        if (offset == RD_KAFKA_OFFSET_BEGINNING ||
+            offset == RD_KAFKA_OFFSET_END ||
+            (offset & RD_KAFKA_OFFSET_TAIL_TOK)) {
                 rktp->rktp_query_offset = offset;
                 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY;
-                break;
-        case RD_KAFKA_OFFSET_STORED:
+
+        } else if (offset == RD_KAFKA_OFFSET_STORED) {
+
                 if (!rkt->rkt_conf.auto_commit) {
                         rd_kafka_toppar_unlock(rktp);
                         rd_kafka_toppar_destroy(rktp);
@@ -1288,8 +1289,14 @@ static int rd_kafka_consume_start0 (rd_kafka_topic_t *rkt, int32_t partition,
                         return -1;
                 }
                 rd_kafka_offset_store_init(rktp);
-                break;
-        default:
+
+        } else if (offset < 0) {
+                rd_kafka_toppar_unlock(rktp);
+                rd_kafka_toppar_destroy(rktp);
+                errno = EINVAL;
+                return -1;
+
+        } else {
                 rktp->rktp_next_offset = offset;
                 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_ACTIVE;
         }
diff --git a/src/rdkafka.h b/src/rdkafka.h
index 714b531..819f5db 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -666,12 +666,19 @@ void rd_kafka_queue_destroy (rd_kafka_queue_t *rkqu);
 #define RD_KAFKA_OFFSET_STORED -1000  /* Start consuming from offset retrieved
                                        * from offset store */
 
+#define RD_KAFKA_OFFSET_TAIL_TOK (-(1llu << 62)) /* internal: do not use */
+
+/* Start consuming `CNT` messages from topic's current `.._END` offset.
+ * That is, if current end offset is 12345 and `CNT` is 200, it will start
+ * consuming from offset 12345-200 = 12145. */
+#define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_TOK | (CNT))
 
 /**
  * Start consuming messages for topic 'rkt' and 'partition'
  * at offset 'offset' which may either be a proper offset (0..N)
  * or one of the the special offsets:
- * `RD_KAFKA_OFFSET_BEGINNING` or `RD_KAFKA_OFFSET_END`.
+ * `RD_KAFKA_OFFSET_BEGINNING`, `RD_KAFKA_OFFSET_END`,
+ * `RD_KAFKA_OFFSET_STORED`, `RD_KAFKA_OFFSET_TAIL(..)`
  *
  * rdkafka will attempt to keep 'queued.min.messages' (config property)
  * messages in the local queue by repeatedly fetching batches of messages
diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c
index 9ebbe2c..8d5cf9a 100644
--- a/src/rdkafka_broker.c
+++ b/src/rdkafka_broker.c
@@ -3524,6 +3524,29 @@ rd_kafka_toppar_offset_reply_handle (rd_kafka_broker_t *rkb,
          * first one returned. */
         _READ_I64(&Offset);
 
+        /* Adjust by TAIL count, if wanted */
+        if (rktp->rktp_query_offset & RD_KAFKA_OFFSET_TAIL_TOK) {
+                int64_t tail_cnt =
+                        llabs(rktp->rktp_query_offset &
+                              ~RD_KAFKA_OFFSET_TAIL_TOK);
+
+                rd_rkb_dbg(rkb, TOPIC, "OFFSET",
+                           "OffsetReply for "
+                           "topic %s [%"PRId32"]: "
+                           "offset %"PRId64": adjusting for "
+                           "OFFSET_TAIL(%"PRId64"): "
+                           "effective offset %"PRId64,
+                           rktp->rktp_rkt->rkt_topic->str,
+                           rktp->rktp_partition,
+                           Offset, tail_cnt,
+                           Offset - tail_cnt);
+
+                if (tail_cnt > Offset)
+                        return RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE;
+
+                Offset -= tail_cnt;
+        }
+
         rd_rkb_dbg(rkb, TOPIC, "OFFSET",
                    "OffsetReply for topic %s [%"PRId32"]: "
                    "offset %"PRId64": activating fetch",
@@ -3619,7 +3642,8 @@ static void rd_kafka_toppar_offset_reply (rd_kafka_broker_t *rkb,
                 /* Signal error back to application */
                 rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
                 rko->rko_err = err;
-                rko->rko_rkmessage.offset = rktp->rktp_query_offset;
+                rko->rko_rkmessage.offset = (rktp->rktp_query_offset &
+                                             ~RD_KAFKA_OFFSET_TAIL_TOK);
                 rko->rko_rkmessage.rkt = rktp->rktp_rkt;
                 rko->rko_rkmessage.partition = rktp->rktp_partition;
                 rd_kafka_topic_keep(rko->rko_rkmessage.rkt);
@@ -3729,7 +3753,10 @@ static void rd_kafka_toppar_offset_request (rd_kafka_broker_t *rkb,
         part2 = (void *)(part1+1);
         part2->PartitionArrayCnt = htonl(1);
         part2->Partition = htonl(rktp->rktp_partition);
-        part2->Time = htobe64(rktp->rktp_query_offset);
+        if (rktp->rktp_query_offset & RD_KAFKA_OFFSET_TAIL_TOK)
+                part2->Time = htobe64(RD_KAFKA_OFFSET_END);
+        else
+                part2->Time = htobe64(rktp->rktp_query_offset);
         part2->MaxNumberOfOffsets = htonl(1);
         rd_kafka_buf_push(rkbuf, part2, sizeof(*part2));
 
diff --git a/src/rdkafka_int.h b/src/rdkafka_int.h
index 30e1ecf..316326b 100644
--- a/src/rdkafka_int.h
+++ b/src/rdkafka_int.h
@@ -661,7 +661,7 @@ typedef struct rd_kafka_toppar_s {
         } rktp_fetch_state;
 
         rd_ts_t        rktp_ts_offset_req_next;
-        int64_t        rktp_query_offset;
+        int64_t        rktp_query_offset;      /* Offset to query broker for */
         int64_t        rktp_next_offset;       /* Next offset to fetch */
         int64_t        rktp_app_offset;        /* Last offset delivered to
                                                 * application */
diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c
index 86111eb..ec0f6f1 100644
--- a/src/rdkafka_offset.c
+++ b/src/rdkafka_offset.c
@@ -364,16 +364,16 @@ void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
                             rd_kafka_resp_err_t err, const char *reason) {
         int64_t offset = RD_KAFKA_OFFSET_ERROR;
         rd_kafka_op_t *rko;
+        int64_t offset_reset = rktp->rktp_rkt->rkt_conf.auto_offset_reset;
 
-        switch (rktp->rktp_rkt->rkt_conf.auto_offset_reset)
-        {
-        case RD_KAFKA_OFFSET_END:
-        case RD_KAFKA_OFFSET_BEGINNING:
+        if (offset_reset == RD_KAFKA_OFFSET_END ||
+            offset_reset == RD_KAFKA_OFFSET_BEGINNING ||
+            (offset_reset & RD_KAFKA_OFFSET_TAIL_TOK)) {
                 offset = rktp->rktp_rkt->rkt_conf.auto_offset_reset;
                 rktp->rktp_query_offset = offset;
                 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_OFFSET_QUERY;
-                break;
-        case RD_KAFKA_OFFSET_ERROR:
+
+        } else if (offset_reset == RD_KAFKA_OFFSET_ERROR) {
                 rko = rd_kafka_op_new(RD_KAFKA_OP_ERR);
 
                 rko->rko_err = err;
@@ -387,7 +387,6 @@ void rd_kafka_offset_reset (rd_kafka_toppar_t *rktp, int64_t err_offset,
                 rd_kafka_q_enq(&rktp->rktp_fetchq, rko);
 
                 rktp->rktp_fetch_state = RD_KAFKA_TOPPAR_FETCH_NONE;
-                break;
         }
 
         rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "OFFSET",
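
For reviewers, a minimal consumer sketch (not part of the patch) of how an application could use the new RD_KAFKA_OFFSET_TAIL() start offset with the legacy consume API. The broker address "localhost:9092", topic name "test" and partition 0 are illustrative assumptions only; this mirrors what the patched examples do when invoked with "-o -200".

#include <stdio.h>
#include <inttypes.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        int32_t partition = 0;

        /* Plain consumer handle with default configuration. */
        rk = rd_kafka_new(RD_KAFKA_CONSUMER, rd_kafka_conf_new(),
                          errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "rd_kafka_new failed: %s\n", errstr);
                return 1;
        }

        if (rd_kafka_brokers_add(rk, "localhost:9092") == 0) {
                fprintf(stderr, "No valid brokers specified\n");
                return 1;
        }

        rkt = rd_kafka_topic_new(rk, "test", rd_kafka_topic_conf_new());

        /* Start 200 messages back from the partition's current end offset. */
        if (rd_kafka_consume_start(rkt, partition,
                                   RD_KAFKA_OFFSET_TAIL(200)) == -1) {
                fprintf(stderr, "consume_start failed\n");
                return 1;
        }

        for (;;) {
                rd_kafka_message_t *rkm =
                        rd_kafka_consume(rkt, partition, 1000 /* ms */);

                if (!rkm)
                        continue;                 /* timed out */

                if (rkm->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
                        rd_kafka_message_destroy(rkm);
                        break;                    /* caught up with end */
                } else if (rkm->err) {
                        fprintf(stderr, "consume error: %s\n",
                                rd_kafka_err2str(rkm->err));
                } else {
                        printf("offset %"PRId64": %.*s\n", rkm->offset,
                               (int)rkm->len, (const char *)rkm->payload);
                }
                rd_kafka_message_destroy(rkm);
        }

        rd_kafka_consume_stop(rkt, partition);
        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}

Internally, rd_kafka_consume_start0() routes a TAIL offset through the OFFSET_QUERY fetch state, so the broker round-trip shown in the rdkafka_broker.c hunks resolves the actual starting offset.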
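
A second standalone sketch (also not part of the patch) of the token arithmetic the broker code relies on: RD_KAFKA_OFFSET_TAIL(CNT) sets the high token bits on top of the count, the OffsetRequest is sent for RD_KAFKA_OFFSET_END, and the reply handler strips the token and subtracts the count from the returned end offset. The two macro definitions are copied from the rdkafka.h hunk above; the end offset 12345 is the hypothetical value used in that header comment.

#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <stdlib.h>

/* Copied from the rdkafka.h hunk above. */
#define RD_KAFKA_OFFSET_TAIL_TOK (-(1llu << 62)) /* internal: do not use */
#define RD_KAFKA_OFFSET_TAIL(CNT) (RD_KAFKA_OFFSET_TAIL_TOK | (CNT))

int main (void) {
        int64_t start_offset = RD_KAFKA_OFFSET_TAIL(200);
        int64_t end_offset   = 12345;   /* hypothetical broker end offset */

        if (start_offset & RD_KAFKA_OFFSET_TAIL_TOK) {
                /* Strip the token bits to recover the requested tail count,
                 * then step back from the broker-reported end offset, as
                 * rd_kafka_toppar_offset_reply_handle() does. */
                int64_t tail_cnt =
                        llabs(start_offset & ~RD_KAFKA_OFFSET_TAIL_TOK);

                printf("tail count %"PRId64", effective offset %"PRId64"\n",
                       tail_cnt, end_offset - tail_cnt);  /* 200, 12145 */
        }
        return 0;
}

Note that the reply handler returns RD_KAFKA_RESP_ERR_OFFSET_OUT_OF_RANGE when the requested tail count exceeds the partition's end offset.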