Ignore "owners" and other subtrees from the Kafka High Level Consumer ZK space
References #2
This commit is contained in:
parent
699e141c22
commit
6d82735b3c
|
@ -20,6 +20,14 @@ class StandardTreeWatcher extends AbstractTreeWatcher {
|
|||
if (data == null) {
|
||||
return null
|
||||
}
|
||||
|
||||
/* There are non-offset related subtrees in /consumers that we don't
|
||||
* care about, let's just skip over them
|
||||
*/
|
||||
if (!isOffsetSubtree(data.path)) {
|
||||
return null
|
||||
}
|
||||
|
||||
/*
|
||||
ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]}
|
||||
*/
|
||||
|
@ -33,9 +41,22 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473
|
|||
|
||||
offset.groupName = pathParts[2]
|
||||
offset.topic = pathParts[4]
|
||||
offset.partition = new Integer(pathParts[5])
|
||||
offset.offset = new Integer(new String(data.data))
|
||||
|
||||
try {
|
||||
offset.partition = new Integer(pathParts[5])
|
||||
offset.offset = new Integer(new String(data.data))
|
||||
}
|
||||
catch (NumberFormatException ex) {
|
||||
logger.error("Failed to parse an Integer: ${data}")
|
||||
return null
|
||||
}
|
||||
|
||||
return offset
|
||||
}
|
||||
|
||||
|
||||
Boolean isOffsetSubtree(String path) {
|
||||
return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,4 +37,20 @@ class StandardTreeWatcherSpec extends Specification {
|
|||
offset.partition == 3
|
||||
offset.offset == 0
|
||||
}
|
||||
|
||||
def "isOffsetSubtree should return true for a valid subtree path"() {
|
||||
given:
|
||||
String path = '/consumers/offtopic-spock-test/offsets/topic/7'
|
||||
|
||||
expect:
|
||||
watcher.isOffsetSubtree(path) == true
|
||||
}
|
||||
|
||||
def "isOffsetSubtree should return false for a non-offset subtree path"() {
|
||||
given:
|
||||
String path = '/consumers/offtopic-1025413624/owners/spock-hostname/0'
|
||||
|
||||
expect:
|
||||
watcher.isOffsetSubtree(path) == false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue