diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index f225845..af2671e 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -1,5 +1,8 @@ package com.github.lookout.verspaetung +import java.util.concurrent.ConcurrentHashMap +import groovy.transform.TypeChecked + import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFramework @@ -7,42 +10,28 @@ import org.apache.zookeeper.KeeperException import org.apache.curator.framework.recipes.cache.TreeCache import org.apache.curator.framework.recipes.cache.TreeCacheListener +@TypeChecked class Main { static void main(String[] args) { println "Running ${args}" // XXX: Early exit until testing - return + //return ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry) + ConcurrentHashMap consumers = new ConcurrentHashMap() client.start() TreeCache cache = new TreeCache(client, '/consumers') println cache - cache.listenable.addListener(new zk.StandardTreeWatcher()) + cache.listenable.addListener(new zk.StandardTreeWatcher(consumers)) cache.start() println 'started..' - Thread.sleep(5 * 1000) + Thread.sleep(9 * 1000) println 'exiting..' return - - client.children.forPath('/consumers').each { path -> - try { - client.children.forPath("/consumers/${path}/offsets").each { topic -> - client.children.forPath("/consumers/${path}/offsets/${topic}").each { partition -> - String offset = new String(client.data.forPath("/consumers/${path}/offsets/${topic}/${partition}")) - println "${path}:${topic}:${partition} = ${offset}" - } - } - } - catch (KeeperException ex) { - println "no offsets for ${path}" - } - } - - println client } } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 4262d56..27b9fac 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -1,6 +1,7 @@ package com.github.lookout.verspaetung.zk -import groovy.transform.TypeChecked +import java.util.concurrent.ConcurrentHashMap + import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCacheListener @@ -12,8 +13,13 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent * watchers is to process events from the TreeCache and emit processed events * further down the pipeline */ -@TypeChecked abstract class AbstractTreeWatcher implements TreeCacheListener { + protected Map consumersMap = [:] + + AbstractTreeWatcher() { } + AbstractTreeWatcher(Map consumers) { + this.consumersMap = consumers + } /** * Process the ChildData associated with an event @@ -32,7 +38,23 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { ConsumerOffset offset = processChildData(event?.data) if (offset != null) { - println offset + trackConsumerOffset(offset) + } + } + + /** + * + */ + void trackConsumerOffset(ConsumerOffset offset) { + if (this.consumersMap == null) { + return + } + + if (this.consumersMap.containsKey(offset.topic)) { + this.consumersMap[offset.topic] << offset + } + else { + this.consumersMap[offset.topic] = [offset] } } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy index f8187bf..c956bd6 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy @@ -12,6 +12,14 @@ class ConsumerOffset { private Integer partition private ChildData rawData + ConsumerOffset() { + } + + ConsumerOffset(String topic, Integer partition, Integer offset) { + this.topic = topic + this.partition = partition + this.offset = offset + } String toString() { return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}" diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index 4e16613..7603f2b 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -1,11 +1,13 @@ package com.github.lookout.verspaetung.zk +import groovy.transform.TypeChecked import org.apache.curator.framework.recipes.cache.ChildData /** * StandardTreeWatcher processes Zookeeper paths for standard high-level Kafka * consumers */ +@TypeChecked class StandardTreeWatcher extends AbstractTreeWatcher { /** @@ -21,9 +23,9 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473 */ ConsumerOffset offset = new ConsumerOffset() - List pathParts = data.path.split(/\//) + List pathParts = data.path.split(/\//) as List - if (pathParts.size != 6) { + if (pathParts.size() != 6) { return null } diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy index 30ec985..5b560cb 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy @@ -36,4 +36,42 @@ class AbstractTreeWatcherSpec extends Specification { expect: watcher.isNodeEvent(event) == true } + + + def "childEvent() not processChildData if the event is not to be processed"() { + given: + watcher = Spy(MockWatcher) + 1 * watcher.isNodeEvent(_) >> false + 0 * watcher.processChildData(_) >> null + + expect: + watcher.childEvent(null, null) + } + + def "trackConsumerOffset() should create a new list for new topics in the map"() { + given: + ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337) + + when: + watcher.trackConsumerOffset(offset) + + then: + watcher.consumersMap.size() == 1 + } + + def "trackConsumerOffset() should append to a list for existing topics in the map"() { + given: + String topic = 'spock-topic' + ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337) + ConsumerOffset secondOffset = new ConsumerOffset(topic, 1, 0) + + when: + watcher.trackConsumerOffset(offset) + watcher.trackConsumerOffset(secondOffset) + + then: + watcher.consumersMap.size() == 1 + watcher.consumersMap[topic].size() == 2 + + } }