Added produce.offset.report to propagate message offset back to producer application

This commit is contained in:
Magnus Edenhill 2014-06-16 14:53:50 +02:00
parent 84a12a1974
commit c3dc4744fe
6 changed files with 50 additions and 4 deletions

View File

@@ -51,6 +51,7 @@ request.required.acks | P | 1 | This field indi
enforce.isr.cnt | P | 0 | Fail messages locally if the currently known ISR count for a partition is less than this value. **NOTE**: The ISR count is fetched from the broker at regular intervals (`topic.metadata.refresh.interval.ms`) and might thus be outdated.
request.timeout.ms | P | 5000 | The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request.required.acks` being > 0.
message.timeout.ms | P | 300000 | Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite.
produce.offset.report | P | false | Report offset of produced message back to application. The application must use the `dr_msg_cb` to retrieve the offset from `rd_kafka_message_t.offset`.
partitioner | P | | Partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb())
opaque | * | | Application opaque (set with rd_kafka_topic_conf_set_opaque())
auto.commit.enable | C | true | If true, periodically commit offset of the last message handed to the application. This committed offset will be used when the process restarts to pick up where it left off. If false, the application will have to call `rd_kafka_offset_store()` to store an offset (optional). **NOTE:** There is currently no zookeeper integration, offsets will be written to a local file according to `offset.store.path`.
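
For reference, a minimal producer sketch using the new `produce.offset.report` option (not part of this commit; the broker address `localhost:9092` and topic `test` are placeholders and most error handling is omitted): it enables the option on the topic configuration and registers a `dr_msg_cb` so the broker-assigned offset can be read from `rd_kafka_message_t.offset`.

#include <stdio.h>
#include <string.h>
#include <inttypes.h>

#include <librdkafka/rdkafka.h>

/* Per-message delivery report callback: with produce.offset.report=true the
 * broker-assigned offset is available in rkmessage->offset. */
static void dr_msg_cb (rd_kafka_t *rk,
                       const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "Delivery failed: %s\n",
                        rd_kafka_message_errstr(rkmessage));
        else
                printf("Delivered %zd bytes at offset %"PRId64"\n",
                       rkmessage->len, rkmessage->offset);
}

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();

        /* Use the message-based delivery report callback so the offset
         * can be read from the rd_kafka_message_t. */
        rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);

        /* Enable offset reporting for this topic. */
        if (rd_kafka_topic_conf_set(topic_conf, "produce.offset.report", "true",
                                    errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        rd_kafka_brokers_add(rk, "localhost:9092");     /* placeholder broker */
        rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, "test", topic_conf);

        const char *payload = "hello";
        rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                         (void *)payload, strlen(payload), NULL, 0, NULL);

        /* Serve the delivery report callback until the message is accounted for. */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 100);

        rd_kafka_topic_destroy(rkt);
        rd_kafka_destroy(rk);
        return 0;
}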

View File

@@ -116,6 +116,17 @@ static void msg_delivered (rd_kafka_t *rk,
                fprintf(stderr, "%% Message delivered (%zd bytes)\n", len);
}

static void msg_delivered2 (rd_kafka_t *rk,
                            const rd_kafka_message_t *rkmessage, void *opaque) {
        if (rkmessage->err)
                fprintf(stderr, "%% Message delivery failed: %s\n",
                        rd_kafka_message_errstr(rkmessage));
        else if (!quiet)
                fprintf(stderr,
                        "%% Message delivered (%zd bytes, offset %"PRId64")\n",
                        rkmessage->len, rkmessage->offset);
}

static void msg_consume (rd_kafka_message_t *rkmessage,
                         void *opaque) {
@@ -239,6 +250,7 @@ int main (int argc, char **argv) {
        char errstr[512];
        const char *debug = NULL;
        int64_t start_offset = 0;
        int report_offsets = 0;
        int do_conf_dump = 0;

        quiet = !isatty(STDIN_FILENO);
@@ -281,6 +293,8 @@ int main (int argc, char **argv) {
                                start_offset = RD_KAFKA_OFFSET_BEGINNING;
                        else if (!strcmp(optarg, "stored"))
                                start_offset = RD_KAFKA_OFFSET_STORED;
                        else if (!strcmp(optarg, "report"))
                                report_offsets = 1;
                        else
                                start_offset = strtoll(optarg, NULL, 10);
                        break;
@@ -398,6 +412,7 @@ int main (int argc, char **argv) {
                "  -z <codec>      Enable compression:\n"
                "                  none|gzip|snappy\n"
                "  -o <offset>     Start offset (consumer)\n"
                "  -o report       Report message offsets (producer)\n"
                "  -e              Exit consumer when last message\n"
                "                  in partition has been received.\n"
                "  -d [facs..]     Enable debugging contexts:\n"
@@ -449,7 +464,16 @@ int main (int argc, char **argv) {
                /* Set up a message delivery report callback.
                 * It will be called once for each message, either on successful
                 * delivery to broker, or upon failure to deliver to broker. */
                rd_kafka_conf_set_dr_cb(conf, msg_delivered);

                /* If offset reporting (-o report) is enabled, use the
                 * richer dr_msg_cb instead. */
                if (report_offsets) {
                        rd_kafka_topic_conf_set(topic_conf,
                                                "produce.offset.report",
                                                "true", errstr, sizeof(errstr));
                        rd_kafka_conf_set_dr_msg_cb(conf, msg_delivered2);
                } else
                        rd_kafka_conf_set_dr_cb(conf, msg_delivered);

                /* Create Kafka handle */
                if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,

View File

@@ -1415,6 +1415,7 @@ static void rd_kafka_poll_cb (rd_kafka_op_t *rko, void *opaque) {
                        .payload = rkm->rkm_payload,
                        .len = rkm->rkm_len,
                        .err = rko->rko_err,
                        .offset = rkm->rkm_offset,
                        /* FIXME: partition */
                        .partition = rkm->rkm_partition,
                        ._private = rkm->rkm_opaque,

View File

@@ -1784,7 +1784,8 @@ void rd_kafka_dr_msgq (rd_kafka_t *rk,
 * Returns 0 on success or an error code on failure.
 */
static rd_kafka_resp_err_t
rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf,
                               int64_t *offsetp) {
        char *buf = rkbuf->rkbuf_buf2;
        size_t size = rkbuf->rkbuf_len;
        size_t of = 0;
@@ -1814,6 +1815,8 @@ rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
        _READ_REF(hdr, sizeof(*hdr));

        *offsetp = be64toh(hdr->Offset);

        return ntohs(hdr->ErrorCode);

err:
@@ -1830,6 +1833,7 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
                                           rd_kafka_buf_t *request,
                                           void *opaque) {
        rd_kafka_toppar_t *rktp = opaque;
        int64_t offset = -1;

        rd_rkb_dbg(rkb, MSG, "MSGSET",
                   "MessageSet with %i message(s) %sdelivered",
@@ -1837,7 +1841,7 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
        /* Parse Produce reply (unless the request errored) */
        if (!err && reply)
                err = rd_kafka_produce_reply_handle(rkb, reply);
                err = rd_kafka_produce_reply_handle(rkb, reply, &offset);

        if (err) {
@@ -1894,6 +1898,12 @@ static void rd_kafka_produce_msgset_reply (rd_kafka_broker_t *rkb,
                /* FALLTHRU */
        }

        /* produce.offset.report: Propagate assigned offset back to app. */
        if (offset != -1 && rktp->rktp_rkt->rkt_conf.produce_offset_report) {
                rd_kafka_msg_t *rkm;
                TAILQ_FOREACH(rkm, &request->rkbuf_msgq.rkmq_msgs, rkm_link)
                        rkm->rkm_offset = offset++;
        }

        /* Enqueue messages for delivery report */
        rd_kafka_dr_msgq(rkb->rkb_rk, &request->rkbuf_msgq, err);
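
A note on the loop above: the broker's ProduceResponse carries a single Offset per partition, the offset assigned to the first message of the appended message set, so per-message offsets are derived by counting up from that base in queue order. A standalone sketch of that derivation (hypothetical simplified type, not the real `rd_kafka_msg_t`):

#include <stdio.h>
#include <inttypes.h>

/* Simplified stand-in for a queued produced message (hypothetical type). */
struct msg {
        int64_t offset;
};

/* Assign consecutive offsets starting at the base offset returned in the
 * ProduceResponse, mirroring the TAILQ_FOREACH loop in the diff above. */
static void assign_offsets (struct msg *msgs, size_t cnt, int64_t base_offset) {
        for (size_t i = 0 ; i < cnt ; i++)
                msgs[i].offset = base_offset + (int64_t)i;
}

int main (void) {
        struct msg batch[3];

        assign_offsets(batch, 3, 1042);   /* 1042 = example base offset */
        for (size_t i = 0 ; i < 3 ; i++)
                printf("message %zu -> offset %"PRId64"\n", i, batch[i].offset);
        return 0;
}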

View File

@@ -318,8 +318,15 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
          _RKT(message_timeout_ms),
          "Local message timeout. "
          "This value is only enforced locally and limits the time a "
          "produced message waits for successful delivery. A time of 0 is infinite.",
          "produced message waits for successful delivery. "
          "A time of 0 is infinite.",
          0, 900*1000, 300*1000 },
        { _RK_TOPIC|_RK_PRODUCER, "produce.offset.report", _RK_C_BOOL,
          _RKT(produce_offset_report),
          "Report offset of produced message back to application. "
          "The application must use the `dr_msg_cb` to retrieve the offset "
          "from `rd_kafka_message_t.offset`.",
          0, 1, 0 },
        { _RK_TOPIC|_RK_PRODUCER, "partitioner", _RK_C_PTR,
          _RKT(partitioner),
          "Partitioner callback "

View File

@@ -239,6 +239,8 @@ struct rd_kafka_topic_conf_s {
                                void *rkt_opaque,
                                void *msg_opaque);
        int produce_offset_report;

        int auto_commit;
        int auto_commit_interval_ms;
        int auto_offset_reset;
@@ -349,6 +351,7 @@ typedef struct rd_kafka_msg_s {
        void *rkm_opaque;
        int32_t rkm_partition;  /* partition specified */
        rd_kafkap_bytes_t *rkm_key;
        int64_t rkm_offset;
        rd_ts_t rkm_ts_timeout;
} rd_kafka_msg_t;