Compare commits
12 Commits
Author | SHA1 | Date |
---|---|---|
R. Tyler Croy | 148359d816 | |
R. Tyler Croy | 0bac696847 | |
R. Tyler Croy | dc1cfab361 | |
R. Tyler Croy | 324b3faba7 | |
R. Tyler Croy | f901cba7e1 | |
R. Tyler Croy | 48a258c3bc | |
Christian Meier | 7a6ef71eef | |
Christian Meier | 618aaf915e | |
Christian Meier | 66c0c332bb | |
Christian Meier | 4c655995f9 | |
Christian Meier | dc918a700e | |
R. Tyler Croy | 416168cbff |
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* This is a multibranch workflow file for defining how this project should be
|
||||||
|
* built, tested and deployed
|
||||||
|
*/
|
||||||
|
|
||||||
|
node {
|
||||||
|
stage 'Clean workspace'
|
||||||
|
/* Running on a fresh Docker instance makes this redundant, but just in
|
||||||
|
* case the host isn't configured to give us a new Docker image for every
|
||||||
|
* build, make sure we clean things before we do anything
|
||||||
|
*/
|
||||||
|
deleteDir()
|
||||||
|
|
||||||
|
|
||||||
|
stage 'Checkout source'
|
||||||
|
/*
|
||||||
|
* Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
|
||||||
|
* scm to check out sources matching Jenkinsfile with the SCM details from
|
||||||
|
* the build that is executing this Jenkinsfile.
|
||||||
|
*
|
||||||
|
* when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
|
||||||
|
*/
|
||||||
|
checkout scm
|
||||||
|
|
||||||
|
|
||||||
|
stage 'Build and test'
|
||||||
|
/* if we can't install everything we need for Ruby in less than 15 minutes
|
||||||
|
* we might as well just give up
|
||||||
|
*/
|
||||||
|
timeout(30) {
|
||||||
|
sh './gradlew -iS'
|
||||||
|
}
|
||||||
|
|
||||||
|
stage 'Capture test results and artifacts'
|
||||||
|
step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
|
||||||
|
step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
|
||||||
|
}
|
||||||
|
|
||||||
|
// vim: ft=groovy
|
|
@ -1,6 +1,6 @@
|
||||||
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
|
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
|
||||||
|
|
||||||
image::https://api.bintray.com/packages/reiseburo/systems/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/systems/verspaetung/_latestVersion"]
|
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]
|
||||||
|
|
||||||
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.
|
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ plugins {
|
||||||
|
|
||||||
group = "com.github.reiseburo"
|
group = "com.github.reiseburo"
|
||||||
description = "A utility for monitoring the delay of Kafka consumers"
|
description = "A utility for monitoring the delay of Kafka consumers"
|
||||||
version = '0.5.0'
|
version = '0.6.0'
|
||||||
mainClassName = 'com.github.reiseburo.verspaetung.Main'
|
mainClassName = 'com.github.reiseburo.verspaetung.Main'
|
||||||
defaultTasks 'check', 'assemble'
|
defaultTasks 'check', 'assemble'
|
||||||
|
|
||||||
|
|
|
@ -8,12 +8,30 @@ class KafkaBroker {
|
||||||
final private Integer port
|
final private Integer port
|
||||||
final private Integer brokerId
|
final private Integer brokerId
|
||||||
|
|
||||||
KafkaBroker(Object jsonObject, Integer brokerId) {
|
KafkaBroker(String host, Integer port, Integer brokerId) {
|
||||||
this.host = jsonObject.host
|
this.host = host
|
||||||
this.port = jsonObject.port
|
this.port = port
|
||||||
this.brokerId = brokerId
|
this.brokerId = brokerId
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
int hashCode() {
|
||||||
|
return this.brokerId
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean equals(Object compared) {
|
||||||
|
if (this.is(compared)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(compared instanceof KafkaBroker)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return compared.brokerId == brokerId
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
String toString() {
|
String toString() {
|
||||||
return "broker<${this.brokerId}>@${this.host}:${this.port}"
|
return "broker<${this.brokerId}>@${this.host}:${this.port}"
|
||||||
|
|
|
@ -37,6 +37,7 @@ class KafkaPoller extends Thread {
|
||||||
this.brokerConsumerMap = [:]
|
this.brokerConsumerMap = [:]
|
||||||
this.brokers = []
|
this.brokers = []
|
||||||
this.onDelta = []
|
this.onDelta = []
|
||||||
|
setName('Verspaetung Kafka Poller')
|
||||||
}
|
}
|
||||||
|
|
||||||
/* There are a number of cases where we intentionally swallow stacktraces
|
/* There are a number of cases where we intentionally swallow stacktraces
|
||||||
|
@ -88,7 +89,7 @@ class KafkaPoller extends Thread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings(['CatchException'])
|
@SuppressWarnings('CatchException')
|
||||||
private void dumpMetadata() {
|
private void dumpMetadata() {
|
||||||
LOGGER.debug('dumping meta-data')
|
LOGGER.debug('dumping meta-data')
|
||||||
|
|
||||||
|
@ -157,18 +158,24 @@ class KafkaPoller extends Thread {
|
||||||
/**
|
/**
|
||||||
* Blocking reconnect to the Kafka brokers
|
* Blocking reconnect to the Kafka brokers
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings('CatchException')
|
||||||
private void reconnect() {
|
private void reconnect() {
|
||||||
disconnectConsumers()
|
disconnectConsumers()
|
||||||
LOGGER.info('Creating SimpleConsumer connections for brokers')
|
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
|
||||||
synchronized(this.brokers) {
|
synchronized(this.brokers) {
|
||||||
this.brokers.each { Broker b ->
|
this.brokers.each { Broker broker ->
|
||||||
SimpleConsumer consumer = new SimpleConsumer(b.host,
|
SimpleConsumer consumer = new SimpleConsumer(broker.host,
|
||||||
b.port,
|
broker.port,
|
||||||
KAFKA_TIMEOUT,
|
KAFKA_TIMEOUT,
|
||||||
KAFKA_BUFFER,
|
KAFKA_BUFFER,
|
||||||
KAFKA_CLIENT_ID)
|
KAFKA_CLIENT_ID)
|
||||||
consumer.connect()
|
try {
|
||||||
this.brokerConsumerMap[b.id] = consumer
|
consumer.connect()
|
||||||
|
this.brokerConsumerMap[broker.id] = consumer
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOGGER.info('Error connecting cunsumer to {}', broker, e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this.shouldReconnect = false
|
this.shouldReconnect = false
|
||||||
|
@ -181,10 +188,16 @@ class KafkaPoller extends Thread {
|
||||||
this.keepRunning = false
|
this.keepRunning = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings('CatchException')
|
||||||
private void disconnectConsumers() {
|
private void disconnectConsumers() {
|
||||||
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
||||||
LOGGER.info('Disconnecting {}', client)
|
LOGGER.info('Disconnecting {}', client)
|
||||||
client?.disconnect()
|
try {
|
||||||
|
client?.disconnect()
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOGGER.info('Error disconnecting {}', client, e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -169,9 +169,6 @@ class Main {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
logger.info('Starting wait loop...')
|
logger.info('Starting wait loop...')
|
||||||
synchronized(this) {
|
|
||||||
wait()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void registerMetricFor(KafkaConsumer consumer,
|
static void registerMetricFor(KafkaConsumer consumer,
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
/**
|
||||||
|
* manage a list of brokers. can be online or offline. offline means the
|
||||||
|
* internal list is hidden, i.e. the list() gives you an empty list.
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class BrokerManager {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
|
||||||
|
|
||||||
|
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
|
||||||
|
|
||||||
|
// we start with being offline
|
||||||
|
private boolean offline = true
|
||||||
|
|
||||||
|
void add(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker != null && this.brokers.indexOf(broker) == -1) {
|
||||||
|
this.brokers.add(broker)
|
||||||
|
logger.info('broker added: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void update(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (this.brokers.indexOf(broker) != -1) {
|
||||||
|
this.brokers.remove(broker)
|
||||||
|
}
|
||||||
|
this.brokers.add(broker)
|
||||||
|
logger.info('broker updated: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void remove(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker != null && this.brokers.remove(broker)) {
|
||||||
|
logger.info('broker removed: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||||
|
@SuppressWarnings('ConfusingMethodName')
|
||||||
|
void offline() {
|
||||||
|
this.offline = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||||
|
void online() {
|
||||||
|
this.offline = false
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<KafkaBroker> list() {
|
||||||
|
if (this.offline) {
|
||||||
|
[]
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.brokers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -6,7 +6,6 @@ import groovy.json.JsonSlurper
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -16,19 +15,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
*/
|
*/
|
||||||
@TypeChecked
|
@TypeChecked
|
||||||
class BrokerTreeWatcher extends AbstractTreeWatcher {
|
class BrokerTreeWatcher extends AbstractTreeWatcher {
|
||||||
static final Integer INVALID_BROKER_ID = -1
|
|
||||||
private static final String BROKERS_PATH = '/brokers/ids'
|
private static final String BROKERS_PATH = '/brokers/ids'
|
||||||
|
|
||||||
private Boolean isTreeInitialized = false
|
|
||||||
private final JsonSlurper json
|
|
||||||
private final List<Closure> onBrokerUpdates
|
private final List<Closure> onBrokerUpdates
|
||||||
private final List<KafkaBroker> brokers
|
private final BrokerManager manager
|
||||||
|
private final PojoFactory factory
|
||||||
|
|
||||||
BrokerTreeWatcher(CuratorFramework client) {
|
BrokerTreeWatcher(CuratorFramework client) {
|
||||||
super(client)
|
super(client)
|
||||||
|
|
||||||
this.json = new JsonSlurper()
|
this.factory = new PojoFactory(new JsonSlurper())
|
||||||
this.brokers = Collections.synchronizedList([])
|
this.manager = new BrokerManager()
|
||||||
this.onBrokerUpdates = []
|
this.onBrokerUpdates = []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,57 +34,39 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
|
* Process events to keep an up to date list of brokers
|
||||||
* list of brokers
|
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
||||||
/* If we're initialized that means we should have all our brokers in
|
switch (event.type) {
|
||||||
* our internal list already and we can fire an event
|
case TreeCacheEvent.Type.INITIALIZED:
|
||||||
*/
|
this.manager.online()
|
||||||
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
|
break
|
||||||
this.isTreeInitialized = true
|
case TreeCacheEvent.Type.NODE_ADDED:
|
||||||
this.onBrokerUpdates.each { Closure c ->
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
c?.call(this.brokers)
|
this.manager.add(broker)
|
||||||
}
|
break
|
||||||
return
|
case TreeCacheEvent.Type.NODE_UPDATED:
|
||||||
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
|
this.manager.update(broker)
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.NODE_REMOVED:
|
||||||
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
|
this.manager.remove(broker)
|
||||||
|
break
|
||||||
|
// TODO these 3 events might come with path which can be mapped
|
||||||
|
// to a specific broker
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_LOST:
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
|
||||||
|
this.manager.offline()
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
|
||||||
|
this.manager.online()
|
||||||
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
|
this.onBrokerUpdates.each { Closure c ->
|
||||||
return
|
c?.call(this.manager.list())
|
||||||
}
|
}
|
||||||
|
|
||||||
ChildData nodeData = event.data
|
|
||||||
Integer brokerId = brokerIdFromPath(nodeData.path)
|
|
||||||
|
|
||||||
if (brokerId == INVALID_BROKER_ID) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
|
|
||||||
|
|
||||||
this.brokers << new KafkaBroker(brokerData, brokerId)
|
|
||||||
|
|
||||||
if (this.isTreeInitialized) {
|
|
||||||
this.onBrokerUpdates.each { Closure c ->
|
|
||||||
c?.call(this.brokers)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process a path string from Zookeeper for the Kafka broker's ID
|
|
||||||
*
|
|
||||||
* We're expecting paths like: /brokers/ids/1231524312
|
|
||||||
*/
|
|
||||||
Integer brokerIdFromPath(String path) {
|
|
||||||
List<String> pathParts = path?.split('/') as List
|
|
||||||
|
|
||||||
if ((pathParts == null) ||
|
|
||||||
(pathParts.size() != 4)) {
|
|
||||||
return INVALID_BROKER_ID
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Integer(pathParts[-1])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* parsing TreeCacheEvents and provide helper method to access its data in typed manner
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class EventData {
|
||||||
|
|
||||||
|
private final String data
|
||||||
|
private final List<String> pathParts
|
||||||
|
|
||||||
|
EventData(TreeCacheEvent event) {
|
||||||
|
ChildData data = event.data
|
||||||
|
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
|
||||||
|
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer asInteger() {
|
||||||
|
new Integer(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
String asString() {
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer pathPartsSize() {
|
||||||
|
pathParts.size()
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer getPathPartAsInteger(int pos) {
|
||||||
|
if (pathParts.size() <= pos) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
new Integer(pathParts[pos])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* factory creating Pojo's from TreeCacheEvents
|
||||||
|
*/
|
||||||
|
class PojoFactory {
|
||||||
|
|
||||||
|
private final JsonSlurper json
|
||||||
|
|
||||||
|
PojoFactory(JsonSlurper json) {
|
||||||
|
this.json = json
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* converts an treeCacheEvent into a KafkaBroker. the caller
|
||||||
|
* is responsible for calling the factory method with the
|
||||||
|
* right data. i.e. path starts with /brokers/ids. the implementation
|
||||||
|
* just uses whatever it is given to create a KafkaBroker object
|
||||||
|
*/
|
||||||
|
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
|
||||||
|
EventData data = new EventData(event)
|
||||||
|
// We're expecting paths like: /brokers/ids/1231524312
|
||||||
|
Integer id = data.getPathPartAsInteger(3)
|
||||||
|
if (id == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
String json = data.asString()
|
||||||
|
if (json == null) {
|
||||||
|
new KafkaBroker('', 0, id)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Object payload = this.json.parseText(json)
|
||||||
|
new KafkaBroker(payload.host, payload.port, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,95 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class BrokerManagerSpec extends Specification {
|
||||||
|
BrokerManager manager = new BrokerManager()
|
||||||
|
|
||||||
|
def "new instance is offline, i.e. has empty list of brokers"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "new instance is offline after add broker"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
manager.add(broker)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "list shows brokers after getting online"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
|
||||||
|
manager.add(broker)
|
||||||
|
manager.online()
|
||||||
|
manager.remove(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can remove brokers based on its id"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||||
|
manager.add(broker)
|
||||||
|
manager.online()
|
||||||
|
manager.remove(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can update brokers"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.update(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 1
|
||||||
|
manager.list().first().host == 'localhost'
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can go offline anytime"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.add(broker2)
|
||||||
|
manager.offline()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can go offline and online again"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.add(broker2)
|
||||||
|
manager.offline()
|
||||||
|
manager.online()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 2
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,29 +0,0 @@
|
||||||
package com.github.reiseburo.verspaetung.zk
|
|
||||||
|
|
||||||
import spock.lang.*
|
|
||||||
|
|
||||||
class BrokerTreeWatcherSpec extends Specification {
|
|
||||||
BrokerTreeWatcher watcher
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
this.watcher = new BrokerTreeWatcher()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return the right ID with a valid path"() {
|
|
||||||
given:
|
|
||||||
String path = "/brokers/ids/1337"
|
|
||||||
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath(path) == 1337
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return -1 on null paths"() {
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath(null) == -1
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath('/spock/ed') == -1
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class EventDataSpec extends Specification {
|
||||||
|
EventData data
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, "be happy".bytes)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.getPathPartAsInteger(3) == 1337
|
||||||
|
data.asString() == 'be happy'
|
||||||
|
data.pathPartsSize() == 4
|
||||||
|
data.getPathPartAsInteger(4) == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData("/", null, "42".bytes)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.asString() == '42'
|
||||||
|
data.asInteger() == 42
|
||||||
|
data.pathPartsSize() == 0
|
||||||
|
data.getPathPartAsInteger(0) == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData with no payload"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData("/123", null, null)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.asString() == null
|
||||||
|
data.pathPartsSize() == 2
|
||||||
|
data.getPathPartAsInteger(1) == 123
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class PojoFactorySpec extends Specification {
|
||||||
|
PojoFactory factory = new PojoFactory(new JsonSlurper())
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
String payload = "{\"host\":\"localhost\",\"port\":9092}"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, payload.bytes))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker.brokerId == 1337
|
||||||
|
broker.host == 'localhost'
|
||||||
|
broker.port == 9092
|
||||||
|
}
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, ''.bytes))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent without payload"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/123"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, null))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker.brokerId == 123
|
||||||
|
broker.host == ''
|
||||||
|
broker.port == 0
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue