Immediate failure on unknown topic (issue #39) and fixed out-of-order problem for produced messages.

rd_kafka_produce() will now return -1 and set errno to ENOENT when
the topic is unknown in the cluster, and the DR callback will use
new error code RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC to indicate the same.

This also fixes an issue where messages could be produced out of order
during initial broker setup in some circumstances.
This commit is contained in:
Magnus Edenhill 2014-01-26 00:06:27 +07:00
parent 651707db3d
commit a2bfffe541
12 changed files with 746 additions and 182 deletions

View File

@ -485,6 +485,8 @@ const char *rd_kafka_err2str (rd_kafka_resp_err_t err) {
return "Local: Unknown partition";
case RD_KAFKA_RESP_ERR__FS:
return "Local: File or filesystem error";
case RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC:
return "Local: Unknown topic";
case RD_KAFKA_RESP_ERR_UNKNOWN:
return "Unknown error";

View File

@ -111,6 +111,9 @@ typedef enum {
* Partition does not
* exist in cluster. */
RD_KAFKA_RESP_ERR__FS = -189, /* File or filesystem error */
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC = -188, /* Permanent:
* Topic does not exist
* in cluster. */
RD_KAFKA_RESP_ERR__END = -100, /* end internal error codes */
/* Standard Kafka errors: */
@ -734,6 +737,7 @@ rd_kafka_resp_err_t rd_kafka_offset_store (rd_kafka_topic_t *rkt,
* "queue.buffering.max.message"
* EMSGSIZE - message is larger than configured max size:
* "messages.max.bytes".
* ENOENT - topic is unknown in the Kafka cluster.
* ESRCH - requested 'partition' is unknown in the Kafka cluster.
*
*/

View File

@ -364,6 +364,9 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
/* Undelegate all toppars from this broker. */
rd_kafka_broker_toppars_wrlock(rkb);
while ((rktp = TAILQ_FIRST(&rkb->rkb_toppars))) {
rd_kafka_topic_t *rkt = rktp->rktp_rkt;
rd_kafka_topic_keep(rkt); /* Hold on to rkt */
rd_kafka_toppar_keep(rktp);
rd_kafka_broker_toppars_unlock(rkb);
rd_rkb_dbg(rkb, TOPIC, "BRKTP",
@ -371,13 +374,16 @@ static void rd_kafka_broker_fail (rd_kafka_broker_t *rkb,
RD_KAFKAP_STR_PR(rktp->rktp_rkt->rkt_topic),
rktp->rktp_partition);
/* Undelegate */
rd_kafka_topic_wrlock(rktp->rktp_rkt);
/* Undelegate */
rd_kafka_toppar_broker_delegate(rktp, NULL);
rd_kafka_topic_unlock(rktp->rktp_rkt);
rd_kafka_broker_toppars_wrlock(rkb);
rd_kafka_toppar_destroy(rktp);
rd_kafka_topic_destroy(rkt); /* Let go of rkt */
rd_kafka_broker_toppars_wrlock(rkb);
}
rd_kafka_broker_toppars_unlock(rkb);
@ -654,10 +660,12 @@ static void rd_kafka_broker_buf_enq (rd_kafka_broker_t *rkb,
/**
* Handle a Metadata response message.
* If 'rkt' is non-NULL the metadata originated from a topic-specific request.
*
* Locality: broker thread
*/
static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
rd_kafka_topic_t *req_rkt,
const char *buf, size_t size) {
struct {
int32_t NodeId;
@ -665,30 +673,15 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
int32_t Port;
} *Brokers = NULL;
int32_t Broker_cnt;
struct {
int16_t ErrorCode;
rd_kafkap_str_t *Name;
struct {
int16_t ErrorCode;
int32_t PartitionId;
int32_t Leader;
int32_t *Replicas;
int32_t Replicas_cnt;
int32_t *Isr;
int32_t Isr_cnt;
} *PartitionMetadata;
int32_t PartitionMetadata_cnt;
} *TopicMetadata = NULL;
struct rd_kafka_TopicMetadata *TopicMetadata = NULL;
int32_t TopicMetadata_cnt;
int i, j, k;
int of = 0;
int req_rkt_seen = 0;
/* Read Brokers */
_READ_I32(&Broker_cnt);
rd_rkb_dbg(rkb, METADATA, "METADATA", "%"PRId32" brokers", Broker_cnt);
if (Broker_cnt > RD_KAFKAP_BROKERS_MAX)
_FAIL("Broker_cnt %"PRId32" > BROKERS_MAX %i",
Broker_cnt, RD_KAFKAP_BROKERS_MAX);
@ -705,6 +698,9 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
/* Read TopicMetadata */
_READ_I32(&TopicMetadata_cnt);
rd_rkb_dbg(rkb, METADATA, "METADATA", "%"PRId32" brokers, "
"%"PRId32" topics", Broker_cnt, TopicMetadata_cnt);
if (TopicMetadata_cnt > RD_KAFKAP_TOPICS_MAX)
_FAIL("TopicMetadata_cnt %"PRId32" > TOPICS_MAX %i",
TopicMetadata_cnt, RD_KAFKAP_TOPICS_MAX);
@ -800,42 +796,24 @@ static void rd_kafka_metadata_handle (rd_kafka_broker_t *rkb,
Brokers[i].NodeId);
}
/* Update Leader for each topic we know about */
/* Update partition count and leader for each topic we know about */
for (i = 0 ; i < TopicMetadata_cnt ; i++) {
const char *topic = rd_kafkap_strdupa(TopicMetadata[i].Name);
int upd;
upd = rd_kafka_topic_partition_cnt_update(rkb->rkb_rk, topic,
TopicMetadata[i].
PartitionMetadata_cnt);
if (req_rkt &&
!rd_kafkap_str_cmp(TopicMetadata[i].Name,
req_rkt->rkt_topic))
req_rkt_seen++;
for (j = 0 ;
j < TopicMetadata[i].PartitionMetadata_cnt ; j++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Topic #%i/%i: %s partition %"PRId32
" Leader %"PRId32,
i, TopicMetadata_cnt,
topic,
TopicMetadata[i].
PartitionMetadata[j].PartitionId,
TopicMetadata[i].
PartitionMetadata[j].Leader);
if (upd == -1)
continue;
rd_kafka_topic_metadata_update(rkb, &TopicMetadata[i]);
}
rd_kafka_topic_leader_update(rkb->rkb_rk,
topic,
TopicMetadata[i].
PartitionMetadata[j].
PartitionId,
TopicMetadata[i].
PartitionMetadata[j].
Leader);
}
/* Try to assign unassigned messages to new partitions */
if (upd != -1)
rd_kafka_topic_assign_uas(rkb->rkb_rk, topic);
/* Requested topics not seen in metadata? Propagate to topic code. */
if (req_rkt) {
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"Requested topic %s %sseen in metadata",
req_rkt->rkt_topic->str, req_rkt_seen ? "" : "not ");
if (!req_rkt_seen)
rd_kafka_topic_metadata_none(req_rkt);
}
@ -852,6 +830,7 @@ static void rd_kafka_broker_metadata_reply (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *reply,
rd_kafka_buf_t *request,
void *opaque) {
rd_kafka_topic_t *rkt = opaque;
rd_rkb_dbg(rkb, METADATA, "METADATA",
"===== Received metadata from %s =====",
@ -867,12 +846,15 @@ static void rd_kafka_broker_metadata_reply (rd_kafka_broker_t *rkb,
"Metadata request failed: %s",
rd_kafka_err2str(err));
} else {
rd_kafka_metadata_handle(rkb,
rd_kafka_metadata_handle(rkb, rkt,
reply->rkbuf_buf2,
reply->rkbuf_len);
}
done:
if (rkt)
rd_kafka_topic_destroy(rkt);
rd_kafka_buf_destroy(request);
if (reply)
rd_kafka_buf_destroy(reply);
@ -975,10 +957,13 @@ static void rd_kafka_broker_metadata_req (rd_kafka_broker_t *rkb,
}
if (only_rkt)
rd_kafka_topic_keep(only_rkt);
rd_kafka_broker_buf_enq(rkb, RD_KAFKAP_Metadata,
buf, of,
RD_KAFKA_OP_F_FREE|RD_KAFKA_OP_F_FLASH,
rd_kafka_broker_metadata_reply, NULL);
rd_kafka_broker_metadata_reply, only_rkt);
}

View File

@ -484,6 +484,12 @@ struct rd_kafka_topic_s {
rd_ts_t rkt_ts_metadata; /* Timestamp of last metadata
* update for this topic. */
enum {
RD_KAFKA_TOPIC_S_INIT,
RD_KAFKA_TOPIC_S_EXISTS,
RD_KAFKA_TOPIC_S_UNKNOWN,
} rkt_state;
struct rd_kafka_s *rkt_rk;
rd_kafka_topic_conf_t rkt_conf;

View File

@ -104,7 +104,7 @@ int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
}
err = rd_kafka_msg_partitioner(rkt, rkm);
err = rd_kafka_msg_partitioner(rkt, rkm, 1);
if (likely(!err))
return 0;
@ -117,6 +117,8 @@ int rd_kafka_msg_new (rd_kafka_topic_t *rkt, int32_t force_partition,
/* Translate error codes to errnos. */
if (err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION)
errno = ESRCH;
else if (err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
errno = ENOENT;
else
errno = EINVAL; /* NOTREACHED */
@ -165,19 +167,41 @@ int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
/**
* Assigns a message to a topic partition using a partitioner.
* Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION if partitioning failed,
* or 0 on success.
* Returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
* partitioning failed, or 0 on success.
*/
int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm) {
int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock) {
int32_t partition;
rd_kafka_toppar_t *rktp_new;
rd_kafka_resp_err_t err;
rd_kafka_topic_rdlock(rkt);
if (do_lock)
rd_kafka_topic_rdlock(rkt);
/* Fast path for failing messages with forced partition
* when the partition is not available.
* Only fail the message if its forced partition does not
* exist in the Kafka cluster, given that the topic's metadata
* can be trusted (is not older than 3 times the metadata
* refresh interval). */
if (unlikely((rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN ||
(rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
(rkm->rkm_partition >= rkt->rkt_partition_cnt ))) &&
rd_clock() < rkt->rkt_ts_metadata +
(rkt->rkt_rk->rk_conf.metadata_refresh_interval_ms *
3 * 1000))) {
if (rkt->rkt_partition_cnt == 0)
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
else
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (do_lock)
rd_kafka_topic_unlock(rkt);
return err;
}
if (unlikely(rkt->rkt_partition_cnt == 0)) {
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PART",
"%.*s has no partitions",
RD_KAFKAP_STR_PR(rkt->rkt_topic));
partition = RD_KAFKA_PARTITION_UA;
} else if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
partition =
@ -194,28 +218,13 @@ int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm) {
if (partition >= rkt->rkt_partition_cnt) {
/* Partition is unknown (locally) */
/* Only fail the message if its forced partition does not
* exist in the Kafka cluster, given that the topic's metadata
* can be trusted (is not older than 3 times the metadata
* refresh interval). */
if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
rd_clock() < rkt->rkt_ts_metadata +
(rkt->rkt_rk->rk_conf.metadata_refresh_interval_ms *
3 * 1000)) {
/* Permanent error */
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PART",
"%.*s partition [%"PRId32"] is unknown",
RD_KAFKAP_STR_PR(rkt->rkt_topic),
partition);
} else {
/* Temporary error, assign to UA partition for now */
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PART",
"%.*s partition [%"PRId32"] not "
"currently available",
RD_KAFKAP_STR_PR(rkt->rkt_topic),
partition);
partition = RD_KAFKA_PARTITION_UA;
}
/* Temporary error, assign to UA partition for now */
rd_kafka_dbg(rkt->rkt_rk, TOPIC, "PART",
"%.*s partition [%"PRId32"] not "
"currently available",
RD_KAFKAP_STR_PR(rkt->rkt_topic),
partition);
partition = RD_KAFKA_PARTITION_UA;
/* FALLTHRU */
}
@ -233,17 +242,23 @@ int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm) {
/* Get new partition */
rktp_new = rd_kafka_toppar_get(rkt, partition, 0);
rd_kafka_dbg(rkt->rkt_rk, MSG, "PART",
"Assign to new part %p", rktp_new);
if (likely(!rktp_new)) {
/* Unknown partition */
rd_kafka_topic_unlock(rkt);
return RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
/* Unknown topic or partition */
if (rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN)
err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
else
err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
if (do_lock)
rd_kafka_topic_unlock(rkt);
return err;
}
/* Partition is available: enqueue msg on partition's queue */
rd_kafka_toppar_enq_msg(rktp_new, rkm);
rd_kafka_topic_unlock(rkt);
if (do_lock)
rd_kafka_topic_unlock(rkt);
rd_kafka_toppar_destroy(rktp_new); /* from _get() */
return 0;
}

