Dont log decode errors in Fetch replies since they are most likely partial messages, which are okay

This commit is contained in:
Magnus Edenhill 2014-06-11 22:36:15 +02:00
parent f7e914e5b4
commit d278f553ed
1 changed files with 18 additions and 11 deletions

View File

@ -692,11 +692,13 @@ static void rd_kafka_broker_buf_enq (rd_kafka_broker_t *rkb,
*/
#define _FAIL(fmt...) do { \
rd_rkb_log(rkb, LOG_WARNING, "PROTOERR", \
"Protocol parse failure at %s:%i", \
__FUNCTION__, __LINE__); \
rd_rkb_log(rkb, LOG_WARNING, "PROTOERR", fmt); \
goto err; \
if (log_decode_errors) { \
rd_rkb_log(rkb, LOG_WARNING, "PROTOERR", \
"Protocol parse failure at %s:%i", \
__FUNCTION__, __LINE__); \
rd_rkb_log(rkb, LOG_WARNING, "PROTOERR", fmt); \
} \
goto err; \
} while (0)
#define _REMAIN() (size - of)
@ -828,6 +830,7 @@ rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
int msh_size;
struct rd_kafka_metadata *md = NULL;
int rkb_namelen = strlen(rkb->rkb_name)+1;
const int log_decode_errors = 1;
/* We assume that the marshalled representation is
* no more than 4 times larger than the wire representation. */
@ -1793,6 +1796,7 @@ rd_kafka_produce_reply_handle (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf) {
int16_t ErrorCode;
int64_t Offset;
} RD_PACKED *hdr;
const int log_decode_errors = 1;
_READ_I32(&TopicArrayCnt);
if (TopicArrayCnt != 1)
@ -2681,6 +2685,8 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
void *buf, size_t size) {
size_t of = 0;
rd_kafka_buf_t *rkbufz;
/* Dont log decode errors since Fetch replies may be partial. */
const int log_decode_errors = 0;
if (_REMAIN() == 0)
_FAIL("%s [%"PRId32"] empty messageset",
@ -2715,8 +2721,7 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
* Clients should handle this case."
* We're handling it by not passing the error upstream.
*/
rkb->rkb_c.rx_partial++;
return 0;
goto err;
}
/* Ignore CRC (for now) */
@ -2885,10 +2890,8 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
return 0;
err:
rd_rkb_log(rkb, LOG_WARNING, "PROTOERR",
"Previous parsing error for topic %s [%"PRId32"]",
rktp->rktp_rkt->rkt_topic->str, rktp->rktp_partition);
/* Count all errors as partial message errors. */
rkb->rkb_c.rx_partial++;
return RD_KAFKA_RESP_ERR_NO_ERROR;
}
@ -2913,6 +2916,7 @@ static rd_kafka_resp_err_t rd_kafka_fetch_reply_handle (rd_kafka_broker_t *rkb,
size_t of = 0;
int32_t TopicArrayCnt;
int i;
const int log_decode_errors = 1;
_READ_I32(&TopicArrayCnt);
/* Verify that TopicArrayCnt seems to be in line with remaining size */
@ -3154,6 +3158,7 @@ rd_kafka_toppar_offsetcommit_reply_handle (rd_kafka_broker_t *rkb,
size_t of = 0;
int32_t TopicArrayCnt;
int i;
const int log_decode_errors = 1;
_READ_I32(&TopicArrayCnt);
for (i = 0 ; i < TopicArrayCnt ; i++) {
@ -3392,6 +3397,7 @@ rd_kafka_toppar_offsetfetch_reply_handle (rd_kafka_broker_t *rkb,
size_t of = 0;
int32_t TopicArrayCnt;
int i;
const int log_decode_errors = 1;
rd_hexdump(stdout, "pkt", buf, size);
@ -3464,6 +3470,7 @@ rd_kafka_toppar_offset_reply_handle (rd_kafka_broker_t *rkb,
size_t of = 0;
int32_t TopicArrayCnt;
int i;
const int log_decode_errors = 1;
_READ_I32(&TopicArrayCnt);
for (i = 0 ; i < TopicArrayCnt ; i++) {