reduce polling speed in case kafka is responding with exceptions

just double the interval on each successive error and reset it to 1 sec
once a request succeeded again. maximum polling inteval is about half
an hour.
This commit is contained in:
Christian Meier 2015-11-02 18:46:48 +01:00
parent 07bf319972
commit 59c8e79f4e
3 changed files with 110 additions and 3 deletions

View File

@ -0,0 +1,40 @@
package com.github.lookout.verspaetung
/**
* abstract the logic on how to reduce the polling speed and get back to
* to full speed.
*/
class Delay {
static final Integer POLLER_DELAY_MIN = (1 * 1000)
static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour
private Integer delay = POLLER_DELAY_MIN
boolean reset() {
if (delay != POLLER_DELAY_MIN) {
delay = POLLER_DELAY_MIN
true
}
else {
false
}
}
boolean slower() {
if (delay < POLLER_DELAY_MAX) {
delay += delay
true
}
else {
false
}
}
Integer value() {
delay
}
String toString() {
"Delay[ ${delay / 1000} sec ]"
}
}

View File

@ -17,7 +17,7 @@ import scala.collection.JavaConversions
* meta-data for them
*/
class KafkaPoller extends Thread {
private static final Integer POLLER_DELAY = (1 * 1000)
private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
private static final Integer KAFKA_TIMEOUT = (5 * 1000)
private static final Integer KAFKA_BUFFER = (100 * 1024)
@ -48,7 +48,8 @@ class KafkaPoller extends Thread {
@SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
void run() {
LOGGER.info('Starting wait loop')
Delay delay = new Delay()
LOGGER.error('polling ' + delay)
while (keepRunning) {
LOGGER.debug('poll loop')
@ -62,16 +63,27 @@ class KafkaPoller extends Thread {
if (this.currentTopics.size() > 0) {
try {
dumpMetadata()
if (delay.reset()) {
LOGGER.error('back to normal ' + delay)
}
}
catch (KafkaException kex) {
LOGGER.error('Failed to interact with Kafka: {}', kex.message)
slower(delay)
}
catch (Exception ex) {
LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
slower(delay)
}
}
Thread.sleep(POLLER_DELAY)
Thread.sleep(delay.value())
}
}
void slower(Delay delay) {
if (delay.slower()) {
LOGGER.error('using ' + delay)
}
}

View File

@ -0,0 +1,55 @@
package com.github.lookout.verspaetung
import spock.lang.*
class DelaySpec extends Specification {
Delay delay = new Delay()
def "it should give default on first use"() {
given:
expect:
delay.value() == Delay.POLLER_DELAY_MIN
}
def "slower has an upper bound"() {
given:
for(int i = 1; i < 20; i++) { delay.slower() }
def firstLast = delay.value()
def result = delay.slower()
def secondLast = delay.value()
expect:
firstLast == secondLast
firstLast == Delay.POLLER_DELAY_MAX
result == false
}
def "increasing delay gives true"() {
def result = true
for(int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
result = result && delay.slower()
}
def last = delay.slower()
expect:
result == true
last == false
}
def "reset on min value gives false"() {
given:
def result = delay.reset()
expect:
result == false
}
def "reset on none min value gives true"() {
given:
delay.slower()
def result = delay.reset()
expect:
result == true
}
}