View File

@ -150,4 +150,5 @@ int rd_kafka_msgq_age_scan (rd_kafka_msgq_t *rkmq,
rd_ts_t now);
int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm);
int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
int do_lock);

View File

@ -223,3 +223,23 @@ struct rd_kafkap_FetchRequest {
int32_t MinBytes;
int32_t TopicArrayCnt;
} RD_PACKED;
/* Non-protocol representation of a topic's metadata.
 * One instance describes a single topic from a Metadata reply together
 * with its per-partition entries. */
struct rd_kafka_TopicMetadata {
/* Topic-level error code from the metadata reply. */
int16_t ErrorCode;
/* Topic name. */
rd_kafkap_str_t *Name;
/* Array of PartitionMetadata_cnt per-partition entries. */
struct {
/* Partition-level error code. */
int16_t ErrorCode;
/* Partition id. */
int32_t PartitionId;
/* Node id of the partition's leader broker, or -1 for no leader. */
int32_t Leader;
/* Broker object matching 'Leader'. Not part of the reply itself:
 * filled in by rd_kafka_topic_metadata_update(), NULL when
 * Leader is -1. */
struct rd_kafka_broker_s *rkb;
/* Replica list with Replicas_cnt elements
 * (presumably broker node ids - not used by the code shown here). */
int32_t *Replicas;
int32_t Replicas_cnt;
/* In-sync replica list with Isr_cnt elements
 * (presumably broker node ids - not used by the code shown here). */
int32_t *Isr;
int32_t Isr_cnt;
} *PartitionMetadata;
/* Number of elements in PartitionMetadata. */
int32_t PartitionMetadata_cnt;
};

