From 59c8e79f4ea05b6e52e743cdebcbf5ef45985248 Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Mon, 2 Nov 2015 18:46:48 +0100 Subject: [PATCH] reduce polling speed in case kafka is responding with exceptions just double the interval on each successive error and reset it to 1 sec once a request succeeded again. maximum polling inteval is about half an hour. --- .../github/lookout/verspaetung/Delay.groovy | 40 ++++++++++++++ .../lookout/verspaetung/KafkaPoller.groovy | 18 +++++- .../lookout/verspaetung/DelaySpec.groovy | 55 +++++++++++++++++++ 3 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 src/main/groovy/com/github/lookout/verspaetung/Delay.groovy create mode 100644 src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy diff --git a/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy b/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy new file mode 100644 index 0000000..95d1e8c --- /dev/null +++ b/src/main/groovy/com/github/lookout/verspaetung/Delay.groovy @@ -0,0 +1,40 @@ +package com.github.lookout.verspaetung + +/** + * abstract the logic on how to reduce the polling speed and get back to + * to full speed. + */ +class Delay { + static final Integer POLLER_DELAY_MIN = (1 * 1000) + static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour + + private Integer delay = POLLER_DELAY_MIN + + boolean reset() { + if (delay != POLLER_DELAY_MIN) { + delay = POLLER_DELAY_MIN + true + } + else { + false + } + } + + boolean slower() { + if (delay < POLLER_DELAY_MAX) { + delay += delay + true + } + else { + false + } + } + + Integer value() { + delay + } + + String toString() { + "Delay[ ${delay / 1000} sec ]" + } +} diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 27b3f90..cfe587d 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -17,7 +17,7 @@ import scala.collection.JavaConversions * meta-data for them */ class KafkaPoller extends Thread { - private static final Integer POLLER_DELAY = (1 * 1000) + private static final String KAFKA_CLIENT_ID = 'VerspaetungClient' private static final Integer KAFKA_TIMEOUT = (5 * 1000) private static final Integer KAFKA_BUFFER = (100 * 1024) @@ -48,7 +48,8 @@ class KafkaPoller extends Thread { @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException']) void run() { LOGGER.info('Starting wait loop') - + Delay delay = new Delay() + LOGGER.error('polling ' + delay) while (keepRunning) { LOGGER.debug('poll loop') @@ -62,16 +63,27 @@ class KafkaPoller extends Thread { if (this.currentTopics.size() > 0) { try { dumpMetadata() + if (delay.reset()) { + LOGGER.error('back to normal ' + delay) + } } catch (KafkaException kex) { LOGGER.error('Failed to interact with Kafka: {}', kex.message) + slower(delay) } catch (Exception ex) { LOGGER.error('Failed to fetch and dump Kafka metadata', ex) + slower(delay) } } - Thread.sleep(POLLER_DELAY) + Thread.sleep(delay.value()) + } + } + + void slower(Delay delay) { + if (delay.slower()) { + LOGGER.error('using ' + delay) } } diff --git a/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy new file mode 100644 index 0000000..2c09b0f --- /dev/null +++ b/src/test/groovy/com/github/lookout/verspaetung/DelaySpec.groovy @@ -0,0 +1,55 @@ +package com.github.lookout.verspaetung + +import spock.lang.* + +class DelaySpec extends Specification { + Delay delay = new Delay() + + def "it should give default on first use"() { + given: + + expect: + delay.value() == Delay.POLLER_DELAY_MIN + } + + def "slower has an upper bound"() { + given: + for(int i = 1; i < 20; i++) { delay.slower() } + def firstLast = delay.value() + def result = delay.slower() + def secondLast = delay.value() + + expect: + firstLast == secondLast + firstLast == Delay.POLLER_DELAY_MAX + result == false + } + + def "increasing delay gives true"() { + def result = true + for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) { + result = result && delay.slower() + } + def last = delay.slower() + + expect: + result == true + last == false + } + + def "reset on min value gives false"() { + given: + def result = delay.reset() + + expect: + result == false + } + def "reset on none min value gives true"() { + given: + delay.slower() + def result = delay.reset() + + expect: + result == true + } +}