diff --git a/build.gradle b/build.gradle index 19af7f3..d05a0f6 100644 --- a/build.gradle +++ b/build.gradle @@ -39,6 +39,7 @@ dependencies { compile 'org.apache.kafka:kafka_2.10:0.8.1.1+' testCompile 'org.spockframework:spock-core:0.7-groovy-2.0' + testCompile 'cglib:cglib-nodep:2.2.+' } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/main/groovy/offtopic/Configuration.groovy b/src/main/groovy/offtopic/Configuration.groovy new file mode 100644 index 0000000..aea3c07 --- /dev/null +++ b/src/main/groovy/offtopic/Configuration.groovy @@ -0,0 +1,18 @@ +package offtopic + +/** + * class for wrapping our system configuration + */ +@Singleton +class Configuration extends Properties { + /** + * Load defaults, starting in the current working directory, searching for + * 'offtopic.properties' + */ + public boolean loadDefaults() { + File cwdConfiguration = new File('offtopic.properties') + if (cwdConfiguration.exists()) { + this.load(new FileInputStream(cwdConfiguration)) + } + } +} diff --git a/src/main/groovy/offtopic/KafkaSubscriber.groovy b/src/main/groovy/offtopic/KafkaSubscriber.groovy index b5a8aa7..f471663 100644 --- a/src/main/groovy/offtopic/KafkaSubscriber.groovy +++ b/src/main/groovy/offtopic/KafkaSubscriber.groovy @@ -14,9 +14,13 @@ class KafkaSubscriber { String topic private Closure callback private ConsumerConnector consumer + private String zookeepers + private String consumerId - public KafkaSubscriber(String topicName) { + public KafkaSubscriber(String zks, String topicName, String consumerId) { this.topic = topicName + this.zookeepers = zks + this.consumerId = consumerId } public void setCallback(Closure theCallback) { @@ -25,7 +29,7 @@ class KafkaSubscriber { public void connect() { this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector( - createConsumerConfig('localhost:2181', 'offtopic-spock-test')) + createConsumerConfig(this.zookeepers, this.consumerId)) } public void shutdown() { @@ -45,7 +49,7 @@ class KafkaSubscriber { consumerMap.get(this.topic).each { stream -> def iterator = stream.iterator() while (iterator.hasNext()) { - this.callback.call(iterator.next().message()) + this.callback.call(iterator.next()) } } } diff --git a/src/main/groovy/offtopic/OfftopicClient.groovy b/src/main/groovy/offtopic/OfftopicClient.groovy new file mode 100644 index 0000000..e1b3c40 --- /dev/null +++ b/src/main/groovy/offtopic/OfftopicClient.groovy @@ -0,0 +1,9 @@ +package offtopic + +class OfftopicClient { + int clientId = 0 + + public OfftopicClient() { + this.clientId = new Random().nextInt() + } +} diff --git a/src/main/groovy/offtopic/curator/CuratorPool.groovy b/src/main/groovy/offtopic/curator/CuratorPool.groovy index 48a0a08..a9b06d5 100644 --- a/src/main/groovy/offtopic/curator/CuratorPool.groovy +++ b/src/main/groovy/offtopic/curator/CuratorPool.groovy @@ -2,10 +2,22 @@ package offtopic.curator import org.apache.commons.pool2.impl.GenericObjectPool -@Singleton(strict=false) class CuratorPool extends GenericObjectPool{ - private CuratorPool() { - /** XXX: Figure out how to get ZK from settings */ - super(new CuratorClientObjectFactory('localhost:2181')) + private static CuratorPool instance = null + + public static void prepare(String zookeepers) { + this.instance = new CuratorPool(zookeepers) + } + + public static CuratorPool getInstance() { + if (this.instance == null) { + throw new Exception("Cannot access CuratorPool before prepare() has been called") + } + return this.instance + } + + private CuratorPool(String zookeepers) { + super(new CuratorClientObjectFactory(zookeepers)) + println "CREATING WITH ${zookeepers}" } } diff --git a/src/ratpack/offtopic.properties b/src/ratpack/offtopic.properties new file mode 100644 index 0000000..1e2c24c --- /dev/null +++ b/src/ratpack/offtopic.properties @@ -0,0 +1 @@ +zookeepers=localhost:2181 diff --git a/src/ratpack/public/js/ws.js b/src/ratpack/public/js/ws.js index a5c1d0b..94f908a 100644 --- a/src/ratpack/public/js/ws.js +++ b/src/ratpack/public/js/ws.js @@ -18,7 +18,7 @@ function watchTopic(name) { var el = "
" + data.raw + "
"; messages.prepend(el); $("#" + el_id).click(function(ev) { - $("#"+el_id+'_b64').show(); + $("#"+el_id+'_b64').toggle(); }); // Let's only keep the last 25 diff --git a/src/ratpack/ratpack.groovy b/src/ratpack/ratpack.groovy index eabc1b8..a52d389 100644 --- a/src/ratpack/ratpack.groovy +++ b/src/ratpack/ratpack.groovy @@ -8,9 +8,13 @@ import static ratpack.websocket.WebSockets.websocket import offtopic.KafkaService import offtopic.KafkaSubscriber +import offtopic.Configuration +import offtopic.OfftopicClient ratpack { bindings { + offtopic.Configuration.instance.loadDefaults() + offtopic.curator.CuratorPool.prepare(Configuration.instance.zookeepers) add new HandlebarsModule() add new JacksonModule() } @@ -32,6 +36,7 @@ ratpack { } get('topics/:name') { + println offtopic.Configuration.instance['offtopic.zookeepers'] render "Fetching info for ${pathTokens.name}" } @@ -40,7 +45,10 @@ ratpack { } get('topics/:name/websocket') { ctx -> - subscriber = new KafkaSubscriber(pathTokens.name) + client = new OfftopicClient() + subscriber = new KafkaSubscriber(Configuration.instance.zookeepers, + pathTokens.name, + "offtopic-${client.clientId}") runner = new Thread({ subscriber.connect() println "subscriber connected" @@ -52,8 +60,10 @@ ratpack { println "Connected ${ws} ${subscriber}" subscriber.callback = { msg -> println "called back with: ${msg}" - ws.send(groovy.json.JsonOutput.toJson(['raw' : new String(msg), - 'b64' : msg.encodeBase64().toString()])) + ws.send(groovy.json.JsonOutput.toJson(['raw' : new String(msg.message()), + 'b64' : msg.message().encodeBase64().toString(), + 'topic' : msg.topic(), + 'tstamp' : System.currentTimeMillis()])) println "sent message" } runner.start() diff --git a/src/test/groovy/offtopic/ConfigurationSpec.groovy b/src/test/groovy/offtopic/ConfigurationSpec.groovy new file mode 100644 index 0000000..35745b0 --- /dev/null +++ b/src/test/groovy/offtopic/ConfigurationSpec.groovy @@ -0,0 +1,6 @@ +package offtopic + +import spock.lang.Specification + +class ConfigurationSpec extends Specification { +} diff --git a/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy b/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy index 31092b6..879606c 100644 --- a/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy +++ b/src/test/groovy/offtopic/KafkaSubscriberSpec.groovy @@ -5,7 +5,7 @@ import spock.lang.Specification class KafkaSubscriberSpec extends Specification { def "initializing"() { when: - def subscriber = new KafkaSubscriber('spock') + def subscriber = new KafkaSubscriber(null, 'spock', null) then: subscriber instanceof KafkaSubscriber diff --git a/src/test/groovy/offtopic/OfftopicClientSpec.groovy b/src/test/groovy/offtopic/OfftopicClientSpec.groovy new file mode 100644 index 0000000..2aa5392 --- /dev/null +++ b/src/test/groovy/offtopic/OfftopicClientSpec.groovy @@ -0,0 +1,13 @@ +package offtopic + +import spock.lang.Specification + +class OfftopicClientSpec extends Specification { + def "initialization should create a clientId"() { + when: + def client = new OfftopicClient() + + then: + client.clientId != 0 + } +} diff --git a/src/test/groovy/offtopic/curator/CuratorPoolSpec.groovy b/src/test/groovy/offtopic/curator/CuratorPoolSpec.groovy index b41b7e8..b334c06 100644 --- a/src/test/groovy/offtopic/curator/CuratorPoolSpec.groovy +++ b/src/test/groovy/offtopic/curator/CuratorPoolSpec.groovy @@ -4,7 +4,10 @@ import spock.lang.Specification class CuratorPoolSpec extends Specification { def "an instance should be an instance"() { - expect: - CuratorPool.instance instanceof CuratorPool + when: + CuratorPool.instance + + then: + thrown(Exception) } }