package com.github.reiseburo.verspaetung import com.github.reiseburo.verspaetung.zk.BrokerTreeWatcher import com.github.reiseburo.verspaetung.zk.KafkaSpoutTreeWatcher import com.github.reiseburo.verspaetung.zk.StandardTreeWatcher import com.github.reiseburo.verspaetung.metrics.ConsumerGauge import com.github.reiseburo.verspaetung.metrics.HeartbeatGauge import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentSkipListSet import java.util.concurrent.TimeUnit import org.apache.commons.cli.* import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFramework import org.coursera.metrics.datadog.DatadogReporter import org.coursera.metrics.datadog.transport.UdpTransport import org.slf4j.Logger import org.slf4j.LoggerFactory import com.codahale.metrics.* /** * Main entry point for running the verspaetung application */ @SuppressWarnings class Main { private static final String METRICS_PREFIX = 'verspaetung' private static final Logger logger = LoggerFactory.getLogger(Main) private static final MetricRegistry registry = new MetricRegistry() private static ScheduledReporter reporter static void main(String[] args) { String statsdPrefix = METRICS_PREFIX String zookeeperHosts = 'localhost:2181' String statsdHost = 'localhost' Integer statsdPort = 8125 Integer delayInSeconds = 5 String[] excludeGroups = [] CommandLine cli = parseCommandLine(args) if (cli.hasOption('z')) { zookeeperHosts = cli.getOptionValue('z') } if (cli.hasOption('H')) { statsdHost = cli.getOptionValue('H') } if (cli.hasOption('p')) { statsdPort = cli.getOptionValue('p') } if (cli.hasOption('d')) { delayInSeconds = cli.getOptionValue('d').toInteger() } if (cli.hasOption('x')) { excludeGroups = cli.getOptionValues('x') }"Running with: ${args}") logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)'Reporting every {} seconds', delayInSeconds) if (cli.hasOption('prefix')) { statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}" } registry.register(, 'heartbeat'), new HeartbeatGauge()) ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry) client.start() /* We need a good shared set of all the topics we should keep an eye on * for the Kafka poller. This will be written to by the tree watchers * and read from by the poller, e.g. * Watcher --/write/--> watchedTopics --/read/--> KafkaPoller */ ConcurrentSkipListSet watchedTopics = new ConcurrentSkipListSet<>() /* consumerOffsets is where we will keep all the offsets from Zookeeper * from the Kafka consumers */ ConcurrentHashMap consumerOffsets = new ConcurrentHashMap<>() /* topicOffsets is where the KafkaPoller should be writing all of it's * latest offsets from querying the Kafka brokers */ ConcurrentHashMap topicOffsets = new ConcurrentHashMap<>() /* Hash map for keeping track of KafkaConsumer to ConsumerGauge * instances. We're only really doing this because the MetricRegistry * doesn't do a terrific job of exposing this for us */ ConcurrentHashMap consumerGauges = new ConcurrentHashMap<>() KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics) BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start() brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) } poller.start() /* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have * one */ Closure gaugeRegistrar = { KafkaConsumer consumer -> if (!shouldExcludeConsumer(excludeGroups, consumer)) { registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets) } } StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, watchedTopics, consumerOffsets) consumerWatcher.onConsumerData << gaugeRegistrar consumerWatcher.start() /* Assuming that most people aren't needing to run Storm-based watchers * as well */ KafkaSpoutTreeWatcher stormWatcher = null if (cli.hasOption('s')) { stormWatcher = new KafkaSpoutTreeWatcher(client, watchedTopics, consumerOffsets) stormWatcher.onConsumerData << gaugeRegistrar stormWatcher.start() } if (cli.hasOption('n')) { reporter = ConsoleReporter.forRegistry(registry) .convertRatesTo(TimeUnit.SECONDS) .convertDurationsTo(TimeUnit.MILLISECONDS) .build() } else { UdpTransport transport = new UdpTransport.Builder() .withPrefix(statsdPrefix) .build() reporter = DatadogReporter.forRegistry(registry) .withEC2Host() .withTransport(transport) .build() } /* Start the reporter if we've got it */ reporter?.start(delayInSeconds, TimeUnit.SECONDS) // shutdown threads Runtime.getRuntime().addShutdownHook(new Thread() { public void run() {"showdown threads") poller.die() consumerWatcher.close() if (stormWatcher != null) { stormWatcher.close() } poller.join() } });'Starting wait loop...') } static void registerMetricFor(KafkaConsumer consumer, ConcurrentHashMap consumerGauges, ConcurrentHashMap consumerOffsets, ConcurrentHashMap topicOffsets) { /* * Bail early if we already ahve our Consumer registered */ if (consumerGauges.containsKey(consumer)) { return } ConsumerGauge gauge = new ConsumerGauge(consumer, consumerOffsets, topicOffsets) consumerGauges.put(consumer, gauge) this.registry.register(gauge.nameForRegistry, gauge) } /** * Create the Options option necessary for verspaetung to have CLI options */ static Options createCLI() { Options options = new Options() Option zookeeper = OptionBuilder.withArgName('HOSTS') .hasArg() .withDescription('Comma separated list of Zookeeper hosts (e.g. localhost:2181)') .withLongOpt('zookeeper') .withValueSeparator(',' as char) .create('z') Option excludeGroups = OptionBuilder.withArgName('EXCLUDES') .hasArgs() .withDescription('Regular expression for consumer groups to exclude from reporting (can be declared multiple times)') .withLongOpt('exclude') .create('x') Option statsdHost = OptionBuilder.withArgName('STATSD') .hasArg() .withType(String) .withDescription('Hostname for a statsd instance (defaults to localhost)') .withLongOpt('statsd-host') .create('H') Option statsdPort = OptionBuilder.withArgName('PORT') .hasArg() .withType(Integer) .withDescription('Port for the statsd instance (defaults to 8125)') .withLongOpt('statsd-port') .create('p') Option dryRun = OptionBuilder.withDescription('Disable reporting to a statsd host') .withLongOpt('dry-run') .create('n') Option stormSpouts = OptionBuilder.withDescription('Watch Storm KafkaSpout offsets (under /kafka_spout)') .withLongOpt('storm') .create('s') Option statsdPrefix = OptionBuilder.withArgName('PREFIX') .hasArg() .withType(String) .withDescription("Prefix all metrics with PREFIX before they're reported (e.g. PREFIX.verspaetung.mytopic)") .withLongOpt('prefix') .create() Option delaySeconds = OptionBuilder.withArgName('DELAY') .hasArg() .withType(Integer) .withDescription("Seconds to delay between reporting metrics to the metrics receiver (defaults: 5s)") .withLongOpt('delay') .create('d') options.addOption(zookeeper) options.addOption(statsdHost) options.addOption(statsdPort) options.addOption(statsdPrefix) options.addOption(dryRun) options.addOption(stormSpouts) options.addOption(delaySeconds) options.addOption(excludeGroups) return options } /** * Parse out all the command line options from the array of string * arguments */ static CommandLine parseCommandLine(String[] args) { Options options = createCLI() PosixParser parser = new PosixParser() try { return parser.parse(options, args) } catch (MissingOptionException|UnrecognizedOptionException ex) { HelpFormatter formatter = new HelpFormatter() println ex.message formatter.printHelp('verspaetung', options) System.exit(1) } } /** * Return true if we should exclude the given KafkaConsumer from reporting */ static boolean shouldExcludeConsumer(String[] excludeGroups, KafkaConsumer consumer) { return null != excludeGroups?.find { String excludeRule -> consumer?.name.matches(excludeRule) } } }