View File

@ -68,7 +68,6 @@ static rd_kafka_toppar_t *rd_kafka_toppar_new (rd_kafka_topic_t *rkt,
void rd_kafka_toppar_destroy0 (rd_kafka_toppar_t *rktp) {
rd_kafka_toppar_lock(rktp);
/* Clear queues */
rd_kafka_dr_msgq(rktp->rktp_rkt->rkt_rk, &rktp->rktp_xmit_msgq,
@ -77,9 +76,8 @@ void rd_kafka_toppar_destroy0 (rd_kafka_toppar_t *rktp) {
RD_KAFKA_RESP_ERR__DESTROY);
rd_kafka_q_purge(&rktp->rktp_fetchq);
rd_kafka_toppar_unlock(rktp);
rd_kafka_topic_destroy(rktp->rktp_rkt);
pthread_mutex_destroy(&rktp->rktp_lock);
free(rktp);
}
@ -464,7 +462,8 @@ const char *rd_kafka_topic_name (const rd_kafka_topic_t *rkt) {
* Delegates broker 'rkb' as leader for toppar 'rktp'.
* 'rkb' may be NULL to undelegate leader.
*
* Locks: Caller must have topic_lock held.
* Locks: Caller must have rd_kafka_topic_wrlock(rktp->rktp_rkt)
* AND rd_kafka_toppar_lock(rktp) held.
*/
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb) {
@ -525,27 +524,17 @@ void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
void rd_kafka_topic_leader_update (rd_kafka_t *rk,
const char *topic, int32_t partition,
int32_t leader) {
rd_kafka_topic_t *rkt;
/**
* Update the leader for a topic+partition.
* Returns 1 if the leader was changed, else 0.
* NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
*/
static int rd_kafka_topic_leader_update (rd_kafka_topic_t *rkt,
int32_t partition,
int32_t leader,
rd_kafka_broker_t *rkb) {
rd_kafka_t *rk = rkt->rkt_rk;
rd_kafka_toppar_t *rktp;
rd_kafka_broker_t *rkb = NULL;
if (!(rkt = rd_kafka_topic_find(rk, topic))) {
rd_kafka_dbg(rk, METADATA, "TOPICUPD",
"Ignoring topic %s: not found locally", topic);
return;
}
/* Find broker */
if (leader != -1) {
rd_kafka_lock(rk);
rkb = rd_kafka_broker_find_by_nodeid(rk, leader);
rd_kafka_unlock(rk);
}
rd_kafka_topic_wrlock(rkt);
rktp = rd_kafka_toppar_get(rkt, partition, 0);
assert(rktp);
@ -560,18 +549,20 @@ void rd_kafka_topic_leader_update (rd_kafka_t *rk,
"Topic %s [%"PRId32"] migrated to unknown "
"broker %"PRId32": "
"requesting metadata update",
topic, partition, leader);
rkt->rkt_topic->str, partition, leader);
rd_kafka_toppar_broker_delegate(rktp, NULL);
rd_kafka_topic_unlock(rkt);
/* Query for the topic leader (async) */
if (had_leader)
if (had_leader) {
rd_kafka_topic_keep(rkt);
rd_kafka_topic_leader_query(rk, rkt);
}
rd_kafka_toppar_unlock(rktp);
rd_kafka_toppar_destroy(rktp); /* from get() */
rd_kafka_topic_destroy(rkt); /* from find() */
return;
return had_leader ? 1 : 0;
}
@ -581,29 +572,24 @@ void rd_kafka_topic_leader_update (rd_kafka_t *rk,
rd_kafka_dbg(rk, TOPIC, "TOPICUPD",
"No leader change for topic %s "
"[%"PRId32"] with leader %"PRId32,
topic, partition, leader);
rd_kafka_topic_unlock(rkt);
rd_kafka_broker_destroy(rkb); /* refcnt from find */
rkt->rkt_topic->str, partition, leader);
rd_kafka_toppar_destroy(rktp); /* from get() */
rd_kafka_topic_destroy(rkt); /* from find() */
return;
return 0;
}
rd_kafka_dbg(rk, TOPIC, "TOPICUPD",
"Topic %s [%"PRId32"] migrated from "
"broker %"PRId32" to %"PRId32,
topic, partition, rktp->rktp_leader->rkb_nodeid,
rkt->rkt_topic->str, partition,
rktp->rktp_leader->rkb_nodeid,
rkb->rkb_nodeid);
}
rd_kafka_toppar_broker_delegate(rktp, rkb);
rd_kafka_topic_unlock(rkt);
rd_kafka_broker_destroy(rkb); /* refcnt from find */
rd_kafka_toppar_broker_delegate(rktp, rkb);
rd_kafka_toppar_destroy(rktp); /* from get() */
rd_kafka_topic_destroy(rkt); /* from find() */
return 1;
}
@ -653,48 +639,35 @@ void rd_kafka_topic_partitions_remove (rd_kafka_topic_t *rkt) {
/**
* Update the number of partitions for a topic and takes according actions.
* Returns 1 if the number of partitions changed, 0 if not, and -1 if the
* topic is unknown.
* Returns 1 if the partition count changed, else 0.
* NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
*/
int rd_kafka_topic_partition_cnt_update (rd_kafka_t *rk,
const char *topic,
int32_t partition_cnt) {
rd_kafka_topic_t *rkt;
static int rd_kafka_topic_partition_cnt_update (rd_kafka_topic_t *rkt,
int32_t partition_cnt) {
rd_kafka_t *rk = rkt->rkt_rk;
rd_kafka_toppar_t **rktps;
rd_kafka_toppar_t *rktp_ua;
rd_kafka_toppar_t *rktp;
int32_t i;
if (!(rkt = rd_kafka_topic_find(rk, topic))) {
rd_kafka_dbg(rk, METADATA, "PARTCNT",
"Ignore unknown topic %s", topic);
return -1; /* Ignore topics that we dont have locally. */
}
rd_kafka_topic_wrlock(rkt);
rkt->rkt_ts_metadata = rd_clock();
if (rkt->rkt_partition_cnt == partition_cnt) {
rd_kafka_dbg(rk, TOPIC, "PARTCNT",
"No change in partition count for topic %s",
topic);
rd_kafka_topic_unlock(rkt);
rd_kafka_topic_destroy(rkt); /* from find() */
rkt->rkt_topic->str);
return 0; /* No change in partition count */
}
if (unlikely(rkt->rkt_partition_cnt != 0))
rd_kafka_log(rk, LOG_NOTICE, "PARTCNT",
"Topic %.*s partition count changed "
"Topic %s partition count changed "
"from %"PRId32" to %"PRId32,
RD_KAFKAP_STR_PR(rkt->rkt_topic),
rkt->rkt_topic->str,
rkt->rkt_partition_cnt, partition_cnt);
else
rd_kafka_dbg(rk, TOPIC, "PARTCNT",
"Topic %.*s partition count changed "
"Topic %s partition count changed "
"from %"PRId32" to %"PRId32,
RD_KAFKAP_STR_PR(rkt->rkt_topic),
rkt->rkt_topic->str,
rkt->rkt_partition_cnt, partition_cnt);
@ -762,32 +735,27 @@ int rd_kafka_topic_partition_cnt_update (rd_kafka_t *rk,
rkt->rkt_partition_cnt = partition_cnt;
rd_kafka_topic_unlock(rkt);
rd_kafka_topic_destroy(rkt); /* from find() */
return 1;
}
void rd_kafka_topic_assign_uas (rd_kafka_t *rk, const char *topic) {
rd_kafka_topic_t *rkt;
/**
* Assign messages on the UA partition to available partitions.
* Locks: rd_kafka_topic_*lock() must be held.
*/
static void rd_kafka_topic_assign_uas (rd_kafka_topic_t *rkt) {
rd_kafka_t *rk = rkt->rkt_rk;
rd_kafka_toppar_t *rktp_ua;
rd_kafka_msg_t *rkm, *tmp;
rd_kafka_msgq_t uas = RD_KAFKA_MSGQ_INITIALIZER(uas);
rd_kafka_msgq_t failed = RD_KAFKA_MSGQ_INITIALIZER(failed);
int cnt;
if (!(rkt = rd_kafka_topic_find(rk, topic))) {
rd_kafka_dbg(rk, METADATA, "ASSIGNUA",
"Ignore unknown topic %s", topic);
return; /* Ignore topics that we dont have locally. */
}
rktp_ua = rd_kafka_toppar_get(rkt, RD_KAFKA_PARTITION_UA, 0);
if (unlikely(!rktp_ua)) {
rd_kafka_dbg(rk, TOPIC, "ASSIGNUA",
"No UnAssigned partition available for %s", topic);
rd_kafka_topic_destroy(rkt);
"No UnAssigned partition available for %s",
rkt->rkt_topic->str);
return;
}
@ -805,27 +773,143 @@ void rd_kafka_topic_assign_uas (rd_kafka_t *rk, const char *topic) {
rd_kafka_toppar_unlock(rktp_ua);
TAILQ_FOREACH_SAFE(rkm, &uas.rkmq_msgs, rkm_link, tmp) {
if (unlikely(rd_kafka_msg_partitioner(rkt, rkm) != 0)) {
/* Fast-path for failing messages with forced partition */
if (rkm->rkm_partition != RD_KAFKA_PARTITION_UA &&
rkm->rkm_partition >= rkt->rkt_partition_cnt &&
rkt->rkt_state != RD_KAFKA_TOPIC_S_INIT) {
rd_kafka_msgq_enq(&failed, rkm);
continue;
}
if (unlikely(rd_kafka_msg_partitioner(rkt, rkm, 0) != 0)) {
/* Desired partition not available */
rd_kafka_msgq_enq(&failed, rkm);
}
}
rd_kafka_dbg(rk, TOPIC, "UAS",
"%i/%i messages were partitioned",
cnt - failed.rkmq_msg_cnt, cnt);
"%i/%i messages were partitioned in topic %s",
cnt - failed.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);
if (failed.rkmq_msg_cnt > 0) {
/* Fail the messages */
rd_kafka_dbg(rk, TOPIC, "UAS",
"%i/%i messages failed partitioning",
uas.rkmq_msg_cnt, cnt);
"%i/%i messages failed partitioning in topic %s",
uas.rkmq_msg_cnt, cnt, rkt->rkt_topic->str);
rd_kafka_dr_msgq(rk, &failed,
rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN ?
RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC :
RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION);
}
rd_kafka_toppar_destroy(rktp_ua); /* from get() */
}
/**
 * A received metadata response contained no information about topic 'rkt',
 * which indicates that the topic is not available in the cluster.
 *
 * Marks the topic as RD_KAFKA_TOPIC_S_UNKNOWN, refreshes its metadata
 * timestamp, sets its partition count to 0 and re-runs assignment of the
 * messages queued on the unassigned (UA) partition.
 *
 * Locks: acquires and releases rd_kafka_topic_wrlock(rkt) itself.
 */
void rd_kafka_topic_metadata_none (rd_kafka_topic_t *rkt) {
rd_kafka_topic_wrlock(rkt);
rkt->rkt_ts_metadata = rd_clock();
/* State must be set before assign_uas() runs: the delivery-report error
 * for failed messages is chosen from rkt_state. */
rkt->rkt_state = RD_KAFKA_TOPIC_S_UNKNOWN;
/* Update number of partitions */
rd_kafka_topic_partition_cnt_update(rkt, 0);
/* Purge messages with forced partition */
rd_kafka_topic_assign_uas(rkt);
rd_kafka_topic_unlock(rkt);
}
/**
 * Update a locally known topic from the metadata entry 'tm' received
 * on broker 'rkb'.
 *
 * Sets the topic state, updates the partition count and the leader of
 * each partition, and re-runs assignment of unassigned messages when
 * something changed or the topic is unknown in the cluster.
 *
 * Returns -1 if the topic is not known locally (nothing is done),
 * else the number of changes applied: the partition count change (0 or 1)
 * plus one for each partition whose leader changed. 0 means no change.
 *
 * Locks: takes rd_kafka_lock(rk) and then rd_kafka_topic_wrlock(rkt),
 *        one after the other (never nested).
 */
int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
const struct rd_kafka_TopicMetadata *tm) {
rd_kafka_topic_t *rkt;
int upd = 0;
int j;
if (!(rkt = rd_kafka_topic_find(rkb->rkb_rk,
rd_kafkap_strdupa(tm->Name))))
return -1; /* Ignore topics that we dont have locally. */
/* A topic-level error is only logged here; it affects the topic state
 * further down. */
if (tm->ErrorCode != RD_KAFKA_RESP_ERR_NO_ERROR)
rd_rkb_dbg(rkb, TOPIC, "METADATA",
"Error in metadata reply for "
"topic %s (PartCnt %"PRId32"): %s",
rkt->rkt_topic->str, tm->PartitionMetadata_cnt,
rd_kafka_err2str(tm->ErrorCode));
/* Look up brokers before acquiring rkt lock to preserve lock order */
/* NOTE: 'tm' is const, but the per-partition 'rkb' field is written
 * through the non-const PartitionMetadata pointer. */
rd_kafka_lock(rkb->rkb_rk);
for (j = 0 ; j < tm->PartitionMetadata_cnt ; j++) {
if (tm->PartitionMetadata[j].Leader == -1) {
tm->PartitionMetadata[j].rkb = NULL;
continue;
}
tm->PartitionMetadata[j].rkb =
rd_kafka_broker_find_by_nodeid(rkb->rkb_rk,
tm->PartitionMetadata[j].
Leader);
}
rd_kafka_unlock(rkb->rkb_rk);
rd_kafka_topic_wrlock(rkt);
rkt->rkt_ts_metadata = rd_clock();
/* Set topic state */
/* Any error other than UNKNOWN_TOPIC_OR_PART leaves the topic
 * in EXISTS state. */
if (tm->ErrorCode == RD_KAFKA_RESP_ERR_UNKNOWN_TOPIC_OR_PART)
rkt->rkt_state = RD_KAFKA_TOPIC_S_UNKNOWN;
else
rkt->rkt_state = RD_KAFKA_TOPIC_S_EXISTS;
/* Update number of partitions */
upd += rd_kafka_topic_partition_cnt_update(rkt,
tm->PartitionMetadata_cnt);
/* Update leader for each partition */
for (j = 0 ; j < tm->PartitionMetadata_cnt ; j++) {
rd_rkb_dbg(rkb, METADATA, "METADATA",
" Topic %s partition %"PRId32" Leader %"PRId32,
rkt->rkt_topic->str,
tm->PartitionMetadata[j].PartitionId,
tm->PartitionMetadata[j].Leader);
/* Update leader for partition */
upd += rd_kafka_topic_leader_update(rkt,
tm->PartitionMetadata[j].
PartitionId,
tm->PartitionMetadata[j].
Leader,
tm->PartitionMetadata[j].
rkb);
}
/* Try to assign unassigned messages to new partitions, or fail them */
if (upd > 0 || rkt->rkt_state == RD_KAFKA_TOPIC_S_UNKNOWN)
rd_kafka_topic_assign_uas(rkt);
rd_kafka_topic_unlock(rkt);
rd_kafka_topic_destroy(rkt); /* from find() */
/* Drop broker references */
/* These are the brokers stored in 'rkb' by the lookup loop above. */
for (j = 0 ; j < tm->PartitionMetadata_cnt ; j++)
if (tm->PartitionMetadata[j].rkb)
rd_kafka_broker_destroy(tm->PartitionMetadata[j].rkb);
return upd;
}

View File

@ -64,16 +64,10 @@ int rd_kafka_toppar_ua_move (rd_kafka_topic_t *rkt, rd_kafka_msgq_t *rkmq);
void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
rd_kafka_broker_t *rkb);
void rd_kafka_topic_leader_update (rd_kafka_t *rk,
const char *topic, int32_t partition,
int32_t leader);
void rd_kafka_topic_assign_uas (rd_kafka_t *rk, const char *topic);
void rd_kafka_topic_partitions_remove (rd_kafka_topic_t *rkt);
int rd_kafka_topic_partition_cnt_update (rd_kafka_t *rk,
const char *topic,
int32_t partition_cnt);
void rd_kafka_topic_metadata_none (rd_kafka_topic_t *rkt);
int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
const struct rd_kafka_TopicMetadata *tm);
int rd_kafka_topic_scan_all (rd_kafka_t *rk, rd_ts_t now);

