Introduce the TreeWatcher objects for transforming events into something tangible

References #2
This commit is contained in:
R. Tyler Croy 2015-01-17 16:46:28 -08:00
parent 86878d5b10
commit afd2ee8567
7 changed files with 193 additions and 19 deletions

View File

@ -10,10 +10,8 @@ information, Verspätung computs the delta for each of the consumer groups and
reports it to statsd.
### Hacking
* *Running tests:* `./gradlew check`
* *Running the app locally:* `./gradlew run`
* *Running the app locally:* `./gradlew run -PzookeeperHosts=localhost:2181`
* *Building the app for distribution:* `./gradlew shadowJar`

View File

@ -10,33 +10,23 @@ import org.apache.curator.framework.recipes.cache.TreeCacheListener
class Main {
static void main(String[] args) {
println "Running ${args}"
// XXX: Early exit until testing
return
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry)
client.start()
TreeCache cache = new TreeCache(client, '/kafka_spout')
TreeCache cache = new TreeCache(client, '/consumers')
println cache
cache.listenable.addListener([
childEvent: { cl, ev ->
println "EV: ${ev}"
}
] as TreeCacheListener)
cache.listenable.addListener(new zk.StandardTreeWatcher())
cache.start()
println 'started..'
Boolean foundChildren = false
Map children = null
Thread.sleep(5 * 1000)
while ((children == null) || children.isEmpty()) {
children = cache.getCurrentChildren('/kafka_spout')
println "CHILDREN: ${children}"
Thread.sleep(100)
}
[1, 2, 3].each { Thread.sleep(300) }
println 'exiting..'
return
client.children.forPath('/consumers').each { path ->

View File

@ -0,0 +1,50 @@
package com.github.lookout.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheListener
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* AbstractTreeWatcher defines the contract and base components for the various
* Zookeeper tree watchers Verspaetung needs. The responsibility of these
* watchers is to process events from the TreeCache and emit processed events
* further down the pipeline
*/
@TypeChecked
abstract class AbstractTreeWatcher implements TreeCacheListener {
/**
* Process the ChildData associated with an event
*/
abstract ConsumerOffset processChildData(ChildData data)
/**
* Primary TreeCache event processing callback
*/
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* bail out early if we don't care about the event */
if (!isNodeEvent(event)) {
return
}
ConsumerOffset offset = processChildData(event?.data)
if (offset != null) {
println offset
}
}
/**
* Return true if the TreeCacheEvent received pertains to a node event that
* we're interested in
*/
Boolean isNodeEvent(TreeCacheEvent event) {
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
return true
}
return false
}
}

View File

@ -0,0 +1,20 @@
package com.github.lookout.verspaetung.zk
import org.apache.curator.framework.recipes.cache.ChildData
/**
* POJO representing data from Zookeeper for a consumer, topic and offset
*/
class ConsumerOffset {
private String topic
private String groupName
private Integer offset
private Integer partition
private ChildData rawData
String toString() {
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}"
}
}

View File

@ -0,0 +1,37 @@
package com.github.lookout.verspaetung.zk
import org.apache.curator.framework.recipes.cache.ChildData
/**
* StandardTreeWatcher processes Zookeeper paths for standard high-level Kafka
* consumers
*/
class StandardTreeWatcher extends AbstractTreeWatcher {
/**
* Extract the necessary information from a standard (i.e. high-level Kafka
* consumer) tree of offsets
*/
ConsumerOffset processChildData(ChildData data) {
if (data == null) {
return null
}
/*
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
*/
ConsumerOffset offset = new ConsumerOffset()
List pathParts = data.path.split(/\//)
if (pathParts.size != 6) {
return null
}
offset.groupName = pathParts[2]
offset.topic = pathParts[4]
offset.partition = new Integer(pathParts[5])
offset.offset = new Integer(new String(data.data))
return offset
}
}

View File

@ -0,0 +1,39 @@
package com.github.lookout.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class AbstractTreeWatcherSpec extends Specification {
private AbstractTreeWatcher watcher
class MockWatcher extends AbstractTreeWatcher {
ConsumerOffset processChildData(ChildData d) { }
}
def setup() {
this.watcher = new MockWatcher()
}
def "isNodeEvent() returns false by default"() {
expect:
watcher.isNodeEvent(null) == false
}
def "isNodeEvent() return true for NODE_ADDED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED, null)
expect:
watcher.isNodeEvent(event) == true
}
def "isNodeEvent() return true for NODE_UPDATED"() {
given:
def event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_UPDATED, null)
expect:
watcher.isNodeEvent(event) == true
}
}

View File

@ -0,0 +1,40 @@
package com.github.lookout.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
class StandardTreeWatcherSpec extends Specification {
private StandardTreeWatcher watcher
def setup() {
this.watcher = new StandardTreeWatcher()
}
def "processChildData should return null if null is given"() {
expect:
watcher.processChildData(null) == null
}
def "processChildData should return null if the path is invalid"() {
given:
ChildData data = new ChildData("/consumers", null, (byte[])[48])
ConsumerOffset offset = watcher.processChildData(data)
expect:
offset == null
}
def "processChildData should create a ConsumerOffset properly"() {
given:
String path = "/consumers/my-consumer-group/offsets/thetopic/3"
ChildData data = new ChildData(path, null, (byte[])[48])
ConsumerOffset offset = watcher.processChildData(data)
expect:
offset.groupName == 'my-consumer-group'
offset.topic == 'thetopic'
offset.partition == 3
offset.offset == 0
}
}