Compare commits
58 Commits
Author | SHA1 | Date |
---|---|---|
R. Tyler Croy | 148359d816 | |
R. Tyler Croy | 0bac696847 | |
R. Tyler Croy | dc1cfab361 | |
R. Tyler Croy | 324b3faba7 | |
R. Tyler Croy | f901cba7e1 | |
R. Tyler Croy | 48a258c3bc | |
Christian Meier | 7a6ef71eef | |
Christian Meier | 618aaf915e | |
Christian Meier | 66c0c332bb | |
Christian Meier | 4c655995f9 | |
Christian Meier | dc918a700e | |
R. Tyler Croy | 416168cbff | |
R. Tyler Croy | 68ce9cd72e | |
R. Tyler Croy | 9af40ccdec | |
R. Tyler Croy | 1525ae30ac | |
R. Tyler Croy | fb4f152fed | |
R. Tyler Croy | f15aeaee0c | |
Christian Meier | 98af950498 | |
Christian Meier | 439e8f635e | |
Christian Meier | 45d98cb5f6 | |
Christian Meier | 59c8e79f4e | |
R. Tyler Croy | 07bf319972 | |
R. Tyler Croy | 3778a24044 | |
R. Tyler Croy | 921ef7be64 | |
R. Tyler Croy | 7b918d448a | |
R. Tyler Croy | 4f416e84dd | |
R. Tyler Croy | e18ac81468 | |
R. Tyler Croy | fc97f67100 | |
R. Tyler Croy | 0168f5c805 | |
R. Tyler Croy | b6bbcf92a4 | |
R. Tyler Croy | be33333bfa | |
R. Tyler Croy | 5df6083e6d | |
R. Tyler Croy | 623aa6b10a | |
R. Tyler Croy | 9535100528 | |
R. Tyler Croy | 457d2a195c | |
R. Tyler Croy | e01e699861 | |
R. Tyler Croy | f968a2c1c8 | |
R. Tyler Croy | 8c7046aa44 | |
R. Tyler Croy | 39db2912ad | |
R. Tyler Croy | 08e4076f95 | |
R. Tyler Croy | cb3465f715 | |
R. Tyler Croy | 27d51c84e4 | |
R. Tyler Croy | 1c588f6bff | |
R. Tyler Croy | 5cc83eef08 | |
R. Tyler Croy | 685f0353f2 | |
R. Tyler Croy | 81d15d53ec | |
R. Tyler Croy | f053d9b68c | |
R. Tyler Croy | 3a9caa2535 | |
R. Tyler Croy | dc33298435 | |
R. Tyler Croy | 789e7e863c | |
R. Tyler Croy | 38e8c62e00 | |
R. Tyler Croy | 71347594ca | |
R. Tyler Croy | f4042894fe | |
R. Tyler Croy | 7b13050ee2 | |
R. Tyler Croy | 0fa983d0e8 | |
R. Tyler Croy | 95c244867a | |
R. Tyler Croy | 0c59dcee70 | |
R. Tyler Croy | 100b6ab28a |
|
@ -1,3 +1,5 @@
|
|||
.gradle*
|
||||
*.sw*
|
||||
build/
|
||||
.idea/*
|
||||
*.iml
|
||||
|
|
17
.travis.yml
17
.travis.yml
|
@ -1,4 +1,19 @@
|
|||
language: java
|
||||
jdk:
|
||||
- oraclejdk7
|
||||
- oraclejdk8
|
||||
|
||||
# Using 'install' to run the clean to avoid Travis automatically calling
|
||||
# `./gradlew assemble and duplicating my work
|
||||
install:
|
||||
- ./gradlew clean
|
||||
|
||||
# Invoke our default tasks, whatever is defined as important in the
|
||||
# build.gradle file
|
||||
script:
|
||||
- ./gradlew
|
||||
|
||||
env:
|
||||
global:
|
||||
- secure: X71NKVXJjyG1C6/fZUTxdQ6HwAcaNRWSc0m/VRXsaiZwgXjIr+C+Uz9X4iu6Q52YAc+7CrvftAbIxgQhf+TBtvvnGeZgWXuYqf6Cx1weyL/6xsttOLsEGdtHU9jvrtw4tHRXxu/6F+8QAot8VRwUsCB4IL5Y3epshddbk+/1d9Q=
|
||||
|
||||
after_success: "./gradlew bintrayUpload -PbintrayUser=rtyler -PbintrayKey=${BINTRAY_KEY}"
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
=== Hacking Verspätung
|
||||
|
||||
Verspätung is a link:http://groovy-lang.org[Groovy]-based application which is
|
||||
built with link:http://gradle.org[Gradle]. As such, if you are already familiar
|
||||
with these tools you should be able to find your way around the project with
|
||||
relative ease.
|
||||
|
||||
|
||||
A quick primer on what tasks are available:
|
||||
|
||||
* Running tests: `./gradlew check`
|
||||
* Running the app locally: `./gradlew run -PzookeeperHosts=localhost:2181`
|
||||
* Building the app for distribution: `./gradlew assemble`
|
||||
|
||||
|
||||
=== Releasing Verspätung
|
||||
|
||||
NOTE: This is mostly meant for the developer team.
|
||||
|
||||
Currently releases can be produced by simply pushing a Git tag to this GitHub
|
||||
repository. This will cause Travis CI to build and test the tag, which if it is
|
||||
successful, will automatically publish to
|
||||
link:https://bintray.com/reiseburo/apps/verspaetung[Bintray].
|
|
@ -0,0 +1,39 @@
|
|||
/*
|
||||
* This is a multibranch workflow file for defining how this project should be
|
||||
* built, tested and deployed
|
||||
*/
|
||||
|
||||
node {
|
||||
stage 'Clean workspace'
|
||||
/* Running on a fresh Docker instance makes this redundant, but just in
|
||||
* case the host isn't configured to give us a new Docker image for every
|
||||
* build, make sure we clean things before we do anything
|
||||
*/
|
||||
deleteDir()
|
||||
|
||||
|
||||
stage 'Checkout source'
|
||||
/*
|
||||
* Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
|
||||
* scm to check out sources matching Jenkinsfile with the SCM details from
|
||||
* the build that is executing this Jenkinsfile.
|
||||
*
|
||||
* when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
|
||||
*/
|
||||
checkout scm
|
||||
|
||||
|
||||
stage 'Build and test'
|
||||
/* if we can't install everything we need for Ruby in less than 15 minutes
|
||||
* we might as well just give up
|
||||
*/
|
||||
timeout(30) {
|
||||
sh './gradlew -iS'
|
||||
}
|
||||
|
||||
stage 'Capture test results and artifacts'
|
||||
step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
|
||||
step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
|
||||
}
|
||||
|
||||
// vim: ft=groovy
|
|
@ -1,4 +1,4 @@
|
|||
*Copyright (c) 2015 Lookout, Inc*
|
||||
*Copyright (c) 2015 Lookout, Inc, R. Tyler Croy*
|
||||
|
||||
MIT License
|
||||
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
|
||||
|
||||
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]
|
||||
|
||||
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.
|
||||
|
||||
|
||||
Verspätung monitors the topics and their latest offsets by talking to Kafka, it
|
||||
will also keep track of how far along consumers are by monitoring the offsets
|
||||
that they have committed to Zookeeper. Using both of these pieces of
|
||||
information, Verspätung computs the delta for each of the consumer groups and
|
||||
reports it to statsd.
|
||||
|
||||
=== Using
|
||||
|
||||
% java -jar verspaetung-*-all.jar --help
|
||||
usage: verspaetung
|
||||
-d,--delay <DELAY> Seconds to delay between reporting metrics to
|
||||
the metrics receiver (defaults: 5s)
|
||||
-H,--statsd-host <STATSD> Hostname for a statsd instance (defaults to
|
||||
localhost)
|
||||
-n,--dry-run Disable reporting to a statsd host
|
||||
-p,--statsd-port <PORT> Port for the statsd instance (defaults to
|
||||
8125)
|
||||
--prefix <PREFIX> Prefix all metrics with PREFIX before they're
|
||||
reported (e.g. PREFIX.verspaetung.mytopic)
|
||||
-s,--storm Watch Storm KafkaSpout offsets (under
|
||||
/kafka_spout)
|
||||
-x,--exclude <EXCLUDES> Regular expression for consumer groups to
|
||||
exclude from reporting (can be declared
|
||||
multiple times)
|
||||
-z,--zookeeper <HOSTS> Comma separated list of Zookeeper hosts (e.g.
|
||||
localhost:2181)
|
||||
|
||||
Running Verspätung is rather easy, by default the daemon will monitor the
|
||||
standard Kafka high-level consumer offset path of `/consumers` and start
|
||||
reporting deltas automatically.
|
||||
|
20
README.md
20
README.md
|
@ -1,20 +0,0 @@
|
|||
# Verspätung
|
||||
|
||||
[![Build
|
||||
Status](https://travis-ci.org/lookout/verspaetung.svg)](https://travis-ci.org/lookout/verspaetung)
|
||||
|
||||
Verspätung is a small utility which aims to help identify delay of
|
||||
[Kafka](http://kafka.apache.org) consumers.
|
||||
|
||||
Verspätung monitors the topics and their latest offsets by talking to Kafka, it
|
||||
will also keep track of how far along consumers are by monitoring the offsets
|
||||
that they have committed to Zookeeper. Using both of these pieces of
|
||||
information, Verspätung computs the delta for each of the consumer groups and
|
||||
reports it to statsd.
|
||||
|
||||
|
||||
### Hacking
|
||||
|
||||
* *Running tests:* `./gradlew check`
|
||||
* *Running the app locally:* `./gradlew run -PzookeeperHosts=localhost:2181`
|
||||
* *Building the app for distribution:* `./gradlew shadowJar`
|
128
build.gradle
128
build.gradle
|
@ -1,17 +1,36 @@
|
|||
plugins {
|
||||
id "com.jfrog.bintray" version "1.0"
|
||||
id 'com.github.johnrengelman.shadow' version '1.2.0'
|
||||
id "org.ajoberstar.github-pages" version "1.2.0"
|
||||
id "org.asciidoctor.gradle.asciidoctor" version "1.5.1"
|
||||
id 'codenarc'
|
||||
id 'groovy'
|
||||
id 'idea'
|
||||
id 'application'
|
||||
}
|
||||
apply plugin: 'groovy'
|
||||
apply plugin: 'application'
|
||||
|
||||
group = "com.github.lookout"
|
||||
group = "com.github.reiseburo"
|
||||
description = "A utility for monitoring the delay of Kafka consumers"
|
||||
version = '0.1.4'
|
||||
mainClassName = 'com.github.lookout.verspaetung.Main'
|
||||
defaultTasks 'clean', 'check'
|
||||
sourceCompatibility = '1.7'
|
||||
targetCompatibility = '1.7'
|
||||
version = '0.6.0'
|
||||
mainClassName = 'com.github.reiseburo.verspaetung.Main'
|
||||
defaultTasks 'check', 'assemble'
|
||||
|
||||
/* Ensure we properly build for JDK7 still */
|
||||
plugins.withType(JavaPlugin) {
|
||||
sourceCompatibility = 1.7
|
||||
targetCompatibility = 1.7
|
||||
|
||||
|
||||
project.tasks.withType(JavaCompile) { task ->
|
||||
task.sourceCompatibility = project.sourceCompatibility
|
||||
task.targetCompatibility = project.targetCompatibility
|
||||
}
|
||||
|
||||
project.tasks.withType(GroovyCompile) { task ->
|
||||
task.sourceCompatibility = project.sourceCompatibility
|
||||
task.targetCompatibility = project.targetCompatibility
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
@ -31,14 +50,13 @@ test {
|
|||
////////////////////////////////////////////////////////////////////////////////
|
||||
// DEPENDENCY MANAGEMENT
|
||||
repositories {
|
||||
mavenCentral()
|
||||
jcenter()
|
||||
|
||||
maven { url 'https://dl.bintray.com/rtyler/maven' }
|
||||
/* needed for forked metrics library */
|
||||
maven { url 'https://dl.bintray.com/lookout/systems' }
|
||||
}
|
||||
|
||||
dependencies {
|
||||
compile 'org.codehaus.groovy:groovy-all:2.4.0+'
|
||||
compile 'org.codehaus.groovy:groovy-all:2.4.3+'
|
||||
|
||||
[
|
||||
'curator-framework',
|
||||
|
@ -55,21 +73,61 @@ dependencies {
|
|||
/* Needed for command line options parsing */
|
||||
compile 'commons-cli:commons-cli:1.2+'
|
||||
|
||||
compile 'com.timgroup:java-statsd-client:3.1.2+'
|
||||
compile 'com.github.lookout:metrics-datadog:0.1.3'
|
||||
|
||||
['metrics-core', 'metrics-graphite'].each { artifactName ->
|
||||
compile "io.dropwizard.metrics:${artifactName}:3.1.0"
|
||||
}
|
||||
|
||||
/* Logback is to be used for logging through the app */
|
||||
compile 'ch.qos.logback:logback-classic:1.1.2+'
|
||||
|
||||
codenarc "org.codenarc:CodeNarc:0.24"
|
||||
|
||||
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
|
||||
testCompile 'cglib:cglib-nodep:2.2.+'
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
jar {
|
||||
enabled = false
|
||||
idea {
|
||||
module {
|
||||
downloadJavadoc = true
|
||||
downloadSources = true
|
||||
}
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// PUBLISHING/DOCUMENTATION
|
||||
assemble.dependsOn groovydoc
|
||||
|
||||
asciidoctor {
|
||||
/* Using a single backend, so skipping the html5/ output dir */
|
||||
separateOutputDirs false
|
||||
}
|
||||
assemble.dependsOn asciidoctor
|
||||
|
||||
githubPages {
|
||||
repoUri = 'git@github.com:lookout/verspaetung.git'
|
||||
pages {
|
||||
into('groovydoc') { from groovydoc }
|
||||
from asciidoctor
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
codenarc {
|
||||
configFile file("${projectDir}/gradle/codenarc.rules")
|
||||
sourceSets = [sourceSets.main]
|
||||
}
|
||||
codenarcMain {
|
||||
exclude '**/Main.groovy'
|
||||
}
|
||||
|
||||
shadowJar {
|
||||
exclude 'META-INF/*.RSA', 'META-INF/*.DSA'
|
||||
manifest {
|
||||
|
@ -84,17 +142,45 @@ artifacts {
|
|||
archives shadowJar
|
||||
}
|
||||
|
||||
sourceSets {
|
||||
main {
|
||||
groovy {
|
||||
exclude '**/*.sw*'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* https://github.com/reiseburo/verspaetung/issues/28
|
||||
*
|
||||
* disable the distZip and distTar tasks since we only need the shadow jar
|
||||
*/
|
||||
distZip.enabled = false
|
||||
distTar.enabled = false
|
||||
|
||||
/* We're not building a library jar so we'll disable this default jar task */
|
||||
jar.enabled = false
|
||||
/* Remove the "library" jar from the archives configuration so it's not
|
||||
* published
|
||||
*/
|
||||
configurations.archives.artifacts.removeAll { it.archiveTask.is jar }
|
||||
|
||||
|
||||
bintray {
|
||||
user = project.bintrayUser
|
||||
key = project.bintrayKey
|
||||
publish = true
|
||||
dryRun = false
|
||||
/*
|
||||
* Only only publish when we're tagging a release and if we've executed on
|
||||
* a JDK8 build. This is to prevent multiple attempts by the build matrix
|
||||
* to publish the artifacts
|
||||
*/
|
||||
dryRun = !((System.env.TRAVIS_TAG as boolean) && (System.env.TRAVIS_JDK_VERSION == 'oraclejdk8'))
|
||||
configurations = ['archives']
|
||||
|
||||
pkg {
|
||||
userOrg = 'lookout'
|
||||
repo = 'systems'
|
||||
userOrg = 'reiseburo'
|
||||
repo = 'apps'
|
||||
name = 'verspaetung'
|
||||
labels = []
|
||||
|
||||
|
|
|
@ -0,0 +1,16 @@
|
|||
|
||||
zookeeper:
|
||||
image: wurstmeister/zookeeper
|
||||
ports:
|
||||
- 2181:2181
|
||||
kafka:
|
||||
image: wurstmeister/kafka
|
||||
ports:
|
||||
- 9092:9092
|
||||
links:
|
||||
- zookeeper:zk
|
||||
environment:
|
||||
# Only using one node, so we're fine with localhost
|
||||
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
|
@ -1,3 +1,6 @@
|
|||
org.gradle.daemon=true
|
||||
# Disabling daemon mode since it makes PermGen leaks with codenarc more common
|
||||
org.gradle.daemon=false
|
||||
bintrayUser=
|
||||
bintrayKey=
|
||||
sourceCompatibility=1.7
|
||||
targetCompatibility=1.7
|
||||
|
|
|
@ -0,0 +1,379 @@
|
|||
ruleset {
|
||||
|
||||
description '''
|
||||
A Sample Groovy RuleSet containing all CodeNarc Rules
|
||||
You can use this as a template for your own custom RuleSet.
|
||||
Just delete the rules that you don't want to include.
|
||||
'''
|
||||
|
||||
AbcComplexity // DEPRECATED: Use the AbcMetric rule instead. Requires the GMetrics jar
|
||||
AbcMetric // Requires the GMetrics jar
|
||||
AbstractClassName
|
||||
AbstractClassWithPublicConstructor
|
||||
AbstractClassWithoutAbstractMethod
|
||||
AddEmptyString
|
||||
AssertWithinFinallyBlock
|
||||
AssignCollectionSort
|
||||
AssignCollectionUnique
|
||||
AssignmentInConditional
|
||||
AssignmentToStaticFieldFromInstanceMethod
|
||||
BigDecimalInstantiation
|
||||
BitwiseOperatorInConditional
|
||||
BlankLineBeforePackage
|
||||
BooleanGetBoolean
|
||||
BooleanMethodReturnsNull
|
||||
BracesForClass
|
||||
BracesForForLoop
|
||||
BracesForIfElse
|
||||
BracesForMethod
|
||||
BracesForTryCatchFinally
|
||||
BrokenNullCheck
|
||||
BrokenOddnessCheck
|
||||
BuilderMethodWithSideEffects
|
||||
BusyWait
|
||||
CatchArrayIndexOutOfBoundsException
|
||||
CatchError
|
||||
CatchException
|
||||
CatchIllegalMonitorStateException
|
||||
CatchIndexOutOfBoundsException
|
||||
CatchNullPointerException
|
||||
CatchRuntimeException
|
||||
CatchThrowable
|
||||
ChainedTest
|
||||
ClassForName
|
||||
ClassJavadoc
|
||||
ClassName
|
||||
ClassNameSameAsFilename
|
||||
ClassNameSameAsSuperclass
|
||||
ClassSize
|
||||
CloneWithoutCloneable
|
||||
CloneableWithoutClone
|
||||
CloseWithoutCloseable
|
||||
ClosureAsLastMethodParameter
|
||||
ClosureStatementOnOpeningLineOfMultipleLineClosure
|
||||
CollectAllIsDeprecated
|
||||
CompareToWithoutComparable
|
||||
ComparisonOfTwoConstants
|
||||
ComparisonWithSelf
|
||||
ConfusingClassNamedException
|
||||
ConfusingMethodName
|
||||
ConfusingMultipleReturns
|
||||
ConfusingTernary
|
||||
ConsecutiveBlankLines
|
||||
ConsecutiveLiteralAppends
|
||||
ConsecutiveStringConcatenation
|
||||
ConstantAssertExpression
|
||||
ConstantIfExpression
|
||||
ConstantTernaryExpression
|
||||
ConstantsOnlyInterface
|
||||
CouldBeElvis
|
||||
CoupledTestCase
|
||||
CrapMetric // Requires the GMetrics jar and a Cobertura coverage file
|
||||
CyclomaticComplexity // Requires the GMetrics jar
|
||||
DeadCode
|
||||
DirectConnectionManagement
|
||||
DoubleCheckedLocking
|
||||
DoubleNegative
|
||||
DuplicateCaseStatement
|
||||
DuplicateImport
|
||||
DuplicateListLiteral
|
||||
DuplicateMapKey
|
||||
DuplicateMapLiteral
|
||||
/*
|
||||
This is a pointless check
|
||||
DuplicateNumberLiteral
|
||||
/*
|
||||
DuplicateSetValue
|
||||
DuplicateStringLiteral
|
||||
ElseBlockBraces
|
||||
EmptyCatchBlock
|
||||
EmptyClass
|
||||
EmptyElseBlock
|
||||
EmptyFinallyBlock
|
||||
EmptyForStatement
|
||||
EmptyIfStatement
|
||||
EmptyInstanceInitializer
|
||||
EmptyMethod
|
||||
EmptyMethodInAbstractClass
|
||||
EmptyStaticInitializer
|
||||
EmptySwitchStatement
|
||||
EmptySynchronizedStatement
|
||||
EmptyTryBlock
|
||||
EmptyWhileStatement
|
||||
EnumCustomSerializationIgnored
|
||||
EqualsAndHashCode
|
||||
EqualsOverloaded
|
||||
ExceptionExtendsError
|
||||
ExceptionExtendsThrowable
|
||||
ExceptionNotThrown
|
||||
ExplicitArrayListInstantiation
|
||||
ExplicitCallToAndMethod
|
||||
ExplicitCallToCompareToMethod
|
||||
ExplicitCallToDivMethod
|
||||
ExplicitCallToEqualsMethod
|
||||
ExplicitCallToGetAtMethod
|
||||
ExplicitCallToLeftShiftMethod
|
||||
ExplicitCallToMinusMethod
|
||||
ExplicitCallToModMethod
|
||||
ExplicitCallToMultiplyMethod
|
||||
ExplicitCallToOrMethod
|
||||
ExplicitCallToPlusMethod
|
||||
ExplicitCallToPowerMethod
|
||||
ExplicitCallToRightShiftMethod
|
||||
ExplicitCallToXorMethod
|
||||
ExplicitGarbageCollection
|
||||
ExplicitHashMapInstantiation
|
||||
ExplicitHashSetInstantiation
|
||||
ExplicitLinkedHashMapInstantiation
|
||||
ExplicitLinkedListInstantiation
|
||||
ExplicitStackInstantiation
|
||||
ExplicitTreeSetInstantiation
|
||||
FactoryMethodName
|
||||
FieldName
|
||||
FileCreateTempFile
|
||||
FileEndsWithoutNewline
|
||||
FinalClassWithProtectedMember
|
||||
ForLoopShouldBeWhileLoop
|
||||
ForStatementBraces
|
||||
GStringAsMapKey
|
||||
GStringExpressionWithinString
|
||||
GetterMethodCouldBeProperty
|
||||
GrailsDomainHasEquals
|
||||
GrailsDomainHasToString
|
||||
GrailsDomainReservedSqlKeywordName
|
||||
GrailsDomainWithServiceReference
|
||||
GrailsDuplicateConstraint
|
||||
GrailsDuplicateMapping
|
||||
GrailsMassAssignment
|
||||
GrailsPublicControllerMethod
|
||||
GrailsServletContextReference
|
||||
GrailsSessionReference // DEPRECATED
|
||||
GrailsStatelessService
|
||||
GroovyLangImmutable
|
||||
HardCodedWindowsFileSeparator
|
||||
HardCodedWindowsRootDirectory
|
||||
HashtableIsObsolete
|
||||
IfStatementBraces
|
||||
/*
|
||||
"Uh yes, we'd like you to make your code more unreadable"
|
||||
IfStatementCouldBeTernary
|
||||
*/
|
||||
IllegalClassMember
|
||||
IllegalClassReference
|
||||
IllegalPackageReference
|
||||
IllegalRegex
|
||||
IllegalString
|
||||
IllegalSubclass
|
||||
ImplementationAsType
|
||||
ImportFromSamePackage
|
||||
ImportFromSunPackages
|
||||
InconsistentPropertyLocking
|
||||
InconsistentPropertySynchronization
|
||||
InsecureRandom
|
||||
/*
|
||||
instanceof is useful in Groovy, not sure why this check is here
|
||||
Instanceof
|
||||
*/
|
||||
IntegerGetInteger
|
||||
InterfaceName
|
||||
InterfaceNameSameAsSuperInterface
|
||||
InvertedIfElse
|
||||
JUnitAssertAlwaysFails
|
||||
JUnitAssertAlwaysSucceeds
|
||||
JUnitAssertEqualsConstantActualValue
|
||||
JUnitFailWithoutMessage
|
||||
JUnitLostTest
|
||||
JUnitPublicField
|
||||
JUnitPublicNonTestMethod
|
||||
JUnitPublicProperty
|
||||
JUnitSetUpCallsSuper
|
||||
JUnitStyleAssertions
|
||||
JUnitTearDownCallsSuper
|
||||
JUnitTestMethodWithoutAssert
|
||||
JUnitUnnecessarySetUp
|
||||
JUnitUnnecessaryTearDown
|
||||
JUnitUnnecessaryThrowsException
|
||||
JavaIoPackageAccess
|
||||
JdbcConnectionReference
|
||||
JdbcResultSetReference
|
||||
JdbcStatementReference
|
||||
LineLength
|
||||
LocaleSetDefault
|
||||
LoggerForDifferentClass
|
||||
LoggerWithWrongModifiers
|
||||
LoggingSwallowsStacktrace
|
||||
LongLiteralWithLowerCaseL
|
||||
MethodCount
|
||||
MethodName
|
||||
MethodSize
|
||||
MisorderedStaticImports
|
||||
MissingBlankLineAfterImports
|
||||
MissingBlankLineAfterPackage
|
||||
MissingNewInThrowStatement
|
||||
MultipleLoggers
|
||||
MultipleUnaryOperators
|
||||
NestedBlockDepth
|
||||
NestedForLoop
|
||||
NestedSynchronization
|
||||
NoDef
|
||||
NoWildcardImports
|
||||
NonFinalPublicField
|
||||
NonFinalSubclassOfSensitiveInterface
|
||||
ObjectFinalize
|
||||
ObjectOverrideMisspelledMethodName
|
||||
PackageName
|
||||
PackageNameMatchesFilePath
|
||||
ParameterCount
|
||||
ParameterName
|
||||
ParameterReassignment
|
||||
PrintStackTrace
|
||||
Println
|
||||
PrivateFieldCouldBeFinal
|
||||
PropertyName
|
||||
PublicFinalizeMethod
|
||||
PublicInstanceField
|
||||
RandomDoubleCoercedToZero
|
||||
RemoveAllOnSelf
|
||||
RequiredRegex
|
||||
RequiredString
|
||||
ReturnFromFinallyBlock
|
||||
ReturnNullFromCatchBlock
|
||||
ReturnsNullInsteadOfEmptyArray
|
||||
ReturnsNullInsteadOfEmptyCollection
|
||||
SerialPersistentFields
|
||||
SerialVersionUID
|
||||
/* Not needed
|
||||
SerializableClassMustDefineSerialVersionUID
|
||||
*/
|
||||
SimpleDateFormatMissingLocale
|
||||
SpaceAfterCatch
|
||||
SpaceAfterClosingBrace
|
||||
SpaceAfterComma
|
||||
SpaceAfterFor
|
||||
SpaceAfterIf
|
||||
SpaceAfterOpeningBrace
|
||||
SpaceAfterSemicolon
|
||||
SpaceAfterSwitch
|
||||
SpaceAfterWhile
|
||||
SpaceAroundClosureArrow
|
||||
/*
|
||||
I prefer a little extra space
|
||||
SpaceAroundMapEntryColon
|
||||
*/
|
||||
SpaceAroundOperator
|
||||
SpaceBeforeClosingBrace
|
||||
SpaceBeforeOpeningBrace
|
||||
SpockIgnoreRestUsed
|
||||
StatelessClass
|
||||
StatelessSingleton
|
||||
StaticCalendarField
|
||||
StaticConnection
|
||||
StaticDateFormatField
|
||||
StaticMatcherField
|
||||
StaticSimpleDateFormatField
|
||||
SwallowThreadDeath
|
||||
SynchronizedMethod
|
||||
SynchronizedOnBoxedPrimitive
|
||||
SynchronizedOnGetClass
|
||||
SynchronizedOnReentrantLock
|
||||
SynchronizedOnString
|
||||
SynchronizedOnThis
|
||||
SynchronizedReadObjectMethod
|
||||
SystemErrPrint
|
||||
SystemExit
|
||||
SystemOutPrint
|
||||
SystemRunFinalizersOnExit
|
||||
TernaryCouldBeElvis
|
||||
ThisReferenceEscapesConstructor
|
||||
ThreadGroup
|
||||
ThreadLocalNotStaticFinal
|
||||
ThreadYield
|
||||
ThrowError
|
||||
ThrowException
|
||||
ThrowExceptionFromFinallyBlock
|
||||
ThrowNullPointerException
|
||||
ThrowRuntimeException
|
||||
ThrowThrowable
|
||||
ToStringReturnsNull
|
||||
TrailingWhitespace
|
||||
UnnecessaryBigDecimalInstantiation
|
||||
UnnecessaryBigIntegerInstantiation
|
||||
UnnecessaryBooleanExpression
|
||||
UnnecessaryBooleanInstantiation
|
||||
UnnecessaryCallForLastElement
|
||||
UnnecessaryCallToSubstring
|
||||
UnnecessaryCast
|
||||
UnnecessaryCatchBlock
|
||||
UnnecessaryCollectCall
|
||||
UnnecessaryCollectionCall
|
||||
UnnecessaryConstructor
|
||||
UnnecessaryDefInFieldDeclaration
|
||||
UnnecessaryDefInMethodDeclaration
|
||||
UnnecessaryDefInVariableDeclaration
|
||||
UnnecessaryDotClass
|
||||
UnnecessaryDoubleInstantiation
|
||||
UnnecessaryElseStatement
|
||||
UnnecessaryFail
|
||||
UnnecessaryFinalOnPrivateMethod
|
||||
UnnecessaryFloatInstantiation
|
||||
UnnecessaryGString
|
||||
UnnecessaryGetter
|
||||
UnnecessaryGroovyImport
|
||||
UnnecessaryIfStatement
|
||||
UnnecessaryInstanceOfCheck
|
||||
UnnecessaryInstantiationToGetClass
|
||||
UnnecessaryIntegerInstantiation
|
||||
UnnecessaryLongInstantiation
|
||||
UnnecessaryModOne
|
||||
UnnecessaryNullCheck
|
||||
UnnecessaryNullCheckBeforeInstanceOf
|
||||
UnnecessaryObjectReferences
|
||||
UnnecessaryOverridingMethod
|
||||
UnnecessaryPackageReference
|
||||
UnnecessaryParenthesesForMethodCallWithClosure
|
||||
UnnecessaryPublicModifier
|
||||
/*
|
||||
I quite like explicit return estatements
|
||||
UnnecessaryReturnKeyword
|
||||
*/
|
||||
UnnecessarySafeNavigationOperator
|
||||
UnnecessarySelfAssignment
|
||||
UnnecessarySemicolon
|
||||
UnnecessaryStringInstantiation
|
||||
UnnecessarySubstring
|
||||
UnnecessaryTernaryExpression
|
||||
UnnecessaryToString
|
||||
UnnecessaryTransientModifier
|
||||
UnsafeArrayDeclaration
|
||||
UnsafeImplementationAsMap
|
||||
UnusedArray
|
||||
UnusedImport
|
||||
/*
|
||||
Some interfaces require no use for the method parameter
|
||||
UnusedMethodParameter
|
||||
*/
|
||||
UnusedObject
|
||||
UnusedPrivateField
|
||||
UnusedPrivateMethod
|
||||
UnusedPrivateMethodParameter
|
||||
UnusedVariable
|
||||
UseAssertEqualsInsteadOfAssertTrue
|
||||
UseAssertFalseInsteadOfNegation
|
||||
UseAssertNullInsteadOfAssertEquals
|
||||
UseAssertSameInsteadOfAssertTrue
|
||||
UseAssertTrueInsteadOfAssertEquals
|
||||
UseAssertTrueInsteadOfNegation
|
||||
UseCollectMany
|
||||
UseCollectNested
|
||||
UseOfNotifyMethod
|
||||
VariableName
|
||||
VectorIsObsolete
|
||||
VolatileArrayField
|
||||
VolatileLongOrDoubleField
|
||||
WaitOutsideOfWhileLoop
|
||||
WhileStatementBraces
|
||||
}
|
||||
|
||||
doNotApplyToClassNames = 'Main'
|
||||
|
||||
// vim: ft=groovy
|
|
@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
|
|||
distributionPath=wrapper/dists
|
||||
zipStoreBase=GRADLE_USER_HOME
|
||||
zipStorePath=wrapper/dists
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-2.2.1-bin.zip
|
||||
distributionUrl=https\://services.gradle.org/distributions/gradle-2.6-all.zip
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
= Verspätung
|
||||
:toc: right
|
||||
|
||||
|
||||
== Overview/Readme
|
||||
|
||||
include::../../../README.adoc[]
|
||||
|
||||
== Developing
|
||||
|
||||
=== Useful links
|
||||
|
||||
* link:groovydoc/[Internal API docs]
|
||||
|
||||
include::../../../HACKING.adoc[]
|
|
@ -1,21 +0,0 @@
|
|||
package com.github.lookout.verspaetung
|
||||
|
||||
/**
|
||||
* POJO containing the necessary information to model a Kafka broker
|
||||
*/
|
||||
class KafkaBroker {
|
||||
private String host
|
||||
private Integer port
|
||||
private Integer brokerId
|
||||
|
||||
public KafkaBroker(Object jsonObject, Integer brokerId) {
|
||||
this.host = jsonObject.host
|
||||
this.port = jsonObject.port
|
||||
this.brokerId = brokerId
|
||||
}
|
||||
|
||||
@Override
|
||||
String toString() {
|
||||
return "broker<${this.brokerId}>@${this.host}:${this.port}"
|
||||
}
|
||||
}
|
|
@ -1,146 +0,0 @@
|
|||
package com.github.lookout.verspaetung
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import kafka.cluster.Broker
|
||||
import kafka.client.ClientUtils
|
||||
import kafka.consumer.SimpleConsumer
|
||||
import kafka.common.TopicAndPartition
|
||||
import kafka.javaapi.*
|
||||
/* UGH */
|
||||
import scala.collection.JavaConversions
|
||||
|
||||
/* Can't type check this because it makes the calls in and out of Scala an
|
||||
* atrocious pain in the ass
|
||||
*/
|
||||
//@TypeChecked
|
||||
class KafkaPoller extends Thread {
|
||||
private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
|
||||
private final Integer KAFKA_TIMEOUT = (5 * 1000)
|
||||
private final Integer KAFKA_BUFFER = (100 * 1024)
|
||||
private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class)
|
||||
|
||||
private Boolean keepRunning = true
|
||||
private Boolean shouldReconnect = false
|
||||
private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
|
||||
private List<Broker> brokers = []
|
||||
private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
|
||||
private List<Closure> onDelta = []
|
||||
|
||||
KafkaPoller(AbstractMap map) {
|
||||
this.consumersMap = map
|
||||
}
|
||||
|
||||
void run() {
|
||||
logger.info("Starting wait loop")
|
||||
while (keepRunning) {
|
||||
logger.debug("poll loop")
|
||||
|
||||
if (shouldReconnect) {
|
||||
reconnect()
|
||||
}
|
||||
|
||||
if (this.consumersMap.size() > 0) {
|
||||
dumpMetadata()
|
||||
}
|
||||
|
||||
Thread.sleep(1 * 1000)
|
||||
}
|
||||
}
|
||||
|
||||
void dumpMetadata() {
|
||||
logger.debug("dumping meta-data")
|
||||
|
||||
def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
|
||||
def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
|
||||
brokersSeq,
|
||||
KAFKA_CLIENT_ID,
|
||||
KAFKA_TIMEOUT,
|
||||
0)
|
||||
|
||||
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
|
||||
withScalaCollection(f.partitionsMetadata).each { p ->
|
||||
Long offset = latestFromLeader(p.leader.get()?.id, f.topic, p.partitionId)
|
||||
TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
|
||||
|
||||
this.consumersMap[tp].each { zk.ConsumerOffset c ->
|
||||
logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
|
||||
Long delta = offset - c.offset
|
||||
this.onDelta.each { Closure callback ->
|
||||
callback.call(c.groupName, tp, delta)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug("finished dumping meta-data")
|
||||
}
|
||||
|
||||
|
||||
Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
|
||||
SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
|
||||
TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition)
|
||||
/* XXX: A zero clientId into this method might not be right */
|
||||
return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
|
||||
}
|
||||
|
||||
Iterable withScalaCollection(scala.collection.Iterable iter) {
|
||||
return JavaConversions.asJavaIterable(iter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking reconnect to the Kafka brokers
|
||||
*/
|
||||
void reconnect() {
|
||||
logger.info("Creating SimpleConsumer connections for brokers")
|
||||
this.brokers.each { Broker b ->
|
||||
SimpleConsumer consumer = new SimpleConsumer(b.host,
|
||||
b.port,
|
||||
KAFKA_TIMEOUT,
|
||||
KAFKA_BUFFER,
|
||||
KAFKA_CLIENT_ID)
|
||||
consumer.connect()
|
||||
this.brokerConsumerMap[b.id] = consumer
|
||||
}
|
||||
this.shouldReconnect =false
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the brokers list as an immutable Seq collection for the Kafka
|
||||
* scala underpinnings
|
||||
*/
|
||||
scala.collection.immutable.Seq getBrokersSeq() {
|
||||
return JavaConversions.asScalaBuffer(this.brokers).toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Return scala.collection.mutable.Set for the given List
|
||||
*/
|
||||
scala.collection.mutable.Set toScalaSet(Set set) {
|
||||
return JavaConversions.asScalaSet(set)
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal the runloop to safely die after it's next iteration
|
||||
*/
|
||||
void die() {
|
||||
this.keepRunning = false
|
||||
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
||||
client.disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a new list of KafkaBroker objects and signal a reconnection
|
||||
*/
|
||||
void refresh(List<KafkaBroker> brokers) {
|
||||
this.brokers = brokers.collect { KafkaBroker b ->
|
||||
new Broker(b.brokerId, b.host, b.port)
|
||||
}
|
||||
this.shouldReconnect = true
|
||||
}
|
||||
}
|
|
@ -1,174 +0,0 @@
|
|||
package com.github.lookout.verspaetung
|
||||
|
||||
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
|
||||
import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
|
||||
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
|
||||
|
||||
import java.util.AbstractMap
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import com.timgroup.statsd.StatsDClient
|
||||
import com.timgroup.statsd.NonBlockingDogStatsDClient
|
||||
|
||||
import org.apache.commons.cli.*
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.TreeCache
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
/**
 * Main entry point: wires up Zookeeper watchers, the Kafka poller and the
 * statsd reporting callback
 */
class Main {
    private static final String METRICS_PREFIX = 'verspaetung'

    private static StatsDClient statsd
    private static Logger logger

    static void main(String[] args) {
        /* defaults, overridable via the command-line options parsed below */
        String zookeeperHosts = 'localhost:2181'
        String statsdHost = 'localhost'
        Integer statsdPort = 8125

        CommandLine cli = parseCommandLine(args)

        if (cli.hasOption('z')) {
            zookeeperHosts = cli.getOptionValue('z')
        }

        if (cli.hasOption('H')) {
            statsdHost = cli.getOptionValue('H')
        }

        if (cli.hasOption('p')) {
            /* NOTE(review): getOptionValue() returns a String being assigned
             * to an Integer-typed local; relying on Groovy coercion — confirm
             */
            statsdPort = cli.getOptionValue('p')
        }

        logger = LoggerFactory.getLogger(Main.class)
        logger.info("Running with: ${args}")
        logger.warn("Using: zookeepers=${zookeeperHosts} statsd=${statsdHost}:${statsdPort}")

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
        ConcurrentHashMap<TopicPartition, List<zk.ConsumerOffset>> consumers = new ConcurrentHashMap()

        statsd = new NonBlockingDogStatsDClient(METRICS_PREFIX, statsdHost, statsdPort)

        client.start()

        KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
        BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()

        /* Assuming that most people aren't needing to run Storm-based watchers
         * as well
         */
        if (cli.hasOption('s')) {
            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
            stormWatcher.start()
        }

        consumerWatcher.onInitComplete << {
            logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
        }

        brokerWatcher.onBrokerUpdates << { brokers ->
            poller.refresh(brokers)
        }

        logger.info("Started wait loop...")

        /* Heartbeat runloop; never terminates. The die/join/return
         * statements which previously followed this loop were unreachable
         * and have been removed.
         */
        while (true) {
            statsd?.recordGaugeValue('heartbeat', 1)
            Thread.sleep(1 * 1000)
        }
    }

    /**
     * Create and start a KafkaPoller with the given statsd and consumers map
     *
     * @param consumers shared map of consumer offsets, updated by watchers
     * @param statsd client used to report deltas (unless dryRun)
     * @param dryRun when true, deltas are printed instead of reported
     */
    static KafkaPoller setupKafkaPoller(AbstractMap consumers,
                                        NonBlockingDogStatsDClient statsd,
                                        Boolean dryRun) {
        KafkaPoller poller = new KafkaPoller(consumers)
        /* dry-run default: just print each delta to stdout */
        Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
            println "${tp.topic}:${tp.partition}-${name} = ${delta}"
        }

        if (!dryRun) {
            deltaCallback = { String name, TopicPartition tp, Long delta ->
                statsd.recordGaugeValue(tp.topic, delta, [
                        'topic' : tp.topic,
                        'partition' : tp.partition,
                        'consumer-group' : name
                ])
            }
        }

        poller.onDelta << deltaCallback
        poller.start()
        return poller
    }

    /**
     * Create the Options necessary for verspaetung's command-line interface
     */
    static Options createCLI() {
        Options options = new Options()

        Option zookeeper = OptionBuilder.withArgName('HOSTS')
                                        .hasArg()
                                        .withDescription('Comma separated list of Zookeeper hosts (e.g. localhost:2181)')
                                        .withLongOpt('zookeeper')
                                        .withValueSeparator(',' as char)
                                        .create('z')

        Option statsdHost = OptionBuilder.withArgName('STATSD')
                                         .hasArg()
                                         .withType(String)
                                         .withDescription('Hostname for a statsd instance (defaults to localhost)')
                                         .withLongOpt('statsd-host')
                                         .create('H')

        Option statsdPort = OptionBuilder.withArgName('PORT')
                                         .hasArg()
                                         .withType(Integer)
                                         .withDescription('Port for the statsd instance (defaults to 8125)')
                                         .withLongOpt('statsd-port')
                                         .create('p')

        Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host')
                                     .withLongOpt('dry-run')
                                     .create('n')

        Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
                                          .withLongOpt('storm')
                                          .create('s')

        options.addOption(zookeeper)
        options.addOption(statsdHost)
        options.addOption(statsdPort)
        options.addOption(dryRun)
        options.addOption(stormSpouts)

        return options
    }

    /**
     * Parse the command-line arguments; prints usage and exits on bad input
     */
    static CommandLine parseCommandLine(String[] args) {
        Options options = createCLI()
        PosixParser parser = new PosixParser()

        try {
            return parser.parse(options, args)
        }
        catch (MissingOptionException|UnrecognizedOptionException ex) {
            HelpFormatter formatter = new HelpFormatter()
            println ex.message
            formatter.printHelp('verspaetung', options)
            System.exit(1)
        }
    }
}
|
|
@ -1,95 +0,0 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
|
||||
import com.github.lookout.verspaetung.KafkaBroker
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
import groovy.transform.TypeChecked
|
||||
import java.util.Collections
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCache
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheListener
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
/**
 * The BrokerTreeWatcher is a kind of watcher whose sole purpose is
 * to watch the segment of the Zookeeper tree where Kafka stores broker
 * information
 */
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
    static final Integer INVALID_BROKER_ID = -1
    private static final String BROKERS_PATH = '/brokers/ids'

    private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class)
    private JsonSlurper json
    /* callbacks invoked with the broker list once the tree is initialized */
    private List<Closure> onBrokerUpdates
    private Boolean isTreeInitialized = false
    private List<KafkaBroker> brokers

    BrokerTreeWatcher(CuratorFramework client) {
        super(client)

        this.json = new JsonSlurper()
        this.brokers = []
        this.onBrokerUpdates = []
    }

    /** Zookeeper path this watcher is rooted at */
    String zookeeperPath() {
        return BROKERS_PATH
    }

    /**
     * Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
     * list of brokers
     */
    @Override
    void childEvent(CuratorFramework client, TreeCacheEvent event) {
        /* If we're initialized that means we should have all our brokers in
         * our internal list already and we can fire an event
         */
        if (event.type == TreeCacheEvent.Type.INITIALIZED) {
            this.isTreeInitialized = true
            List threadsafeBrokers = Collections.synchronizedList(this.brokers)
            this.onBrokerUpdates.each { Closure c ->
                c?.call(threadsafeBrokers)
            }
            return
        }

        /* NOTE(review): only NODE_ADDED is handled, so brokers are never
         * removed from the internal list — confirm this is intentional
         */
        if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
            return
        }

        ChildData nodeData = event.data
        Integer brokerId = brokerIdFromPath(nodeData.path)

        if (brokerId == INVALID_BROKER_ID) {
            return
        }

        Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))

        this.brokers << new KafkaBroker(brokerData, brokerId)
    }

    /**
     * Process a path string from Zookeeper for the Kafka broker's ID
     *
     * We're expecting paths like: /brokers/ids/1231524312
     */
    Integer brokerIdFromPath(String path) {
        List<String> pathParts = path?.split('/') as List

        if ((pathParts == null) ||
            (pathParts.size() != 4)) {
            return INVALID_BROKER_ID
        }

        /* Integer.parseInt over the deprecated new Integer(String) boxing
         * constructor; still throws NumberFormatException on garbage input
         */
        return Integer.parseInt(pathParts[-1])
    }
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
/**
 * abstract the logic on how to reduce the polling speed and get back to
 * to full speed.
 */
class Delay {
    static final Integer POLLER_DELAY_MIN = (1 * 1000)
    static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour

    /* current delay in milliseconds, always between MIN and MAX */
    private Integer delay = POLLER_DELAY_MIN

    /**
     * Snap the delay back down to its minimum
     *
     * @return true when the delay actually changed
     */
    boolean reset() {
        boolean changed = (delay != POLLER_DELAY_MIN)
        if (changed) {
            delay = POLLER_DELAY_MIN
        }
        return changed
    }

    /**
     * Double the delay, capped at POLLER_DELAY_MAX
     *
     * @return true when the delay was actually increased
     */
    boolean slower() {
        if (delay >= POLLER_DELAY_MAX) {
            return false
        }
        delay += delay
        return true
    }

    /** Current delay in milliseconds */
    Integer value() {
        return delay
    }

    String toString() {
        return "Delay[ ${delay / 1000} sec ]"
    }
}
|
|
@ -0,0 +1,39 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
/**
 * POJO containing the necessary information to model a Kafka broker
 */
class KafkaBroker {
    final private String host
    final private Integer port
    final private Integer brokerId

    KafkaBroker(String host, Integer port, Integer brokerId) {
        this.host = host
        this.port = port
        this.brokerId = brokerId
    }

    /** Identity is keyed solely off the broker ID */
    @Override
    int hashCode() {
        return this.brokerId
    }

    /** Two brokers are equal when their broker IDs match */
    @Override
    boolean equals(Object compared) {
        if (this.is(compared)) {
            return true
        }
        return (compared instanceof KafkaBroker) && (compared.brokerId == brokerId)
    }

    @Override
    String toString() {
        return "broker<${this.brokerId}>@${this.host}:${this.port}"
    }
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
/**
 * POJO containing the necessary information to model a Kafka consumers
 */
class KafkaConsumer {
    String topic
    Integer partition
    String name

    KafkaConsumer(String topic, Integer partition, String name) {
        this.topic = topic
        this.partition = partition
        this.name = name
    }

    @Override
    String toString() {
        return "KafkaConsumer<${topic}:${partition} - ${name}>"
    }

    @Override
    int hashCode() {
        return Objects.hash(this.topic, this.partition, this.name)
    }

    /**
     * Return true for any two KafkaConsumer instances which have the same
     * topic, partition and name properties
     */
    @Override
    boolean equals(Object compared) {
        /* bail early for object identity */
        if (this.is(compared)) {
            return true
        }

        if (!(compared instanceof KafkaConsumer)) {
            return false
        }

        KafkaConsumer other = (KafkaConsumer)compared
        boolean sameTopic = (this.topic == other.topic)
        boolean samePartition = (this.partition == other.partition)
        boolean sameName = (this.name == other.name)
        return sameTopic && samePartition && sameName
    }

}
|
|
@ -0,0 +1,242 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import kafka.cluster.Broker
|
||||
import kafka.client.ClientUtils
|
||||
import kafka.consumer.SimpleConsumer
|
||||
import kafka.common.TopicAndPartition
|
||||
import kafka.common.KafkaException
|
||||
/* UGH */
|
||||
import scala.collection.JavaConversions
|
||||
|
||||
/**
 * KafkaPoller is a relatively simple class which contains a runloop for periodically
 * contacting the Kafka brokers defined in Zookeeper and fetching the latest topic
 * meta-data for them
 */
class KafkaPoller extends Thread {

    private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
    private static final Integer KAFKA_TIMEOUT = (5 * 1000)
    private static final Integer KAFKA_BUFFER = (100 * 1024)
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPoller)

    /* runloop control flags, flipped from other threads via die()/refresh() */
    private Boolean keepRunning = true
    private Boolean shouldReconnect = false
    /* brokerId -> connected SimpleConsumer */
    private final AbstractMap<Integer, SimpleConsumer> brokerConsumerMap
    /* shared output map: (topic, partition) -> latest offset */
    private final AbstractMap<TopicPartition, Long> topicOffsetMap
    private final List<Closure> onDelta
    /* shared set of topic names to query, written by the tree watchers */
    private final AbstractSet<String> currentTopics
    private final List<Broker> brokers

    KafkaPoller(AbstractMap map, AbstractSet topicSet) {
        this.topicOffsetMap = map
        this.currentTopics = topicSet
        this.brokerConsumerMap = [:]
        this.brokers = []
        this.onDelta = []
        setName('Verspaetung Kafka Poller')
    }

    /* There are a number of cases where we intentionally swallow stacktraces
     * to ensure that we're not unnecessarily spamming logs with useless stacktraces
     *
     * For CatchException, we're explicitly gobbling up all potentially crashing exceptions
     * here
     */
    @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
    void run() {
        LOGGER.info('Starting wait loop')
        Delay delay = new Delay()
        LOGGER.error('polling ' + delay)
        while (keepRunning) {
            LOGGER.debug('poll loop')

            if (shouldReconnect) {
                reconnect()
            }

            /* Only makes sense to try to dump meta-data if we've got some
             * topics that we should keep an eye on
             */
            if (this.currentTopics.size() > 0) {
                try {
                    dumpMetadata()
                    if (delay.reset()) {
                        LOGGER.error('back to normal ' + delay)
                    }
                }
                catch (KafkaException kex) {
                    LOGGER.error('Failed to interact with Kafka: {}', kex.message)
                    slower(delay)
                }
                catch (Exception ex) {
                    LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
                    slower(delay)
                }
            }

            Thread.sleep(delay.value())
        }
        disconnectConsumers()
    }

    /** Back the poll delay off one step, logging when it actually changed */
    private void slower(Delay delay) {
        if (delay.slower()) {
            LOGGER.error('using ' + delay)
        }
    }

    /**
     * Fetch the topic meta-data for all watched topics and record the latest
     * offset for each (topic, partition) into topicOffsetMap
     */
    @SuppressWarnings('CatchException')
    private void dumpMetadata() {
        LOGGER.debug('dumping meta-data')

        Object metadata = fetchMetadataForCurrentTopics()

        withTopicsAndPartitions(metadata) { tp, p ->
            try {
                captureLatestOffsetFor(tp, p)
            }
            catch (Exception ex) {
                LOGGER.error('Failed to fetch latest for {}:{}', tp.topic, tp.partition, ex)

            }
        }

        LOGGER.debug('finished dumping meta-data')
    }

    /**
     * Invoke the given closure with the TopicPartition and Partition meta-data
     * informationn for all of the topic meta-data that was passed in.
     *
     * The 'metadata' is the expected return from
     * kafka.client.ClientUtils.fetchTopicMetadata
     */
    private void withTopicsAndPartitions(Object metadata, Closure closure) {
        withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
            withScalaCollection(f.partitionsMetadata).each { p ->
                TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
                closure.call(tp, p)
            }
        }
    }

    /**
     * Fetch the leader metadata and update our data structures
     */
    private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
        Integer leaderId = partitionMetadata.leader.get()?.id
        Integer partitionId = partitionMetadata.partitionId

        Long offset = latestFromLeader(leaderId, tp.topic, partitionId)

        this.topicOffsetMap[tp] = offset
    }

    /**
     * Ask the leader's SimpleConsumer for the latest offset of the given
     * topic/partition, returning 0 when no consumer is connected for leaderId
     */
    private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
        SimpleConsumer consumer = this.brokerConsumerMap[leaderId]

        /* If we don't have a proper SimpleConsumer instance (e.g. null) then
         * we might not have gotten valid data back from Zookeeper
         */
        if (!(consumer instanceof SimpleConsumer)) {
            LOGGER.warn('Attempted to fetch from unknown leaderId: {} ({}/{})', leaderId, topic, partition)
            return 0
        }
        TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition)
        /* XXX: A zero clientId into this method might not be right */
        return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
    }

    /** Wrap the given Scala Iterable in a Java-friendly Iterable view */
    private Iterable withScalaCollection(scala.collection.Iterable iter) {
        return JavaConversions.asJavaIterable(iter)
    }

    /**
     * Blocking reconnect to the Kafka brokers
     */
    @SuppressWarnings('CatchException')
    private void reconnect() {
        disconnectConsumers()
        LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
        synchronized(this.brokers) {
            this.brokers.each { Broker broker ->
                SimpleConsumer consumer = new SimpleConsumer(broker.host,
                                                             broker.port,
                                                             KAFKA_TIMEOUT,
                                                             KAFKA_BUFFER,
                                                             KAFKA_CLIENT_ID)
                try {
                    consumer.connect()
                    this.brokerConsumerMap[broker.id] = consumer
                }
                catch (Exception e) {
                    /* typo fix: message previously read 'cunsumer' */
                    LOGGER.info('Error connecting consumer to {}', broker, e)
                }
            }
        }
        this.shouldReconnect = false
    }

    /**
     * Signal the runloop to safely die after it's next iteration
     */
    void die() {
        this.keepRunning = false
    }

    /** Disconnect every SimpleConsumer we currently hold, logging failures */
    @SuppressWarnings('CatchException')
    private void disconnectConsumers() {
        this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
            LOGGER.info('Disconnecting {}', client)
            try {
                client?.disconnect()
            }
            catch (Exception e) {
                LOGGER.info('Error disconnecting {}', client, e)
            }
        }
    }

    /**
     * Store a new list of KafkaBroker objects and signal a reconnection
     */
    void refresh(List<KafkaBroker> brokers) {
        synchronized(this.brokers) {
            this.brokers.clear()
            this.brokers.addAll(brokers.collect { KafkaBroker b ->
                new Broker(b.brokerId, b.host, b.port)
            })
        }
        this.shouldReconnect = true
    }

    /**
     * Return the brokers list as an immutable Seq collection for the Kafka
     * scala underpinnings
     */
    private scala.collection.immutable.Seq getBrokersSeq() {
        synchronized(this.brokers) {
            return JavaConversions.asScalaBuffer(this.brokers).toList()
        }
    }

    /**
     * Return scala.collection.mutable.Set for the given List
     */
    private scala.collection.mutable.Set toScalaSet(Set set) {
        return JavaConversions.asScalaSet(set)
    }

    /* 'brokersSeq' resolves to the getBrokersSeq() property accessor above */
    private Object fetchMetadataForCurrentTopics() {
        return ClientUtils.fetchTopicMetadata(
                toScalaSet(currentTopics),
                brokersSeq,
                KAFKA_CLIENT_ID,
                KAFKA_TIMEOUT,
                0)
    }
}
|
|
@ -0,0 +1,287 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import com.github.reiseburo.verspaetung.zk.BrokerTreeWatcher
|
||||
import com.github.reiseburo.verspaetung.zk.KafkaSpoutTreeWatcher
|
||||
import com.github.reiseburo.verspaetung.zk.StandardTreeWatcher
|
||||
import com.github.reiseburo.verspaetung.metrics.ConsumerGauge
|
||||
import com.github.reiseburo.verspaetung.metrics.HeartbeatGauge
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.ConcurrentSkipListSet
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import org.apache.commons.cli.*
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.coursera.metrics.datadog.DatadogReporter
|
||||
import org.coursera.metrics.datadog.transport.UdpTransport
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import com.codahale.metrics.*
|
||||
|
||||
/**
 * Main entry point for running the verspaetung application
 */
/* NOTE(review): bare @SuppressWarnings carries no warning names — confirm
 * which CodeNarc/compiler warnings this was meant to silence
 */
@SuppressWarnings
class Main {
    private static final String METRICS_PREFIX = 'verspaetung'

    private static final Logger logger = LoggerFactory.getLogger(Main)
    private static final MetricRegistry registry = new MetricRegistry()
    private static ScheduledReporter reporter

    static void main(String[] args) {
        /* defaults, overridable via the command-line options parsed below */
        String statsdPrefix = METRICS_PREFIX
        String zookeeperHosts = 'localhost:2181'
        String statsdHost = 'localhost'
        Integer statsdPort = 8125
        Integer delayInSeconds = 5
        String[] excludeGroups = []

        CommandLine cli = parseCommandLine(args)

        if (cli.hasOption('z')) {
            zookeeperHosts = cli.getOptionValue('z')
        }

        if (cli.hasOption('H')) {
            statsdHost = cli.getOptionValue('H')
        }

        if (cli.hasOption('p')) {
            statsdPort = cli.getOptionValue('p')
        }

        if (cli.hasOption('d')) {
            delayInSeconds = cli.getOptionValue('d').toInteger()
        }

        if (cli.hasOption('x')) {
            excludeGroups = cli.getOptionValues('x')
        }

        logger.info("Running with: ${args}")
        logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)
        logger.info('Reporting every {} seconds', delayInSeconds)

        if (cli.hasOption('prefix')) {
            statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
        }

        /* liveness gauge so the reporter always has at least one metric */
        registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
                          new HeartbeatGauge())

        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
        CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
        client.start()

        /* We need a good shared set of all the topics we should keep an eye on
         * for the Kafka poller. This will be written to by the tree watchers
         * and read from by the poller, e.g.
         *      Watcher --/write/--> watchedTopics --/read/--> KafkaPoller
         */
        ConcurrentSkipListSet<String> watchedTopics = new ConcurrentSkipListSet<>()

        /* consumerOffsets is where we will keep all the offsets from Zookeeper
         * from the Kafka consumers
         */
        ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets = new ConcurrentHashMap<>()

        /* topicOffsets is where the KafkaPoller should be writing all of it's
         * latest offsets from querying the Kafka brokers
         */
        ConcurrentHashMap<TopicPartition, Long> topicOffsets = new ConcurrentHashMap<>()

        /* Hash map for keeping track of KafkaConsumer to ConsumerGauge
         * instances. We're only really doing this because the MetricRegistry
         * doesn't do a terrific job of exposing this for us
         */
        ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges = new ConcurrentHashMap<>()


        KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics)
        BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
        brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) }

        poller.start()

        /* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have
         * one
         */
        Closure gaugeRegistrar = { KafkaConsumer consumer ->
            if (!shouldExcludeConsumer(excludeGroups, consumer)) {
                registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
            }
        }

        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
                                                                      watchedTopics,
                                                                      consumerOffsets)
        consumerWatcher.onConsumerData << gaugeRegistrar
        consumerWatcher.start()


        /* Assuming that most people aren't needing to run Storm-based watchers
         * as well
         */
        KafkaSpoutTreeWatcher stormWatcher = null
        if (cli.hasOption('s')) {
            stormWatcher = new KafkaSpoutTreeWatcher(client,
                                                     watchedTopics,
                                                     consumerOffsets)
            stormWatcher.onConsumerData << gaugeRegistrar
            stormWatcher.start()
        }


        if (cli.hasOption('n')) {
            /* dry-run: report to the console instead of Datadog/statsd */
            reporter = ConsoleReporter.forRegistry(registry)
                                      .convertRatesTo(TimeUnit.SECONDS)
                                      .convertDurationsTo(TimeUnit.MILLISECONDS)
                                      .build()
        }
        else {
            UdpTransport transport = new UdpTransport.Builder()
                                                     .withPrefix(statsdPrefix)
                                                     .build()

            reporter = DatadogReporter.forRegistry(registry)
                                      .withEC2Host()
                                      .withTransport(transport)
                                      .build()
        }

        /* Start the reporter if we've got it */
        reporter?.start(delayInSeconds, TimeUnit.SECONDS)

        // shutdown threads
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                /* typo fix: message previously read "showdown threads" */
                Main.logger.info("shutdown threads")
                poller.die()
                consumerWatcher.close()
                if (stormWatcher != null) {
                    stormWatcher.close()
                }
                poller.join()
            }
        });
        logger.info('Starting wait loop...')
    }

    /**
     * Create, remember, and register a ConsumerGauge for the given consumer
     */
    static void registerMetricFor(KafkaConsumer consumer,
                                  ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
                                  ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
                                  ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
        /*
         * Bail early if we already have our Consumer registered
         */
        if (consumerGauges.containsKey(consumer)) {
            return
        }

        ConsumerGauge gauge = new ConsumerGauge(consumer,
                                                consumerOffsets,
                                                topicOffsets)
        consumerGauges.put(consumer, gauge)
        this.registry.register(gauge.nameForRegistry, gauge)
    }


    /**
     * Create the Options option necessary for verspaetung to have CLI options
     */
    static Options createCLI() {
        Options options = new Options()

        Option zookeeper = OptionBuilder.withArgName('HOSTS')
                                        .hasArg()
                                        .withDescription('Comma separated list of Zookeeper hosts (e.g. localhost:2181)')
                                        .withLongOpt('zookeeper')
                                        .withValueSeparator(',' as char)
                                        .create('z')

        Option excludeGroups = OptionBuilder.withArgName('EXCLUDES')
                                            .hasArgs()
                                            .withDescription('Regular expression for consumer groups to exclude from reporting (can be declared multiple times)')
                                            .withLongOpt('exclude')
                                            .create('x')

        Option statsdHost = OptionBuilder.withArgName('STATSD')
                                         .hasArg()
                                         .withType(String)
                                         .withDescription('Hostname for a statsd instance (defaults to localhost)')
                                         .withLongOpt('statsd-host')
                                         .create('H')

        Option statsdPort = OptionBuilder.withArgName('PORT')
                                         .hasArg()
                                         .withType(Integer)
                                         .withDescription('Port for the statsd instance (defaults to 8125)')
                                         .withLongOpt('statsd-port')
                                         .create('p')

        Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host')
                                     .withLongOpt('dry-run')
                                     .create('n')

        Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)')
                                          .withLongOpt('storm')
                                          .create('s')

        Option statsdPrefix = OptionBuilder.withArgName('PREFIX')
                                           .hasArg()
                                           .withType(String)
                                           .withDescription("Prefix all metrics with PREFIX before they're reported (e.g. PREFIX.verspaetung.mytopic)")
                                           .withLongOpt('prefix')
                                           .create()

        Option delaySeconds = OptionBuilder.withArgName('DELAY')
                                           .hasArg()
                                           .withType(Integer)
                                           .withDescription("Seconds to delay between reporting metrics to the metrics receiver (defaults: 5s)")
                                           .withLongOpt('delay')
                                           .create('d')

        options.addOption(zookeeper)
        options.addOption(statsdHost)
        options.addOption(statsdPort)
        options.addOption(statsdPrefix)
        options.addOption(dryRun)
        options.addOption(stormSpouts)
        options.addOption(delaySeconds)
        options.addOption(excludeGroups)

        return options
    }


    /**
     * Parse out all the command line options from the array of string
     * arguments
     */
    static CommandLine parseCommandLine(String[] args) {
        Options options = createCLI()
        PosixParser parser = new PosixParser()

        try {
            return parser.parse(options, args)
        }
        catch (MissingOptionException|UnrecognizedOptionException ex) {
            HelpFormatter formatter = new HelpFormatter()
            println ex.message
            formatter.printHelp('verspaetung', options)
            System.exit(1)
        }
    }

    /**
     * Return true if we should exclude the given KafkaConsumer from reporting
     */

    static boolean shouldExcludeConsumer(String[] excludeGroups, KafkaConsumer consumer) {
        return null != excludeGroups?.find { String excludeRule -> consumer?.name.matches(excludeRule) }
    }
}
|
|
@ -1,11 +1,11 @@
|
|||
package com.github.lookout.verspaetung
|
||||
package com.github.reiseburo.verspaetung
|
||||
|
||||
/**
|
||||
* Simple container for Kafka topic names and partition IDs
|
||||
*/
|
||||
class TopicPartition {
|
||||
private String topic
|
||||
private Integer partition
|
||||
private final String topic
|
||||
private final Integer partition
|
||||
|
||||
TopicPartition(String topic, Integer partition) {
|
||||
this.topic = topic
|
||||
|
@ -27,12 +27,8 @@ class TopicPartition {
|
|||
return false
|
||||
}
|
||||
|
||||
if ((this.topic == compared.topic) &&
|
||||
(this.partition == compared.partition)) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
return (this.topic == compared.topic) &&
|
||||
(this.partition == compared.partition)
|
||||
}
|
||||
|
||||
@Override
|
|
@ -0,0 +1,73 @@
|
|||
package com.github.reiseburo.verspaetung.metrics
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
import groovy.transform.TypeChecked
|
||||
import org.coursera.metrics.datadog.Tagged
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||
import com.github.reiseburo.verspaetung.TopicPartition
|
||||
|
||||
/**
 * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
 */
@TypeChecked
class ConsumerGauge implements Gauge<Integer>, Tagged {
    /* the consumer whose lag this gauge reports */
    protected KafkaConsumer consumer
    /* shared map: consumer -> last committed offset (written by ZK watchers) */
    protected AbstractMap<KafkaConsumer, Integer> consumers
    /* shared map: (topic, partition) -> latest broker offset (written by the poller) */
    protected AbstractMap<TopicPartition, Long> topics
    /* key into the topics map, derived from the consumer's topic/partition */
    private final TopicPartition topicPartition

    ConsumerGauge(KafkaConsumer consumer,
                  AbstractMap<KafkaConsumer, Integer> consumers,
                  AbstractMap<TopicPartition, Long> topics) {
        this.consumer = consumer
        this.consumers = consumers
        this.topics = topics

        this.topicPartition = new TopicPartition(consumer.topic, consumer.partition)
    }

    /**
     * Compute the consumer's lag: latest broker offset minus committed offset.
     * Returns 0 when either side of the computation is not yet available.
     */
    @Override
    Integer getValue() {
        if ((!this.consumers.containsKey(consumer)) ||
            (!this.topics.containsKey(topicPartition))) {
            return 0
        }

        /*
         * Returning the maximum value of the computation and zero, there are
         * some cases where we can be "behind" on the Kafka latest offset
         * polling and this could result in an erroneous negative value. See:
         * <https://github.com/reiseburo/verspaetung/issues/25> for more details
         */
        /* NOTE(review): (Integer) cast narrows the Long stored in topics via
         * Groovy numeric coercion — confirm offsets stay within Integer range
         */
        return Math.max(0,
            ((Integer)this.topics[topicPartition]) - this.consumers[consumer])
    }

    /** Datadog tags identifying this gauge's partition, topic and group */
    @Override
    List<String> getTags() {
        return ["partition:${this.consumer.partition}",
                "topic:${this.consumer.topic}",
                "consumer-group:${this.consumer.name}"
        ]*.toString()
    }

    /**
     * return a unique name for this gauge
     */
    String getNameForRegistry() {
        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
    }

    /* Name reported to Datadog; uniqueness there comes from the tags above */
    @Override
    String getName() {
        return this.consumer.topic

        /* need to return this if we're just using the console or statsd
         * reporters

        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"

        */
    }
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.github.reiseburo.verspaetung.metrics
|
||||
|
||||
import com.codahale.metrics.Gauge
|
||||
|
||||
/**
|
||||
* A simple gauge that will always just return 1 indicating that the process is
|
||||
* alive
|
||||
*/
|
||||
class HeartbeatGauge implements Gauge<Integer> {
|
||||
@Override
|
||||
Integer getValue() {
|
||||
return 1
|
||||
}
|
||||
}
|
|
@ -1,22 +1,27 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import com.github.lookout.verspaetung.TopicPartition
|
||||
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
/**
|
||||
*/
|
||||
@TypeChecked
|
||||
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
||||
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
|
||||
protected AbstractMap<KafkaConsumer, Integer> consumerOffsets
|
||||
protected AbstractSet<String> watchedTopics
|
||||
protected List<Closure> onConsumerData = []
|
||||
|
||||
AbstractConsumerTreeWatcher(CuratorFramework client,
|
||||
AbstractMap consumersMap) {
|
||||
protected AbstractConsumerTreeWatcher(CuratorFramework client,
|
||||
AbstractSet topics,
|
||||
AbstractMap offsets) {
|
||||
super(client)
|
||||
this.consumersMap = consumersMap
|
||||
this.watchedTopics = topics
|
||||
this.consumerOffsets = offsets
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -61,17 +66,18 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
|||
* this class on instantiation
|
||||
*/
|
||||
void trackConsumerOffset(ConsumerOffset offset) {
|
||||
if (this.consumersMap == null) {
|
||||
if (this.consumerOffsets == null) {
|
||||
return
|
||||
}
|
||||
|
||||
TopicPartition key = new TopicPartition(offset.topic, offset.partition)
|
||||
this.watchedTopics << offset.topic
|
||||
KafkaConsumer consumer = new KafkaConsumer(offset.topic,
|
||||
offset.partition,
|
||||
offset.groupName)
|
||||
this.consumerOffsets[consumer] = offset.offset
|
||||
|
||||
if (this.consumersMap.containsKey(key)) {
|
||||
this.consumersMap[key] << offset
|
||||
}
|
||||
else {
|
||||
this.consumersMap[key] = new CopyOnWriteArrayList([offset])
|
||||
this.onConsumerData.each { Closure c ->
|
||||
c.call(consumer)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -80,12 +86,8 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
|
|||
* we're interested in
|
||||
*/
|
||||
Boolean isNodeEvent(TreeCacheEvent event) {
|
||||
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
|
||||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
return (event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
|
||||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
|
@ -16,19 +16,21 @@ import org.slf4j.LoggerFactory
|
|||
* further down the pipeline
|
||||
*/
|
||||
@TypeChecked
|
||||
@SuppressWarnings(['ThisReferenceEscapesConstructor'])
|
||||
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||
protected List<Closure> onInitComplete
|
||||
protected Logger logger
|
||||
protected CuratorFramework client
|
||||
protected TreeCache cache
|
||||
|
||||
AbstractTreeWatcher(CuratorFramework client) {
|
||||
this.logger = LoggerFactory.getLogger(this.class)
|
||||
this.client = client
|
||||
this.onInitComplete = []
|
||||
protected AbstractTreeWatcher(CuratorFramework curatorClient) {
|
||||
logger = LoggerFactory.getLogger(this.class)
|
||||
client = curatorClient
|
||||
onInitComplete = []
|
||||
|
||||
this.cache = new TreeCache(client, zookeeperPath())
|
||||
this.cache.listenable.addListener(this)
|
||||
cache = new TreeCache(client, zookeeperPath())
|
||||
/* this may introduce a race condition, need to figure out a better way to handle it */
|
||||
cache.listenable.addListener(this)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -45,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
|
|||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Close our internal cache and return ourselves for API cleanliness
|
||||
*/
|
||||
AbstractTreeWatcher close() {
|
||||
this.cache?.close()
|
||||
return this
|
||||
}
|
||||
|
||||
abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
/**
|
||||
* manage a list of brokers. can be online or offline. offline means the
|
||||
* internal list is hidden, i.e. the list() gives you an empty list.
|
||||
*/
|
||||
@TypeChecked
|
||||
class BrokerManager {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
|
||||
|
||||
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
|
||||
|
||||
// we start with being offline
|
||||
private boolean offline = true
|
||||
|
||||
void add(KafkaBroker broker) {
|
||||
synchronized(brokers) {
|
||||
if (broker != null && this.brokers.indexOf(broker) == -1) {
|
||||
this.brokers.add(broker)
|
||||
logger.info('broker added: {}', broker)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void update(KafkaBroker broker) {
|
||||
synchronized(brokers) {
|
||||
if (broker == null) {
|
||||
return
|
||||
}
|
||||
if (this.brokers.indexOf(broker) != -1) {
|
||||
this.brokers.remove(broker)
|
||||
}
|
||||
this.brokers.add(broker)
|
||||
logger.info('broker updated: {}', broker)
|
||||
}
|
||||
}
|
||||
|
||||
void remove(KafkaBroker broker) {
|
||||
synchronized(brokers) {
|
||||
if (broker != null && this.brokers.remove(broker)) {
|
||||
logger.info('broker removed: {}', broker)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||
@SuppressWarnings('ConfusingMethodName')
|
||||
void offline() {
|
||||
this.offline = true
|
||||
}
|
||||
|
||||
// TODO not sure if this is correct - see BrokerTreeWatcher
|
||||
void online() {
|
||||
this.offline = false
|
||||
}
|
||||
|
||||
Collection<KafkaBroker> list() {
|
||||
if (this.offline) {
|
||||
[]
|
||||
}
|
||||
else {
|
||||
this.brokers
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,72 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
/**
|
||||
* The BrokerTreeWatcher is a kind of watcher whose sole purpose is
|
||||
* to watch the segment of the Zookeeper tree where Kafka stores broker
|
||||
* information
|
||||
*/
|
||||
@TypeChecked
|
||||
class BrokerTreeWatcher extends AbstractTreeWatcher {
|
||||
private static final String BROKERS_PATH = '/brokers/ids'
|
||||
|
||||
private final List<Closure> onBrokerUpdates
|
||||
private final BrokerManager manager
|
||||
private final PojoFactory factory
|
||||
|
||||
BrokerTreeWatcher(CuratorFramework client) {
|
||||
super(client)
|
||||
|
||||
this.factory = new PojoFactory(new JsonSlurper())
|
||||
this.manager = new BrokerManager()
|
||||
this.onBrokerUpdates = []
|
||||
}
|
||||
|
||||
String zookeeperPath() {
|
||||
return BROKERS_PATH
|
||||
}
|
||||
|
||||
/**
|
||||
* Process events to keep an up to date list of brokers
|
||||
*/
|
||||
@Override
|
||||
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
||||
switch (event.type) {
|
||||
case TreeCacheEvent.Type.INITIALIZED:
|
||||
this.manager.online()
|
||||
break
|
||||
case TreeCacheEvent.Type.NODE_ADDED:
|
||||
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||
this.manager.add(broker)
|
||||
break
|
||||
case TreeCacheEvent.Type.NODE_UPDATED:
|
||||
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||
this.manager.update(broker)
|
||||
break
|
||||
case TreeCacheEvent.Type.NODE_REMOVED:
|
||||
KafkaBroker broker = this.factory.createKafkaBroker(event)
|
||||
this.manager.remove(broker)
|
||||
break
|
||||
// TODO these 3 events might come with path which can be mapped
|
||||
// to a specific broker
|
||||
case TreeCacheEvent.Type.CONNECTION_LOST:
|
||||
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
|
||||
this.manager.offline()
|
||||
break
|
||||
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
|
||||
this.manager.online()
|
||||
break
|
||||
}
|
||||
|
||||
this.onBrokerUpdates.each { Closure c ->
|
||||
c?.call(this.manager.list())
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,16 +1,13 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
/**
|
||||
* POJO representing data from Zookeeper for a consumer, topic and offset
|
||||
*/
|
||||
class ConsumerOffset {
|
||||
private String topic
|
||||
private String groupName
|
||||
private Integer offset
|
||||
private Integer partition
|
||||
private ChildData rawData
|
||||
String topic
|
||||
String groupName
|
||||
Integer offset
|
||||
Integer partition
|
||||
|
||||
ConsumerOffset() {
|
||||
}
|
||||
|
@ -22,7 +19,7 @@ class ConsumerOffset {
|
|||
}
|
||||
|
||||
String toString() {
|
||||
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}"
|
||||
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}".toString()
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
/**
|
||||
* parsing TreeCacheEvents and provide helper method to access its data in typed manner
|
||||
*/
|
||||
@TypeChecked
|
||||
class EventData {
|
||||
|
||||
private final String data
|
||||
private final List<String> pathParts
|
||||
|
||||
EventData(TreeCacheEvent event) {
|
||||
ChildData data = event.data
|
||||
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
|
||||
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
|
||||
}
|
||||
|
||||
Integer asInteger() {
|
||||
new Integer(data)
|
||||
}
|
||||
|
||||
String asString() {
|
||||
data
|
||||
}
|
||||
|
||||
Integer pathPartsSize() {
|
||||
pathParts.size()
|
||||
}
|
||||
|
||||
Integer getPathPartAsInteger(int pos) {
|
||||
if (pathParts.size() <= pos) {
|
||||
return null
|
||||
}
|
||||
new Integer(pathParts[pos])
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
import groovy.transform.TypeChecked
|
||||
|
@ -15,20 +15,25 @@ import org.apache.curator.framework.recipes.cache.ChildData
|
|||
@InheritConstructors
|
||||
class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
|
||||
private static final String ZK_PATH = '/kafka_spout'
|
||||
private JsonSlurper json
|
||||
private final JsonSlurper json
|
||||
|
||||
KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
|
||||
super(client, consumersMap)
|
||||
KafkaSpoutTreeWatcher(CuratorFramework client,
|
||||
AbstractSet topics,
|
||||
AbstractMap offsets) {
|
||||
super(client, topics, offsets)
|
||||
|
||||
this.json = new JsonSlurper()
|
||||
}
|
||||
|
||||
String zookeeperPath() { return ZK_PATH }
|
||||
String zookeeperPath() {
|
||||
return ZK_PATH
|
||||
}
|
||||
|
||||
/* skipping type checking since Groovy's JsonSlurper gives us a pretty
|
||||
* loose Object to deal with
|
||||
*/
|
||||
@TypeChecked(TypeCheckingMode.SKIP)
|
||||
@SuppressWarnings(['LineLength'])
|
||||
ConsumerOffset processChildData(ChildData nodeData) {
|
||||
Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8'))
|
||||
/*
|
|
@ -0,0 +1,42 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
/**
|
||||
* factory creating Pojo's from TreeCacheEvents
|
||||
*/
|
||||
class PojoFactory {
|
||||
|
||||
private final JsonSlurper json
|
||||
|
||||
PojoFactory(JsonSlurper json) {
|
||||
this.json = json
|
||||
}
|
||||
|
||||
/**
|
||||
* converts an treeCacheEvent into a KafkaBroker. the caller
|
||||
* is responsible for calling the factory method with the
|
||||
* right data. i.e. path starts with /brokers/ids. the implementation
|
||||
* just uses whatever it is given to create a KafkaBroker object
|
||||
*/
|
||||
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
|
||||
EventData data = new EventData(event)
|
||||
// We're expecting paths like: /brokers/ids/1231524312
|
||||
Integer id = data.getPathPartAsInteger(3)
|
||||
if (id == null) {
|
||||
return
|
||||
}
|
||||
String json = data.asString()
|
||||
if (json == null) {
|
||||
new KafkaBroker('', 0, id)
|
||||
}
|
||||
else {
|
||||
Object payload = this.json.parseText(json)
|
||||
new KafkaBroker(payload.host, payload.port, id)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
import groovy.transform.InheritConstructors
|
||||
|
@ -13,12 +13,15 @@ import org.apache.curator.framework.recipes.cache.ChildData
|
|||
class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
|
||||
private static final String ZK_PATH = '/consumers'
|
||||
|
||||
String zookeeperPath() { return ZK_PATH }
|
||||
String zookeeperPath() {
|
||||
return ZK_PATH
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the necessary information from a standard (i.e. high-level Kafka
|
||||
* consumer) tree of offsets
|
||||
*/
|
||||
@SuppressWarnings(['LineLength'])
|
||||
ConsumerOffset processChildData(ChildData data) {
|
||||
/*
|
||||
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
|
||||
|
@ -40,15 +43,13 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473
|
|||
}
|
||||
catch (NumberFormatException ex) {
|
||||
logger.error("Failed to parse an Integer: ${data}")
|
||||
return null
|
||||
offset = null
|
||||
}
|
||||
|
||||
return offset
|
||||
}
|
||||
|
||||
|
||||
Boolean isOffsetSubtree(String path) {
|
||||
return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/)
|
||||
}
|
||||
|
||||
}
|
|
@ -1,29 +0,0 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
class BrokerTreeWatcherSpec extends Specification {
|
||||
BrokerTreeWatcher watcher
|
||||
|
||||
def setup() {
|
||||
this.watcher = new BrokerTreeWatcher()
|
||||
}
|
||||
|
||||
def "brokerIdFromPath() should return the right ID with a valid path"() {
|
||||
given:
|
||||
String path = "/brokers/ids/1337"
|
||||
|
||||
expect:
|
||||
watcher.brokerIdFromPath(path) == 1337
|
||||
}
|
||||
|
||||
def "brokerIdFromPath() should return -1 on null paths"() {
|
||||
expect:
|
||||
watcher.brokerIdFromPath(null) == -1
|
||||
}
|
||||
|
||||
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
|
||||
expect:
|
||||
watcher.brokerIdFromPath('/spock/ed') == -1
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
class DelaySpec extends Specification {
|
||||
Delay delay = new Delay()
|
||||
|
||||
def "it should give default on first use"() {
|
||||
given:
|
||||
|
||||
expect:
|
||||
delay.value() == Delay.POLLER_DELAY_MIN
|
||||
}
|
||||
|
||||
def "slower has an upper bound"() {
|
||||
given:
|
||||
for(int i = 1; i < 20; i++) { delay.slower() }
|
||||
def firstLast = delay.value()
|
||||
def result = delay.slower()
|
||||
def secondLast = delay.value()
|
||||
|
||||
expect:
|
||||
firstLast == secondLast
|
||||
firstLast == Delay.POLLER_DELAY_MAX
|
||||
result == false
|
||||
}
|
||||
|
||||
def "increasing delay gives true"() {
|
||||
def result = true
|
||||
for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
|
||||
result = result && delay.slower()
|
||||
}
|
||||
def last = delay.slower()
|
||||
|
||||
expect:
|
||||
result == true
|
||||
last == false
|
||||
}
|
||||
|
||||
def "reset on min value gives false"() {
|
||||
given:
|
||||
def result = delay.reset()
|
||||
|
||||
expect:
|
||||
result == false
|
||||
}
|
||||
def "reset on none min value gives true"() {
|
||||
given:
|
||||
delay.slower()
|
||||
def result = delay.reset()
|
||||
|
||||
expect:
|
||||
result == true
|
||||
}
|
||||
}
|
|
@ -0,0 +1,38 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
class KafkaConsumerSpec extends Specification {
|
||||
String topic = 'spock-topic'
|
||||
Integer partition = 2
|
||||
String consumerName = 'spock-consumer'
|
||||
|
||||
def "the constructor should set the properties properly"() {
|
||||
given:
|
||||
KafkaConsumer consumer = new KafkaConsumer(topic, partition, consumerName)
|
||||
|
||||
expect:
|
||||
consumer instanceof KafkaConsumer
|
||||
consumer.topic == topic
|
||||
consumer.partition == partition
|
||||
consumer.name == consumerName
|
||||
}
|
||||
|
||||
def "equals() is true with identical source material"() {
|
||||
given:
|
||||
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
|
||||
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, consumerName)
|
||||
|
||||
expect:
|
||||
consumer1 == consumer2
|
||||
}
|
||||
|
||||
def "equals() is false with differing source material"() {
|
||||
given:
|
||||
KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
|
||||
KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, "i am different")
|
||||
|
||||
expect:
|
||||
consumer1 != consumer2
|
||||
}
|
||||
}
|
|
@ -0,0 +1,6 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
class KafkaPollerSpec extends Specification {
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
package com.github.reiseburo.verspaetung
|
||||
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
/**
|
||||
*/
|
||||
class MainSpec extends Specification {
|
||||
def "shouldExcludeGroups() shuold return false by default"() {
|
||||
expect:
|
||||
Main.shouldExcludeConsumer(new String[0], null) == false
|
||||
}
|
||||
|
||||
def "shouldExcludeGroups() with a matching exclude should return true"() {
|
||||
given:
|
||||
final String[] groups = ['foo'] as String[]
|
||||
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'foo')
|
||||
|
||||
expect:
|
||||
Main.shouldExcludeConsumer(groups, consumer)
|
||||
}
|
||||
|
||||
def "shouldExcludeGroups() with a matching regex should return true"() {
|
||||
given:
|
||||
final String[] groups = ['.*foo'] as String[]
|
||||
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spockfoo')
|
||||
|
||||
expect:
|
||||
Main.shouldExcludeConsumer(groups, consumer)
|
||||
}
|
||||
|
||||
def "shouldExcludeGroups() with multiple regexes that don't match should return false"() {
|
||||
given:
|
||||
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spock')
|
||||
|
||||
expect:
|
||||
Main.shouldExcludeConsumer(groups, consumer) == false
|
||||
}
|
||||
|
||||
def "shouldExcludeGroups() with multiple regexes which match should return true"() {
|
||||
given:
|
||||
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'barstool')
|
||||
|
||||
expect:
|
||||
Main.shouldExcludeConsumer(groups, consumer)
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung
|
||||
package com.github.reiseburo.verspaetung
|
||||
|
||||
import spock.lang.*
|
||||
|
|
@ -0,0 +1,53 @@
|
|||
package com.github.reiseburo.verspaetung.metrics
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaConsumer
|
||||
import com.github.reiseburo.verspaetung.TopicPartition
|
||||
|
||||
class ConsumerGaugeSpec extends Specification {
|
||||
private KafkaConsumer consumer
|
||||
private TopicPartition tp
|
||||
|
||||
def setup() {
|
||||
this.tp = new TopicPartition('spock-topic', 1)
|
||||
this.consumer = new KafkaConsumer(tp.topic, tp.partition, 'spock-consumer')
|
||||
}
|
||||
|
||||
def "constructor should work"() {
|
||||
given:
|
||||
ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
|
||||
|
||||
expect:
|
||||
gauge.consumer instanceof KafkaConsumer
|
||||
gauge.consumers instanceof AbstractMap
|
||||
}
|
||||
|
||||
def "getValue() should source a value from the map"() {
|
||||
given:
|
||||
ConsumerGauge gauge = new ConsumerGauge(this.consumer,
|
||||
[(this.consumer) : 2],
|
||||
[(this.tp) : 3])
|
||||
|
||||
expect:
|
||||
gauge.value == 1
|
||||
}
|
||||
|
||||
def "getValue() should return zero for a consumer not in the map"() {
|
||||
given:
|
||||
ConsumerGauge gauge = new ConsumerGauge(this.consumer, [:], [:])
|
||||
|
||||
expect:
|
||||
gauge.value == 0
|
||||
}
|
||||
|
||||
def "getValue() should return zero instead of a negative number"() {
|
||||
given:
|
||||
ConsumerGauge gauge = new ConsumerGauge(this.consumer,
|
||||
[(this.consumer) : 10],
|
||||
[(this.tp) : 5])
|
||||
|
||||
expect:
|
||||
gauge.value == 0
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
import com.github.lookout.verspaetung.TopicPartition
|
||||
import com.github.reiseburo.verspaetung.TopicPartition
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
@ -13,7 +13,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
|||
|
||||
class MockWatcher extends AbstractConsumerTreeWatcher {
|
||||
MockWatcher() {
|
||||
super(null, [:])
|
||||
super(null, new HashSet(), [:])
|
||||
}
|
||||
ConsumerOffset processChildData(ChildData d) { }
|
||||
String zookeeperPath() { return '/zk/spock' }
|
||||
|
@ -63,10 +63,11 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
|||
watcher.trackConsumerOffset(offset)
|
||||
|
||||
then:
|
||||
watcher.consumersMap.size() == 1
|
||||
watcher.consumerOffsets.size() == 1
|
||||
watcher.watchedTopics.size() == 1
|
||||
}
|
||||
|
||||
def "trackConsumerOffset() should append to a list for existing topics in the map"() {
|
||||
def "trackConsumerOffset() append an offset but not a topic for different group names"() {
|
||||
given:
|
||||
String topic = 'spock-topic'
|
||||
TopicPartition mapKey = new TopicPartition(topic, 0)
|
||||
|
@ -80,8 +81,15 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
|
|||
watcher.trackConsumerOffset(secondOffset)
|
||||
|
||||
then:
|
||||
watcher.consumersMap.size() == 1
|
||||
watcher.consumersMap[mapKey].size() == 2
|
||||
watcher.watchedTopics.size() == 1
|
||||
watcher.consumerOffsets.size() == 2
|
||||
}
|
||||
|
||||
|
||||
@Ignore
|
||||
def "removeConsumer() should remove a ConsumerOffset from the map"() {
|
||||
given:
|
||||
TopicPartition tp = new TopicPartition('spock', 1)
|
||||
ConsumerOffset consumer = new ConsumerOffset()
|
||||
}
|
||||
}
|
|
@ -1,8 +1,8 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
import com.github.lookout.verspaetung.TopicPartition
|
||||
import com.github.reiseburo.verspaetung.TopicPartition
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
|
@ -0,0 +1,95 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
class BrokerManagerSpec extends Specification {
|
||||
BrokerManager manager = new BrokerManager()
|
||||
|
||||
def "new instance is offline, i.e. has empty list of brokers"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
|
||||
expect:
|
||||
manager.list().size() == 0
|
||||
}
|
||||
|
||||
def "new instance is offline after add broker"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
manager.add(broker)
|
||||
|
||||
expect:
|
||||
manager.list().size() == 0
|
||||
}
|
||||
|
||||
def "list shows brokers after getting online"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
|
||||
manager.add(broker)
|
||||
manager.online()
|
||||
manager.remove(broker2)
|
||||
|
||||
expect:
|
||||
manager.list().size() == 1
|
||||
}
|
||||
|
||||
def "can remove brokers based on its id"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||
manager.add(broker)
|
||||
manager.online()
|
||||
manager.remove(broker2)
|
||||
|
||||
expect:
|
||||
manager.list().size() == 0
|
||||
}
|
||||
|
||||
def "can update brokers"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
|
||||
manager.online()
|
||||
manager.add(broker)
|
||||
manager.update(broker2)
|
||||
|
||||
expect:
|
||||
manager.list().size() == 1
|
||||
manager.list().first().host == 'localhost'
|
||||
}
|
||||
|
||||
def "can go offline anytime"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||
manager.online()
|
||||
manager.add(broker)
|
||||
manager.add(broker2)
|
||||
manager.offline()
|
||||
|
||||
expect:
|
||||
manager.list().size() == 0
|
||||
}
|
||||
|
||||
def "can go offline and online again"() {
|
||||
given:
|
||||
KafkaBroker broker = new KafkaBroker('', 0, 1)
|
||||
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
|
||||
manager.online()
|
||||
manager.add(broker)
|
||||
manager.add(broker2)
|
||||
manager.offline()
|
||||
manager.online()
|
||||
|
||||
expect:
|
||||
manager.list().size() == 2
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
class EventDataSpec extends Specification {
|
||||
EventData data
|
||||
|
||||
def "converts TreeCacheEvents to EventData"() {
|
||||
given:
|
||||
String path = "/brokers/ids/1337"
|
||||
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData(path, null, "be happy".bytes)))
|
||||
|
||||
expect:
|
||||
data.getPathPartAsInteger(3) == 1337
|
||||
data.asString() == 'be happy'
|
||||
data.pathPartsSize() == 4
|
||||
data.getPathPartAsInteger(4) == null
|
||||
}
|
||||
|
||||
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
|
||||
given:
|
||||
String path = "/brokers/ids/1337"
|
||||
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData("/", null, "42".bytes)))
|
||||
|
||||
expect:
|
||||
data.asString() == '42'
|
||||
data.asInteger() == 42
|
||||
data.pathPartsSize() == 0
|
||||
data.getPathPartAsInteger(0) == null
|
||||
}
|
||||
|
||||
def "converts TreeCacheEvents to EventData with no payload"() {
|
||||
given:
|
||||
String path = "/brokers/ids/1337"
|
||||
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData("/123", null, null)))
|
||||
|
||||
expect:
|
||||
data.asString() == null
|
||||
data.pathPartsSize() == 2
|
||||
data.getPathPartAsInteger(1) == 123
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
|
@ -11,7 +11,7 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
|
|||
|
||||
def setup() {
|
||||
this.mockCurator = Mock(CuratorFramework)
|
||||
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
|
||||
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, new HashSet(), [:])
|
||||
}
|
||||
|
||||
def "consumerNameFromPath() should give the right name for a valid path"() {
|
|
@ -0,0 +1,52 @@
|
|||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
|
||||
import com.github.reiseburo.verspaetung.KafkaBroker
|
||||
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||
|
||||
class PojoFactorySpec extends Specification {
|
||||
PojoFactory factory = new PojoFactory(new JsonSlurper())
|
||||
|
||||
def "create KafkaBroker from TreeCacheEvent"() {
|
||||
given:
|
||||
String path = "/brokers/ids/1337"
|
||||
String payload = "{\"host\":\"localhost\",\"port\":9092}"
|
||||
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData(path, null, payload.bytes))
|
||||
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||
|
||||
expect:
|
||||
broker.brokerId == 1337
|
||||
broker.host == 'localhost'
|
||||
broker.port == 9092
|
||||
}
|
||||
|
||||
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
|
||||
given:
|
||||
String path = "/brokers/ids"
|
||||
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData(path, null, ''.bytes))
|
||||
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||
|
||||
expect:
|
||||
broker == null
|
||||
}
|
||||
|
||||
def "create KafkaBroker from TreeCacheEvent without payload"() {
|
||||
given:
|
||||
String path = "/brokers/ids/123"
|
||||
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
|
||||
new ChildData(path, null, null))
|
||||
KafkaBroker broker = factory.createKafkaBroker(event)
|
||||
|
||||
expect:
|
||||
broker.brokerId == 123
|
||||
broker.host == ''
|
||||
broker.port == 0
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package com.github.lookout.verspaetung.zk
|
||||
package com.github.reiseburo.verspaetung.zk
|
||||
|
||||
import spock.lang.*
|
||||
|
||||
|
@ -11,7 +11,7 @@ class StandardTreeWatcherSpec extends Specification {
|
|||
|
||||
def setup() {
|
||||
this.mockCurator = Mock(CuratorFramework)
|
||||
this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
|
||||
this.watcher = new StandardTreeWatcher(this.mockCurator, new HashSet(), [:])
|
||||
}
|
||||
|
||||
def "processChildData should return null if the path is invalid"() {
|
Loading…
Reference in New Issue