Push a HashMap through into the TreeWatchres to start keeping tabs on consumer offsets

This data structure isn't fully bake dyet, but at least gives some organization
to the data we're caching out of the Zookeeper tree
This commit is contained in:
R. Tyler Croy 2015-01-18 10:25:04 -08:00
parent afd2ee8567
commit c4bd4ef9c2
5 changed files with 83 additions and 24 deletions

View File

@ -1,5 +1,8 @@
package com.github.lookout.verspaetung
import java.util.concurrent.ConcurrentHashMap
import groovy.transform.TypeChecked
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.CuratorFramework
@ -7,42 +10,28 @@ import org.apache.zookeeper.KeeperException
import org.apache.curator.framework.recipes.cache.TreeCache
import org.apache.curator.framework.recipes.cache.TreeCacheListener
@TypeChecked
class Main {
static void main(String[] args) {
println "Running ${args}"
// XXX: Early exit until testing
return
//return
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry)
ConcurrentHashMap<String, zk.ConsumerOffset> consumers = new ConcurrentHashMap()
client.start()
TreeCache cache = new TreeCache(client, '/consumers')
println cache
cache.listenable.addListener(new zk.StandardTreeWatcher())
cache.listenable.addListener(new zk.StandardTreeWatcher(consumers))
cache.start()
println 'started..'
Thread.sleep(5 * 1000)
Thread.sleep(9 * 1000)
println 'exiting..'
return
client.children.forPath('/consumers').each { path ->
try {
client.children.forPath("/consumers/${path}/offsets").each { topic ->
client.children.forPath("/consumers/${path}/offsets/${topic}").each { partition ->
String offset = new String(client.data.forPath("/consumers/${path}/offsets/${topic}/${partition}"))
println "${path}:${topic}:${partition} = ${offset}"
}
}
}
catch (KeeperException ex) {
println "no offsets for ${path}"
}
}
println client
}
}

View File

@ -1,6 +1,7 @@
package com.github.lookout.verspaetung.zk
import groovy.transform.TypeChecked
import java.util.concurrent.ConcurrentHashMap
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheListener
@ -12,8 +13,13 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
* watchers is to process events from the TreeCache and emit processed events
* further down the pipeline
*/
@TypeChecked
abstract class AbstractTreeWatcher implements TreeCacheListener {
protected Map consumersMap = [:]
AbstractTreeWatcher() { }
AbstractTreeWatcher(Map consumers) {
this.consumersMap = consumers
}
/**
* Process the ChildData associated with an event
@ -32,7 +38,23 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
ConsumerOffset offset = processChildData(event?.data)
if (offset != null) {
println offset
trackConsumerOffset(offset)
}
}
/**
*
*/
void trackConsumerOffset(ConsumerOffset offset) {
if (this.consumersMap == null) {
return
}
if (this.consumersMap.containsKey(offset.topic)) {
this.consumersMap[offset.topic] << offset
}
else {
this.consumersMap[offset.topic] = [offset]
}
}

View File

@ -12,6 +12,14 @@ class ConsumerOffset {
private Integer partition
private ChildData rawData
ConsumerOffset() {
}
ConsumerOffset(String topic, Integer partition, Integer offset) {
this.topic = topic
this.partition = partition
this.offset = offset
}
String toString() {
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}"

View File

@ -1,11 +1,13 @@
package com.github.lookout.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.recipes.cache.ChildData
/**
* StandardTreeWatcher processes Zookeeper paths for standard high-level Kafka
* consumers
*/
@TypeChecked
class StandardTreeWatcher extends AbstractTreeWatcher {
/**
@ -21,9 +23,9 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473
*/
ConsumerOffset offset = new ConsumerOffset()
List pathParts = data.path.split(/\//)
List<String> pathParts = data.path.split(/\//) as List<String>
if (pathParts.size != 6) {
if (pathParts.size() != 6) {
return null
}

View File

@ -36,4 +36,42 @@ class AbstractTreeWatcherSpec extends Specification {
expect:
watcher.isNodeEvent(event) == true
}
def "childEvent() not processChildData if the event is not to be processed"() {
given:
watcher = Spy(MockWatcher)
1 * watcher.isNodeEvent(_) >> false
0 * watcher.processChildData(_) >> null
expect:
watcher.childEvent(null, null)
}
def "trackConsumerOffset() should create a new list for new topics in the map"() {
given:
ConsumerOffset offset = new ConsumerOffset('spock-topic', 0, 1337)
when:
watcher.trackConsumerOffset(offset)
then:
watcher.consumersMap.size() == 1
}
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
given:
String topic = 'spock-topic'
ConsumerOffset offset = new ConsumerOffset(topic, 0, 1337)
ConsumerOffset secondOffset = new ConsumerOffset(topic, 1, 0)
when:
watcher.trackConsumerOffset(offset)
watcher.trackConsumerOffset(secondOffset)
then:
watcher.consumersMap.size() == 1
watcher.consumersMap[topic].size() == 2
}
}