Make zookeeper hosts configurable, consumerIds random and a few more improvements

Slowly moving towards the OfftopicClient to hold state and configuration around
websocket connections

Fixes #4
Fixes #3
This commit is contained in:
R. Tyler Croy 2014-11-24 17:52:15 -08:00
parent a0cc416a53
commit 183f429ebd
12 changed files with 91 additions and 14 deletions

View File

@ -39,6 +39,7 @@ dependencies {
compile 'org.apache.kafka:kafka_2.10:0.8.1.1+' compile 'org.apache.kafka:kafka_2.10:0.8.1.1+'
testCompile 'org.spockframework:spock-core:0.7-groovy-2.0' testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
testCompile 'cglib:cglib-nodep:2.2.+'
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -0,0 +1,18 @@
package offtopic
/**
* class for wrapping our system configuration
*/
@Singleton
class Configuration extends Properties {
/**
* Load defaults, starting in the current working directory, searching for
* 'offtopic.properties'
*/
public boolean loadDefaults() {
File cwdConfiguration = new File('offtopic.properties')
if (cwdConfiguration.exists()) {
this.load(new FileInputStream(cwdConfiguration))
}
}
}

View File

@ -14,9 +14,13 @@ class KafkaSubscriber {
String topic String topic
private Closure callback private Closure callback
private ConsumerConnector consumer private ConsumerConnector consumer
private String zookeepers
private String consumerId
public KafkaSubscriber(String topicName) { public KafkaSubscriber(String zks, String topicName, String consumerId) {
this.topic = topicName this.topic = topicName
this.zookeepers = zks
this.consumerId = consumerId
} }
public void setCallback(Closure theCallback) { public void setCallback(Closure theCallback) {
@ -25,7 +29,7 @@ class KafkaSubscriber {
public void connect() { public void connect() {
this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector( this.consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig('localhost:2181', 'offtopic-spock-test')) createConsumerConfig(this.zookeepers, this.consumerId))
} }
public void shutdown() { public void shutdown() {
@ -45,7 +49,7 @@ class KafkaSubscriber {
consumerMap.get(this.topic).each { stream -> consumerMap.get(this.topic).each { stream ->
def iterator = stream.iterator() def iterator = stream.iterator()
while (iterator.hasNext()) { while (iterator.hasNext()) {
this.callback.call(iterator.next().message()) this.callback.call(iterator.next())
} }
} }
} }

View File

@ -0,0 +1,9 @@
package offtopic
class OfftopicClient {
int clientId = 0
public OfftopicClient() {
this.clientId = new Random().nextInt()
}
}

View File

@ -2,10 +2,22 @@ package offtopic.curator
import org.apache.commons.pool2.impl.GenericObjectPool import org.apache.commons.pool2.impl.GenericObjectPool
@Singleton(strict=false)
class CuratorPool extends GenericObjectPool<CuratorClient>{ class CuratorPool extends GenericObjectPool<CuratorClient>{
private CuratorPool() { private static CuratorPool instance = null
/** XXX: Figure out how to get ZK from settings */
super(new CuratorClientObjectFactory('localhost:2181')) public static void prepare(String zookeepers) {
this.instance = new CuratorPool(zookeepers)
}
public static CuratorPool getInstance() {
if (this.instance == null) {
throw new Exception("Cannot access CuratorPool before prepare() has been called")
}
return this.instance
}
private CuratorPool(String zookeepers) {
super(new CuratorClientObjectFactory(zookeepers))
println "CREATING WITH ${zookeepers}"
} }
} }

View File

@ -0,0 +1 @@
zookeepers=localhost:2181

View File

@ -18,7 +18,7 @@ function watchTopic(name) {
var el = "<div id='" + el_id + "' class='list-group-item'>" + data.raw + "<br/><div id='" + el_id + "_b64' style='display:none;'><pre>" + data.b64 + "</pre></div>"; var el = "<div id='" + el_id + "' class='list-group-item'>" + data.raw + "<br/><div id='" + el_id + "_b64' style='display:none;'><pre>" + data.b64 + "</pre></div>";
messages.prepend(el); messages.prepend(el);
$("#" + el_id).click(function(ev) { $("#" + el_id).click(function(ev) {
$("#"+el_id+'_b64').show(); $("#"+el_id+'_b64').toggle();
}); });
// Let's only keep the last 25 // Let's only keep the last 25

View File

@ -8,9 +8,13 @@ import static ratpack.websocket.WebSockets.websocket
import offtopic.KafkaService import offtopic.KafkaService
import offtopic.KafkaSubscriber import offtopic.KafkaSubscriber
import offtopic.Configuration
import offtopic.OfftopicClient
ratpack { ratpack {
bindings { bindings {
offtopic.Configuration.instance.loadDefaults()
offtopic.curator.CuratorPool.prepare(Configuration.instance.zookeepers)
add new HandlebarsModule() add new HandlebarsModule()
add new JacksonModule() add new JacksonModule()
} }
@ -32,6 +36,7 @@ ratpack {
} }
get('topics/:name') { get('topics/:name') {
println offtopic.Configuration.instance['offtopic.zookeepers']
render "Fetching info for ${pathTokens.name}" render "Fetching info for ${pathTokens.name}"
} }
@ -40,7 +45,10 @@ ratpack {
} }
get('topics/:name/websocket') { ctx -> get('topics/:name/websocket') { ctx ->
subscriber = new KafkaSubscriber(pathTokens.name) client = new OfftopicClient()
subscriber = new KafkaSubscriber(Configuration.instance.zookeepers,
pathTokens.name,
"offtopic-${client.clientId}")
runner = new Thread({ runner = new Thread({
subscriber.connect() subscriber.connect()
println "subscriber connected" println "subscriber connected"
@ -52,8 +60,10 @@ ratpack {
println "Connected ${ws} ${subscriber}" println "Connected ${ws} ${subscriber}"
subscriber.callback = { msg -> subscriber.callback = { msg ->
println "called back with: ${msg}" println "called back with: ${msg}"
ws.send(groovy.json.JsonOutput.toJson(['raw' : new String(msg), ws.send(groovy.json.JsonOutput.toJson(['raw' : new String(msg.message()),
'b64' : msg.encodeBase64().toString()])) 'b64' : msg.message().encodeBase64().toString(),
'topic' : msg.topic(),
'tstamp' : System.currentTimeMillis()]))
println "sent message" println "sent message"
} }
runner.start() runner.start()

View File

@ -0,0 +1,6 @@
package offtopic
import spock.lang.Specification
class ConfigurationSpec extends Specification {
}

View File

@ -5,7 +5,7 @@ import spock.lang.Specification
class KafkaSubscriberSpec extends Specification { class KafkaSubscriberSpec extends Specification {
def "initializing"() { def "initializing"() {
when: when:
def subscriber = new KafkaSubscriber('spock') def subscriber = new KafkaSubscriber(null, 'spock', null)
then: then:
subscriber instanceof KafkaSubscriber subscriber instanceof KafkaSubscriber

View File

@ -0,0 +1,13 @@
package offtopic
import spock.lang.Specification
class OfftopicClientSpec extends Specification {
def "initialization should create a clientId"() {
when:
def client = new OfftopicClient()
then:
client.clientId != 0
}
}

View File

@ -4,7 +4,10 @@ import spock.lang.Specification
class CuratorPoolSpec extends Specification { class CuratorPoolSpec extends Specification {
def "an instance should be an instance"() { def "an instance should be an instance"() {
expect: when:
CuratorPool.instance instanceof CuratorPool CuratorPool.instance
then:
thrown(Exception)
} }
} }