Restructure KafkaPoller to make catching and logging exceptions easier

I've not cleaned this up enough to make things easier to test, partially
because mocking the Kafka client APIs is a giant pain in the ass

Fixes #16
R. Tyler Croy 2015-01-30 02:43:35 -08:00
parent 0fa983d0e8
commit 7b13050ee2
2 changed files with 93 additions and 37 deletions
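The pattern at the core of this change is easiest to see in isolation: dumpMetadata() used to do the metadata fetch, the Scala collection traversal, and the delta math in one nested block, so an exception on a single partition threw out of the whole dump. After the restructure, each topic/partition is handled inside its own try/catch. A minimal sketch of that pattern (generic names, not the actual Verspaetung code):

    // Per-item isolation: a failing item is logged and skipped,
    // and the remaining items are still processed.
    partitions.each { partition ->
        try {
            process(partition)
        }
        catch (Exception ex) {
            logger.error("Failed to process ${partition}", ex)
        }
    }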

KafkaPoller.groovy

@@ -19,6 +19,7 @@ import scala.collection.JavaConversions
  */
 //@TypeChecked
 class KafkaPoller extends Thread {
+    private final Integer POLLER_DELAY = (1 * 1000)
     private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
     private final Integer KAFKA_TIMEOUT = (5 * 1000)
     private final Integer KAFKA_BUFFER = (100 * 1024)
@@ -26,17 +27,21 @@ class KafkaPoller extends Thread {
     private Boolean keepRunning = true
     private Boolean shouldReconnect = false
-    private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
-    private List<Broker> brokers = []
+    private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap
     private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
-    private List<Closure> onDelta = []
+    private List<Broker> brokers
+    private List<Closure> onDelta
 
     KafkaPoller(AbstractMap map) {
         this.consumersMap = map
+        this.brokerConsumerMap = [:]
+        this.brokers = []
+        this.onDelta = []
     }
 
     void run() {
         logger.info("Starting wait loop")
         while (keepRunning) {
             logger.debug("poll loop")
@@ -48,38 +53,63 @@ class KafkaPoller extends Thread {
                 dumpMetadata()
             }
 
-            Thread.sleep(1 * 1000)
+            Thread.sleep(POLLER_DELAY)
         }
     }
 
     void dumpMetadata() {
         logger.debug("dumping meta-data")
 
-        def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
-        def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
-                                                      brokersSeq,
-                                                      KAFKA_CLIENT_ID,
-                                                      KAFKA_TIMEOUT,
-                                                      0)
+        def metadata = fetchMetadataForCurrentTopics()
 
-        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
-            withScalaCollection(f.partitionsMetadata).each { p ->
-                Long offset = latestFromLeader(p.leader.get()?.id, f.topic, p.partitionId)
-                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
-                this.consumersMap[tp].each { zk.ConsumerOffset c ->
-                    logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
-                    Long delta = offset - c.offset
-                    this.onDelta.each { Closure callback ->
-                        callback.call(c.groupName, tp, delta)
-                    }
-                }
-            }
-        }
+        withTopicsAndPartitions(metadata) { tp, p ->
+            try {
+                processDeltasFor(tp, p)
+            }
+            catch (Exception ex) {
+                logger.error("Failed to process deltas for ${tp.topic}:${tp.partition}", ex)
+            }
+        }
 
         logger.debug("finished dumping meta-data")
     }
 
+    /**
+     * Invoke the given closure with the TopicPartition and Partition meta-data
+     * information for all of the topic meta-data that was passed in.
+     *
+     * The 'metadata' is the expected return from
+     * kafka.client.ClientUtils.fetchTopicMetadata
+     */
+    void withTopicsAndPartitions(Object metadata, Closure closure) {
+        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
+            withScalaCollection(f.partitionsMetadata).each { p ->
+                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
+                closure.call(tp, p)
+            }
+        }
+    }
 
+    /**
+     * Fetch the leader metadata and invoke our callbacks with the right deltas
+     * for the given topic and partition information
+     */
+    void processDeltasFor(TopicPartition tp, Object partitionMetadata) {
+        Integer leaderId = partitionMetadata.leader.get()?.id
+        Integer partitionId = partitionMetadata.partitionId
+        Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
+
+        this.consumersMap[tp].each { zk.ConsumerOffset c ->
+            logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
+            Long delta = offset - c.offset
+
+            this.onDelta.each { Closure callback ->
+                callback.call(c.groupName, tp, delta)
+            }
+        }
+    }
 
     Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
         SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
@@ -109,21 +139,6 @@ class KafkaPoller extends Thread {
         this.shouldReconnect = false
     }
 
-    /**
-     * Return the brokers list as an immutable Seq collection for the Kafka
-     * scala underpinnings
-     */
-    scala.collection.immutable.Seq getBrokersSeq() {
-        return JavaConversions.asScalaBuffer(this.brokers).toList()
-    }
-
-    /**
-     * Return scala.collection.mutable.Set for the given List
-     */
-    scala.collection.mutable.Set toScalaSet(Set set) {
-        return JavaConversions.asScalaSet(set)
-    }
-
     /**
      * Signal the runloop to safely die after its next iteration
      */
@@ -143,4 +158,39 @@ class KafkaPoller extends Thread {
         }
         this.shouldReconnect = true
     }
 
+    /**
+     * Collect all the topics from our given consumer map and return a list of
+     * them
+     */
+    private List<String> collectCurrentTopics() {
+        return this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
+    }
+
+    /**
+     * Return the brokers list as an immutable Seq collection for the Kafka
+     * scala underpinnings
+     */
+    private scala.collection.immutable.Seq getBrokersSeq() {
+        return JavaConversions.asScalaBuffer(this.brokers).toList()
+    }
+
+    /**
+     * Return scala.collection.mutable.Set for the given List
+     */
+    private scala.collection.mutable.Set toScalaSet(Set set) {
+        return JavaConversions.asScalaSet(set)
+    }
+
+    private Object fetchMetadataForCurrentTopics() {
+        return ClientUtils.fetchTopicMetadata(
+                   toScalaSet(new HashSet(collectCurrentTopics())),
+                   brokersSeq,
+                   KAFKA_CLIENT_ID,
+                   KAFKA_TIMEOUT,
+                   0)
+    }
 }
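A side benefit of funneling every Kafka/Scala call through the single private helper fetchMetadataForCurrentTopics() is that a test can stub that one seam instead of mocking SimpleConsumer and the Scala collections. A sketch of how that might look with Groovy's per-instance metaClass (hypothetical test code with a canned stubbedMetadataResponse object; not part of this commit, and it relies on dynamic dispatch, which holds here since @TypeChecked is commented out):

    def poller = new KafkaPoller(new ConcurrentHashMap())
    // Replace the only method that talks to Kafka with a canned response
    poller.metaClass.fetchMetadataForCurrentTopics = { -> stubbedMetadataResponse }
    poller.dumpMetadata()   // runs the delta logic without a live broker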

KafkaPollerSpec.groovy

@@ -0,0 +1,6 @@
+package com.github.lookout.verspaetung
+
+import spock.lang.*
+
+class KafkaPollerSpec extends Specification {
+}
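The new spec is only an empty shell so far, which matches the commit message's note about the pain of mocking the Kafka client APIs. A first test could sidestep Kafka entirely and cover the pure-Groovy helpers, for example (a sketch, not part of this commit; it leans on Groovy not enforcing private visibility):

    package com.github.lookout.verspaetung

    import java.util.concurrent.ConcurrentHashMap
    import spock.lang.*

    class KafkaPollerSpec extends Specification {
        def "collectCurrentTopics() returns the topics in the consumers map"() {
            given:
            def consumers = new ConcurrentHashMap()
            consumers[new TopicPartition('spock', 0)] = []

            expect:
            new KafkaPoller(consumers).collectCurrentTopics() == ['spock']
        }
    }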