diff --git a/build.gradle b/build.gradle
index bda8333..6db8f69 100644
--- a/build.gradle
+++ b/build.gradle
@@ -7,7 +7,7 @@ apply plugin: 'application'
 
 group = "com.github.lookout"
 description = "A utility for monitoring the delay of Kafka consumers"
-version = '0.1.5'
+version = '0.1.6'
 mainClassName = 'com.github.lookout.verspaetung.Main'
 defaultTasks 'clean', 'check'
 sourceCompatibility = '1.7'
@@ -35,6 +35,7 @@ repositories {
     jcenter()
 
     maven { url 'https://dl.bintray.com/rtyler/maven' }
+    maven { url 'https://dl.bintray.com/lookout/systems' }
 }
 
 dependencies {
@@ -55,7 +56,11 @@ dependencies {
 
     /* Needed for command line options parsing */
    compile 'commons-cli:commons-cli:1.2+'
-    compile 'com.timgroup:java-statsd-client:3.1.2+'
+    compile 'com.github.lookout:metrics-datadog:0.1.3'
+
+    ['metrics-core', 'metrics-graphite'].each { artifactName ->
+        compile "io.dropwizard.metrics:${artifactName}:3.1.0"
+    }
 
     /* Logback is to be used for logging through the app */
     compile 'ch.qos.logback:logback-classic:1.1.2+'
diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
index c6ea3f9..c22434e 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy
@@ -8,7 +8,7 @@ class KafkaBroker {
     private Integer port
     private Integer brokerId
 
-    public KafkaBroker(Object jsonObject, Integer brokerId) {
+    KafkaBroker(Object jsonObject, Integer brokerId) {
         this.host = jsonObject.host
         this.port = jsonObject.port
         this.brokerId = brokerId
diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy
new file mode 100644
index 0000000..a7db815
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy
@@ -0,0 +1,51 @@
+package com.github.lookout.verspaetung
+
+/**
+ * POJO containing the necessary information to model a Kafka consumer
+ */
+class KafkaConsumer {
+    String topic
+    Integer partition
+    String name
+
+    KafkaConsumer(String topic, Integer partition, String name) {
+        this.topic = topic
+        this.partition = partition
+        this.name = name
+    }
+
+    @Override
+    String toString() {
+        return "KafkaConsumer<${topic}:${partition} - ${name}>"
+    }
+
+    @Override
+    int hashCode() {
+        return Objects.hash(this.topic, this.partition, this.name)
+    }
+
+    /**
+     * Return true for any two KafkaConsumer instances which have the same
+     * topic, partition and name properties
+     */
+    @Override
+    boolean equals(Object compared) {
+        /* bail early for object identity */
+        if (this.is(compared)) {
+            return true
+        }
+
+        if (!(compared instanceof KafkaConsumer)) {
+            return false
+        }
+
+        if ((this.topic == compared.topic) &&
+            (this.partition == compared.partition) &&
+            (this.name == compared.name)) {
+            return true
+        }
+
+        return false
+    }
+
+}
diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
index 9337770..d74e7a7 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy
@@ -28,12 +28,14 @@ class KafkaPoller extends Thread {
     private Boolean keepRunning = true
     private Boolean shouldReconnect = false
     private ConcurrentHashMap brokerConsumerMap
-    private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
+    private AbstractMap topicOffsetMap
     private List brokers
     private List onDelta
+    private AbstractSet currentTopics
 
-    KafkaPoller(AbstractMap map) {
-        this.consumersMap = map
+    KafkaPoller(AbstractMap map, AbstractSet topicSet) {
+        this.topicOffsetMap = map
+        this.currentTopics = topicSet
         this.brokerConsumerMap = [:]
         this.brokers = []
         this.onDelta = []
@@ -49,7 +51,10 @@ class KafkaPoller extends Thread {
                 reconnect()
             }
 
-            if (this.consumersMap.size() > 0) {
+            /* Only makes sense to try to dump meta-data if we've got some
+             * topics that we should keep an eye on
+             */
+            if (this.currentTopics.size() > 0) {
                 dumpMetadata()
             }
@@ -64,10 +69,10 @@ class KafkaPoller extends Thread {
 
         withTopicsAndPartitions(metadata) { tp, p ->
             try {
-                processDeltasFor(tp, p)
+                captureLatestOffsetFor(tp, p)
             }
             catch (Exception ex) {
-                logger.error("Failed to process deltas for ${tp.topic}:${tp.partition}", ex)
+                logger.error("Failed to fetch latest for ${tp.topic}:${tp.partition}", ex)
             }
         }
 
@@ -92,23 +97,15 @@ class KafkaPoller extends Thread {
 
     /**
-     * Fetch the leader metadata and invoke our callbacks with the right deltas
-     * for the given topic and partition information
+     * Fetch the leader metadata and update our data structures
      */
-    void processDeltasFor(TopicPartition tp, Object partitionMetadata) {
+    void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
         Integer leaderId = partitionMetadata.leader.get()?.id
         Integer partitionId = partitionMetadata.partitionId
 
         Long offset = latestFromLeader(leaderId, tp.topic, partitionId)
 
-        this.consumersMap[tp].each { zk.ConsumerOffset c ->
-            logger.debug("Values for ${c.groupName} on ${tp.topic}:${tp.partition}: ${offset} - ${c.offset}")
-
-            Long delta = offset - c.offset
-            this.onDelta.each { Closure callback ->
-                callback.call(c.groupName, tp, delta)
-            }
-        }
+        this.topicOffsetMap[tp] = offset
     }
 
     Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
@@ -159,14 +156,6 @@ class KafkaPoller extends Thread {
         this.shouldReconnect = true
     }
 
-    /**
-     * Collect all the topics from our given consumer map and return a list of
-     * them
-     */
-    private List collectCurrentTopics() {
-        return this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
-    }
-
     /**
     * Return the brokers list as an immutable Seq collection for the Kafka
      * scala underpinnings
@@ -185,7 +174,7 @@ class KafkaPoller extends Thread {
 
     private Object fetchMetadataForCurrentTopics() {
         return ClientUtils.fetchTopicMetadata(
-                        toScalaSet(new HashSet(collectCurrentTopics())),
+                        toScalaSet(currentTopics),
                         brokersSeq,
                         KAFKA_CLIENT_ID,
                         KAFKA_TIMEOUT,
diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
index e3a7c20..ba01124 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy
@@ -3,27 +3,33 @@ package com.github.lookout.verspaetung
 import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
 import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher
 import com.github.lookout.verspaetung.zk.StandardTreeWatcher
+import com.github.lookout.verspaetung.metrics.ConsumerGauge
 
 import java.util.AbstractMap
 import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.ConcurrentSkipListSet
+import java.util.concurrent.TimeUnit
 import groovy.transform.TypeChecked
 
-import com.timgroup.statsd.StatsDClient
-import com.timgroup.statsd.NonBlockingDogStatsDClient
-
 import org.apache.commons.cli.*
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.TreeCache
+import org.coursera.metrics.datadog.DatadogReporter
+import org.coursera.metrics.datadog.transport.UdpTransport
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
+import com.codahale.metrics.*
+
+
 class Main {
     private static final String METRICS_PREFIX = 'verspaetung'
 
-    private static StatsDClient statsd
     private static Logger logger
+    private static ScheduledReporter reporter
+    private static final MetricRegistry registry = new MetricRegistry()
 
     static void main(String[] args) {
         String statsdPrefix = METRICS_PREFIX
@@ -53,74 +59,115 @@ class Main {
             statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}"
         }
 
+
+        registry.register(MetricRegistry.name(Main.class, 'heartbeat'),
+                          new metrics.HeartbeatGauge())
+
         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
         CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry)
 
-        ConcurrentHashMap<TopicPartition, List<ConsumerOffset>> consumers = new ConcurrentHashMap()
-
-        statsd = new NonBlockingDogStatsDClient(statsdPrefix, statsdHost, statsdPort)
-
         client.start()
 
-        KafkaPoller poller = setupKafkaPoller(consumers, statsd, cli.hasOption('n'))
+        /* We need a good shared set of all the topics we should keep an eye on
+         * for the Kafka poller. This will be written to by the tree watchers
+         * and read from by the poller, e.g.
+         *      Watcher --/write/--> watchedTopics --/read/--> KafkaPoller
+         */
+        ConcurrentSkipListSet watchedTopics = new ConcurrentSkipListSet<>()
+
+        /* consumerOffsets is where we will keep all the offsets from Zookeeper
+         * from the Kafka consumers
+         */
+        ConcurrentHashMap consumerOffsets = new ConcurrentHashMap<>()
+
+        /* topicOffsets is where the KafkaPoller should be writing all of its
+         * latest offsets from querying the Kafka brokers
+         */
+        ConcurrentHashMap topicOffsets = new ConcurrentHashMap<>()
+
+        /* Hash map for keeping track of KafkaConsumer to ConsumerGauge
+         * instances. We're only really doing this because the MetricRegistry
+         * doesn't do a terrific job of exposing this for us
+         */
+        ConcurrentHashMap consumerGauges = new ConcurrentHashMap<>()
+
+
+        KafkaPoller poller = new KafkaPoller(topicOffsets, watchedTopics)
         BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client).start()
-        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client, consumers).start()
+        brokerWatcher.onBrokerUpdates << { brokers -> poller.refresh(brokers) }
+
+        poller.start()
+
+        /* Need to reuse this closure for the KafkaSpoutTreeWatcher if we have
+         * one
+         */
+        Closure gaugeRegistrar = { KafkaConsumer consumer ->
+            registerMetricFor(consumer, consumerGauges, consumerOffsets, topicOffsets)
+        }
+
+        StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(client,
+                                                                      watchedTopics,
+                                                                      consumerOffsets)
+        consumerWatcher.onConsumerData << gaugeRegistrar
+        consumerWatcher.start()
+
 
         /* Assuming that most people aren't needing to run Storm-based watchers
          * as well
          */
         if (cli.hasOption('s')) {
-            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client, consumers)
+            KafkaSpoutTreeWatcher stormWatcher = new KafkaSpoutTreeWatcher(client,
+                                                                           watchedTopics,
+                                                                           consumerOffsets)
+            stormWatcher.onConsumerData << gaugeRegistrar
             stormWatcher.start()
         }
 
-        consumerWatcher.onInitComplete << {
-            logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
+
+        if (cli.hasOption('n')) {
+            reporter = ConsoleReporter.forRegistry(registry)
+                                      .convertRatesTo(TimeUnit.SECONDS)
+                                      .convertDurationsTo(TimeUnit.MILLISECONDS)
+                                      .build()
+        }
+        else {
+            UdpTransport transport = new UdpTransport.Builder()
+                                                     .withPrefix(statsdPrefix)
+                                                     .build()
+
+            reporter = DatadogReporter.forRegistry(registry)
+                                      .withEC2Host()
+                                      .withTransport(transport)
+                                      .build()
         }
 
-        brokerWatcher.onBrokerUpdates << { brokers ->
-            poller.refresh(brokers)
+        /* Start the reporter if we've got it */
+        reporter?.start(1, TimeUnit.SECONDS)
+
+        logger.info("Starting wait loop...")
+        synchronized(this) {
+            wait()
+        }
+    }
+
+    static void registerMetricFor(KafkaConsumer consumer,
+                                  ConcurrentHashMap consumerGauges,
+                                  ConcurrentHashMap consumerOffsets,
+                                  ConcurrentHashMap topicOffsets) {
+        if (consumerGauges.containsKey(consumer)) {
+            return
         }
 
-        logger.info("Started wait loop...")
-
-        while (true) {
-            statsd?.recordGaugeValue('heartbeat', 1)
-            Thread.sleep(1 * 1000)
-        }
-
-        logger.info("exiting..")
-        poller.die()
-        poller.join()
-        return
+        ConsumerGauge gauge = new ConsumerGauge(consumer,
+                                                consumerOffsets,
+                                                topicOffsets)
+        consumerGauges.put(consumer, gauge)
+        this.registry.register(gauge.nameForRegistry, gauge)
     }
 
     /**
-     * Create and start a KafkaPoller with the given statsd and consumers map
+     * Create the Options object necessary for verspaetung's command line options
      */
-    static KafkaPoller setupKafkaPoller(AbstractMap consumers,
-                                        NonBlockingDogStatsDClient statsd,
-                                        Boolean dryRun) {
-        KafkaPoller poller = new KafkaPoller(consumers)
-        Closure deltaCallback = { String name, TopicPartition tp, Long delta ->
-            println "${tp.topic}:${tp.partition}-${name} = ${delta}"
-        }
-
-        if (!dryRun) {
-            deltaCallback = { String name, TopicPartition tp, Long delta ->
-                statsd.recordGaugeValue(tp.topic, delta, [
-                    'topic' : tp.topic,
-                    'partition' : tp.partition,
-                    'consumer-group' : name
-                ])
-            }
-        }
-
-        poller.onDelta << deltaCallback
-        poller.start()
-        return poller
-    }
-
     static Options createCLI() {
         Options options = new Options()
@@ -170,6 +217,11 @@ class Main {
         return options
     }
 
+
+    /**
+     * Parse out all the command line options from the array of string
+     * arguments
+     */
     static CommandLine parseCommandLine(String[] args) {
         Options options = createCLI()
         PosixParser parser = new PosixParser()
diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
new file mode 100644
index 0000000..fad3c24
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy
@@ -0,0 +1,67 @@
+package com.github.lookout.verspaetung.metrics
+
+import java.util.AbstractMap
+import com.codahale.metrics.Gauge
+import groovy.transform.TypeChecked
+import org.coursera.metrics.datadog.Tagged
+
+import com.github.lookout.verspaetung.KafkaConsumer
+import com.github.lookout.verspaetung.TopicPartition
+
+/**
+ * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer
+ */
+@TypeChecked
+class ConsumerGauge implements Gauge, Tagged {
+    protected KafkaConsumer consumer
+    protected AbstractMap consumers
+    protected AbstractMap topics
+    private TopicPartition topicPartition
+
+    ConsumerGauge(KafkaConsumer consumer,
+                  AbstractMap consumers,
+                  AbstractMap topics) {
+        this.consumer = consumer
+        this.consumers = consumers
+        this.topics = topics
+
+        this.topicPartition = new TopicPartition(consumer.topic, consumer.partition)
+    }
+
+    @Override
+    Integer getValue() {
+        if ((!this.consumers.containsKey(consumer)) ||
+            (!this.topics.containsKey(topicPartition))) {
+            return 0
+        }
+        return ((Integer)this.topics[topicPartition]) - this.consumers[consumer]
+    }
+
+    @Override
+    List getTags() {
+        /* convert each GString to a proper String for the Tagged interface */
+        return ["partition:${this.consumer.partition}",
+                "topic:${this.consumer.topic}",
+                "consumer-group:${this.consumer.name}"
+        ].collect { s -> s.toString() }
+    }
+
+    /**
+     * Return a unique name for this gauge
+     */
+    String getNameForRegistry() {
+        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+    }
+
+    @Override
+    String getName() {
+        return this.consumer.topic
+
+        /* need to return this if we're just using the console or statsd
+         * reporters
+
+        return "${this.consumer.topic}.${this.consumer.partition}.${this.consumer.name}"
+
+        */
+    }
+}
diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
new file mode 100644
index 0000000..603864a
--- /dev/null
+++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy
@@ -0,0 +1,14 @@
+package com.github.lookout.verspaetung.metrics
+
+import com.codahale.metrics.*
+
+/**
+ * A simple gauge that will always just return 1, indicating that the process
+ * is alive
+ */
+class HeartbeatGauge implements Gauge {
+    @Override
+    Integer getValue() {
+        return 1
+    }
+}
diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
index 69fac7d..2a3e91a 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy
@@ -1,5 +1,6 @@
 package com.github.lookout.verspaetung.zk
 
+import com.github.lookout.verspaetung.KafkaConsumer
 import com.github.lookout.verspaetung.TopicPartition
 
 import java.util.concurrent.CopyOnWriteArrayList
@@ -11,12 +12,16 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
 
 @TypeChecked
 abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
-    protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
+    protected AbstractMap consumerOffsets
+    protected AbstractSet watchedTopics
+
+    protected List onConsumerData = []
 
     AbstractConsumerTreeWatcher(CuratorFramework client,
-                                AbstractMap consumersMap) {
+                                AbstractSet topics,
+                                AbstractMap offsets) {
         super(client)
-        this.consumersMap = consumersMap
+        this.watchedTopics = topics
+        this.consumerOffsets = offsets
     }
 
     /**
@@ -61,17 +66,18 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher {
     * this class on instantiation
      */
     void trackConsumerOffset(ConsumerOffset offset) {
-        if (this.consumersMap == null) {
+        if (this.consumerOffsets == null) {
             return
         }
 
-        TopicPartition key = new TopicPartition(offset.topic, offset.partition)
+        this.watchedTopics << offset.topic
+        KafkaConsumer consumer = new KafkaConsumer(offset.topic,
+                                                   offset.partition,
+                                                   offset.groupName)
+        this.consumerOffsets[consumer] = offset.offset
 
-        if (this.consumersMap.containsKey(key)) {
-            this.consumersMap[key] << offset
-        }
-        else {
-            this.consumersMap[key] = new CopyOnWriteArrayList([offset])
+        this.onConsumerData.each { Closure c ->
+            c.call(consumer)
         }
     }
diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
index 1865823..eba5030 100644
--- a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
+++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy
@@ -17,8 +17,10 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher {
     private static final String ZK_PATH = '/kafka_spout'
     private JsonSlurper json
 
-    KafkaSpoutTreeWatcher(CuratorFramework client, AbstractMap consumersMap) {
-        super(client, consumersMap)
+    KafkaSpoutTreeWatcher(CuratorFramework client,
+                          AbstractSet topics,
+                          AbstractMap offsets) {
+        super(client, topics, offsets)
         this.json = new JsonSlurper()
     }
diff --git a/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy
new file mode 100644
index 0000000..458b872
--- /dev/null
+++ b/src/test/groovy/com/github/lookout/verspaetung/KafkaConsumerSpec.groovy
@@ -0,0 +1,38 @@
+package com.github.lookout.verspaetung
+
+import spock.lang.*
+
+class KafkaConsumerSpec extends Specification {
+    String topic = 'spock-topic'
+    Integer partition = 2
+    String consumerName = 'spock-consumer'
+
+    def "the constructor should set the properties properly"() {
+        given:
+        KafkaConsumer consumer = new KafkaConsumer(topic, partition, consumerName)
+
+        expect:
+        consumer instanceof KafkaConsumer
+        consumer.topic == topic
+        consumer.partition == partition
+        consumer.name == consumerName
+    }
+
+    def "equals() is true with identical source material"() {
+        given:
+        KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
+        KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, consumerName)
+
+        expect:
+        consumer1 == consumer2
+    }
+
+    def "equals() is false with differing source material"() {
+        given:
+        KafkaConsumer consumer1 = new KafkaConsumer(topic, partition, consumerName)
+        KafkaConsumer consumer2 = new KafkaConsumer(topic, partition, "i am different")
+
+        expect:
+        consumer1 != consumer2
+    }
+}
diff --git a/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy
new file mode 100644
index 0000000..bf7300a
--- /dev/null
+++ b/src/test/groovy/com/github/lookout/verspaetung/metrics/ConsumerGaugeSpec.groovy
@@ -0,0 +1,43 @@
+package com.github.lookout.verspaetung.metrics
+
+import spock.lang.*
+
+import com.github.lookout.verspaetung.KafkaConsumer
+import com.github.lookout.verspaetung.TopicPartition
+
+class ConsumerGaugeSpec extends Specification {
+    private KafkaConsumer consumer
+    private TopicPartition tp
+
+    def setup() {
+        this.tp = new TopicPartition('spock-topic', 1)
+        this.consumer = new KafkaConsumer(tp.topic, tp.partition, 'spock-consumer')
+    }
+
+    def "constructor should work"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
+
+        expect:
+        gauge.consumer instanceof KafkaConsumer
+        gauge.consumers instanceof AbstractMap
+    }
+
+    def "getValue() should source a value from the map"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(this.consumer,
+                                                [(this.consumer) : 2],
+                                                [(this.tp) : 3])
+
+        expect:
+        gauge.value == 1
+    }
+
+    def "getValue() should return zero for a consumer not in the map"() {
+        given:
+        ConsumerGauge gauge = new ConsumerGauge(consumer, [:], [:])
+
+        expect:
+        gauge.value == 0
+    }
+}
diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
index 90f7d35..2dba092 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcherSpec.groovy
@@ -13,7 +13,7 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
 
     class MockWatcher extends AbstractConsumerTreeWatcher {
         MockWatcher() {
-            super(null, [:])
+            super(null, new HashSet(), [:])
         }
         ConsumerOffset processChildData(ChildData d) { }
         String zookeeperPath() { return '/zk/spock' }
@@ -63,10 +63,11 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
         watcher.trackConsumerOffset(offset)
 
         then:
-        watcher.consumersMap.size() == 1
+        watcher.consumerOffsets.size() == 1
+        watcher.watchedTopics.size() == 1
     }
 
-    def "trackConsumerOffset() should append to a list for existing topics in the map"() {
+    def "trackConsumerOffset() appends an offset but not a topic for different group names"() {
         given:
         String topic = 'spock-topic'
         TopicPartition mapKey = new TopicPartition(topic, 0)
@@ -80,8 +81,8 @@ class AbstractConsumerTreeWatcherSpec extends Specification {
         watcher.trackConsumerOffset(secondOffset)
 
         then:
-        watcher.consumersMap.size() == 1
-        watcher.consumersMap[mapKey].size() == 2
+        watcher.watchedTopics.size() == 1
+        watcher.consumerOffsets.size() == 2
     }
 
diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
index a5f6d99..4a26333 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcherSpec.groovy
@@ -11,7 +11,7 @@ class KafkaSpoutTreeWatcherSpec extends Specification {
 
     def setup() {
         this.mockCurator = Mock(CuratorFramework)
-        this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, [:])
+        this.watcher = new KafkaSpoutTreeWatcher(this.mockCurator, new HashSet(), [:])
     }
 
     def "consumerNameFromPath() should give the right name for a valid path"() {
diff --git a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
index ef8a038..52463bb 100644
--- a/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
+++ b/src/test/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcherSpec.groovy
@@ -11,7 +11,7 @@ class StandardTreeWatcherSpec extends Specification {
 
     def setup() {
         this.mockCurator = Mock(CuratorFramework)
-        this.watcher = new StandardTreeWatcher(this.mockCurator, [:])
+        this.watcher = new StandardTreeWatcher(this.mockCurator, new HashSet(), [:])
    }
 
     def "processChildData should return null if the path is invalid"() {
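
Reviewer's sketch, not part of the patch itself: this change replaces the old
push-style delta callbacks with two shared maps. The ZooKeeper tree watchers
write committed consumer offsets keyed by KafkaConsumer, the KafkaPoller writes
the brokers' latest offsets keyed by TopicPartition, and each ConsumerGauge
derives the lag lazily whenever a reporter polls it. A minimal Groovy
illustration of that flow, assuming the verspaetung classes above are on the
classpath; the topic, partition, and group names here are made up:

    import java.util.concurrent.ConcurrentHashMap

    import com.github.lookout.verspaetung.KafkaConsumer
    import com.github.lookout.verspaetung.TopicPartition
    import com.github.lookout.verspaetung.metrics.ConsumerGauge

    /* The tree watchers fill this map with committed offsets per consumer... */
    ConcurrentHashMap consumerOffsets = new ConcurrentHashMap()
    /* ...while the KafkaPoller fills this one with the brokers' latest offsets */
    ConcurrentHashMap topicOffsets = new ConcurrentHashMap()

    TopicPartition tp = new TopicPartition('example-topic', 0)
    KafkaConsumer consumer = new KafkaConsumer(tp.topic, tp.partition, 'example-group')

    consumerOffsets[consumer] = 90   // offset committed by the consumer group
    topicOffsets[tp] = 100           // latest offset reported by the broker

    /* The gauge reads both maps on demand, so a ScheduledReporter polling it
     * every second always sees the current delta, with no callback plumbing.
     */
    ConsumerGauge gauge = new ConsumerGauge(consumer, consumerOffsets, topicOffsets)
    assert gauge.value == 10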