Compare commits

...

29 Commits

Author SHA1 Message Date
R. Tyler Croy 148359d816
Pretty much any node is going to have Java, because Jenkins, duh 2016-08-03 21:22:45 -07:00
R. Tyler Croy 0bac696847
Archive test results, etc 2015-12-26 21:51:54 -08:00
R. Tyler Croy dc1cfab361 Merge pull request #38 from reiseburo/ci
Add a Jenkinsfile for building
2015-12-26 21:46:07 -08:00
R. Tyler Croy 324b3faba7
Add a Jenkinsfile for building 2015-12-26 21:32:16 -08:00
R. Tyler Croy f901cba7e1
Bump version to release some of mkristian's refactorings 2015-12-26 15:19:42 -08:00
R. Tyler Croy 48a258c3bc Merge pull request #37 from mkristian/broker-watcher-refactor
Broker watcher refactor
2015-12-26 15:13:56 -08:00
Christian Meier 7a6ef71eef refactor the BrokerTreeWatcher
split things into smaller pieces as preparation for having only
ONE TreeWatcher running which itself dispatches the events to the interested
parties.

Sponsored by Lookout Inc.
2015-11-10 22:42:07 +01:00
Christian Meier 618aaf915e make the connection or disconnection of consumers failsafe
i.e. ignore errors on disconnect and do not register consumers which
fail to connect
2015-11-10 22:15:57 +01:00
Christian Meier 66c0c332bb make it a proper POJO and remove JSON magic to allow easier testing 2015-11-10 22:15:57 +01:00
Christian Meier 4c655995f9 nicer logging 2015-11-10 22:15:57 +01:00
Christian Meier dc918a700e not needed 2015-11-10 22:15:57 +01:00
R. Tyler Croy 416168cbff
Correct link to download button 2015-11-07 13:18:31 -08:00
R. Tyler Croy 68ce9cd72e
Add a docker-compose.yml for locally testing verspaetung 2015-11-07 13:11:38 -08:00
R. Tyler Croy 9af40ccdec Merge pull request #36 from reiseburo/reiseburo-rename
Bump the minor version for release, updating the group too
2015-11-07 12:55:36 -08:00
R. Tyler Croy 1525ae30ac
Add a new bintray API key for publishing releases 2015-11-07 12:46:19 -08:00
R. Tyler Croy fb4f152fed
Bump the minor version for release, updating the group too
Renaming the entire source tree now that this is under reiseburo
2015-11-07 12:39:25 -08:00
R. Tyler Croy f15aeaee0c Merge pull request #35 from mkristian/be-nice
Be nice
2015-11-03 12:07:48 -08:00
Christian Meier 98af950498 be nice and close all resources on shutdown 2015-11-03 20:40:49 +01:00
Christian Meier 439e8f635e declare private methods as such
helps to understand what methods could be used by other threads
2015-11-03 20:40:49 +01:00
Christian Meier 45d98cb5f6 allow new nodes to be added when they pop up
also ensure the broker list is synchronized in both the KafkaPoller
as well the BrokerTreeWatcher threads
2015-11-03 20:40:48 +01:00
Christian Meier 59c8e79f4e reduce polling speed in case kafka is responding with exceptions
just double the interval on each successive error and reset it to 1 sec
once a request succeeded again. maximum polling interval is about half
an hour.
2015-11-03 20:40:41 +01:00
R. Tyler Croy 07bf319972
Add new usage output to readme 2015-09-01 17:43:35 -07:00
R. Tyler Croy 3778a24044
Add the --exclude CLI option for filtering out unwanted consumer groups
Fixes #34
2015-09-01 17:10:43 -07:00
R. Tyler Croy 921ef7be64
Only build on JDK8, targeting JDK7 compatibility though
Stupid errors in Travis make the build feedback not very useful
2015-09-01 16:39:44 -07:00
R. Tyler Croy 7b918d448a
Don't use groovy strings where we don't need to 2015-09-01 16:29:34 -07:00
R. Tyler Croy 4f416e84dd
Configure the intellij integration with sources 2015-09-01 16:06:49 -07:00
R. Tyler Croy e18ac81468
Introduce codenarc and make the project adhere to some coding standards 2015-09-01 15:50:31 -07:00
R. Tyler Croy fc97f67100
Add support for a customized delay in seconds between reports (defaulting to 5s) 2015-09-01 14:41:16 -07:00
R. Tyler Croy 0168f5c805
Disable generation of tar and zip archives
Fixes #28
2015-09-01 14:21:38 -07:00
42 changed files with 1343 additions and 312 deletions

View File

@ -1,7 +1,5 @@
language: java
# Only the oraclejdk7 build on a tag will actually release
jdk:
- oraclejdk7
- oraclejdk8
# Using 'install' to run the clean to avoid Travis automatically calling
@ -16,7 +14,6 @@ script:
env:
global:
secure: MzcJafov6+fztyym0hZFTxjirTAgVFqFRO4pSSoDUZV71jHBYRKLmQxiaYpqdl9d7Q7Jz7UfNZRSisNwZQdeZjs0B9yJwy9m1mDlJaUXIWN/xzW04qPnZ5zxh1yJHK+UHIw5G2qRZSE42m9G3TSRBlUz6OMk+tr2UYErfnKzcsc=
- secure: X71NKVXJjyG1C6/fZUTxdQ6HwAcaNRWSc0m/VRXsaiZwgXjIr+C+Uz9X4iu6Q52YAc+7CrvftAbIxgQhf+TBtvvnGeZgWXuYqf6Cx1weyL/6xsttOLsEGdtHU9jvrtw4tHRXxu/6F+8QAot8VRwUsCB4IL5Y3epshddbk+/1d9Q=
after_success:
"./gradlew bintrayUpload -PbintrayUser=lookouteng -PbintrayKey=${BINTRAY_KEY}"
after_success: "./gradlew bintrayUpload -PbintrayUser=rtyler -PbintrayKey=${BINTRAY_KEY}"

View File

@ -20,4 +20,4 @@ NOTE: This is mostly meant for the developer team.
Currently releases can be produced by simply pushing a Git tag to this GitHub
repository. This will cause Travis CI to build and test the tag, which if it is
successful, will automatically publish to
link:https://bintray.com/lookout/systems/verspaetung[Bintray].
link:https://bintray.com/reiseburo/apps/verspaetung[Bintray].

39
Jenkinsfile vendored Normal file
View File

@ -0,0 +1,39 @@
/*
* This is a multibranch workflow file for defining how this project should be
* built, tested and deployed
*/
node {
stage 'Clean workspace'
/* Running on a fresh Docker instance makes this redundant, but just in
* case the host isn't configured to give us a new Docker image for every
* build, make sure we clean things before we do anything
*/
deleteDir()
stage 'Checkout source'
/*
* Represents the SCM configuration in a "Workflow from SCM" project build. Use checkout
* scm to check out sources matching Jenkinsfile with the SCM details from
* the build that is executing this Jenkinsfile.
*
* when not in multibranch: https://issues.jenkins-ci.org/browse/JENKINS-31386
*/
checkout scm
stage 'Build and test'
/* if we can't build and test everything we need in less than 30 minutes
* we might as well just give up
*/
timeout(30) {
sh './gradlew -iS'
}
stage 'Capture test results and artifacts'
step([$class: 'JUnitResultArchiver', testResults: 'build/test-results/**/*.xml'])
step([$class: 'ArtifactArchiver', artifacts: 'build/libs/*.jar', fingerprint: true])
}
// vim: ft=groovy

View File

@ -1,4 +1,4 @@
*Copyright (c) 2015 Lookout, Inc*
*Copyright (c) 2015 Lookout, Inc, R. Tyler Croy*
MIT License

View File

@ -1,6 +1,6 @@
image:https://travis-ci.org/lookout/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/lookout/verspaetung"]
image:https://travis-ci.org/reiseburo/verspaetung.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/verspaetung"]
image::https://api.bintray.com/packages/lookout/systems/verspaetung/images/download.svg[link="https://bintray.com/lookout/systems/verspaetung/_latestVersion"]
image::https://api.bintray.com/packages/reiseburo/apps/verspaetung/images/download.svg[link="https://bintray.com/reiseburo/apps/verspaetung/_latestVersion"]
Verspätung is a small utility which aims to help identify delay of link:http://kafka.apache.org[Kafka] consumers.
@ -15,13 +15,20 @@ reports it to statsd.
% java -jar verspaetung-*-all.jar --help
usage: verspaetung
-d,--delay <DELAY> Seconds to delay between reporting metrics to
the metrics receiver (defaults: 5s)
-H,--statsd-host <STATSD> Hostname for a statsd instance (defaults to
localhost)
-n,--dry-run Disable reporting to a statsd host
-p,--statsd-port <PORT> Port for the statsd instance (defaults to
8125)
--prefix <PREFIX> Prefix all metrics with PREFIX before they're
reported (e.g. PREFIX.verspaetung.mytopic)
-s,--storm Watch Storm KafkaSpout offsets (under
/kafka_spout)
-x,--exclude <EXCLUDES> Regular expression for consumer groups to
exclude from reporting (can be declared
multiple times)
-z,--zookeeper <HOSTS> Comma separated list of Zookeeper hosts (e.g.
localhost:2181)
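
For reference, a minimal Groovy sketch of how the new --exclude patterns are meant to behave, mirroring the shouldExcludeConsumer helper added to Main.groovy further down in this diff (the consumer group names below are made up for illustration):

// Each --exclude value is a regular expression matched against the
// consumer group name; any match excludes that group from reporting.
String[] excludes = ['.*-test', 'storm-.*']   // hypothetical CLI values

boolean excluded(String groupName, String[] excludeGroups) {
    return null != excludeGroups?.find { String rule -> groupName.matches(rule) }
}

assert excluded('offtopic-spock-test', excludes)    // caught by '.*-test'
assert !excluded('payments-consumer', excludes)     // matches no pattern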

View File

@ -3,16 +3,35 @@ plugins {
id 'com.github.johnrengelman.shadow' version '1.2.0'
id "org.ajoberstar.github-pages" version "1.2.0"
id "org.asciidoctor.gradle.asciidoctor" version "1.5.1"
id 'codenarc'
id 'groovy'
id 'idea'
id 'application'
}
group = "com.github.lookout"
group = "com.github.reiseburo"
description = "A utility for monitoring the delay of Kafka consumers"
version = '0.3.0'
mainClassName = 'com.github.lookout.verspaetung.Main'
version = '0.6.0'
mainClassName = 'com.github.reiseburo.verspaetung.Main'
defaultTasks 'check', 'assemble'
/* Ensure we properly build for JDK7 still */
plugins.withType(JavaPlugin) {
sourceCompatibility = 1.7
targetCompatibility = 1.7
project.tasks.withType(JavaCompile) { task ->
task.sourceCompatibility = project.sourceCompatibility
task.targetCompatibility = project.targetCompatibility
}
project.tasks.withType(GroovyCompile) { task ->
task.sourceCompatibility = project.sourceCompatibility
task.targetCompatibility = project.targetCompatibility
}
}
////////////////////////////////////////////////////////////////////////////////
// TESTING
@ -63,9 +82,18 @@ dependencies {
/* Logback is to be used for logging through the app */
compile 'ch.qos.logback:logback-classic:1.1.2+'
codenarc "org.codenarc:CodeNarc:0.24"
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
testCompile 'cglib:cglib-nodep:2.2.+'
}
idea {
module {
downloadJavadoc = true
downloadSources = true
}
}
////////////////////////////////////////////////////////////////////////////////
@ -92,6 +120,14 @@ githubPages {
////////////////////////////////////////////////////////////////////////////////
codenarc {
configFile file("${projectDir}/gradle/codenarc.rules")
sourceSets = [sourceSets.main]
}
codenarcMain {
exclude '**/Main.groovy'
}
shadowJar {
exclude 'META-INF/*.RSA', 'META-INF/*.DSA'
manifest {
@ -106,6 +142,22 @@ artifacts {
archives shadowJar
}
sourceSets {
main {
groovy {
exclude '**/*.sw*'
}
}
}
/*
* https://github.com/reiseburo/verspaetung/issues/28
*
* disable the distZip and distTar tasks since we only need the shadow jar
*/
distZip.enabled = false
distTar.enabled = false
/* We're not building a library jar so we'll disable this default jar task */
jar.enabled = false
/* Remove the "library" jar from the archives configuration so it's not
@ -120,15 +172,15 @@ bintray {
publish = true
/*
* Only only publish when we're tagging a release and if we've executed on
* the JDK7 build. This is to prevent multiple attempts by the build matrix
* a JDK8 build. This is to prevent multiple attempts by the build matrix
* to publish the artifacts
*/
dryRun = !((System.env.TRAVIS_TAG as boolean) && (System.env.TRAVIS_JDK_VERSION == 'oraclejdk7'))
dryRun = !((System.env.TRAVIS_TAG as boolean) && (System.env.TRAVIS_JDK_VERSION == 'oraclejdk8'))
configurations = ['archives']
pkg {
userOrg = 'lookout'
repo = 'systems'
userOrg = 'reiseburo'
repo = 'apps'
name = 'verspaetung'
labels = []

16
docker-compose.yml Normal file
View File

@ -0,0 +1,16 @@
zookeeper:
image: wurstmeister/zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
ports:
- 9092:9092
links:
- zookeeper:zk
environment:
# Only using one node, so we're fine with localhost
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
volumes:
- /var/run/docker.sock:/var/run/docker.sock

View File

@ -1,4 +1,5 @@
org.gradle.daemon=true
# Disabling daemon mode since it makes PermGen leaks with codenarc more common
org.gradle.daemon=false
bintrayUser=
bintrayKey=
sourceCompatibility=1.7

379
gradle/codenarc.rules Normal file
View File

@ -0,0 +1,379 @@
ruleset {
description '''
A Sample Groovy RuleSet containing all CodeNarc Rules
You can use this as a template for your own custom RuleSet.
Just delete the rules that you don't want to include.
'''
AbcComplexity // DEPRECATED: Use the AbcMetric rule instead. Requires the GMetrics jar
AbcMetric // Requires the GMetrics jar
AbstractClassName
AbstractClassWithPublicConstructor
AbstractClassWithoutAbstractMethod
AddEmptyString
AssertWithinFinallyBlock
AssignCollectionSort
AssignCollectionUnique
AssignmentInConditional
AssignmentToStaticFieldFromInstanceMethod
BigDecimalInstantiation
BitwiseOperatorInConditional
BlankLineBeforePackage
BooleanGetBoolean
BooleanMethodReturnsNull
BracesForClass
BracesForForLoop
BracesForIfElse
BracesForMethod
BracesForTryCatchFinally
BrokenNullCheck
BrokenOddnessCheck
BuilderMethodWithSideEffects
BusyWait
CatchArrayIndexOutOfBoundsException
CatchError
CatchException
CatchIllegalMonitorStateException
CatchIndexOutOfBoundsException
CatchNullPointerException
CatchRuntimeException
CatchThrowable
ChainedTest
ClassForName
ClassJavadoc
ClassName
ClassNameSameAsFilename
ClassNameSameAsSuperclass
ClassSize
CloneWithoutCloneable
CloneableWithoutClone
CloseWithoutCloseable
ClosureAsLastMethodParameter
ClosureStatementOnOpeningLineOfMultipleLineClosure
CollectAllIsDeprecated
CompareToWithoutComparable
ComparisonOfTwoConstants
ComparisonWithSelf
ConfusingClassNamedException
ConfusingMethodName
ConfusingMultipleReturns
ConfusingTernary
ConsecutiveBlankLines
ConsecutiveLiteralAppends
ConsecutiveStringConcatenation
ConstantAssertExpression
ConstantIfExpression
ConstantTernaryExpression
ConstantsOnlyInterface
CouldBeElvis
CoupledTestCase
CrapMetric // Requires the GMetrics jar and a Cobertura coverage file
CyclomaticComplexity // Requires the GMetrics jar
DeadCode
DirectConnectionManagement
DoubleCheckedLocking
DoubleNegative
DuplicateCaseStatement
DuplicateImport
DuplicateListLiteral
DuplicateMapKey
DuplicateMapLiteral
/*
This is a pointless check
DuplicateNumberLiteral
*/
DuplicateSetValue
DuplicateStringLiteral
ElseBlockBraces
EmptyCatchBlock
EmptyClass
EmptyElseBlock
EmptyFinallyBlock
EmptyForStatement
EmptyIfStatement
EmptyInstanceInitializer
EmptyMethod
EmptyMethodInAbstractClass
EmptyStaticInitializer
EmptySwitchStatement
EmptySynchronizedStatement
EmptyTryBlock
EmptyWhileStatement
EnumCustomSerializationIgnored
EqualsAndHashCode
EqualsOverloaded
ExceptionExtendsError
ExceptionExtendsThrowable
ExceptionNotThrown
ExplicitArrayListInstantiation
ExplicitCallToAndMethod
ExplicitCallToCompareToMethod
ExplicitCallToDivMethod
ExplicitCallToEqualsMethod
ExplicitCallToGetAtMethod
ExplicitCallToLeftShiftMethod
ExplicitCallToMinusMethod
ExplicitCallToModMethod
ExplicitCallToMultiplyMethod
ExplicitCallToOrMethod
ExplicitCallToPlusMethod
ExplicitCallToPowerMethod
ExplicitCallToRightShiftMethod
ExplicitCallToXorMethod
ExplicitGarbageCollection
ExplicitHashMapInstantiation
ExplicitHashSetInstantiation
ExplicitLinkedHashMapInstantiation
ExplicitLinkedListInstantiation
ExplicitStackInstantiation
ExplicitTreeSetInstantiation
FactoryMethodName
FieldName
FileCreateTempFile
FileEndsWithoutNewline
FinalClassWithProtectedMember
ForLoopShouldBeWhileLoop
ForStatementBraces
GStringAsMapKey
GStringExpressionWithinString
GetterMethodCouldBeProperty
GrailsDomainHasEquals
GrailsDomainHasToString
GrailsDomainReservedSqlKeywordName
GrailsDomainWithServiceReference
GrailsDuplicateConstraint
GrailsDuplicateMapping
GrailsMassAssignment
GrailsPublicControllerMethod
GrailsServletContextReference
GrailsSessionReference // DEPRECATED
GrailsStatelessService
GroovyLangImmutable
HardCodedWindowsFileSeparator
HardCodedWindowsRootDirectory
HashtableIsObsolete
IfStatementBraces
/*
"Uh yes, we'd like you to make your code more unreadable"
IfStatementCouldBeTernary
*/
IllegalClassMember
IllegalClassReference
IllegalPackageReference
IllegalRegex
IllegalString
IllegalSubclass
ImplementationAsType
ImportFromSamePackage
ImportFromSunPackages
InconsistentPropertyLocking
InconsistentPropertySynchronization
InsecureRandom
/*
instanceof is useful in Groovy, not sure why this check is here
Instanceof
*/
IntegerGetInteger
InterfaceName
InterfaceNameSameAsSuperInterface
InvertedIfElse
JUnitAssertAlwaysFails
JUnitAssertAlwaysSucceeds
JUnitAssertEqualsConstantActualValue
JUnitFailWithoutMessage
JUnitLostTest
JUnitPublicField
JUnitPublicNonTestMethod
JUnitPublicProperty
JUnitSetUpCallsSuper
JUnitStyleAssertions
JUnitTearDownCallsSuper
JUnitTestMethodWithoutAssert
JUnitUnnecessarySetUp
JUnitUnnecessaryTearDown
JUnitUnnecessaryThrowsException
JavaIoPackageAccess
JdbcConnectionReference
JdbcResultSetReference
JdbcStatementReference
LineLength
LocaleSetDefault
LoggerForDifferentClass
LoggerWithWrongModifiers
LoggingSwallowsStacktrace
LongLiteralWithLowerCaseL
MethodCount
MethodName
MethodSize
MisorderedStaticImports
MissingBlankLineAfterImports
MissingBlankLineAfterPackage
MissingNewInThrowStatement
MultipleLoggers
MultipleUnaryOperators
NestedBlockDepth
NestedForLoop
NestedSynchronization
NoDef
NoWildcardImports
NonFinalPublicField
NonFinalSubclassOfSensitiveInterface
ObjectFinalize
ObjectOverrideMisspelledMethodName
PackageName
PackageNameMatchesFilePath
ParameterCount
ParameterName
ParameterReassignment
PrintStackTrace
Println
PrivateFieldCouldBeFinal
PropertyName
PublicFinalizeMethod
PublicInstanceField
RandomDoubleCoercedToZero
RemoveAllOnSelf
RequiredRegex
RequiredString
ReturnFromFinallyBlock
ReturnNullFromCatchBlock
ReturnsNullInsteadOfEmptyArray
ReturnsNullInsteadOfEmptyCollection
SerialPersistentFields
SerialVersionUID
/* Not needed
SerializableClassMustDefineSerialVersionUID
*/
SimpleDateFormatMissingLocale
SpaceAfterCatch
SpaceAfterClosingBrace
SpaceAfterComma
SpaceAfterFor
SpaceAfterIf
SpaceAfterOpeningBrace
SpaceAfterSemicolon
SpaceAfterSwitch
SpaceAfterWhile
SpaceAroundClosureArrow
/*
I prefer a little extra space
SpaceAroundMapEntryColon
*/
SpaceAroundOperator
SpaceBeforeClosingBrace
SpaceBeforeOpeningBrace
SpockIgnoreRestUsed
StatelessClass
StatelessSingleton
StaticCalendarField
StaticConnection
StaticDateFormatField
StaticMatcherField
StaticSimpleDateFormatField
SwallowThreadDeath
SynchronizedMethod
SynchronizedOnBoxedPrimitive
SynchronizedOnGetClass
SynchronizedOnReentrantLock
SynchronizedOnString
SynchronizedOnThis
SynchronizedReadObjectMethod
SystemErrPrint
SystemExit
SystemOutPrint
SystemRunFinalizersOnExit
TernaryCouldBeElvis
ThisReferenceEscapesConstructor
ThreadGroup
ThreadLocalNotStaticFinal
ThreadYield
ThrowError
ThrowException
ThrowExceptionFromFinallyBlock
ThrowNullPointerException
ThrowRuntimeException
ThrowThrowable
ToStringReturnsNull
TrailingWhitespace
UnnecessaryBigDecimalInstantiation
UnnecessaryBigIntegerInstantiation
UnnecessaryBooleanExpression
UnnecessaryBooleanInstantiation
UnnecessaryCallForLastElement
UnnecessaryCallToSubstring
UnnecessaryCast
UnnecessaryCatchBlock
UnnecessaryCollectCall
UnnecessaryCollectionCall
UnnecessaryConstructor
UnnecessaryDefInFieldDeclaration
UnnecessaryDefInMethodDeclaration
UnnecessaryDefInVariableDeclaration
UnnecessaryDotClass
UnnecessaryDoubleInstantiation
UnnecessaryElseStatement
UnnecessaryFail
UnnecessaryFinalOnPrivateMethod
UnnecessaryFloatInstantiation
UnnecessaryGString
UnnecessaryGetter
UnnecessaryGroovyImport
UnnecessaryIfStatement
UnnecessaryInstanceOfCheck
UnnecessaryInstantiationToGetClass
UnnecessaryIntegerInstantiation
UnnecessaryLongInstantiation
UnnecessaryModOne
UnnecessaryNullCheck
UnnecessaryNullCheckBeforeInstanceOf
UnnecessaryObjectReferences
UnnecessaryOverridingMethod
UnnecessaryPackageReference
UnnecessaryParenthesesForMethodCallWithClosure
UnnecessaryPublicModifier
/*
I quite like explicit return statements
UnnecessaryReturnKeyword
*/
UnnecessarySafeNavigationOperator
UnnecessarySelfAssignment
UnnecessarySemicolon
UnnecessaryStringInstantiation
UnnecessarySubstring
UnnecessaryTernaryExpression
UnnecessaryToString
UnnecessaryTransientModifier
UnsafeArrayDeclaration
UnsafeImplementationAsMap
UnusedArray
UnusedImport
/*
Some interfaces require no use for the method parameter
UnusedMethodParameter
*/
UnusedObject
UnusedPrivateField
UnusedPrivateMethod
UnusedPrivateMethodParameter
UnusedVariable
UseAssertEqualsInsteadOfAssertTrue
UseAssertFalseInsteadOfNegation
UseAssertNullInsteadOfAssertEquals
UseAssertSameInsteadOfAssertTrue
UseAssertTrueInsteadOfAssertEquals
UseAssertTrueInsteadOfNegation
UseCollectMany
UseCollectNested
UseOfNotifyMethod
VariableName
VectorIsObsolete
VolatileArrayField
VolatileLongOrDoubleField
WaitOutsideOfWhileLoop
WhileStatementBraces
}
doNotApplyToClassNames = 'Main'
// vim: ft=groovy

View File

@ -1,21 +0,0 @@
package com.github.lookout.verspaetung
/**
* POJO containing the necessary information to model a Kafka broker
*/
class KafkaBroker {
private String host
private Integer port
private Integer brokerId
KafkaBroker(Object jsonObject, Integer brokerId) {
this.host = jsonObject.host
this.port = jsonObject.port
this.brokerId = brokerId
}
@Override
String toString() {
return "broker<${this.brokerId}>@${this.host}:${this.port}"
}
}

View File

@ -1,95 +0,0 @@
package com.github.lookout.verspaetung.zk
import com.github.lookout.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import java.util.Collections
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCache
import org.apache.curator.framework.recipes.cache.TreeCacheListener
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* The BrokerTreeWatcher is a kind of watcher whose sole purpose is
* to watch the segment of the Zookeeper tree where Kafka stores broker
* information
*/
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
static final Integer INVALID_BROKER_ID = -1
private static final String BROKERS_PATH = '/brokers/ids'
private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class)
private JsonSlurper json
private List<Closure> onBrokerUpdates
private Boolean isTreeInitialized = false
private List<KafkaBroker> brokers
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.json = new JsonSlurper()
this.brokers = []
this.onBrokerUpdates = []
}
String zookeeperPath() {
return BROKERS_PATH
}
/**
* Process events like NODE_ADDED and NODE_REMOVED to keep an up to date
* list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
/* If we're initialized that means we should have all our brokers in
* our internal list already and we can fire an event
*/
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
this.isTreeInitialized = true
List threadsafeBrokers = Collections.synchronizedList(this.brokers)
this.onBrokerUpdates.each { Closure c ->
c?.call(threadsafeBrokers)
}
return
}
if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
return
}
ChildData nodeData = event.data
Integer brokerId = brokerIdFromPath(nodeData.path)
if (brokerId == INVALID_BROKER_ID) {
return
}
Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))
this.brokers << new KafkaBroker(brokerData, brokerId)
}
/**
* Process a path string from Zookeeper for the Kafka broker's ID
*
* We're expecting paths like: /brokers/ids/1231524312
*/
Integer brokerIdFromPath(String path) {
List<String> pathParts = path?.split('/') as List
if ((pathParts == null) ||
(pathParts.size() != 4)) {
return INVALID_BROKER_ID
}
return new Integer(pathParts[-1])
}
}

View File

@ -0,0 +1,40 @@
package com.github.reiseburo.verspaetung
/**
* abstract the logic on how to reduce the polling speed and get back
* to full speed.
*/
class Delay {
static final Integer POLLER_DELAY_MIN = (1 * 1000)
static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour
private Integer delay = POLLER_DELAY_MIN
boolean reset() {
if (delay != POLLER_DELAY_MIN) {
delay = POLLER_DELAY_MIN
true
}
else {
false
}
}
boolean slower() {
if (delay < POLLER_DELAY_MAX) {
delay += delay
true
}
else {
false
}
}
Integer value() {
delay
}
String toString() {
"Delay[ ${delay / 1000} sec ]"
}
}
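
A minimal sketch (for illustration only, assuming the Delay class above is on the classpath) of the backoff behaviour it encodes and how the poller changes further down in this diff drive it: double the wait on every error, snap back to the one second minimum once a poll succeeds.

Delay delay = new Delay()

// error path: each failed poll doubles the wait, 1s -> 2s -> 4s -> 8s
3.times { delay.slower() }
assert delay.value() == 8 * 1000

// success path: the first good poll resets the wait to the minimum
delay.reset()
assert delay.value() == Delay.POLLER_DELAY_MIN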

View File

@ -0,0 +1,39 @@
package com.github.reiseburo.verspaetung
/**
* POJO containing the necessary information to model a Kafka broker
*/
class KafkaBroker {
final private String host
final private Integer port
final private Integer brokerId
KafkaBroker(String host, Integer port, Integer brokerId) {
this.host = host
this.port = port
this.brokerId = brokerId
}
@Override
int hashCode() {
return this.brokerId
}
@Override
boolean equals(Object compared) {
if (this.is(compared)) {
return true
}
if (!(compared instanceof KafkaBroker)) {
return false
}
return compared.brokerId == brokerId
}
@Override
String toString() {
return "broker<${this.brokerId}>@${this.host}:${this.port}"
}
}
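
Note that equals() and hashCode() above compare brokers by brokerId alone, which is what lets the BrokerManager added later in this diff replace or remove a broker whose host or port changed. A small illustrative sketch (the values are hypothetical):

KafkaBroker original = new KafkaBroker('', 0, 1)
KafkaBroker updated  = new KafkaBroker('localhost', 9092, 1)

// same brokerId, so the two instances are considered the same broker
assert original == updated
assert original.hashCode() == updated.hashCode()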

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
/**
* POJO containing the necessary information to model a Kafka consumer
@ -39,13 +39,9 @@ class KafkaConsumer {
return false
}
if ((this.topic == compared.topic) &&
return (this.topic == compared.topic) &&
(this.partition == compared.partition) &&
(this.name == compared.name)) {
return true
}
return false
(this.name == compared.name)
}
}

View File

@ -1,8 +1,5 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
import groovy.transform.TypeChecked
import java.util.concurrent.ConcurrentHashMap
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@ -11,24 +8,28 @@ import kafka.client.ClientUtils
import kafka.consumer.SimpleConsumer
import kafka.common.TopicAndPartition
import kafka.common.KafkaException
import kafka.javaapi.*
/* UGH */
import scala.collection.JavaConversions
/**
* KafkaPoller is a relatively simple class which contains a runloop for periodically
* contacting the Kafka brokers defined in Zookeeper and fetching the latest topic
* meta-data for them
*/
class KafkaPoller extends Thread {
private final Integer POLLER_DELAY = (1 * 1000)
private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
private final Integer KAFKA_TIMEOUT = (5 * 1000)
private final Integer KAFKA_BUFFER = (100 * 1024)
private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class)
private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
private static final Integer KAFKA_TIMEOUT = (5 * 1000)
private static final Integer KAFKA_BUFFER = (100 * 1024)
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPoller)
private Boolean keepRunning = true
private Boolean shouldReconnect = false
private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap
private AbstractMap<TopicPartition, Long> topicOffsetMap
private List<Broker> brokers
private List<Closure> onDelta
private AbstractSet<String> currentTopics
private final AbstractMap<Integer, SimpleConsumer> brokerConsumerMap
private final AbstractMap<TopicPartition, Long> topicOffsetMap
private final List<Closure> onDelta
private final AbstractSet<String> currentTopics
private final List<Broker> brokers
KafkaPoller(AbstractMap map, AbstractSet topicSet) {
this.topicOffsetMap = map
@ -36,13 +37,22 @@ class KafkaPoller extends Thread {
this.brokerConsumerMap = [:]
this.brokers = []
this.onDelta = []
setName('Verspaetung Kafka Poller')
}
/* There are a number of cases where we intentionally swallow stacktraces
* to ensure that we're not unnecessarily spamming logs with useless stacktraces
*
* For CatchException, we're explicitly gobbling up all potentially crashing exceptions
* here
*/
@SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
void run() {
logger.info("Starting wait loop")
LOGGER.info('Starting wait loop')
Delay delay = new Delay()
LOGGER.error('polling ' + delay)
while (keepRunning) {
logger.debug("poll loop")
LOGGER.debug('poll loop')
if (shouldReconnect) {
reconnect()
@ -54,34 +64,48 @@ class KafkaPoller extends Thread {
if (this.currentTopics.size() > 0) {
try {
dumpMetadata()
if (delay.reset()) {
LOGGER.error('back to normal ' + delay)
}
}
catch (KafkaException kex) {
logger.error("Failed to interact with Kafka: ${kex.message}")
LOGGER.error('Failed to interact with Kafka: {}', kex.message)
slower(delay)
}
catch (Exception ex) {
logger.error("Failed to fetch and dump Kafka metadata", ex)
LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
slower(delay)
}
}
Thread.sleep(POLLER_DELAY)
Thread.sleep(delay.value())
}
disconnectConsumers()
}
private void slower(Delay delay) {
if (delay.slower()) {
LOGGER.error('using ' + delay)
}
}
void dumpMetadata() {
logger.debug("dumping meta-data")
@SuppressWarnings('CatchException')
private void dumpMetadata() {
LOGGER.debug('dumping meta-data')
def metadata = fetchMetadataForCurrentTopics()
Object metadata = fetchMetadataForCurrentTopics()
withTopicsAndPartitions(metadata) { tp, p ->
try {
captureLatestOffsetFor(tp, p)
}
catch (Exception ex) {
logger.error("Failed to fetch latest for ${tp.topic}:${tp.partition}", ex)
LOGGER.error('Failed to fetch latest for {}:{}', tp.topic, tp.partition, ex)
}
}
logger.debug("finished dumping meta-data")
LOGGER.debug('finished dumping meta-data')
}
/**
@ -91,7 +115,7 @@ class KafkaPoller extends Thread {
* The 'metadata' is the expected return from
* kafka.client.ClientUtils.fetchTopicMetadata
*/
void withTopicsAndPartitions(Object metadata, Closure closure) {
private void withTopicsAndPartitions(Object metadata, Closure closure) {
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
withScalaCollection(f.partitionsMetadata).each { p ->
TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
@ -100,11 +124,10 @@ class KafkaPoller extends Thread {
}
}
/**
* Fetch the leader metadata and update our data structures
*/
void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
Integer leaderId = partitionMetadata.leader.get()?.id
Integer partitionId = partitionMetadata.partitionId
@ -113,14 +136,14 @@ class KafkaPoller extends Thread {
this.topicOffsetMap[tp] = offset
}
Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
/* If we don't have a proper SimpleConsumer instance (e.g. null) then
* we might not have gotten valid data back from Zookeeper
*/
if (!(consumer instanceof SimpleConsumer)) {
logger.warn("Attempted to the leaderId: ${leaderId} (${topic}/${partition}")
LOGGER.warn('Attempted to the leaderId: {} ({}/{})', leaderId, topic, partition)
return 0
}
TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition)
@ -128,24 +151,32 @@ class KafkaPoller extends Thread {
return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
}
Iterable withScalaCollection(scala.collection.Iterable iter) {
private Iterable withScalaCollection(scala.collection.Iterable iter) {
return JavaConversions.asJavaIterable(iter)
}
/**
* Blocking reconnect to the Kafka brokers
*/
void reconnect() {
@SuppressWarnings('CatchException')
private void reconnect() {
disconnectConsumers()
logger.info("Creating SimpleConsumer connections for brokers")
this.brokers.each { Broker b ->
SimpleConsumer consumer = new SimpleConsumer(b.host,
b.port,
KAFKA_TIMEOUT,
KAFKA_BUFFER,
KAFKA_CLIENT_ID)
consumer.connect()
this.brokerConsumerMap[b.id] = consumer
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
synchronized(this.brokers) {
this.brokers.each { Broker broker ->
SimpleConsumer consumer = new SimpleConsumer(broker.host,
broker.port,
KAFKA_TIMEOUT,
KAFKA_BUFFER,
KAFKA_CLIENT_ID)
try {
consumer.connect()
this.brokerConsumerMap[broker.id] = consumer
}
catch (Exception e) {
LOGGER.info('Error connecting consumer to {}', broker, e)
}
}
}
this.shouldReconnect = false
}
@ -155,13 +186,18 @@ class KafkaPoller extends Thread {
*/
void die() {
this.keepRunning = false
disconnectConsumers()
}
@SuppressWarnings('CatchException')
private void disconnectConsumers() {
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
logger.info("Disconnecting ${client}")
client?.disconnect()
LOGGER.info('Disconnecting {}', client)
try {
client?.disconnect()
}
catch (Exception e) {
LOGGER.info('Error disconnecting {}', client, e)
}
}
}
@ -169,8 +205,11 @@ class KafkaPoller extends Thread {
* Store a new list of KafkaBroker objects and signal a reconnection
*/
void refresh(List<KafkaBroker> brokers) {
this.brokers = brokers.collect { KafkaBroker b ->
new Broker(b.brokerId, b.host, b.port)
synchronized(this.brokers) {
this.brokers.clear()
this.brokers.addAll(brokers.collect { KafkaBroker b ->
new Broker(b.brokerId, b.host, b.port)
})
}
this.shouldReconnect = true
}
@ -180,7 +219,9 @@ class KafkaPoller extends Thread {
* scala underpinnings
*/
private scala.collection.immutable.Seq getBrokersSeq() {
return JavaConversions.asScalaBuffer(this.brokers).toList()
synchronized(this.brokers) {
return JavaConversions.asScalaBuffer(this.brokers).toList()
}
}
/**
@ -190,7 +231,6 @@ class KafkaPoller extends Thread {
return JavaConversions.asScalaSet(set)
}
private Object fetchMetadataForCurrentTopics() {
return ClientUtils.fetchTopicMetadata(
toScalaSet(currentTopics),
@ -199,6 +239,4 @@ class KafkaPoller extends Thread {
KAFKA_TIMEOUT,
0)
}
}

View File

@ -1,21 +1,19 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
import com.github.lookout.verspaetung.metrics.ConsumerGauge
import com.github.reiseburo.verspaetung.zk.BrokerTreeWatcher
import com.github.reiseburo.verspaetung.zk.KafkaSpoutTreeWatcher
import com.github.reiseburo.verspaetung.zk.StandardTreeWatcher
import com.github.reiseburo.verspaetung.metrics.ConsumerGauge
import com.github.reiseburo.verspaetung.metrics.HeartbeatGauge
import java.util.AbstractMap
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentSkipListSet
import java.util.concurrent.TimeUnit
import groovy.transform.TypeChecked
import org.apache.commons.cli.*
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.TreeCache
import org.coursera.metrics.datadog.DatadogReporter
import org.coursera.metrics.datadog.transport.UdpTransport
import org.slf4j.Logger
@ -23,19 +21,24 @@ import org.slf4j.LoggerFactory
import com.codahale.metrics.*
/**
* Main entry point for running the verspaetung application
*/
@SuppressWarnings
class Main {
private static final String METRICS_PREFIX = 'verspaetung'
private static Logger logger
private static ScheduledReporter reporter
private static final Logger logger = LoggerFactory.getLogger(Main)
private static final MetricRegistry registry = new MetricRegistry()
private static ScheduledReporter reporter
static void main(String[] args) {
String statsdPrefix = METRICS_PREFIX
String zookeeperHosts = 'localhost:2181'
String statsdHost = 'localhost'
Integer statsdPort = 8125
Integer delayInSeconds = 5
String[] excludeGroups = []
CommandLine cli = parseCommandLine(args)
@ -51,17 +54,24 @@ class Main {
statsdPort = cli.getOptionValue('p')
}
logger = LoggerFactory.getLogger(Main.class)
if (cli.hasOption('d')) {
delayInSeconds = cli.getOptionValue('d').toInteger()
}
if (cli.hasOption('x')) {
excludeGroups = cli.getOptionValues('x')
}
logger.info("Running with: ${args}")
logger.warn("Using: zookeepers=${zookeeperHosts} statsd=${statsdHost}:${statsdPort}")
logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)
logger.info('Reporting every {} seconds', delayInSeconds)
if (cli.hasOption('prefix')) {
statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
}
registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
new metrics.HeartbeatGauge())
new HeartbeatGauge())
ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
@ -101,7 +111,9 @@ class Main {
* one
*/
Closure gaugeRegistrar = { KafkaConsumer consumer ->
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
if (!shouldExcludeConsumer(excludeGroups, consumer)) {
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
}
}
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
@ -114,8 +126,9 @@ class Main {
/* Assuming that most people aren't needing to run Storm-based watchers
* as well
*/
KafkaSpoutTreeWatcher stormWatcher = null
if (cli.hasOption('s')) {
KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
stormWatcher = new KafkaSpoutTreeWatcher(client,
watchedTopics,
consumerOffsets)
stormWatcher.onConsumerData << gaugeRegistrar
@ -141,18 +154,30 @@ class Main {
}
/* Start the reporter if we've got it */
reporter?.start(1, TimeUnit.SECONDS)
reporter?.start(delayInSeconds, TimeUnit.SECONDS)
logger.info("Starting wait loop...")
synchronized(this) {
wait()
}
// shutdown threads
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
Main.logger.info("shutdown threads")
poller.die()
consumerWatcher.close()
if (stormWatcher != null) {
stormWatcher.close()
}
poller.join()
}
});
logger.info('Starting wait loop...')
}
static void registerMetricFor(KafkaConsumer consumer,
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
/*
* Bail early if we already have our Consumer registered
*/
if (consumerGauges.containsKey(consumer)) {
return
}
@ -178,6 +203,12 @@ class Main {
.withValueSeparator(',' as char)
.create('z')
Option excludeGroups = OptionBuilder.withArgName('EXCLUDES')
.hasArgs()
.withDescription('Regular expression for consumer groups to exclude from reporting (can be declared multiple times)')
.withLongOpt('exclude')
.create('x')
Option statsdHost = OptionBuilder.withArgName('STATSD')
.hasArg()
.withType(String)
@ -207,12 +238,21 @@ class Main {
.withLongOpt('prefix')
.create()
Option delaySeconds = OptionBuilder.withArgName('DELAY')
.hasArg()
.withType(Integer)
.withDescription("Seconds to delay between reporting metrics to the metrics receiver (defaults: 5s)")
.withLongOpt('delay')
.create('d')
options.addOption(zookeeper)
options.addOption(statsdHost)
options.addOption(statsdPort)
options.addOption(statsdPrefix)
options.addOption(dryRun)
options.addOption(stormSpouts)
options.addOption(delaySeconds)
options.addOption(excludeGroups)
return options
}
@ -236,4 +276,12 @@ class Main {
System.exit(1)
}
}
/**
* Return true if we should exclude the given KafkaConsumer from reporting
*/
static boolean shouldExcludeConsumer(String[] excludeGroups, KafkaConsumer consumer) {
return null != excludeGroups?.find { String excludeRule -> consumer?.name.matches(excludeRule) }
}
}

View File

@ -1,11 +1,11 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
/**
* Simple container for Kafka topic names and partition IDs
*/
class TopicPartition {
private String topic
private Integer partition
private final String topic
private final Integer partition
TopicPartition(String topic, Integer partition) {
this.topic = topic
@ -27,12 +27,8 @@ class TopicPartition {
return false
}
if ((this.topic == compared.topic) &&
(this.partition == compared.partition)) {
return true
}
return false
return (this.topic == compared.topic) &&
(this.partition == compared.partition)
}
@Override

View File

@ -1,15 +1,11 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics
import java.util.AbstractMap
import com.codahale.metrics.Gauge
import groovy.transform.TypeChecked
import org.coursera.metrics.datadog.Tagged
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import com.github.reiseburo.verspaetung.KafkaConsumer
import com.github.reiseburo.verspaetung.TopicPartition
/**
* Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
@ -19,9 +15,7 @@ class ConsumerGauge implements Gauge<Integer>, Tagged {
protected KafkaConsumer consumer
protected AbstractMap<KafkaConsumer, Integer> consumers
protected AbstractMap<TopicPartition, Long> topics
private TopicPartition topicPartition
private Logger logger = LoggerFactory.getLogger(ConsumerGauge.class)
private final TopicPartition topicPartition
ConsumerGauge(KafkaConsumer consumer,
AbstractMap<KafkaConsumer, Integer> consumers,
@ -44,7 +38,7 @@ class ConsumerGauge implements Gauge<Integer>, Tagged {
* Returning the maximum value of the computation and zero, there are
* some cases where we can be "behind" on the Kafka latest offset
* polling and this could result in an erroneous negative value. See:
* <https://github.com/lookout/verspaetung/issues/25> for more details
* <https://github.com/reiseburo/verspaetung/issues/25> for more details
*/
return Math.max(0,
((Integer)this.topics[topicPartition]) - this.consumers[consumer])
@ -55,7 +49,7 @@ class ConsumerGauge implements Gauge<Integer>, Tagged {
return ["partition:${this.consumer.partition}",
"topic:${this.consumer.topic}",
"consumer-group:${this.consumer.name}"
].collect { s -> s.toString() }
]*.toString()
}
/**

View File

@ -1,6 +1,6 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics
import com.codahale.metrics.*
import com.codahale.metrics.Gauge
/**
* A simple gauge that will always just return 1 indicating that the process is

View File

@ -1,22 +1,22 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.KafkaConsumer
import java.util.concurrent.CopyOnWriteArrayList
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
*/
@TypeChecked
abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
protected AbstractMap<KafkaConsumer, Integer> consumerOffsets
protected AbstractSet<String> watchedTopics
protected List<Closure> onConsumerData = []
AbstractConsumerTreeWatcher(CuratorFramework client,
protected AbstractConsumerTreeWatcher(CuratorFramework client,
AbstractSet topics,
AbstractMap offsets) {
super(client)
@ -86,12 +86,8 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
* we're interested in
*/
Boolean isNodeEvent(TreeCacheEvent event) {
if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)) {
return true
}
return false
return (event?.type == TreeCacheEvent.Type.NODE_ADDED) ||
(event?.type == TreeCacheEvent.Type.NODE_UPDATED)
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
@ -16,19 +16,21 @@ import org.slf4j.LoggerFactory
* further down the pipeline
*/
@TypeChecked
@SuppressWarnings(['ThisReferenceEscapesConstructor'])
abstract class AbstractTreeWatcher implements TreeCacheListener {
protected List<Closure> onInitComplete
protected Logger logger
protected CuratorFramework client
protected TreeCache cache
AbstractTreeWatcher(CuratorFramework client) {
this.logger = LoggerFactory.getLogger(this.class)
this.client = client
this.onInitComplete = []
protected AbstractTreeWatcher(CuratorFramework curatorClient) {
logger = LoggerFactory.getLogger(this.class)
client = curatorClient
onInitComplete = []
this.cache = new TreeCache(client, zookeeperPath())
this.cache.listenable.addListener(this)
cache = new TreeCache(client, zookeeperPath())
/* this may introduce a race condition, need to figure out a better way to handle it */
cache.listenable.addListener(this)
}
/**
@ -45,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
return this
}
/**
* Close our internal cache and return ourselves for API cleanliness
*/
AbstractTreeWatcher close() {
this.cache?.close()
return this
}
abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
}

View File

@ -0,0 +1,73 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.transform.TypeChecked
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
* manage a list of brokers. can be online or offline. offline means the
* internal list is hidden, i.e. the list() gives you an empty list.
*/
@TypeChecked
class BrokerManager {
private static final Logger logger = LoggerFactory.getLogger(BrokerManager)
private final List<KafkaBroker> brokers = Collections.synchronizedList([])
// we start with being offline
private boolean offline = true
void add(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.indexOf(broker) == -1) {
this.brokers.add(broker)
logger.info('broker added: {}', broker)
}
}
}
void update(KafkaBroker broker) {
synchronized(brokers) {
if (broker == null) {
return
}
if (this.brokers.indexOf(broker) != -1) {
this.brokers.remove(broker)
}
this.brokers.add(broker)
logger.info('broker updated: {}', broker)
}
}
void remove(KafkaBroker broker) {
synchronized(brokers) {
if (broker != null && this.brokers.remove(broker)) {
logger.info('broker removed: {}', broker)
}
}
}
// TODO not sure if this is correct - see BrokerTreeWatcher
@SuppressWarnings('ConfusingMethodName')
void offline() {
this.offline = true
}
// TODO not sure if this is correct - see BrokerTreeWatcher
void online() {
this.offline = false
}
Collection<KafkaBroker> list() {
if (this.offline) {
[]
}
else {
this.brokers
}
}
}

View File

@ -0,0 +1,72 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* The BrokerTreeWatcher is a kind of watcher whose sole purpose is
* to watch the segment of the Zookeeper tree where Kafka stores broker
* information
*/
@TypeChecked
class BrokerTreeWatcher extends AbstractTreeWatcher {
private static final String BROKERS_PATH = '/brokers/ids'
private final List<Closure> onBrokerUpdates
private final BrokerManager manager
private final PojoFactory factory
BrokerTreeWatcher(CuratorFramework client) {
super(client)
this.factory = new PojoFactory(new JsonSlurper())
this.manager = new BrokerManager()
this.onBrokerUpdates = []
}
String zookeeperPath() {
return BROKERS_PATH
}
/**
* Process events to keep an up to date list of brokers
*/
@Override
void childEvent(CuratorFramework client, TreeCacheEvent event) {
switch (event.type) {
case TreeCacheEvent.Type.INITIALIZED:
this.manager.online()
break
case TreeCacheEvent.Type.NODE_ADDED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.add(broker)
break
case TreeCacheEvent.Type.NODE_UPDATED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.update(broker)
break
case TreeCacheEvent.Type.NODE_REMOVED:
KafkaBroker broker = this.factory.createKafkaBroker(event)
this.manager.remove(broker)
break
// TODO these 3 events might come with path which can be mapped
// to a specific broker
case TreeCacheEvent.Type.CONNECTION_LOST:
case TreeCacheEvent.Type.CONNECTION_SUSPENDED:
this.manager.offline()
break
case TreeCacheEvent.Type.CONNECTION_RECONNECTED:
this.manager.online()
break
}
this.onBrokerUpdates.each { Closure c ->
c?.call(this.manager.list())
}
}
}

View File

@ -1,16 +1,13 @@
package com.github.lookout.verspaetung.zk
import org.apache.curator.framework.recipes.cache.ChildData
package com.github.reiseburo.verspaetung.zk
/**
* POJO representing data from Zookeeper for a consumer, topic and offset
*/
class ConsumerOffset {
private String topic
private String groupName
private Integer offset
private Integer partition
private ChildData rawData
String topic
String groupName
Integer offset
Integer partition
ConsumerOffset() {
}
@ -22,7 +19,7 @@ class ConsumerOffset {
}
String toString() {
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}"
return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}".toString()
}
}

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* parses TreeCacheEvents and provides helper methods to access their data in a typed manner
*/
@TypeChecked
class EventData {
private final String data
private final List<String> pathParts
EventData(TreeCacheEvent event) {
ChildData data = event.data
this.pathParts = data.path == null ? null : (data.path.split('/') as List)
this.data = data.data == null ? null : new String(data.data, 'UTF-8')
}
Integer asInteger() {
new Integer(data)
}
String asString() {
data
}
Integer pathPartsSize() {
pathParts.size()
}
Integer getPathPartAsInteger(int pos) {
if (pathParts.size() <= pos) {
return null
}
new Integer(pathParts[pos])
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import groovy.json.JsonSlurper
import groovy.transform.TypeChecked
@ -15,7 +15,7 @@ import org.apache.curator.framework.recipes.cache.ChildData
@InheritConstructors
class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
private static final String ZK_PATH = '/kafka_spout'
private JsonSlurper json
private final JsonSlurper json
KafkaSpoutTreeWatcher(CuratorFramework client,
AbstractSet topics,
@ -25,12 +25,15 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
this.json = new JsonSlurper()
}
String zookeeperPath() { return ZK_PATH }
String zookeeperPath() {
return ZK_PATH
}
/* skipping type checking since Groovy's JsonSlurper gives us a pretty
* loose Object to deal with
*/
@TypeChecked(TypeCheckingMode.SKIP)
@SuppressWarnings(['LineLength'])
ConsumerOffset processChildData(ChildData nodeData) {
Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8'))
/*

View File

@ -0,0 +1,42 @@
package com.github.reiseburo.verspaetung.zk
import com.github.reiseburo.verspaetung.KafkaBroker
import groovy.json.JsonSlurper
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
/**
* factory creating POJOs from TreeCacheEvents
*/
class PojoFactory {
private final JsonSlurper json
PojoFactory(JsonSlurper json) {
this.json = json
}
/**
* converts a TreeCacheEvent into a KafkaBroker. the caller
* is responsible for calling the factory method with the
* right data, i.e. a path starting with /brokers/ids. the implementation
* just uses whatever it is given to create a KafkaBroker object
*/
KafkaBroker createKafkaBroker(TreeCacheEvent event) {
EventData data = new EventData(event)
// We're expecting paths like: /brokers/ids/1231524312
Integer id = data.getPathPartAsInteger(3)
if (id == null) {
return
}
String json = data.asString()
if (json == null) {
new KafkaBroker('', 0, id)
}
else {
Object payload = this.json.parseText(json)
new KafkaBroker(payload.host, payload.port, id)
}
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import groovy.transform.TypeChecked
import groovy.transform.InheritConstructors
@ -13,12 +13,15 @@ import org.apache.curator.framework.recipes.cache.ChildData
class StandardTreeWatcher extends AbstractConsumerTreeWatcher {
private static final String ZK_PATH = '/consumers'
String zookeeperPath() { return ZK_PATH }
String zookeeperPath() {
return ZK_PATH
}
/**
* Extract the necessary information from a standard (i.e. high-level Kafka
* consumer) tree of offsets
*/
@SuppressWarnings(['LineLength'])
ConsumerOffset processChildData(ChildData data) {
/*
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
@ -40,15 +43,13 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473
}
catch (NumberFormatException ex) {
logger.error("Failed to parse an Integer: ${data}")
return null
offset = null
}
return offset
}
Boolean isOffsetSubtree(String path) {
return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/)
}
}

View File

@ -1,29 +0,0 @@
package com.github.lookout.verspaetung.zk
import spock.lang.*
class BrokerTreeWatcherSpec extends Specification {
BrokerTreeWatcher watcher
def setup() {
this.watcher = new BrokerTreeWatcher()
}
def "brokerIdFromPath() should return the right ID with a valid path"() {
given:
String path = "/brokers/ids/1337"
expect:
watcher.brokerIdFromPath(path) == 1337
}
def "brokerIdFromPath() should return -1 on null paths"() {
expect:
watcher.brokerIdFromPath(null) == -1
}
def "brokerIdFromPath() should return -1 on empty/invalid paths"() {
expect:
watcher.brokerIdFromPath('/spock/ed') == -1
}
}

View File

@ -0,0 +1,55 @@
package com.github.reiseburo.verspaetung
import spock.lang.*
class DelaySpec extends Specification {
Delay delay = new Delay()
def "it should give default on first use"() {
given:
expect:
delay.value() == Delay.POLLER_DELAY_MIN
}
def "slower has an upper bound"() {
given:
for(int i = 1; i < 20; i++) { delay.slower() }
def firstLast = delay.value()
def result = delay.slower()
def secondLast = delay.value()
expect:
firstLast == secondLast
firstLast == Delay.POLLER_DELAY_MAX
result == false
}
def "increasing delay gives true"() {
def result = true
for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
result = result && delay.slower()
}
def last = delay.slower()
expect:
result == true
last == false
}
def "reset on min value gives false"() {
given:
def result = delay.reset()
expect:
result == false
}
def "reset on none min value gives true"() {
given:
delay.slower()
def result = delay.reset()
expect:
result == true
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
import spock.lang.*

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
import spock.lang.*

View File

@ -0,0 +1,49 @@
package com.github.reiseburo.verspaetung
import spock.lang.*
/**
*/
class MainSpec extends Specification {
def "shouldExcludeGroups() shuold return false by default"() {
expect:
Main.shouldExcludeConsumer(new String[0], null) == false
}
def "shouldExcludeGroups() with a matching exclude should return true"() {
given:
final String[] groups = ['foo'] as String[]
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'foo')
expect:
Main.shouldExcludeConsumer(groups, consumer)
}
def "shouldExcludeGroups() with a matching regex should return true"() {
given:
final String[] groups = ['.*foo'] as String[]
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spockfoo')
expect:
Main.shouldExcludeConsumer(groups, consumer)
}
def "shouldExcludeGroups() with multiple regexes that don't match should return false"() {
given:
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spock')
expect:
Main.shouldExcludeConsumer(groups, consumer) == false
}
def "shouldExcludeGroups() with multiple regexes which match should return true"() {
given:
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'barstool')
expect:
Main.shouldExcludeConsumer(groups, consumer)
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung
package com.github.reiseburo.verspaetung
import spock.lang.*

View File

@ -1,9 +1,9 @@
package com.github.lookout.verspaetung.metrics
package com.github.reiseburo.verspaetung.metrics
import spock.lang.*
import com.github.lookout.verspaetung.KafkaConsumer
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.KafkaConsumer
import com.github.reiseburo.verspaetung.TopicPartition
class ConsumerGaugeSpec extends Specification {
private KafkaConsumer consumer

View File

@ -1,8 +1,8 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

View File

@ -1,8 +1,8 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import com.github.lookout.verspaetung.TopicPartition
import com.github.reiseburo.verspaetung.TopicPartition
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent

View File

@ -0,0 +1,95 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class BrokerManagerSpec extends Specification {
BrokerManager manager = new BrokerManager()
def "new instance is offline, i.e. has empty list of brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
expect:
manager.list().size() == 0
}
def "new instance is offline after add broker"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
manager.add(broker)
expect:
manager.list().size() == 0
}
def "list shows brokers after getting online"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('', 0, 123)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 1
}
def "can remove brokers based on its id"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.add(broker)
manager.online()
manager.remove(broker2)
expect:
manager.list().size() == 0
}
def "can update brokers"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1)
manager.online()
manager.add(broker)
manager.update(broker2)
expect:
manager.list().size() == 1
manager.list().first().host == 'localhost'
}
def "can go offline anytime"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
expect:
manager.list().size() == 0
}
def "can go offline and online again"() {
given:
KafkaBroker broker = new KafkaBroker('', 0, 1)
KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12)
manager.online()
manager.add(broker)
manager.add(broker2)
manager.offline()
manager.online()
expect:
manager.list().size() == 2
}
}

View File

@ -0,0 +1,47 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class EventDataSpec extends Specification {
EventData data
def "converts TreeCacheEvents to EventData"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, "be happy".bytes)))
expect:
data.getPathPartAsInteger(3) == 1337
data.asString() == 'be happy'
data.pathPartsSize() == 4
data.getPathPartAsInteger(4) == null
}
def "converts TreeCacheEvents to EventData with integer payload and no path"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/", null, "42".bytes)))
expect:
data.asString() == '42'
data.asInteger() == 42
data.pathPartsSize() == 0
data.getPathPartAsInteger(0) == null
}
def "converts TreeCacheEvents to EventData with no payload"() {
given:
String path = "/brokers/ids/1337"
data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData("/123", null, null)))
expect:
data.asString() == null
data.pathPartsSize() == 2
data.getPathPartAsInteger(1) == 123
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import spock.lang.*

View File

@ -0,0 +1,52 @@
package com.github.reiseburo.verspaetung.zk
import spock.lang.*
import groovy.json.JsonSlurper
import com.github.reiseburo.verspaetung.KafkaBroker
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.TreeCacheEvent
class PojoFactorySpec extends Specification {
PojoFactory factory = new PojoFactory(new JsonSlurper())
def "create KafkaBroker from TreeCacheEvent"() {
given:
String path = "/brokers/ids/1337"
String payload = "{\"host\":\"localhost\",\"port\":9092}"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, payload.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 1337
broker.host == 'localhost'
broker.port == 9092
}
def "create KafkaBroker from TreeCacheEvent without brokerId on path"() {
given:
String path = "/brokers/ids"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, ''.bytes))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker == null
}
def "create KafkaBroker from TreeCacheEvent without payload"() {
given:
String path = "/brokers/ids/123"
TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED,
new ChildData(path, null, null))
KafkaBroker broker = factory.createKafkaBroker(event)
expect:
broker.brokerId == 123
broker.host == ''
broker.port == 0
}
}

View File

@ -1,4 +1,4 @@
package com.github.lookout.verspaetung.zk
package com.github.reiseburo.verspaetung.zk
import spock.lang.*