Major refactor to support dropwizard-metrics as the means of outputting metrics

This commit changes the structure of Verspaetung pretty dramatically to allow
for registering Gauges for the various offsets.

With this change the KafkaPoller pushes the latest topic offsets into one map and the
ZK consumer tree watchers push consumer offsets into a separate map.
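
As a rough sketch of how those two maps and the new ConsumerGauge fit together (illustrative only, not part of the diff; the topic, partition, group, and offset values below are made up):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.lookout.verspaetung.metrics.ConsumerGauge

/* shared state: the ZK watchers fill watchedTopics and consumerOffsets,
 * while the KafkaPoller fills topicOffsets
 */
ConcurrentSkipListSet<String> watchedTopics = new ConcurrentSkipListSet<>()
ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets = new ConcurrentHashMap<>()

/* made-up values standing in for what the poller and watchers would write */
TopicPartition tp = new TopicPartition('example-topic', 0)
KafkaConsumer consumer = new KafkaConsumer(tp.topic, tp.partition, 'example-group')
watchedTopics << tp.topic
topicOffsets[tp] = 10L
consumerOffsets[consumer] = 7

/* the gauge reads both maps and reports latest broker offset minus consumer offset */
ConsumerGauge gauge = new ConsumerGauge(consumer, consumerOffsets, topicOffsets)
assert gauge.value == 3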

References #17
R. Tyler Croy 2015-02-06 08:18:12 -08:00
parent f4042894fe
commit 71347594ca
14 changed files with 322 additions and 90 deletions

View File

@ -7,7 +7,7 @@ apply plugin: 'application'
group = "com.github.lookout"
description = "A utility for monitoring the delay of Kafka consumers"
version = '0.1.5'
version = '0.1.6'
mainClassName = 'com.github.lookout.verspaetung.Main'
defaultTasks 'clean', 'check'
sourceCompatibility = '1.7'

View File

@ -8,7 +8,7 @@ class KafkaBroker {
private Integer port
private Integer brokerId
public KafkaBroker(Object jsonObject, Integer brokerId) {
KafkaBroker(Object jsonObject, Integer brokerId) {
this.host = jsonObject.host
this.port = jsonObject.port
this.brokerId = brokerId

View File

@ -0,0 +1,51 @@
package com.github.lookout.verspaetung
/**
* POJO containing the necessary information to model a Kafka consumer
*/
class KafkaConsumer {
String topic
Integer partition
String name
KafkaConsumer(String topic, Integer partition, String name) {
this.topic = topic
this.partition = partition
this.name = name
}
@Override
String toString() {
return "KafkaConsumer<${topic}:${partition} - ${name}>"
}
@Override
int hashCode() {
return Objects.hash(this.topic, this.partition, this.name)
}
/**
* Return true for any two KafkaConsumer instances which have the same
* topic, partition and name properties
*/
@Override
boolean equals(Object compared) {
/* bail early for object identity */
if (this.is(compared)) {
return true
}
if (!(compared instanceof KafkaConsumer)) {
return false
}
if ((this.topic == compared.topic) &&
(this.partition == compared.partition) &&
(this.name == compared.name)) {
return true
}
return false
}
}
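
Since equals() and hashCode() are value-based, separately constructed KafkaConsumer instances for the same (topic, partition, group) collapse to a single entry when used as map keys, which is what lets the watchers keep a single offset per consumer. A small illustration (values made up):

import java.util.concurrent.ConcurrentHashMap
import com.github.lookout.verspaetung.KafkaConsumer

ConcurrentHashMap<KafkaConsumer, Integer> offsets = new ConcurrentHashMap<>()
offsets[new KafkaConsumer('a-topic', 0, 'a-group')] = 5
offsets[new KafkaConsumer('a-topic', 0, 'a-group')] = 9  /* same logical consumer, entry overwritten */
assert offsets.size() == 1
assert offsets[new KafkaConsumer('a-topic', 0, 'a-group')] == 9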

View File

@ -28,12 +28,14 @@ class KafkaPoller extends Thread {
private Boolean keepRunning = true
private Boolean shouldReconnect = false
private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap
private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
private AbstractMap<TopicPartition, Long> topicOffsetMap
private List<Broker> brokers
private List<Closure> onDelta
private AbstractSet<String> currentTopics
KafkaPoller(AbstractMap map) {
this.consumersMap = map
KafkaPoller(AbstractMap map, AbstractSet topicSet) {
this.topicOffsetMap = map
this.currentTopics = topicSet
this.brokerConsumerMap = [:]
this.brokers = []
this.onDelta = []
@ -49,7 +51,10 @@ class KafkaPoller extends Thread {
reconnect()
}
if (this.consumersMap.size() > 0) {
/* Only makes sense to try to dump meta-data if we've got some
* topics that we should keep an eye on
*/
if (this.currentTopics.size() > 0) {
dumpMetadata()
}
@ -64,10 +69,10 @@ class KafkaPoller extends Thread {
withTopicsAndPartitions(metadata) { tp, p ->
try {
processDeltasFor(tp, p)
captureLatestOffsetFor(tp, p)
}
catch (Exception ex) {
logger.error("Failed to process deltas for ${tp.topic}:${tp.partition}", ex)
logger.error("Failed to fetch latest for ${tp.topic}:${tp.partition}", ex)
}
}
@ -92,23 +97,15 @@ class KafkaPoller extends Thread {
/**
* Fetch the leader metadata and invoke our callbacks with the right deltas
* for the given topic and partition information
* Fetch the leader metadata and update our data structures
*/
void processDeltasFor(TopicPartition tp, Object partitionMetadata) {
void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
Integer leaderId = partitionMetadata.leader.get()?.id
Integer partitionId = partitionMetadata.partitionId
Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
this.consumersMap[tp].each { zk.ConsumerOffset c ->
logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
Long delta = offset - c.offset
this.onDelta.each { Closure callback ->
callback.call(c.groupName, tp, delta)
}
}
this.topicOffsetMap[tp] = offset
}
Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
@ -159,14 +156,6 @@ class KafkaPoller extends Thread {
this.shouldReconnect = true
}
/**
* Collect all the topics from our given consumer map and return a list of
* them
*/
private List<String> collectCurrentTopics() {
return this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
}
/**
* Return the brokers list as an immutable Seq collection for the Kafka
* scala underpinnings
@ -185,7 +174,7 @@ class KafkaPoller extends Thread {
private Object fetchMetadataForCurrentTopics() {
return ClientUtils.fetchTopicMetadata(
toScalaSet(new HashSet(collectCurrentTopics())),
toScalaSet(currentTopics),
brokersSeq,
KAFKA_CLIENT_ID,
KAFKA_TIMEOUT,

View File

@ -3,14 +3,14 @@ package com.github.lookout.verspaetung
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
import com.github.lookout.verspaetung.metrics.ConsumerGauge
import java.util.AbstractMap
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.TimeUnit
import groovy.transform.TypeChecked
import com.timgroup.statsd.StatsDClient
import com.timgroup.statsd.NonBlockingDogStatsDClient
import org.apache.commons.cli.*
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.framework.CuratorFrameworkFactory
@ -19,11 +19,14 @@ import org.apache.curator.framework.recipes.cache.TreeCache
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import com.codahale.metrics.*
class Main {
private static final String METRICS_PREFIX = 'verspaetung'
private static StatsDClient statsd
private static Logger logger
private static ScheduledReporter reporter
private static final MetricRegistry registry = new MetricRegistry()
static void main(String[] args) {
String statsdPrefix = METRICS_PREFIX
@ -53,74 +56,110 @@ class Main {
statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
}
registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
new metrics.HeartbeatGauge())
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
ConcurrentHashMap<TopicPartition, List<zk.ConsumerOffset>> consumers = new ConcurrentHashMap()
statsd = new NonBlockingDogStatsDClient(statsdPrefix, statsdHost, statsdPort)
client.start()
KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
/* We need a good shared set of all the topics we should keep an eye on
* for the Kafka poller. This will be written to by the tree watchers
* and read from by the poller, e.g.
* Watcher --/write/--> watchedTopics --/read/--> KafkaPoller
*/
ConcurrentSkipListSet<String> watchedTopics = new ConcurrentSkipListSet<>()
/* consumerOffsets is where we will keep all the offsets from Zookeeper
* from the Kafka consumers
*/
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets = new ConcurrentHashMap<>()
/* topicOffsets is where the KafkaPoller should be writing all of its
* latest offsets from querying the Kafka brokers
*/
ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()
/* Hash map for keeping track of KafkaConsumer to ConsumerGauge
* instances. We're only really doing this because the MetricRegistry
* doesn't do a terrific job of exposing this for us
*/
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges = new ConcurrentHashMap<>()
KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics)
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()
brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) }
poller.start()
/* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have
* one
*/
Closure gaugeRegistrar = { KafkaConsumer consumer ->
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
}
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
watchedTopics,
consumerOffsets)
consumerWatcher.onConsumerData << gaugeRegistrar
consumerWatcher.start()
/* Assuming that most people don't need to run Storm-based watchers
* as well
*/
if (cli.hasOption('s')) {
KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
watchedTopics,
consumerOffsets)
stormWatcher.onConsumerData << gaugeRegistrar
stormWatcher.start()
}
consumerWatcher.onInitComplete << {
logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
if (cli.hasOption('n')) {
reporter = ConsoleReporter.forRegistry(registry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build()
}
brokerWatcher.onBrokerUpdates << { brokers ->
poller.refresh(brokers)
}
logger.info("Started wait loop...")
/* Start the reporter if we've got it */
reporter?.start(1, TimeUnit.SECONDS)
logger.info("Starting wait loop...")
while (true) {
statsd?.recordGaugeValue('heartbeat', 1)
Thread.sleep(1 * 1000)
}
logger.info("exiting..")
poller.die()
poller.join()
return
}
static void registerMetricFor(KafkaConsumer consumer,
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
if (consumerGauges.containsKey(consumer)) {
return
}
ConsumerGauge gauge = new ConsumerGauge(consumer,
consumerOffsets,
topicOffsets)
this.registry.register(gauge.name, gauge)
}
/**
* Create and start a KafkaPoller with the given statsd and consumers map
* Create the Options object necessary for Verspaetung's command-line interface
*/
static KafkaPoller setupKafkaPoller(AbstractMap consumers,
NonBlockingDogStatsDClient statsd,
Boolean dryRun) {
KafkaPoller poller = new KafkaPoller(consumers)
Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
println "${tp.topic}:${tp.partition}-${name} = ${delta}"
}
if (!dryRun) {
deltaCallback = { String name, TopicPartition tp, Long delta ->
statsd.recordGaugeValue(tp.topic, delta, [
'topic' : tp.topic,
'partition' : tp.partition,
'consumer-group' : name
])
}
}
poller.onDelta << deltaCallback
poller.start()
return poller
}
static Options createCLI() {
Options options = new Options()
@ -170,6 +209,11 @@ class Main {
return options
}
/**
* Parse out all the command line options from the array of string
* arguments
*/
static CommandLine parseCommandLine(String[] args) {
Options options = createCLI()
PosixParser parser = new PosixParser()

View File

@ -0,0 +1,43 @@
package com.github.lookout.verspaetung.metrics
import java.util.AbstractMap
import com.codahale.metrics.Gauge
import groovy.transform.TypeChecked
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
/**
* Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
*/
@TypeChecked
class ConsumerGauge implements Gauge<Integer> {
protected KafkaConsumer consumer
protected AbstractMap<KafkaConsumer, Integer> consumers
protected AbstractMap<TopicPartition, Long> topics
private TopicPartition topicPartition
ConsumerGauge(KafkaConsumer consumer,
AbstractMap<KafkaConsumer, Integer> consumers,
AbstractMap<TopicPartition, Long> topics) {
this.consumer = consumer
this.consumers = consumers
this.topics = topics
this.topicPartition = new TopicPartition(consumer.topic, consumer.partition)
}
@Override
Integer getValue() {
if ((!this.consumers.containsKey(consumer)) ||
(!this.topics.containsKey(topicPartition))) {
return 0
}
return ((Integer)this.topics[topicPartition]) - this.consumers[consumer]
}
String getName() {
return "verspaetung.${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
}
}
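
For context, a sketch of how such a gauge ends up in the registry and on the console reporter, mirroring what Main.registerMetricFor() and the '-n' branch do in the Main.groovy diff above (the consumer and maps are made up for illustration):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.TimeUnit
import com.codahale.metrics.ConsoleReporter
import com.codahale.metrics.MetricRegistry
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.lookout.verspaetung.metrics.ConsumerGauge

/* the same shared maps the main class builds; left empty here, so the gauge reports 0 */
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets = new ConcurrentHashMap<>()
ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()
KafkaConsumer consumer = new KafkaConsumer('example-topic', 0, 'example-group')

MetricRegistry registry = new MetricRegistry()
ConsumerGauge gauge = new ConsumerGauge(consumer, consumerOffsets, topicOffsets)
registry.register(gauge.name, gauge)  /* name: "verspaetung.example-topic.0.example-group" */

ConsoleReporter reporter = ConsoleReporter.forRegistry(registry)
                                          .convertRatesTo(TimeUnit.SECONDS)
                                          .convertDurationsTo(TimeUnit.MILLISECONDS)
                                          .build()
reporter.start(1, TimeUnit.SECONDS)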

View File

@ -0,0 +1,15 @@
package com.github.lookout.verspaetung.metrics
import com.codahale.metrics.*
/**
* A simple gauge that will always just return 1 indicating that the process is
* alive
*/
class HeartbeatGauge implements Gauge<Integer> {
@Override
Integer getValue() {
return 1
}
}

View File

@ -1,5 +1,6 @@
package com.github.lookout.verspaetung.zk
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import java.util.concurrent.CopyOnWriteArrayList
@ -11,12 +12,16 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
@TypeChecked
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
protected AbstractMap<KafkaConsumer, Integer> consumerOffsets
protected AbstractSet<String> watchedTopics
protected List<Closure> onConsumerData = []
AbstractConsumerTreeWatcher(CuratorFramework client,
AbstractMap consumersMap) {
AbstractSet topics,
AbstractMap offsets) {
super(client)
this.consumersMap = consumersMap
this.watchedTopics = topics
this.consumerOffsets = offsets
}
/**
@ -61,17 +66,18 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
* this class on instantiation
*/
void trackConsumerOffset(ConsumerOffset offset) {
if (this.consumersMap == null) {
if (this.consumerOffsets == null) {
return
}
TopicPartition key = new TopicPartition(offset.topic, offset.partition)
this.watchedTopics << offset.topic
KafkaConsumer consumer = new KafkaConsumer(offset.topic,
offset.partition,
offset.groupName)
this.consumerOffsets[consumer] = offset.offset
if (this.consumersMap.containsKey(key)) {
this.consumersMap[key] << offset
}
else {
this.consumersMap[key] = new CopyOnWriteArrayList([offset])
this.onConsumerData.each { Closure c ->
c.call(consumer)
}
}

View File

@ -17,8 +17,10 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
private static final String ZK_PATH = '/kafka_spout'
private JsonSlurper json
KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
super(client, consumersMap)
KafkaSpoutTreeWatcher(CuratorFramework client,
AbstractSet topics,
AbstractMap offsets) {
super(client, topics, offsets)
this.json = new JsonSlurper()
}

View File

@ -0,0 +1,38 @@
package com.github.lookout.verspaetung
import spock.lang.*
class KafkaConsumerSpec extends Specification {
String topic = 'spock-topic'
Integer partition = 2
String consumerName = 'spock-consumer'
def "the constructor should set the properties properly"() {
given:
KafkaConsumer consumer = new KafkaConsumer(topic, partition, consumerName)
expect:
consumer instanceof KafkaConsumer
consumer.topic == topic
consumer.partition == partition
consumer.name == consumerName
}
def "equals() is true with identical source material"() {
given:
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, consumerName)
expect:
consumer1 == consumer2
}
def "equals() is false with differing source material"() {
given:
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, "i am different")
expect:
consumer1 != consumer2
}
}

View File

@ -0,0 +1,43 @@
package com.github.lookout.verspaetung.metrics
import spock.lang.*
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
class ConsumerGaugeSpec extends Specification {
private KafkaConsumer consumer
private TopicPartition tp
def setup() {
this.tp = new TopicPartition('spock-topic', 1)
this.consumer = new KafkaConsumer(tp.topic, tp.partition, 'spock-consumer')
}
def "constructor should work"() {
given:
ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
expect:
gauge.consumer instanceof KafkaConsumer
gauge.consumers instanceof AbstractMap
}
def "getValue() should source a value from the map"() {
given:
ConsumerGauge gauge = new ConsumerGauge(this.consumer,
[(this.consumer) : 2],
[(this.tp) : 3])
expect:
gauge.value == 1
}
def "getValue() should return zero for a consumer not in the map"() {
given:
ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
expect:
gauge.value == 0
}
}

View File

@ -13,7 +13,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
class MockWatcher extends AbstractConsumerTreeWatcher {
MockWatcher() {
super(null, [:])
super(null, new HashSet(), [:])
}
ConsumerOffset processChildData(ChildData d) { }
String zookeeperPath() { return '/zk/spock' }
@ -63,10 +63,11 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
watcher.trackConsumerOffset(offset)
then:
watcher.consumersMap.size() == 1
watcher.consumerOffsets.size() == 1
watcher.watchedTopics.size() == 1
}
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
def "trackConsumerOffset() append an offset but not a topic for different group names"() {
given:
String topic = 'spock-topic'
TopicPartition mapKey = new TopicPartition(topic, 0)
@ -80,8 +81,8 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
watcher.trackConsumerOffset(secondOffset)
then:
watcher.consumersMap.size() == 1
watcher.consumersMap[mapKey].size() == 2
watcher.watchedTopics.size() == 1
watcher.consumerOffsets.size() == 2
}

View File

@ -11,7 +11,7 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
def setup() {
this.mockCurator = Mock(CuratorFramework)
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, new HashSet(), [:])
}
def "consumerNameFromPath() should give the right name for a valid path"() {

View File

@ -11,7 +11,7 @@ class StandardTreeWatcherSpec extends Specification {
def setup() {
this.mockCurator = Mock(CuratorFramework)
this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
this.watcher = new StandardTreeWatcher(this.mockCurator, new HashSet(), [:])
}
def "processChildData should return null if the path is invalid"() {