Parse the consumer group from the KafkaSpout ZK path instead of using the JSON
The JSON in the Znode is actually the name of the topology, not necessarily the name of the consumer group used by a KafkSpout Fixes #9
This commit is contained in:
parent
d0c99b9a34
commit
8995351bba
|
@ -35,7 +35,7 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
|
|||
[broker:[host:REDACTED, port:6667], offset:179, partition:7, topic:device_data, topology:[id:01c0d1fc-e956-4b35-9891-dd835488cf45, name:unwrap_topology]]
|
||||
*/
|
||||
ConsumerOffset offset = new ConsumerOffset()
|
||||
offset.groupName = offsetData.topology.name
|
||||
offset.groupName = consumerNameFromPath(nodeData.path)
|
||||
offset.topic = offsetData.topic
|
||||
offset.partition = offsetData.partition
|
||||
offset.offset = offsetData.offset
|
||||
|
@ -50,4 +50,14 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
|
|||
Boolean isOffsetSubtree(String path) {
|
||||
return (path =~ /\/kafka_spout\/(.*)\/partition_(\d+)/)
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the given name for a KafkaSpout consumer based on the path in
|
||||
* Zookeeper
|
||||
*/
|
||||
String consumerNameFromPath(String path) {
|
||||
List<String> pieces = path.split(/\//) as List<String>
|
||||
|
||||
return pieces[2]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,6 +14,14 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
|
|||
this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
|
||||
}
|
||||
|
||||
def "consumerNameFromPath() should give the right name for a valid path"() {
|
||||
given:
|
||||
String path = '/kafka_spout/spock-topology/partition_1'
|
||||
|
||||
expect:
|
||||
watcher.consumerNameFromPath(path) == 'spock-topology'
|
||||
}
|
||||
|
||||
def "isOffsetSubtree should return true for a valid subtree path"() {
|
||||
given:
|
||||
String path = '/kafka_spout/spock-topology/partition_1'
|
||||
|
|
Loading…
Reference in New Issue