Add logback as a dependency and gut the printlns from the codebase

Author: R. Tyler Croy
Date:   2015-01-26 04:09:47 -08:00
Commit: 49c2d7fc1d (parent 6d82735b3c)
6 changed files with 52 additions and 13 deletions
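
Every println swap in this commit follows the same SLF4J pattern: each class obtains its own Logger from LoggerFactory and emits leveled messages that Logback can filter and format. A minimal sketch of that pattern, using an illustrative class name rather than one from this commit:

    import org.slf4j.Logger
    import org.slf4j.LoggerFactory

    class Example {
        // one logger per class, named after the class for easy filtering
        private final Logger logger = LoggerFactory.getLogger(Example.class)

        void work() {
            // leveled calls replace bare println; Logback decides what is shown
            logger.debug('fine-grained detail, hidden at the default INFO level')
            logger.info('operational messages that should normally appear')
        }
    }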

build.gradle

@@ -55,6 +55,9 @@ dependencies {
     compile 'com.timgroup:java-statsd-client:3.0.1+'

+    /* Logback is to be used for logging through the app */
+    compile 'ch.qos.logback:logback-classic:1.1.2+'
+
     testCompile 'org.spockframework:spock-core:0.7-groovy-2.0'
     testCompile 'cglib:cglib-nodep:2.2.+'
 }

KafkaPoller.groovy

@@ -2,6 +2,10 @@ package com.github.lookout.verspaetung
 import groovy.transform.TypeChecked
+import java.util.concurrent.ConcurrentHashMap
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
 import kafka.consumer.SimpleConsumer
@@ -18,10 +22,11 @@ class KafkaPoller extends Thread {
     private final String KAFKA_CLIENT_ID = 'VerspaetungClient'
     private final Integer KAFKA_TIMEOUT = (5 * 1000)
     private final Integer KAFKA_BUFFER = (100 * 1024)
+    private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class)

     private Boolean keepRunning = true
     private Boolean shouldReconnect = false
-    private HashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
+    private ConcurrentHashMap<Integer, SimpleConsumer> brokerConsumerMap = [:]
     private List<Broker> brokers = []
     private AbstractMap<TopicPartition, List<zk.ConsumerOffset>> consumersMap
     private List<Closure> onDelta = []
@@ -31,8 +36,9 @@ class KafkaPoller extends Thread {
     }

     void run() {
+        logger.info("Starting wait loop")
         while (keepRunning) {
-            println 'kafka poll'
+            logger.debug("poll loop")

             if (shouldReconnect) {
                 reconnect()
@@ -46,8 +52,10 @@
         }
     }

+    synchronized
     void dumpMetadata() {
-        println 'dumping'
+        logger.debug("dumping meta-data")
+
         def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic }
         def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)),
                                                       brokersSeq,
@@ -71,7 +79,7 @@
             }
         }

-        println 'dumped'
+        logger.debug("finished dumping meta-data")
     }
@@ -90,7 +98,7 @@
      * Blocking reconnect to the Kafka brokers
      */
     void reconnect() {
-        println "reconnecting"
+        logger.info("Creating SimpleConsumer connections for brokers")
         this.brokers.each { Broker b ->
             SimpleConsumer consumer = new SimpleConsumer(b.host,
                                                          b.port,
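
One non-logging change rides along in KafkaPoller: brokerConsumerMap becomes a ConcurrentHashMap and dumpMetadata() gains synchronized, which suggests watcher callbacks mutate state while the polling thread reads it. A rough sketch of that combination, with illustrative names not taken from this commit:

    import java.util.concurrent.ConcurrentHashMap

    class PollerSketch {
        // safe for concurrent writes (watcher threads) and reads (poller thread)
        private final ConcurrentHashMap<Integer, String> brokerMap = new ConcurrentHashMap<Integer, String>()

        void register(Integer brokerId, String host) {
            brokerMap.put(brokerId, host)
        }

        // synchronized serializes whole dumps so two callers cannot interleave
        synchronized Map<Integer, String> snapshot() {
            new HashMap<Integer, String>(brokerMap)
        }
    }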

Main.groovy

@@ -7,19 +7,22 @@ import java.util.concurrent.ConcurrentHashMap
 import groovy.transform.TypeChecked

 import com.timgroup.statsd.StatsDClient
-import com.timgroup.statsd.NonBlockingStatsDClient
+import com.timgroup.statsd.NonBlockingDogStatsDClient
 import org.apache.curator.retry.ExponentialBackoffRetry
 import org.apache.curator.framework.CuratorFrameworkFactory
 import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.TreeCache
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory

 //@TypeChecked
 class Main {
-    private static final StatsDClient statsd = new NonBlockingStatsDClient('verspaetung', 'localhost', 8125)
+    private static final StatsDClient statsd = new NonBlockingDogStatsDClient('verspaetung', '10.32.2.211', 8125)
+    private static final Logger logger = LoggerFactory.getLogger(Main.class)

     static void main(String[] args) {
-        println "Running ${args}"
+        logger.info("Running with: ${args}")

         ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3)
         CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry)
@@ -32,7 +35,7 @@ class Main {
         KafkaPoller poller = new KafkaPoller(consumers)
         StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers)
         consumerWatcher.onInitComplete << {
-            println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples"
+            logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples")
         }

         BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
@@ -43,17 +46,22 @@
         cache.listenable.addListener(consumerWatcher)

         poller.onDelta << { String groupName, TopicPartition tp, Long delta ->
-            statsd.recordGaugeValue("${tp.topic}.${tp.partition}.${groupName}", delta)
+            statsd.recordGaugeValue(tp.topic, delta, [
+                    'topic' : tp.topic,
+                    'partition' : tp.partition,
+                    'consumer-group' : groupName
+            ])
         }

         poller.start()
         brokerWatcher.start()
         cache.start()

-        println 'started..'
+        logger.info("Started wait loop...")

         while (true) { Thread.sleep(1000) }

-        println 'exiting..'
+        logger.info("exiting..")
         poller.die()
         poller.join()
         return
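
The onDelta change above also moves from NonBlockingStatsDClient to NonBlockingDogStatsDClient: instead of packing topic, partition, and consumer group into the metric name, one gauge is reported with the dimensions attached as tags, which keeps the metric namespace from multiplying. A sketch of the two shapes, independent of any client library (values illustrative):

    String topic = 'events'
    Integer partition = 0
    String group = 'indexer'

    // name-encoded: every (topic, partition, group) tuple is a distinct metric
    String nameEncoded = "${topic}.${partition}.${group}"
    assert nameEncoded == 'events.0.indexer'

    // tag-encoded: a single metric name, with dimensions carried alongside
    Map tagged = [metric: topic,
                  tags: [topic: topic, partition: partition, 'consumer-group': group]]
    assert tagged.tags['consumer-group'] == 'indexer'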

AbstractTreeWatcher.groovy

@@ -8,6 +8,8 @@ import org.apache.curator.framework.CuratorFramework
 import org.apache.curator.framework.recipes.cache.ChildData
 import org.apache.curator.framework.recipes.cache.TreeCacheListener
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory

 /**
  * AbstractTreeWatcher defines the contract and base components for the various
@@ -19,10 +21,12 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
 abstract class AbstractTreeWatcher implements TreeCacheListener {
     protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
     protected List<Closure> onInitComplete
+    protected Logger logger

     AbstractTreeWatcher(AbstractMap consumers) {
         this.consumersMap = consumers
         this.onInitComplete = []
+        this.logger = LoggerFactory.getLogger(this.class)
     }

     /**

BrokerTreeWatcher.groovy

@@ -11,6 +11,8 @@ import org.apache.curator.framework.recipes.cache.ChildData
 import org.apache.curator.framework.recipes.cache.TreeCache
 import org.apache.curator.framework.recipes.cache.TreeCacheListener
 import org.apache.curator.framework.recipes.cache.TreeCacheEvent
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory

 /**
  * The BrokerTreeWatcher is a kind of watcher whose sole purpose is
@@ -21,6 +23,7 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
 class BrokerTreeWatcher implements TreeCacheListener {
     static final Integer INVALID_BROKER_ID = -1
+    private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class)

     private JsonSlurper json
     private TreeCache cache
     private final String BROKERS_PATH = '/brokers/ids'
@@ -61,7 +64,6 @@ class BrokerTreeWatcher implements TreeCacheListener {
         }

         if (event.type != TreeCacheEvent.Type.NODE_ADDED) {
-            println event
             return
         }

logback.groovy (new file)

@@ -0,0 +1,14 @@
+scan()
+
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder
+import ch.qos.logback.core.ConsoleAppender
+import static ch.qos.logback.classic.Level.*
+
+appender("STDOUT", ConsoleAppender) {
+    encoder(PatternLayoutEncoder) {
+        pattern = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n"
+    }
+}
+
+root(INFO, ["STDOUT"])
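
Since scan() is enabled, Logback periodically re-reads this file, so levels can be adjusted without a restart. The Groovy DSL also accepts per-logger levels; for example, to make only this application's classes verbose (an illustrative addition, assuming the com.github.lookout.verspaetung package root seen in the sources above):

    // hypothetical extra line for logback.groovy: DEBUG for the app, INFO elsewhere
    logger('com.github.lookout.verspaetung', DEBUG, ['STDOUT'], false)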