Exploratory testing, a veritable boatload of garbage and hacks
This will at least create some stupid consumers and try to fetch the latest offsets for a bunch of stupid topics. The Kafka/Scala internals are so immensely frustrating
This commit is contained in:
parent
293ebb6fa9
commit
193b147064
|
@ -2,13 +2,32 @@ package com.github.lookout.verspaetung
|
|||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import kafka.cluster.Broker
|
||||
import kafka.client.ClientUtils
|
||||
import kafka.consumer.SimpleConsumer
|
||||
import kafka.common.TopicAndPartition
|
||||
import kafka.javaapi.*
|
||||
/* UGH */
|
||||
import scala.collection.JavaConversions
|
||||
|
||||
@TypeChecked
|
||||
/* Can't type check this because it makes the calls in and out of Scala an
|
||||
* atrocious pain in the ass
|
||||
*/
|
||||
//@TypeChecked
|
||||
class KafkaPoller extends Thread {
|
||||
private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
|
||||
private final Integer KAFKA_TIMEOUT = (5 * 1000)
|
||||
private final Integer KAFKA_BUFFER = (100 * 1024)
|
||||
|
||||
private Boolean keepRunning = true
|
||||
private List<KafkaBroker> brokers
|
||||
private Boolean shouldReconnect = false
|
||||
private HashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
|
||||
private List<Broker> brokers = []
|
||||
private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
|
||||
|
||||
KafkaPoller(AbstractMap map) {
|
||||
this.consumersMap = map
|
||||
}
|
||||
|
||||
void run() {
|
||||
while (keepRunning) {
|
||||
|
@ -22,11 +41,66 @@ class KafkaPoller extends Thread {
|
|||
}
|
||||
}
|
||||
|
||||
void dumpMetadata() {
|
||||
println 'dumping'
|
||||
def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
|
||||
def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
|
||||
brokersSeq,
|
||||
KAFKA_CLIENT_ID,
|
||||
KAFKA_TIMEOUT,
|
||||
0)
|
||||
|
||||
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
|
||||
withScalaCollection(f.partitionsMetadata).each { p ->
|
||||
println "Consumer for ${f.topic}:${p.partitionId}"
|
||||
SimpleConsumer consumer = this.brokerConsumerMap[p.leader.get().id]
|
||||
println consumer
|
||||
TopicAndPartition topicAndPart = new TopicAndPartition(f.topic, p.partitionId)
|
||||
println consumer.earliestOrLatestOffset(
|
||||
topicAndPart,
|
||||
-1,
|
||||
0)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
println 'dumped'
|
||||
}
|
||||
|
||||
Iterable withScalaCollection(scala.collection.Iterable iter) {
|
||||
return JavaConversions.asJavaIterable(iter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Blocking reconnect to the Kafka brokers
|
||||
*/
|
||||
void reconnect() {
|
||||
println "reconnecting"
|
||||
this.brokers.each { Broker b ->
|
||||
SimpleConsumer consumer = new SimpleConsumer(b.host,
|
||||
b.port,
|
||||
KAFKA_TIMEOUT,
|
||||
KAFKA_BUFFER,
|
||||
KAFKA_CLIENT_ID)
|
||||
consumer.connect()
|
||||
this.brokerConsumerMap[b.id] = consumer
|
||||
}
|
||||
this.shouldReconnect =false
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the brokers list as an immutable Seq collection for the Kafka
|
||||
* scala underpinnings
|
||||
*/
|
||||
scala.collection.immutable.Seq getBrokersSeq() {
|
||||
return JavaConversions.asScalaBuffer(this.brokers).toList()
|
||||
}
|
||||
|
||||
/**
|
||||
* Return scala.collection.mutable.Set for the given List
|
||||
*/
|
||||
scala.collection.mutable.Set toScalaSet(Set set) {
|
||||
return JavaConversions.asScalaSet(set)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -34,14 +108,18 @@ class KafkaPoller extends Thread {
|
|||
*/
|
||||
void die() {
|
||||
this.keepRunning = false
|
||||
this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client ->
|
||||
client.disconnect()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Store a new list of KafkaBroker objects and signal a reconnection
|
||||
*/
|
||||
void refresh(List brokers) {
|
||||
println "refresh: ${brokers}"
|
||||
this.brokers = brokers
|
||||
void refresh(List<KafkaBroker> brokers) {
|
||||
this.brokers = brokers.collect { KafkaBroker b ->
|
||||
new Broker(b.brokerId, b.host, b.port)
|
||||
}
|
||||
this.shouldReconnect = true
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,10 +24,11 @@ class Main {
|
|||
|
||||
TreeCache cache = new TreeCache(client, '/consumers')
|
||||
|
||||
KafkaPoller poller = new KafkaPoller()
|
||||
KafkaPoller poller = new KafkaPoller(consumers)
|
||||
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers)
|
||||
consumerWatcher.onInitComplete << {
|
||||
println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples"
|
||||
poller.dumpMetadata()
|
||||
}
|
||||
|
||||
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
|
||||
|
@ -42,7 +43,7 @@ class Main {
|
|||
cache.start()
|
||||
println 'started..'
|
||||
|
||||
Thread.sleep(9 * 1000)
|
||||
Thread.sleep(10 * 1000)
|
||||
|
||||
println 'exiting..'
|
||||
poller.die()
|
||||
|
|
|
@ -39,4 +39,9 @@ class TopicPartition {
|
|||
int hashCode() {
|
||||
return Objects.hash(this.topic, this.partition)
|
||||
}
|
||||
|
||||
@Override
|
||||
String toString() {
|
||||
return "${this.topic}:${this.partition}"
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue