Introducing the BrokerTreeWatcher to track changes to the broker list in Zookeeper

This commit includes a lot of work-in-progress code. Still
experimenting with how to bind events from the Zookeeper event-driven
system into the KafkaPoller busy-wait-loop system.
R. Tyler Croy 2015-01-19 11:45:22 -08:00
parent c53413d5b9
commit 7e593f1235
5 changed files with 198 additions and 6 deletions
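
The binding being experimented with works roughly like this: the Curator-backed BrokerTreeWatcher fires a callback with the assembled broker list once its tree cache is initialized, the callback hands that list to the KafkaPoller thread via refresh(), and the poller's busy-wait loop notices the flag on its next pass and reconnects. A compressed sketch of that wiring, using the classes introduced in this diff plus an assumed local Zookeeper at localhost:2181 (the real version is in the Main.groovy hunk below):

import com.github.lookout.verspaetung.KafkaPoller
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry

// Sketch only: localhost:2181 is an assumed Zookeeper address, not from this commit
CuratorFramework client = CuratorFrameworkFactory.newClient('localhost:2181',
                                                            new ExponentialBackoffRetry(1000, 3))
client.start()

KafkaPoller poller = new KafkaPoller()
BrokerTreeWatcher watcher = new BrokerTreeWatcher(client)

/* Once the watcher's tree cache reports INITIALIZED it calls back with the
 * assembled broker list; the callback only hands that list across to the
 * poller thread, whose busy-wait loop reconnects on its next pass.
 */
watcher.onBrokerUpdates = { brokers ->
    poller.refresh(brokers)
}

poller.start()
watcher.start()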

@@ -0,0 +1,21 @@
package com.github.lookout.verspaetung

/**
 * POJO containing the necessary information to model a Kafka broker
 */
class KafkaBroker {
    private String host
    private Integer port
    private Integer brokerId

    public KafkaBroker(Object jsonObject, Integer brokerId) {
        this.host = jsonObject.host
        this.port = jsonObject.port
        this.brokerId = brokerId
    }

    @Override
    String toString() {
        return "broker<${this.brokerId}>@${this.host}:${this.port}"
    }
}
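
For context, Kafka registers each live broker under /brokers/ids/<brokerId> in Zookeeper with a JSON payload that includes, among other fields, the broker's host and port; that parsed JSON is what this constructor receives. A small sketch (the payload below is illustrative, not taken from this commit):

import com.github.lookout.verspaetung.KafkaBroker
import groovy.json.JsonSlurper

// Illustrative payload; a real broker znode carries additional fields as well
Object payload = new JsonSlurper().parseText('{"host":"kafka1.example.com","port":9092}')
KafkaBroker broker = new KafkaBroker(payload, 1)

assert broker.toString() == 'broker<1>@kafka1.example.com:9092'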

@@ -0,0 +1,45 @@
package com.github.lookout.verspaetung

import groovy.transform.TypeChecked

@TypeChecked
class KafkaPoller extends Thread {
    private Boolean keepRunning = true
    private List<KafkaBroker> brokers
    private Boolean shouldReconnect = false

    void run() {
        while (keepRunning) {
            println 'kafka poll'

            if (shouldReconnect) {
                reconnect()
            }

            Thread.sleep(1 * 1000)
        }
    }

    /**
     * Blocking reconnect to the Kafka brokers
     */
    void reconnect() {
        println "reconnecting"
    }

    /**
     * Signal the runloop to safely die after its next iteration
     */
    void die() {
        this.keepRunning = false
    }

    /**
     * Store a new list of KafkaBroker objects and signal a reconnection
     */
    void refresh(List brokers) {
        println "refresh: ${brokers}"
        this.brokers = brokers
        this.shouldReconnect = true
    }
}
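
The poller is a plain Thread whose loop only checks a couple of flags, so the expected lifecycle is: start it, let watcher callbacks push broker lists in via refresh(), and call die() to let the loop exit on its next pass. A minimal usage sketch (the empty list is a stand-in for real KafkaBroker objects):

import com.github.lookout.verspaetung.KafkaPoller

KafkaPoller poller = new KafkaPoller()
poller.start()

poller.refresh([])        // a BrokerTreeWatcher callback would pass actual KafkaBroker objects
Thread.sleep(2 * 1000)    // give the loop a pass or two to notice shouldReconnect

poller.die()              // flips keepRunning so run() falls out of its while loop
poller.join()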

@@ -1,5 +1,7 @@
package com.github.lookout.verspaetung

import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import java.util.concurrent.ConcurrentHashMap
import groovy.transform.TypeChecked
@@ -10,28 +12,38 @@ import org.apache.zookeeper.KeeperException
import org.apache.curator.framework.recipes.cache.TreeCache
import org.apache.curator.framework.recipes.cache.TreeCacheListener

import kafka.client.ClientUtils

//@TypeChecked
class Main {
    static void main(String[] args) {
        println "Running ${args}"

        // XXX: Early exit until testing
        //return

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
        CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry)
        ConcurrentHashMap<TopicPartition, List<zk.ConsumerOffset>> consumers = new ConcurrentHashMap()

        client.start()

        TreeCache cache = new TreeCache(client, '/consumers')
        cache.listenable.addListener(new zk.StandardTreeWatcher(consumers))

        KafkaPoller poller = new KafkaPoller()
        BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
        brokerWatcher.onBrokerUpdates = { brokers ->
            poller.refresh(brokers)
        }

        poller.start()
        brokerWatcher.start()
        cache.start()

        println 'started..'
        Thread.sleep(5 * 1000)
        println 'exiting..'
        poller.die()
        return
    }
}

@@ -0,0 +1,85 @@
package com.github.lookout.verspaetung.zk

import com.github.lookout.verspaetung.KafkaBroker

import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import java.util.Collections
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCache
import org.apache.curator.framework.recipes.cache.TreeCacheListener
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

/**
 * The BrokerTreeWatcher is a kind of watcher whose sole purpose is
 * to watch the segment of the Zookeeper tree where Kafka stores broker
 * information
 */
@TypeChecked
class BrokerTreeWatcher implements TreeCacheListener {
    static final Integer INVALID_BROKER_ID = -1

    private JsonSlurper json
    private TreeCache cache
    private final String BROKERS_PATH = '/brokers/ids'
    private Closure onBrokerUpdates
    private Boolean isTreeInitialized = false
    private List<KafkaBroker> brokers

    BrokerTreeWatcher(CuratorFramework client) {
        this.json = new JsonSlurper()
        this.cache = new TreeCache(client, BROKERS_PATH)
        this.cache.listenable.addListener(this)
        this.brokers = []
    }

    /**
     * Start our internal cache
     */
    void start() {
        this.cache?.start()
    }

    /**
     * Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
     * list of brokers
     */
    void childEvent(CuratorFramework client, TreeCacheEvent event) {
        /* If we're initialized that means we should have all our brokers in
         * our internal list already and we can fire an event
         */
        if (event.type == TreeCacheEvent.Type.INITIALIZED) {
            this.isTreeInitialized = true
            this.onBrokerUpdates?.call(Collections.synchronizedList(this.brokers))
            return
        }

        if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
            println event
            return
        }

        ChildData nodeData = event.data
        Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))

        this.brokers << new KafkaBroker(brokerData, brokerIdFromPath(nodeData.path))
    }

    /**
     * Process a path string from Zookeeper for the Kafka broker's ID
     *
     * We're expecting paths like: /brokers/ids/1231524312
     */
    Integer brokerIdFromPath(String path) {
        List<String> pathParts = path?.split('/') as List

        if ((pathParts == null) ||
            (pathParts.size() != 4)) {
            return INVALID_BROKER_ID
        }

        return new Integer(pathParts[-1])
    }
}

@@ -0,0 +1,29 @@
package com.github.lookout.verspaetung.zk

import org.apache.curator.framework.CuratorFramework
import spock.lang.*

class BrokerTreeWatcherSpec extends Specification {
    BrokerTreeWatcher watcher

    def setup() {
        /* BrokerTreeWatcher only has a one-argument constructor; a mocked
         * CuratorFramework is enough here since these tests never start the
         * underlying TreeCache
         */
        this.watcher = new BrokerTreeWatcher(Mock(CuratorFramework))
    }

    def "brokerIdFromPath() should return the right ID with a valid path"() {
        given:
        String path = "/brokers/ids/1337"

        expect:
        watcher.brokerIdFromPath(path) == 1337
    }

    def "brokerIdFromPath() should return -1 on null paths"() {
        expect:
        watcher.brokerIdFromPath(null) == -1
    }

    def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
        expect:
        watcher.brokerIdFromPath('/spock/ed') == -1
    }
}