Starting to clean up KafkaService so it's less dependent on a global CuratorPool

This commit is contained in:
R. Tyler Croy 2015-09-02 10:38:34 -07:00
parent fd7db966ff
commit 48cb3ac459
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
5 changed files with 35 additions and 29 deletions

View File

@@ -32,7 +32,7 @@ repositories {
 }

 dependencies {
-    compile 'org.codehaus.groovy:groovy-all:2.4.4'
+    compile 'org.codehaus.groovy:groovy-all:[2.4.4,2.5)'
     compile 'io.ratpack:ratpack-handlebars:[0.9.19,1.0)'
     compile 'io.ratpack:ratpack-jackson:[0.9.17,1.0)'
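The Groovy dependency moves from a pinned 2.4.4 to the Ivy-style range [2.4.4,2.5), so Gradle will resolve any 2.4.x release at or above 2.4.4. If a build later needs to be pinned again without touching the declaration, a resolutionStrategy block can force one version; the snippet below is an illustration, not part of this commit:

    // Hypothetical build.gradle addition (not in this commit): force a single
    // known-good Groovy version even though the dependency is declared as a range
    configurations.all {
        resolutionStrategy {
            force 'org.codehaus.groovy:groovy-all:2.4.4'
        }
    }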

View File

@@ -2,7 +2,6 @@ package offtopic

 import groovy.json.JsonSlurper
 import groovy.util.logging.Slf4j
-import offtopic.curator.CuratorPool
 import org.apache.curator.framework.CuratorFramework

 /**
@@ -14,28 +13,28 @@ class KafkaService {
     final static String BROKERS_PATH = "/brokers/ids"
     final static String TOPICS_PATH = "/brokers/topics"

-    public static List<String> fetchBrokers() {
+    /**
+     * Fetch brokers based on their conventional location in Zookeeper (/brokers/ids)
+     * and return a list of maps for the various discovered broker metadata
+     *
+     * {"jmx_port":9999,"timestamp":"1428168559585","host":"kafka-stage1-1.lasangia.io","version":1,"port":6667}
+     *
+     * @return List of Map objects containing deserialized metadata
+     */
+    static List<Map> fetchBrokers(CuratorFramework curator) {
         JsonSlurper parser = new JsonSlurper()
-        List<String> brokers = []
+        List<Map> brokers = []

-        CuratorPool.withCurator { CuratorFramework c ->
-            c.children.forPath(BROKERS_PATH).each { String id ->
-                // Pulling this into a String buffer since parse(byte[]) is
-                // throwing a stackoverflow error
-                String buffer = new String(c.data.forPath("${BROKERS_PATH}/${id}"))
-                brokers.add(parser.parseText(buffer))
-            }
+        curator.children.forPath(BROKERS_PATH).each { String id ->
+            byte[] buffer = curator.data.forPath("${BROKERS_PATH}/${id}")
+            brokers.add(parser.parse(buffer) as Map)
         }

         log.info('Fetched brokers from Zookeeper: {}', brokers)
         return brokers
     }

-    public static List<String> fetchTopics() {
-        List brokers = []
-        CuratorPool.withCurator { c ->
-            brokers = c.children.forPath(TOPICS_PATH)
-        }
-        return brokers
+    static List<String> fetchTopics(CuratorFramework curator) {
+        return curator.children.forPath(TOPICS_PATH)
     }
 }
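Both methods now take the CuratorFramework as an argument instead of reaching into the global CuratorPool, so callers own the client lifecycle. A minimal sketch of exercising the new signatures with a directly constructed client, assuming a localhost Zookeeper and a standard retry policy (both illustrative, not taken from this project):

    import org.apache.curator.framework.CuratorFramework
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry
    import offtopic.KafkaService

    // Illustrative connection string and retry policy
    CuratorFramework curator = CuratorFrameworkFactory.newClient(
            'localhost:2181', new ExponentialBackoffRetry(1000, 3))
    curator.start()
    try {
        // Each call now receives the client explicitly rather than using a global pool
        List<Map> brokers = KafkaService.fetchBrokers(curator)
        List<String> topics = KafkaService.fetchTopics(curator)
        println "brokers=${brokers} topics=${topics}"
    } finally {
        curator.close()
    }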

View File

@@ -11,33 +11,32 @@ import groovy.util.logging.Slf4j
 */
 @Slf4j
 class KafkaSubscriber {
     String topic
     private Closure callback
     private ConsumerConnector consumer
     private String zookeepers
     private String consumerId

-    public KafkaSubscriber(String zks, String topicName, String consumerId) {
+    KafkaSubscriber(String zks, String topicName, String consumerId) {
         this.topic = topicName
         this.zookeepers = zks
         this.consumerId = consumerId
     }

-    public void setCallback(Closure theCallback) {
-        this.callback = theCallback
+    void setCallback(Closure theCallback) {
+        callback = theCallback
     }

-    public void connect() {
-        this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
-                createConsumerConfig(this.zookeepers, this.consumerId))
+    void connect() {
+        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
+                createConsumerConfig(zookeepers, consumerId))
     }

-    public void shutdown() {
+    void shutdown() {
         this.consumer?.shutdown()
     }

-    public void consume() {
+    void consume() {
         if (this.consumer == null) {
             log.warn "no consumer, gtfo"
             return
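This hunk only drops the redundant public modifiers (Groovy members are public by default), so behaviour is unchanged. A minimal usage sketch of the subscriber's setCallback/connect/consume API; the Zookeeper quorum, topic, and consumer id below are placeholders, and the assumption that the callback receives each consumed message is inferred from setCallback rather than stated in this diff:

    import offtopic.KafkaSubscriber

    // Placeholder connection details, not taken from this project's configuration
    KafkaSubscriber subscriber = new KafkaSubscriber('localhost:2181', 'offtopic', 'offtopic-web')
    subscriber.setCallback { message ->
        // Assumed contract: invoked once per consumed message
        println "received: ${message}"
    }
    subscriber.connect()
    subscriber.consume()   // reads from the topic until shutdown() is called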

View File

@@ -1,3 +1,5 @@
+import offtopic.curator.CuratorPool
+import org.apache.curator.framework.CuratorFramework
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 import ratpack.handlebars.HandlebarsModule
@@ -28,7 +30,10 @@ ratpack {
         }

         get('topics') {
-            topics = KafkaService.fetchTopics()
+            List<String> topics
+            CuratorPool.withCurator { CuratorFramework cf ->
+                topics = KafkaService.fetchTopics(cf)
+            }

             if (request.headers.get('Content-Type') == 'application/json') {
                 render(json(topics))
@@ -73,7 +78,10 @@ ratpack {
         }

         get('brokers') {
-            brokers = KafkaService.fetchBrokers()
+            List<Map> brokers
+            CuratorPool.withCurator { CuratorFramework cf ->
+                brokers = KafkaService.fetchBrokers(cf)
+            }

             if (request.headers.get('Content-Type') == 'application/json') {
                 render(json(brokers))
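With these changes the Ratpack handlers borrow a client via CuratorPool.withCurator and hand it to KafkaService, so knowledge of the pool stays at the edge of the application. CuratorPool itself is not shown in this commit; the following is a rough sketch of what a withCurator-style borrow-and-return helper typically looks like, not the project's actual implementation:

    import org.apache.curator.framework.CuratorFramework
    import org.apache.curator.framework.CuratorFrameworkFactory
    import org.apache.curator.retry.ExponentialBackoffRetry

    // Hypothetical stand-in for offtopic.curator.CuratorPool; the real class may
    // reuse pooled clients rather than creating one per call
    class SimpleCuratorPool {
        static String zookeepers = 'localhost:2181'

        static <T> T withCurator(Closure<T> work) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(
                    zookeepers, new ExponentialBackoffRetry(1000, 3))
            client.start()
            try {
                // Hand the started client to the caller, as the handlers above do
                return work.call(client)
            } finally {
                client.close()
            }
        }
    }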

View File

@@ -5,7 +5,7 @@ import spock.lang.Specification
 class KafkaSubscriberSpec extends Specification {
     def "initializing"() {
         when:
-        def subscriber = new KafkaSubscriber(null, 'spock', null)
+        KafkaSubscriber subscriber = new KafkaSubscriber(null, 'spock', null)

         then:
         subscriber instanceof KafkaSubscriber