Implement the KafkaSpoutTreeWatcher for processing Storm KafkaSpout offset data

Fixes #9
R. Tyler Croy 2015-01-28 02:45:49 -08:00
parent b4b9fe9860
commit d0c99b9a34
8 changed files with 115 additions and 26 deletions

View File

@@ -1,6 +1,7 @@
 package com.github.lookout.verspaetung

 import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
+import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
 import com.github.lookout.verspaetung.zk.StandardTreeWatcher

 import java.util.AbstractMap
@@ -56,9 +57,16 @@ class Main {
         client.start()

         KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
-        BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
-        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers)
+        BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
+        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()
+
+        /* Assuming that most people aren't needing to run Storm-based watchers
+         * as well
+         */
+        if (cli.hasOption('s')) {
+            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
+            stormWatcher.start()
+        }

         consumerWatcher.onInitComplete << {
             logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
@@ -68,9 +76,6 @@ class Main {
             poller.refresh(brokers)
         }

-        brokerWatcher.start()
-        consumerWatcher.start()
-
         logger.info("Started wait loop...")

         while (true) {
@@ -139,10 +144,15 @@ class Main {
                                     .withLongOpt('dry-run')
                                     .create('n')
+        Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
+                                    .withLongOpt('storm')
+                                    .create('s')

         options.addOption(zookeeper)
         options.addOption(statsdHost)
         options.addOption(statsdPort)
         options.addOption(dryRun)
+        options.addOption(stormSpouts)

         return options
     }
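For reference, a minimal standalone sketch of how the new flag behaves at parse time. It assumes Apache Commons CLI 1.x (the library behind the OptionBuilder/withLongOpt/create chain above); the argument vector here is made up:

    import org.apache.commons.cli.*

    Options options = new Options()
    // Same builder chain as in the diff above
    options.addOption(OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
                                   .withLongOpt('storm')
                                   .create('s'))

    // --storm and -s resolve to the same Option, keyed by its short name
    CommandLine cli = new PosixParser().parse(options, ['--storm'] as String[])
    assert cli.hasOption('s')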

View File

@@ -24,6 +24,11 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
      */
     abstract ConsumerOffset processChildData(ChildData data)

+    /**
+     * Determine whether a given path is of interest, i.e. a path which
+     * contains offset data
+     */
+    abstract Boolean isOffsetSubtree(String path)
+
     /**
      * Primary TreeCache event processing callback
      */
@@ -39,6 +44,11 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
             return
         }

+        if ((event.data == null) ||
+            (!isOffsetSubtree(event.data?.path))) {
+            return
+        }
+
         ConsumerOffset offset = processChildData(event?.data)

         if (offset != null) {

View File

@@ -38,10 +38,11 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
     abstract String zookeeperPath()

     /**
-     * Start our internal cache
+     * Start our internal cache and return ourselves for API cleanliness
      */
-    void start() {
+    AbstractTreeWatcher start() {
         this.cache?.start()
+        return this
     }

     abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
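The watcher start-up one-liners in Main above rely on this change; a quick sketch of the difference at a call site:

    // Before this change, construction and startup took two statements:
    //     BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
    //     brokerWatcher.start()
    // Now they collapse into a single expression:
    BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()

One caveat: since the declared return type is the abstract base class, dynamic Groovy casts the result back to the concrete type on assignment, but a @TypeChecked call site would need an explicit cast.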

View File

@@ -0,0 +1,53 @@
+package com.github.lookout.verspaetung.zk
+
+import groovy.json.JsonSlurper
+import groovy.transform.TypeChecked
+import groovy.transform.TypeCheckingMode
+import groovy.transform.InheritConstructors
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.cache.ChildData
+
+/**
+ * KafkaSpoutTreeWatcher processes Zookeeper paths normally associated with
+ * Storm KafkaSpout based consumers
+ */
+@TypeChecked
+@InheritConstructors
+class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
+    private static final String ZK_PATH = '/kafka_spout'
+    private JsonSlurper json
+
+    KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
+        super(client, consumersMap)
+        this.json = new JsonSlurper()
+    }
+
+    String zookeeperPath() { return ZK_PATH }
+
+    /* Skipping type checking since Groovy's JsonSlurper gives us a pretty
+     * loose Object to deal with
+     */
+    @TypeChecked(TypeCheckingMode.SKIP)
+    ConsumerOffset processChildData(ChildData nodeData) {
+        Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8'))
+        /*
+         * [broker:[host:REDACTED, port:6667], offset:179, partition:7, topic:device_data, topology:[id:01c0d1fc-e956-4b35-9891-dd835488cf45, name:unwrap_topology]]
+         */
+        ConsumerOffset offset = new ConsumerOffset()
+        offset.groupName = offsetData.topology.name
+        offset.topic = offsetData.topic
+        offset.partition = offsetData.partition
+        offset.offset = offsetData.offset
+
+        return offset
+    }
+
+    /**
+     * We're expecting things to look like:
+     *  /kafka_spout/topologyname/partition_0
+     */
+    Boolean isOffsetSubtree(String path) {
+        return (path =~ /\/kafka_spout\/(.*)\/partition_(\d+)/)
+    }
+}
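As a standalone illustration of the parsing step, a sketch that feeds JsonSlurper the payload shape documented in the comment above (the host name is invented, since the sample redacts it; the other values come from the sample):

    import groovy.json.JsonSlurper

    String payload = '''
        {"broker": {"host": "kafka.example.invalid", "port": 6667},
         "offset": 179, "partition": 7, "topic": "device_data",
         "topology": {"id": "01c0d1fc-e956-4b35-9891-dd835488cf45",
                      "name": "unwrap_topology"}}
    '''

    Object offsetData = new JsonSlurper().parseText(payload)
    // The same property paths processChildData reads:
    assert offsetData.topology.name == 'unwrap_topology'
    assert offsetData.topic == 'device_data'
    assert offsetData.partition == 7
    assert offsetData.offset == 179

The watcher treats the topology name as the consumer group, which is why groupName is populated from topology.name rather than from the Zookeeper path.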

View File

@@ -13,26 +13,13 @@ import org.apache.curator.framework.recipes.cache.ChildData
 class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
     private static final String ZK_PATH = '/consumers'

-    String zookeeperPath() {
-        return ZK_PATH
-    }
+    String zookeeperPath() { return ZK_PATH }

     /**
      * Extract the necessary information from a standard (i.e. high-level Kafka
      * consumer) tree of offsets
      */
     ConsumerOffset processChildData(ChildData data) {
-        if (data == null) {
-            return null
-        }
-
-        /* There are non-offset related subtrees in /consumers that we don't
-         * care about, let's just skip over them
-         */
-        if (!isOffsetSubtree(data.path)) {
-            return null
-        }
-
         /*
         ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
         */

View File

@@ -17,6 +17,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
         }

         ConsumerOffset processChildData(ChildData d) { }
         String zookeeperPath() { return '/zk/spock' }
+        Boolean isOffsetSubtree(String p) { return true }
     }

     def setup() {

View File

@@ -0,0 +1,32 @@
+package com.github.lookout.verspaetung.zk
+
+import spock.lang.*
+
+import org.apache.curator.framework.CuratorFramework
+import org.apache.curator.framework.recipes.cache.ChildData
+
+class KafkaSpoutTreeWatcherSpec extends Specification {
+    private KafkaSpoutTreeWatcher watcher
+    private CuratorFramework mockCurator
+
+    def setup() {
+        this.mockCurator = Mock(CuratorFramework)
+        this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
+    }
+
+    def "isOffsetSubtree should return true for a valid subtree path"() {
+        given:
+        String path = '/kafka_spout/spock-topology/partition_1'
+
+        expect:
+        watcher.isOffsetSubtree(path) == true
+    }
+
+    def "isOffsetSubtree should return false for a non-offset subtree path"() {
+        given:
+        String path = '/kafka_spout/spock-topology'
+
+        expect:
+        watcher.isOffsetSubtree(path) == false
+    }
+}
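Beyond the true/false checks exercised here, the pattern's capture groups also expose the topology name and partition number, should a caller ever need them; a quick sketch:

    def m = ('/kafka_spout/spock-topology/partition_1' =~ /\/kafka_spout\/(.*)\/partition_(\d+)/)
    assert m[0][1] == 'spock-topology'   // capture group 1: topology name
    assert m[0][2] == '1'                // capture group 2: partition number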

View File

@@ -14,11 +14,6 @@ class StandardTreeWatcherSpec extends Specification {
         this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
     }

-    def "processChildData should return null if null is given"() {
-        expect:
-        watcher.processChildData(null) == null
-    }
-
     def "processChildData should return null if the path is invalid"() {
         given:
         ChildData data = new ChildData("/consumers", null, (byte[])[48])