142
tests/0005-order.c Normal file
View File

@ -0,0 +1,142 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2013, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Tests messages are produced in order.
*/
#define _GNU_SOURCE
#include <sys/time.h>
#include <time.h>
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/* Message id that the next delivery report is expected to carry. */
static int msgid_next = 0;
/* Number of delivery reports that arrived with an unexpected id. */
static int fails = 0;

/**
 * Delivery report callback, invoked once per produced message.
 *
 * 'msg_opaque' is the heap-allocated message id passed to
 * rd_kafka_produce(); it is read and freed here.
 * Fails the test if the delivery failed or if the id is not the one
 * expected next; otherwise advances the expected id.
 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
                   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
        int *msgidp = msg_opaque;
        const int delivered = *msgidp;

        free(msgidp);

        if (err != RD_KAFKA_RESP_ERR_NO_ERROR)
                TEST_FAIL("Message delivery failed: %s\n",
                          rd_kafka_err2str(err));

        if (delivered == msgid_next) {
                /* In order: move on to the next expected id. */
                msgid_next = delivered + 1;
                return;
        }

        /* Out of order: count it and report. */
        fails++;
        TEST_FAIL("Delivered msg %i, expected %i\n",
                  delivered, msgid_next);
}
/**
 * Test entry point.
 *
 * Produces 'msgcnt' messages with consecutive ids to one partition of a
 * fixed topic and verifies, through dr_cb(), that the delivery reports
 * arrive in the same order the messages were produced.
 *
 * Returns 0 when the test ran to completion; problems are reported
 * through TEST_FAIL().
 */
