diff --git a/build.gradle b/build.gradle index 5666572..d16173b 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ repositories { } dependencies { - compile 'org.codehaus.groovy:groovy-all:2.4.4' + compile 'org.codehaus.groovy:groovy-all:[2.4.4,2.5)' compile 'io.ratpack:ratpack-handlebars:[0.9.19,1.0)' compile 'io.ratpack:ratpack-jackson:[0.9.17,1.0)' diff --git a/src/main/groovy/offtopic/KafkaService.groovy b/src/main/groovy/offtopic/KafkaService.groovy index 4308c66..942dac8 100644 --- a/src/main/groovy/offtopic/KafkaService.groovy +++ b/src/main/groovy/offtopic/KafkaService.groovy @@ -2,7 +2,6 @@ package offtopic import groovy.json.JsonSlurper import groovy.util.logging.Slf4j -import offtopic.curator.CuratorPool import org.apache.curator.framework.CuratorFramework /** @@ -14,28 +13,28 @@ class KafkaService { final static String BROKERS_PATH = "/brokers/ids" final static String TOPICS_PATH = "/brokers/topics" - public static List fetchBrokers() { + /** + * Fetch brokers based on their conventional location in Zookeeper (/brokers/ids) + * and return a list of maps for the various discovered broker metadata + * + * {"jmx_port":9999,"timestamp":"1428168559585","host":"kafka-stage1-1.lasangia.io","version":1,"port":6667} + * + * @return List of Map objects containing deserialized metadata + */ + static List fetchBrokers(CuratorFramework curator) { JsonSlurper parser = new JsonSlurper() - List brokers = [] + List brokers = [] - CuratorPool.withCurator { CuratorFramework c -> - c.children.forPath(BROKERS_PATH).each { String id -> - // Pulling this into a String buffer since parse(byte[]) is - // throwing a stackoverflow error - String buffer = new String(c.data.forPath("${BROKERS_PATH}/${id}")) - brokers.add(parser.parseText(buffer)) - } + curator.children.forPath(BROKERS_PATH).each { String id -> + byte[] buffer = curator.data.forPath("${BROKERS_PATH}/${id}") + brokers.add(parser.parse(buffer) as Map) } log.info('Fetched brokers from Zookeeper: {}', brokers) return brokers } - public static List fetchTopics() { - List brokers = [] - CuratorPool.withCurator { c -> - brokers = c.children.forPath(TOPICS_PATH) - } - return brokers + static List fetchTopics(CuratorFramework curator) { + return curator.children.forPath(TOPICS_PATH) } } diff --git a/src/main/groovy/offtopic/KafkaSubscriber.groovy b/src/main/groovy/offtopic/KafkaSubscriber.groovy index 6fc7107..f323e57 100644 --- a/src/main/groovy/offtopic/KafkaSubscriber.groovy +++ b/src/main/groovy/offtopic/KafkaSubscriber.groovy @@ -11,33 +11,32 @@ import groovy.util.logging.Slf4j */ @Slf4j class KafkaSubscriber { - String topic private Closure callback private ConsumerConnector consumer private String zookeepers private String consumerId - public KafkaSubscriber(String zks, String topicName, String consumerId) { + KafkaSubscriber(String zks, String topicName, String consumerId) { this.topic = topicName this.zookeepers = zks this.consumerId = consumerId } - public void setCallback(Closure theCallback) { - this.callback = theCallback + void setCallback(Closure theCallback) { + callback = theCallback } - public void connect() { - this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector( - createConsumerConfig(this.zookeepers, this.consumerId)) + void connect() { + consumer = kafka.consumer.Consumer.createJavaConsumerConnector( + createConsumerConfig(zookeepers, consumerId)) } - public void shutdown() { + void shutdown() { this.consumer?.shutdown() } - public void consume() { + void consume() { if (this.consumer == null) { log.warn "no consumer, gtfo" return diff --git a/src/ratpack/ratpack.groovy b/src/ratpack/ratpack.groovy index d62ed64..2e1693f 100644 --- a/src/ratpack/ratpack.groovy +++ b/src/ratpack/ratpack.groovy @@ -1,3 +1,5 @@ +import offtopic.curator.CuratorPool +import org.apache.curator.framework.CuratorFramework import org.slf4j.Logger import org.slf4j.LoggerFactory import ratpack.handlebars.HandlebarsModule @@ -28,7 +30,10 @@ ratpack { } get('topics') { - topics = KafkaService.fetchTopics() + List topics + CuratorPool.withCurator { CuratorFramework cf -> + topics = KafkaService.fetchTopics(cf) + } if (request.headers.get('Content-Type') == 'application/json') { render(json(topics)) @@ -73,7 +78,10 @@ ratpack { } get('brokers') { - brokers = KafkaService.fetchBrokers() + List brokers + CuratorPool.withCurator { CuratorFramework cf -> + brokers = KafkaService.fetchBrokers(cf) + } if (request.headers.get('Content-Type') == 'application/json') { render(json(brokers)) diff --git a/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy b/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy index 879606c..7d37491 100644 --- a/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy +++ b/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy @@ -5,7 +5,7 @@ import spock.lang.Specification class KafkaSubscriberSpec extends Specification { def "initializing"() { when: - def subscriber = new KafkaSubscriber(null, 'spock', null) + KafkaSubscriber subscriber = new KafkaSubscriber(null, 'spock', null) then: subscriber instanceof KafkaSubscriber