diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy index 21d04bb..1865823 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy @@ -35,7 +35,7 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { [broker:[host:REDACTED, port:6667], offset:179, partition:7, topic:device_data, topology:[id:01c0d1fc-e956-4b35-9891-dd835488cf45, name:unwrap_topology]] */ ConsumerOffset offset = new ConsumerOffset() - offset.groupName = offsetData.topology.name + offset.groupName = consumerNameFromPath(nodeData.path) offset.topic = offsetData.topic offset.partition = offsetData.partition offset.offset = offsetData.offset @@ -50,4 +50,14 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { Boolean isOffsetSubtree(String path) { return (path =~ /\/kafka_spout\/(.*)\/partition_(\d+)/) } + + /** + * Extract the given name for a KafkaSpout consumer based on the path in + * Zookeeper + */ + String consumerNameFromPath(String path) { + List pieces = path.split(/\//) as List + + return pieces[2] + } } diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy index 960b4a5..a5f6d99 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy @@ -14,6 +14,14 @@ class KafkaSpoutTreeWatcherSpec extends Specification { this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:]) } + def "consumerNameFromPath() should give the right name for a valid path"() { + given: + String path = '/kafka_spout/spock-topology/partition_1' + + expect: + watcher.consumerNameFromPath(path) == 'spock-topology' + } + def "isOffsetSubtree should return true for a valid subtree path"() { given: String path = '/kafka_spout/spock-topology/partition_1'