Compare commits

...

12 Commits

Author SHA1 Message Date
R. Tyler Croy 148359d816
Pretty much any node is going to have Java, because Jenkins, duh 2016-08-03 21:22:45 -07:00
R. Tyler Croy 0bac696847
Archive test results, etc 2015-12-26 21:51:54 -08:00
R. Tyler Croy dc1cfab361 Merge pull request #38 from reiseburo/ci
Add a Jenkinsfile for building
2015-12-26 21:46:07 -08:00
R. Tyler Croy 324b3faba7
Add a Jenkinsfile for building 2015-12-26 21:32:16 -08:00
R. Tyler Croy f901cba7e1
Bump version to release some of mkristian's refactorings 2015-12-26 15:19:42 -08:00
R. Tyler Croy 48a258c3bc Merge pull request #37 from mkristian/broker-watcher-refactor
Broker watcher refactor
2015-12-26 15:13:56 -08:00
Christian Meier 7a6ef71eef refactor the BrokerTreeWatcher
split things into smaller pieces and as preparation of having only
ONE TreeWatcher running which itself dispatch the events to the interresting
parties.

Sponsored by Lookout Inc.
2015-11-10 22:42:07 +01:00
Christian Meier 618aaf915e make the connection or disconnection of consumers failsafe
i.e. ignore errors on disconnect and do not register consumers which
fail to connect
2015-11-10 22:15:57 +01:00
Christian Meier 66c0c332bb make is proper POJO and remove json magic to allow easier testing 2015-11-10 22:15:57 +01:00
Christian Meier 4c655995f9 nicer logging 2015-11-10 22:15:57 +01:00
Christian Meier dc918a700e not needed 2015-11-10 22:15:57 +01:00
R. Tyler Croy 416168cbff
Correct link to download button 2015-11-07 13:18:31 -08:00
14 changed files with 466 additions and 98 deletions

39
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,39 @@
/*
* This is a multibranch workflow file for defining how this project should be
* built, tested and deployed
*/
node {
stage 'Clean workspace'
/* Running on a fresh Docker instance makes this redundant, but just in
* case the host isn't configured to give us a new Docker image for every
* build, make sure we clean things before we do anything
*/
deleteDir()
stage 'Checkout source'
/*
* Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
* scm to check out sources matching Jenkinsfile with the SCM details from
* the build that is executing this Jenkinsfile.
*
* when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
*/
checkout scm
stage 'Build and test'
/* if we can't install everything we need for Ruby in less than 15 minutes
* we might as well just give up
*/
timeout(30) {
sh './gradlew -iS'
}
stage 'Capture test results and artifacts'
step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
}
// vim: ft=groovy

View File

@ -1,6 +1,6 @@
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
image::https://api.bintray.com/packages/reiseburo/systems/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/systems/verspaetung/_latestVersion"]
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.

View File

