diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 743e579..f810f3f 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -2,13 +2,32 @@ package com.github.lookout.verspaetung import groovy.transform.TypeChecked +import kafka.cluster.Broker import kafka.client.ClientUtils +import kafka.consumer.SimpleConsumer +import kafka.common.TopicAndPartition +import kafka.javaapi.* +/* UGH */ +import scala.collection.JavaConversions -@TypeChecked +/* Can't type check this because it makes the calls in and out of Scala an + * atrocious pain in the ass + */ +//@TypeChecked class KafkaPoller extends Thread { + private final String KAFKA_CLIENT_ID = 'VerspaetungClient' + private final Integer KAFKA_TIMEOUT = (5 * 1000) + private final Integer KAFKA_BUFFER = (100 * 1024) + private Boolean keepRunning = true - private List brokers private Boolean shouldReconnect = false + private HashMap brokerConsumerMap = [:] + private List brokers = [] + private AbstractMap> consumersMap + + KafkaPoller(AbstractMap map) { + this.consumersMap = map + } void run() { while (keepRunning) { @@ -22,11 +41,66 @@ class KafkaPoller extends Thread { } } + void dumpMetadata() { + println 'dumping' + def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic } + def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)), + brokersSeq, + KAFKA_CLIENT_ID, + KAFKA_TIMEOUT, + 0) + + withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f -> + withScalaCollection(f.partitionsMetadata).each { p -> + println "Consumer for ${f.topic}:${p.partitionId}" + SimpleConsumer consumer = this.brokerConsumerMap[p.leader.get().id] + println consumer + TopicAndPartition topicAndPart = new TopicAndPartition(f.topic, p.partitionId) + println consumer.earliestOrLatestOffset( + topicAndPart, + -1, + 0) + + } + } + + println 'dumped' + } + + Iterable withScalaCollection(scala.collection.Iterable iter) { + return JavaConversions.asJavaIterable(iter) + } + /** * Blocking reconnect to the Kafka brokers */ void reconnect() { println "reconnecting" + this.brokers.each { Broker b -> + SimpleConsumer consumer = new SimpleConsumer(b.host, + b.port, + KAFKA_TIMEOUT, + KAFKA_BUFFER, + KAFKA_CLIENT_ID) + consumer.connect() + this.brokerConsumerMap[b.id] = consumer + } + this.shouldReconnect =false + } + + /** + * Return the brokers list as an immutable Seq collection for the Kafka + * scala underpinnings + */ + scala.collection.immutable.Seq getBrokersSeq() { + return JavaConversions.asScalaBuffer(this.brokers).toList() + } + + /** + * Return scala.collection.mutable.Set for the given List + */ + scala.collection.mutable.Set toScalaSet(Set set) { + return JavaConversions.asScalaSet(set) } /** @@ -34,14 +108,18 @@ class KafkaPoller extends Thread { */ void die() { this.keepRunning = false + this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> + client.disconnect() + } } /** * Store a new list of KafkaBroker objects and signal a reconnection */ - void refresh(List brokers) { - println "refresh: ${brokers}" - this.brokers = brokers + void refresh(List brokers) { + this.brokers = brokers.collect { KafkaBroker b -> + new Broker(b.brokerId, b.host, b.port) + } this.shouldReconnect = true } } diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 0304f36..7e496c0 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -24,10 +24,11 @@ class Main { TreeCache cache = new TreeCache(client, '/consumers') - KafkaPoller poller = new KafkaPoller() + KafkaPoller poller = new KafkaPoller(consumers) StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers) consumerWatcher.onInitComplete << { println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples" + poller.dumpMetadata() } BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) @@ -42,7 +43,7 @@ class Main { cache.start() println 'started..' - Thread.sleep(9 * 1000) + Thread.sleep(10 * 1000) println 'exiting..' poller.die() diff --git a/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy b/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy index 797f56c..83312d1 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy @@ -39,4 +39,9 @@ class TopicPartition { int hashCode() { return Objects.hash(this.topic, this.partition) } + + @Override + String toString() { + return "${this.topic}:${this.partition}" + } }