From 7a6ef71eef9f434a262d2888e6e0de56ef5a884f Mon Sep 17 00:00:00 2001 From: Christian Meier Date: Tue, 10 Nov 2015 16:53:27 +0100 Subject: [PATCH] refactor the BrokerTreeWatcher split things into smaller pieces and as preparation of having only ONE TreeWatcher running which itself dispatch the events to the interresting parties. Sponsored by Lookout Inc. --- .../reiseburo/verspaetung/KafkaPoller.groovy | 10 +- .../verspaetung/zk/BrokerManager.groovy | 73 ++++++++++++++ .../verspaetung/zk/BrokerTreeWatcher.groovy | 85 +++++++---------- .../reiseburo/verspaetung/zk/EventData.groovy | 42 ++++++++ .../verspaetung/zk/PojoFactory.groovy | 42 ++++++++ .../verspaetung/zk/BrokerManagerSpec.groovy | 95 +++++++++++++++++++ .../zk/BrokerTreeWatcherSpec.groovy | 29 ------ .../verspaetung/zk/EventDataSpec.groovy | 47 +++++++++ .../verspaetung/zk/PojoFactorySpec.groovy | 52 ++++++++++ 9 files changed, 389 insertions(+), 86 deletions(-) create mode 100644 src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerManager.groovy create mode 100644 src/main/groovy/com/github/reiseburo/verspaetung/zk/EventData.groovy create mode 100644 src/main/groovy/com/github/reiseburo/verspaetung/zk/PojoFactory.groovy create mode 100644 src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerManagerSpec.groovy delete mode 100644 src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcherSpec.groovy create mode 100644 src/test/groovy/com/github/reiseburo/verspaetung/zk/EventDataSpec.groovy create mode 100644 src/test/groovy/com/github/reiseburo/verspaetung/zk/PojoFactorySpec.groovy diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy index c49b6eb..18794f9 100644 --- a/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/reiseburo/verspaetung/KafkaPoller.groovy @@ -37,7 +37,7 @@ class KafkaPoller extends Thread { this.brokerConsumerMap = [:] this.brokers = [] this.onDelta = [] - setName("Verspaetung Kafka Poller") + setName('Verspaetung Kafka Poller') } /* There are a number of cases where we intentionally swallow stacktraces @@ -89,7 +89,7 @@ class KafkaPoller extends Thread { } } - @SuppressWarnings(['CatchException']) + @SuppressWarnings('CatchException') private void dumpMetadata() { LOGGER.debug('dumping meta-data') @@ -158,6 +158,7 @@ class KafkaPoller extends Thread { /** * Blocking reconnect to the Kafka brokers */ + @SuppressWarnings('CatchException') private void reconnect() { disconnectConsumers() LOGGER.info('Creating SimpleConsumer connections for brokers {}', this.brokers) @@ -172,7 +173,7 @@ class KafkaPoller extends Thread { consumer.connect() this.brokerConsumerMap[broker.id] = consumer } - catch(Exception e) { + catch (Exception e) { LOGGER.info('Error connecting cunsumer to {}', broker, e) } } @@ -187,13 +188,14 @@ class KafkaPoller extends Thread { this.keepRunning = false } + @SuppressWarnings('CatchException') private void disconnectConsumers() { this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> LOGGER.info('Disconnecting {}', client) try { client?.disconnect() } - catch(Exception e) { + catch (Exception e) { LOGGER.info('Error disconnecting {}', client, e) } } diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerManager.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerManager.groovy new file mode 100644 index 0000000..742e385 --- /dev/null +++ b/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerManager.groovy @@ -0,0 +1,73 @@ +package com.github.reiseburo.verspaetung.zk + +import com.github.reiseburo.verspaetung.KafkaBroker + +import groovy.transform.TypeChecked + +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +/** + * manage a list of brokers. can be online or offline. offline means the + * internal list is hidden, i.e. the list() gives you an empty list. + */ +@TypeChecked +class BrokerManager { + + private static final Logger logger = LoggerFactory.getLogger(BrokerManager) + + private final List brokers = Collections.synchronizedList([]) + + // we start with being offline + private boolean offline = true + + void add(KafkaBroker broker) { + synchronized(brokers) { + if (broker != null && this.brokers.indexOf(broker) == -1) { + this.brokers.add(broker) + logger.info('broker added: {}', broker) + } + } + } + + void update(KafkaBroker broker) { + synchronized(brokers) { + if (broker == null) { + return + } + if (this.brokers.indexOf(broker) != -1) { + this.brokers.remove(broker) + } + this.brokers.add(broker) + logger.info('broker updated: {}', broker) + } + } + + void remove(KafkaBroker broker) { + synchronized(brokers) { + if (broker != null && this.brokers.remove(broker)) { + logger.info('broker removed: {}', broker) + } + } + } + + // TODO not sure if this is correct - see BrokerTreeWatcher + @SuppressWarnings('ConfusingMethodName') + void offline() { + this.offline = true + } + + // TODO not sure if this is correct - see BrokerTreeWatcher + void online() { + this.offline = false + } + + Collection list() { + if (this.offline) { + [] + } + else { + this.brokers + } + } +} diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcher.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcher.groovy index d6759e2..ca7af92 100644 --- a/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcher.groovy @@ -6,7 +6,6 @@ import groovy.json.JsonSlurper import groovy.transform.TypeChecked import org.apache.curator.framework.CuratorFramework -import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCacheEvent /** @@ -16,19 +15,17 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent */ @TypeChecked class BrokerTreeWatcher extends AbstractTreeWatcher { - static final Integer INVALID_BROKER_ID = -1 private static final String BROKERS_PATH = '/brokers/ids' - private Boolean isTreeInitialized = false - private final JsonSlurper json private final List onBrokerUpdates - private final List brokers + private final BrokerManager manager + private final PojoFactory factory BrokerTreeWatcher(CuratorFramework client) { super(client) - this.json = new JsonSlurper() - this.brokers = Collections.synchronizedList([]) + this.factory = new PojoFactory(new JsonSlurper()) + this.manager = new BrokerManager() this.onBrokerUpdates = [] } @@ -37,57 +34,39 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { } /** - * Process events like NODE_ADDED and NODE_REMOVED to keep an up to date - * list of brokers + * Process events to keep an up to date list of brokers */ @Override void childEvent(CuratorFramework client, TreeCacheEvent event) { - /* If we're initialized that means we should have all our brokers in - * our internal list already and we can fire an event - */ - if (event.type == TreeCacheEvent.Type.INITIALIZED) { - this.isTreeInitialized = true - this.onBrokerUpdates.each { Closure c -> - c?.call(this.brokers) - } - return + switch (event.type) { + case TreeCacheEvent.Type.INITIALIZED: + this.manager.online() + break + case TreeCacheEvent.Type.NODE_ADDED: + KafkaBroker broker = this.factory.createKafkaBroker(event) + this.manager.add(broker) + break + case TreeCacheEvent.Type.NODE_UPDATED: + KafkaBroker broker = this.factory.createKafkaBroker(event) + this.manager.update(broker) + break + case TreeCacheEvent.Type.NODE_REMOVED: + KafkaBroker broker = this.factory.createKafkaBroker(event) + this.manager.remove(broker) + break + // TODO these 3 events might come with path which can be mapped + // to a specific broker + case TreeCacheEvent.Type.CONNECTION_LOST: + case TreeCacheEvent.Type.CONNECTION_SUSPENDED: + this.manager.offline() + break + case TreeCacheEvent.Type.CONNECTION_RECONNECTED: + this.manager.online() + break } - if (event.type != TreeCacheEvent.Type.NODE_ADDED) { - return + this.onBrokerUpdates.each { Closure c -> + c?.call(this.manager.list()) } - - ChildData nodeData = event.data - Integer brokerId = brokerIdFromPath(nodeData.path) - - if (brokerId == INVALID_BROKER_ID) { - return - } - - Object brokerData = json.parseText(new String(nodeData.data, 'UTF-8')) - - this.brokers << new KafkaBroker(brokerData, brokerId) - - if (this.isTreeInitialized) { - this.onBrokerUpdates.each { Closure c -> - c?.call(this.brokers) - } - } - } - - /** - * Process a path string from Zookeeper for the Kafka broker's ID - * - * We're expecting paths like: /brokers/ids/1231524312 - */ - Integer brokerIdFromPath(String path) { - List pathParts = path?.split('/') as List - - if ((pathParts == null) || - (pathParts.size() != 4)) { - return INVALID_BROKER_ID - } - - return new Integer(pathParts[-1]) } } diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/zk/EventData.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/zk/EventData.groovy new file mode 100644 index 0000000..05062c6 --- /dev/null +++ b/src/main/groovy/com/github/reiseburo/verspaetung/zk/EventData.groovy @@ -0,0 +1,42 @@ +package com.github.reiseburo.verspaetung.zk + +import groovy.transform.TypeChecked + +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +/** + * parsing TreeCacheEvents and provide helper method to access its data in typed manner + */ +@TypeChecked +class EventData { + + private final String data + private final List pathParts + + EventData(TreeCacheEvent event) { + ChildData data = event.data + this.pathParts = data.path == null ? null : (data.path.split('/') as List) + this.data = data.data == null ? null : new String(data.data, 'UTF-8') + } + + Integer asInteger() { + new Integer(data) + } + + String asString() { + data + } + + Integer pathPartsSize() { + pathParts.size() + } + + Integer getPathPartAsInteger(int pos) { + if (pathParts.size() <= pos) { + return null + } + new Integer(pathParts[pos]) + } +} + diff --git a/src/main/groovy/com/github/reiseburo/verspaetung/zk/PojoFactory.groovy b/src/main/groovy/com/github/reiseburo/verspaetung/zk/PojoFactory.groovy new file mode 100644 index 0000000..d09ebd2 --- /dev/null +++ b/src/main/groovy/com/github/reiseburo/verspaetung/zk/PojoFactory.groovy @@ -0,0 +1,42 @@ +package com.github.reiseburo.verspaetung.zk + +import com.github.reiseburo.verspaetung.KafkaBroker + +import groovy.json.JsonSlurper + +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +/** + * factory creating Pojo's from TreeCacheEvents + */ +class PojoFactory { + + private final JsonSlurper json + + PojoFactory(JsonSlurper json) { + this.json = json + } + + /** + * converts an treeCacheEvent into a KafkaBroker. the caller + * is responsible for calling the factory method with the + * right data. i.e. path starts with /brokers/ids. the implementation + * just uses whatever it is given to create a KafkaBroker object + */ + KafkaBroker createKafkaBroker(TreeCacheEvent event) { + EventData data = new EventData(event) + // We're expecting paths like: /brokers/ids/1231524312 + Integer id = data.getPathPartAsInteger(3) + if (id == null) { + return + } + String json = data.asString() + if (json == null) { + new KafkaBroker('', 0, id) + } + else { + Object payload = this.json.parseText(json) + new KafkaBroker(payload.host, payload.port, id) + } + } +} diff --git a/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerManagerSpec.groovy b/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerManagerSpec.groovy new file mode 100644 index 0000000..aefb518 --- /dev/null +++ b/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerManagerSpec.groovy @@ -0,0 +1,95 @@ +package com.github.reiseburo.verspaetung.zk + +import spock.lang.* + +import groovy.json.JsonSlurper + +import com.github.reiseburo.verspaetung.KafkaBroker + +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +class BrokerManagerSpec extends Specification { + BrokerManager manager = new BrokerManager() + + def "new instance is offline, i.e. has empty list of brokers"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + + expect: + manager.list().size() == 0 + } + + def "new instance is offline after add broker"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + manager.add(broker) + + expect: + manager.list().size() == 0 + } + + def "list shows brokers after getting online"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + KafkaBroker broker2 = new KafkaBroker('', 0, 123) + manager.add(broker) + manager.online() + manager.remove(broker2) + + expect: + manager.list().size() == 1 + } + + def "can remove brokers based on its id"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1) + manager.add(broker) + manager.online() + manager.remove(broker2) + + expect: + manager.list().size() == 0 + } + + def "can update brokers"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + KafkaBroker broker2 = new KafkaBroker('localhost', 0, 1) + manager.online() + manager.add(broker) + manager.update(broker2) + + expect: + manager.list().size() == 1 + manager.list().first().host == 'localhost' + } + + def "can go offline anytime"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12) + manager.online() + manager.add(broker) + manager.add(broker2) + manager.offline() + + expect: + manager.list().size() == 0 + } + + def "can go offline and online again"() { + given: + KafkaBroker broker = new KafkaBroker('', 0, 1) + KafkaBroker broker2 = new KafkaBroker('localhost', 0, 12) + manager.online() + manager.add(broker) + manager.add(broker2) + manager.offline() + manager.online() + + expect: + manager.list().size() == 2 + } +} diff --git a/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcherSpec.groovy b/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcherSpec.groovy deleted file mode 100644 index 2e9c6eb..0000000 --- a/src/test/groovy/com/github/reiseburo/verspaetung/zk/BrokerTreeWatcherSpec.groovy +++ /dev/null @@ -1,29 +0,0 @@ -package com.github.reiseburo.verspaetung.zk - -import spock.lang.* - -class BrokerTreeWatcherSpec extends Specification { - BrokerTreeWatcher watcher - - def setup() { - this.watcher = new BrokerTreeWatcher() - } - - def "brokerIdFromPath() should return the right ID with a valid path"() { - given: - String path = "/brokers/ids/1337" - - expect: - watcher.brokerIdFromPath(path) == 1337 - } - - def "brokerIdFromPath() should return -1 on null paths"() { - expect: - watcher.brokerIdFromPath(null) == -1 - } - - def "brokerIdFromPath() should return -1 on empty/invalid paths"() { - expect: - watcher.brokerIdFromPath('/spock/ed') == -1 - } -} diff --git a/src/test/groovy/com/github/reiseburo/verspaetung/zk/EventDataSpec.groovy b/src/test/groovy/com/github/reiseburo/verspaetung/zk/EventDataSpec.groovy new file mode 100644 index 0000000..176dc32 --- /dev/null +++ b/src/test/groovy/com/github/reiseburo/verspaetung/zk/EventDataSpec.groovy @@ -0,0 +1,47 @@ +package com.github.reiseburo.verspaetung.zk + +import spock.lang.* +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +class EventDataSpec extends Specification { + EventData data + + def "converts TreeCacheEvents to EventData"() { + given: + String path = "/brokers/ids/1337" + data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData(path, null, "be happy".bytes))) + + expect: + data.getPathPartAsInteger(3) == 1337 + data.asString() == 'be happy' + data.pathPartsSize() == 4 + data.getPathPartAsInteger(4) == null + } + + def "converts TreeCacheEvents to EventData with integer payload and no path"() { + given: + String path = "/brokers/ids/1337" + data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData("/", null, "42".bytes))) + + expect: + data.asString() == '42' + data.asInteger() == 42 + data.pathPartsSize() == 0 + data.getPathPartAsInteger(0) == null + } + + def "converts TreeCacheEvents to EventData with no payload"() { + given: + String path = "/brokers/ids/1337" + data = new EventData(new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData("/123", null, null))) + + expect: + data.asString() == null + data.pathPartsSize() == 2 + data.getPathPartAsInteger(1) == 123 + } +} diff --git a/src/test/groovy/com/github/reiseburo/verspaetung/zk/PojoFactorySpec.groovy b/src/test/groovy/com/github/reiseburo/verspaetung/zk/PojoFactorySpec.groovy new file mode 100644 index 0000000..19dbaeb --- /dev/null +++ b/src/test/groovy/com/github/reiseburo/verspaetung/zk/PojoFactorySpec.groovy @@ -0,0 +1,52 @@ +package com.github.reiseburo.verspaetung.zk + +import spock.lang.* + +import groovy.json.JsonSlurper + +import com.github.reiseburo.verspaetung.KafkaBroker + +import org.apache.curator.framework.recipes.cache.ChildData +import org.apache.curator.framework.recipes.cache.TreeCacheEvent + +class PojoFactorySpec extends Specification { + PojoFactory factory = new PojoFactory(new JsonSlurper()) + + def "create KafkaBroker from TreeCacheEvent"() { + given: + String path = "/brokers/ids/1337" + String payload = "{\"host\":\"localhost\",\"port\":9092}" + TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData(path, null, payload.bytes)) + KafkaBroker broker = factory.createKafkaBroker(event) + + expect: + broker.brokerId == 1337 + broker.host == 'localhost' + broker.port == 9092 + } + + def "create KafkaBroker from TreeCacheEvent without brokerId on path"() { + given: + String path = "/brokers/ids" + TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData(path, null, ''.bytes)) + KafkaBroker broker = factory.createKafkaBroker(event) + + expect: + broker == null + } + + def "create KafkaBroker from TreeCacheEvent without payload"() { + given: + String path = "/brokers/ids/123" + TreeCacheEvent event = new TreeCacheEvent(TreeCacheEvent.Type.INITIALIZED, + new ChildData(path, null, null)) + KafkaBroker broker = factory.createKafkaBroker(event) + + expect: + broker.brokerId == 123 + broker.host == '' + broker.port == 0 + } +}