diff --git a/build.gradle b/build.gradle index eef237b..9b10b00 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ apply plugin: 'application' group = "com.github.lookout" description = "A utility for monitoring the delay of Kafka consumers" -version = '0.1.7' +version = '0.1.8' mainClassName = 'com.github.lookout.verspaetung.Main' defaultTasks 'clean', 'check' sourceCompatibility = '1.7' diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy index d6a590f..2abf1af 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy @@ -39,7 +39,15 @@ class ConsumerGauge implements Gauge, Tagged { (!this.topics.containsKey(topicPartition))) { return 0 } - return ((Integer)this.topics[topicPartition]) - this.consumers[consumer] + + /* + * Returning the maximum value of the computation and zero, there are + * some cases where we can be "behind" on the Kafka latest offset + * polling and this could result in an erroneous negative value. See: + * for more details + */ + return Math.max(0, + ((Integer)this.topics[topicPartition]) - this.consumers[consumer]) } @Override diff --git a/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy index bf7300a..b1371aa 100644 --- a/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy +++ b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy @@ -35,7 +35,17 @@ class ConsumerGaugeSpec extends Specification { def "getValue() should return zero for a consumer not in the map"() { given: - ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:]) + ConsumerGauge gauge = new ConsumerGauge(this.consumer, [:], [:]) + + expect: + gauge.value == 0 + } + + def "getValue() should return zero instead of a negative number"() { + given: + ConsumerGauge gauge = new ConsumerGauge(this.consumer, + [(this.consumer) : 10], + [(this.tp) : 5]) expect: gauge.value == 0