Merge pull request #35 from mkristian/be-nice

Be nice
R. Tyler Croy 2015-11-03 12:07:48 -08:00
commit f15aeaee0c
6 changed files with 166 additions and 26 deletions

Delay.groovy (new file)

@@ -0,0 +1,40 @@
package com.github.lookout.verspaetung

/**
 * Abstract the logic of how to reduce the polling speed and get back
 * to full speed.
 */
class Delay {
    static final Integer POLLER_DELAY_MIN = (1 * 1000)
    static final Integer POLLER_DELAY_MAX = (2048 * 1000) // about half an hour

    private Integer delay = POLLER_DELAY_MIN

    boolean reset() {
        if (delay != POLLER_DELAY_MIN) {
            delay = POLLER_DELAY_MIN
            true
        }
        else {
            false
        }
    }

    boolean slower() {
        if (delay < POLLER_DELAY_MAX) {
            delay += delay
            true
        }
        else {
            false
        }
    }

    Integer value() {
        delay
    }

    String toString() {
        "Delay[ ${delay / 1000} sec ]"
    }
}
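Taken together, slower() doubles the delay from POLLER_DELAY_MIN up to the POLLER_DELAY_MAX cap, and reset() drops it back to the minimum once polling succeeds again. A minimal usage sketch (hypothetical driver code, not part of this commit):

// Hypothetical exercise of the Delay contract: failures back off
// exponentially, a single success resets to the minimum.
Delay d = new Delay()
assert d.value() == 1000          // starts at POLLER_DELAY_MIN
3.times { d.slower() }
assert d.value() == 8000          // 1s -> 2s -> 4s -> 8s
assert d.reset()                  // true: delay was above the minimum
assert d.value() == 1000          // back to POLLER_DELAY_MIN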

KafkaPoller.groovy

@@ -17,7 +17,7 @@ import scala.collection.JavaConversions
  * meta-data for them
  */
 class KafkaPoller extends Thread {
-    private static final Integer POLLER_DELAY = (1 * 1000)
     private static final String KAFKA_CLIENT_ID = 'VerspaetungClient'
     private static final Integer KAFKA_TIMEOUT = (5 * 1000)
     private static final Integer KAFKA_BUFFER = (100 * 1024)
@@ -29,7 +29,7 @@ class KafkaPoller extends Thread {
     private final AbstractMap<TopicPartition, Long> topicOffsetMap
     private final List<Closure> onDelta
     private final AbstractSet<String> currentTopics
-    private List<Broker> brokers
+    private final List<Broker> brokers

     KafkaPoller(AbstractMap map, AbstractSet topicSet) {
         this.topicOffsetMap = map
@@ -48,7 +48,8 @@ class KafkaPoller extends Thread {
     @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException'])
     void run() {
         LOGGER.info('Starting wait loop')
+        Delay delay = new Delay()
+        LOGGER.error('polling ' + delay)

         while (keepRunning) {
             LOGGER.debug('poll loop')
@@ -62,21 +63,33 @@ class KafkaPoller extends Thread {
             if (this.currentTopics.size() > 0) {
                 try {
                     dumpMetadata()
+                    if (delay.reset()) {
+                        LOGGER.error('back to normal ' + delay)
+                    }
                 }
                 catch (KafkaException kex) {
                     LOGGER.error('Failed to interact with Kafka: {}', kex.message)
+                    slower(delay)
                 }
                 catch (Exception ex) {
                     LOGGER.error('Failed to fetch and dump Kafka metadata', ex)
+                    slower(delay)
                 }
             }
-            Thread.sleep(POLLER_DELAY)
+            Thread.sleep(delay.value())
+        }
+        disconnectConsumers()
+    }
+
+    private void slower(Delay delay) {
+        if (delay.slower()) {
+            LOGGER.error('using ' + delay)
         }
     }

     @SuppressWarnings(['CatchException'])
-    void dumpMetadata() {
+    private void dumpMetadata() {
         LOGGER.debug('dumping meta-data')

         Object metadata = fetchMetadataForCurrentTopics()
@@ -101,7 +114,7 @@ class KafkaPoller extends Thread {
      * The 'metadata' is the expected return from
      * kafka.client.ClientUtils.fetchTopicMetadata
      */
-    void withTopicsAndPartitions(Object metadata, Closure closure) {
+    private void withTopicsAndPartitions(Object metadata, Closure closure) {
         withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
             withScalaCollection(f.partitionsMetadata).each { p ->
                 TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
@@ -113,7 +126,7 @@ class KafkaPoller extends Thread {
     /**
      * Fetch the leader metadata and update our data structures
      */
-    void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
+    private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
         Integer leaderId = partitionMetadata.leader.get()?.id
         Integer partitionId = partitionMetadata.partitionId
@@ -122,7 +135,7 @@ class KafkaPoller extends Thread {
         this.topicOffsetMap[tp] = offset
     }

-    Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
+    private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
         SimpleConsumer consumer = this.brokerConsumerMap[leaderId]

         /* If we don't have a proper SimpleConsumer instance (e.g. null) then
@@ -137,24 +150,26 @@ class KafkaPoller extends Thread {
         return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
     }

-    Iterable withScalaCollection(scala.collection.Iterable iter) {
+    private Iterable withScalaCollection(scala.collection.Iterable iter) {
         return JavaConversions.asJavaIterable(iter)
     }

     /**
      * Blocking reconnect to the Kafka brokers
      */
-    void reconnect() {
+    private void reconnect() {
         disconnectConsumers()
         LOGGER.info('Creating SimpleConsumer connections for brokers')
-        this.brokers.each { Broker b ->
-            SimpleConsumer consumer = new SimpleConsumer(b.host,
-                                                         b.port,
-                                                         KAFKA_TIMEOUT,
-                                                         KAFKA_BUFFER,
-                                                         KAFKA_CLIENT_ID)
-            consumer.connect()
-            this.brokerConsumerMap[b.id] = consumer
+        synchronized(this.brokers) {
+            this.brokers.each { Broker b ->
+                SimpleConsumer consumer = new SimpleConsumer(b.host,
+                                                             b.port,
+                                                             KAFKA_TIMEOUT,
+                                                             KAFKA_BUFFER,
+                                                             KAFKA_CLIENT_ID)
+                consumer.connect()
+                this.brokerConsumerMap[b.id] = consumer
+            }
         }
         this.shouldReconnect = false
     }
@@ -164,7 +179,6 @@ class KafkaPoller extends Thread {
      */
     void die() {
         this.keepRunning = false
-        disconnectConsumers()
     }

     private void disconnectConsumers() {
@@ -178,8 +192,11 @@ class KafkaPoller extends Thread {
      * Store a new list of KafkaBroker objects and signal a reconnection
      */
     void refresh(List<KafkaBroker> brokers) {
-        this.brokers = brokers.collect { KafkaBroker b ->
-            new Broker(b.brokerId, b.host, b.port)
+        synchronized(this.brokers) {
+            this.brokers.clear()
+            this.brokers.addAll(brokers.collect { KafkaBroker b ->
+                new Broker(b.brokerId, b.host, b.port)
+            })
         }
         this.shouldReconnect = true
     }
@@ -189,7 +206,9 @@ class KafkaPoller extends Thread {
      * scala underpinnings
      */
     private scala.collection.immutable.Seq getBrokersSeq() {
-        return JavaConversions.asScalaBuffer(this.brokers).toList()
+        synchronized(this.brokers) {
+            return JavaConversions.asScalaBuffer(this.brokers).toList()
+        }
     }

     /**
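A note on the locking above: Collections.synchronizedList (used for this.brokers, see BrokerTreeWatcher below) only makes individual method calls atomic. Compound operations such as the iteration in reconnect() and getBrokersSeq(), or the clear-then-addAll in refresh(), must hold the list's own monitor, which is what the synchronized(this.brokers) blocks do. A standalone sketch of the pattern, with hypothetical names:

// Iterating a synchronized list still requires an explicit lock on the
// list object itself; only single calls such as add() are atomic.
List<String> brokers = Collections.synchronizedList([])
brokers << 'broker-1'              // atomic on its own
brokers << 'broker-2'

synchronized (brokers) {           // guard the whole traversal
    brokers.each { println it }
}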

Main.groovy

@@ -126,8 +126,9 @@ class Main {
         /* Assuming that most people aren't needing to run Storm-based watchers
          * as well
          */
+        KafkaSpoutTreeWatcher stormWatcher = null
         if (cli.hasOption('s')) {
-            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
+            stormWatcher = new KafkaSpoutTreeWatcher(client,
                                                     watchedTopics,
                                                     consumerOffsets)
             stormWatcher.onConsumerData << gaugeRegistrar
@@ -155,6 +156,18 @@ class Main {
         /* Start the reporter if we've got it */
         reporter?.start(delayInSeconds, TimeUnit.SECONDS)

+        // shutdown threads
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                Main.logger.info("shutdown threads")
+                poller.die()
+                consumerWatcher.close()
+                if (stormWatcher != null) {
+                    stormWatcher.close()
+                }
+                poller.join()
+            }
+        });
         logger.info('Starting wait loop...')
         synchronized(this) {
             wait()
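For reference, the anonymous Thread subclass above is the plain JDK form. Groovy also attaches an addShutdownHook(Closure) convenience method to every object, so a hypothetical equivalent of the same hook would be:

// Sketch of the same teardown using Groovy's addShutdownHook(Closure);
// poller, consumerWatcher and stormWatcher as wired up above.
addShutdownHook {
    logger.info('shutdown threads')
    poller.die()
    consumerWatcher.close()
    stormWatcher?.close()    // safe navigation replaces the explicit null check
    poller.join()
}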

AbstractTreeWatcher.groovy

@@ -47,5 +47,13 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
         return this
     }

+    /**
+     * Close our internal cache and return ourselves for API cleanliness
+     */
+    AbstractTreeWatcher close() {
+        this.cache?.close()
+        return this
+    }
+
     abstract void childEvent(CuratorFramework client, TreeCacheEvent event)
 }
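Since close() returns this, teardown composes with the watchers' existing fluent style. A hypothetical caller:

// Hypothetical teardown of any concrete AbstractTreeWatcher subclass;
// close() shuts the internal TreeCache (if one exists) and returns this.
def sameWatcher = watcher.close()
assert sameWatcher.is(watcher)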

BrokerTreeWatcher.groovy

@@ -28,7 +28,7 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
         super(client)

         this.json = new JsonSlurper()
-        this.brokers = []
+        this.brokers = Collections.synchronizedList([])
         this.onBrokerUpdates = []
     }
@@ -47,9 +47,8 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
          */
         if (event.type == TreeCacheEvent.Type.INITIALIZED) {
             this.isTreeInitialized = true
-            List threadsafeBrokers = Collections.synchronizedList(this.brokers)
             this.onBrokerUpdates.each { Closure c ->
-                c?.call(threadsafeBrokers)
+                c?.call(this.brokers)
             }
             return
         }
@@ -68,6 +67,12 @@ class BrokerTreeWatcher extends AbstractTreeWatcher {
         Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8'))

         this.brokers << new KafkaBroker(brokerData, brokerId)
+
+        if (this.isTreeInitialized) {
+            this.onBrokerUpdates.each { Closure c ->
+                c?.call(this.brokers)
+            }
+        }
     }

     /**
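With this change, broker registrations that arrive after the INITIALIZED event still reach subscribers, because each later node event re-fires the onBrokerUpdates closures. A hypothetical subscriber using the existing closure-list convention:

// Hypothetical subscriber: receives the full (synchronized) broker list
// on initialization and again whenever a broker joins later.
watcher.onBrokerUpdates << { List brokers ->
    println "cluster now has ${brokers.size()} broker(s)"
}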

DelaySpec.groovy (new file)

@@ -0,0 +1,55 @@
package com.github.lookout.verspaetung

import spock.lang.*

class DelaySpec extends Specification {
    Delay delay = new Delay()

    def "it should give default on first use"() {
        expect:
        delay.value() == Delay.POLLER_DELAY_MIN
    }

    def "slower has an upper bound"() {
        given:
        for (int i = 1; i < 20; i++) { delay.slower() }
        def firstLast = delay.value()
        def result = delay.slower()
        def secondLast = delay.value()

        expect:
        firstLast == secondLast
        firstLast == Delay.POLLER_DELAY_MAX
        result == false
    }

    def "increasing delay gives true"() {
        given:
        def result = true
        for (int i = 1; delay.value() < Delay.POLLER_DELAY_MAX; i++) {
            result = result && delay.slower()
        }
        def last = delay.slower()

        expect:
        result == true
        last == false
    }

    def "reset on min value gives false"() {
        given:
        def result = delay.reset()

        expect:
        result == false
    }

    def "reset on non-min value gives true"() {
        given:
        delay.slower()
        def result = delay.reset()

        expect:
        result == true
    }
}
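The spec pins down the boundaries imperatively; the doubling sequence itself could also be expressed as a Spock data table. A hypothetical additional feature method:

// Hypothetical data-driven check of the doubling behaviour; Spock
// creates a fresh Delay field instance for every table row.
def "delay doubles with each slower() call"() {
    given:
    n.times { delay.slower() }

    expect:
    delay.value() == expected

    where:
    n | expected
    0 | 1000
    1 | 2000
    2 | 4000
    3 | 8000
}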
}