From 7b13050ee2d345560034f99076c4e5b86131ce7a Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 30 Jan 2015 02:43:35 -0800
Subject: [PATCH] Restructure KafkaPoller to make catching and logging
 exceptions easier

I've not cleaned this up enough to make things easier to test, partially
because mocking the Kafka client APIs is a giant pain in the ass

Fixes #16
---
 .../lookout/verspaetung/KafkaPoller.groovy         | 124 ++++++++++++------
 .../verspaetung/KafkaPollerSpec.groovy             |   6 +
 2 files changed, 93 insertions(+), 37 deletions(-)
 create mode 100644 src/test/groovy/com/github/lookout/verspaetung/KafkaPollerSpec.groovy

diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
index 7ffa0a2..9337770 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
@@ -19,6 +19,7 @@ import scala.collection.JavaConversions
  */
 //@TypeChecked
 class KafkaPoller extends Thread {
+    private final Integer POLLER_DELAY = (1 * 1000)
     private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
     private final Integer KAFKA_TIMEOUT = (5 * 1000)
     private final Integer KAFKA_BUFFER = (100 * 1024)
@@ -26,17 +27,21 @@ class KafkaPoller extends Thread {
     private Boolean keepRunning = true
     private Boolean shouldReconnect = false
 
-    private ConcurrentHashMap brokerConsumerMap = [:]
-    private List brokers = []
+    private ConcurrentHashMap brokerConsumerMap
     private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
-    private List onDelta = []
+    private List brokers
+    private List onDelta
 
     KafkaPoller(AbstractMap map) {
         this.consumersMap = map
+        this.brokerConsumerMap = [:]
+        this.brokers = []
+        this.onDelta = []
     }
 
     void run() {
         logger.info("Starting wait loop")
+
         while (keepRunning) {
             logger.debug("poll loop")
 
@@ -48,38 +53,63 @@ class KafkaPoller extends Thread {
                 dumpMetadata()
             }
 
-            Thread.sleep(1 * 1000)
+            Thread.sleep(POLLER_DELAY)
         }
     }
 
     void dumpMetadata() {
         logger.debug("dumping meta-data")
 
-        def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
-        def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
-                                                      brokersSeq,
-                                                      KAFKA_CLIENT_ID,
-                                                      KAFKA_TIMEOUT,
-                                                      0)
+        def metadata = fetchMetadataForCurrentTopics()
 
-        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
-            withScalaCollection(f.partitionsMetadata).each { p ->
-                Long offset = latestFromLeader(p.leader.get()?.id, f.topic, p.partitionId)
-                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
-
-                this.consumersMap[tp].each { zk.ConsumerOffset c ->
-                    logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
-                    Long delta = offset - c.offset
-                    this.onDelta.each { Closure callback ->
-                        callback.call(c.groupName, tp, delta)
-                    }
-                }
+        withTopicsAndPartitions(metadata) { tp, p ->
+            try {
+                processDeltasFor(tp, p)
+            }
+            catch (Exception ex) {
+                logger.error("Failed to process deltas for ${tp.topic}:${tp.partition}", ex)
             }
         }
 
         logger.debug("finished dumping meta-data")
     }
 
+    /**
+     * Invoke the given closure with the TopicPartition and Partition meta-data
+     * information for all of the topic meta-data that was passed in.
+     *
+     * The 'metadata' is the expected return from
+     * kafka.client.ClientUtils.fetchTopicMetadata
+     */
+    void withTopicsAndPartitions(Object metadata, Closure closure) {
+        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
+            withScalaCollection(f.partitionsMetadata).each { p ->
+                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
+                closure.call(tp, p)
+            }
+        }
+    }
+
+
+    /**
+     * Fetch the leader metadata and invoke our callbacks with the right deltas
+     * for the given topic and partition information
+     */
+    void processDeltasFor(TopicPartition tp, Object partitionMetadata) {
+        Integer leaderId = partitionMetadata.leader.get()?.id
+        Integer partitionId = partitionMetadata.partitionId
+
+        Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
+
+        this.consumersMap[tp].each { zk.ConsumerOffset c ->
+            logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
+
+            Long delta = offset - c.offset
+            this.onDelta.each { Closure callback ->
+                callback.call(c.groupName, tp, delta)
+            }
+        }
+    }
 
     Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
         SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
@@ -109,21 +139,6 @@ class KafkaPoller extends Thread {
         this.shouldReconnect = false
     }
 
-    /**
-     * Return the brokers list as an immutable Seq collection for the Kafka
-     * scala underpinnings
-     */
-    scala.collection.immutable.Seq getBrokersSeq() {
-        return JavaConversions.asScalaBuffer(this.brokers).toList()
-    }
-
-    /**
-     * Return scala.collection.mutable.Set for the given List
-     */
-    scala.collection.mutable.Set toScalaSet(Set set) {
-        return JavaConversions.asScalaSet(set)
-    }
-
     /**
      * Signal the runloop to safely die after its next iteration
     */
@@ -143,4 +158,39 @@ class KafkaPoller extends Thread {
         }
         this.shouldReconnect = true
     }
+
+    /**
+     * Collect all the topics from our given consumer map and return a list of
+     * them
+     */
+    private List collectCurrentTopics() {
+        return this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
+    }
+
+    /**
+     * Return the brokers list as an immutable Seq collection for the Kafka
+     * scala underpinnings
+     */
+    private scala.collection.immutable.Seq getBrokersSeq() {
+        return JavaConversions.asScalaBuffer(this.brokers).toList()
+    }
+
+    /**
+     * Return scala.collection.mutable.Set for the given List
+     */
+    private scala.collection.mutable.Set toScalaSet(Set set) {
+        return JavaConversions.asScalaSet(set)
+    }
+
+
+    private Object fetchMetadataForCurrentTopics() {
+        return ClientUtils.fetchTopicMetadata(
+                        toScalaSet(new HashSet(collectCurrentTopics())),
+                        brokersSeq,
+                        KAFKA_CLIENT_ID,
+                        KAFKA_TIMEOUT,
+                        0)
+    }
+
+
 }
diff --git a/src/test/groovy/com/github/lookout/verspaetung/KafkaPollerSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/KafkaPollerSpec.groovy
new file mode 100644
index 0000000..013e126
--- /dev/null
+++ b/src/test/groovy/com/github/lookout/verspaetung/KafkaPollerSpec.groovy
@@ -0,0 +1,6 @@
+package com.github.lookout.verspaetung
+
+import spock.lang.*
+
+class KafkaPollerSpec extends Specification {
+}
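
The new KafkaPollerSpec is still an empty shell. As a minimal sketch of where
it could start once the mocking pain is dealt with, the feature method below
drives the new processDeltasFor() seam directly. It rests on some assumptions
beyond what this patch shows: that zk.ConsumerOffset can be built with
Groovy's map constructor, that latestFromLeader() can be stubbed through the
per-instance metaClass, and that Expando objects can stand in for the Kafka
partition meta-data. The group name, topic, and offset values are
illustrative only.

    package com.github.lookout.verspaetung

    import com.github.lookout.verspaetung.zk.ConsumerOffset
    import spock.lang.*

    class KafkaPollerSpec extends Specification {
        def "processDeltasFor() hands each onDelta callback the leader/consumer delta"() {
            given: 'a consumer sitting 7 offsets behind a leader at offset 10'
            TopicPartition tp = new TopicPartition('spock', 0)
            // assumes ConsumerOffset is a plain Groovy bean reachable via the map constructor
            ConsumerOffset consumer = new ConsumerOffset(groupName: 'spock-group', offset: 3)
            KafkaPoller poller = new KafkaPoller([(tp) : [consumer]])
            // stub the broker round-trip on this instance only
            poller.metaClass.latestFromLeader = { Integer leader, String topic, Integer part -> 10L }

            and: 'Expandos standing in for the Kafka partition meta-data'
            def partitionMeta = new Expando(partitionId: 0,
                                            leader: new Expando(get: { new Expando(id: 1) }))

            when: 'a callback is registered and the deltas are processed'
            Long observed = null
            poller.onDelta << { group, topicPartition, delta -> observed = delta }
            poller.processDeltasFor(tp, partitionMeta)

            then: 'the callback sees the leader offset minus the consumer offset'
            observed == 7
        }
    }

Stubbing latestFromLeader() on the metaClass sidesteps mocking SimpleConsumer
and the rest of the Kafka client APIs entirely, which is exactly the pain
point called out in the commit message.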