int main (int argc, char **argv) {
        char *topic = "rdkafkatest1";
        int partition = 0;
        int r;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        char errstr[512];
        char msg[128];
        int msgcnt = 100000;
        int i;

        test_conf_init(&conf, &topic_conf, 10);

        /* Set delivery report callback */
        rd_kafka_conf_set_dr_cb(conf, dr_cb);

        /* Create kafka instance */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                          errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);

        TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("Failed to create topic: %s\n",
                          strerror(errno));

        /* Produce msgcnt messages, each carrying its sequence number as
         * the per-message opaque. dr_cb() frees the opaque. */
        for (i = 0 ; i < msgcnt ; i++) {
                int *msgidp = malloc(sizeof(*msgidp));

                /* NOTE(review): relies on TEST_FAIL() not returning, as the
                 * !rk and !rkt checks above already do - confirm in test.h. */
                if (!msgidp)
                        TEST_FAIL("Failed to allocate id for message #%i: "
                                  "%s\n", i, strerror(errno));

                *msgidp = i;
                snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i);
                r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                     msg, strlen(msg), NULL, 0, msgidp);
                if (r == -1)
                        TEST_FAIL("Failed to produce message #%i: %s\n",
                                  i, strerror(errno));
        }

        TEST_SAY("Produced %i messages, waiting for deliveries\n", msgcnt);

        /* Serve delivery reports until the out queue is empty */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 50);

        if (fails)
                TEST_FAIL("%i failures, see previous errors", fails);

        if (msgid_next != msgcnt)
                TEST_FAIL("Still waiting for messages: next %i != end %i\n",
                          msgid_next, msgcnt);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
        rd_kafka_destroy(rk);

        /* Wait for everything to be cleaned up since broker destroys are
         * handled in its own thread. */
        test_wait_exit(10);

        /* If we havent failed at this point then
         * there were no threads leaked */
        return 0;
}

