diff --git a/README.md b/README.md index fba22d4..824858f 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,8 @@ information, Verspätung computs the delta for each of the consumer groups and reports it to statsd. - - ### Hacking * *Running tests:* `./gradlew check` -* *Running the app locally:* `./gradlew run` +* *Running the app locally:* `./gradlew run -PzookeeperHosts=localhost:2181` * *Building the app for distribution:* `./gradlew shadowJar` diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 7e1f003..f225845 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -10,33 +10,23 @@ import org.apache.curator.framework.recipes.cache.TreeCacheListener class Main { static void main(String[] args) { println "Running ${args}" + // XXX: Early exit until testing + return ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry) client.start() - TreeCache cache = new TreeCache(client, '/kafka_spout') + TreeCache cache = new TreeCache(client, '/consumers') println cache - cache.listenable.addListener([ - childEvent: { cl, ev -> - println "EV: ${ev}" - } - ] as TreeCacheListener) - + cache.listenable.addListener(new zk.StandardTreeWatcher()) cache.start() println 'started..' - Boolean foundChildren = false - Map children = null + Thread.sleep(5 * 1000) - while ((children == null) || children.isEmpty()) { - children = cache.getCurrentChildren('/kafka_spout') - println "CHILDREN: ${children}" - - Thread.sleep(100) - } - [1, 2, 3].each { Thread.sleep(300) } + println 'exiting..' return client.children.forPath('/consumers').each { path -> diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy new file mode 100644 index 0000000..4262d56 --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -0,0 +1,50 @@ +package com.github.lookout.verspaetung.zk + +import groovy.transform.TypeChecked +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheListener +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +/** + * AbstractTreeWatcher defines the contract and base components for the various + * Zookeeper tree watchers Verspaetung needs. The responsibility of these + * watchers is to process events from the TreeCache and emit processed events + * further down the pipeline + */ +@TypeChecked +abstract class AbstractTreeWatcher implements TreeCacheListener { + + /** + * Process the ChildData associated with an event + */ + abstract ConsumerOffset processChildData(ChildData data) + + /** + * Primary TreeCache event processing callback + */ + void childEvent(CuratorFramework client, TreeCacheEvent event) { + /* bail out early if we don't care about the event */ + if (!isNodeEvent(event)) { + return + } + + ConsumerOffset offset = processChildData(event?.data) + + if (offset != null) { + println offset + } + } + + /** + * Return true if the TreeCacheEvent received pertains to a node event that + * we're interested in + */ + Boolean isNodeEvent(TreeCacheEvent event) { + if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) || + (event?.type == TreeCacheEvent.Type.NODE_UPDATED)) { + return true + } + return false + } +} diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy new file mode 100644 index 0000000..f8187bf --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy @@ -0,0 +1,20 @@ +package com.github.lookout.verspaetung.zk + +import org.apache.curator.framework.recipes.cache.ChildData + +/** + * POJO representing data from Zookeeper for a consumer, topic and offset + */ +class ConsumerOffset { + private String topic + private String groupName + private Integer offset + private Integer partition + private ChildData rawData + + + String toString() { + return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}" + } +} + diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy new file mode 100644 index 0000000..4e16613 --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -0,0 +1,37 @@ +package com.github.lookout.verspaetung.zk + +import org.apache.curator.framework.recipes.cache.ChildData + +/** + * StandardTreeWatcher processes Zookeeper paths for standard high-level Kafka + * consumers + */ +class StandardTreeWatcher extends AbstractTreeWatcher { + + /** + * Extract the necessary information from a standard (i.e. high-level Kafka + * consumer) tree of offsets + */ + ConsumerOffset processChildData(ChildData data) { + if (data == null) { + return null + } + /* +ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]} + */ + ConsumerOffset offset = new ConsumerOffset() + + List pathParts = data.path.split(/\//) + + if (pathParts.size != 6) { + return null + } + + offset.groupName = pathParts[2] + offset.topic = pathParts[4] + offset.partition = new Integer(pathParts[5]) + offset.offset = new Integer(new String(data.data)) + + return offset + } +} diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy new file mode 100644 index 0000000..30ec985 --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy @@ -0,0 +1,39 @@ +package com.github.lookout.verspaetung.zk + +import spock.lang.* + +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +class AbstractTreeWatcherSpec extends Specification { + private AbstractTreeWatcher watcher + + class MockWatcher extends AbstractTreeWatcher { + ConsumerOffset processChildData(ChildData d) { } + } + + def setup() { + this.watcher = new MockWatcher() + } + + def "isNodeEvent() returns false by default"() { + expect: + watcher.isNodeEvent(null) == false + } + + def "isNodeEvent() return true for NODE_ADDED"() { + given: + def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null) + + expect: + watcher.isNodeEvent(event) == true + } + + def "isNodeEvent() return true for NODE_UPDATED"() { + given: + def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null) + + expect: + watcher.isNodeEvent(event) == true + } +} diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy new file mode 100644 index 0000000..f27e9e3 --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy @@ -0,0 +1,40 @@ +package com.github.lookout.verspaetung.zk + +import spock.lang.* + +import org.apache.curator.framework.recipes.cache.ChildData + +class StandardTreeWatcherSpec extends Specification { + private StandardTreeWatcher watcher + + def setup() { + this.watcher = new StandardTreeWatcher() + } + + def "processChildData should return null if null is given"() { + expect: + watcher.processChildData(null) == null + } + + def "processChildData should return null if the path is invalid"() { + given: + ChildData data = new ChildData("/consumers", null, (byte[])[48]) + ConsumerOffset offset = watcher.processChildData(data) + + expect: + offset == null + } + + def "processChildData should create a ConsumerOffset properly"() { + given: + String path = "/consumers/my-consumer-group/offsets/thetopic/3" + ChildData data = new ChildData(path, null, (byte[])[48]) + ConsumerOffset offset = watcher.processChildData(data) + + expect: + offset.groupName == 'my-consumer-group' + offset.topic == 'thetopic' + offset.partition == 3 + offset.offset == 0 + } +}