refactor the BrokerTreeWatcher

split things into smaller pieces and as preparation of having only
ONE TreeWatcher running which itself dispatch the events to the interresting
parties.

Sponsored by Lookout Inc.
This commit is contained in:
Christian Meier 2015-11-10 16:53:27 +01:00
parent 618aaf915e
commit 7a6ef71eef
9 changed files with 389 additions and 86 deletions

View File

@ -37,7 +37,7 @@ class KafkaPoller extends Thread {
this.brokerConsumerMap = [:]
this.brokers = []
this.onDelta = []
setName("Verspaetung Kafka Poller")
setName('Verspaetung Kafka Poller')
}
/* There are a number of cases where we intentionally swallow stacktraces
@ -89,7 +89,7 @@ class KafkaPoller extends Thread {
}
}
@SuppressWarnings(['CatchException'])
@SuppressWarnings('CatchException')
private void dumpMetadata() {
LOGGER.debug('dumping meta-data')
@ -158,6 +158,7 @@ class KafkaPoller extends Thread {
/**
* Blocking reconnect to the Kafka brokers
*/
@SuppressWarnings('CatchException')
private void reconnect() {
disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
@ -172,7 +173,7 @@ class KafkaPoller extends Thread {
consumer.connect()
this.brokerConsumerMap[broker.id] = consumer
}
catch(Exception e) {
catch (Exception e) {
LOGGER.info('Error connecting cunsumer to {}', broker, e)
}
}
@ -187,13 +188,14 @@ class KafkaPoller extends Thread {
this.keepRunning = false
}
@SuppressWarnings('CatchException')
private void disconnectConsumers() {
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
LOGGER.info('Disconnecting {}', client)
try {
client?.disconnect()
}
catch(Exception e) {
catch (Exception e) {
LOGGER.info('Error disconnecting {}', client, e)
}
}

View File

@ -0,0 +1,73 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.transform.TypeChecked
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* manage a list of brokers. can be online or offline. offline means the
* internal list is hidden, i.e. the list() gives you an empty list.
*/
@TypeChecked
class BrokerManager {
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
// we start with being offline
private boolean offline = true
void add(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.indexOf(broker) == -1) {
this.brokers.add(broker)
logger.info('broker added: {}', broker)
}
}
}
void update(KafkaBroker broker) {
synchronized(brokers) {
if (broker == null) {
return
}
if (this.brokers.indexOf(broker) != -1) {
this.brokers.remove(broker)
}
this.brokers.add(broker)
logger.info('broker updated: {}', broker)
}
}
void remove(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.remove(broker)) {
logger.info('broker removed: {}', broker)
}
}
}
// TODO not sure if this is correct - see BrokerTreeWatcher
@SuppressWarnings('ConfusingMethodName')
void offline() {
this.offline = true
}
// TODO not sure if this is correct - see BrokerTreeWatcher
void online() {
this.offline = false
}
Collection<KafkaBroker> list() {
if (this.offline) {
[]
}
else {
this.brokers
}
}
}

View File