147
tests/0007-autotopic.c Normal file
View File

@ -0,0 +1,147 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2013, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Auto create topics
*
* NOTE! This test requires auto.create.topics.enable=true to be
* configured on the broker!
*/
#define _GNU_SOURCE
#include <sys/time.h>
#include <time.h>
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/* Bitmask of message ids still waiting for a delivery report:
 * bit N is set while message #N is outstanding. */
static int msgs_wait = 0;

/**
 * Delivery report callback, invoked once per produced message.
 *
 * 'msg_opaque' is the heap-allocated message id passed to
 * rd_kafka_produce(); it is read and freed here.
 * Fails the test if the report is for a message that is not outstanding
 * or if the delivery failed; clears the message's bit in msgs_wait.
 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
                   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
        int *msgidp = msg_opaque;
        const int msgid = *msgidp;
        const int msgbit = 1 << msgid;

        free(msgidp);

        if ((msgs_wait & msgbit) == 0)
                TEST_FAIL("Unwanted delivery report for message #%i "
                          "(waiting for 0x%x)\n", msgid, msgs_wait);

        TEST_SAY("Delivery report for message #%i: %s\n",
                 msgid, rd_kafka_err2str(err));

        /* This message is no longer outstanding. */
        msgs_wait &= ~msgbit;

        if (err != 0)
                TEST_FAIL("Message #%i failed with unexpected error %s\n",
                          msgid, rd_kafka_err2str(err));
}
/**
 * Test entry point.
 *
 * Produces 'msgcnt' messages to a freshly named topic and waits for a
 * successful delivery report for each of them, tracked through the
 * msgs_wait bitmask.
 *
 * Returns 0 when the test ran to completion; problems are reported
 * through TEST_FAIL().
 */
