diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index cfe587d..c3ee6c8 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -29,7 +29,7 @@ class KafkaPoller extends Thread { private final AbstractMap topicOffsetMap private final List onDelta private final AbstractSet currentTopics - private List brokers + private final List brokers KafkaPoller(AbstractMap map, AbstractSet topicSet) { this.topicOffsetMap = map @@ -159,14 +159,16 @@ class KafkaPoller extends Thread { void reconnect() { disconnectConsumers() LOGGER.info('Creating SimpleConsumer connections for brokers') - this.brokers.each { Broker b -> - SimpleConsumer consumer = new SimpleConsumer(b.host, - b.port, - KAFKA_TIMEOUT, - KAFKA_BUFFER, - KAFKA_CLIENT_ID) - consumer.connect() - this.brokerConsumerMap[b.id] = consumer + synchronized(this.brokers) { + this.brokers.each { Broker b -> + SimpleConsumer consumer = new SimpleConsumer(b.host, + b.port, + KAFKA_TIMEOUT, + KAFKA_BUFFER, + KAFKA_CLIENT_ID) + consumer.connect() + this.brokerConsumerMap[b.id] = consumer + } } this.shouldReconnect = false } @@ -190,8 +192,11 @@ class KafkaPoller extends Thread { * Store a new list of KafkaBroker objects and signal a reconnection */ void refresh(List brokers) { - this.brokers = brokers.collect { KafkaBroker b -> - new Broker(b.brokerId, b.host, b.port) + synchronized(this.brokers) { + this.brokers.clear() + this.brokers.addAll(brokers.collect { KafkaBroker b -> + new Broker(b.brokerId, b.host, b.port) + }) } this.shouldReconnect = true } @@ -201,7 +206,9 @@ class KafkaPoller extends Thread { * scala underpinnings */ private scala.collection.immutable.Seq getBrokersSeq() { - return JavaConversions.asScalaBuffer(this.brokers).toList() + synchronized(this.brokers) { + return JavaConversions.asScalaBuffer(this.brokers).toList() + } } /** diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy index 310551a..8420fa2 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy @@ -28,7 +28,7 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { super(client) this.json = new JsonSlurper() - this.brokers = [] + this.brokers = Collections.synchronizedList([]) this.onBrokerUpdates = [] } @@ -47,9 +47,8 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { */ if (event.type == TreeCacheEvent.Type.INITIALIZED) { this.isTreeInitialized = true - List threadsafeBrokers = Collections.synchronizedList(this.brokers) this.onBrokerUpdates.each { Closure c -> - c?.call(threadsafeBrokers) + c?.call(this.brokers) } return } @@ -68,6 +67,12 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8')) this.brokers << new KafkaBroker(brokerData, brokerId) + + if (this.isTreeInitialized) { + this.onBrokerUpdates.each { Closure c -> + c?.call(this.brokers) + } + } } /**