From be33333bfabb9965cc8a3410df09edc1f180511b Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 14 Aug 2015 10:50:45 -0700 Subject: [PATCH] Bail out and try to force a reconnect if our broker's consumer is nulled Still need to explore how we're getting into this state, but this is a start to handling that scenario more gracefully References #130 --- .../lookout/verspaetung/KafkaPoller.groovy | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index f2ff492..09995b2 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -115,6 +115,14 @@ class KafkaPoller extends Thread { Long latestFromLeader(Integer leaderId, String topic, Integer partition) { SimpleConsumer consumer = this.brokerConsumerMap[leaderId] + + /* If we don't have a proper SimpleConsumer instance (e.g. null) then + * we should reconnect on our next time around + */ + if (!(consumer instanceof SimpleConsumer)) { + this.shouldReconnect = true + return 0 + } TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition) /* XXX: A zero clientId into this method might not be right */ return consumer.earliestOrLatestOffset(topicAndPart, -1, 0) @@ -128,6 +136,7 @@ class KafkaPoller extends Thread { * Blocking reconnect to the Kafka brokers */ void reconnect() { + disconnectConsumers() logger.info("Creating SimpleConsumer connections for brokers") this.brokers.each { Broker b -> SimpleConsumer consumer = new SimpleConsumer(b.host, @@ -138,7 +147,7 @@ class KafkaPoller extends Thread { consumer.connect() this.brokerConsumerMap[b.id] = consumer } - this.shouldReconnect =false + this.shouldReconnect = false } /** @@ -146,8 +155,13 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false + disconnectConsumers() + } + + private void disconnectConsumers() { this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> - client.disconnect() + logger.info("Disconnecting ${client}") + client?.disconnect() } }