diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy index 5030826..c49b6eb 100644 --- a/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy @@ -37,6 +37,7 @@ class KafkaPoller extends Thread { this.brokerConsumerMap = [:] this.brokers = [] this.onDelta = [] + setName("Verspaetung Kafka Poller") } /* There are a number of cases where we intentionally swallow stacktraces @@ -161,14 +162,19 @@ class KafkaPoller extends Thread { disconnectConsumers() LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers) synchronized(this.brokers) { - this.brokers.each { Broker b -> - SimpleConsumer consumer = new SimpleConsumer(b.host, - b.port, + this.brokers.each { Broker broker -> + SimpleConsumer consumer = new SimpleConsumer(broker.host, + broker.port, KAFKA_TIMEOUT, KAFKA_BUFFER, KAFKA_CLIENT_ID) - consumer.connect() - this.brokerConsumerMap[b.id] = consumer + try { + consumer.connect() + this.brokerConsumerMap[broker.id] = consumer + } + catch(Exception e) { + LOGGER.info('Error connecting cunsumer to {}', broker, e) + } } } this.shouldReconnect = false @@ -184,7 +190,12 @@ class KafkaPoller extends Thread { private void disconnectConsumers() { this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> LOGGER.info('Disconnecting {}', client) - client?.disconnect() + try { + client?.disconnect() + } + catch(Exception e) { + LOGGER.info('Error disconnecting {}', client, e) + } } }