Compare commits

22 Commits

Author | SHA1 | Date
---|---|---
R. Tyler Croy | 148359d816 |
R. Tyler Croy | 0bac696847 |
R. Tyler Croy | dc1cfab361 |
R. Tyler Croy | 324b3faba7 |
R. Tyler Croy | f901cba7e1 |
R. Tyler Croy | 48a258c3bc |
Christian Meier | 7a6ef71eef |
Christian Meier | 618aaf915e |
Christian Meier | 66c0c332bb |
Christian Meier | 4c655995f9 |
Christian Meier | dc918a700e |
R. Tyler Croy | 416168cbff |
R. Tyler Croy | 68ce9cd72e |
R. Tyler Croy | 9af40ccdec |
R. Tyler Croy | 1525ae30ac |
R. Tyler Croy | fb4f152fed |
R. Tyler Croy | f15aeaee0c |
Christian Meier | 98af950498 |
Christian Meier | 439e8f635e |
Christian Meier | 45d98cb5f6 |
Christian Meier | 59c8e79f4e |
R. Tyler Croy | 07bf319972 |
@@ -14,7 +14,6 @@ script:
env:
  global:
    secure: MzcJafov6+fztyym0hZFTxjirTAgVFqFRO4pSSoDUZV71jHBYRKLmQxiaYpqdl9d7Q7Jz7UfNZRSisNwZQdeZjs0B9yJwy9m1mDlJaUXIWN/xzW04qPnZ5zxh1yJHK+UHIw5G2qRZSE42m9G3TSRBlUz6OMk+tr2UYErfnKzcsc=
    - secure: X71NKVXJjyG1C6/fZUTxdQ6HwAcaNRWSc0m/VRXsaiZwgXjIr+C+Uz9X4iu6Q52YAc+7CrvftAbIxgQhf+TBtvvnGeZgWXuYqf6Cx1weyL/6xsttOLsEGdtHU9jvrtw4tHRXxu/6F+8QAot8VRwUsCB4IL5Y3epshddbk+/1d9Q=

after_success:
  "./gradlew bintrayUpload -PbintrayUser=lookouteng -PbintrayKey=${BINTRAY_KEY}"
after_success: "./gradlew bintrayUpload -PbintrayUser=rtyler -PbintrayKey=${BINTRAY_KEY}"

@@ -20,4 +20,4 @@ NOTE: This is mostly meant for the developer team.
Currently releases can be produced by simply pushing a Git tag to this GitHub
repository. This will cause Travis CI to build and test the tag, which if it is
successful, will automatically publish to
link:https://bintray.com/lookout/systems/verspaetung[Bintray].
link:https://bintray.com/reiseburo/apps/verspaetung[Bintray].

@@ -0,0 +1,39 @@
/*
 * This is a multibranch workflow file for defining how this project should be
 * built, tested and deployed
 */

node {
    stage 'Clean workspace'
    /* Running on a fresh Docker instance makes this redundant, but just in
     * case the host isn't configured to give us a new Docker image for every
     * build, make sure we clean things before we do anything
     */
    deleteDir()


    stage 'Checkout source'
    /*
     * Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
     * scm to check out sources matching Jenkinsfile with the SCM details from
     * the build that is executing this Jenkinsfile.
     *
     * when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
     */
    checkout scm


    stage 'Build and test'
    /* if we can't install everything we need for Ruby in less than 15 minutes
     * we might as well just give up
     */
    timeout(30) {
        sh './gradlew -iS'
    }

    stage 'Capture test results and artifacts'
    step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
    step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
}

// vim: ft=groovy

@@ -1,4 +1,4 @@
*Copyright (c) 2015 Lookout, Inc*
*Copyright (c) 2015 Lookout, Inc, R. Tyler Croy*

MIT License

11 README.adoc

@@ -1,6 +1,6 @@
image:https://travis-ci.org/lookout/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/lookout/verspaetung"]
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]

image::https://api.bintray.com/packages/lookout/systems/verspaetung/images/download.svg[link="https://bintray.com/lookout/systems/verspaetung/_latestVersion"]
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]

Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.


@@ -15,13 +15,20 @@ reports it to statsd.

    % java -jar verspaetung-*-all.jar --help
    usage: verspaetung
     -d,--delay <DELAY>          Seconds to delay between reporting metrics to
                                 the metrics receiver (defaults: 5s)
     -H,--statsd-host <STATSD>   Hostname for a statsd instance (defaults to
                                 localhost)
     -n,--dry-run                Disable reporting to a statsd host
     -p,--statsd-port <PORT>     Port for the statsd instance (defaults to
                                 8125)
        --prefix <PREFIX>        Prefix all metrics with PREFIX before they're
                                 reported (e.g. PREFIX.verspaetung.mytopic)
     -s,--storm                  Watch Storm KafkaSpout offsets (under
                                 /kafka_spout)
     -x,--exclude <EXCLUDES>     Regular expression for consumer groups to
                                 exclude from reporting (can be declared
                                 multiple times)
     -z,--zookeeper <HOSTS>      Comma separated list of Zookeeper hosts (e.g.
                                 localhost:2181)

20 build.gradle

@@ -9,10 +9,10 @@ plugins {
    id 'application'
}

group = "com.github.lookout"
group = "com.github.reiseburo"
description = "A utility for monitoring the delay of Kafka consumers"
version = '0.4.0'
mainClassName = 'com.github.lookout.verspaetung.Main'
version = '0.6.0'
mainClassName = 'com.github.reiseburo.verspaetung.Main'
defaultTasks 'check', 'assemble'

/* Ensure we properly build for JDK7 still */

@@ -142,8 +142,16 @@ artifacts {
    archives shadowJar
}

sourceSets {
    main {
        groovy {
            exclude '**/*.sw*'
        }
    }
}

/*
 * https://github.com/lookout/verspaetung/issues/28
 * https://github.com/reiseburo/verspaetung/issues/28
 *
 * disable the distZip and distTar tasks since we only need the shadow jar
 */

@@ -171,8 +179,8 @@ bintray {
    configurations = ['archives']

    pkg {
        userOrg = 'lookout'
        repo = 'systems'
        userOrg = 'reiseburo'
        repo = 'apps'
        name = 'verspaetung'
        labels = []

@@ -0,0 +1,16 @@

zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - 2181:2181
kafka:
  image: wurstmeister/kafka
  ports:
    - 9092:9092
  links:
    - zookeeper:zk
  environment:
    # Only using one node, so we're fine with localhost
    KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock

@@ -1,21 +0,0 @@
package com.github.lookout.verspaetung

/**
 * POJO containing the necessary information to model a Kafka broker
 */
class KafkaBroker {
    final private String host
    final private Integer port
    final private Integer brokerId

    KafkaBroker(Object jsonObject, Integer brokerId) {
        this.host = jsonObject.host
        this.port = jsonObject.port
        this.brokerId = brokerId
    }

    @Override
    String toString() {
        return "broker<${this.brokerId}>@${this.host}:${this.port}"
    }
}

@@ -1,88 +0,0 @@
package com.github.lookout.verspaetung.zk

import com.github.lookout.verspaetung.KafkaBroker

import groovy.json.JsonSlurper
import groovy.transform.TypeChecked

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

/**
 * The BrokerTreeWatcher is a kind of watcher whose sole purpose is
 * to watch the segment of the Zookeeper tree where Kafka stores broker
 * information
 */
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
    static final Integer INVALID_BROKER_ID = -1
    private static final String BROKERS_PATH = '/brokers/ids'

    private Boolean isTreeInitialized = false
    private final JsonSlurper json
    private final List<Closure> onBrokerUpdates
    private final List<KafkaBroker> brokers

    BrokerTreeWatcher(CuratorFramework client) {
        super(client)

        this.json = new JsonSlurper()
        this.brokers = []
        this.onBrokerUpdates = []
    }

    String zookeeperPath() {
        return BROKERS_PATH
    }

    /**
     * Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
     * list of brokers
     */
    @Override
    void childEvent(CuratorFramework client, TreeCacheEvent event) {
        /* If we're initialized that means we should have all our brokers in
         * our internal list already and we can fire an event
         */
        if (event.type == TreeCacheEvent.Type.INITIALIZED) {
            this.isTreeInitialized = true
            List threadsafeBrokers = Collections.synchronizedList(this.brokers)
            this.onBrokerUpdates.each { Closure c ->
                c?.call(threadsafeBrokers)
            }
            return
        }

        if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
            return
        }

        ChildData nodeData = event.data
        Integer brokerId = brokerIdFromPath(nodeData.path)

        if (brokerId == INVALID_BROKER_ID) {
            return
        }

        Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))

        this.brokers << new KafkaBroker(brokerData, brokerId)
    }

    /**
     * Process a path string from Zookeeper for the Kafka broker's ID
     *
     * We're expecting paths like: /brokers/ids/1231524312
     */
    Integer brokerIdFromPath(String path) {
        List<String> pathParts = path?.split('/') as List

        if ((pathParts == null) ||
            (pathParts.size() != 4)) {
            return INVALID_BROKER_ID
        }

        return new Integer(pathParts[-1])
    }
}

@@ -0,0 +1,40 @@
package com.github.reiseburo.verspaetung

/**
 * abstract the logic on how to reduce the polling speed and get back
 * to full speed.
 */
class Delay {
    static final Integer POLLER_DELAY_MIN = (1 * 1000)
    static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour

    private Integer delay = POLLER_DELAY_MIN

    boolean reset() {
        if (delay != POLLER_DELAY_MIN) {
            delay = POLLER_DELAY_MIN
            true
        }
        else {
            false
        }
    }

    boolean slower() {
        if (delay < POLLER_DELAY_MAX) {
            delay += delay
            true
        }
        else {
            false
        }
    }

    Integer value() {
        delay
    }

    String toString() {
        "Delay[ ${delay / 1000} sec ]"
    }
}

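The Delay class above gives the poller a simple exponential backoff: slower() doubles the delay up to POLLER_DELAY_MAX, reset() drops it back to POLLER_DELAY_MIN, and value() is what the caller sleeps on. A minimal usage sketch of that contract, mirroring how the KafkaPoller changes later in this diff drive it (pollOnce() and keepRunning are hypothetical placeholders, not part of the change):

```groovy
Delay delay = new Delay()
while (keepRunning) {               // hypothetical loop flag
    try {
        pollOnce()                  // hypothetical stand-in for the real poll work
        delay.reset()               // success: snap back to POLLER_DELAY_MIN
    }
    catch (Exception ex) {
        delay.slower()              // failure: double the delay, capped at POLLER_DELAY_MAX
    }
    Thread.sleep(delay.value())
}
```
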
@@ -0,0 +1,39 @@
package com.github.reiseburo.verspaetung

/**
 * POJO containing the necessary information to model a Kafka broker
 */
class KafkaBroker {
    final private String host
    final private Integer port
    final private Integer brokerId

    KafkaBroker(String host, Integer port, Integer brokerId) {
        this.host = host
        this.port = port
        this.brokerId = brokerId
    }

    @Override
    int hashCode() {
        return this.brokerId
    }

    @Override
    boolean equals(Object compared) {
        if (this.is(compared)) {
            return true
        }

        if (!(compared instanceof KafkaBroker)) {
            return false
        }

        return compared.brokerId == brokerId
    }

    @Override
    String toString() {
        return "broker<${this.brokerId}>@${this.host}:${this.port}"
    }
}

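Compared with the deleted class, this KafkaBroker takes explicit host/port/brokerId arguments instead of a parsed JSON object and bases equals() and hashCode() solely on brokerId, so a broker that re-registers with new host data still matches its old entry. A small illustration of that equality contract, using made-up host names:

```groovy
// Equality is by brokerId only; host and port are ignored for identity.
assert new KafkaBroker('old-host', 9092, 7) == new KafkaBroker('new-host', 9093, 7)
assert new KafkaBroker('old-host', 9092, 7) != new KafkaBroker('old-host', 9092, 8)
```
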
@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

/**
 * POJO containing the necessary information to model a Kafka consumers

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

import org.slf4j.Logger
import org.slf4j.LoggerFactory

@@ -17,7 +17,7 @@ import scala.collection.JavaConversions
 * meta-data for them
 */
class KafkaPoller extends Thread {
    private static final Integer POLLER_DELAY = (1 * 1000)

    private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
    private static final Integer KAFKA_TIMEOUT = (5 * 1000)
    private static final Integer KAFKA_BUFFER = (100 * 1024)

@@ -29,7 +29,7 @@ class KafkaPoller extends Thread {
    private final AbstractMap<TopicPartition, Long> topicOffsetMap
    private final List<Closure> onDelta
    private final AbstractSet<String> currentTopics
    private List<Broker> brokers
    private final List<Broker> brokers

    KafkaPoller(AbstractMap map, AbstractSet topicSet) {
        this.topicOffsetMap = map

@@ -37,6 +37,7 @@ class KafkaPoller extends Thread {
        this.brokerConsumerMap = [:]
        this.brokers = []
        this.onDelta = []
        setName('Verspaetung Kafka Poller')
    }

    /* There are a number of cases where we intentionally swallow stacktraces

@@ -48,7 +49,8 @@ class KafkaPoller extends Thread {
    @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
    void run() {
        LOGGER.info('Starting wait loop')

        Delay delay = new Delay()
        LOGGER.error('polling ' + delay)
        while (keepRunning) {
            LOGGER.debug('poll loop')

@@ -62,21 +64,33 @@ class KafkaPoller extends Thread {
            if (this.currentTopics.size() > 0) {
                try {
                    dumpMetadata()
                    if (delay.reset()) {
                        LOGGER.error('back to normal ' + delay)
                    }
                }
                catch (KafkaException kex) {
                    LOGGER.error('Failed to interact with Kafka: {}', kex.message)
                    slower(delay)
                }
                catch (Exception ex) {
                    LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
                    slower(delay)
                }
            }

            Thread.sleep(POLLER_DELAY)
            Thread.sleep(delay.value())
        }
        disconnectConsumers()
    }

    private void slower(Delay delay) {
        if (delay.slower()) {
            LOGGER.error('using ' + delay)
        }
    }

    @SuppressWarnings(['CatchException'])
    void dumpMetadata() {
    @SuppressWarnings('CatchException')
    private void dumpMetadata() {
        LOGGER.debug('dumping meta-data')

        Object metadata = fetchMetadataForCurrentTopics()

@@ -101,7 +115,7 @@ class KafkaPoller extends Thread {
     * The 'metadata' is the expected return from
     * kafka.client.ClientUtils.fetchTopicMetadata
     */
    void withTopicsAndPartitions(Object metadata, Closure closure) {
    private void withTopicsAndPartitions(Object metadata, Closure closure) {
        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
            withScalaCollection(f.partitionsMetadata).each { p ->
                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)

@@ -113,7 +127,7 @@ class KafkaPoller extends Thread {
    /**
     * Fetch the leader metadata and update our data structures
     */
    void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
    private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
        Integer leaderId = partitionMetadata.leader.get()?.id
        Integer partitionId = partitionMetadata.partitionId

@@ -122,7 +136,7 @@ class KafkaPoller extends Thread {
        this.topicOffsetMap[tp] = offset
    }

    Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
    private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
        SimpleConsumer consumer = this.brokerConsumerMap[leaderId]

        /* If we don't have a proper SimpleConsumer instance (e.g. null) then

@@ -137,24 +151,32 @@ class KafkaPoller extends Thread {
        return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
    }

    Iterable withScalaCollection(scala.collection.Iterable iter) {
    private Iterable withScalaCollection(scala.collection.Iterable iter) {
        return JavaConversions.asJavaIterable(iter)
    }

    /**
     * Blocking reconnect to the Kafka brokers
     */
    void reconnect() {
    @SuppressWarnings('CatchException')
    private void reconnect() {
        disconnectConsumers()
        LOGGER.info('Creating SimpleConsumer connections for brokers')
        this.brokers.each { Broker b ->
            SimpleConsumer consumer = new SimpleConsumer(b.host,
                                                         b.port,
                                                         KAFKA_TIMEOUT,
                                                         KAFKA_BUFFER,
                                                         KAFKA_CLIENT_ID)
            consumer.connect()
            this.brokerConsumerMap[b.id] = consumer
        LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
        synchronized(this.brokers) {
            this.brokers.each { Broker broker ->
                SimpleConsumer consumer = new SimpleConsumer(broker.host,
                                                             broker.port,
                                                             KAFKA_TIMEOUT,
                                                             KAFKA_BUFFER,
                                                             KAFKA_CLIENT_ID)
                try {
                    consumer.connect()
                    this.brokerConsumerMap[broker.id] = consumer
                }
                catch (Exception e) {
                    LOGGER.info('Error connecting consumer to {}', broker, e)
                }
            }
        }
        this.shouldReconnect = false
    }

@@ -164,13 +186,18 @@ class KafkaPoller extends Thread {
     */
    void die() {
        this.keepRunning = false
        disconnectConsumers()
    }

    @SuppressWarnings('CatchException')
    private void disconnectConsumers() {
        this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
            LOGGER.info('Disconnecting {}', client)
            client?.disconnect()
            try {
                client?.disconnect()
            }
            catch (Exception e) {
                LOGGER.info('Error disconnecting {}', client, e)
            }
        }
    }

@@ -178,8 +205,11 @@ class KafkaPoller extends Thread {
     * Store a new list of KafkaBroker objects and signal a reconnection
     */
    void refresh(List<KafkaBroker> brokers) {
        this.brokers = brokers.collect { KafkaBroker b ->
            new Broker(b.brokerId, b.host, b.port)
        synchronized(this.brokers) {
            this.brokers.clear()
            this.brokers.addAll(brokers.collect { KafkaBroker b ->
                new Broker(b.brokerId, b.host, b.port)
            })
        }
        this.shouldReconnect = true
    }

@@ -189,7 +219,9 @@ class KafkaPoller extends Thread {
     * scala underpinnings
     */
    private scala.collection.immutable.Seq getBrokersSeq() {
        return JavaConversions.asScalaBuffer(this.brokers).toList()
        synchronized(this.brokers) {
            return JavaConversions.asScalaBuffer(this.brokers).toList()
        }
    }

    /**

@@ -1,10 +1,10 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
import com.github.lookout.verspaetung.metrics.ConsumerGauge
import com.github.lookout.verspaetung.metrics.HeartbeatGauge
import com.github.reiseburo.verspaetung.zk.BrokerTreeWatcher
import com.github.reiseburo.verspaetung.zk.KafkaSpoutTreeWatcher
import com.github.reiseburo.verspaetung.zk.StandardTreeWatcher
import com.github.reiseburo.verspaetung.metrics.ConsumerGauge
import com.github.reiseburo.verspaetung.metrics.HeartbeatGauge

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet

@@ -126,8 +126,9 @@ class Main {
        /* Assuming that most people aren't needing to run Storm-based watchers
         * as well
         */
        KafkaSpoutTreeWatcher stormWatcher = null
        if (cli.hasOption('s')) {
            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
            stormWatcher = new KafkaSpoutTreeWatcher(client,
                                                     watchedTopics,
                                                     consumerOffsets)
            stormWatcher.onConsumerData << gaugeRegistrar

@@ -155,10 +156,19 @@ class Main {
        /* Start the reporter if we've got it */
        reporter?.start(delayInSeconds, TimeUnit.SECONDS)

        // shutdown threads
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                Main.logger.info("shutdown threads")
                poller.die()
                consumerWatcher.close()
                if (stormWatcher != null) {
                    stormWatcher.close()
                }
                poller.join()
            }
        });
        logger.info('Starting wait loop...')
        synchronized(this) {
            wait()
        }
    }

    static void registerMetricFor(KafkaConsumer consumer,

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

/**
 * Simple container for Kafka topic names and partition IDs

@@ -1,11 +1,11 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics

import com.codahale.metrics.Gauge
import groovy.transform.TypeChecked
import org.coursera.metrics.datadog.Tagged

import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.KafkaConsumer
import com.github.reiseburo.verspaetung.TopicPartition

/**
 * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer

@@ -38,7 +38,7 @@ class ConsumerGauge implements Gauge<Integer>, Tagged {
     * Returning the maximum value of the computation and zero, there are
     * some cases where we can be "behind" on the Kafka latest offset
     * polling and this could result in an erroneous negative value. See:
     * <https://github.com/lookout/verspaetung/issues/25> for more details
     * <https://github.com/reiseburo/verspaetung/issues/25> for more details
     */
    return Math.max(0,
        ((Integer)this.topics[topicPartition]) - this.consumers[consumer])

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics

import com.codahale.metrics.Gauge

@@ -1,6 +1,6 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import com.github.lookout.verspaetung.KafkaConsumer
import com.github.reiseburo.verspaetung.KafkaConsumer

import groovy.transform.TypeChecked

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import groovy.transform.TypeChecked


@@ -47,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
        return this
    }

    /**
     * Close our internal cache and return ourselves for API cleanliness
     */
    AbstractTreeWatcher close() {
        this.cache?.close()
        return this
    }

    abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
}

@@ -0,0 +1,73 @@
package com.github.reiseburo.verspaetung.zk

import com.github.reiseburo.verspaetung.KafkaBroker

import groovy.transform.TypeChecked

import org.slf4j.Logger
import org.slf4j.LoggerFactory

/**
 * manage a list of brokers. can be online or offline. offline means the
 * internal list is hidden, i.e. the list() gives you an empty list.
 */
@TypeChecked
class BrokerManager {

    private static final Logger logger = LoggerFactory.getLogger(BrokerManager)

    private final List<KafkaBroker> brokers = Collections.synchronizedList([])

    // we start with being offline
    private boolean offline = true

    void add(KafkaBroker broker) {
        synchronized(brokers) {
            if (broker != null && this.brokers.indexOf(broker) == -1) {
                this.brokers.add(broker)
                logger.info('broker added: {}', broker)
            }
        }
    }

    void update(KafkaBroker broker) {
        synchronized(brokers) {
            if (broker == null) {
                return
            }
            if (this.brokers.indexOf(broker) != -1) {
                this.brokers.remove(broker)
            }
            this.brokers.add(broker)
            logger.info('broker updated: {}', broker)
        }
    }

    void remove(KafkaBroker broker) {
        synchronized(brokers) {
            if (broker != null && this.brokers.remove(broker)) {
                logger.info('broker removed: {}', broker)
            }
        }
    }

    // TODO not sure if this is correct - see BrokerTreeWatcher
    @SuppressWarnings('ConfusingMethodName')
    void offline() {
        this.offline = true
    }

    // TODO not sure if this is correct - see BrokerTreeWatcher
    void online() {
        this.offline = false
    }

    Collection<KafkaBroker> list() {
        if (this.offline) {
            []
        }
        else {
            this.brokers
        }
    }
}

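BrokerManager wraps a synchronized broker list behind an online/offline flag: brokers are tracked at all times, but list() only exposes them while the manager is online. A short sketch of that behaviour, consistent with BrokerManagerSpec further down in this diff (host and port values are illustrative):

```groovy
BrokerManager manager = new BrokerManager()
manager.add(new KafkaBroker('localhost', 9092, 1))
assert manager.list().isEmpty()       // managers start offline, so the list is hidden
manager.online()
assert manager.list().size() == 1     // the retained broker becomes visible
manager.offline()
assert manager.list().isEmpty()       // losing the connection hides the list again
```
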
@@ -0,0 +1,72 @@
package com.github.reiseburo.verspaetung.zk

import com.github.reiseburo.verspaetung.KafkaBroker

import groovy.json.JsonSlurper
import groovy.transform.TypeChecked

import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

/**
 * The BrokerTreeWatcher is a kind of watcher whose sole purpose is
 * to watch the segment of the Zookeeper tree where Kafka stores broker
 * information
 */
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
    private static final String BROKERS_PATH = '/brokers/ids'

    private final List<Closure> onBrokerUpdates
    private final BrokerManager manager
    private final PojoFactory factory

    BrokerTreeWatcher(CuratorFramework client) {
        super(client)

        this.factory = new PojoFactory(new JsonSlurper())
        this.manager = new BrokerManager()
        this.onBrokerUpdates = []
    }

    String zookeeperPath() {
        return BROKERS_PATH
    }

    /**
     * Process events to keep an up to date list of brokers
     */
    @Override
    void childEvent(CuratorFramework client, TreeCacheEvent event) {
        switch (event.type) {
            case TreeCacheEvent.Type.INITIALIZED:
                this.manager.online()
                break
            case TreeCacheEvent.Type.NODE_ADDED:
                KafkaBroker broker = this.factory.createKafkaBroker(event)
                this.manager.add(broker)
                break
            case TreeCacheEvent.Type.NODE_UPDATED:
                KafkaBroker broker = this.factory.createKafkaBroker(event)
                this.manager.update(broker)
                break
            case TreeCacheEvent.Type.NODE_REMOVED:
                KafkaBroker broker = this.factory.createKafkaBroker(event)
                this.manager.remove(broker)
                break
            // TODO these 3 events might come with path which can be mapped
            // to a specific broker
            case TreeCacheEvent.Type.CONNECTION_LOST:
            case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
                this.manager.offline()
                break
            case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
                this.manager.online()
                break
        }

        this.onBrokerUpdates.each { Closure c ->
            c?.call(this.manager.list())
        }
    }
}

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

/**
 * POJO representing data from Zookeeper for a consumer, topic and offset

@@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk

import groovy.transform.TypeChecked

import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

/**
 * parsing TreeCacheEvents and provide helper method to access its data in typed manner
 */
@TypeChecked
class EventData {

    private final String data
    private final List<String> pathParts

    EventData(TreeCacheEvent event) {
        ChildData data = event.data
        this.pathParts = data.path == null ? null : (data.path.split('/') as List)
        this.data = data.data == null ? null : new String(data.data, 'UTF-8')
    }

    Integer asInteger() {
        new Integer(data)
    }

    String asString() {
        data
    }

    Integer pathPartsSize() {
        pathParts.size()
    }

    Integer getPathPartAsInteger(int pos) {
        if (pathParts.size() <= pos) {
            return null
        }
        new Integer(pathParts[pos])
    }
}

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import groovy.json.JsonSlurper
import groovy.transform.TypeChecked

@@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk

import com.github.reiseburo.verspaetung.KafkaBroker

import groovy.json.JsonSlurper

import org.apache.curator.framework.recipes.cache.TreeCacheEvent

/**
 * factory creating Pojo's from TreeCacheEvents
 */
class PojoFactory {

    private final JsonSlurper json

    PojoFactory(JsonSlurper json) {
        this.json = json
    }

    /**
     * converts an treeCacheEvent into a KafkaBroker. the caller
     * is responsible for calling the factory method with the
     * right data. i.e. path starts with /brokers/ids. the implementation
     * just uses whatever it is given to create a KafkaBroker object
     */
    KafkaBroker createKafkaBroker(TreeCacheEvent event) {
        EventData data = new EventData(event)
        // We're expecting paths like: /brokers/ids/1231524312
        Integer id = data.getPathPartAsInteger(3)
        if (id == null) {
            return
        }
        String json = data.asString()
        if (json == null) {
            new KafkaBroker('', 0, id)
        }
        else {
            Object payload = this.json.parseText(json)
            new KafkaBroker(payload.host, payload.port, id)
        }
    }
}

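PojoFactory is the glue between EventData and KafkaBroker: it reads the broker id from the fourth path segment, returns null when the id is missing, and falls back to an empty host and port 0 when there is no JSON payload. A brief sketch of the expected mapping, in line with PojoFactorySpec near the end of this diff:

```groovy
import groovy.json.JsonSlurper
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

PojoFactory factory = new PojoFactory(new JsonSlurper())
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.NODE_ADDED,
        new ChildData('/brokers/ids/1337', null, '{"host":"localhost","port":9092}'.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
assert broker.host == 'localhost' && broker.port == 9092 && broker.brokerId == 1337
```
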
@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import groovy.transform.TypeChecked
import groovy.transform.InheritConstructors

@@ -1,29 +0,0 @@
package com.github.lookout.verspaetung.zk

import spock.lang.*

class BrokerTreeWatcherSpec extends Specification {
    BrokerTreeWatcher watcher

    def setup() {
        this.watcher = new BrokerTreeWatcher()
    }

    def "brokerIdFromPath() should return the right ID with a valid path"() {
        given:
        String path = "/brokers/ids/1337"

        expect:
        watcher.brokerIdFromPath(path) == 1337
    }

    def "brokerIdFromPath() should return -1 on null paths"() {
        expect:
        watcher.brokerIdFromPath(null) == -1
    }

    def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
        expect:
        watcher.brokerIdFromPath('/spock/ed') == -1
    }
}

@@ -0,0 +1,55 @@
package com.github.reiseburo.verspaetung

import spock.lang.*

class DelaySpec extends Specification {
    Delay delay = new Delay()

    def "it should give default on first use"() {
        given:

        expect:
        delay.value() == Delay.POLLER_DELAY_MIN
    }

    def "slower has an upper bound"() {
        given:
        for(int i = 1; i < 20; i++) { delay.slower() }
        def firstLast = delay.value()
        def result = delay.slower()
        def secondLast = delay.value()

        expect:
        firstLast == secondLast
        firstLast == Delay.POLLER_DELAY_MAX
        result == false
    }

    def "increasing delay gives true"() {
        def result = true
        for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
            result = result && delay.slower()
        }
        def last = delay.slower()

        expect:
        result == true
        last == false
    }

    def "reset on min value gives false"() {
        given:
        def result = delay.reset()

        expect:
        result == false
    }

    def "reset on none min value gives true"() {
        given:
        delay.slower()
        def result = delay.reset()

        expect:
        result == true
    }
}

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

import spock.lang.*

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

import spock.lang.*

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung


import spock.lang.*

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung

import spock.lang.*

@@ -1,9 +1,9 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics

import spock.lang.*

import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.KafkaConsumer
import com.github.reiseburo.verspaetung.TopicPartition

class ConsumerGaugeSpec extends Specification {
    private KafkaConsumer consumer

@@ -1,8 +1,8 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import spock.lang.*

import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

@@ -1,8 +1,8 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import spock.lang.*

import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

@@ -0,0 +1,95 @@
package com.github.reiseburo.verspaetung.zk

import spock.lang.*

import groovy.json.JsonSlurper

import com.github.reiseburo.verspaetung.KafkaBroker

import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

class BrokerManagerSpec extends Specification {
    BrokerManager manager = new BrokerManager()

    def "new instance is offline, i.e. has empty list of brokers"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)

        expect:
        manager.list().size() == 0
    }

    def "new instance is offline after add broker"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        manager.add(broker)

        expect:
        manager.list().size() == 0
    }

    def "list shows brokers after getting online"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        KafkaBroker broker2 = new KafkaBroker('', 0, 123)
        manager.add(broker)
        manager.online()
        manager.remove(broker2)

        expect:
        manager.list().size() == 1
    }

    def "can remove brokers based on its id"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
        manager.add(broker)
        manager.online()
        manager.remove(broker2)

        expect:
        manager.list().size() == 0
    }

    def "can update brokers"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
        manager.online()
        manager.add(broker)
        manager.update(broker2)

        expect:
        manager.list().size() == 1
        manager.list().first().host == 'localhost'
    }

    def "can go offline anytime"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
        manager.online()
        manager.add(broker)
        manager.add(broker2)
        manager.offline()

        expect:
        manager.list().size() == 0
    }

    def "can go offline and online again"() {
        given:
        KafkaBroker broker = new KafkaBroker('', 0, 1)
        KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
        manager.online()
        manager.add(broker)
        manager.add(broker2)
        manager.offline()
        manager.online()

        expect:
        manager.list().size() == 2
    }
}

@@ -0,0 +1,47 @@
package com.github.reiseburo.verspaetung.zk

import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

class EventDataSpec extends Specification {
    EventData data

    def "converts TreeCacheEvents to EventData"() {
        given:
        String path = "/brokers/ids/1337"
        data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                             new ChildData(path, null, "be happy".bytes)))

        expect:
        data.getPathPartAsInteger(3) == 1337
        data.asString() == 'be happy'
        data.pathPartsSize() == 4
        data.getPathPartAsInteger(4) == null
    }

    def "converts TreeCacheEvents to EventData with integer payload and no path"() {
        given:
        String path = "/brokers/ids/1337"
        data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                             new ChildData("/", null, "42".bytes)))

        expect:
        data.asString() == '42'
        data.asInteger() == 42
        data.pathPartsSize() == 0
        data.getPathPartAsInteger(0) == null
    }

    def "converts TreeCacheEvents to EventData with no payload"() {
        given:
        String path = "/brokers/ids/1337"
        data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                             new ChildData("/123", null, null)))

        expect:
        data.asString() == null
        data.pathPartsSize() == 2
        data.getPathPartAsInteger(1) == 123
    }
}

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import spock.lang.*

@@ -0,0 +1,52 @@
package com.github.reiseburo.verspaetung.zk

import spock.lang.*

import groovy.json.JsonSlurper

import com.github.reiseburo.verspaetung.KafkaBroker

import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

class PojoFactorySpec extends Specification {
    PojoFactory factory = new PojoFactory(new JsonSlurper())

    def "create KafkaBroker from TreeCacheEvent"() {
        given:
        String path = "/brokers/ids/1337"
        String payload = "{\"host\":\"localhost\",\"port\":9092}"
        TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                               new ChildData(path, null, payload.bytes))
        KafkaBroker broker = factory.createKafkaBroker(event)

        expect:
        broker.brokerId == 1337
        broker.host == 'localhost'
        broker.port == 9092
    }

    def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
        given:
        String path = "/brokers/ids"
        TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                               new ChildData(path, null, ''.bytes))
        KafkaBroker broker = factory.createKafkaBroker(event)

        expect:
        broker == null
    }

    def "create KafkaBroker from TreeCacheEvent without payload"() {
        given:
        String path = "/brokers/ids/123"
        TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
                               new ChildData(path, null, null))
        KafkaBroker broker = factory.createKafkaBroker(event)

        expect:
        broker.brokerId == 123
        broker.host == ''
        broker.port == 0
    }
}

@@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk

import spock.lang.*