Fix lock-order dead-lock on metadata updates
This commit is contained in:
parent
1ade4c90e4
commit
094ae5c44e
|
@ -628,7 +628,7 @@ void rd_kafka_toppar_broker_delegate (rd_kafka_toppar_t *rktp,
|
|||
|
||||
/**
|
||||
* Update the leader for a topic+partition.
|
||||
* Returns 1 if the leader was changed, else 0.
|
||||
* Returns 1 if the leader was changed, else 0, or -1 if leader is unknown.
|
||||
* NOTE: rd_kafka_topic_wrlock(rkt) MUST be held.
|
||||
*/
|
||||
static int rd_kafka_topic_leader_update (rd_kafka_topic_t *rkt,
|
||||
|
@ -664,13 +664,9 @@ static int rd_kafka_topic_leader_update (rd_kafka_topic_t *rkt,
|
|||
|
||||
rd_kafka_toppar_broker_delegate(rktp, NULL);
|
||||
|
||||
/* Query for the topic leader (async) */
|
||||
if (had_leader)
|
||||
rd_kafka_topic_leader_query(rk, rkt);
|
||||
|
||||
rd_kafka_toppar_destroy(rktp); /* from get() */
|
||||
|
||||
return had_leader ? 1 : 0;
|
||||
return had_leader ? -1 : 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -949,6 +945,7 @@ int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
|
|||
int upd = 0;
|
||||
int j;
|
||||
rd_kafka_broker_t **partbrokers;
|
||||
int query_leader = 0;
|
||||
|
||||
if (!(rkt = rd_kafka_topic_find(rkb->rkb_rk, mdt->topic)))
|
||||
return -1; /* Ignore topics that we dont have locally. */
|
||||
|
@ -993,6 +990,8 @@ int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
|
|||
|
||||
/* Update leader for each partition */
|
||||
for (j = 0 ; j < mdt->partition_cnt ; j++) {
|
||||
int r;
|
||||
|
||||
rd_rkb_dbg(rkb, METADATA, "METADATA",
|
||||
" Topic %s partition %i Leader %"PRId32,
|
||||
rkt->rkt_topic->str,
|
||||
|
@ -1000,9 +999,13 @@ int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
|
|||
mdt->partitions[j].leader);
|
||||
|
||||
/* Update leader for partition */
|
||||
upd += rd_kafka_topic_leader_update(rkt,
|
||||
&mdt->partitions[j],
|
||||
partbrokers[j]);
|
||||
r = rd_kafka_topic_leader_update(rkt,
|
||||
&mdt->partitions[j],
|
||||
partbrokers[j]);
|
||||
if (r == -1)
|
||||
query_leader = 1;
|
||||
|
||||
upd += (r != 0 ? 1 : 0);
|
||||
|
||||
/* Drop reference to broker (from find()) */
|
||||
if (partbrokers[j])
|
||||
|
@ -1015,6 +1018,11 @@ int rd_kafka_topic_metadata_update (rd_kafka_broker_t *rkb,
|
|||
rd_kafka_topic_assign_uas(rkt);
|
||||
|
||||
rd_kafka_topic_unlock(rkt);
|
||||
|
||||
/* Query for the topic leader (async) */
|
||||
if (query_leader)
|
||||
rd_kafka_topic_leader_query(rkt->rkt_rk, rkt);
|
||||
|
||||
rd_kafka_topic_destroy0(rkt); /* from find() */
|
||||
|
||||
return upd;
|
||||
|
|
Loading…
Reference in New Issue