From 98af950498820eacbbff280ee3633557157534ab Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 20:20:43 +0100 Subject: [PATCH] be nice and close all resources on shutdown --- .../github/lookout/verspaetung/KafkaPoller.groovy | 2 +- .../com/github/lookout/verspaetung/Main.groovy | 15 ++++++++++++++- .../verspaetung/zk/AbstractTreeWatcher.groovy | 8 ++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 260dc3c..5683d84 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -79,6 +79,7 @@ class KafkaPoller extends Thread { Thread.sleep(delay.value()) } + disconnectConsumers() } private void slower(Delay delay) { @@ -178,7 +179,6 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false - disconnectConsumers() } private void disconnectConsumers() { diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 173fd2a..ab13ef6 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -126,8 +126,9 @@ class Main { /* Assuming that most people aren't needing to run Storm-based watchers * as well */ + KafkaSpoutTreeWatcher stormWatcher = null if (cli.hasOption('s')) { - KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, + stormWatcher = new KafkaSpoutTreeWatcher(client, watchedTopics, consumerOffsets) stormWatcher.onConsumerData << gaugeRegistrar @@ -155,6 +156,18 @@ class Main { /* Start the reporter if we've got it */ reporter?.start(delayInSeconds, TimeUnit.SECONDS) + // shutdown threads + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + Main.logger.info("showdown threads") + poller.die() + consumerWatcher.close() + if (stormWatcher != null) { + stormWatcher.close() + } + poller.join() + } + }); logger.info('Starting wait loop...') synchronized(this) { wait() diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 81dcab5..c4f5bc9 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -47,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { return this } + /** + * Close our internal cache and return ourselves for API cleanliness + */ + AbstractTreeWatcher close() { + this.cache?.close() + return this + } + abstract void childEvent(CuratorFramework client, TreeCacheEvent event) }