Introduce the AbstractConsumerTreeWatcher to handle watchers on Kafka consumer trees

This gives us two kinds of AbstractTreeWatcher instances, those that watch
special-case subtrees (e.g. the BrokerTreeWatcher) and then those which need to
watch and report Kafka consumer offset information (e.g. StandardTreeWatcher)

References #9
This commit is contained in:
R. Tyler Croy 2015-01-28 02:01:53 -08:00
parent af19abfacb
commit b4b9fe9860
6 changed files with 184 additions and 141 deletions

View File

@ -0,0 +1,81 @@
package com.github.lookout.verspaetung.zk
import com.github.lookout.verspaetung.TopicPartition
import java.util.concurrent.CopyOnWriteArrayList
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
@TypeChecked
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
AbstractConsumerTreeWatcher(CuratorFramework client,
AbstractMap consumersMap) {
super(client)
this.consumersMap = consumersMap
}
/**
* Process the ChildData associated with an event
*/
abstract ConsumerOffset processChildData(ChildData data)
/**
* Primary TreeCache event processing callback
*/
void childEvent(CuratorFramework client, TreeCacheEvent event) {
if (event?.type == TreeCacheEvent.Type.INITIALIZED) {
this.onInitComplete.each { Closure c ->
c?.call()
}
}
/* bail out early if we don't care about the event */
if (!isNodeEvent(event)) {
return
}
ConsumerOffset offset = processChildData(event?.data)
if (offset != null) {
trackConsumerOffset(offset)
}
}
/**
* Keep track of a ConsumerOffset in the consumersMap that was passed into
* this class on instantiation
*/
void trackConsumerOffset(ConsumerOffset offset) {
if (this.consumersMap == null) {
return
}
TopicPartition key = new TopicPartition(offset.topic, offset.partition)
if (this.consumersMap.containsKey(key)) {
this.consumersMap[key] << offset
}
else {
this.consumersMap[key] = new CopyOnWriteArrayList([offset])
}
}
/**
* Return true if the TreeCacheEvent received pertains to a node event that
* we're interested in
*/
Boolean isNodeEvent(TreeCacheEvent event) {
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
return true
}
return false
}
}

View File

@ -1,15 +1,11 @@
package com.github.lookout.verspaetung.zk
import com.github.lookout.verspaetung.TopicPartition
import java.util.concurrent.CopyOnWriteArrayList
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCache
import org.apache.curator.framework.recipes.cache.TreeCacheListener
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
import org.apache.curator.framework.recipes.cache.TreeCacheListener
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -21,27 +17,20 @@ import org.slf4j.LoggerFactory
*/
@TypeChecked
abstract class AbstractTreeWatcher implements TreeCacheListener {
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
protected List<Closure> onInitComplete
protected Logger logger
protected CuratorFramework client
protected TreeCache cache
AbstractTreeWatcher(CuratorFramework client, AbstractMap consumers) {
this.client = client
this.consumersMap = consumers
this.onInitComplete = []
AbstractTreeWatcher(CuratorFramework client) {
this.logger = LoggerFactory.getLogger(this.class)
this.client = client
this.onInitComplete = []
this.cache = new TreeCache(client, zookeeperPath())
this.cache.listenable.addListener(this)
}
/**
* Process the ChildData associated with an event
*/
abstract ConsumerOffset processChildData(ChildData data)
/**
* Return the String of the path in Zookeeper this class should watch. This
* method must be safe to call from the initializer of the class
@ -55,56 +44,5 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
this.cache?.start()
}
/**
* Primary TreeCache event processing callback
*/
void childEvent(CuratorFramework client, TreeCacheEvent event) {
if (event?.type == TreeCacheEvent.Type.INITIALIZED) {
this.onInitComplete.each { Closure c ->
c?.call()
}
}
/* bail out early if we don't care about the event */
if (!isNodeEvent(event)) {
return
}
ConsumerOffset offset = processChildData(event?.data)
if (offset != null) {
trackConsumerOffset(offset)
}
}
/**
* Keep track of a ConsumerOffset in the consumersMap that was passed into
* this class on instantiation
*/
void trackConsumerOffset(ConsumerOffset offset) {
if (this.consumersMap == null) {
return
}
TopicPartition key = new TopicPartition(offset.topic, offset.partition)
if (this.consumersMap.containsKey(key)) {
this.consumersMap[key] << offset
}
else {
this.consumersMap[key] = new CopyOnWriteArrayList([offset])
}
}
/**
* Return true if the TreeCacheEvent received pertains to a node event that
* we're interested in
*/
Boolean isNodeEvent(TreeCacheEvent event) {
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
return true
}
return false
}
abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
}

View File

