From 8995351bbaa72ee44e52804c650dc0f57454faa5 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Wed, 28 Jan 2015 03:05:30 -0800 Subject: [PATCH] Parse the consumer group from the KafkaSpout ZK path instead of using the JSON The JSON in the Znode is actually the name of the topology, not necessarily the name of the consumer group used by a KafkSpout Fixes #9 --- .../verspaetung/zk/KafkaSpoutTreeWatcher.groovy | 12 +++++++++++- .../verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy | 8 ++++++++ 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy index 21d04bb..1865823 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy @@ -35,7 +35,7 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { [broker:[host:REDACTED, port:6667], offset:179, partition:7, topic:device_data, topology:[id:01c0d1fc-e956-4b35-9891-dd835488cf45, name:unwrap_topology]] */ ConsumerOffset offset = new ConsumerOffset() - offset.groupName = offsetData.topology.name + offset.groupName = consumerNameFromPath(nodeData.path) offset.topic = offsetData.topic offset.partition = offsetData.partition offset.offset = offsetData.offset @@ -50,4 +50,14 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { Boolean isOffsetSubtree(String path) { return (path =~ /\/kafka_spout\/(.*)\/partition_(\d+)/) } + + /** + * Extract the given name for a KafkaSpout consumer based on the path in + * Zookeeper + */ + String consumerNameFromPath(String path) { + List pieces = path.split(/\//) as List + + return pieces[2] + } } diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy index 960b4a5..a5f6d99 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy @@ -14,6 +14,14 @@ class KafkaSpoutTreeWatcherSpec extends Specification { this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:]) } + def "consumerNameFromPath() should give the right name for a valid path"() { + given: + String path = '/kafka_spout/spock-topology/partition_1' + + expect: + watcher.consumerNameFromPath(path) == 'spock-topology' + } + def "isOffsetSubtree should return true for a valid subtree path"() { given: String path = '/kafka_spout/spock-topology/partition_1'