@ -11,7 +11,7 @@ plugins {
group = "com.github.reiseburo"
description = "A utility for monitoring the delay of Kafka consumers"
version = '0.5.0'
version = '0.6.0'
mainClassName = 'com.github.reiseburo.verspaetung.Main'
defaultTasks 'check', 'assemble'

View File

@ -8,12 +8,30 @@ class KafkaBroker {
final private Integer port
final private Integer brokerId
KafkaBroker(Object jsonObject, Integer brokerId) {
this.host = jsonObject.host
this.port = jsonObject.port
KafkaBroker(String host, Integer port, Integer brokerId) {
this.host = host
this.port = port
this.brokerId = brokerId
}
@Override
int hashCode() {
return this.brokerId
}
@Override
boolean equals(Object compared) {
if (this.is(compared)) {
return true
}
if (!(compared instanceof KafkaBroker)) {
return false
}
return compared.brokerId == brokerId
}
@Override
String toString() {
return "broker<${this.brokerId}>@${this.host}:${this.port}"

View File

@ -37,6 +37,7 @@ class KafkaPoller extends Thread {
this.brokerConsumerMap = [:]
this.brokers = []
this.onDelta = []
setName('Verspaetung Kafka Poller')
}
/* There are a number of cases where we intentionally swallow stacktraces
@ -88,7 +89,7 @@ class KafkaPoller extends Thread {
}
}
@SuppressWarnings(['CatchException'])
@SuppressWarnings('CatchException')
private void dumpMetadata() {
LOGGER.debug('dumping meta-data')
@ -157,18 +158,24 @@ class KafkaPoller extends Thread {
/**
* Blocking reconnect to the Kafka brokers
*/
@SuppressWarnings('CatchException')
private void reconnect() {
disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers')
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
synchronized(this.brokers) {
this.brokers.each { Broker b ->
SimpleConsumer consumer = new SimpleConsumer(b.host,
b.port,
this.brokers.each { Broker broker ->
SimpleConsumer consumer = new SimpleConsumer(broker.host,
broker.port,
KAFKA_TIMEOUT,
KAFKA_BUFFER,
KAFKA_CLIENT_ID)
consumer.connect()
this.brokerConsumerMap[b.id] = consumer
try {
consumer.connect()
this.brokerConsumerMap[broker.id] = consumer
}
catch (Exception e) {
LOGGER.info('Error connecting cunsumer to {}', broker, e)
}
}
}
this.shouldReconnect = false
@ -181,10 +188,16 @@ class KafkaPoller extends Thread {
this.keepRunning = false
}
@SuppressWarnings('CatchException')
private void disconnectConsumers() {
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
LOGGER.info('Disconnecting {}', client)
client?.disconnect()
try {
client?.disconnect()
}
catch (Exception e) {
LOGGER.info('Error disconnecting {}', client, e)
}
}
}

View File

@ -169,9 +169,6 @@ class Main {
}
});
logger.info('Starting wait loop...')
synchronized(this) {
wait()
}
}
static void registerMetricFor(KafkaConsumer consumer,

View File

@ -0,0 +1,73 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.transform.TypeChecked
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* manage a list of brokers. can be online or offline. offline means the
* internal list is hidden, i.e. the list() gives you an empty list.
*/
@TypeChecked
class BrokerManager {
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
// we start with being offline
private boolean offline = true
void add(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.indexOf(broker) == -1) {
this.brokers.add(broker)
logger.info('broker added: {}', broker)
}
}
}
void update(KafkaBroker broker) {
synchronized(brokers) {
if (broker == null) {
return
}
if (this.brokers.indexOf(broker) != -1) {
this.brokers.remove(broker)
}
this.brokers.add(broker)
logger.info('broker updated: {}', broker)
}
}
void remove(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.remove(broker)) {
logger.info('broker removed: {}', broker)
}
}
}
// TODO not sure if this is correct - see BrokerTreeWatcher
@SuppressWarnings('ConfusingMethodName')
void offline() {
this.offline = true
}
// TODO not sure if this is correct - see BrokerTreeWatcher
void online() {
this.offline = false
}
Collection<KafkaBroker> list() {
if (this.offline) {
[]
}
else {
this.brokers
}
}
}

View File

@ -6,7 +6,6 @@ import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
@ -16,19 +15,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
*/
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
static final Integer INVALID_BROKER_ID = -1
private static final String BROKERS_PATH = '/brokers/ids'
private Boolean isTreeInitialized = false
private final JsonSlurper json
private final List<Closure> onBrokerUpdates
private final List<KafkaBroker> brokers
private final BrokerManager manager
private final PojoFactory factory
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.json = new JsonSlurper()
this.brokers = Collections.synchronizedList([])
this.factory = new PojoFactory(new JsonSlurper())
this.manager = new BrokerManager()
this.onBrokerUpdates = []
}
@ -37,57 +34,39 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
}
/**
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
* list of brokers
* Process events to keep an up to date list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* If we're initialized that means we should have all our brokers in
* our internal list already and we can fire an event
*/
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
this.isTreeInitialized = true
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
return
switch (event.type) {
case TreeCacheEvent.Type.INITIALIZED:
this.manager.online()
break
case TreeCacheEvent.Type.NODE_ADDED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.add(broker)
break
case TreeCacheEvent.Type.NODE_UPDATED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.update(broker)
break
case TreeCacheEvent.Type.NODE_REMOVED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.remove(broker)
break
// TODO these 3 events might come with path which can be mapped
// to a specific broker
case TreeCacheEvent.Type.CONNECTION_LOST:
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
this.manager.offline()
break
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
this.manager.online()
break
}
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
return
this.onBrokerUpdates.each { Closure c ->
c?.call(this.manager.list())
}
ChildData nodeData = event.data
Integer brokerId = brokerIdFromPath(nodeData.path)
if (brokerId == INVALID_BROKER_ID) {
return
}
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
this.brokers << new KafkaBroker(brokerData, brokerId)
if (this.isTreeInitialized) {
this.onBrokerUpdates.each { Closure c ->
c?.call(this.brokers)
}
}
}
/**
* Process a path string from Zookeeper for the Kafka broker's ID
*
* We're expecting paths like: /brokers/ids/1231524312
*/
Integer brokerIdFromPath(String path) {
List<String> pathParts = path?.split('/') as List
if ((pathParts == null) ||
(pathParts.size() != 4)) {
return INVALID_BROKER_ID
}
return new Integer(pathParts[-1])
}
}

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* parsing TreeCacheEvents and provide helper method to access its data in typed manner
*/
@TypeChecked
class EventData {
private final String data
private final List<String> pathParts
EventData(TreeCacheEvent event) {
ChildData data = event.data
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
}
Integer asInteger() {
new Integer(data)
}
String asString() {
data
}
Integer pathPartsSize() {
pathParts.size()
}
Integer getPathPartAsInteger(int pos) {
if (pathParts.size() <= pos) {
return null
}
new Integer(pathParts[pos])
}
}

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* factory creating Pojo's from TreeCacheEvents
*/
class PojoFactory {
private final JsonSlurper json
PojoFactory(JsonSlurper json) {
this.json = json
}
/**
* converts an treeCacheEvent into a KafkaBroker. the caller
* is responsible for calling the factory method with the
* right data. i.e. path starts with /brokers/ids. the implementation
* just uses whatever it is given to create a KafkaBroker object
*/
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
EventData data = new EventData(event)
// We're expecting paths like: /brokers/ids/1231524312
Integer id = data.getPathPartAsInteger(3)
if (id == null) {
return
}
String json = data.asString()
if (json == null) {
new KafkaBroker('', 0, id)
}
else {
Object payload = this.json.parseText(json)
new KafkaBroker(payload.host, payload.port, id)
}
}
}

