From b4b9fe9860ffe5e7feb2ad0b1d12a445333e83b1 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 28 Jan 2015 02:01:53 -0800 Subject: [PATCH] Introduce the AbstractConsumerTreeWatcher to handle watchers on Kafka consumer trees This gives us two kinds of AbstractTreeWatcher instances, those that watch special-case subtrees (e.g. the BrokerTreeWatcher) and then those which need to watch and report Kafka consumer offset information (e.g. StandardTreeWatcher) References #9 --- .../zk/AbstractConsumerTreeWatcher.groovy | 81 +++++++++++++++++ .../verspaetung/zk/AbstractTreeWatcher.groovy | 72 ++-------------- .../verspaetung/zk/BrokerTreeWatcher.groovy | 18 ++-- .../verspaetung/zk/StandardTreeWatcher.groovy | 2 +- .../zk/AbstractConsumerTreeWatcherSpec.groovy | 86 +++++++++++++++++++ .../zk/AbstractTreeWatcherSpec.groovy | 66 +------------- 6 files changed, 184 insertions(+), 141 deletions(-) create mode 100644 src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy create mode 100644 src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy new file mode 100644 index 0000000..f47d7dc --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy @@ -0,0 +1,81 @@ +package com.github.lookout.verspaetung.zk + +import com.github.lookout.verspaetung.TopicPartition + +import java.util.concurrent.CopyOnWriteArrayList +import groovy.transform.TypeChecked + +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +@TypeChecked +abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher { + protected AbstractMap> consumersMap + + AbstractConsumerTreeWatcher(CuratorFramework client, + AbstractMap consumersMap) { + super(client) + this.consumersMap = consumersMap + } + + /** + * Process the ChildData associated with an event + */ + abstract ConsumerOffset processChildData(ChildData data) + + /** + * Primary TreeCache event processing callback + */ + void childEvent(CuratorFramework client, TreeCacheEvent event) { + if (event?.type == TreeCacheEvent.Type.INITIALIZED) { + this.onInitComplete.each { Closure c -> + c?.call() + } + } + + /* bail out early if we don't care about the event */ + if (!isNodeEvent(event)) { + return + } + + ConsumerOffset offset = processChildData(event?.data) + + if (offset != null) { + trackConsumerOffset(offset) + } + } + + /** + * Keep track of a ConsumerOffset in the consumersMap that was passed into + * this class on instantiation + */ + void trackConsumerOffset(ConsumerOffset offset) { + if (this.consumersMap == null) { + return + } + + TopicPartition key = new TopicPartition(offset.topic, offset.partition) + + if (this.consumersMap.containsKey(key)) { + this.consumersMap[key] << offset + } + else { + this.consumersMap[key] = new CopyOnWriteArrayList([offset]) + } + } + + /** + * Return true if the TreeCacheEvent received pertains to a node event that + * we're interested in + */ + Boolean isNodeEvent(TreeCacheEvent event) { + if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) || + (event?.type == TreeCacheEvent.Type.NODE_UPDATED)) { + return true + } + return false + } +} + + diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index bd7fcbd..97630b3 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -1,15 +1,11 @@ package com.github.lookout.verspaetung.zk -import com.github.lookout.verspaetung.TopicPartition - -import java.util.concurrent.CopyOnWriteArrayList import groovy.transform.TypeChecked import org.apache.curator.framework.CuratorFramework -import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCache -import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.apache.curator.framework.recipes.cache.TreeCacheEvent +import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -21,27 +17,20 @@ import org.slf4j.LoggerFactory */ @TypeChecked abstract class AbstractTreeWatcher implements TreeCacheListener { - protected AbstractMap> consumersMap protected List onInitComplete protected Logger logger protected CuratorFramework client protected TreeCache cache - AbstractTreeWatcher(CuratorFramework client, AbstractMap consumers) { - this.client = client - this.consumersMap = consumers - this.onInitComplete = [] + AbstractTreeWatcher(CuratorFramework client) { this.logger = LoggerFactory.getLogger(this.class) + this.client = client + this.onInitComplete = [] this.cache = new TreeCache(client, zookeeperPath()) this.cache.listenable.addListener(this) } - /** - * Process the ChildData associated with an event - */ - abstract ConsumerOffset processChildData(ChildData data) - /** * Return the String of the path in Zookeeper this class should watch. This * method must be safe to call from the initializer of the class @@ -55,56 +44,5 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { this.cache?.start() } - /** - * Primary TreeCache event processing callback - */ - void childEvent(CuratorFramework client, TreeCacheEvent event) { - if (event?.type == TreeCacheEvent.Type.INITIALIZED) { - this.onInitComplete.each { Closure c -> - c?.call() - } - } - - /* bail out early if we don't care about the event */ - if (!isNodeEvent(event)) { - return - } - - ConsumerOffset offset = processChildData(event?.data) - - if (offset != null) { - trackConsumerOffset(offset) - } - } - - /** - * Keep track of a ConsumerOffset in the consumersMap that was passed into - * this class on instantiation - */ - void trackConsumerOffset(ConsumerOffset offset) { - if (this.consumersMap == null) { - return - } - - TopicPartition key = new TopicPartition(offset.topic, offset.partition) - - if (this.consumersMap.containsKey(key)) { - this.consumersMap[key] << offset - } - else { - this.consumersMap[key] = new CopyOnWriteArrayList([offset]) - } - } - - /** - * Return true if the TreeCacheEvent received pertains to a node event that - * we're interested in - */ - Boolean isNodeEvent(TreeCacheEvent event) { - if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) || - (event?.type == TreeCacheEvent.Type.NODE_UPDATED)) { - return true - } - return false - } + abstract void childEvent(CuratorFramework client, TreeCacheEvent event) } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy index 477aad9..b0ccb28 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy @@ -20,36 +20,34 @@ import org.slf4j.LoggerFactory * information */ @TypeChecked -class BrokerTreeWatcher implements TreeCacheListener { +class BrokerTreeWatcher extends AbstractTreeWatcher { static final Integer INVALID_BROKER_ID = -1 + private static final String BROKERS_PATH = '/brokers/ids' private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class) private JsonSlurper json - private TreeCache cache - private final String BROKERS_PATH = '/brokers/ids' private List onBrokerUpdates private Boolean isTreeInitialized = false private List brokers BrokerTreeWatcher(CuratorFramework client) { + super(client) + this.json = new JsonSlurper() - this.cache = new TreeCache(client, BROKERS_PATH) - this.cache.listenable.addListener(this) this.brokers = [] this.onBrokerUpdates = [] } - /** - * Start our internal cache - */ - void start() { - this.cache?.start() + + String zookeeperPath() { + return BROKERS_PATH } /** * Process events like NODE_ADDED and NODE_REMOVED to keep an up to date * list of brokers */ + @Override void childEvent(CuratorFramework client, TreeCacheEvent event) { /* If we're initialized that means we should have all our brokers in * our internal list already and we can fire an event diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index 5297231..17510ab 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -10,7 +10,7 @@ import org.apache.curator.framework.recipes.cache.ChildData */ @TypeChecked @InheritConstructors -class StandardTreeWatcher extends AbstractTreeWatcher { +class StandardTreeWatcher extends AbstractConsumerTreeWatcher { private static final String ZK_PATH = '/consumers' String zookeeperPath() { diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy new file mode 100644 index 0000000..7a86034 --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy @@ -0,0 +1,86 @@ +package com.github.lookout.verspaetung.zk + +import spock.lang.* + +import com.github.lookout.verspaetung.TopicPartition +import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +class AbstractConsumerTreeWatcherSpec extends Specification { + private AbstractConsumerTreeWatcher watcher + + class MockWatcher extends AbstractConsumerTreeWatcher { + MockWatcher() { + super(null, [:]) + } + ConsumerOffset processChildData(ChildData d) { } + String zookeeperPath() { return '/zk/spock' } + } + + def setup() { + this.watcher = new MockWatcher() + } + + def "isNodeEvent() returns false by default"() { + expect: + watcher.isNodeEvent(null) == false + } + + def "isNodeEvent() return true for NODE_ADDED"() { + given: + def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null) + + expect: + watcher.isNodeEvent(event) == true + } + + def "isNodeEvent() return true for NODE_UPDATED"() { + given: + def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null) + + expect: + watcher.isNodeEvent(event) == true + } + + def "childEvent() not processChildData if the event is not to be processed"() { + given: + watcher = Spy(MockWatcher) + 1 * watcher.isNodeEvent(_) >> false + 0 * watcher.processChildData(_) >> null + + expect: + watcher.childEvent(null, null) + } + + def "trackConsumerOffset() should create a new list for new topics in the map"() { + given: + ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337) + + when: + watcher.trackConsumerOffset(offset) + + then: + watcher.consumersMap.size() == 1 + } + + def "trackConsumerOffset() should append to a list for existing topics in the map"() { + given: + String topic = 'spock-topic' + TopicPartition mapKey = new TopicPartition(topic, 0) + ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337) + offset.groupName = 'spock-1' + ConsumerOffset secondOffset = new ConsumerOffset(topic, 0, 0) + secondOffset.groupName = 'spock-2' + + when: + watcher.trackConsumerOffset(offset) + watcher.trackConsumerOffset(secondOffset) + + then: + watcher.consumersMap.size() == 1 + watcher.consumersMap[mapKey].size() == 2 + + } +} diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy index 8687143..7114c87 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcherSpec.groovy @@ -3,8 +3,10 @@ package com.github.lookout.verspaetung.zk import spock.lang.* import com.github.lookout.verspaetung.TopicPartition +import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCacheEvent +import org.apache.curator.framework.recipes.cache.TreeCacheEvent class AbstractTreeWatcherSpec extends Specification { private AbstractTreeWatcher watcher @@ -13,73 +15,11 @@ class AbstractTreeWatcherSpec extends Specification { MockWatcher() { super(null, [:]) } - ConsumerOffset processChildData(ChildData d) { } - + void childEvent(CuratorFramework c, TreeCacheEvent e) { } String zookeeperPath() { return '/zk/spock' } } def setup() { this.watcher = new MockWatcher() } - - def "isNodeEvent() returns false by default"() { - expect: - watcher.isNodeEvent(null) == false - } - - def "isNodeEvent() return true for NODE_ADDED"() { - given: - def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null) - - expect: - watcher.isNodeEvent(event) == true - } - - def "isNodeEvent() return true for NODE_UPDATED"() { - given: - def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null) - - expect: - watcher.isNodeEvent(event) == true - } - - def "childEvent() not processChildData if the event is not to be processed"() { - given: - watcher = Spy(MockWatcher) - 1 * watcher.isNodeEvent(_) >> false - 0 * watcher.processChildData(_) >> null - - expect: - watcher.childEvent(null, null) - } - - def "trackConsumerOffset() should create a new list for new topics in the map"() { - given: - ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337) - - when: - watcher.trackConsumerOffset(offset) - - then: - watcher.consumersMap.size() == 1 - } - - def "trackConsumerOffset() should append to a list for existing topics in the map"() { - given: - String topic = 'spock-topic' - TopicPartition mapKey = new TopicPartition(topic, 0) - ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337) - offset.groupName = 'spock-1' - ConsumerOffset secondOffset = new ConsumerOffset(topic, 0, 0) - secondOffset.groupName = 'spock-2' - - when: - watcher.trackConsumerOffset(offset) - watcher.trackConsumerOffset(secondOffset) - - then: - watcher.consumersMap.size() == 1 - watcher.consumersMap[mapKey].size() == 2 - - } }