allow new nodes to be added when they pop up

also ensure the broker list is synchronized in both the KafkaPoller
as well the BrokerTreeWatcher threads
This commit is contained in:
Christian Meier 2015-11-02 19:51:32 +01:00
parent 59c8e79f4e
commit 45d98cb5f6
2 changed files with 27 additions and 15 deletions

View File

@ -29,7 +29,7 @@ class KafkaPoller extends Thread {
private final AbstractMap<TopicPartition, Long> topicOffsetMap private final AbstractMap<TopicPartition, Long> topicOffsetMap
private final List<Closure> onDelta private final List<Closure> onDelta
private final AbstractSet<String> currentTopics private final AbstractSet<String> currentTopics
private List<Broker> brokers private final List<Broker> brokers
KafkaPoller(AbstractMap map, AbstractSet topicSet) { KafkaPoller(AbstractMap map, AbstractSet topicSet) {
this.topicOffsetMap = map this.topicOffsetMap = map
@ -159,14 +159,16 @@ class KafkaPoller extends Thread {
void reconnect() { void reconnect() {
disconnectConsumers() disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers') LOGGER.info('Creating SimpleConsumer connections for brokers')
this.brokers.each { Broker b -> synchronized(this.brokers) {
SimpleConsumer consumer = new SimpleConsumer(b.host, this.brokers.each { Broker b ->
b.port, SimpleConsumer consumer = new SimpleConsumer(b.host,
KAFKA_TIMEOUT, b.port,
KAFKA_BUFFER, KAFKA_TIMEOUT,
KAFKA_CLIENT_ID) KAFKA_BUFFER,
consumer.connect() KAFKA_CLIENT_ID)
this.brokerConsumerMap[b.id] = consumer consumer.connect()
this.brokerConsumerMap[b.id] = consumer
}
} }
this.shouldReconnect = false this.shouldReconnect = false
} }
@ -190,8 +192,11 @@ class KafkaPoller extends Thread {
* Store a new list of KafkaBroker objects and signal a reconnection * Store a new list of KafkaBroker objects and signal a reconnection
*/ */
void refresh(List<KafkaBroker> brokers) { void refresh(List<KafkaBroker> brokers) {
this.brokers = brokers.collect { KafkaBroker b -> synchronized(this.brokers) {
new Broker(b.brokerId, b.host, b.port) this.brokers.clear()
this.brokers.addAll(brokers.collect { KafkaBroker b ->
new Broker(b.brokerId, b.host, b.port)
})
} }
this.shouldReconnect = true this.shouldReconnect = true
} }
@ -201,7 +206,9 @@ class KafkaPoller extends Thread {
* scala underpinnings * scala underpinnings
*/ */
private scala.collection.immutable.Seq getBrokersSeq() { private scala.collection.immutable.Seq getBrokersSeq() {
return JavaConversions.asScalaBuffer(this.brokers).toList() synchronized(this.brokers) {
return JavaConversions.asScalaBuffer(this.brokers).toList()
}
} }
/** /**

View File

@ -28,7 +28,7 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
super(client) super(client)
this.json = new JsonSlurper() this.json = new JsonSlurper()
this.brokers = [] this.brokers = Collections.synchronizedList([])
this.onBrokerUpdates = [] this.onBrokerUpdates = []
} }
@ -47,9 +47,8 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
*/ */
if (event.type == TreeCacheEvent.Type.INITIALIZED) { if (event.type == TreeCacheEvent.Type.INITIALIZED) {
this.isTreeInitialized = true this.isTreeInitialized = true
List threadsafeBrokers = Collections.synchronizedList(this.brokers)
this.onBrokerUpdates.each { Closure c -> this.onBrokerUpdates.each { Closure c ->
c?.call(threadsafeBrokers) c?.call(this.brokers)
} }
return return
} }
@ -68,6 +67,12 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8')) Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
this.brokers << new KafkaBroker(brokerData, brokerId) this.brokers << new KafkaBroker(brokerData, brokerId)
if (this.isTreeInitialized) {
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
}
} }
/** /**