mirror of https://github.com/reiseburo/offtopic
parent
9e23d4f9b6
commit
4dc819f884
|
@ -44,6 +44,9 @@ dependencies {
|
|||
compile 'org.apache.curator:curator-framework:[2.7.1,2.8)'
|
||||
compile 'org.apache.commons:commons-pool2:[2.2,3.0)'
|
||||
|
||||
/* For logging at runtime */
|
||||
compile 'ch.qos.logback:logback-parent:[1.1.3,2.0)'
|
||||
|
||||
compile 'org.apache.kafka:kafka_2.11:0.8.2.1'
|
||||
// Forcing us up to ZK 3.5 to prevent wacky classpath errors when mixing
|
||||
// and matching dependencies
|
||||
|
|
|
@ -1,9 +1,12 @@
|
|||
package offtopic
|
||||
|
||||
import groovy.util.logging.Slf4j
|
||||
|
||||
/**
|
||||
* class for wrapping our system configuration
|
||||
*/
|
||||
@Singleton
|
||||
@Slf4j
|
||||
class Configuration extends Properties {
|
||||
/**
|
||||
* Load defaults, starting in the current working directory, searching for
|
||||
|
@ -12,7 +15,11 @@ class Configuration extends Properties {
|
|||
public boolean loadDefaults() {
|
||||
File cwdConfiguration = new File('offtopic.properties')
|
||||
if (cwdConfiguration.exists()) {
|
||||
log.info('Loading `offtopic.properties` for configuration')
|
||||
this.load(new FileInputStream(cwdConfiguration))
|
||||
}
|
||||
else {
|
||||
log.warn('Could not load configuration file {}', cwdConfiguration.absolutePath)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,36 +1,40 @@
|
|||
package offtopic
|
||||
|
||||
import groovy.json.JsonSlurper
|
||||
import groovy.util.logging.Slf4j
|
||||
import offtopic.curator.CuratorPool
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
|
||||
/**
|
||||
* KafkaService is a helper class to help expose data about Kafka to the
|
||||
* Ratpack app
|
||||
*/
|
||||
@Slf4j
|
||||
class KafkaService {
|
||||
final static String BROKERS_PATH = "/brokers/ids"
|
||||
final static String TOPICS_PATH = "/brokers/topics"
|
||||
|
||||
public static ArrayList fetchBrokers() {
|
||||
def brokers = new ArrayList()
|
||||
def parser = new JsonSlurper()
|
||||
public static List<String> fetchBrokers() {
|
||||
JsonSlurper parser = new JsonSlurper()
|
||||
List<String> brokers = []
|
||||
|
||||
CuratorPool.withCurator { c ->
|
||||
c.getChildren().forPath(BROKERS_PATH).each { String id ->
|
||||
CuratorPool.withCurator { CuratorFramework c ->
|
||||
c.children.forPath(BROKERS_PATH).each { String id ->
|
||||
// Pulling this into a String buffer since parse(byte[]) is
|
||||
// throwing a stackoverflow error
|
||||
String buffer = new String(c.getData().forPath("${BROKERS_PATH}/${id}"))
|
||||
String buffer = new String(c.data.forPath("${BROKERS_PATH}/${id}"))
|
||||
brokers.add(parser.parseText(buffer))
|
||||
}
|
||||
}
|
||||
|
||||
log.info('Fetched brokers from Zookeeper: {}', brokers)
|
||||
return brokers
|
||||
}
|
||||
|
||||
public static ArrayList fetchTopics() {
|
||||
ArrayList brokers = null
|
||||
public static List<String> fetchTopics() {
|
||||
List brokers = []
|
||||
CuratorPool.withCurator { c ->
|
||||
brokers = c.getChildren().forPath(TOPICS_PATH)
|
||||
brokers = c.children.forPath(TOPICS_PATH)
|
||||
}
|
||||
return brokers
|
||||
}
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
package offtopic.curator
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
|
||||
class CuratorClient {
|
||||
def client = null
|
||||
CuratorFramework client
|
||||
|
||||
public CuratorClient(String zookeepers) {
|
||||
if (zookeepers?.length() <= 0) {
|
||||
|
|
|
@ -21,7 +21,7 @@ class CuratorPool extends GenericObjectPool<CuratorClient>{
|
|||
}
|
||||
|
||||
public static void withCurator(Closure closure) {
|
||||
def curator = null
|
||||
CuratorClient curator = null
|
||||
try {
|
||||
curator = CuratorPool.instance.borrowObject()
|
||||
closure.call(curator.client)
|
||||
|
|
|
@ -1,13 +1,16 @@
|
|||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
import ratpack.handlebars.HandlebarsModule
|
||||
import ratpack.jackson.JacksonModule
|
||||
|
||||
import java.util.logging.Level
|
||||
|
||||
import static ratpack.handlebars.Template.handlebarsTemplate
|
||||
import static ratpack.jackson.Jackson.json
|
||||
import static ratpack.groovy.Groovy.*
|
||||
import static ratpack.websocket.WebSockets.websocket
|
||||
|
||||
import offtopic.KafkaService
|
||||
import offtopic.KafkaSubscriber
|
||||
import offtopic.Configuration
|
||||
import offtopic.OfftopicClient
|
||||
|
||||
|
@ -26,7 +29,6 @@ ratpack {
|
|||
|
||||
get('topics') {
|
||||
topics = KafkaService.fetchTopics()
|
||||
println topics
|
||||
|
||||
if (request.headers.get('Content-Type') == 'application/json') {
|
||||
render(json(topics))
|
||||
|
@ -45,19 +47,20 @@ ratpack {
|
|||
}
|
||||
|
||||
get('topics/:name/websocket') { ctx ->
|
||||
def client = new OfftopicClient(Configuration.instance)
|
||||
def grepper = null
|
||||
OfftopicClient client = new OfftopicClient(Configuration.instance)
|
||||
String grepper = null
|
||||
|
||||
websocket(ctx) { ws ->
|
||||
println "Connected ${ws}"
|
||||
Logger log = LoggerFactory.getLogger('WebSocket')
|
||||
log.info('Connected client {}', ws)
|
||||
client.onMessageCallback = { m ->
|
||||
println "called back with ${m} (grep: ${grepper})"
|
||||
log.debug('Callback with {] (grep: {})', m, grepper)
|
||||
if ((grepper == null) || (m.raw =~ grepper)) {
|
||||
ws.send(groovy.json.JsonOutput.toJson(m))
|
||||
}
|
||||
}
|
||||
client.createSubscribersFor(pathTokens.name)
|
||||
print "subscribers created for ${pathTokens.name}"
|
||||
log.info('Subscribers created for {}', pathTokens.name)
|
||||
client.startSubscribers()
|
||||
} connect { sock ->
|
||||
sock.onClose {
|
||||
|
@ -80,18 +83,6 @@ ratpack {
|
|||
}
|
||||
}
|
||||
|
||||
/* set up a demo/dummy websocket listener */
|
||||
get("ws") { context ->
|
||||
websocket(context) { ws ->
|
||||
} connect {
|
||||
it.onClose {
|
||||
println "closing"
|
||||
} onMessage {
|
||||
"client sent me ${it}"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fileSystem "public", { f -> f.files() }
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue