diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index f2ff492..09995b2 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -115,6 +115,14 @@ class KafkaPoller extends Thread { Long latestFromLeader(Integer leaderId, String topic, Integer partition) { SimpleConsumer consumer = this.brokerConsumerMap[leaderId] + + /* If we don't have a proper SimpleConsumer instance (e.g. null) then + * we should reconnect on our next time around + */ + if (!(consumer instanceof SimpleConsumer)) { + this.shouldReconnect = true + return 0 + } TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition) /* XXX: A zero clientId into this method might not be right */ return consumer.earliestOrLatestOffset(topicAndPart, -1, 0) @@ -128,6 +136,7 @@ class KafkaPoller extends Thread { * Blocking reconnect to the Kafka brokers */ void reconnect() { + disconnectConsumers() logger.info("Creating SimpleConsumer connections for brokers") this.brokers.each { Broker b -> SimpleConsumer consumer = new SimpleConsumer(b.host, @@ -138,7 +147,7 @@ class KafkaPoller extends Thread { consumer.connect() this.brokerConsumerMap[b.id] = consumer } - this.shouldReconnect =false + this.shouldReconnect = false } /** @@ -146,8 +155,13 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false + disconnectConsumers() + } + + private void disconnectConsumers() { this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> - client.disconnect() + logger.info("Disconnecting ${client}") + client?.disconnect() } }