@ -6,7 +6,6 @@ import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
@ -16,19 +15,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
*/
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
static final Integer INVALID_BROKER_ID = -1
private static final String BROKERS_PATH = '/brokers/ids'
private Boolean isTreeInitialized = false
private final JsonSlurper json
private final List<Closure> onBrokerUpdates
private final List<KafkaBroker> brokers
private final BrokerManager manager
private final PojoFactory factory
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.json = new JsonSlurper()
this.brokers = Collections.synchronizedList([])
this.factory = new PojoFactory(new JsonSlurper())
this.manager = new BrokerManager()
this.onBrokerUpdates = []
}
@ -37,57 +34,39 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
}
/**
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
* list of brokers
* Process events to keep an up to date list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* If we're initialized that means we should have all our brokers in
* our internal list already and we can fire an event
*/
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
this.isTreeInitialized = true
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
return
switch (event.type) {
case TreeCacheEvent.Type.INITIALIZED:
this.manager.online()
break
case TreeCacheEvent.Type.NODE_ADDED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.add(broker)
break
case TreeCacheEvent.Type.NODE_UPDATED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.update(broker)
break
case TreeCacheEvent.Type.NODE_REMOVED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.remove(broker)
break
// TODO these 3 events might come with path which can be mapped
// to a specific broker
case TreeCacheEvent.Type.CONNECTION_LOST:
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
this.manager.offline()
break
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
this.manager.online()
break
}
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
return
this.onBrokerUpdates.each { Closure c ->
c?.call(this.manager.list())
}
ChildData nodeData = event.data
Integer brokerId = brokerIdFromPath(nodeData.path)
if (brokerId == INVALID_BROKER_ID) {
return
}
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
this.brokers << new KafkaBroker(brokerData, brokerId)
if (this.isTreeInitialized) {
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
}
}
/**
* Process a path string from Zookeeper for the Kafka broker's ID
*
* We're expecting paths like: /brokers/ids/1231524312
*/
Integer brokerIdFromPath(String path) {
List<String> pathParts = path?.split('/') as List
if ((pathParts == null) ||
(pathParts.size() != 4)) {
return INVALID_BROKER_ID
}
return new Integer(pathParts[-1])
}
}

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* parsing TreeCacheEvents and provide helper method to access its data in typed manner
*/
@TypeChecked
class EventData {
private final String data
private final List<String> pathParts
EventData(TreeCacheEvent event) {
ChildData data = event.data
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
}
Integer asInteger() {
new Integer(data)
}
String asString() {
data
}
Integer pathPartsSize() {
pathParts.size()
}
Integer getPathPartAsInteger(int pos) {
if (pathParts.size() <= pos) {
return null
}
new Integer(pathParts[pos])
}
}

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* factory creating Pojo's from TreeCacheEvents
*/
class PojoFactory {
private final JsonSlurper json
PojoFactory(JsonSlurper json) {
this.json = json
}
/**
* converts an treeCacheEvent into a KafkaBroker. the caller
* is responsible for calling the factory method with the
* right data. i.e. path starts with /brokers/ids. the implementation
* just uses whatever it is given to create a KafkaBroker object
*/
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
EventData data = new EventData(event)
// We're expecting paths like: /brokers/ids/1231524312
Integer id = data.getPathPartAsInteger(3)
if (id == null) {
return
}
String json = data.asString()
if (json == null) {
new KafkaBroker('', 0, id)
}
else {
Object payload = this.json.parseText(json)
new KafkaBroker(payload.host, payload.port, id)
}
}
}

View File

@ -0,0 +1,95 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class BrokerManagerSpec extends Specification {
BrokerManager manager = new BrokerManager()
def "new instance is offline, i.e. has empty list of brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
expect:
manager.list().size() == 0
}
def "new instance is offline after add broker"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
manager.add(broker)
expect:
manager.list().size() == 0
}
def "list shows brokers after getting online"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 1
}
def "can remove brokers based on its id"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 0
}
def "can update brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.online()
manager.add(broker)
manager.update(broker2)
expect:
manager.list().size() == 1
manager.list().first().host == 'localhost'
}
def "can go offline anytime"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
expect:
manager.list().size() == 0
}
def "can go offline and online again"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
manager.online()
expect:
manager.list().size() == 2
}
}

View File

@ -1,29 +0,0 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
class BrokerTreeWatcherSpec extends Specification {
BrokerTreeWatcher watcher
def setup() {
this.watcher = new BrokerTreeWatcher()
}
def "brokerIdFromPath() should return the right ID with a valid path"() {
given:
String path = "/brokers/ids/1337"
expect:
watcher.brokerIdFromPath(path) == 1337
}
def "brokerIdFromPath() should return -1 on null paths"() {
expect:
watcher.brokerIdFromPath(null) == -1
}
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
expect:
watcher.brokerIdFromPath('/spock/ed') == -1
}
}

View File

@ -0,0 +1,47 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class EventDataSpec extends Specification {
EventData data
def "converts TreeCacheEvents to EventData"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, "be happy".bytes)))
expect:
data.getPathPartAsInteger(3) == 1337
data.asString() == 'be happy'
data.pathPartsSize() == 4
data.getPathPartAsInteger(4) == null
}
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/", null, "42".bytes)))
expect:
data.asString() == '42'
data.asInteger() == 42
data.pathPartsSize() == 0
data.getPathPartAsInteger(0) == null
}
def "converts TreeCacheEvents to EventData with no payload"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/123", null, null)))
expect:
data.asString() == null
data.pathPartsSize() == 2
data.getPathPartAsInteger(1) == 123
}
}

View File

@ -0,0 +1,52 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class PojoFactorySpec extends Specification {
PojoFactory factory = new PojoFactory(new JsonSlurper())
def "create KafkaBroker from TreeCacheEvent"() {
given:
String path = "/brokers/ids/1337"
String payload = "{\"host\":\"localhost\",\"port\":9092}"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, payload.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 1337
broker.host == 'localhost'
broker.port == 9092
}
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
given:
String path = "/brokers/ids"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, ''.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker == null
}
def "create KafkaBroker from TreeCacheEvent without payload"() {
given:
String path = "/brokers/ids/123"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, null))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 123
broker.host == ''
broker.port == 0
}
}