From af19abfacb1d8caabc0d850254ebc76bc95d77e5 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 28 Jan 2015 01:44:53 -0800 Subject: [PATCH] Refactor the handling of TreeCache into the AbstractTreeWatcher itself This should lay the groundwork for refactoring much of the BrokerTreeWatcher up into the AbstractTreeWatcher --- .../github/lookout/verspaetung/Main.groovy | 7 ++---- .../verspaetung/zk/AbstractTreeWatcher.groovy | 22 ++++++++++++++++++- .../verspaetung/zk/StandardTreeWatcher.groovy | 5 +++++ .../zk/AbstractTreeWatcherSpec.groovy | 4 +++- .../zk/StandardTreeWatcherSpec.groovy | 5 ++++- 5 files changed, 35 insertions(+), 8 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 7faa0a8..d0561cc 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -55,10 +55,9 @@ class Main { client.start() - TreeCache cache = new TreeCache(client, '/consumers') KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n')) BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) - StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers) + StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers) consumerWatcher.onInitComplete << { @@ -69,10 +68,8 @@ class Main { poller.refresh(brokers) } - cache.listenable.addListener(consumerWatcher) - brokerWatcher.start() - cache.start() + consumerWatcher.start() logger.info("Started wait loop...") diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 3034e5b..bd7fcbd 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -7,6 +7,7 @@ import groovy.transform.TypeChecked import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCache import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.apache.curator.framework.recipes.cache.TreeCacheEvent import org.slf4j.Logger @@ -23,11 +24,17 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { protected AbstractMap> consumersMap protected List onInitComplete protected Logger logger + protected CuratorFramework client + protected TreeCache cache - AbstractTreeWatcher(AbstractMap consumers) { + AbstractTreeWatcher(CuratorFramework client, AbstractMap consumers) { + this.client = client this.consumersMap = consumers this.onInitComplete = [] this.logger = LoggerFactory.getLogger(this.class) + + this.cache = new TreeCache(client, zookeeperPath()) + this.cache.listenable.addListener(this) } /** @@ -35,6 +42,19 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { */ abstract ConsumerOffset processChildData(ChildData data) + /** + * Return the String of the path in Zookeeper this class should watch. This + * method must be safe to call from the initializer of the class + */ + abstract String zookeeperPath() + + /** + * Start our internal cache + */ + void start() { + this.cache?.start() + } + /** * Primary TreeCache event processing callback */ diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index f7c0bbd..5297231 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -11,6 +11,11 @@ import org.apache.curator.framework.recipes.cache.ChildData @TypeChecked @InheritConstructors class StandardTreeWatcher extends AbstractTreeWatcher { + private static final String ZK_PATH = '/consumers' + + String zookeeperPath() { + return ZK_PATH + } /** * Extract the necessary information from a standard (i.e. high-level Kafka diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy index 27243c5..8687143 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy @@ -11,9 +11,11 @@ class AbstractTreeWatcherSpec extends Specification { class MockWatcher extends AbstractTreeWatcher { MockWatcher() { - super([:]) + super(null, [:]) } ConsumerOffset processChildData(ChildData d) { } + + String zookeeperPath() { return '/zk/spock' } } def setup() { diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy index 1beef20..d648649 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy @@ -2,13 +2,16 @@ package com.github.lookout.verspaetung.zk import spock.lang.* +import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData class StandardTreeWatcherSpec extends Specification { private StandardTreeWatcher watcher + private CuratorFramework mockCurator def setup() { - this.watcher = new StandardTreeWatcher([:]) + this.mockCurator = Mock(CuratorFramework) + this.watcher = new StandardTreeWatcher(this.mockCurator, [:]) } def "processChildData should return null if null is given"() {