From d0c99b9a34bc6e4f714847dd2be7b1e1fef6d90a Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 28 Jan 2015 02:45:49 -0800 Subject: [PATCH] Implement the KafkaSpoutTreeWatcher for processing Storm KafkaSpout offset data Fixes #9 --- .../github/lookout/verspaetung/Main.groovy | 20 +++++-- .../zk/AbstractConsumerTreeWatcher.groovy | 10 ++++ .../verspaetung/zk/AbstractTreeWatcher.groovy | 5 +- .../zk/KafkaSpoutTreeWatcher.groovy | 53 +++++++++++++++++++ .../verspaetung/zk/StandardTreeWatcher.groovy | 15 +----- .../zk/AbstractConsumerTreeWatcherSpec.groovy | 1 + .../zk/KafkaSpoutTreeWatcherSpec.groovy | 32 +++++++++++ .../zk/StandardTreeWatcherSpec.groovy | 5 -- 8 files changed, 115 insertions(+), 26 deletions(-) create mode 100644 src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy create mode 100644 src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index d0561cc..0771987 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -1,6 +1,7 @@ package com.github.lookout.verspaetung import com.github.lookout.verspaetung.zk.BrokerTreeWatcher +import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher import com.github.lookout.verspaetung.zk.StandardTreeWatcher import java.util.AbstractMap @@ -56,9 +57,16 @@ class Main { client.start() KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n')) - BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) - StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers) + BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start() + StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start() + /* Assuming that most people aren't needing to run Storm-based watchers + * as well + */ + if (cli.hasOption('s')) { + KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers) + stormWatcher.start() + } consumerWatcher.onInitComplete << { logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples") @@ -68,9 +76,6 @@ class Main { poller.refresh(brokers) } - brokerWatcher.start() - consumerWatcher.start() - logger.info("Started wait loop...") while (true) { @@ -139,10 +144,15 @@ class Main { .withLongOpt('dry-run') .create('n') + Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)') + .withLongOpt('storm') + .create('s') + options.addOption(zookeeper) options.addOption(statsdHost) options.addOption(statsdPort) options.addOption(dryRun) + options.addOption(stormSpouts) return options } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy index f47d7dc..69fac7d 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy @@ -24,6 +24,11 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher { */ abstract ConsumerOffset processChildData(ChildData data) + /** + * Determine whether a given path is of interest, i.e. path which contains + * offset data + */ + abstract Boolean isOffsetSubtree(String path) /** * Primary TreeCache event processing callback */ @@ -39,6 +44,11 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher { return } + if ((event.data == null) || + (!isOffsetSubtree(event.data?.path))) { + return + } + ConsumerOffset offset = processChildData(event?.data) if (offset != null) { diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 97630b3..d9e3445 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -38,10 +38,11 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { abstract String zookeeperPath() /** - * Start our internal cache + * Start our internal cache and return ourselves for API cleanliness */ - void start() { + AbstractTreeWatcher start() { this.cache?.start() + return this } abstract void childEvent(CuratorFramework client, TreeCacheEvent event) diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy new file mode 100644 index 0000000..21d04bb --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy @@ -0,0 +1,53 @@ +package com.github.lookout.verspaetung.zk + +import groovy.json.JsonSlurper +import groovy.transform.TypeChecked +import groovy.transform.TypeCheckingMode +import groovy.transform.InheritConstructors +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.ChildData + +/** + * KafkaSpoutTreeWatcher process Zookeeper paths normally associated with Storm + * KafkaSpout based consumers + */ +@TypeChecked +@InheritConstructors +class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { + private static final String ZK_PATH = '/kafka_spout' + private JsonSlurper json + + KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) { + super(client, consumersMap) + + this.json = new JsonSlurper() + } + + String zookeeperPath() { return ZK_PATH } + + /* skipping type checking since Groovy's JsonSlurper gives us a pretty + * loose Object to deal with + */ + @TypeChecked(TypeCheckingMode.SKIP) + ConsumerOffset processChildData(ChildData nodeData) { + Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8')) + /* + [broker:[host:REDACTED, port:6667], offset:179, partition:7, topic:device_data, topology:[id:01c0d1fc-e956-4b35-9891-dd835488cf45, name:unwrap_topology]] + */ + ConsumerOffset offset = new ConsumerOffset() + offset.groupName = offsetData.topology.name + offset.topic = offsetData.topic + offset.partition = offsetData.partition + offset.offset = offsetData.offset + + return offset + } + + /** + * We're expecting things to look like: + * /kafka_spout/topologyname/partition_0 + */ + Boolean isOffsetSubtree(String path) { + return (path =~ /\/kafka_spout\/(.*)\/partition_(\d+)/) + } +} diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index 17510ab..dd5e209 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -13,26 +13,13 @@ import org.apache.curator.framework.recipes.cache.ChildData class StandardTreeWatcher extends AbstractConsumerTreeWatcher { private static final String ZK_PATH = '/consumers' - String zookeeperPath() { - return ZK_PATH - } + String zookeeperPath() { return ZK_PATH } /** * Extract the necessary information from a standard (i.e. high-level Kafka * consumer) tree of offsets */ ConsumerOffset processChildData(ChildData data) { - if (data == null) { - return null - } - - /* There are non-offset related subtrees in /consumers that we don't - * care about, let's just skip over them - */ - if (!isOffsetSubtree(data.path)) { - return null - } - /* ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]} */ diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy index 7a86034..d8aded0 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy @@ -17,6 +17,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification { } ConsumerOffset processChildData(ChildData d) { } String zookeeperPath() { return '/zk/spock' } + Boolean isOffsetSubtree(String p) { return true } } def setup() { diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy new file mode 100644 index 0000000..960b4a5 --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy @@ -0,0 +1,32 @@ +package com.github.lookout.verspaetung.zk + +import spock.lang.* + +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.ChildData + +class KafkaSpoutTreeWatcherSpec extends Specification { + private KafkaSpoutTreeWatcher watcher + private CuratorFramework mockCurator + + def setup() { + this.mockCurator = Mock(CuratorFramework) + this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:]) + } + + def "isOffsetSubtree should return true for a valid subtree path"() { + given: + String path = '/kafka_spout/spock-topology/partition_1' + + expect: + watcher.isOffsetSubtree(path) == true + } + + def "isOffsetSubtree should return false for a non-offset subtree path"() { + given: + String path = '/kafka_spout/spock-topology' + + expect: + watcher.isOffsetSubtree(path) == false + } +} diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy index d648649..ef8a038 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy @@ -14,11 +14,6 @@ class StandardTreeWatcherSpec extends Specification { this.watcher = new StandardTreeWatcher(this.mockCurator, [:]) } - def "processChildData should return null if null is given"() { - expect: - watcher.processChildData(null) == null - } - def "processChildData should return null if the path is invalid"() { given: ChildData data = new ChildData("/consumers", null, (byte[])[48])