diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 260dc3c..5683d84 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -79,6 +79,7 @@ class KafkaPoller extends Thread { Thread.sleep(delay.value()) } + disconnectConsumers() } private void slower(Delay delay) { @@ -178,7 +179,6 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false - disconnectConsumers() } private void disconnectConsumers() { diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 173fd2a..ab13ef6 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -126,8 +126,9 @@ class Main { /* Assuming that most people aren't needing to run Storm-based watchers * as well */ + KafkaSpoutTreeWatcher stormWatcher = null if (cli.hasOption('s')) { - KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, + stormWatcher = new KafkaSpoutTreeWatcher(client, watchedTopics, consumerOffsets) stormWatcher.onConsumerData << gaugeRegistrar @@ -155,6 +156,18 @@ class Main { /* Start the reporter if we've got it */ reporter?.start(delayInSeconds, TimeUnit.SECONDS) + // shutdown threads + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + Main.logger.info("showdown threads") + poller.die() + consumerWatcher.close() + if (stormWatcher != null) { + stormWatcher.close() + } + poller.join() + } + }); logger.info('Starting wait loop...') synchronized(this) { wait() diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 81dcab5..c4f5bc9 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -47,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { return this } + /** + * Close our internal cache and return ourselves for API cleanliness + */ + AbstractTreeWatcher close() { + this.cache?.close() + return this + } + abstract void childEvent(CuratorFramework client, TreeCacheEvent event) }