Compare commits
58 Commits
Author | SHA1 | Date |
---|---|---|
R. Tyler Croy | 148359d816 | |
R. Tyler Croy | 0bac696847 | |
R. Tyler Croy | dc1cfab361 | |
R. Tyler Croy | 324b3faba7 | |
R. Tyler Croy | f901cba7e1 | |
R. Tyler Croy | 48a258c3bc | |
Christian Meier | 7a6ef71eef | |
Christian Meier | 618aaf915e | |
Christian Meier | 66c0c332bb | |
Christian Meier | 4c655995f9 | |
Christian Meier | dc918a700e | |
R. Tyler Croy | 416168cbff | |
R. Tyler Croy | 68ce9cd72e | |
R. Tyler Croy | 9af40ccdec | |
R. Tyler Croy | 1525ae30ac | |
R. Tyler Croy | fb4f152fed | |
R. Tyler Croy | f15aeaee0c | |
Christian Meier | 98af950498 | |
Christian Meier | 439e8f635e | |
Christian Meier | 45d98cb5f6 | |
Christian Meier | 59c8e79f4e | |
R. Tyler Croy | 07bf319972 | |
R. Tyler Croy | 3778a24044 | |
R. Tyler Croy | 921ef7be64 | |
R. Tyler Croy | 7b918d448a | |
R. Tyler Croy | 4f416e84dd | |
R. Tyler Croy | e18ac81468 | |
R. Tyler Croy | fc97f67100 | |
R. Tyler Croy | 0168f5c805 | |
R. Tyler Croy | b6bbcf92a4 | |
R. Tyler Croy | be33333bfa | |
R. Tyler Croy | 5df6083e6d | |
R. Tyler Croy | 623aa6b10a | |
R. Tyler Croy | 9535100528 | |
R. Tyler Croy | 457d2a195c | |
R. Tyler Croy | e01e699861 | |
R. Tyler Croy | f968a2c1c8 | |
R. Tyler Croy | 8c7046aa44 | |
R. Tyler Croy | 39db2912ad | |
R. Tyler Croy | 08e4076f95 | |
R. Tyler Croy | cb3465f715 | |
R. Tyler Croy | 27d51c84e4 | |
R. Tyler Croy | 1c588f6bff | |
R. Tyler Croy | 5cc83eef08 | |
R. Tyler Croy | 685f0353f2 | |
R. Tyler Croy | 81d15d53ec | |
R. Tyler Croy | f053d9b68c | |
R. Tyler Croy | 3a9caa2535 | |
R. Tyler Croy | dc33298435 | |
R. Tyler Croy | 789e7e863c | |
R. Tyler Croy | 38e8c62e00 | |
R. Tyler Croy | 71347594ca | |
R. Tyler Croy | f4042894fe | |
R. Tyler Croy | 7b13050ee2 | |
R. Tyler Croy | 0fa983d0e8 | |
R. Tyler Croy | 95c244867a | |
R. Tyler Croy | 0c59dcee70 | |
R. Tyler Croy | 100b6ab28a |
|
@ -1,3 +1,5 @@
|
||||||
.gradle*
|
.gradle*
|
||||||
*.sw*
|
*.sw*
|
||||||
build/
|
build/
|
||||||
|
.idea/*
|
||||||
|
*.iml
|
||||||
|
|
17
.travis.yml
17
.travis.yml
|
@ -1,4 +1,19 @@
|
||||||
language: java
|
language: java
|
||||||
jdk:
|
jdk:
|
||||||
- oraclejdk7
|
|
||||||
- oraclejdk8
|
- oraclejdk8
|
||||||
|
|
||||||
|
# Using 'install' to run the clean to avoid Travis automatically calling
|
||||||
|
# `./gradlew assemble and duplicating my work
|
||||||
|
install:
|
||||||
|
- ./gradlew clean
|
||||||
|
|
||||||
|
# Invoke our default tasks, whatever is defined as important in the
|
||||||
|
# build.gradle file
|
||||||
|
script:
|
||||||
|
- ./gradlew
|
||||||
|
|
||||||
|
env:
|
||||||
|
global:
|
||||||
|
- secure: X71NKVXJjyG1C6/fZUTxdQ6HwAcaNRWSc0m/VRXsaiZwgXjIr+C+Uz9X4iu6Q52YAc+7CrvftAbIxgQhf+TBtvvnGeZgWXuYqf6Cx1weyL/6xsttOLsEGdtHU9jvrtw4tHRXxu/6F+8QAot8VRwUsCB4IL5Y3epshddbk+/1d9Q=
|
||||||
|
|
||||||
|
after_success: "./gradlew bintrayUpload -PbintrayUser=rtyler -PbintrayKey=${BINTRAY_KEY}"
|
||||||
|
|
|
@ -0,0 +1,23 @@
|
||||||
|
=== Hacking Verspätung
|
||||||
|
|
||||||
|
Verspätung is a link:http://groovy-lang.org[Groovy]-based application which is
|
||||||
|
built with link:http://gradle.org[Gradle]. As such, if you are already familiar
|
||||||
|
with these tools you should be able to find your way around the project with
|
||||||
|
relative ease.
|
||||||
|
|
||||||
|
|
||||||
|
A quick primer on what tasks are available:
|
||||||
|
|
||||||
|
* Running tests: `./gradlew check`
|
||||||
|
* Running the app locally: `./gradlew run -PzookeeperHosts=localhost:2181`
|
||||||
|
* Building the app for distribution: `./gradlew assemble`
|
||||||
|
|
||||||
|
|
||||||
|
=== Releasing Verspätung
|
||||||
|
|
||||||
|
NOTE: This is mostly meant for the developer team.
|
||||||
|
|
||||||
|
Currently releases can be produced by simply pushing a Git tag to this GitHub
|
||||||
|
repository. This will cause Travis CI to build and test the tag, which if it is
|
||||||
|
successful, will automatically publish to
|
||||||
|
link:https://bintray.com/reiseburo/apps/verspaetung[Bintray].
|
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* This is a multibranch workflow file for defining how this project should be
|
||||||
|
* built, tested and deployed
|
||||||
|
*/
|
||||||
|
|
||||||
|
node {
|
||||||
|
stage 'Clean workspace'
|
||||||
|
/* Running on a fresh Docker instance makes this redundant, but just in
|
||||||
|
* case the host isn't configured to give us a new Docker image for every
|
||||||
|
* build, make sure we clean things before we do anything
|
||||||
|
*/
|
||||||
|
deleteDir()
|
||||||
|
|
||||||
|
|
||||||
|
stage 'Checkout source'
|
||||||
|
/*
|
||||||
|
* Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
|
||||||
|
* scm to check out sources matching Jenkinsfile with the SCM details from
|
||||||
|
* the build that is executing this Jenkinsfile.
|
||||||
|
*
|
||||||
|
* when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
|
||||||
|
*/
|
||||||
|
checkout scm
|
||||||
|
|
||||||
|
|
||||||
|
stage 'Build and test'
|
||||||
|
/* if we can't install everything we need for Ruby in less than 15 minutes
|
||||||
|
* we might as well just give up
|
||||||
|
*/
|
||||||
|
timeout(30) {
|
||||||
|
sh './gradlew -iS'
|
||||||
|
}
|
||||||
|
|
||||||
|
stage 'Capture test results and artifacts'
|
||||||
|
step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
|
||||||
|
step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
|
||||||
|
}
|
||||||
|
|
||||||
|
// vim: ft=groovy
|
|
@ -1,4 +1,4 @@
|
||||||
*Copyright (c) 2015 Lookout, Inc*
|
*Copyright (c) 2015 Lookout, Inc, R. Tyler Croy*
|
||||||
|
|
||||||
MIT License
|
MIT License
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,38 @@
|
||||||
|
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
|
||||||
|
|
||||||
|
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]
|
||||||
|
|
||||||
|
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.
|
||||||
|
|
||||||
|
|
||||||
|
Verspätung monitors the topics and their latest offsets by talking to Kafka, it
|
||||||
|
will also keep track of how far along consumers are by monitoring the offsets
|
||||||
|
that they have committed to Zookeeper. Using both of these pieces of
|
||||||
|
information, Verspätung computs the delta for each of the consumer groups and
|
||||||
|
reports it to statsd.
|
||||||
|
|
||||||
|
=== Using
|
||||||
|
|
||||||
|
% java -jar verspaetung-*-all.jar --help
|
||||||
|
usage: verspaetung
|
||||||
|
-d,--delay <DELAY> Seconds to delay between reporting metrics to
|
||||||
|
the metrics receiver (defaults: 5s)
|
||||||
|
-H,--statsd-host <STATSD> Hostname for a statsd instance (defaults to
|
||||||
|
localhost)
|
||||||
|
-n,--dry-run Disable reporting to a statsd host
|
||||||
|
-p,--statsd-port <PORT> Port for the statsd instance (defaults to
|
||||||
|
8125)
|
||||||
|
--prefix <PREFIX> Prefix all metrics with PREFIX before they're
|
||||||
|
reported (e.g. PREFIX.verspaetung.mytopic)
|
||||||
|
-s,--storm Watch Storm KafkaSpout offsets (under
|
||||||
|
/kafka_spout)
|
||||||
|
-x,--exclude <EXCLUDES> Regular expression for consumer groups to
|
||||||
|
exclude from reporting (can be declared
|
||||||
|
multiple times)
|
||||||
|
-z,--zookeeper <HOSTS> Comma separated list of Zookeeper hosts (e.g.
|
||||||
|
localhost:2181)
|
||||||
|
|
||||||
|
Running Verspätung is rather easy, by default the daemon will monitor the
|
||||||
|
standard Kafka high-level consumer offset path of `/consumers` and start
|
||||||
|
reporting deltas automatically.
|
||||||
|
|
20
README.md
20
README.md
|
@ -1,20 +0,0 @@
|
||||||
# Verspätung
|
|
||||||
|
|
||||||
[![Build
|
|
||||||
Status](https://travis-ci.org/lookout/verspaetung.svg)](https://travis-ci.org/lookout/verspaetung)
|
|
||||||
|
|
||||||
Verspätung is a small utility which aims to help identify delay of
|
|
||||||
[Kafka](http://kafka.apache.org) consumers.
|
|
||||||
|
|
||||||
Verspätung monitors the topics and their latest offsets by talking to Kafka, it
|
|
||||||
will also keep track of how far along consumers are by monitoring the offsets
|
|
||||||
that they have committed to Zookeeper. Using both of these pieces of
|
|
||||||
information, Verspätung computs the delta for each of the consumer groups and
|
|
||||||
reports it to statsd.
|
|
||||||
|
|
||||||
|
|
||||||
### Hacking
|
|
||||||
|
|
||||||
* *Running tests:* `./gradlew check`
|
|
||||||
* *Running the app locally:* `./gradlew run -PzookeeperHosts=localhost:2181`
|
|
||||||
* *Building the app for distribution:* `./gradlew shadowJar`
|
|
128
build.gradle
128
build.gradle
|
@ -1,17 +1,36 @@
|
||||||
plugins {
|
plugins {
|
||||||
id "com.jfrog.bintray" version "1.0"
|
id "com.jfrog.bintray" version "1.0"
|
||||||
id 'com.github.johnrengelman.shadow' version '1.2.0'
|
id 'com.github.johnrengelman.shadow' version '1.2.0'
|
||||||
|
id "org.ajoberstar.github-pages" version "1.2.0"
|
||||||
|
id "org.asciidoctor.gradle.asciidoctor" version "1.5.1"
|
||||||
|
id 'codenarc'
|
||||||
|
id 'groovy'
|
||||||
|
id 'idea'
|
||||||
|
id 'application'
|
||||||
}
|
}
|
||||||
apply plugin: 'groovy'
|
|
||||||
apply plugin: 'application'
|
|
||||||
|
|
||||||
group = "com.github.lookout"
|
group = "com.github.reiseburo"
|
||||||
description = "A utility for monitoring the delay of Kafka consumers"
|
description = "A utility for monitoring the delay of Kafka consumers"
|
||||||
version = '0.1.4'
|
version = '0.6.0'
|
||||||
mainClassName = 'com.github.lookout.verspaetung.Main'
|
mainClassName = 'com.github.reiseburo.verspaetung.Main'
|
||||||
defaultTasks 'clean', 'check'
|
defaultTasks 'check', 'assemble'
|
||||||
sourceCompatibility = '1.7'
|
|
||||||
targetCompatibility = '1.7'
|
/* Ensure we properly build for JDK7 still */
|
||||||
|
plugins.withType(JavaPlugin) {
|
||||||
|
sourceCompatibility = 1.7
|
||||||
|
targetCompatibility = 1.7
|
||||||
|
|
||||||
|
|
||||||
|
project.tasks.withType(JavaCompile) { task ->
|
||||||
|
task.sourceCompatibility = project.sourceCompatibility
|
||||||
|
task.targetCompatibility = project.targetCompatibility
|
||||||
|
}
|
||||||
|
|
||||||
|
project.tasks.withType(GroovyCompile) { task ->
|
||||||
|
task.sourceCompatibility = project.sourceCompatibility
|
||||||
|
task.targetCompatibility = project.targetCompatibility
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
@ -31,14 +50,13 @@ test {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
// DEPENDENCY MANAGEMENT
|
// DEPENDENCY MANAGEMENT
|
||||||
repositories {
|
repositories {
|
||||||
mavenCentral()
|
|
||||||
jcenter()
|
jcenter()
|
||||||
|
/* needed for forked metrics library */
|
||||||
maven { url 'https://dl.bintray.com/rtyler/maven' }
|
maven { url 'https://dl.bintray.com/lookout/systems' }
|
||||||
}
|
}
|
||||||
|
|
||||||
dependencies {
|
dependencies {
|
||||||
compile 'org.codehaus.groovy:groovy-all:2.4.0+'
|
compile 'org.codehaus.groovy:groovy-all:2.4.3+'
|
||||||
|
|
||||||
[
|
[
|
||||||
'curator-framework',
|
'curator-framework',
|
||||||
|
@ -55,21 +73,61 @@ dependencies {
|
||||||
/* Needed for command line options parsing */
|
/* Needed for command line options parsing */
|
||||||
compile 'commons-cli:commons-cli:1.2+'
|
compile 'commons-cli:commons-cli:1.2+'
|
||||||
|
|
||||||
compile 'com.timgroup:java-statsd-client:3.1.2+'
|
compile 'com.github.lookout:metrics-datadog:0.1.3'
|
||||||
|
|
||||||
|
['metrics-core', 'metrics-graphite'].each { artifactName ->
|
||||||
|
compile "io.dropwizard.metrics:${artifactName}:3.1.0"
|
||||||
|
}
|
||||||
|
|
||||||
/* Logback is to be used for logging through the app */
|
/* Logback is to be used for logging through the app */
|
||||||
compile 'ch.qos.logback:logback-classic:1.1.2+'
|
compile 'ch.qos.logback:logback-classic:1.1.2+'
|
||||||
|
|
||||||
|
codenarc "org.codenarc:CodeNarc:0.24"
|
||||||
|
|
||||||
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
|
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
|
||||||
testCompile 'cglib:cglib-nodep:2.2.+'
|
testCompile 'cglib:cglib-nodep:2.2.+'
|
||||||
}
|
}
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
|
||||||
|
|
||||||
|
idea {
|
||||||
////////////////////////////////////////////////////////////////////////////////
|
module {
|
||||||
jar {
|
downloadJavadoc = true
|
||||||
enabled = false
|
downloadSources = true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// PUBLISHING/DOCUMENTATION
|
||||||
|
assemble.dependsOn groovydoc
|
||||||
|
|
||||||
|
asciidoctor {
|
||||||
|
/* Using a single backend, so skipping the html5/ output dir */
|
||||||
|
separateOutputDirs false
|
||||||
|
}
|
||||||
|
assemble.dependsOn asciidoctor
|
||||||
|
|
||||||
|
githubPages {
|
||||||
|
repoUri = 'git@github.com:lookout/verspaetung.git'
|
||||||
|
pages {
|
||||||
|
into('groovydoc') { from groovydoc }
|
||||||
|
from asciidoctor
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////
|
||||||
|
codenarc {
|
||||||
|
configFile file("${projectDir}/gradle/codenarc.rules")
|
||||||
|
sourceSets = [sourceSets.main]
|
||||||
|
}
|
||||||
|
codenarcMain {
|
||||||
|
exclude '**/Main.groovy'
|
||||||
|
}
|
||||||
|
|
||||||
shadowJar {
|
shadowJar {
|
||||||
exclude 'META-INF/*.RSA', 'META-INF/*.DSA'
|
exclude 'META-INF/*.RSA', 'META-INF/*.DSA'
|
||||||
manifest {
|
manifest {
|
||||||
|
@ -84,17 +142,45 @@ artifacts {
|
||||||
archives shadowJar
|
archives shadowJar
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sourceSets {
|
||||||
|
main {
|
||||||
|
groovy {
|
||||||
|
exclude '**/*.sw*'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* https://github.com/reiseburo/verspaetung/issues/28
|
||||||
|
*
|
||||||
|
* disable the distZip and distTar tasks since we only need the shadow jar
|
||||||
|
*/
|
||||||
|
distZip.enabled = false
|
||||||
|
distTar.enabled = false
|
||||||
|
|
||||||
|
/* We're not building a library jar so we'll disable this default jar task */
|
||||||
|
jar.enabled = false
|
||||||
|
/* Remove the "library" jar from the archives configuration so it's not
|
||||||
|
* published
|
||||||
|
*/
|
||||||
|
configurations.archives.artifacts.removeAll { it.archiveTask.is jar }
|
||||||
|
|
||||||
|
|
||||||
bintray {
|
bintray {
|
||||||
user = project.bintrayUser
|
user = project.bintrayUser
|
||||||
key = project.bintrayKey
|
key = project.bintrayKey
|
||||||
publish = true
|
publish = true
|
||||||
dryRun = false
|
/*
|
||||||
|
* Only only publish when we're tagging a release and if we've executed on
|
||||||
|
* a JDK8 build. This is to prevent multiple attempts by the build matrix
|
||||||
|
* to publish the artifacts
|
||||||
|
*/
|
||||||
|
dryRun = !((System.env.TRAVIS_TAG as boolean) && (System.env.TRAVIS_JDK_VERSION == 'oraclejdk8'))
|
||||||
configurations = ['archives']
|
configurations = ['archives']
|
||||||
|
|
||||||
pkg {
|
pkg {
|
||||||
userOrg = 'lookout'
|
userOrg = 'reiseburo'
|
||||||
repo = 'systems'
|
repo = 'apps'
|
||||||
name = 'verspaetung'
|
name = 'verspaetung'
|
||||||
labels = []
|
labels = []
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
|
||||||
|
zookeeper:
|
||||||
|
image: wurstmeister/zookeeper
|
||||||
|
ports:
|
||||||
|
- 2181:2181
|
||||||
|
kafka:
|
||||||
|
image: wurstmeister/kafka
|
||||||
|
ports:
|
||||||
|
- 9092:9092
|
||||||
|
links:
|
||||||
|
- zookeeper:zk
|
||||||
|
environment:
|
||||||
|
# Only using one node, so we're fine with localhost
|
||||||
|
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
|
||||||
|
volumes:
|
||||||
|
- /var/run/docker.sock:/var/run/docker.sock
|
|
@ -1,3 +1,6 @@
|
||||||
org.gradle.daemon=true
|
# Disabling daemon mode since it makes PermGen leaks with codenarc more common
|
||||||
|
org.gradle.daemon=false
|
||||||
bintrayUser=
|
bintrayUser=
|
||||||
bintrayKey=
|
bintrayKey=
|
||||||
|
sourceCompatibility=1.7
|
||||||
|
targetCompatibility=1.7
|
||||||
|
|
|
@ -0,0 +1,379 @@
|
||||||
|
ruleset {
|
||||||
|
|
||||||
|
description '''
|
||||||
|
A Sample Groovy RuleSet containing all CodeNarc Rules
|
||||||
|
You can use this as a template for your own custom RuleSet.
|
||||||
|
Just delete the rules that you don't want to include.
|
||||||
|
'''
|
||||||
|
|
||||||
|
AbcComplexity // DEPRECATED: Use the AbcMetric rule instead. Requires the GMetrics jar
|
||||||
|
AbcMetric // Requires the GMetrics jar
|
||||||
|
AbstractClassName
|
||||||
|
AbstractClassWithPublicConstructor
|
||||||
|
AbstractClassWithoutAbstractMethod
|
||||||
|
AddEmptyString
|
||||||
|
AssertWithinFinallyBlock
|
||||||
|
AssignCollectionSort
|
||||||
|
AssignCollectionUnique
|
||||||
|
AssignmentInConditional
|
||||||
|
AssignmentToStaticFieldFromInstanceMethod
|
||||||
|
BigDecimalInstantiation
|
||||||
|
BitwiseOperatorInConditional
|
||||||
|
BlankLineBeforePackage
|
||||||
|
BooleanGetBoolean
|
||||||
|
BooleanMethodReturnsNull
|
||||||
|
BracesForClass
|
||||||
|
BracesForForLoop
|
||||||
|
BracesForIfElse
|
||||||
|
BracesForMethod
|
||||||
|
BracesForTryCatchFinally
|
||||||
|
BrokenNullCheck
|
||||||
|
BrokenOddnessCheck
|
||||||
|
BuilderMethodWithSideEffects
|
||||||
|
BusyWait
|
||||||
|
CatchArrayIndexOutOfBoundsException
|
||||||
|
CatchError
|
||||||
|
CatchException
|
||||||
|
CatchIllegalMonitorStateException
|
||||||
|
CatchIndexOutOfBoundsException
|
||||||
|
CatchNullPointerException
|
||||||
|
CatchRuntimeException
|
||||||
|
CatchThrowable
|
||||||
|
ChainedTest
|
||||||
|
ClassForName
|
||||||
|
ClassJavadoc
|
||||||
|
ClassName
|
||||||
|
ClassNameSameAsFilename
|
||||||
|
ClassNameSameAsSuperclass
|
||||||
|
ClassSize
|
||||||
|
CloneWithoutCloneable
|
||||||
|
CloneableWithoutClone
|
||||||
|
CloseWithoutCloseable
|
||||||
|
ClosureAsLastMethodParameter
|
||||||
|
ClosureStatementOnOpeningLineOfMultipleLineClosure
|
||||||
|
CollectAllIsDeprecated
|
||||||
|
CompareToWithoutComparable
|
||||||
|
ComparisonOfTwoConstants
|
||||||
|
ComparisonWithSelf
|
||||||
|
ConfusingClassNamedException
|
||||||
|
ConfusingMethodName
|
||||||
|
ConfusingMultipleReturns
|
||||||
|
ConfusingTernary
|
||||||
|
ConsecutiveBlankLines
|
||||||
|
ConsecutiveLiteralAppends
|
||||||
|
ConsecutiveStringConcatenation
|
||||||
|
ConstantAssertExpression
|
||||||
|
ConstantIfExpression
|
||||||
|
ConstantTernaryExpression
|
||||||
|
ConstantsOnlyInterface
|
||||||
|
CouldBeElvis
|
||||||
|
CoupledTestCase
|
||||||
|
CrapMetric // Requires the GMetrics jar and a Cobertura coverage file
|
||||||
|
CyclomaticComplexity // Requires the GMetrics jar
|
||||||
|
DeadCode
|
||||||
|
DirectConnectionManagement
|
||||||
|
DoubleCheckedLocking
|
||||||
|
DoubleNegative
|
||||||
|
DuplicateCaseStatement
|
||||||
|
DuplicateImport
|
||||||
|
DuplicateListLiteral
|
||||||
|
DuplicateMapKey
|
||||||
|
DuplicateMapLiteral
|
||||||
|
/*
|
||||||
|
This is a pointless check
|
||||||
|
DuplicateNumberLiteral
|
||||||
|
/*
|
||||||
|
DuplicateSetValue
|
||||||
|
DuplicateStringLiteral
|
||||||
|
ElseBlockBraces
|
||||||
|
EmptyCatchBlock
|
||||||
|
EmptyClass
|
||||||
|
EmptyElseBlock
|
||||||
|
EmptyFinallyBlock
|
||||||
|
EmptyForStatement
|
||||||
|
EmptyIfStatement
|
||||||
|
EmptyInstanceInitializer
|
||||||
|
EmptyMethod
|
||||||
|
EmptyMethodInAbstractClass
|
||||||
|
EmptyStaticInitializer
|
||||||
|
EmptySwitchStatement
|
||||||
|
EmptySynchronizedStatement
|
||||||
|
EmptyTryBlock
|
||||||
|
EmptyWhileStatement
|
||||||
|
EnumCustomSerializationIgnored
|
||||||
|
EqualsAndHashCode
|
||||||
|
EqualsOverloaded
|
||||||
|
ExceptionExtendsError
|
||||||
|
ExceptionExtendsThrowable
|
||||||
|
ExceptionNotThrown
|
||||||
|
ExplicitArrayListInstantiation
|
||||||
|
ExplicitCallToAndMethod
|
||||||
|
ExplicitCallToCompareToMethod
|
||||||
|
ExplicitCallToDivMethod
|
||||||
|
ExplicitCallToEqualsMethod
|
||||||
|
ExplicitCallToGetAtMethod
|
||||||
|
ExplicitCallToLeftShiftMethod
|
||||||
|
ExplicitCallToMinusMethod
|
||||||
|
ExplicitCallToModMethod
|
||||||
|
ExplicitCallToMultiplyMethod
|
||||||
|
ExplicitCallToOrMethod
|
||||||
|
ExplicitCallToPlusMethod
|
||||||
|
ExplicitCallToPowerMethod
|
||||||
|
ExplicitCallToRightShiftMethod
|
||||||
|
ExplicitCallToXorMethod
|
||||||
|
ExplicitGarbageCollection
|
||||||
|
ExplicitHashMapInstantiation
|
||||||
|
ExplicitHashSetInstantiation
|
||||||
|
ExplicitLinkedHashMapInstantiation
|
||||||
|
ExplicitLinkedListInstantiation
|
||||||
|
ExplicitStackInstantiation
|
||||||
|
ExplicitTreeSetInstantiation
|
||||||
|
FactoryMethodName
|
||||||
|
FieldName
|
||||||
|
FileCreateTempFile
|
||||||
|
FileEndsWithoutNewline
|
||||||
|
FinalClassWithProtectedMember
|
||||||
|
ForLoopShouldBeWhileLoop
|
||||||
|
ForStatementBraces
|
||||||
|
GStringAsMapKey
|
||||||
|
GStringExpressionWithinString
|
||||||
|
GetterMethodCouldBeProperty
|
||||||
|
GrailsDomainHasEquals
|
||||||
|
GrailsDomainHasToString
|
||||||
|
GrailsDomainReservedSqlKeywordName
|
||||||
|
GrailsDomainWithServiceReference
|
||||||
|
GrailsDuplicateConstraint
|
||||||
|
GrailsDuplicateMapping
|
||||||
|
GrailsMassAssignment
|
||||||
|
GrailsPublicControllerMethod
|
||||||
|
GrailsServletContextReference
|
||||||
|
GrailsSessionReference // DEPRECATED
|
||||||
|
GrailsStatelessService
|
||||||
|
GroovyLangImmutable
|
||||||
|
HardCodedWindowsFileSeparator
|
||||||
|
HardCodedWindowsRootDirectory
|
||||||
|
HashtableIsObsolete
|
||||||
|
IfStatementBraces
|
||||||
|
/*
|
||||||
|
"Uh yes, we'd like you to make your code more unreadable"
|
||||||
|
IfStatementCouldBeTernary
|
||||||
|
*/
|
||||||
|
IllegalClassMember
|
||||||
|
IllegalClassReference
|
||||||
|
IllegalPackageReference
|
||||||
|
IllegalRegex
|
||||||
|
IllegalString
|
||||||
|
IllegalSubclass
|
||||||
|
ImplementationAsType
|
||||||
|
ImportFromSamePackage
|
||||||
|
ImportFromSunPackages
|
||||||
|
InconsistentPropertyLocking
|
||||||
|
InconsistentPropertySynchronization
|
||||||
|
InsecureRandom
|
||||||
|
/*
|
||||||
|
instanceof is useful in Groovy, not sure why this check is here
|
||||||
|
Instanceof
|
||||||
|
*/
|
||||||
|
IntegerGetInteger
|
||||||
|
InterfaceName
|
||||||
|
InterfaceNameSameAsSuperInterface
|
||||||
|
InvertedIfElse
|
||||||
|
JUnitAssertAlwaysFails
|
||||||
|
JUnitAssertAlwaysSucceeds
|
||||||
|
JUnitAssertEqualsConstantActualValue
|
||||||
|
JUnitFailWithoutMessage
|
||||||
|
JUnitLostTest
|
||||||
|
JUnitPublicField
|
||||||
|
JUnitPublicNonTestMethod
|
||||||
|
JUnitPublicProperty
|
||||||
|
JUnitSetUpCallsSuper
|
||||||
|
JUnitStyleAssertions
|
||||||
|
JUnitTearDownCallsSuper
|
||||||
|
JUnitTestMethodWithoutAssert
|
||||||
|
JUnitUnnecessarySetUp
|
||||||
|
JUnitUnnecessaryTearDown
|
||||||
|
JUnitUnnecessaryThrowsException
|
||||||
|
JavaIoPackageAccess
|
||||||
|
JdbcConnectionReference
|
||||||
|
JdbcResultSetReference
|
||||||
|
JdbcStatementReference
|
||||||
|
LineLength
|
||||||
|
LocaleSetDefault
|
||||||
|
LoggerForDifferentClass
|
||||||
|
LoggerWithWrongModifiers
|
||||||
|
LoggingSwallowsStacktrace
|
||||||
|
LongLiteralWithLowerCaseL
|
||||||
|
MethodCount
|
||||||
|
MethodName
|
||||||
|
MethodSize
|
||||||
|
MisorderedStaticImports
|
||||||
|
MissingBlankLineAfterImports
|
||||||
|
MissingBlankLineAfterPackage
|
||||||
|
MissingNewInThrowStatement
|
||||||
|
MultipleLoggers
|
||||||
|
MultipleUnaryOperators
|
||||||
|
NestedBlockDepth
|
||||||
|
NestedForLoop
|
||||||
|
NestedSynchronization
|
||||||
|
NoDef
|
||||||
|
NoWildcardImports
|
||||||
|
NonFinalPublicField
|
||||||
|
NonFinalSubclassOfSensitiveInterface
|
||||||
|
ObjectFinalize
|
||||||
|
ObjectOverrideMisspelledMethodName
|
||||||
|
PackageName
|
||||||
|
PackageNameMatchesFilePath
|
||||||
|
ParameterCount
|
||||||
|
ParameterName
|
||||||
|
ParameterReassignment
|
||||||
|
PrintStackTrace
|
||||||
|
Println
|
||||||
|
PrivateFieldCouldBeFinal
|
||||||
|
PropertyName
|
||||||
|
PublicFinalizeMethod
|
||||||
|
PublicInstanceField
|
||||||
|
RandomDoubleCoercedToZero
|
||||||
|
RemoveAllOnSelf
|
||||||
|
RequiredRegex
|
||||||
|
RequiredString
|
||||||
|
ReturnFromFinallyBlock
|
||||||
|
ReturnNullFromCatchBlock
|
||||||
|
ReturnsNullInsteadOfEmptyArray
|
||||||
|
ReturnsNullInsteadOfEmptyCollection
|
||||||
|
SerialPersistentFields
|
||||||
|
SerialVersionUID
|
||||||
|
/* Not needed
|
||||||
|
SerializableClassMustDefineSerialVersionUID
|
||||||
|
*/
|
||||||
|
SimpleDateFormatMissingLocale
|
||||||
|
SpaceAfterCatch
|
||||||
|
SpaceAfterClosingBrace
|
||||||
|
SpaceAfterComma
|
||||||
|
SpaceAfterFor
|
||||||
|
SpaceAfterIf
|
||||||
|
SpaceAfterOpeningBrace
|
||||||
|
SpaceAfterSemicolon
|
||||||
|
SpaceAfterSwitch
|
||||||
|
SpaceAfterWhile
|
||||||
|
SpaceAroundClosureArrow
|
||||||
|
/*
|
||||||
|
I prefer a little extra space
|
||||||
|
SpaceAroundMapEntryColon
|
||||||
|
*/
|
||||||
|
SpaceAroundOperator
|
||||||
|
SpaceBeforeClosingBrace
|
||||||
|
SpaceBeforeOpeningBrace
|
||||||
|
SpockIgnoreRestUsed
|
||||||
|
StatelessClass
|
||||||
|
StatelessSingleton
|
||||||
|
StaticCalendarField
|
||||||
|
StaticConnection
|
||||||
|
StaticDateFormatField
|
||||||
|
StaticMatcherField
|
||||||
|
StaticSimpleDateFormatField
|
||||||
|
SwallowThreadDeath
|
||||||
|
SynchronizedMethod
|
||||||
|
SynchronizedOnBoxedPrimitive
|
||||||
|
SynchronizedOnGetClass
|
||||||
|
SynchronizedOnReentrantLock
|
||||||
|
SynchronizedOnString
|
||||||
|
SynchronizedOnThis
|
||||||
|
SynchronizedReadObjectMethod
|
||||||
|
SystemErrPrint
|
||||||
|
SystemExit
|
||||||
|
SystemOutPrint
|
||||||
|
SystemRunFinalizersOnExit
|
||||||
|
TernaryCouldBeElvis
|
||||||
|
ThisReferenceEscapesConstructor
|
||||||
|
ThreadGroup
|
||||||
|
ThreadLocalNotStaticFinal
|
||||||
|
ThreadYield
|
||||||
|
ThrowError
|
||||||
|
ThrowException
|
||||||
|
ThrowExceptionFromFinallyBlock
|
||||||
|
ThrowNullPointerException
|
||||||
|
ThrowRuntimeException
|
||||||
|
ThrowThrowable
|
||||||
|
ToStringReturnsNull
|
||||||
|
TrailingWhitespace
|
||||||
|
UnnecessaryBigDecimalInstantiation
|
||||||
|
UnnecessaryBigIntegerInstantiation
|
||||||
|
UnnecessaryBooleanExpression
|
||||||
|
UnnecessaryBooleanInstantiation
|
||||||
|
UnnecessaryCallForLastElement
|
||||||
|
UnnecessaryCallToSubstring
|
||||||
|
UnnecessaryCast
|
||||||
|
UnnecessaryCatchBlock
|
||||||
|
UnnecessaryCollectCall
|
||||||
|
UnnecessaryCollectionCall
|
||||||
|
UnnecessaryConstructor
|
||||||
|
UnnecessaryDefInFieldDeclaration
|
||||||
|
UnnecessaryDefInMethodDeclaration
|
||||||
|
UnnecessaryDefInVariableDeclaration
|
||||||
|
UnnecessaryDotClass
|
||||||
|
UnnecessaryDoubleInstantiation
|
||||||
|
UnnecessaryElseStatement
|
||||||
|
UnnecessaryFail
|
||||||
|
UnnecessaryFinalOnPrivateMethod
|
||||||
|
UnnecessaryFloatInstantiation
|
||||||
|
UnnecessaryGString
|
||||||
|
UnnecessaryGetter
|
||||||
|
UnnecessaryGroovyImport
|
||||||
|
UnnecessaryIfStatement
|
||||||
|
UnnecessaryInstanceOfCheck
|
||||||
|
UnnecessaryInstantiationToGetClass
|
||||||
|
UnnecessaryIntegerInstantiation
|
||||||
|
UnnecessaryLongInstantiation
|
||||||
|
UnnecessaryModOne
|
||||||
|
UnnecessaryNullCheck
|
||||||
|
UnnecessaryNullCheckBeforeInstanceOf
|
||||||
|
UnnecessaryObjectReferences
|
||||||
|
UnnecessaryOverridingMethod
|
||||||
|
UnnecessaryPackageReference
|
||||||
|
UnnecessaryParenthesesForMethodCallWithClosure
|
||||||
|
UnnecessaryPublicModifier
|
||||||
|
/*
|
||||||
|
I quite like explicit return estatements
|
||||||
|
UnnecessaryReturnKeyword
|
||||||
|
*/
|
||||||
|
UnnecessarySafeNavigationOperator
|
||||||
|
UnnecessarySelfAssignment
|
||||||
|
UnnecessarySemicolon
|
||||||
|
UnnecessaryStringInstantiation
|
||||||
|
UnnecessarySubstring
|
||||||
|
UnnecessaryTernaryExpression
|
||||||
|
UnnecessaryToString
|
||||||
|
UnnecessaryTransientModifier
|
||||||
|
UnsafeArrayDeclaration
|
||||||
|
UnsafeImplementationAsMap
|
||||||
|
UnusedArray
|
||||||
|
UnusedImport
|
||||||
|
/*
|
||||||
|
Some interfaces require no use for the method parameter
|
||||||
|
UnusedMethodParameter
|
||||||
|
*/
|
||||||
|
UnusedObject
|
||||||
|
UnusedPrivateField
|
||||||
|
UnusedPrivateMethod
|
||||||
|
UnusedPrivateMethodParameter
|
||||||
|
UnusedVariable
|
||||||
|
UseAssertEqualsInsteadOfAssertTrue
|
||||||
|
UseAssertFalseInsteadOfNegation
|
||||||
|
UseAssertNullInsteadOfAssertEquals
|
||||||
|
UseAssertSameInsteadOfAssertTrue
|
||||||
|
UseAssertTrueInsteadOfAssertEquals
|
||||||
|
UseAssertTrueInsteadOfNegation
|
||||||
|
UseCollectMany
|
||||||
|
UseCollectNested
|
||||||
|
UseOfNotifyMethod
|
||||||
|
VariableName
|
||||||
|
VectorIsObsolete
|
||||||
|
VolatileArrayField
|
||||||
|
VolatileLongOrDoubleField
|
||||||
|
WaitOutsideOfWhileLoop
|
||||||
|
WhileStatementBraces
|
||||||
|
}
|
||||||
|
|
||||||
|
doNotApplyToClassNames = 'Main'
|
||||||
|
|
||||||
|
// vim: ft=groovy
|
|
@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
|
||||||
distributionPath=wrapper/dists
|
distributionPath=wrapper/dists
|
||||||
zipStoreBase=GRADLE_USER_HOME
|
zipStoreBase=GRADLE_USER_HOME
|
||||||
zipStorePath=wrapper/dists
|
zipStorePath=wrapper/dists
|
||||||
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-bin.zip
|
distributionUrl=https\://services.gradle.org/distributions/gradle-2.6-all.zip
|
||||||
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
= Verspätung
|
||||||
|
:toc: right
|
||||||
|
|
||||||
|
|
||||||
|
== Overview/Readme
|
||||||
|
|
||||||
|
include::../../../README.adoc[]
|
||||||
|
|
||||||
|
== Developing
|
||||||
|
|
||||||
|
=== Useful links
|
||||||
|
|
||||||
|
* link:groovydoc/[Internal API docs]
|
||||||
|
|
||||||
|
include::../../../HACKING.adoc[]
|
|
@ -1,21 +0,0 @@
|
||||||
package com.github.lookout.verspaetung
|
|
||||||
|
|
||||||
/**
|
|
||||||
* POJO containing the necessary information to model a Kafka broker
|
|
||||||
*/
|
|
||||||
class KafkaBroker {
|
|
||||||
private String host
|
|
||||||
private Integer port
|
|
||||||
private Integer brokerId
|
|
||||||
|
|
||||||
public KafkaBroker(Object jsonObject, Integer brokerId) {
|
|
||||||
this.host = jsonObject.host
|
|
||||||
this.port = jsonObject.port
|
|
||||||
this.brokerId = brokerId
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
String toString() {
|
|
||||||
return "broker<${this.brokerId}>@${this.host}:${this.port}"
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,146 +0,0 @@
|
||||||
package com.github.lookout.verspaetung
|
|
||||||
|
|
||||||
import groovy.transform.TypeChecked
|
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import org.slf4j.Logger
|
|
||||||
import org.slf4j.LoggerFactory
|
|
||||||
|
|
||||||
import kafka.cluster.Broker
|
|
||||||
import kafka.client.ClientUtils
|
|
||||||
import kafka.consumer.SimpleConsumer
|
|
||||||
import kafka.common.TopicAndPartition
|
|
||||||
import kafka.javaapi.*
|
|
||||||
/* UGH */
|
|
||||||
import scala.collection.JavaConversions
|
|
||||||
|
|
||||||
/* Can't type check this because it makes the calls in and out of Scala an
|
|
||||||
* atrocious pain in the ass
|
|
||||||
*/
|
|
||||||
//@TypeChecked
|
|
||||||
class KafkaPoller extends Thread {
|
|
||||||
private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
|
|
||||||
private final Integer KAFKA_TIMEOUT = (5 * 1000)
|
|
||||||
private final Integer KAFKA_BUFFER = (100 * 1024)
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class)
|
|
||||||
|
|
||||||
private Boolean keepRunning = true
|
|
||||||
private Boolean shouldReconnect = false
|
|
||||||
private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
|
|
||||||
private List<Broker> brokers = []
|
|
||||||
private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
|
|
||||||
private List<Closure> onDelta = []
|
|
||||||
|
|
||||||
KafkaPoller(AbstractMap map) {
|
|
||||||
this.consumersMap = map
|
|
||||||
}
|
|
||||||
|
|
||||||
void run() {
|
|
||||||
logger.info("Starting wait loop")
|
|
||||||
while (keepRunning) {
|
|
||||||
logger.debug("poll loop")
|
|
||||||
|
|
||||||
if (shouldReconnect) {
|
|
||||||
reconnect()
|
|
||||||
}
|
|
||||||
|
|
||||||
if (this.consumersMap.size() > 0) {
|
|
||||||
dumpMetadata()
|
|
||||||
}
|
|
||||||
|
|
||||||
Thread.sleep(1 * 1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void dumpMetadata() {
|
|
||||||
logger.debug("dumping meta-data")
|
|
||||||
|
|
||||||
def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
|
|
||||||
def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
|
|
||||||
brokersSeq,
|
|
||||||
KAFKA_CLIENT_ID,
|
|
||||||
KAFKA_TIMEOUT,
|
|
||||||
0)
|
|
||||||
|
|
||||||
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
|
|
||||||
withScalaCollection(f.partitionsMetadata).each { p ->
|
|
||||||
Long offset = latestFromLeader(p.leader.get()?.id, f.topic, p.partitionId)
|
|
||||||
TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
|
|
||||||
|
|
||||||
this.consumersMap[tp].each { zk.ConsumerOffset c ->
|
|
||||||
logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
|
|
||||||
Long delta = offset - c.offset
|
|
||||||
this.onDelta.each { Closure callback ->
|
|
||||||
callback.call(c.groupName, tp, delta)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug("finished dumping meta-data")
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
|
|
||||||
SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
|
|
||||||
TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition)
|
|
||||||
/* XXX: A zero clientId into this method might not be right */
|
|
||||||
return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
|
|
||||||
}
|
|
||||||
|
|
||||||
Iterable withScalaCollection(scala.collection.Iterable iter) {
|
|
||||||
return JavaConversions.asJavaIterable(iter)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Blocking reconnect to the Kafka brokers
|
|
||||||
*/
|
|
||||||
void reconnect() {
|
|
||||||
logger.info("Creating SimpleConsumer connections for brokers")
|
|
||||||
this.brokers.each { Broker b ->
|
|
||||||
SimpleConsumer consumer = new SimpleConsumer(b.host,
|
|
||||||
b.port,
|
|
||||||
KAFKA_TIMEOUT,
|
|
||||||
KAFKA_BUFFER,
|
|
||||||
KAFKA_CLIENT_ID)
|
|
||||||
consumer.connect()
|
|
||||||
this.brokerConsumerMap[b.id] = consumer
|
|
||||||
}
|
|
||||||
this.shouldReconnect =false
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the brokers list as an immutable Seq collection for the Kafka
|
|
||||||
* scala underpinnings
|
|
||||||
*/
|
|
||||||
scala.collection.immutable.Seq getBrokersSeq() {
|
|
||||||
return JavaConversions.asScalaBuffer(this.brokers).toList()
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Return scala.collection.mutable.Set for the given List
|
|
||||||
*/
|
|
||||||
scala.collection.mutable.Set toScalaSet(Set set) {
|
|
||||||
return JavaConversions.asScalaSet(set)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Signal the runloop to safely die after it's next iteration
|
|
||||||
*/
|
|
||||||
void die() {
|
|
||||||
this.keepRunning = false
|
|
||||||
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
|
||||||
client.disconnect()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Store a new list of KafkaBroker objects and signal a reconnection
|
|
||||||
*/
|
|
||||||
void refresh(List<KafkaBroker> brokers) {
|
|
||||||
this.brokers = brokers.collect { KafkaBroker b ->
|
|
||||||
new Broker(b.brokerId, b.host, b.port)
|
|
||||||
}
|
|
||||||
this.shouldReconnect = true
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,174 +0,0 @@
|
||||||
package com.github.lookout.verspaetung
|
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
|
|
||||||
import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
|
|
||||||
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
|
|
||||||
|
|
||||||
import java.util.AbstractMap
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
import groovy.transform.TypeChecked
|
|
||||||
|
|
||||||
import com.timgroup.statsd.StatsDClient
|
|
||||||
import com.timgroup.statsd.NonBlockingDogStatsDClient
|
|
||||||
|
|
||||||
import org.apache.commons.cli.*
|
|
||||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
|
||||||
import org.apache.curator.framework.CuratorFramework
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCache
|
|
||||||
import org.slf4j.Logger
|
|
||||||
import org.slf4j.LoggerFactory
|
|
||||||
|
|
||||||
class Main {
|
|
||||||
private static final String METRICS_PREFIX = 'verspaetung'
|
|
||||||
|
|
||||||
private static StatsDClient statsd
|
|
||||||
private static Logger logger
|
|
||||||
|
|
||||||
static void main(String[] args) {
|
|
||||||
String zookeeperHosts = 'localhost:2181'
|
|
||||||
String statsdHost = 'localhost'
|
|
||||||
Integer statsdPort = 8125
|
|
||||||
|
|
||||||
CommandLine cli = parseCommandLine(args)
|
|
||||||
|
|
||||||
if (cli.hasOption('z')) {
|
|
||||||
zookeeperHosts = cli.getOptionValue('z')
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cli.hasOption('H')) {
|
|
||||||
statsdHost = cli.getOptionValue('H')
|
|
||||||
}
|
|
||||||
|
|
||||||
if (cli.hasOption('p')) {
|
|
||||||
statsdPort = cli.getOptionValue('p')
|
|
||||||
}
|
|
||||||
|
|
||||||
logger = LoggerFactory.getLogger(Main.class)
|
|
||||||
logger.info("Running with: ${args}")
|
|
||||||
logger.warn("Using: zookeepers=${zookeeperHosts} statsd=${statsdHost}:${statsdPort}")
|
|
||||||
|
|
||||||
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
|
|
||||||
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
|
|
||||||
ConcurrentHashMap<TopicPartition, List<zk.ConsumerOffset>> consumers = new ConcurrentHashMap()
|
|
||||||
|
|
||||||
statsd = new NonBlockingDogStatsDClient(METRICS_PREFIX, statsdHost, statsdPort)
|
|
||||||
|
|
||||||
client.start()
|
|
||||||
|
|
||||||
KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
|
|
||||||
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
|
|
||||||
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()
|
|
||||||
|
|
||||||
/* Assuming that most people aren't needing to run Storm-based watchers
|
|
||||||
* as well
|
|
||||||
*/
|
|
||||||
if (cli.hasOption('s')) {
|
|
||||||
KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
|
|
||||||
stormWatcher.start()
|
|
||||||
}
|
|
||||||
|
|
||||||
consumerWatcher.onInitComplete << {
|
|
||||||
logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
|
|
||||||
}
|
|
||||||
|
|
||||||
brokerWatcher.onBrokerUpdates << { brokers ->
|
|
||||||
poller.refresh(brokers)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Started wait loop...")
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
statsd?.recordGaugeValue('heartbeat', 1)
|
|
||||||
Thread.sleep(1 * 1000)
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("exiting..")
|
|
||||||
poller.die()
|
|
||||||
poller.join()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create and start a KafkaPoller with the given statsd and consumers map
|
|
||||||
*/
|
|
||||||
static KafkaPoller setupKafkaPoller(AbstractMap consumers,
|
|
||||||
NonBlockingDogStatsDClient statsd,
|
|
||||||
Boolean dryRun) {
|
|
||||||
KafkaPoller poller = new KafkaPoller(consumers)
|
|
||||||
Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
|
|
||||||
println "${tp.topic}:${tp.partition}-${name} = ${delta}"
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!dryRun) {
|
|
||||||
deltaCallback = { String name, TopicPartition tp, Long delta ->
|
|
||||||
statsd.recordGaugeValue(tp.topic, delta, [
|
|
||||||
'topic' : tp.topic,
|
|
||||||
'partition' : tp.partition,
|
|
||||||
'consumer-group' : name
|
|
||||||
])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
poller.onDelta << deltaCallback
|
|
||||||
poller.start()
|
|
||||||
return poller
|
|
||||||
}
|
|
||||||
|
|
||||||
static Options createCLI() {
|
|
||||||
Options options = new Options()
|
|
||||||
|
|
||||||
Option zookeeper = OptionBuilder.withArgName('HOSTS')
|
|
||||||
.hasArg()
|
|
||||||
.withDescription('Comma separated list of Zookeeper hosts (e.g. localhost:2181)')
|
|
||||||
.withLongOpt('zookeeper')
|
|
||||||
.withValueSeparator(',' as char)
|
|
||||||
.create('z')
|
|
||||||
|
|
||||||
Option statsdHost = OptionBuilder.withArgName('STATSD')
|
|
||||||
.hasArg()
|
|
||||||
.withType(String)
|
|
||||||
.withDescription('Hostname for a statsd instance (defaults to localhost)')
|
|
||||||
.withLongOpt('statsd-host')
|
|
||||||
.create('H')
|
|
||||||
|
|
||||||
Option statsdPort = OptionBuilder.withArgName('PORT')
|
|
||||||
.hasArg()
|
|
||||||
.withType(Integer)
|
|
||||||
.withDescription('Port for the statsd instance (defaults to 8125)')
|
|
||||||
.withLongOpt('statsd-port')
|
|
||||||
.create('p')
|
|
||||||
|
|
||||||
Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host')
|
|
||||||
.withLongOpt('dry-run')
|
|
||||||
.create('n')
|
|
||||||
|
|
||||||
Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
|
|
||||||
.withLongOpt('storm')
|
|
||||||
.create('s')
|
|
||||||
|
|
||||||
options.addOption(zookeeper)
|
|
||||||
options.addOption(statsdHost)
|
|
||||||
options.addOption(statsdPort)
|
|
||||||
options.addOption(dryRun)
|
|
||||||
options.addOption(stormSpouts)
|
|
||||||
|
|
||||||
return options
|
|
||||||
}
|
|
||||||
|
|
||||||
static CommandLine parseCommandLine(String[] args) {
|
|
||||||
Options options = createCLI()
|
|
||||||
PosixParser parser = new PosixParser()
|
|
||||||
|
|
||||||
try {
|
|
||||||
return parser.parse(options, args)
|
|
||||||
}
|
|
||||||
catch (MissingOptionException|UnrecognizedOptionException ex) {
|
|
||||||
HelpFormatter formatter = new HelpFormatter()
|
|
||||||
println ex.message
|
|
||||||
formatter.printHelp('verspaetung', options)
|
|
||||||
System.exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,95 +0,0 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.KafkaBroker
|
|
||||||
|
|
||||||
import groovy.json.JsonSlurper
|
|
||||||
import groovy.transform.TypeChecked
|
|
||||||
import java.util.Collections
|
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework
|
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCache
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheListener
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
|
||||||
import org.slf4j.Logger
|
|
||||||
import org.slf4j.LoggerFactory
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The BrokerTreeWatcher is a kind of watcher whose sole purpose is
|
|
||||||
* to watch the segment of the Zookeeper tree where Kafka stores broker
|
|
||||||
* information
|
|
||||||
*/
|
|
||||||
@TypeChecked
|
|
||||||
class BrokerTreeWatcher extends AbstractTreeWatcher {
|
|
||||||
static final Integer INVALID_BROKER_ID = -1
|
|
||||||
private static final String BROKERS_PATH = '/brokers/ids'
|
|
||||||
|
|
||||||
private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class)
|
|
||||||
private JsonSlurper json
|
|
||||||
private List<Closure> onBrokerUpdates
|
|
||||||
private Boolean isTreeInitialized = false
|
|
||||||
private List<KafkaBroker> brokers
|
|
||||||
|
|
||||||
BrokerTreeWatcher(CuratorFramework client) {
|
|
||||||
super(client)
|
|
||||||
|
|
||||||
this.json = new JsonSlurper()
|
|
||||||
this.brokers = []
|
|
||||||
this.onBrokerUpdates = []
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
String zookeeperPath() {
|
|
||||||
return BROKERS_PATH
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
|
|
||||||
* list of brokers
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
|
||||||
/* If we're initialized that means we should have all our brokers in
|
|
||||||
* our internal list already and we can fire an event
|
|
||||||
*/
|
|
||||||
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
|
|
||||||
this.isTreeInitialized = true
|
|
||||||
List threadsafeBrokers = Collections.synchronizedList(this.brokers)
|
|
||||||
this.onBrokerUpdates.each { Closure c ->
|
|
||||||
c?.call(threadsafeBrokers)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ChildData nodeData = event.data
|
|
||||||
Integer brokerId = brokerIdFromPath(nodeData.path)
|
|
||||||
|
|
||||||
if (brokerId == INVALID_BROKER_ID) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
|
|
||||||
|
|
||||||
this.brokers << new KafkaBroker(brokerData, brokerId)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process a path string from Zookeeper for the Kafka broker's ID
|
|
||||||
*
|
|
||||||
* We're expecting paths like: /brokers/ids/1231524312
|
|
||||||
*/
|
|
||||||
Integer brokerIdFromPath(String path) {
|
|
||||||
List<String> pathParts = path?.split('/') as List
|
|
||||||
|
|
||||||
if ((pathParts == null) ||
|
|
||||||
(pathParts.size() != 4)) {
|
|
||||||
return INVALID_BROKER_ID
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Integer(pathParts[-1])
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,40 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
/**
|
||||||
|
* abstract the logic on how to reduce the polling speed and get back to
|
||||||
|
* to full speed.
|
||||||
|
*/
|
||||||
|
class Delay {
|
||||||
|
static final Integer POLLER_DELAY_MIN = (1 * 1000)
|
||||||
|
static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour
|
||||||
|
|
||||||
|
private Integer delay = POLLER_DELAY_MIN
|
||||||
|
|
||||||
|
boolean reset() {
|
||||||
|
if (delay != POLLER_DELAY_MIN) {
|
||||||
|
delay = POLLER_DELAY_MIN
|
||||||
|
true
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean slower() {
|
||||||
|
if (delay < POLLER_DELAY_MAX) {
|
||||||
|
delay += delay
|
||||||
|
true
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer value() {
|
||||||
|
delay
|
||||||
|
}
|
||||||
|
|
||||||
|
String toString() {
|
||||||
|
"Delay[ ${delay / 1000} sec ]"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POJO containing the necessary information to model a Kafka broker
|
||||||
|
*/
|
||||||
|
class KafkaBroker {
|
||||||
|
final private String host
|
||||||
|
final private Integer port
|
||||||
|
final private Integer brokerId
|
||||||
|
|
||||||
|
KafkaBroker(String host, Integer port, Integer brokerId) {
|
||||||
|
this.host = host
|
||||||
|
this.port = port
|
||||||
|
this.brokerId = brokerId
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
int hashCode() {
|
||||||
|
return this.brokerId
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean equals(Object compared) {
|
||||||
|
if (this.is(compared)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(compared instanceof KafkaBroker)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return compared.brokerId == brokerId
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String toString() {
|
||||||
|
return "broker<${this.brokerId}>@${this.host}:${this.port}"
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
/**
|
||||||
|
* POJO containing the necessary information to model a Kafka consumers
|
||||||
|
*/
|
||||||
|
class KafkaConsumer {
|
||||||
|
String topic
|
||||||
|
Integer partition
|
||||||
|
String name
|
||||||
|
|
||||||
|
KafkaConsumer(String topic, Integer partition, String name) {
|
||||||
|
this.topic = topic
|
||||||
|
this.partition = partition
|
||||||
|
this.name = name
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String toString() {
|
||||||
|
return "KafkaConsumer<${topic}:${partition} - ${name}>"
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
int hashCode() {
|
||||||
|
return Objects.hash(this.topic, this.partition, this.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true for any two KafkaConsumer instances which have the same
|
||||||
|
* topic, partition and name properties
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
boolean equals(Object compared) {
|
||||||
|
/* bail early for object identity */
|
||||||
|
if (this.is(compared)) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!(compared instanceof KafkaConsumer)) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return (this.topic == compared.topic) &&
|
||||||
|
(this.partition == compared.partition) &&
|
||||||
|
(this.name == compared.name)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,242 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
import kafka.cluster.Broker
|
||||||
|
import kafka.client.ClientUtils
|
||||||
|
import kafka.consumer.SimpleConsumer
|
||||||
|
import kafka.common.TopicAndPartition
|
||||||
|
import kafka.common.KafkaException
|
||||||
|
/* UGH */
|
||||||
|
import scala.collection.JavaConversions
|
||||||
|
|
||||||
|
/**
|
||||||
|
* KafkaPoller is a relatively simple class which contains a runloop for periodically
|
||||||
|
* contacting the Kafka brokers defined in Zookeeper and fetching the latest topic
|
||||||
|
* meta-data for them
|
||||||
|
*/
|
||||||
|
class KafkaPoller extends Thread {
|
||||||
|
|
||||||
|
private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
|
||||||
|
private static final Integer KAFKA_TIMEOUT = (5 * 1000)
|
||||||
|
private static final Integer KAFKA_BUFFER = (100 * 1024)
|
||||||
|
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPoller)
|
||||||
|
|
||||||
|
private Boolean keepRunning = true
|
||||||
|
private Boolean shouldReconnect = false
|
||||||
|
private final AbstractMap<Integer, SimpleConsumer> brokerConsumerMap
|
||||||
|
private final AbstractMap<TopicPartition, Long> topicOffsetMap
|
||||||
|
private final List<Closure> onDelta
|
||||||
|
private final AbstractSet<String> currentTopics
|
||||||
|
private final List<Broker> brokers
|
||||||
|
|
||||||
|
KafkaPoller(AbstractMap map, AbstractSet topicSet) {
|
||||||
|
this.topicOffsetMap = map
|
||||||
|
this.currentTopics = topicSet
|
||||||
|
this.brokerConsumerMap = [:]
|
||||||
|
this.brokers = []
|
||||||
|
this.onDelta = []
|
||||||
|
setName('Verspaetung Kafka Poller')
|
||||||
|
}
|
||||||
|
|
||||||
|
/* There are a number of cases where we intentionally swallow stacktraces
|
||||||
|
* to ensure that we're not unnecessarily spamming logs with useless stacktraces
|
||||||
|
*
|
||||||
|
* For CatchException, we're explicitly gobbling up all potentially crashing exceptions
|
||||||
|
* here
|
||||||
|
*/
|
||||||
|
@SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
|
||||||
|
void run() {
|
||||||
|
LOGGER.info('Starting wait loop')
|
||||||
|
Delay delay = new Delay()
|
||||||
|
LOGGER.error('polling ' + delay)
|
||||||
|
while (keepRunning) {
|
||||||
|
LOGGER.debug('poll loop')
|
||||||
|
|
||||||
|
if (shouldReconnect) {
|
||||||
|
reconnect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Only makes sense to try to dump meta-data if we've got some
|
||||||
|
* topics that we should keep an eye on
|
||||||
|
*/
|
||||||
|
if (this.currentTopics.size() > 0) {
|
||||||
|
try {
|
||||||
|
dumpMetadata()
|
||||||
|
if (delay.reset()) {
|
||||||
|
LOGGER.error('back to normal ' + delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (KafkaException kex) {
|
||||||
|
LOGGER.error('Failed to interact with Kafka: {}', kex.message)
|
||||||
|
slower(delay)
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
|
||||||
|
slower(delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Thread.sleep(delay.value())
|
||||||
|
}
|
||||||
|
disconnectConsumers()
|
||||||
|
}
|
||||||
|
|
||||||
|
private void slower(Delay delay) {
|
||||||
|
if (delay.slower()) {
|
||||||
|
LOGGER.error('using ' + delay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings('CatchException')
|
||||||
|
private void dumpMetadata() {
|
||||||
|
LOGGER.debug('dumping meta-data')
|
||||||
|
|
||||||
|
Object metadata = fetchMetadataForCurrentTopics()
|
||||||
|
|
||||||
|
withTopicsAndPartitions(metadata) { tp, p ->
|
||||||
|
try {
|
||||||
|
captureLatestOffsetFor(tp, p)
|
||||||
|
}
|
||||||
|
catch (Exception ex) {
|
||||||
|
LOGGER.error('Failed to fetch latest for {}:{}', tp.topic, tp.partition, ex)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOGGER.debug('finished dumping meta-data')
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoke the given closure with the TopicPartition and Partition meta-data
|
||||||
|
* informationn for all of the topic meta-data that was passed in.
|
||||||
|
*
|
||||||
|
* The 'metadata' is the expected return from
|
||||||
|
* kafka.client.ClientUtils.fetchTopicMetadata
|
||||||
|
*/
|
||||||
|
private void withTopicsAndPartitions(Object metadata, Closure closure) {
|
||||||
|
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
|
||||||
|
withScalaCollection(f.partitionsMetadata).each { p ->
|
||||||
|
TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
|
||||||
|
closure.call(tp, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch the leader metadata and update our data structures
|
||||||
|
*/
|
||||||
|
private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
|
||||||
|
Integer leaderId = partitionMetadata.leader.get()?.id
|
||||||
|
Integer partitionId = partitionMetadata.partitionId
|
||||||
|
|
||||||
|
Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
|
||||||
|
|
||||||
|
this.topicOffsetMap[tp] = offset
|
||||||
|
}
|
||||||
|
|
||||||
|
private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
|
||||||
|
SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
|
||||||
|
|
||||||
|
/* If we don't have a proper SimpleConsumer instance (e.g. null) then
|
||||||
|
* we might not have gotten valid data back from Zookeeper
|
||||||
|
*/
|
||||||
|
if (!(consumer instanceof SimpleConsumer)) {
|
||||||
|
LOGGER.warn('Attempted to the leaderId: {} ({}/{})', leaderId, topic, partition)
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition)
|
||||||
|
/* XXX: A zero clientId into this method might not be right */
|
||||||
|
return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
private Iterable withScalaCollection(scala.collection.Iterable iter) {
|
||||||
|
return JavaConversions.asJavaIterable(iter)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Blocking reconnect to the Kafka brokers
|
||||||
|
*/
|
||||||
|
@SuppressWarnings('CatchException')
|
||||||
|
private void reconnect() {
|
||||||
|
disconnectConsumers()
|
||||||
|
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
|
||||||
|
synchronized(this.brokers) {
|
||||||
|
this.brokers.each { Broker broker ->
|
||||||
|
SimpleConsumer consumer = new SimpleConsumer(broker.host,
|
||||||
|
broker.port,
|
||||||
|
KAFKA_TIMEOUT,
|
||||||
|
KAFKA_BUFFER,
|
||||||
|
KAFKA_CLIENT_ID)
|
||||||
|
try {
|
||||||
|
consumer.connect()
|
||||||
|
this.brokerConsumerMap[broker.id] = consumer
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOGGER.info('Error connecting cunsumer to {}', broker, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
this.shouldReconnect = false
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Signal the runloop to safely die after it's next iteration
|
||||||
|
*/
|
||||||
|
void die() {
|
||||||
|
this.keepRunning = false
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings('CatchException')
|
||||||
|
private void disconnectConsumers() {
|
||||||
|
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
||||||
|
LOGGER.info('Disconnecting {}', client)
|
||||||
|
try {
|
||||||
|
client?.disconnect()
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOGGER.info('Error disconnecting {}', client, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store a new list of KafkaBroker objects and signal a reconnection
|
||||||
|
*/
|
||||||
|
void refresh(List<KafkaBroker> brokers) {
|
||||||
|
synchronized(this.brokers) {
|
||||||
|
this.brokers.clear()
|
||||||
|
this.brokers.addAll(brokers.collect { KafkaBroker b ->
|
||||||
|
new Broker(b.brokerId, b.host, b.port)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
this.shouldReconnect = true
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the brokers list as an immutable Seq collection for the Kafka
|
||||||
|
* scala underpinnings
|
||||||
|
*/
|
||||||
|
private scala.collection.immutable.Seq getBrokersSeq() {
|
||||||
|
synchronized(this.brokers) {
|
||||||
|
return JavaConversions.asScalaBuffer(this.brokers).toList()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return scala.collection.mutable.Set for the given List
|
||||||
|
*/
|
||||||
|
private scala.collection.mutable.Set toScalaSet(Set set) {
|
||||||
|
return JavaConversions.asScalaSet(set)
|
||||||
|
}
|
||||||
|
|
||||||
|
private Object fetchMetadataForCurrentTopics() {
|
||||||
|
return ClientUtils.fetchTopicMetadata(
|
||||||
|
toScalaSet(currentTopics),
|
||||||
|
brokersSeq,
|
||||||
|
KAFKA_CLIENT_ID,
|
||||||
|
KAFKA_TIMEOUT,
|
||||||
|
0)
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,287 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.zk.BrokerTreeWatcher
|
||||||
|
import com.github.reiseburo.verspaetung.zk.KafkaSpoutTreeWatcher
|
||||||
|
import com.github.reiseburo.verspaetung.zk.StandardTreeWatcher
|
||||||
|
import com.github.reiseburo.verspaetung.metrics.ConsumerGauge
|
||||||
|
import com.github.reiseburo.verspaetung.metrics.HeartbeatGauge
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.ConcurrentSkipListSet
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
import org.apache.commons.cli.*
|
||||||
|
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||||
|
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||||
|
import org.apache.curator.framework.CuratorFramework
|
||||||
|
import org.coursera.metrics.datadog.DatadogReporter
|
||||||
|
import org.coursera.metrics.datadog.transport.UdpTransport
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
import com.codahale.metrics.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Main entry point for running the verspaetung application
|
||||||
|
*/
|
||||||
|
@SuppressWarnings
|
||||||
|
class Main {
|
||||||
|
private static final String METRICS_PREFIX = 'verspaetung'
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(Main)
|
||||||
|
private static final MetricRegistry registry = new MetricRegistry()
|
||||||
|
private static ScheduledReporter reporter
|
||||||
|
|
||||||
|
static void main(String[] args) {
|
||||||
|
String statsdPrefix = METRICS_PREFIX
|
||||||
|
String zookeeperHosts = 'localhost:2181'
|
||||||
|
String statsdHost = 'localhost'
|
||||||
|
Integer statsdPort = 8125
|
||||||
|
Integer delayInSeconds = 5
|
||||||
|
String[] excludeGroups = []
|
||||||
|
|
||||||
|
CommandLine cli = parseCommandLine(args)
|
||||||
|
|
||||||
|
if (cli.hasOption('z')) {
|
||||||
|
zookeeperHosts = cli.getOptionValue('z')
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cli.hasOption('H')) {
|
||||||
|
statsdHost = cli.getOptionValue('H')
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cli.hasOption('p')) {
|
||||||
|
statsdPort = cli.getOptionValue('p')
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cli.hasOption('d')) {
|
||||||
|
delayInSeconds = cli.getOptionValue('d').toInteger()
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cli.hasOption('x')) {
|
||||||
|
excludeGroups = cli.getOptionValues('x')
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Running with: ${args}")
|
||||||
|
logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)
|
||||||
|
logger.info('Reporting every {} seconds', delayInSeconds)
|
||||||
|
|
||||||
|
if (cli.hasOption('prefix')) {
|
||||||
|
statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
|
||||||
|
}
|
||||||
|
|
||||||
|
registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
|
||||||
|
new HeartbeatGauge())
|
||||||
|
|
||||||
|
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
|
||||||
|
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
|
||||||
|
client.start()
|
||||||
|
|
||||||
|
/* We need a good shared set of all the topics we should keep an eye on
|
||||||
|
* for the Kafka poller. This will be written to by the tree watchers
|
||||||
|
* and read from by the poller, e.g.
|
||||||
|
* Watcher --/write/--> watchedTopics --/read/--> KafkaPoller
|
||||||
|
*/
|
||||||
|
ConcurrentSkipListSet<String> watchedTopics = new ConcurrentSkipListSet<>()
|
||||||
|
|
||||||
|
/* consumerOffsets is where we will keep all the offsets from Zookeeper
|
||||||
|
* from the Kafka consumers
|
||||||
|
*/
|
||||||
|
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets = new ConcurrentHashMap<>()
|
||||||
|
|
||||||
|
/* topicOffsets is where the KafkaPoller should be writing all of it's
|
||||||
|
* latest offsets from querying the Kafka brokers
|
||||||
|
*/
|
||||||
|
ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()
|
||||||
|
|
||||||
|
/* Hash map for keeping track of KafkaConsumer to ConsumerGauge
|
||||||
|
* instances. We're only really doing this because the MetricRegistry
|
||||||
|
* doesn't do a terrific job of exposing this for us
|
||||||
|
*/
|
||||||
|
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges = new ConcurrentHashMap<>()
|
||||||
|
|
||||||
|
|
||||||
|
KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics)
|
||||||
|
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
|
||||||
|
brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) }
|
||||||
|
|
||||||
|
poller.start()
|
||||||
|
|
||||||
|
/* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have
|
||||||
|
* one
|
||||||
|
*/
|
||||||
|
Closure gaugeRegistrar = { KafkaConsumer consumer ->
|
||||||
|
if (!shouldExcludeConsumer(excludeGroups, consumer)) {
|
||||||
|
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
|
||||||
|
watchedTopics,
|
||||||
|
consumerOffsets)
|
||||||
|
consumerWatcher.onConsumerData << gaugeRegistrar
|
||||||
|
consumerWatcher.start()
|
||||||
|
|
||||||
|
|
||||||
|
/* Assuming that most people aren't needing to run Storm-based watchers
|
||||||
|
* as well
|
||||||
|
*/
|
||||||
|
KafkaSpoutTreeWatcher stormWatcher = null
|
||||||
|
if (cli.hasOption('s')) {
|
||||||
|
stormWatcher = new KafkaSpoutTreeWatcher(client,
|
||||||
|
watchedTopics,
|
||||||
|
consumerOffsets)
|
||||||
|
stormWatcher.onConsumerData << gaugeRegistrar
|
||||||
|
stormWatcher.start()
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
if (cli.hasOption('n')) {
|
||||||
|
reporter = ConsoleReporter.forRegistry(registry)
|
||||||
|
.convertRatesTo(TimeUnit.SECONDS)
|
||||||
|
.convertDurationsTo(TimeUnit.MILLISECONDS)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
UdpTransport transport = new UdpTransport.Builder()
|
||||||
|
.withPrefix(statsdPrefix)
|
||||||
|
.build()
|
||||||
|
|
||||||
|
reporter = DatadogReporter.forRegistry(registry)
|
||||||
|
.withEC2Host()
|
||||||
|
.withTransport(transport)
|
||||||
|
.build()
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Start the reporter if we've got it */
|
||||||
|
reporter?.start(delayInSeconds, TimeUnit.SECONDS)
|
||||||
|
|
||||||
|
// shutdown threads
|
||||||
|
Runtime.getRuntime().addShutdownHook(new Thread() {
|
||||||
|
public void run() {
|
||||||
|
Main.logger.info("showdown threads")
|
||||||
|
poller.die()
|
||||||
|
consumerWatcher.close()
|
||||||
|
if (stormWatcher != null) {
|
||||||
|
stormWatcher.close()
|
||||||
|
}
|
||||||
|
poller.join()
|
||||||
|
}
|
||||||
|
});
|
||||||
|
logger.info('Starting wait loop...')
|
||||||
|
}
|
||||||
|
|
||||||
|
static void registerMetricFor(KafkaConsumer consumer,
|
||||||
|
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
|
||||||
|
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
|
||||||
|
ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
|
||||||
|
/*
|
||||||
|
* Bail early if we already ahve our Consumer registered
|
||||||
|
*/
|
||||||
|
if (consumerGauges.containsKey(consumer)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ConsumerGauge gauge = new ConsumerGauge(consumer,
|
||||||
|
consumerOffsets,
|
||||||
|
topicOffsets)
|
||||||
|
consumerGauges.put(consumer, gauge)
|
||||||
|
this.registry.register(gauge.nameForRegistry, gauge)
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create the Options option necessary for verspaetung to have CLI options
|
||||||
|
*/
|
||||||
|
static Options createCLI() {
|
||||||
|
Options options = new Options()
|
||||||
|
|
||||||
|
Option zookeeper = OptionBuilder.withArgName('HOSTS')
|
||||||
|
.hasArg()
|
||||||
|
.withDescription('Comma separated list of Zookeeper hosts (e.g. localhost:2181)')
|
||||||
|
.withLongOpt('zookeeper')
|
||||||
|
.withValueSeparator(',' as char)
|
||||||
|
.create('z')
|
||||||
|
|
||||||
|
Option excludeGroups = OptionBuilder.withArgName('EXCLUDES')
|
||||||
|
.hasArgs()
|
||||||
|
.withDescription('Regular expression for consumer groups to exclude from reporting (can be declared multiple times)')
|
||||||
|
.withLongOpt('exclude')
|
||||||
|
.create('x')
|
||||||
|
|
||||||
|
Option statsdHost = OptionBuilder.withArgName('STATSD')
|
||||||
|
.hasArg()
|
||||||
|
.withType(String)
|
||||||
|
.withDescription('Hostname for a statsd instance (defaults to localhost)')
|
||||||
|
.withLongOpt('statsd-host')
|
||||||
|
.create('H')
|
||||||
|
|
||||||
|
Option statsdPort = OptionBuilder.withArgName('PORT')
|
||||||
|
.hasArg()
|
||||||
|
.withType(Integer)
|
||||||
|
.withDescription('Port for the statsd instance (defaults to 8125)')
|
||||||
|
.withLongOpt('statsd-port')
|
||||||
|
.create('p')
|
||||||
|
|
||||||
|
Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host')
|
||||||
|
.withLongOpt('dry-run')
|
||||||
|
.create('n')
|
||||||
|
|
||||||
|
Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
|
||||||
|
.withLongOpt('storm')
|
||||||
|
.create('s')
|
||||||
|
|
||||||
|
Option statsdPrefix = OptionBuilder.withArgName('PREFIX')
|
||||||
|
.hasArg()
|
||||||
|
.withType(String)
|
||||||
|
.withDescription("Prefix all metrics with PREFIX before they're reported (e.g. PREFIX.verspaetung.mytopic)")
|
||||||
|
.withLongOpt('prefix')
|
||||||
|
.create()
|
||||||
|
|
||||||
|
Option delaySeconds = OptionBuilder.withArgName('DELAY')
|
||||||
|
.hasArg()
|
||||||
|
.withType(Integer)
|
||||||
|
.withDescription("Seconds to delay between reporting metrics to the metrics receiver (defaults: 5s)")
|
||||||
|
.withLongOpt('delay')
|
||||||
|
.create('d')
|
||||||
|
|
||||||
|
options.addOption(zookeeper)
|
||||||
|
options.addOption(statsdHost)
|
||||||
|
options.addOption(statsdPort)
|
||||||
|
options.addOption(statsdPrefix)
|
||||||
|
options.addOption(dryRun)
|
||||||
|
options.addOption(stormSpouts)
|
||||||
|
options.addOption(delaySeconds)
|
||||||
|
options.addOption(excludeGroups)
|
||||||
|
|
||||||
|
return options
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Parse out all the command line options from the array of string
|
||||||
|
* arguments
|
||||||
|
*/
|
||||||
|
static CommandLine parseCommandLine(String[] args) {
|
||||||
|
Options options = createCLI()
|
||||||
|
PosixParser parser = new PosixParser()
|
||||||
|
|
||||||
|
try {
|
||||||
|
return parser.parse(options, args)
|
||||||
|
}
|
||||||
|
catch (MissingOptionException|UnrecognizedOptionException ex) {
|
||||||
|
HelpFormatter formatter = new HelpFormatter()
|
||||||
|
println ex.message
|
||||||
|
formatter.printHelp('verspaetung', options)
|
||||||
|
System.exit(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if we should exclude the given KafkaConsumer from reporting
|
||||||
|
*/
|
||||||
|
|
||||||
|
static boolean shouldExcludeConsumer(String[] excludeGroups, KafkaConsumer consumer) {
|
||||||
|
return null != excludeGroups?.find { String excludeRule -> consumer?.name.matches(excludeRule) }
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,11 +1,11 @@
|
||||||
package com.github.lookout.verspaetung
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple container for Kafka topic names and partition IDs
|
* Simple container for Kafka topic names and partition IDs
|
||||||
*/
|
*/
|
||||||
class TopicPartition {
|
class TopicPartition {
|
||||||
private String topic
|
private final String topic
|
||||||
private Integer partition
|
private final Integer partition
|
||||||
|
|
||||||
TopicPartition(String topic, Integer partition) {
|
TopicPartition(String topic, Integer partition) {
|
||||||
this.topic = topic
|
this.topic = topic
|
||||||
|
@ -27,12 +27,8 @@ class TopicPartition {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((this.topic == compared.topic) &&
|
return (this.topic == compared.topic) &&
|
||||||
(this.partition == compared.partition)) {
|
(this.partition == compared.partition)
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -0,0 +1,73 @@
|
||||||
|
package com.github.reiseburo.verspaetung.metrics
|
||||||
|
|
||||||
|
import com.codahale.metrics.Gauge
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
import org.coursera.metrics.datadog.Tagged
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||||
|
import com.github.reiseburo.verspaetung.TopicPartition
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class ConsumerGauge implements Gauge<Integer>, Tagged {
|
||||||
|
protected KafkaConsumer consumer
|
||||||
|
protected AbstractMap<KafkaConsumer, Integer> consumers
|
||||||
|
protected AbstractMap<TopicPartition, Long> topics
|
||||||
|
private final TopicPartition topicPartition
|
||||||
|
|
||||||
|
ConsumerGauge(KafkaConsumer consumer,
|
||||||
|
AbstractMap<KafkaConsumer, Integer> consumers,
|
||||||
|
AbstractMap<TopicPartition, Long> topics) {
|
||||||
|
this.consumer = consumer
|
||||||
|
this.consumers = consumers
|
||||||
|
this.topics = topics
|
||||||
|
|
||||||
|
this.topicPartition = new TopicPartition(consumer.topic, consumer.partition)
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Integer getValue() {
|
||||||
|
if ((!this.consumers.containsKey(consumer)) ||
|
||||||
|
(!this.topics.containsKey(topicPartition))) {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returning the maximum value of the computation and zero, there are
|
||||||
|
* some cases where we can be "behind" on the Kafka latest offset
|
||||||
|
* polling and this could result in an erroneous negative value. See:
|
||||||
|
* <https://github.com/reiseburo/verspaetung/issues/25> for more details
|
||||||
|
*/
|
||||||
|
return Math.max(0,
|
||||||
|
((Integer)this.topics[topicPartition]) - this.consumers[consumer])
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
List<String> getTags() {
|
||||||
|
return ["partition:${this.consumer.partition}",
|
||||||
|
"topic:${this.consumer.topic}",
|
||||||
|
"consumer-group:${this.consumer.name}"
|
||||||
|
]*.toString()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* return a unique name for this gauge
|
||||||
|
*/
|
||||||
|
String getNameForRegistry() {
|
||||||
|
return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getName() {
|
||||||
|
return this.consumer.topic
|
||||||
|
|
||||||
|
/* need to return this if we're just using the console or statsd
|
||||||
|
* reporters
|
||||||
|
|
||||||
|
return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
|
||||||
|
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,14 @@
|
||||||
|
package com.github.reiseburo.verspaetung.metrics
|
||||||
|
|
||||||
|
import com.codahale.metrics.Gauge
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A simple gauge that will always just return 1 indicating that the process is
|
||||||
|
* alive
|
||||||
|
*/
|
||||||
|
class HeartbeatGauge implements Gauge<Integer> {
|
||||||
|
@Override
|
||||||
|
Integer getValue() {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,22 +1,27 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.TopicPartition
|
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||||
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList
|
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
@TypeChecked
|
@TypeChecked
|
||||||
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
||||||
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
|
protected AbstractMap<KafkaConsumer, Integer> consumerOffsets
|
||||||
|
protected AbstractSet<String> watchedTopics
|
||||||
|
protected List<Closure> onConsumerData = []
|
||||||
|
|
||||||
AbstractConsumerTreeWatcher(CuratorFramework client,
|
protected AbstractConsumerTreeWatcher(CuratorFramework client,
|
||||||
AbstractMap consumersMap) {
|
AbstractSet topics,
|
||||||
|
AbstractMap offsets) {
|
||||||
super(client)
|
super(client)
|
||||||
this.consumersMap = consumersMap
|
this.watchedTopics = topics
|
||||||
|
this.consumerOffsets = offsets
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -61,17 +66,18 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
||||||
* this class on instantiation
|
* this class on instantiation
|
||||||
*/
|
*/
|
||||||
void trackConsumerOffset(ConsumerOffset offset) {
|
void trackConsumerOffset(ConsumerOffset offset) {
|
||||||
if (this.consumersMap == null) {
|
if (this.consumerOffsets == null) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
TopicPartition key = new TopicPartition(offset.topic, offset.partition)
|
this.watchedTopics << offset.topic
|
||||||
|
KafkaConsumer consumer = new KafkaConsumer(offset.topic,
|
||||||
|
offset.partition,
|
||||||
|
offset.groupName)
|
||||||
|
this.consumerOffsets[consumer] = offset.offset
|
||||||
|
|
||||||
if (this.consumersMap.containsKey(key)) {
|
this.onConsumerData.each { Closure c ->
|
||||||
this.consumersMap[key] << offset
|
c.call(consumer)
|
||||||
}
|
|
||||||
else {
|
|
||||||
this.consumersMap[key] = new CopyOnWriteArrayList([offset])
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -80,12 +86,8 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
||||||
* we're interested in
|
* we're interested in
|
||||||
*/
|
*/
|
||||||
Boolean isNodeEvent(TreeCacheEvent event) {
|
Boolean isNodeEvent(TreeCacheEvent event) {
|
||||||
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
|
return (event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
|
||||||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
|
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
@ -16,19 +16,21 @@ import org.slf4j.LoggerFactory
|
||||||
* further down the pipeline
|
* further down the pipeline
|
||||||
*/
|
*/
|
||||||
@TypeChecked
|
@TypeChecked
|
||||||
|
@SuppressWarnings(['ThisReferenceEscapesConstructor'])
|
||||||
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||||
protected List<Closure> onInitComplete
|
protected List<Closure> onInitComplete
|
||||||
protected Logger logger
|
protected Logger logger
|
||||||
protected CuratorFramework client
|
protected CuratorFramework client
|
||||||
protected TreeCache cache
|
protected TreeCache cache
|
||||||
|
|
||||||
AbstractTreeWatcher(CuratorFramework client) {
|
protected AbstractTreeWatcher(CuratorFramework curatorClient) {
|
||||||
this.logger = LoggerFactory.getLogger(this.class)
|
logger = LoggerFactory.getLogger(this.class)
|
||||||
this.client = client
|
client = curatorClient
|
||||||
this.onInitComplete = []
|
onInitComplete = []
|
||||||
|
|
||||||
this.cache = new TreeCache(client, zookeeperPath())
|
cache = new TreeCache(client, zookeeperPath())
|
||||||
this.cache.listenable.addListener(this)
|
/* this may introduce a race condition, need to figure out a better way to handle it */
|
||||||
|
cache.listenable.addListener(this)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -45,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||||
return this
|
return this
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Close our internal cache and return ourselves for API cleanliness
|
||||||
|
*/
|
||||||
|
AbstractTreeWatcher close() {
|
||||||
|
this.cache?.close()
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
|
abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
|
||||||
}
|
}
|
|
@ -0,0 +1,73 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import org.slf4j.Logger
|
||||||
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
/**
|
||||||
|
* manage a list of brokers. can be online or offline. offline means the
|
||||||
|
* internal list is hidden, i.e. the list() gives you an empty list.
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class BrokerManager {
|
||||||
|
|
||||||
|
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
|
||||||
|
|
||||||
|
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
|
||||||
|
|
||||||
|
// we start with being offline
|
||||||
|
private boolean offline = true
|
||||||
|
|
||||||
|
void add(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker != null && this.brokers.indexOf(broker) == -1) {
|
||||||
|
this.brokers.add(broker)
|
||||||
|
logger.info('broker added: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void update(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (this.brokers.indexOf(broker) != -1) {
|
||||||
|
this.brokers.remove(broker)
|
||||||
|
}
|
||||||
|
this.brokers.add(broker)
|
||||||
|
logger.info('broker updated: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void remove(KafkaBroker broker) {
|
||||||
|
synchronized(brokers) {
|
||||||
|
if (broker != null && this.brokers.remove(broker)) {
|
||||||
|
logger.info('broker removed: {}', broker)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||||
|
@SuppressWarnings('ConfusingMethodName')
|
||||||
|
void offline() {
|
||||||
|
this.offline = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||||
|
void online() {
|
||||||
|
this.offline = false
|
||||||
|
}
|
||||||
|
|
||||||
|
Collection<KafkaBroker> list() {
|
||||||
|
if (this.offline) {
|
||||||
|
[]
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
this.brokers
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import org.apache.curator.framework.CuratorFramework
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The BrokerTreeWatcher is a kind of watcher whose sole purpose is
|
||||||
|
* to watch the segment of the Zookeeper tree where Kafka stores broker
|
||||||
|
* information
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class BrokerTreeWatcher extends AbstractTreeWatcher {
|
||||||
|
private static final String BROKERS_PATH = '/brokers/ids'
|
||||||
|
|
||||||
|
private final List<Closure> onBrokerUpdates
|
||||||
|
private final BrokerManager manager
|
||||||
|
private final PojoFactory factory
|
||||||
|
|
||||||
|
BrokerTreeWatcher(CuratorFramework client) {
|
||||||
|
super(client)
|
||||||
|
|
||||||
|
this.factory = new PojoFactory(new JsonSlurper())
|
||||||
|
this.manager = new BrokerManager()
|
||||||
|
this.onBrokerUpdates = []
|
||||||
|
}
|
||||||
|
|
||||||
|
String zookeeperPath() {
|
||||||
|
return BROKERS_PATH
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process events to keep an up to date list of brokers
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
||||||
|
switch (event.type) {
|
||||||
|
case TreeCacheEvent.Type.INITIALIZED:
|
||||||
|
this.manager.online()
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.NODE_ADDED:
|
||||||
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
|
this.manager.add(broker)
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.NODE_UPDATED:
|
||||||
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
|
this.manager.update(broker)
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.NODE_REMOVED:
|
||||||
|
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||||
|
this.manager.remove(broker)
|
||||||
|
break
|
||||||
|
// TODO these 3 events might come with path which can be mapped
|
||||||
|
// to a specific broker
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_LOST:
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
|
||||||
|
this.manager.offline()
|
||||||
|
break
|
||||||
|
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
|
||||||
|
this.manager.online()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
this.onBrokerUpdates.each { Closure c ->
|
||||||
|
c?.call(this.manager.list())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,16 +1,13 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* POJO representing data from Zookeeper for a consumer, topic and offset
|
* POJO representing data from Zookeeper for a consumer, topic and offset
|
||||||
*/
|
*/
|
||||||
class ConsumerOffset {
|
class ConsumerOffset {
|
||||||
private String topic
|
String topic
|
||||||
private String groupName
|
String groupName
|
||||||
private Integer offset
|
Integer offset
|
||||||
private Integer partition
|
Integer partition
|
||||||
private ChildData rawData
|
|
||||||
|
|
||||||
ConsumerOffset() {
|
ConsumerOffset() {
|
||||||
}
|
}
|
||||||
|
@ -22,7 +19,7 @@ class ConsumerOffset {
|
||||||
}
|
}
|
||||||
|
|
||||||
String toString() {
|
String toString() {
|
||||||
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}"
|
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}".toString()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,42 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* parsing TreeCacheEvents and provide helper method to access its data in typed manner
|
||||||
|
*/
|
||||||
|
@TypeChecked
|
||||||
|
class EventData {
|
||||||
|
|
||||||
|
private final String data
|
||||||
|
private final List<String> pathParts
|
||||||
|
|
||||||
|
EventData(TreeCacheEvent event) {
|
||||||
|
ChildData data = event.data
|
||||||
|
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
|
||||||
|
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer asInteger() {
|
||||||
|
new Integer(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
String asString() {
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer pathPartsSize() {
|
||||||
|
pathParts.size()
|
||||||
|
}
|
||||||
|
|
||||||
|
Integer getPathPartAsInteger(int pos) {
|
||||||
|
if (pathParts.size() <= pos) {
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
new Integer(pathParts[pos])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import groovy.json.JsonSlurper
|
import groovy.json.JsonSlurper
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
@ -15,20 +15,25 @@ import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
@InheritConstructors
|
@InheritConstructors
|
||||||
class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
|
class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
|
||||||
private static final String ZK_PATH = '/kafka_spout'
|
private static final String ZK_PATH = '/kafka_spout'
|
||||||
private JsonSlurper json
|
private final JsonSlurper json
|
||||||
|
|
||||||
KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
|
KafkaSpoutTreeWatcher(CuratorFramework client,
|
||||||
super(client, consumersMap)
|
AbstractSet topics,
|
||||||
|
AbstractMap offsets) {
|
||||||
|
super(client, topics, offsets)
|
||||||
|
|
||||||
this.json = new JsonSlurper()
|
this.json = new JsonSlurper()
|
||||||
}
|
}
|
||||||
|
|
||||||
String zookeeperPath() { return ZK_PATH }
|
String zookeeperPath() {
|
||||||
|
return ZK_PATH
|
||||||
|
}
|
||||||
|
|
||||||
/* skipping type checking since Groovy's JsonSlurper gives us a pretty
|
/* skipping type checking since Groovy's JsonSlurper gives us a pretty
|
||||||
* loose Object to deal with
|
* loose Object to deal with
|
||||||
*/
|
*/
|
||||||
@TypeChecked(TypeCheckingMode.SKIP)
|
@TypeChecked(TypeCheckingMode.SKIP)
|
||||||
|
@SuppressWarnings(['LineLength'])
|
||||||
ConsumerOffset processChildData(ChildData nodeData) {
|
ConsumerOffset processChildData(ChildData nodeData) {
|
||||||
Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8'))
|
Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8'))
|
||||||
/*
|
/*
|
|
@ -0,0 +1,42 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
/**
|
||||||
|
* factory creating Pojo's from TreeCacheEvents
|
||||||
|
*/
|
||||||
|
class PojoFactory {
|
||||||
|
|
||||||
|
private final JsonSlurper json
|
||||||
|
|
||||||
|
PojoFactory(JsonSlurper json) {
|
||||||
|
this.json = json
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* converts an treeCacheEvent into a KafkaBroker. the caller
|
||||||
|
* is responsible for calling the factory method with the
|
||||||
|
* right data. i.e. path starts with /brokers/ids. the implementation
|
||||||
|
* just uses whatever it is given to create a KafkaBroker object
|
||||||
|
*/
|
||||||
|
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
|
||||||
|
EventData data = new EventData(event)
|
||||||
|
// We're expecting paths like: /brokers/ids/1231524312
|
||||||
|
Integer id = data.getPathPartAsInteger(3)
|
||||||
|
if (id == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
String json = data.asString()
|
||||||
|
if (json == null) {
|
||||||
|
new KafkaBroker('', 0, id)
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Object payload = this.json.parseText(json)
|
||||||
|
new KafkaBroker(payload.host, payload.port, id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
import groovy.transform.InheritConstructors
|
import groovy.transform.InheritConstructors
|
||||||
|
@ -13,12 +13,15 @@ import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
|
class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
|
||||||
private static final String ZK_PATH = '/consumers'
|
private static final String ZK_PATH = '/consumers'
|
||||||
|
|
||||||
String zookeeperPath() { return ZK_PATH }
|
String zookeeperPath() {
|
||||||
|
return ZK_PATH
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Extract the necessary information from a standard (i.e. high-level Kafka
|
* Extract the necessary information from a standard (i.e. high-level Kafka
|
||||||
* consumer) tree of offsets
|
* consumer) tree of offsets
|
||||||
*/
|
*/
|
||||||
|
@SuppressWarnings(['LineLength'])
|
||||||
ConsumerOffset processChildData(ChildData data) {
|
ConsumerOffset processChildData(ChildData data) {
|
||||||
/*
|
/*
|
||||||
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
|
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
|
||||||
|
@ -40,15 +43,13 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473
|
||||||
}
|
}
|
||||||
catch (NumberFormatException ex) {
|
catch (NumberFormatException ex) {
|
||||||
logger.error("Failed to parse an Integer: ${data}")
|
logger.error("Failed to parse an Integer: ${data}")
|
||||||
return null
|
offset = null
|
||||||
}
|
}
|
||||||
|
|
||||||
return offset
|
return offset
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
Boolean isOffsetSubtree(String path) {
|
Boolean isOffsetSubtree(String path) {
|
||||||
return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/)
|
return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -1,29 +0,0 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
|
||||||
|
|
||||||
import spock.lang.*
|
|
||||||
|
|
||||||
class BrokerTreeWatcherSpec extends Specification {
|
|
||||||
BrokerTreeWatcher watcher
|
|
||||||
|
|
||||||
def setup() {
|
|
||||||
this.watcher = new BrokerTreeWatcher()
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return the right ID with a valid path"() {
|
|
||||||
given:
|
|
||||||
String path = "/brokers/ids/1337"
|
|
||||||
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath(path) == 1337
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return -1 on null paths"() {
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath(null) == -1
|
|
||||||
}
|
|
||||||
|
|
||||||
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
|
|
||||||
expect:
|
|
||||||
watcher.brokerIdFromPath('/spock/ed') == -1
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
class DelaySpec extends Specification {
|
||||||
|
Delay delay = new Delay()
|
||||||
|
|
||||||
|
def "it should give default on first use"() {
|
||||||
|
given:
|
||||||
|
|
||||||
|
expect:
|
||||||
|
delay.value() == Delay.POLLER_DELAY_MIN
|
||||||
|
}
|
||||||
|
|
||||||
|
def "slower has an upper bound"() {
|
||||||
|
given:
|
||||||
|
for(int i = 1; i < 20; i++) { delay.slower() }
|
||||||
|
def firstLast = delay.value()
|
||||||
|
def result = delay.slower()
|
||||||
|
def secondLast = delay.value()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
firstLast == secondLast
|
||||||
|
firstLast == Delay.POLLER_DELAY_MAX
|
||||||
|
result == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "increasing delay gives true"() {
|
||||||
|
def result = true
|
||||||
|
for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
|
||||||
|
result = result && delay.slower()
|
||||||
|
}
|
||||||
|
def last = delay.slower()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
result == true
|
||||||
|
last == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "reset on min value gives false"() {
|
||||||
|
given:
|
||||||
|
def result = delay.reset()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
result == false
|
||||||
|
}
|
||||||
|
def "reset on none min value gives true"() {
|
||||||
|
given:
|
||||||
|
delay.slower()
|
||||||
|
def result = delay.reset()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
result == true
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,38 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
class KafkaConsumerSpec extends Specification {
|
||||||
|
String topic = 'spock-topic'
|
||||||
|
Integer partition = 2
|
||||||
|
String consumerName = 'spock-consumer'
|
||||||
|
|
||||||
|
def "the constructor should set the properties properly"() {
|
||||||
|
given:
|
||||||
|
KafkaConsumer consumer = new KafkaConsumer(topic, partition, consumerName)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
consumer instanceof KafkaConsumer
|
||||||
|
consumer.topic == topic
|
||||||
|
consumer.partition == partition
|
||||||
|
consumer.name == consumerName
|
||||||
|
}
|
||||||
|
|
||||||
|
def "equals() is true with identical source material"() {
|
||||||
|
given:
|
||||||
|
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
|
||||||
|
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, consumerName)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
consumer1 == consumer2
|
||||||
|
}
|
||||||
|
|
||||||
|
def "equals() is false with differing source material"() {
|
||||||
|
given:
|
||||||
|
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
|
||||||
|
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, "i am different")
|
||||||
|
|
||||||
|
expect:
|
||||||
|
consumer1 != consumer2
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,6 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
class KafkaPollerSpec extends Specification {
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
class MainSpec extends Specification {
|
||||||
|
def "shouldExcludeGroups() shuold return false by default"() {
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(new String[0], null) == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with a matching exclude should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['foo'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'foo')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with a matching regex should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spockfoo')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with multiple regexes that don't match should return false"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spock')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer) == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with multiple regexes which match should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'barstool')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung
|
package com.github.reiseburo.verspaetung
|
||||||
|
|
||||||
import spock.lang.*
|
import spock.lang.*
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
package com.github.reiseburo.verspaetung.metrics
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||||
|
import com.github.reiseburo.verspaetung.TopicPartition
|
||||||
|
|
||||||
|
class ConsumerGaugeSpec extends Specification {
|
||||||
|
private KafkaConsumer consumer
|
||||||
|
private TopicPartition tp
|
||||||
|
|
||||||
|
def setup() {
|
||||||
|
this.tp = new TopicPartition('spock-topic', 1)
|
||||||
|
this.consumer = new KafkaConsumer(tp.topic, tp.partition, 'spock-consumer')
|
||||||
|
}
|
||||||
|
|
||||||
|
def "constructor should work"() {
|
||||||
|
given:
|
||||||
|
ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
|
||||||
|
|
||||||
|
expect:
|
||||||
|
gauge.consumer instanceof KafkaConsumer
|
||||||
|
gauge.consumers instanceof AbstractMap
|
||||||
|
}
|
||||||
|
|
||||||
|
def "getValue() should source a value from the map"() {
|
||||||
|
given:
|
||||||
|
ConsumerGauge gauge = new ConsumerGauge(this.consumer,
|
||||||
|
[(this.consumer) : 2],
|
||||||
|
[(this.tp) : 3])
|
||||||
|
|
||||||
|
expect:
|
||||||
|
gauge.value == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
def "getValue() should return zero for a consumer not in the map"() {
|
||||||
|
given:
|
||||||
|
ConsumerGauge gauge = new ConsumerGauge(this.consumer, [:], [:])
|
||||||
|
|
||||||
|
expect:
|
||||||
|
gauge.value == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "getValue() should return zero instead of a negative number"() {
|
||||||
|
given:
|
||||||
|
ConsumerGauge gauge = new ConsumerGauge(this.consumer,
|
||||||
|
[(this.consumer) : 10],
|
||||||
|
[(this.tp) : 5])
|
||||||
|
|
||||||
|
expect:
|
||||||
|
gauge.value == 0
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,8 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import spock.lang.*
|
import spock.lang.*
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.TopicPartition
|
import com.github.reiseburo.verspaetung.TopicPartition
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
@ -13,7 +13,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
||||||
|
|
||||||
class MockWatcher extends AbstractConsumerTreeWatcher {
|
class MockWatcher extends AbstractConsumerTreeWatcher {
|
||||||
MockWatcher() {
|
MockWatcher() {
|
||||||
super(null, [:])
|
super(null, new HashSet(), [:])
|
||||||
}
|
}
|
||||||
ConsumerOffset processChildData(ChildData d) { }
|
ConsumerOffset processChildData(ChildData d) { }
|
||||||
String zookeeperPath() { return '/zk/spock' }
|
String zookeeperPath() { return '/zk/spock' }
|
||||||
|
@ -63,10 +63,11 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
||||||
watcher.trackConsumerOffset(offset)
|
watcher.trackConsumerOffset(offset)
|
||||||
|
|
||||||
then:
|
then:
|
||||||
watcher.consumersMap.size() == 1
|
watcher.consumerOffsets.size() == 1
|
||||||
|
watcher.watchedTopics.size() == 1
|
||||||
}
|
}
|
||||||
|
|
||||||
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
|
def "trackConsumerOffset() append an offset but not a topic for different group names"() {
|
||||||
given:
|
given:
|
||||||
String topic = 'spock-topic'
|
String topic = 'spock-topic'
|
||||||
TopicPartition mapKey = new TopicPartition(topic, 0)
|
TopicPartition mapKey = new TopicPartition(topic, 0)
|
||||||
|
@ -80,8 +81,15 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
||||||
watcher.trackConsumerOffset(secondOffset)
|
watcher.trackConsumerOffset(secondOffset)
|
||||||
|
|
||||||
then:
|
then:
|
||||||
watcher.consumersMap.size() == 1
|
watcher.watchedTopics.size() == 1
|
||||||
watcher.consumersMap[mapKey].size() == 2
|
watcher.consumerOffsets.size() == 2
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
def "removeConsumer() should remove a ConsumerOffset from the map"() {
|
||||||
|
given:
|
||||||
|
TopicPartition tp = new TopicPartition('spock', 1)
|
||||||
|
ConsumerOffset consumer = new ConsumerOffset()
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,8 +1,8 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import spock.lang.*
|
import spock.lang.*
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.TopicPartition
|
import com.github.reiseburo.verspaetung.TopicPartition
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
|
@ -0,0 +1,95 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class BrokerManagerSpec extends Specification {
|
||||||
|
BrokerManager manager = new BrokerManager()
|
||||||
|
|
||||||
|
def "new instance is offline, i.e. has empty list of brokers"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "new instance is offline after add broker"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
manager.add(broker)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "list shows brokers after getting online"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
|
||||||
|
manager.add(broker)
|
||||||
|
manager.online()
|
||||||
|
manager.remove(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 1
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can remove brokers based on its id"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||||
|
manager.add(broker)
|
||||||
|
manager.online()
|
||||||
|
manager.remove(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can update brokers"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.update(broker2)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 1
|
||||||
|
manager.list().first().host == 'localhost'
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can go offline anytime"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.add(broker2)
|
||||||
|
manager.offline()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
def "can go offline and online again"() {
|
||||||
|
given:
|
||||||
|
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||||
|
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||||
|
manager.online()
|
||||||
|
manager.add(broker)
|
||||||
|
manager.add(broker2)
|
||||||
|
manager.offline()
|
||||||
|
manager.online()
|
||||||
|
|
||||||
|
expect:
|
||||||
|
manager.list().size() == 2
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,47 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class EventDataSpec extends Specification {
|
||||||
|
EventData data
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, "be happy".bytes)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.getPathPartAsInteger(3) == 1337
|
||||||
|
data.asString() == 'be happy'
|
||||||
|
data.pathPartsSize() == 4
|
||||||
|
data.getPathPartAsInteger(4) == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData("/", null, "42".bytes)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.asString() == '42'
|
||||||
|
data.asInteger() == 42
|
||||||
|
data.pathPartsSize() == 0
|
||||||
|
data.getPathPartAsInteger(0) == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "converts TreeCacheEvents to EventData with no payload"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData("/123", null, null)))
|
||||||
|
|
||||||
|
expect:
|
||||||
|
data.asString() == null
|
||||||
|
data.pathPartsSize() == 2
|
||||||
|
data.getPathPartAsInteger(1) == 123
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import spock.lang.*
|
import spock.lang.*
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
|
||||||
|
|
||||||
def setup() {
|
def setup() {
|
||||||
this.mockCurator = Mock(CuratorFramework)
|
this.mockCurator = Mock(CuratorFramework)
|
||||||
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
|
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, new HashSet(), [:])
|
||||||
}
|
}
|
||||||
|
|
||||||
def "consumerNameFromPath() should give the right name for a valid path"() {
|
def "consumerNameFromPath() should give the right name for a valid path"() {
|
|
@ -0,0 +1,52 @@
|
||||||
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
import groovy.json.JsonSlurper
|
||||||
|
|
||||||
|
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||||
|
|
||||||
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
|
|
||||||
|
class PojoFactorySpec extends Specification {
|
||||||
|
PojoFactory factory = new PojoFactory(new JsonSlurper())
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/1337"
|
||||||
|
String payload = "{\"host\":\"localhost\",\"port\":9092}"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, payload.bytes))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker.brokerId == 1337
|
||||||
|
broker.host == 'localhost'
|
||||||
|
broker.port == 9092
|
||||||
|
}
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, ''.bytes))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker == null
|
||||||
|
}
|
||||||
|
|
||||||
|
def "create KafkaBroker from TreeCacheEvent without payload"() {
|
||||||
|
given:
|
||||||
|
String path = "/brokers/ids/123"
|
||||||
|
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||||
|
new ChildData(path, null, null))
|
||||||
|
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||||
|
|
||||||
|
expect:
|
||||||
|
broker.brokerId == 123
|
||||||
|
broker.host == ''
|
||||||
|
broker.port == 0
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.reiseburo.verspaetung.zk
|
||||||
|
|
||||||
import spock.lang.*
|
import spock.lang.*
|
||||||
|
|
||||||
|
@ -11,7 +11,7 @@ class StandardTreeWatcherSpec extends Specification {
|
||||||
|
|
||||||
def setup() {
|
def setup() {
|
||||||
this.mockCurator = Mock(CuratorFramework)
|
this.mockCurator = Mock(CuratorFramework)
|
||||||
this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
|
this.watcher = new StandardTreeWatcher(this.mockCurator, new HashSet(), [:])
|
||||||
}
|
}
|
||||||
|
|
||||||
def "processChildData should return null if the path is invalid"() {
|
def "processChildData should return null if the path is invalid"() {
|
Loading…
Reference in New Issue