Merge pull request #37 from mkristian/broker-watcher-refactor

Broker watcher refactor
R. Tyler Croy 2015-12-26 15:13:56 -08:00
commit 48a258c3bc
11 changed files with 425 additions and 96 deletions

View File

@@ -8,12 +8,30 @@ class KafkaBroker {
final private Integer port
final private Integer brokerId
KafkaBroker(Object jsonObject, Integer brokerId) {
this.host = jsonObject.host
this.port = jsonObject.port
KafkaBroker(String host, Integer port, Integer brokerId) {
this.host = host
this.port = port
this.brokerId = brokerId
}
@Override
int hashCode() {
return this.brokerId
}
@Override
boolean equals(Object compared) {
if (this.is(compared)) {
return true
}
if (!(compared instanceof KafkaBroker)) {
return false
}
return compared.brokerId == brokerId
}
@Override
String toString() {
return "broker<${this.brokerId}>@${this.host}:${this.port}"

View File

@@ -37,6 +37,7 @@ class KafkaPoller extends Thread {
this.brokerConsumerMap = [:]
this.brokers = []
this.onDelta = []
setName('Verspaetung Kafka Poller')
}
/* There are a number of cases where we intentionally swallow stacktraces
@@ -88,7 +89,7 @@ class KafkaPoller extends Thread {
}
}
@SuppressWarnings(['CatchException'])
@SuppressWarnings('CatchException')
private void dumpMetadata() {
LOGGER.debug('dumping meta-data')
@@ -157,18 +158,24 @@ class KafkaPoller extends Thread {
/**
* Blocking reconnect to the Kafka brokers
*/
@SuppressWarnings('CatchException')
private void reconnect() {
disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers')
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
synchronized(this.brokers) {
this.brokers.each { Broker b ->
SimpleConsumer consumer = new SimpleConsumer(b.host,
b.port,
this.brokers.each { Broker broker ->
SimpleConsumer consumer = new SimpleConsumer(broker.host,
broker.port,
KAFKA_TIMEOUT,
KAFKA_BUFFER,
KAFKA_CLIENT_ID)
consumer.connect()
this.brokerConsumerMap[b.id] = consumer
try {
consumer.connect()
this.brokerConsumerMap[broker.id] = consumer
}
catch (Exception e) {
LOGGER.info('Error connecting consumer to {}', broker, e)
}
}
}
this.shouldReconnect = false
@@ -181,10 +188,16 @@ class KafkaPoller extends Thread {
this.keepRunning = false
}
@SuppressWarnings('CatchException')
private void disconnectConsumers() {
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
LOGGER.info('Disconnecting {}', client)
client?.disconnect()
try {
client?.disconnect()
}
catch (Exception e) {
LOGGER.info('Error disconnecting {}', client, e)
}
}
}

View File

@@ -169,9 +169,6 @@ class Main {
}
});
logger.info('Starting wait loop...')
synchronized(this) {
wait()
}
}
static void registerMetricFor(KafkaConsumer consumer,

View File

@@ -0,0 +1,73 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.transform.TypeChecked
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* Manages a list of brokers which can be online or offline. Offline means the
* internal list is hidden, i.e. list() gives you an empty list.
*/
@TypeChecked
class BrokerManager {
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
// we start with being offline
private boolean offline = true
void add(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.indexOf(broker) == -1) {
this.brokers.add(broker)
logger.info('broker added: {}', broker)
}
}
}
void update(KafkaBroker broker) {
synchronized(brokers) {
if (broker == null) {
return
}
if (this.brokers.indexOf(broker) != -1) {
this.brokers.remove(broker)
}
this.brokers.add(broker)
logger.info('broker updated: {}', broker)
}
}
void remove(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.remove(broker)) {
logger.info('broker removed: {}', broker)
}
}
}
// TODO not sure if this is correct - see BrokerTreeWatcher
@SuppressWarnings('ConfusingMethodName')
void offline() {
this.offline = true
}
// TODO not sure if this is correct - see BrokerTreeWatcher
void online() {
this.offline = false
}
Collection<KafkaBroker> list() {
if (this.offline) {
[]
}
else {
this.brokers
}
}
}
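For reference, a minimal usage sketch of the BrokerManager added above (the hostname is invented; the behaviour mirrors BrokerManagerSpec further down). Brokers registered while the manager is offline stay hidden until online() is called:

    BrokerManager manager = new BrokerManager()
    manager.add(new KafkaBroker('kafka1.example.com', 9092, 1))   // hypothetical broker
    assert manager.list().isEmpty()   // a new manager starts offline
    manager.online()                  // e.g. after the INITIALIZED tree event
    assert manager.list().size() == 1
    manager.offline()                 // e.g. after CONNECTION_LOST
    assert manager.list().isEmpty()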

View File

@@ -6,7 +6,6 @@ import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
@@ -16,19 +15,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
*/
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
static final Integer INVALID_BROKER_ID = -1
private static final String BROKERS_PATH = '/brokers/ids'
private Boolean isTreeInitialized = false
private final JsonSlurper json
private final List<Closure> onBrokerUpdates
private final List<KafkaBroker> brokers
private final BrokerManager manager
private final PojoFactory factory
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.json = new JsonSlurper()
this.brokers = Collections.synchronizedList([])
this.factory = new PojoFactory(new JsonSlurper())
this.manager = new BrokerManager()
this.onBrokerUpdates = []
}
@@ -37,57 +34,39 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
}
/**
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
* list of brokers
* Process events to keep an up to date list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* If we're initialized that means we should have all our brokers in
* our internal list already and we can fire an event
*/
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
this.isTreeInitialized = true
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
return
switch (event.type) {
case TreeCacheEvent.Type.INITIALIZED:
this.manager.online()
break
case TreeCacheEvent.Type.NODE_ADDED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.add(broker)
break
case TreeCacheEvent.Type.NODE_UPDATED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.update(broker)
break
case TreeCacheEvent.Type.NODE_REMOVED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.remove(broker)
break
// TODO these 3 events might come with path which can be mapped
// to a specific broker
case TreeCacheEvent.Type.CONNECTION_LOST:
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
this.manager.offline()
break
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
this.manager.online()
break
}
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
return
this.onBrokerUpdates.each { Closure c ->
c?.call(this.manager.list())
}
ChildData nodeData = event.data
Integer brokerId = brokerIdFromPath(nodeData.path)
if (brokerId == INVALID_BROKER_ID) {
return
}
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
this.brokers << new KafkaBroker(brokerData, brokerId)
if (this.isTreeInitialized) {
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
}
}
/**
* Process a path string from Zookeeper for the Kafka broker's ID
*
* We're expecting paths like: /brokers/ids/1231524312
*/
Integer brokerIdFromPath(String path) {
List<String> pathParts = path?.split('/') as List
if ((pathParts == null) ||
(pathParts.size() != 4)) {
return INVALID_BROKER_ID
}
return new Integer(pathParts[-1])
}
}

View File

@@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* Parses TreeCacheEvents and provides helper methods to access their data in a typed manner
*/
@TypeChecked
class EventData {
private final String data
private final List<String> pathParts
EventData(TreeCacheEvent event) {
ChildData data = event.data
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
}
Integer asInteger() {
new Integer(data)
}
String asString() {
data
}
Integer pathPartsSize() {
pathParts.size()
}
Integer getPathPartAsInteger(int pos) {
if (pathParts.size() <= pos) {
return null
}
new Integer(pathParts[pos])
}
}
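A short illustrative sketch of the typed accessors, assuming the same kind of /brokers/ids znode event that BrokerTreeWatcher receives (the path and payload here are invented):

    import org.apache.curator.framework.recipes.cache.ChildData
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent

    TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED,
            new ChildData('/brokers/ids/1337', null, '{"host":"localhost","port":9092}'.bytes))
    EventData data = new EventData(event)
    assert data.getPathPartAsInteger(3) == 1337    // broker id taken from the path
    assert data.asString().startsWith('{"host"')   // payload decoded as a UTF-8 String
    assert data.pathPartsSize() == 4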

View File

@@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* Factory creating POJOs from TreeCacheEvents
*/
class PojoFactory {
private final JsonSlurper json
PojoFactory(JsonSlurper json) {
this.json = json
}
/**
* Converts a TreeCacheEvent into a KafkaBroker. The caller
* is responsible for calling the factory method with the
* right data, i.e. a path starting with /brokers/ids. The implementation
* just uses whatever it is given to create a KafkaBroker object.
*/
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
EventData data = new EventData(event)
// We're expecting paths like: /brokers/ids/1231524312
Integer id = data.getPathPartAsInteger(3)
if (id == null) {
return
}
String json = data.asString()
if (json == null) {
new KafkaBroker('', 0, id)
}
else {
Object payload = this.json.parseText(json)
new KafkaBroker(payload.host, payload.port, id)
}
}
}
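A hedged sketch of the factory contract described above (host, port and ids are invented; PojoFactorySpec below exercises the same cases). A path without a broker id yields null, while a valid payload is parsed into host and port:

    import groovy.json.JsonSlurper
    import org.apache.curator.framework.recipes.cache.ChildData
    import org.apache.curator.framework.recipes.cache.TreeCacheEvent

    PojoFactory factory = new PojoFactory(new JsonSlurper())
    TreeCacheEvent added = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED,
            new ChildData('/brokers/ids/42', null, '{"host":"kafka1","port":9092}'.bytes))
    assert factory.createKafkaBroker(added).toString() == 'broker<42>@kafka1:9092'
    TreeCacheEvent noId = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED,
            new ChildData('/brokers/ids', null, ''.bytes))
    assert factory.createKafkaBroker(noId) == null     // no broker id in the path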

View File

@@ -0,0 +1,95 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class BrokerManagerSpec extends Specification {
BrokerManager manager = new BrokerManager()
def "new instance is offline, i.e. has empty list of brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
expect:
manager.list().size() == 0
}
def "new instance is offline after add broker"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
manager.add(broker)
expect:
manager.list().size() == 0
}
def "list shows brokers after getting online"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 1
}
def "can remove brokers based on its id"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 0
}
def "can update brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.online()
manager.add(broker)
manager.update(broker2)
expect:
manager.list().size() == 1
manager.list().first().host == 'localhost'
}
def "can go offline anytime"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
expect:
manager.list().size() == 0
}
def "can go offline and online again"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
manager.online()
expect:
manager.list().size() == 2
}
}

View File

@@ -1,29 +0,0 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
class BrokerTreeWatcherSpec extends Specification {
BrokerTreeWatcher watcher
def setup() {
this.watcher = new BrokerTreeWatcher()
}
def "brokerIdFromPath() should return the right ID with a valid path"() {
given:
String path = "/brokers/ids/1337"
expect:
watcher.brokerIdFromPath(path) == 1337
}
def "brokerIdFromPath() should return -1 on null paths"() {
expect:
watcher.brokerIdFromPath(null) == -1
}
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
expect:
watcher.brokerIdFromPath('/spock/ed') == -1
}
}

View File

@@ -0,0 +1,47 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class EventDataSpec extends Specification {
EventData data
def "converts TreeCacheEvents to EventData"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, "be happy".bytes)))
expect:
data.getPathPartAsInteger(3) == 1337
data.asString() == 'be happy'
data.pathPartsSize() == 4
data.getPathPartAsInteger(4) == null
}
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/", null, "42".bytes)))
expect:
data.asString() == '42'
data.asInteger() == 42
data.pathPartsSize() == 0
data.getPathPartAsInteger(0) == null
}
def "converts TreeCacheEvents to EventData with no payload"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/123", null, null)))
expect:
data.asString() == null
data.pathPartsSize() == 2
data.getPathPartAsInteger(1) == 123
}
}

View File

@@ -0,0 +1,52 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class PojoFactorySpec extends Specification {
PojoFactory factory = new PojoFactory(new JsonSlurper())
def "create KafkaBroker from TreeCacheEvent"() {
given:
String path = "/brokers/ids/1337"
String payload = "{\"host\":\"localhost\",\"port\":9092}"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, payload.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 1337
broker.host == 'localhost'
broker.port == 9092
}
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
given:
String path = "/brokers/ids"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, ''.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker == null
}
def "create KafkaBroker from TreeCacheEvent without payload"() {
given:
String path = "/brokers/ids/123"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, null))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 123
broker.host == ''
broker.port == 0
}
}