@ -20,36 +20,34 @@ import org.slf4j.LoggerFactory
* information
*/
@TypeChecked
class BrokerTreeWatcher implements TreeCacheListener {
class BrokerTreeWatcher extends AbstractTreeWatcher {
static final Integer INVALID_BROKER_ID = -1
private static final String BROKERS_PATH = '/brokers/ids'
private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class)
private JsonSlurper json
private TreeCache cache
private final String BROKERS_PATH = '/brokers/ids'
private List<Closure> onBrokerUpdates
private Boolean isTreeInitialized = false
private List<KafkaBroker> brokers
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.json = new JsonSlurper()
this.cache = new TreeCache(client, BROKERS_PATH)
this.cache.listenable.addListener(this)
this.brokers = []
this.onBrokerUpdates = []
}
/**
* Start our internal cache
*/
void start() {
this.cache?.start()
String zookeeperPath() {
return BROKERS_PATH
}
/**
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
* list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* If we're initialized that means we should have all our brokers in
* our internal list already and we can fire an event

View File

@ -10,7 +10,7 @@ import org.apache.curator.framework.recipes.cache.ChildData
*/
@TypeChecked
@InheritConstructors
class StandardTreeWatcher extends AbstractTreeWatcher {
class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
private static final String ZK_PATH = '/consumers'
String zookeeperPath() {

View File

@ -0,0 +1,86 @@
package com.github.lookout.verspaetung.zk
import spock.lang.*
import com.github.lookout.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class AbstractConsumerTreeWatcherSpec extends Specification {
private AbstractConsumerTreeWatcher watcher
class MockWatcher extends AbstractConsumerTreeWatcher {
MockWatcher() {
super(null, [:])
}
ConsumerOffset processChildData(ChildData d) { }
String zookeeperPath() { return '/zk/spock' }
}
def setup() {
this.watcher = new MockWatcher()
}
def "isNodeEvent() returns false by default"() {
expect:
watcher.isNodeEvent(null) == false
}
def "isNodeEvent() return true for NODE_ADDED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null)
expect:
watcher.isNodeEvent(event) == true
}
def "isNodeEvent() return true for NODE_UPDATED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null)
expect:
watcher.isNodeEvent(event) == true
}
def "childEvent() not processChildData if the event is not to be processed"() {
given:
watcher = Spy(MockWatcher)
1 * watcher.isNodeEvent(_) >> false
0 * watcher.processChildData(_) >> null
expect:
watcher.childEvent(null, null)
}
def "trackConsumerOffset() should create a new list for new topics in the map"() {
given:
ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337)
when:
watcher.trackConsumerOffset(offset)
then:
watcher.consumersMap.size() == 1
}
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
given:
String topic = 'spock-topic'
TopicPartition mapKey = new TopicPartition(topic, 0)
ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337)
offset.groupName = 'spock-1'
ConsumerOffset secondOffset = new ConsumerOffset(topic, 0, 0)
secondOffset.groupName = 'spock-2'
when:
watcher.trackConsumerOffset(offset)
watcher.trackConsumerOffset(secondOffset)
then:
watcher.consumersMap.size() == 1
watcher.consumersMap[mapKey].size() == 2
}
}

View File

@ -3,8 +3,10 @@ package com.github.lookout.verspaetung.zk
import spock.lang.*
import com.github.lookout.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class AbstractTreeWatcherSpec extends Specification {
private AbstractTreeWatcher watcher
@ -13,73 +15,11 @@ class AbstractTreeWatcherSpec extends Specification {
MockWatcher() {
super(null, [:])
}
ConsumerOffset processChildData(ChildData d) { }
void childEvent(CuratorFramework c, TreeCacheEvent e) { }
String zookeeperPath() { return '/zk/spock' }
}
def setup() {
this.watcher = new MockWatcher()
}
def "isNodeEvent() returns false by default"() {
expect:
watcher.isNodeEvent(null) == false
}
def "isNodeEvent() return true for NODE_ADDED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null)
expect:
watcher.isNodeEvent(event) == true
}
def "isNodeEvent() return true for NODE_UPDATED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null)
expect:
watcher.isNodeEvent(event) == true
}
def "childEvent() not processChildData if the event is not to be processed"() {
given:
watcher = Spy(MockWatcher)
1 * watcher.isNodeEvent(_) >> false
0 * watcher.processChildData(_) >> null
expect:
watcher.childEvent(null, null)
}
def "trackConsumerOffset() should create a new list for new topics in the map"() {
given:
ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337)
when:
watcher.trackConsumerOffset(offset)
then:
watcher.consumersMap.size() == 1
}
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
given:
String topic = 'spock-topic'
TopicPartition mapKey = new TopicPartition(topic, 0)
ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337)
offset.groupName = 'spock-1'
ConsumerOffset secondOffset = new ConsumerOffset(topic, 0, 0)
secondOffset.groupName = 'spock-2'
when:
watcher.trackConsumerOffset(offset)
watcher.trackConsumerOffset(secondOffset)
then:
watcher.consumersMap.size() == 1
watcher.consumersMap[mapKey].size() == 2
}
}