Add the --exclude CLI option for filtering out unwanted consumer groups
Fixes #34
This commit is contained in:
parent
921ef7be64
commit
3778a24044
|
@ -38,6 +38,7 @@ class Main {
|
||||||
String statsdHost = 'localhost'
|
String statsdHost = 'localhost'
|
||||||
Integer statsdPort = 8125
|
Integer statsdPort = 8125
|
||||||
Integer delayInSeconds = 5
|
Integer delayInSeconds = 5
|
||||||
|
String[] excludeGroups = []
|
||||||
|
|
||||||
CommandLine cli = parseCommandLine(args)
|
CommandLine cli = parseCommandLine(args)
|
||||||
|
|
||||||
|
@ -57,6 +58,10 @@ class Main {
|
||||||
delayInSeconds = cli.getOptionValue('d').toInteger()
|
delayInSeconds = cli.getOptionValue('d').toInteger()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cli.hasOption('x')) {
|
||||||
|
excludeGroups = cli.getOptionValues('x')
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("Running with: ${args}")
|
logger.info("Running with: ${args}")
|
||||||
logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)
|
logger.warn('Using: zookeepers={} statsd={}:{}', zookeeperHosts, statsdHost, statsdPort)
|
||||||
logger.info('Reporting every {} seconds', delayInSeconds)
|
logger.info('Reporting every {} seconds', delayInSeconds)
|
||||||
|
@ -106,7 +111,9 @@ class Main {
|
||||||
* one
|
* one
|
||||||
*/
|
*/
|
||||||
Closure gaugeRegistrar = { KafkaConsumer consumer ->
|
Closure gaugeRegistrar = { KafkaConsumer consumer ->
|
||||||
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
|
if (!shouldExcludeConsumer(excludeGroups, consumer)) {
|
||||||
|
registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
|
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
|
||||||
|
@ -158,6 +165,9 @@ class Main {
|
||||||
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
|
ConcurrentHashMap<KafkaConsumer, ConsumerGauge> consumerGauges,
|
||||||
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
|
ConcurrentHashMap<KafkaConsumer, Integer> consumerOffsets,
|
||||||
ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
|
ConcurrentHashMap<TopicPartition, Long> topicOffsets) {
|
||||||
|
/*
|
||||||
|
* Bail early if we already ahve our Consumer registered
|
||||||
|
*/
|
||||||
if (consumerGauges.containsKey(consumer)) {
|
if (consumerGauges.containsKey(consumer)) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -183,6 +193,12 @@ class Main {
|
||||||
.withValueSeparator(',' as char)
|
.withValueSeparator(',' as char)
|
||||||
.create('z')
|
.create('z')
|
||||||
|
|
||||||
|
Option excludeGroups = OptionBuilder.withArgName('EXCLUDES')
|
||||||
|
.hasArgs()
|
||||||
|
.withDescription('Regular expression for consumer groups to exclude from reporting (can be declared multiple times)')
|
||||||
|
.withLongOpt('exclude')
|
||||||
|
.create('x')
|
||||||
|
|
||||||
Option statsdHost = OptionBuilder.withArgName('STATSD')
|
Option statsdHost = OptionBuilder.withArgName('STATSD')
|
||||||
.hasArg()
|
.hasArg()
|
||||||
.withType(String)
|
.withType(String)
|
||||||
|
@ -226,6 +242,7 @@ class Main {
|
||||||
options.addOption(dryRun)
|
options.addOption(dryRun)
|
||||||
options.addOption(stormSpouts)
|
options.addOption(stormSpouts)
|
||||||
options.addOption(delaySeconds)
|
options.addOption(delaySeconds)
|
||||||
|
options.addOption(excludeGroups)
|
||||||
|
|
||||||
return options
|
return options
|
||||||
}
|
}
|
||||||
|
@ -249,4 +266,12 @@ class Main {
|
||||||
System.exit(1)
|
System.exit(1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if we should exclude the given KafkaConsumer from reporting
|
||||||
|
*/
|
||||||
|
|
||||||
|
static boolean shouldExcludeConsumer(String[] excludeGroups, KafkaConsumer consumer) {
|
||||||
|
return null != excludeGroups?.find { String excludeRule -> consumer?.name.matches(excludeRule) }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
package com.github.lookout.verspaetung
|
||||||
|
|
||||||
|
|
||||||
|
import spock.lang.*
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
class MainSpec extends Specification {
|
||||||
|
def "shouldExcludeGroups() shuold return false by default"() {
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(new String[0], null) == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with a matching exclude should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['foo'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'foo')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with a matching regex should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spockfoo')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with multiple regexes that don't match should return false"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'spock')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer) == false
|
||||||
|
}
|
||||||
|
|
||||||
|
def "shouldExcludeGroups() with multiple regexes which match should return true"() {
|
||||||
|
given:
|
||||||
|
final String[] groups = ['.*foo', 'bar(.*)'] as String[]
|
||||||
|
final KafkaConsumer consumer = new KafkaConsumer('spock-topic', 0, 'barstool')
|
||||||
|
|
||||||
|
expect:
|
||||||
|
Main.shouldExcludeConsumer(groups, consumer)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue