diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 8d56c7e..7faa0a8 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -3,6 +3,7 @@ package com.github.lookout.verspaetung import com.github.lookout.verspaetung.zk.BrokerTreeWatcher import com.github.lookout.verspaetung.zk.StandardTreeWatcher +import java.util.AbstractMap import java.util.concurrent.ConcurrentHashMap import groovy.transform.TypeChecked @@ -55,29 +56,21 @@ class Main { client.start() TreeCache cache = new TreeCache(client, '/consumers') - - KafkaPoller poller = new KafkaPoller(consumers) + KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n')) + BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers) + + consumerWatcher.onInitComplete << { logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples") } - BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) } cache.listenable.addListener(consumerWatcher) - poller.onDelta << { String groupName, TopicPartition tp, Long delta -> - statsd.recordGaugeValue(tp.topic, delta, [ - 'topic' : tp.topic, - 'partition' : tp.partition, - 'consumer-group' : groupName - ]) - } - - poller.start() brokerWatcher.start() cache.start() @@ -94,6 +87,33 @@ class Main { return } + + /** + * Create and start a KafkaPoller with the given statsd and consumers map + */ + static KafkaPoller setupKafkaPoller(AbstractMap consumers, + NonBlockingDogStatsDClient statsd, + Boolean dryRun) { + KafkaPoller poller = new KafkaPoller(consumers) + Closure deltaCallback = { String name, TopicPartition tp, Long delta -> + println "${tp.topic}:${tp.partition}-${name} = ${delta}" + } + + if (!dryRun) { + deltaCallback = { String name, TopicPartition tp, Long delta -> + statsd.recordGaugeValue(tp.topic, delta, [ + 'topic' : tp.topic, + 'partition' : tp.partition, + 'consumer-group' : name + ]) + } + } + + poller.onDelta << deltaCallback + poller.start() + return poller + } + static Options createCLI() { Options options = new Options() @@ -104,23 +124,28 @@ class Main { .withValueSeparator(',' as char) .create('z') - Option statsd_host = OptionBuilder.withArgName('STATSD') - .hasArg() - .withType(String) - .withDescription('Hostname for a statsd instance (defaults to localhost)') - .withLongOpt('statsd-host') - .create('H') + Option statsdHost = OptionBuilder.withArgName('STATSD') + .hasArg() + .withType(String) + .withDescription('Hostname for a statsd instance (defaults to localhost)') + .withLongOpt('statsd-host') + .create('H') - Option statsd_port = OptionBuilder.withArgName('PORT') - .hasArg() - .withType(Integer) - .withDescription('Port for the statsd instance (defaults to 8125)') - .withLongOpt('statsd-port') - .create('p') + Option statsdPort = OptionBuilder.withArgName('PORT') + .hasArg() + .withType(Integer) + .withDescription('Port for the statsd instance (defaults to 8125)') + .withLongOpt('statsd-port') + .create('p') + + Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host') + .withLongOpt('dry-run') + .create('n') options.addOption(zookeeper) - options.addOption(statsd_host) - options.addOption(statsd_port) + options.addOption(statsdHost) + options.addOption(statsdPort) + options.addOption(dryRun) return options }