From 26770189436b1cd91a2eb364af427532cc3d809c Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 24 Nov 2014 21:07:55 -0800 Subject: [PATCH] Refactor more Kafka subscription into OfftopicClient to allow multiple streams at once WOOOHOOO Fixes #7 --- .../groovy/offtopic/KafkaSubscriber.groovy | 7 ++- .../groovy/offtopic/OfftopicClient.groovy | 55 ++++++++++++++++++- src/ratpack/ratpack.groovy | 32 ++++------- .../groovy/offtopic/OfftopicClientSpec.groovy | 32 +++++++++++ 4 files changed, 102 insertions(+), 24 deletions(-) diff --git a/src/main/groovy/offtopic/KafkaSubscriber.groovy b/src/main/groovy/offtopic/KafkaSubscriber.groovy index f471663..536bbe6 100644 --- a/src/main/groovy/offtopic/KafkaSubscriber.groovy +++ b/src/main/groovy/offtopic/KafkaSubscriber.groovy @@ -49,7 +49,12 @@ class KafkaSubscriber { consumerMap.get(this.topic).each { stream -> def iterator = stream.iterator() while (iterator.hasNext()) { - this.callback.call(iterator.next()) + def message = iterator.next() + def data = ['raw' : new String(message.message()), + 'b64' : message.message().encodeBase64().toString(), + 'topic' : message.topic(), + 'tstamp' : System.currentTimeMillis()] + this.callback.call(data) } } } diff --git a/src/main/groovy/offtopic/OfftopicClient.groovy b/src/main/groovy/offtopic/OfftopicClient.groovy index e1b3c40..aa55edb 100644 --- a/src/main/groovy/offtopic/OfftopicClient.groovy +++ b/src/main/groovy/offtopic/OfftopicClient.groovy @@ -1,9 +1,60 @@ package offtopic +/** + * OfftopicClient coordinates the interactions between KafkaSubscriber objects + * and the websocket interactions + */ class OfftopicClient { - int clientId = 0 + public int clientId = 0 - public OfftopicClient() { + private Closure messageCallback = null + private String topicsPattern = null + private ArrayList subscribers = null + private Configuration config = null + + public OfftopicClient(Configuration configuration) { this.clientId = new Random().nextInt() + this.config = configuration + this.subscribers = new ArrayList() + } + + public ArrayList getSubscribers() { + return this.subscribers + } + + public void createSubscribersFor(String topicsPattern) { + topicsPattern.split("\\+").each { topic -> + if (topic.length() == 0) { + return + } + + KafkaSubscriber subscriber = new KafkaSubscriber(Configuration.instance.zookeepers, + topic, + "offtopic-${clientId}") + subscriber.callback = this.messageCallback + this.subscribers.add(subscriber) + } + } + + public void setOnMessageCallback(Closure c) { + this.messageCallback = c + } + + public void startSubscribers() { + this.subscribers.each { subscriber -> + Thread runner = new Thread({ + subscriber.connect() + println "subscriber connected" + subscriber.consume() + println "consume over!" + }) + runner.start() + } + } + + public void shutdown() { + this.subscribers.each { subscriber -> + subscriber.shutdown() + } } } diff --git a/src/ratpack/ratpack.groovy b/src/ratpack/ratpack.groovy index a52d389..a90877b 100644 --- a/src/ratpack/ratpack.groovy +++ b/src/ratpack/ratpack.groovy @@ -36,7 +36,6 @@ ratpack { } get('topics/:name') { - println offtopic.Configuration.instance['offtopic.zookeepers'] render "Fetching info for ${pathTokens.name}" } @@ -45,32 +44,23 @@ ratpack { } get('topics/:name/websocket') { ctx -> - client = new OfftopicClient() - subscriber = new KafkaSubscriber(Configuration.instance.zookeepers, - pathTokens.name, - "offtopic-${client.clientId}") - runner = new Thread({ - subscriber.connect() - println "subscriber connected" - subscriber.consume() - println "consume over!" - }) + println "creating thingies" + def client = new OfftopicClient(Configuration.instance) + println "client: ${client}" websocket(ctx) { ws -> - println "Connected ${ws} ${subscriber}" - subscriber.callback = { msg -> - println "called back with: ${msg}" - ws.send(groovy.json.JsonOutput.toJson(['raw' : new String(msg.message()), - 'b64' : msg.message().encodeBase64().toString(), - 'topic' : msg.topic(), - 'tstamp' : System.currentTimeMillis()])) + println "Connected ${ws}" + client.onMessageCallback = { m -> + println "called back with ${m}" + ws.send(groovy.json.JsonOutput.toJson(m)) println "sent message" } - runner.start() + client.createSubscribersFor(pathTokens.name) + print "subscribers created for ${pathTokens.name}" + client.startSubscribers() } connect { sock -> sock.onClose { - println "closing up ${subscriber}" - subscriber.shutdown() + client.shutdown() } sock.onMessage { msg -> println "client sent ${msg}" diff --git a/src/test/groovy/offtopic/OfftopicClientSpec.groovy b/src/test/groovy/offtopic/OfftopicClientSpec.groovy index 2aa5392..3b6ce5d 100644 --- a/src/test/groovy/offtopic/OfftopicClientSpec.groovy +++ b/src/test/groovy/offtopic/OfftopicClientSpec.groovy @@ -10,4 +10,36 @@ class OfftopicClientSpec extends Specification { then: client.clientId != 0 } + + def "initialization should create a subscribers ArrayList"() { + when: + def client = new OfftopicClient() + + then: + client.subscribers.size() == 0 + } +} + +class OfftopicClientCreateSubscribersSpec extends Specification { + def client = null + + def setup() { + this.client = new OfftopicClient() + } + + def "createSubscribersFor with an empty string"() { + when: + this.client.createSubscribersFor('') + + then: + this.client.subscribers.size() == 0 + } + + def "createSubscribersFor with a single topic"() { + when: + this.client.createSubscribersFor('spock-topic') + + then: + this.client.subscribers.size() == 1 + } }