int main (int argc, char **argv) {
        char topic[64];
        int partition = 0;
        int r;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        char errstr[512];
        char msg[128];
        int msgcnt = 10; /* must stay below the bit width of msgs_wait */
        int i;

        test_conf_init(&conf, &topic_conf, 10);

        /* Generate unique topic name */
        snprintf(topic, sizeof(topic), "rdkafkatest1_auto_%x%x",
                 rand(), rand());

        TEST_SAY("\033[33mNOTE! This test requires "
                 "auto.create.topics.enable=true to be configured on "
                 "the broker!\033[0m\n");

        /* Set delivery report callback */
        rd_kafka_conf_set_dr_cb(conf, dr_cb);

        /* Create kafka instance */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                          errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);

        TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("Failed to create topic: %s\n",
                          strerror(errno));

        /* Produce msgcnt messages, each carrying its id as the per-message
         * opaque. dr_cb() frees the opaque. */
        for (i = 0 ; i < msgcnt ; i++) {
                int *msgidp = malloc(sizeof(*msgidp));

                /* NOTE(review): relies on TEST_FAIL() not returning, as the
                 * !rk and !rkt checks above already do - confirm in test.h. */
                if (!msgidp)
                        TEST_FAIL("Failed to allocate id for message #%i: "
                                  "%s\n", i, strerror(errno));

                *msgidp = i;
                snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i);
                r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                     msg, strlen(msg), NULL, 0, msgidp);
                if (r == -1)
                        TEST_FAIL("Failed to produce message #%i: %s\n",
                                  i, strerror(errno));
                msgs_wait |= (1 << i);
        }

        /* Serve delivery reports until the out queue is empty */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 50);

        if (msgs_wait != 0)
                TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
        rd_kafka_destroy(rk);

        /* Wait for everything to be cleaned up since broker destroys are
         * handled in its own thread. */
        test_wait_exit(10);

        /* If we havent failed at this point then
         * there were no threads leaked */
        return 0;
}

164
tests/1000-unktopic.c Normal file
View File

@ -0,0 +1,164 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2013, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/**
* Tests that producing to unknown topic fails.
* Issue #39
*
* NOTE! This test requires auto.create.topics.enable=false to be
* configured on the broker!
*/
#define _GNU_SOURCE
#include <sys/time.h>
#include <time.h>
#include "test.h"
/* Typical include path would be <librdkafka/rdkafka.h>, but this program
* is built from within the librdkafka source tree and thus differs. */
#include "rdkafka.h" /* for Kafka driver */
/* Bitmask of message ids still waiting for a delivery report:
 * bit N is set while message #N is outstanding. */
