From f4042894fed1bc0d1f87d37ea51d126e45ff0c44 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 30 Jan 2015 07:30:15 -0800
Subject: [PATCH 1/3] Add a compile and runtime dependency on the metrics core
 and graphite code

References #17
---
 build.gradle | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/build.gradle b/build.gradle
index bda8333..b02d3af 100644
--- a/build.gradle
+++ b/build.gradle
@@ -57,6 +57,10 @@ dependencies {
 
     compile 'com.timgroup:java-statsd-client:3.1.2+'
 
+    ['metrics-core', 'metrics-graphite'].each { artifactName ->
+        compile "io.dropwizard.metrics:${artifactName}:3.1.0"
+    }
+
     /* Logback is to be used for logging through the app */
     compile 'ch.qos.logback:logback-classic:1.1.2+'
 
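A quick note on the Gradle idiom above: the .each closure runs at build
configuration time and declares one compile dependency per listed artifact
name. Nothing but the artifact name varies between the two coordinates, so it
is simply a shorthand for the longhand form:

    // longhand equivalent of the loop added in PATCH 1/3
    compile 'io.dropwizard.metrics:metrics-core:3.1.0'
    compile 'io.dropwizard.metrics:metrics-graphite:3.1.0'
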
From 71347594cab20c0f36a9b4cdf0074b1754724af9 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 6 Feb 2015 08:18:12 -0800
Subject: [PATCH 2/3] Major refactor to support dropwizard-metrics as the means
 of outputting metrics

This commit changes the structure of Verspaetung pretty dramatically to allow
for the registering of Gauges for the various offsets.

With this change the KafkaPoller is pushing the latest offsets into a map and
the ZK consumer tree watchers are pushing consumer offsets into a separate
map.

References #17
---
 build.gradle                                       |   2 +-
 .../lookout/verspaetung/KafkaBroker.groovy         |   2 +-
 .../lookout/verspaetung/KafkaConsumer.groovy       |  51 +++++++
 .../lookout/verspaetung/KafkaPoller.groovy         |  41 ++----
 .../github/lookout/verspaetung/Main.groovy         | 130 ++++++++++++------
 .../verspaetung/metrics/ConsumerGauge.groovy       |  43 ++++++
 .../verspaetung/metrics/HeartbeatGauge.groovy      |  15 ++
 .../zk/AbstractConsumerTreeWatcher.groovy          |  26 ++--
 .../zk/KafkaSpoutTreeWatcher.groovy                |   6 +-
 .../verspaetung/KafkaConsumerSpec.groovy           |  38 +++++
 .../metrics/ConsumerGaugeSpec.groovy               |  43 ++++++
 .../zk/AbstractConsumerTreeWatcherSpec.groovy      |  11 +-
 .../zk/KafkaSpoutTreeWatcherSpec.groovy            |   2 +-
 .../zk/StandardTreeWatcherSpec.groovy              |   2 +-
 14 files changed, 322 insertions(+), 90 deletions(-)
 create mode 100644 src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy
 create mode 100644 src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
 create mode 100644 src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
 create mode 100644 src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy
 create mode 100644 src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy

diff --git a/build.gradle b/build.gradle
index b02d3af..144f50d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,7 +7,7 @@ apply plugin: 'application'
 
 group = "com.github.lookout"
 description = "A utility for monitoring the delay of Kafka consumers"
-version = '0.1.5'
+version = '0.1.6'
 mainClassName = 'com.github.lookout.verspaetung.Main'
 defaultTasks 'clean', 'check'
 sourceCompatibility = '1.7'

diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
index c6ea3f9..c22434e 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
@@ -8,7 +8,7 @@ class KafkaBroker {
     private Integer port
     private Integer brokerId
 
-    public KafkaBroker(Object jsonObject, Integer brokerId) {
+    KafkaBroker(Object jsonObject, Integer brokerId) {
        this.host = jsonObject.host
        this.port = jsonObject.port
        this.brokerId = brokerId

diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy
new file mode 100644
index 0000000..a7db815
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy
@@ -0,0 +1,51 @@
+package com.github.lookout.verspaetung
+
+/**
+ * POJO containing the necessary information to model a Kafka consumer
+ */
+class KafkaConsumer {
+    String topic
+    Integer partition
+    String name
+
+    KafkaConsumer(String topic, Integer partition, String name) {
+        this.topic = topic
+        this.partition = partition
+        this.name = name
+    }
+
+    @Override
+    String toString() {
+        return "KafkaConsumer<${topic}:${partition} - ${name}>"
+    }
+
+    @Override
+    int hashCode() {
+        return Objects.hash(this.topic, this.partition, this.name)
+    }
+
+    /**
+     * Return true for any two KafkaConsumer instances which have the same
+     * topic, partition and name properties
+     */
+    @Override
+    boolean equals(Object compared) {
+        /* bail early for object identity */
+        if (this.is(compared)) {
+            return true
+        }
+
+        if (!(compared instanceof KafkaConsumer)) {
+            return false
+        }
+
+        if ((this.topic == compared.topic) &&
+            (this.partition == compared.partition) &&
+            (this.name == compared.name)) {
+            return true
+        }
+
+        return false
+    }
+
+}

diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
index 9337770..d74e7a7 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
@@ -28,12 +28,14 @@ class KafkaPoller extends Thread {
     private Boolean keepRunning = true
     private Boolean shouldReconnect = false
     private ConcurrentHashMap brokerConsumerMap
-    private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
+    private AbstractMap topicOffsetMap
     private List brokers
     private List onDelta
+    private AbstractSet currentTopics
 
-    KafkaPoller(AbstractMap map) {
-        this.consumersMap = map
+    KafkaPoller(AbstractMap map, AbstractSet topicSet) {
+        this.topicOffsetMap = map
+        this.currentTopics = topicSet
         this.brokerConsumerMap = [:]
         this.brokers = []
         this.onDelta = []
@@ -49,7 +51,10 @@ class KafkaPoller extends Thread {
                 reconnect()
             }
 
-            if (this.consumersMap.size() > 0) {
+            /* Only makes sense to try to dump metadata if we've got some
+             * topics that we should keep an eye on
+             */
+            if (this.currentTopics.size() > 0) {
                 dumpMetadata()
             }
 
@@ -64,10 +69,10 @@ class KafkaPoller extends Thread {
 
         withTopicsAndPartitions(metadata) { tp, p ->
             try {
-                processDeltasFor(tp, p)
+                captureLatestOffsetFor(tp, p)
             }
             catch (Exception ex) {
-                logger.error("Failed to process deltas for ${tp.topic}:${tp.partition}", ex)
+                logger.error("Failed to fetch latest for ${tp.topic}:${tp.partition}", ex)
             }
         }
 
@@ -92,23 +97,15 @@ class KafkaPoller extends Thread {
     }
 
     /**
-     * Fetch the leader metadata and invoke our callbacks with the right deltas
-     * for the given topic and partition information
+     * Fetch the leader metadata and update our data structures
      */
-    void processDeltasFor(TopicPartition tp, Object partitionMetadata) {
+    void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
         Integer leaderId = partitionMetadata.leader.get()?.id
         Integer partitionId = partitionMetadata.partitionId
         Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
 
-        this.consumersMap[tp].each { zk.ConsumerOffset c ->
-            logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
-
-            Long delta = offset - c.offset
-            this.onDelta.each { Closure callback ->
-                callback.call(c.groupName, tp, delta)
-            }
-        }
+        this.topicOffsetMap[tp] = offset
     }
 
     Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
@@ -159,14 +156,6 @@ class KafkaPoller extends Thread {
         this.shouldReconnect = true
     }
 
-    /**
-     * Collect all the topics from our given consumer map and return a list of
-     * them
-     */
-    private List collectCurrentTopics() {
-        return this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
-    }
-
     /**
      * Return the brokers list as an immutable Seq collection for the Kafka
      * scala underpinnings
@@ -185,7 +174,7 @@ class KafkaPoller extends Thread {
 
     private Object fetchMetadataForCurrentTopics() {
         return ClientUtils.fetchTopicMetadata(
-                    toScalaSet(new HashSet(collectCurrentTopics())),
+                    toScalaSet(currentTopics),
                     brokersSeq,
                     KAFKA_CLIENT_ID,
                     KAFKA_TIMEOUT,

diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
index e3a7c20..c19ecc7 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
@@ -3,14 +3,14 @@ package com.github.lookout.verspaetung
 import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
 import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
 import com.github.lookout.verspaetung.zk.StandardTreeWatcher
+import com.github.lookout.verspaetung.metrics.ConsumerGauge
 
 import java.util.AbstractMap
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentSkipListSet
+import java.util.concurrent.TimeUnit
 import groovy.transform.TypeChecked
 
-import com.timgroup.statsd.StatsDClient
-import com.timgroup.statsd.NonBlockingDogStatsDClient
-
 import org.apache.commons.cli.*
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.framework.CuratorFrameworkFactory
@@ -19,11 +19,14 @@ import org.apache.curator.framework.recipes.cache.TreeCache
 
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
+import com.codahale.metrics.*
+
 class Main {
     private static final String METRICS_PREFIX = 'verspaetung'
-    private static StatsDClient statsd
     private static Logger logger
+    private static ScheduledReporter reporter
+    private static final MetricRegistry registry = new MetricRegistry()
 
     static void main(String[] args) {
         String statsdPrefix = METRICS_PREFIX
@@ -53,74 +56,110 @@ class Main {
             statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
         }
 
+        registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
+                          new metrics.HeartbeatGauge())
+
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
         CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
-        ConcurrentHashMap<TopicPartition, List<ConsumerOffset>> consumers = new ConcurrentHashMap()
-
-        statsd = new NonBlockingDogStatsDClient(statsdPrefix, statsdHost, statsdPort)
 
         client.start()
 
-        KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
+        /* We need a good shared set of all the topics we should keep an eye on
+         * for the Kafka poller. This will be written to by the tree watchers
+         * and read from by the poller, e.g.
+         *    Watcher --/write/--> watchedTopics --/read/--> KafkaPoller
+         */
+        ConcurrentSkipListSet<String> watchedTopics = new ConcurrentSkipListSet<>()
+
+        /* consumerOffsets is where we will keep all the offsets from Zookeeper
+         * from the Kafka consumers
+         */
+        ConcurrentHashMap<KafkaConsumer, Long> consumerOffsets = new ConcurrentHashMap<>()
+
+        /* topicOffsets is where the KafkaPoller should be writing all of its
+         * latest offsets from querying the Kafka brokers
+         */
+        ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()
+
+        /* Hash map for keeping track of KafkaConsumer to ConsumerGauge
+         * instances. We're only really doing this because the MetricRegistry
+         * doesn't do a terrific job of exposing this for us
+         */
+        ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges = new ConcurrentHashMap<>()
+
+
+        KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics)
         BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
-        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()
+        brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) }
+
+        poller.start()
+
+        /* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have
+         * one
+         */
+        Closure gaugeRegistrar = { KafkaConsumer consumer ->
+            registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
+        }
+
+        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
+                                                                      watchedTopics,
+                                                                      consumerOffsets)
+        consumerWatcher.onConsumerData << gaugeRegistrar
+        consumerWatcher.start()
 
         /* Assuming that most people aren't needing to run Storm-based watchers
          * as well
          */
         if (cli.hasOption('s')) {
-            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
+            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
+                                                                           watchedTopics,
+                                                                           consumerOffsets)
+            stormWatcher.onConsumerData << gaugeRegistrar
             stormWatcher.start()
         }
 
-        consumerWatcher.onInitComplete << {
-            logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
+        if (cli.hasOption('n')) {
+            reporter = ConsoleReporter.forRegistry(registry)
+                                      .convertRatesTo(TimeUnit.SECONDS)
+                                      .convertDurationsTo(TimeUnit.MILLISECONDS)
+                                      .build()
         }
 
-        brokerWatcher.onBrokerUpdates << { brokers ->
-            poller.refresh(brokers)
-        }
-
-        logger.info("Started wait loop...")
 
+        /* Start the reporter if we've got it */
+        reporter?.start(1, TimeUnit.SECONDS)
 
+        logger.info("Starting wait loop...")
         while (true) {
-            statsd?.recordGaugeValue('heartbeat', 1)
             Thread.sleep(1 * 1000)
         }
 
         logger.info("exiting..")
         poller.die()
         poller.join()
+        return
     }
 
+    static void registerMetricFor(KafkaConsumer consumer,
+                                  ConcurrentHashMap consumerGauges,
+                                  ConcurrentHashMap consumerOffsets,
+                                  ConcurrentHashMap topicOffsets) {
+        if (consumerGauges.containsKey(consumer)) {
+            return
+        }
+
+        ConsumerGauge gauge = new ConsumerGauge(consumer,
+                                                consumerOffsets,
+                                                topicOffsets)
+        this.registry.register(gauge.name, gauge)
+    }
+
     /**
-     * Create and start a KafkaPoller with the given statsd and consumers map
+     * Create the Options instance defining verspaetung's command-line options
      */
-    static KafkaPoller setupKafkaPoller(AbstractMap consumers,
-                                        NonBlockingDogStatsDClient statsd,
-                                        Boolean dryRun) {
-        KafkaPoller poller = new KafkaPoller(consumers)
-        Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
-            println "${tp.topic}:${tp.partition}-${name} = ${delta}"
-        }
-
-        if (!dryRun) {
-            deltaCallback = { String name, TopicPartition tp, Long delta ->
-                statsd.recordGaugeValue(tp.topic, delta, [
-                        'topic' : tp.topic,
-                        'partition' : tp.partition,
-                        'consumer-group' : name
-                ])
-            }
-        }
-
-        poller.onDelta << deltaCallback
-        poller.start()
-        return poller
-    }
-
     static Options createCLI() {
         Options options = new Options()
 
@@ -170,6 +209,11 @@
         return options
     }
 
+    /**
+     * Parse out all the command line options from the array of string
+     * arguments
+     */
     static CommandLine parseCommandLine(String[] args) {
         Options options = createCLI()
         PosixParser parser = new PosixParser()

diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
new file mode 100644
index 0000000..ff9cd8b
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
@@ -0,0 +1,43 @@
+package com.github.lookout.verspaetung.metrics
+
+import java.util.AbstractMap
+import com.codahale.metrics.Gauge
+import groovy.transform.TypeChecked
+
+import com.github.lookout.verspaetung.KafkaConsumer
+import com.github.lookout.verspaetung.TopicPartition
+
+
+/**
+ * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
+ */
+@TypeChecked
+class ConsumerGauge implements Gauge {
+    protected KafkaConsumer consumer
+    protected AbstractMap consumers
+    protected AbstractMap topics
+    private TopicPartition topicPartition
+
+    ConsumerGauge(KafkaConsumer consumer,
+                  AbstractMap consumers,
+                  AbstractMap topics) {
+        this.consumer = consumer
+        this.consumers = consumers
+        this.topics = topics
+
+        this.topicPartition = new TopicPartition(consumer.topic, consumer.partition)
+    }
+
+    @Override
+    Integer getValue() {
+        if ((!this.consumers.containsKey(consumer)) ||
+            (!this.topics.containsKey(topicPartition))) {
+            return 0
+        }
+        return ((Integer)this.topics[topicPartition]) - this.consumers[consumer]
+    }
+
+    String getName() {
+        return "verspaetung.${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+    }
+}

diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
new file mode 100644
index 0000000..410f6ea
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
@@ -0,0 +1,15 @@
+package com.github.lookout.verspaetung.metrics
+
+import com.codahale.metrics.*
+
+
+/**
+ * A simple gauge that will always just return 1 indicating that the process is
+ * alive
+ */
+class HeartbeatGauge implements Gauge {
+    @Override
+    Integer getValue() {
+        return 1
+    }
+}

diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
index 69fac7d..2a3e91a 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
@@ -1,5 +1,6 @@
 package com.github.lookout.verspaetung.zk
 
+import com.github.lookout.verspaetung.KafkaConsumer
 import com.github.lookout.verspaetung.TopicPartition
 
 import java.util.concurrent.CopyOnWriteArrayList
@@ -11,12 +12,16 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
 
 @TypeChecked
 abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
-    protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
+    protected AbstractMap consumerOffsets
+    protected AbstractSet watchedTopics
+    protected List onConsumerData = []
 
     AbstractConsumerTreeWatcher(CuratorFramework client,
-                                AbstractMap consumersMap) {
+                                AbstractSet topics,
+                                AbstractMap offsets) {
         super(client)
-        this.consumersMap = consumersMap
+        this.watchedTopics = topics
+        this.consumerOffsets = offsets
     }
 
     /**
@@ -61,17 +66,18 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
      * this class on instantiation
      */
     void trackConsumerOffset(ConsumerOffset offset) {
-        if (this.consumersMap == null) {
+        if (this.consumerOffsets == null) {
             return
         }
 
-        TopicPartition key = new TopicPartition(offset.topic, offset.partition)
+        this.watchedTopics << offset.topic
+        KafkaConsumer consumer = new KafkaConsumer(offset.topic,
+                                                   offset.partition,
+                                                   offset.groupName)
+        this.consumerOffsets[consumer] = offset.offset
 
-        if (this.consumersMap.containsKey(key)) {
-            this.consumersMap[key] << offset
-        }
-        else {
-            this.consumersMap[key] = new CopyOnWriteArrayList([offset])
+        this.onConsumerData.each { Closure c ->
+            c.call(consumer)
         }
     }
 

diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
index 1865823..eba5030 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
@@ -17,8 +17,10 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
     private static final String ZK_PATH = '/kafka_spout'
     private JsonSlurper json
 
-    KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
-        super(client, consumersMap)
+    KafkaSpoutTreeWatcher(CuratorFramework client,
+                          AbstractSet topics,
+                          AbstractMap offsets) {
+        super(client, topics, offsets)
         this.json = new JsonSlurper()
     }
 

diff --git a/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy
new file mode 100644
index 0000000..458b872
--- /dev/null
+++ b/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy
@@ -0,0 +1,38 @@
+package com.github.lookout.verspaetung
+
+import spock.lang.*
+
+class KafkaConsumerSpec extends Specification {
+    String topic = 'spock-topic'
+    Integer partition = 2
+    String consumerName = 'spock-consumer'
+
+    def "the constructor should set the properties properly"() {
+        given:
+        KafkaConsumer consumer = new KafkaConsumer(topic, partition, consumerName)
+
+        expect:
+        consumer instanceof KafkaConsumer
+        consumer.topic == topic
+        consumer.partition == partition
+        consumer.name == consumerName
+    }
+
+    def "equals() is true with identical source material"() {
+        given:
+        KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
+        KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, consumerName)
+
+        expect:
+        consumer1 == consumer2
+    }
+
+    def "equals() is false with differing source material"() {
+        given:
+        KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
+        KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, "i am different")
+
+        expect:
+        consumer1 != consumer2
+    }
+}

diff --git a/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy
new file mode 100644
index 0000000..bf7300a
--- /dev/null
+++ b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy
@@ -0,0 +1,43 @@
+package com.github.lookout.verspaetung.metrics
+
+import spock.lang.*
+
+import com.github.lookout.verspaetung.KafkaConsumer
+import com.github.lookout.verspaetung.TopicPartition
+
+class ConsumerGaugeSpec extends Specification {
+    private KafkaConsumer consumer
+    private TopicPartition tp
+
+    def setup() {
+        this.tp = new TopicPartition('spock-topic', 1)
+        this.consumer = new KafkaConsumer(tp.topic, tp.partition, 'spock-consumer')
+    }
+
+    def "constructor should work"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
+
+        expect:
+        gauge.consumer instanceof KafkaConsumer
+        gauge.consumers instanceof AbstractMap
+    }
+
+    def "getValue() should source a value from the map"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(this.consumer,
+                                                [(this.consumer) : 2],
+                                                [(this.tp) : 3])
+
+        expect:
+        gauge.value == 1
+    }
+
+    def "getValue() should return zero for a consumer not in the map"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
+
+        expect:
+        gauge.value == 0
+    }
+}

diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
index 90f7d35..2dba092 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
@@ -13,7 +13,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
 
     class MockWatcher extends AbstractConsumerTreeWatcher {
         MockWatcher() {
-            super(null, [:])
+            super(null, new HashSet(), [:])
         }
         ConsumerOffset processChildData(ChildData d) { }
         String zookeeperPath() { return '/zk/spock' }
@@ -63,10 +63,11 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
         watcher.trackConsumerOffset(offset)
 
         then:
-        watcher.consumersMap.size() == 1
+        watcher.consumerOffsets.size() == 1
+        watcher.watchedTopics.size() == 1
     }
 
-    def "trackConsumerOffset() should append to a list for existing topics in the map"() {
+    def "trackConsumerOffset() should append an offset but not a topic for different group names"() {
         given:
         String topic = 'spock-topic'
         TopicPartition mapKey = new TopicPartition(topic, 0)
@@ -80,8 +81,8 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
         watcher.trackConsumerOffset(secondOffset)
 
         then:
-        watcher.consumersMap.size() == 1
-        watcher.consumersMap[mapKey].size() == 2
+        watcher.watchedTopics.size() == 1
+        watcher.consumerOffsets.size() == 2
     }
 

diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
index a5f6d99..4a26333 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
@@ -11,7 +11,7 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
 
     def setup() {
         this.mockCurator = Mock(CuratorFramework)
-        this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
+        this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, new HashSet(), [:])
     }
 
     def "consumerNameFromPath() should give the right name for a valid path"() {

diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
index ef8a038..52463bb 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
@@ -11,7 +11,7 @@ class StandardTreeWatcherSpec extends Specification {
 
     def setup() {
         this.mockCurator = Mock(CuratorFramework)
-        this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
+        this.watcher = new StandardTreeWatcher(this.mockCurator, new HashSet(), [:])
     }
 
     def "processChildData should return null if the path is invalid"() {
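A note on the data flow this refactor establishes: the tree watchers and the
KafkaPoller never call one another. They only share the watchedTopics set and
the two offset maps, and each ConsumerGauge computes its delta lazily whenever
a reporter polls it. The following condensed, runnable Groovy sketch traces
the whole round trip; the list-based map keys and the 'syslog'/'indexer'
values are illustrative stand-ins for the real TopicPartition and
KafkaConsumer key objects:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.ConcurrentSkipListSet

    /* the three shared collections from Main.groovy */
    ConcurrentSkipListSet watchedTopics = new ConcurrentSkipListSet()
    ConcurrentHashMap consumerOffsets = new ConcurrentHashMap()
    ConcurrentHashMap topicOffsets = new ConcurrentHashMap()

    /* a tree watcher seeing a committed offset does two writes,
     * as in AbstractConsumerTreeWatcher.trackConsumerOffset()
     */
    watchedTopics << 'syslog'
    consumerOffsets[['syslog', 0, 'indexer']] = 90L

    /* the poller, on its next pass over watchedTopics, records the
     * leader's latest offset, as in KafkaPoller.captureLatestOffsetFor()
     */
    topicOffsets[['syslog', 0]] = 100L

    /* a gauge reads both maps only when polled, as in ConsumerGauge.getValue() */
    Long lag = topicOffsets[['syslog', 0]] - consumerOffsets[['syslog', 0, 'indexer']]
    assert lag == 10L

Because deltas are now computed at read time, a slow reporter can no longer
hold up the poller thread the way the old synchronous onDelta callbacks did.
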
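Relatedly, KafkaConsumer overrides equals() and hashCode() because a fresh
instance is constructed on every ZooKeeper event and then used as a key into
the shared consumerOffsets map. Value equality is what makes a re-read of the
same (topic, partition, group) update the existing entry rather than grow the
map. A small illustration, assuming the classes from this patch are on the
classpath and using made-up topic and group names:

    import java.util.concurrent.ConcurrentHashMap
    import com.github.lookout.verspaetung.KafkaConsumer

    ConcurrentHashMap offsets = new ConcurrentHashMap()

    /* two distinct instances naming the same consumer... */
    offsets[new KafkaConsumer('syslog', 0, 'indexer')] = 90L
    offsets[new KafkaConsumer('syslog', 0, 'indexer')] = 95L

    /* ...land on the same entry because of value-based equality */
    assert offsets.size() == 1
    assert offsets[new KafkaConsumer('syslog', 0, 'indexer')] == 95L
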
From 38e8c62e004baa09e0b8a7a955b9538a3f3aca31 Mon Sep 17 00:00:00 2001
From: "R. Tyler Croy"
Date: Fri, 20 Mar 2015 10:58:45 -0700
Subject: [PATCH 3/3] Properly report metrics to Datadog with the appropriate
 tags

This requires a much more recent version of our metrics-datadog library but
does result in the right values being reported into Datadog.
---
 build.gradle                                  |  3 +-
 .../github/lookout/verspaetung/Main.groovy    | 26 +++++++++++------
 .../verspaetung/metrics/ConsumerGauge.groovy  | 29 +++++++++++++++++--
 .../verspaetung/metrics/HeartbeatGauge.groovy |  1 -
 4 files changed, 45 insertions(+), 14 deletions(-)

diff --git a/build.gradle b/build.gradle
index 144f50d..6db8f69 100644
--- a/build.gradle
+++ b/build.gradle
@@ -35,6 +35,7 @@ repositories {
     jcenter()
 
     maven { url 'https://dl.bintray.com/rtyler/maven' }
+    maven { url 'https://dl.bintray.com/lookout/systems' }
 }
 
 dependencies {
@@ -55,7 +56,7 @@ dependencies {
     /* Needed for command line options parsing */
     compile 'commons-cli:commons-cli:1.2+'
 
-    compile 'com.timgroup:java-statsd-client:3.1.2+'
+    compile 'com.github.lookout:metrics-datadog:0.1.3'
 
     ['metrics-core', 'metrics-graphite'].each { artifactName ->
         compile "io.dropwizard.metrics:${artifactName}:3.1.0"

diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
index c19ecc7..ba01124 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
@@ -16,11 +16,14 @@ import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.TreeCache
+import org.coursera.metrics.datadog.DatadogReporter
+import org.coursera.metrics.datadog.transport.UdpTransport
 
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
 import com.codahale.metrics.*
+
 class Main {
     private static final String METRICS_PREFIX = 'verspaetung'
 
@@ -126,20 +129,24 @@ class Main {
                                       .convertDurationsTo(TimeUnit.MILLISECONDS)
                                       .build()
         }
+        else {
+            UdpTransport transport = new UdpTransport.Builder()
+                                                     .withPrefix(statsdPrefix)
+                                                     .build()
+
+            reporter = DatadogReporter.forRegistry(registry)
+                                      .withEC2Host()
+                                      .withTransport(transport)
+                                      .build()
+        }
 
         /* Start the reporter if we've got it */
         reporter?.start(1, TimeUnit.SECONDS)
 
         logger.info("Starting wait loop...")
-        while (true) {
-            Thread.sleep(1 * 1000)
+        synchronized(this) {
+            wait()
         }
-
-        logger.info("exiting..")
-        poller.die()
-        poller.join()
-
-        return
     }
 
     static void registerMetricFor(KafkaConsumer consumer,
@@ -153,7 +160,8 @@ class Main {
         ConsumerGauge gauge = new ConsumerGauge(consumer,
                                                 consumerOffsets,
                                                 topicOffsets)
-        this.registry.register(gauge.name, gauge)
+        consumerGauges.put(consumer, gauge)
+        this.registry.register(gauge.nameForRegistry, gauge)
     }
 

diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
index ff9cd8b..fad3c24 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
@@ -3,16 +3,16 @@ package com.github.lookout.verspaetung.metrics
 import java.util.AbstractMap
 import com.codahale.metrics.Gauge
 import groovy.transform.TypeChecked
+import org.coursera.metrics.datadog.Tagged
 
 import com.github.lookout.verspaetung.KafkaConsumer
 import com.github.lookout.verspaetung.TopicPartition
 
-
 /**
  * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
 */
 @TypeChecked
-class ConsumerGauge implements Gauge {
+class ConsumerGauge implements Gauge, Tagged {
     protected KafkaConsumer consumer
     protected AbstractMap consumers
     protected AbstractMap topics
@@ -37,7 +37,30 @@ class ConsumerGauge implements Gauge {
         return ((Integer)this.topics[topicPartition]) - this.consumers[consumer]
     }
 
+    @Override
+    List getTags() {
+        return ["partition:${this.consumer.partition}",
+                "topic:${this.consumer.topic}",
+                "consumer-group:${this.consumer.name}"
+            ].collect { s -> s.toString() }
+    }
+
+    /**
+     * Return a unique name for this gauge
+     */
+    String getNameForRegistry() {
+        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+    }
+
+    @Override
     String getName() {
-        return "verspaetung.${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+        return this.consumer.topic
+
+        /* need to return this if we're just using the console or statsd
+         * reporters
+
+        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+
+        */
     }
 }

diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
index 410f6ea..603864a 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
@@ -2,7 +2,6 @@ package com.github.lookout.verspaetung.metrics
 
 import com.codahale.metrics.*
 
-
 /**
  * A simple gauge that will always just return 1 indicating that the process is
  * alive
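A closing note on the tagging scheme in PATCH 3/3: the metrics-datadog
reporter discovers per-metric dimensions through the Tagged interface, which
is why ConsumerGauge now returns only the topic from getName(), moves the
partition and consumer group into getTags(), and keeps registry keys unique
via getNameForRegistry(). The sketch below wires a tagged gauge to the same
reporter construction used in Main.groovy; the ExampleGauge class, its values,
and the 'verspaetung' prefix are placeholders, withEC2Host() assumes the
process runs on EC2 exactly as the patch does, and a local dogstatsd agent is
assumed to be listening on the default UDP port:

    import java.util.concurrent.TimeUnit
    import com.codahale.metrics.Gauge
    import com.codahale.metrics.MetricRegistry
    import org.coursera.metrics.datadog.DatadogReporter
    import org.coursera.metrics.datadog.Tagged
    import org.coursera.metrics.datadog.transport.UdpTransport

    /* a gauge carrying its dimensions as Datadog tags rather than
     * baked into the metric name
     */
    class ExampleGauge implements Gauge, Tagged {
        Integer getValue() { return 42 }
        String getName() { return 'example-topic' }
        List getTags() { return ['partition:0', 'consumer-group:example'] }
    }

    MetricRegistry registry = new MetricRegistry()
    registry.register('example-topic.0.example', new ExampleGauge())

    /* same reporter construction as Main.groovy in PATCH 3/3 */
    UdpTransport transport = new UdpTransport.Builder()
                                             .withPrefix('verspaetung')
                                             .build()
    DatadogReporter reporter = DatadogReporter.forRegistry(registry)
                                              .withEC2Host()
                                              .withTransport(transport)
                                              .build()
    reporter.start(1, TimeUnit.SECONDS)

Keeping the tags out of the metric name is what lets Datadog aggregate one
series per topic while still slicing by partition or consumer group.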