Refactor some of the KafkaPoller setup out and add support for --dry-run

Fixes #10
This commit is contained in:
R. Tyler Croy 2015-01-28 01:13:13 -08:00
parent a444f49b46
commit 0706895af1
1 changed files with 51 additions and 26 deletions

View File

@ -3,6 +3,7 @@ package com.github.lookout.verspaetung
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
import java.util.AbstractMap
import java.util.concurrent.ConcurrentHashMap
import groovy.transform.TypeChecked
@ -55,29 +56,21 @@ class Main {
client.start()
TreeCache cache = new TreeCache(client, '/consumers')
KafkaPoller poller = new KafkaPoller(consumers)
KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers)
consumerWatcher.onInitComplete << {
logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
}
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
brokerWatcher.onBrokerUpdates << { brokers ->
poller.refresh(brokers)
}
cache.listenable.addListener(consumerWatcher)
poller.onDelta << { String groupName, TopicPartition tp, Long delta ->
statsd.recordGaugeValue(tp.topic, delta, [
'topic' : tp.topic,
'partition' : tp.partition,
'consumer-group' : groupName
])
}
poller.start()
brokerWatcher.start()
cache.start()
@ -94,6 +87,33 @@ class Main {
return
}
/**
* Create and start a KafkaPoller with the given statsd and consumers map
*/
static KafkaPoller setupKafkaPoller(AbstractMap consumers,
NonBlockingDogStatsDClient statsd,
Boolean dryRun) {
KafkaPoller poller = new KafkaPoller(consumers)
Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
println "${tp.topic}:${tp.partition}-${name} = ${delta}"
}
if (!dryRun) {
deltaCallback = { String name, TopicPartition tp, Long delta ->
statsd.recordGaugeValue(tp.topic, delta, [
'topic' : tp.topic,
'partition' : tp.partition,
'consumer-group' : name
])
}
}
poller.onDelta << deltaCallback
poller.start()
return poller
}
static Options createCLI() {
Options options = new Options()
@ -104,23 +124,28 @@ class Main {
.withValueSeparator(',' as char)
.create('z')
Option statsd_host = OptionBuilder.withArgName('STATSD')
.hasArg()
.withType(String)
.withDescription('Hostname for a statsd instance (defaults to localhost)')
.withLongOpt('statsd-host')
.create('H')
Option statsdHost = OptionBuilder.withArgName('STATSD')
.hasArg()
.withType(String)
.withDescription('Hostname for a statsd instance (defaults to localhost)')
.withLongOpt('statsd-host')
.create('H')
Option statsd_port = OptionBuilder.withArgName('PORT')
.hasArg()
.withType(Integer)
.withDescription('Port for the statsd instance (defaults to 8125)')
.withLongOpt('statsd-port')
.create('p')
Option statsdPort = OptionBuilder.withArgName('PORT')
.hasArg()
.withType(Integer)
.withDescription('Port for the statsd instance (defaults to 8125)')
.withLongOpt('statsd-port')
.create('p')
Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host')
.withLongOpt('dry-run')
.create('n')
options.addOption(zookeeper)
options.addOption(statsd_host)
options.addOption(statsd_port)
options.addOption(statsdHost)
options.addOption(statsdPort)
options.addOption(dryRun)
return options
}