From 59c8e79f4ea05b6e52e743cdebcbf5ef45985248 Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 18:46:48 +0100 Subject: [PATCH 1/4] reduce polling speed in case kafka is responding with exceptions just double the interval on each successive error and reset it to 1 sec once a request succeeded again. maximum polling interval is about half an hour. --- .../github/lookout/verspaetung/Delay.groovy | 40 ++++++++++++++ .../lookout/verspaetung/KafkaPoller.groovy | 18 +++++- .../lookout/verspaetung/DelaySpec.groovy | 55 +++++++++++++++++++ 3 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 src/main/groovy/com/github/lookout/verspaetung/Delay.groovy create mode 100644 src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy diff --git a/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy b/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy new file mode 100644 index 0000000..95d1e8c --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy @@ -0,0 +1,40 @@ +package com.github.lookout.verspaetung + +/** + * abstract the logic on how to reduce the polling speed and get back + * to full speed. 
+ */ +class Delay { + static final Integer POLLER_DELAY_MIN = (1 * 1000) + static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour + + private Integer delay = POLLER_DELAY_MIN + + boolean reset() { + if (delay != POLLER_DELAY_MIN) { + delay = POLLER_DELAY_MIN + true + } + else { + false + } + } + + boolean slower() { + if (delay < POLLER_DELAY_MAX) { + delay += delay + true + } + else { + false + } + } + + Integer value() { + delay + } + + String toString() { + "Delay[ ${delay / 1000} sec ]" + } +} diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 27b3f90..cfe587d 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -17,7 +17,7 @@ import scala.collection.JavaConversions * meta-data for them */ class KafkaPoller extends Thread { - private static final Integer POLLER_DELAY = (1 * 1000) + private static final String KAFKA_CLIENT_ID = 'VerspaetungClient' private static final Integer KAFKA_TIMEOUT = (5 * 1000) private static final Integer KAFKA_BUFFER = (100 * 1024) @@ -48,7 +48,8 @@ class KafkaPoller extends Thread { @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException']) void run() { LOGGER.info('Starting wait loop') - + Delay delay = new Delay() + LOGGER.error('polling ' + delay) while (keepRunning) { LOGGER.debug('poll loop') @@ -62,16 +63,27 @@ class KafkaPoller extends Thread { if (this.currentTopics.size() > 0) { try { dumpMetadata() + if (delay.reset()) { + LOGGER.error('back to normal ' + delay) + } } catch (KafkaException kex) { LOGGER.error('Failed to interact with Kafka: {}', kex.message) + slower(delay) } catch (Exception ex) { LOGGER.error('Failed to fetch and dump Kafka metadata', ex) + slower(delay) } } - Thread.sleep(POLLER_DELAY) + Thread.sleep(delay.value()) + } + } + + void slower(Delay delay) { + if (delay.slower()) { 
+ LOGGER.error('using ' + delay) } } diff --git a/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy new file mode 100644 index 0000000..2c09b0f --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy @@ -0,0 +1,55 @@ +package com.github.lookout.verspaetung + +import spock.lang.* + +class DelaySpec extends Specification { + Delay delay = new Delay() + + def "it should give default on first use"() { + given: + + expect: + delay.value() == Delay.POLLER_DELAY_MIN + } + + def "slower has an upper bound"() { + given: + for(int i = 1; i < 20; i++) { delay.slower() } + def firstLast = delay.value() + def result = delay.slower() + def secondLast = delay.value() + + expect: + firstLast == secondLast + firstLast == Delay.POLLER_DELAY_MAX + result == false + } + + def "increasing delay gives true"() { + def result = true + for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) { + result = result && delay.slower() + } + def last = delay.slower() + + expect: + result == true + last == false + } + + def "reset on min value gives false"() { + given: + def result = delay.reset() + + expect: + result == false + } + def "reset on none min value gives true"() { + given: + delay.slower() + def result = delay.reset() + + expect: + result == true + } +} From 45d98cb5f675f0251340271201e460fc676ab775 Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 19:51:32 +0100 Subject: [PATCH 2/4] allow new nodes to be added when they pop up also ensure the broker list is synchronized in both the KafkaPoller as well the BrokerTreeWatcher threads --- .../lookout/verspaetung/KafkaPoller.groovy | 31 ++++++++++++------- .../verspaetung/zk/BrokerTreeWatcher.groovy | 11 +++++-- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 
cfe587d..c3ee6c8 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -29,7 +29,7 @@ class KafkaPoller extends Thread { private final AbstractMap topicOffsetMap private final List onDelta private final AbstractSet currentTopics - private List brokers + private final List brokers KafkaPoller(AbstractMap map, AbstractSet topicSet) { this.topicOffsetMap = map @@ -159,14 +159,16 @@ class KafkaPoller extends Thread { void reconnect() { disconnectConsumers() LOGGER.info('Creating SimpleConsumer connections for brokers') - this.brokers.each { Broker b -> - SimpleConsumer consumer = new SimpleConsumer(b.host, - b.port, - KAFKA_TIMEOUT, - KAFKA_BUFFER, - KAFKA_CLIENT_ID) - consumer.connect() - this.brokerConsumerMap[b.id] = consumer + synchronized(this.brokers) { + this.brokers.each { Broker b -> + SimpleConsumer consumer = new SimpleConsumer(b.host, + b.port, + KAFKA_TIMEOUT, + KAFKA_BUFFER, + KAFKA_CLIENT_ID) + consumer.connect() + this.brokerConsumerMap[b.id] = consumer + } } this.shouldReconnect = false } @@ -190,8 +192,11 @@ class KafkaPoller extends Thread { * Store a new list of KafkaBroker objects and signal a reconnection */ void refresh(List brokers) { - this.brokers = brokers.collect { KafkaBroker b -> - new Broker(b.brokerId, b.host, b.port) + synchronized(this.brokers) { + this.brokers.clear() + this.brokers.addAll(brokers.collect { KafkaBroker b -> + new Broker(b.brokerId, b.host, b.port) + }) } this.shouldReconnect = true } @@ -201,7 +206,9 @@ class KafkaPoller extends Thread { * scala underpinnings */ private scala.collection.immutable.Seq getBrokersSeq() { - return JavaConversions.asScalaBuffer(this.brokers).toList() + synchronized(this.brokers) { + return JavaConversions.asScalaBuffer(this.brokers).toList() + } } /** diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy 
b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy index 310551a..8420fa2 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy @@ -28,7 +28,7 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { super(client) this.json = new JsonSlurper() - this.brokers = [] + this.brokers = Collections.synchronizedList([]) this.onBrokerUpdates = [] } @@ -47,9 +47,8 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { */ if (event.type == TreeCacheEvent.Type.INITIALIZED) { this.isTreeInitialized = true - List threadsafeBrokers = Collections.synchronizedList(this.brokers) this.onBrokerUpdates.each { Closure c -> - c?.call(threadsafeBrokers) + c?.call(this.brokers) } return } @@ -68,6 +67,12 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8')) this.brokers << new KafkaBroker(brokerData, brokerId) + + if (this.isTreeInitialized) { + this.onBrokerUpdates.each { Closure c -> + c?.call(this.brokers) + } + } } /** From 439e8f635e91ab53258dd2320fcf0ec225b9eaec Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 19:56:42 +0100 Subject: [PATCH 3/4] declare private methods as such helps to understand what methods could be used by other threads --- .../github/lookout/verspaetung/KafkaPoller.groovy | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index c3ee6c8..260dc3c 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -81,14 +81,14 @@ class KafkaPoller extends Thread { } } - void slower(Delay delay) { + private void slower(Delay delay) { if (delay.slower()) { LOGGER.error('using ' + delay) } 
} @SuppressWarnings(['CatchException']) - void dumpMetadata() { + private void dumpMetadata() { LOGGER.debug('dumping meta-data') Object metadata = fetchMetadataForCurrentTopics() @@ -113,7 +113,7 @@ class KafkaPoller extends Thread { * The 'metadata' is the expected return from * kafka.client.ClientUtils.fetchTopicMetadata */ - void withTopicsAndPartitions(Object metadata, Closure closure) { + private void withTopicsAndPartitions(Object metadata, Closure closure) { withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f -> withScalaCollection(f.partitionsMetadata).each { p -> TopicPartition tp = new TopicPartition(f.topic, p.partitionId) @@ -125,7 +125,7 @@ class KafkaPoller extends Thread { /** * Fetch the leader metadata and update our data structures */ - void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) { + private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) { Integer leaderId = partitionMetadata.leader.get()?.id Integer partitionId = partitionMetadata.partitionId @@ -134,7 +134,7 @@ class KafkaPoller extends Thread { this.topicOffsetMap[tp] = offset } - Long latestFromLeader(Integer leaderId, String topic, Integer partition) { + private Long latestFromLeader(Integer leaderId, String topic, Integer partition) { SimpleConsumer consumer = this.brokerConsumerMap[leaderId] /* If we don't have a proper SimpleConsumer instance (e.g. 
null) then @@ -149,14 +149,14 @@ class KafkaPoller extends Thread { return consumer.earliestOrLatestOffset(topicAndPart, -1, 0) } - Iterable withScalaCollection(scala.collection.Iterable iter) { + private Iterable withScalaCollection(scala.collection.Iterable iter) { return JavaConversions.asJavaIterable(iter) } /** * Blocking reconnect to the Kafka brokers */ - void reconnect() { + private void reconnect() { disconnectConsumers() LOGGER.info('Creating SimpleConsumer connections for brokers') synchronized(this.brokers) { From 98af950498820eacbbff280ee3633557157534ab Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 20:20:43 +0100 Subject: [PATCH 4/4] be nice and close all resources on shutdown --- .../github/lookout/verspaetung/KafkaPoller.groovy | 2 +- .../com/github/lookout/verspaetung/Main.groovy | 15 ++++++++++++++- .../verspaetung/zk/AbstractTreeWatcher.groovy | 8 ++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 260dc3c..5683d84 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -79,6 +79,7 @@ class KafkaPoller extends Thread { Thread.sleep(delay.value()) } + disconnectConsumers() } private void slower(Delay delay) { @@ -178,7 +179,6 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false - disconnectConsumers() } private void disconnectConsumers() { diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 173fd2a..ab13ef6 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -126,8 +126,9 @@ class Main { /* Assuming that most people aren't needing to run Storm-based watchers * as well */ + 
KafkaSpoutTreeWatcher stormWatcher = null if (cli.hasOption('s')) { - KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, + stormWatcher = new KafkaSpoutTreeWatcher(client, watchedTopics, consumerOffsets) stormWatcher.onConsumerData << gaugeRegistrar @@ -155,6 +156,18 @@ class Main { /* Start the reporter if we've got it */ reporter?.start(delayInSeconds, TimeUnit.SECONDS) + // shutdown threads + Runtime.getRuntime().addShutdownHook(new Thread() { + public void run() { + Main.logger.info("showdown threads") + poller.die() + consumerWatcher.close() + if (stormWatcher != null) { + stormWatcher.close() + } + poller.join() + } + }); logger.info('Starting wait loop...') synchronized(this) { wait() diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index 81dcab5..c4f5bc9 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -47,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener { return this } + /** + * Close our internal cache and return ourselves for API cleanliness + */ + AbstractTreeWatcher close() { + this.cache?.close() + return this + } + abstract void childEvent(CuratorFramework client, TreeCacheEvent event) }