static int msgs_wait = 0;

/**
 * Delivery report callback, invoked once per produced message.
 *
 * 'msg_opaque' is the heap-allocated message id passed to
 * rd_kafka_produce(); it is read and freed here.
 * Every report is expected to carry RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
 * anything else, or a report for a message that is not outstanding,
 * fails the test. Clears the message's bit in msgs_wait.
 */
static void dr_cb (rd_kafka_t *rk, void *payload, size_t len,
                   rd_kafka_resp_err_t err, void *opaque, void *msg_opaque) {
        int *msgidp = msg_opaque;
        const int msgid = *msgidp;
        const int msgbit = 1 << msgid;

        free(msgidp);

        if ((msgs_wait & msgbit) == 0)
                TEST_FAIL("Unwanted delivery report for message #%i "
                          "(waiting for 0x%x)\n", msgid, msgs_wait);

        TEST_SAY("Delivery report for message #%i: %s\n",
                 msgid, rd_kafka_err2str(err));

        /* This message is no longer outstanding. */
        msgs_wait &= ~msgbit;

        if (err != RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
                TEST_FAIL("Message #%i failed with unexpected error %s\n",
                          msgid, rd_kafka_err2str(err));
}
/**
 * Test entry point.
 *
 * Produces 'msgcnt' messages to a freshly named topic that does not
 * exist in the cluster. The first messages are accepted and must later
 * be failed through dr_cb() with RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC; after
 * a pause that lets the metadata arrive, rd_kafka_produce() itself must
 * fail with errno ENOENT.
 *
 * Returns 0 when the test ran to completion; problems are reported
 * through TEST_FAIL().
 */
int main (int argc, char **argv) {
        char topic[64];
        int partition = 0;
        int r;
        rd_kafka_t *rk;
        rd_kafka_topic_t *rkt;
        rd_kafka_conf_t *conf;
        rd_kafka_topic_conf_t *topic_conf;
        char errstr[512];
        char msg[128];
        int msgcnt = 10; /* must stay below the bit width of msgs_wait */
        /* Last message that may still be accepted by produce(): the test
         * sleeps after it so that the topic metadata can arrive. */
        const int last_early_msg = 5;
        int i;

        test_conf_init(&conf, &topic_conf, 10);

        /* Generate unique topic name */
        snprintf(topic, sizeof(topic), "rdkafkatest1_unk_%x%x",
                 rand(), rand());

        TEST_SAY("\033[33mNOTE! This test requires "
                 "auto.create.topics.enable=false to be configured on "
                 "the broker!\033[0m\n");

        /* Set delivery report callback */
        rd_kafka_conf_set_dr_cb(conf, dr_cb);

        /* Create kafka instance */
        rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                          errstr, sizeof(errstr));
        if (!rk)
                TEST_FAIL("Failed to create rdkafka instance: %s\n", errstr);

        TEST_SAY("Created kafka instance %s\n", rd_kafka_name(rk));

        rkt = rd_kafka_topic_new(rk, topic, topic_conf);
        if (!rkt)
                TEST_FAIL("Failed to create topic: %s\n",
                          strerror(errno));

        /* Produce msgcnt messages, each carrying its id as the per-message
         * opaque. */
        for (i = 0 ; i < msgcnt ; i++) {
                int *msgidp = malloc(sizeof(*msgidp));

                /* NOTE(review): relies on TEST_FAIL() not returning, as the
                 * !rk and !rkt checks above already do - confirm in test.h. */
                if (!msgidp)
                        TEST_FAIL("Failed to allocate id for message #%i: "
                                  "%s\n", i, strerror(errno));

                *msgidp = i;
                snprintf(msg, sizeof(msg), "%s test message #%i", argv[0], i);
                r = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                     msg, strlen(msg), NULL, 0, msgidp);
                if (r == -1) {
                        if (errno == ENOENT)
                                TEST_SAY("Failed to produce message #%i: "
                                         "unknown topic: good!\n", i);
                        else
                                TEST_FAIL("Failed to produce message #%i: %s\n",
                                          i, strerror(errno));

                        /* The message was not accepted: this test expects
                         * no delivery report for it (its msgs_wait bit is
                         * never set), so dr_cb() will not free the id.
                         * Release it here to avoid leaking it. */
                        free(msgidp);
                } else {
                        if (i > last_early_msg)
                                TEST_FAIL("Message #%i produced: "
                                          "should've failed\n", i);
                        msgs_wait |= (1 << i);
                }

                /* After half the messages: sleep to allow the metadata
                 * to be fetched from broker and update the actual partition
                 * count: this will make subsequent produce() calls fail
                 * immediately. */
                if (i == last_early_msg)
                        sleep(2);
        }

        /* Serve delivery reports until the out queue is empty */
        while (rd_kafka_outq_len(rk) > 0)
                rd_kafka_poll(rk, 50);

        if (msgs_wait != 0)
                TEST_FAIL("Still waiting for messages: 0x%x\n", msgs_wait);

        /* Destroy topic */
        rd_kafka_topic_destroy(rkt);

        /* Destroy rdkafka instance */
        TEST_SAY("Destroying kafka instance %s\n", rd_kafka_name(rk));
        rd_kafka_destroy(rk);

        /* Wait for everything to be cleaned up since broker destroys are
         * handled in its own thread. */
        test_wait_exit(10);

        /* If we havent failed at this point then
         * there were no threads leaked */
        return 0;
}