From 8b792d72fdb9a48b97e8d64be694c0325c590b91 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 19 Jan 2015 13:08:01 -0800 Subject: [PATCH] Properly use the passed in AbstractMap for the StandardTreeWatcher --- .../lookout/verspaetung/KafkaPoller.groovy | 2 ++ .../com/github/lookout/verspaetung/Main.groovy | 16 +++++++++------- .../verspaetung/zk/AbstractTreeWatcher.groovy | 11 +++++++---- .../verspaetung/zk/StandardTreeWatcher.groovy | 2 ++ .../zk/AbstractTreeWatcherSpec.groovy | 4 +++- .../zk/StandardTreeWatcherSpec.groovy | 2 +- 6 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index f2d4b97..743e579 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -2,6 +2,8 @@ package com.github.lookout.verspaetung import groovy.transform.TypeChecked +import kafka.client.ClientUtils + @TypeChecked class KafkaPoller extends Thread { private Boolean keepRunning = true diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index b9d0d39..1189dd2 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -1,6 +1,7 @@ package com.github.lookout.verspaetung import com.github.lookout.verspaetung.zk.BrokerTreeWatcher +import com.github.lookout.verspaetung.zk.StandardTreeWatcher import java.util.concurrent.ConcurrentHashMap import groovy.transform.TypeChecked @@ -8,11 +9,7 @@ import groovy.transform.TypeChecked import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFramework -import org.apache.zookeeper.KeeperException import org.apache.curator.framework.recipes.cache.TreeCache -import org.apache.curator.framework.recipes.cache.TreeCacheListener - -import kafka.client.ClientUtils //@TypeChecked class Main { @@ -26,21 +23,26 @@ class Main { client.start() TreeCache cache = new TreeCache(client, '/consumers') - cache.listenable.addListener(new zk.StandardTreeWatcher(consumers)) KafkaPoller poller = new KafkaPoller() + StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers) + consumerWatcher.onInitComplete = { + println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples" + } + BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) brokerWatcher.onBrokerUpdates = { brokers -> poller.refresh(brokers) } - poller.start() + cache.listenable.addListener(consumerWatcher) + poller.start() brokerWatcher.start() cache.start() println 'started..' - Thread.sleep(5 * 1000) + Thread.sleep(9 * 1000) println 'exiting..' poller.die() diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 4d6a676..e485313 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -1,7 +1,6 @@ package com.github.lookout.verspaetung.zk import com.github.lookout.verspaetung.TopicPartition -import java.util.concurrent.ConcurrentHashMap import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData @@ -15,10 +14,10 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent * further down the pipeline */ abstract class AbstractTreeWatcher implements TreeCacheListener { - protected Map consumersMap = [:] + protected AbstractMap consumersMap + protected Closure onInitComplete - AbstractTreeWatcher() { } - AbstractTreeWatcher(Map consumers) { + AbstractTreeWatcher(AbstractMap consumers) { this.consumersMap = consumers } @@ -31,6 +30,10 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { * Primary TreeCache event processing callback */ void childEvent(CuratorFramework client, TreeCacheEvent event) { + if (event?.type == TreeCacheEvent.Type.INITIALIZED) { + this.onInitComplete?.call() + } + /* bail out early if we don't care about the event */ if (!isNodeEvent(event)) { return diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index 7603f2b..39bb381 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -1,6 +1,7 @@ package com.github.lookout.verspaetung.zk import groovy.transform.TypeChecked +import groovy.transform.InheritConstructors import org.apache.curator.framework.recipes.cache.ChildData /** @@ -8,6 +9,7 @@ import org.apache.curator.framework.recipes.cache.ChildData * consumers */ @TypeChecked +@InheritConstructors class StandardTreeWatcher extends AbstractTreeWatcher { /** diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy index 3c4967d..27243c5 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy @@ -10,6 +10,9 @@ class AbstractTreeWatcherSpec extends Specification { private AbstractTreeWatcher watcher class MockWatcher extends AbstractTreeWatcher { + MockWatcher() { + super([:]) + } ConsumerOffset processChildData(ChildData d) { } } @@ -38,7 +41,6 @@ class AbstractTreeWatcherSpec extends Specification { watcher.isNodeEvent(event) == true } - def "childEvent() not processChildData if the event is not to be processed"() { given: watcher = Spy(MockWatcher) diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy index f27e9e3..b23c820 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy @@ -8,7 +8,7 @@ class StandardTreeWatcherSpec extends Specification { private StandardTreeWatcher watcher def setup() { - this.watcher = new StandardTreeWatcher() + this.watcher = new StandardTreeWatcher([:]) } def "processChildData should return null if null is given"() {