make the connection or disconnection of consumers failsafe

i.e. ignore errors on disconnect and do not register consumers which
fail to connect
This commit is contained in:
Christian Meier 2015-11-10 16:46:29 +01:00
parent 66c0c332bb
commit 618aaf915e
1 changed files with 17 additions and 6 deletions

View File

@ -37,6 +37,7 @@ class KafkaPoller extends Thread {
this.brokerConsumerMap = [:] this.brokerConsumerMap = [:]
this.brokers = [] this.brokers = []
this.onDelta = [] this.onDelta = []
setName("Verspaetung Kafka Poller")
} }
/* There are a number of cases where we intentionally swallow stacktraces /* There are a number of cases where we intentionally swallow stacktraces
@ -161,14 +162,19 @@ class KafkaPoller extends Thread {
disconnectConsumers() disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers) LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers)
synchronized(this.brokers) { synchronized(this.brokers) {
this.brokers.each { Broker b -> this.brokers.each { Broker broker ->
SimpleConsumer consumer = new SimpleConsumer(b.host, SimpleConsumer consumer = new SimpleConsumer(broker.host,
b.port, broker.port,
KAFKA_TIMEOUT, KAFKA_TIMEOUT,
KAFKA_BUFFER, KAFKA_BUFFER,
KAFKA_CLIENT_ID) KAFKA_CLIENT_ID)
consumer.connect() try {
this.brokerConsumerMap[b.id] = consumer consumer.connect()
this.brokerConsumerMap[broker.id] = consumer
}
catch(Exception e) {
LOGGER.info('Error connecting cunsumer to {}', broker, e)
}
} }
} }
this.shouldReconnect = false this.shouldReconnect = false
@ -184,7 +190,12 @@ class KafkaPoller extends Thread {
private void disconnectConsumers() { private void disconnectConsumers() {
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
LOGGER.info('Disconnecting {}', client) LOGGER.info('Disconnecting {}', client)
client?.disconnect() try {
client?.disconnect()
}
catch(Exception e) {
LOGGER.info('Error disconnecting {}', client, e)
}
} }
} }