View File

@ -0,0 +1,95 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class BrokerManagerSpec extends Specification {
BrokerManager manager = new BrokerManager()
def "new instance is offline, i.e. has empty list of brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
expect:
manager.list().size() == 0
}
def "new instance is offline after add broker"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
manager.add(broker)
expect:
manager.list().size() == 0
}
def "list shows brokers after getting online"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 1
}
def "can remove brokers based on its id"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 0
}
def "can update brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.online()
manager.add(broker)
manager.update(broker2)
expect:
manager.list().size() == 1
manager.list().first().host == 'localhost'
}
def "can go offline anytime"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
expect:
manager.list().size() == 0
}
def "can go offline and online again"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
manager.online()
expect:
manager.list().size() == 2
}
}

View File

@ -1,29 +0,0 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
class BrokerTreeWatcherSpec extends Specification {
BrokerTreeWatcher watcher
def setup() {
this.watcher = new BrokerTreeWatcher()
}
def "brokerIdFromPath() should return the right ID with a valid path"() {
given:
String path = "/brokers/ids/1337"
expect:
watcher.brokerIdFromPath(path) == 1337
}
def "brokerIdFromPath() should return -1 on null paths"() {
expect:
watcher.brokerIdFromPath(null) == -1
}
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
expect:
watcher.brokerIdFromPath('/spock/ed') == -1
}
}

View File

@ -0,0 +1,47 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class EventDataSpec extends Specification {
EventData data
def "converts TreeCacheEvents to EventData"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, "be happy".bytes)))
expect:
data.getPathPartAsInteger(3) == 1337
data.asString() == 'be happy'
data.pathPartsSize() == 4
data.getPathPartAsInteger(4) == null
}
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/", null, "42".bytes)))
expect:
data.asString() == '42'
data.asInteger() == 42
data.pathPartsSize() == 0
data.getPathPartAsInteger(0) == null
}
def "converts TreeCacheEvents to EventData with no payload"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/123", null, null)))
expect:
data.asString() == null
data.pathPartsSize() == 2
data.getPathPartAsInteger(1) == 123
}
}

View File

@ -0,0 +1,52 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class PojoFactorySpec extends Specification {
PojoFactory factory = new PojoFactory(new JsonSlurper())
def "create KafkaBroker from TreeCacheEvent"() {
given:
String path = "/brokers/ids/1337"
String payload = "{\"host\":\"localhost\",\"port\":9092}"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, payload.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 1337
broker.host == 'localhost'
broker.port == 9092
}
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
given:
String path = "/brokers/ids"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, ''.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker == null
}
def "create KafkaBroker from TreeCacheEvent without payload"() {
given:
String path = "/brokers/ids/123"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, null))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 123
broker.host == ''
broker.port == 0
}
}