diff --git a/build.gradle b/build.gradle index 0295415..d0bdb37 100644 --- a/build.gradle +++ b/build.gradle @@ -55,6 +55,9 @@ dependencies { compile 'com.timgroup:java-statsd-client:3.0.1+' + /* Logback is to be used for logging through the app */ + compile 'ch.qos.logback:logback-classic:1.1.2+' + testCompile 'org.spockframework:spock-core:0.7-groovy-2.0' testCompile 'cglib:cglib-nodep:2.2.+' } diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 5b560e1..b3dd4b7 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -2,6 +2,10 @@ package com.github.lookout.verspaetung import groovy.transform.TypeChecked +import java.util.concurrent.ConcurrentHashMap +import org.slf4j.Logger +import org.slf4j.LoggerFactory + import kafka.cluster.Broker import kafka.client.ClientUtils import kafka.consumer.SimpleConsumer @@ -18,10 +22,11 @@ class KafkaPoller extends Thread { private final String KAFKA_CLIENT_ID = 'VerspaetungClient' private final Integer KAFKA_TIMEOUT = (5 * 1000) private final Integer KAFKA_BUFFER = (100 * 1024) + private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class) private Boolean keepRunning = true private Boolean shouldReconnect = false - private HashMap brokerConsumerMap = [:] + private ConcurrentHashMap brokerConsumerMap = [:] private List brokers = [] private AbstractMap> consumersMap private List onDelta = [] @@ -31,8 +36,9 @@ class KafkaPoller extends Thread { } void run() { + logger.info("Starting wait loop") while (keepRunning) { - println 'kafka poll' + logger.debug("poll loop") if (shouldReconnect) { reconnect() @@ -46,8 +52,10 @@ class KafkaPoller extends Thread { } } + synchronized void dumpMetadata() { - println 'dumping' + logger.debug("dumping meta-data") + def topics = this.consumersMap.keySet().collect { TopicPartition k -> k.topic } def metadata = ClientUtils.fetchTopicMetadata(toScalaSet(new HashSet(topics)), brokersSeq, @@ -71,7 +79,7 @@ class KafkaPoller extends Thread { } } - println 'dumped' + logger.debug("finished dumping meta-data") } @@ -90,7 +98,7 @@ class KafkaPoller extends Thread { * Blocking reconnect to the Kafka brokers */ void reconnect() { - println "reconnecting" + logger.info("Creating SimpleConsumer connections for brokers") this.brokers.each { Broker b -> SimpleConsumer consumer = new SimpleConsumer(b.host, b.port, diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 353be32..55fcef8 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -7,19 +7,22 @@ import java.util.concurrent.ConcurrentHashMap import groovy.transform.TypeChecked import com.timgroup.statsd.StatsDClient -import com.timgroup.statsd.NonBlockingStatsDClient +import com.timgroup.statsd.NonBlockingDogStatsDClient import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.TreeCache +import org.slf4j.Logger +import org.slf4j.LoggerFactory //@TypeChecked class Main { - private static final StatsDClient statsd = new NonBlockingStatsDClient('verspaetung', 'localhost', 8125) + private static final StatsDClient statsd = new NonBlockingDogStatsDClient('verspaetung', '10.32.2.211', 8125) + private static final Logger logger = LoggerFactory.getLogger(Main.class) static void main(String[] args) { - println "Running ${args}" + logger.info("Running with: ${args}") ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(args[0], retry) @@ -32,7 +35,7 @@ class Main { KafkaPoller poller = new KafkaPoller(consumers) StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers) consumerWatcher.onInitComplete << { - println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples" + logger.info("standard consumers initialized to ${consumers.size()} (topic, partition) tuples") } BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client) @@ -43,17 +46,22 @@ class Main { cache.listenable.addListener(consumerWatcher) poller.onDelta << { String groupName, TopicPartition tp, Long delta -> - statsd.recordGaugeValue("${tp.topic}.${tp.partition}.${groupName}", delta) + statsd.recordGaugeValue(tp.topic, delta, [ + 'topic' : tp.topic, + 'partition' : tp.partition, + 'consumer-group' : groupName + ]) } poller.start() brokerWatcher.start() cache.start() - println 'started..' + + logger.info("Started wait loop...") while (true) { Thread.sleep(1000) } - println 'exiting..' + logger.info("exiting..") poller.die() poller.join() return diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index e550e1a..f66a418 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -8,6 +8,8 @@ import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.apache.curator.framework.recipes.cache.TreeCacheEvent +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** * AbstractTreeWatcher defines the contract and base components for the various @@ -19,10 +21,12 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent abstract class AbstractTreeWatcher implements TreeCacheListener { protected AbstractMap> consumersMap protected List onInitComplete + protected Logger logger AbstractTreeWatcher(AbstractMap consumers) { this.consumersMap = consumers this.onInitComplete = [] + this.logger = LoggerFactory.getLogger(this.class) } /** diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy index 6ec8a51..32fa290 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy @@ -11,6 +11,8 @@ import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCache import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.apache.curator.framework.recipes.cache.TreeCacheEvent +import org.slf4j.Logger +import org.slf4j.LoggerFactory /** * The BrokerTreeWatcher is a kind of watcher whose sole purpose is @@ -21,6 +23,7 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent class BrokerTreeWatcher implements TreeCacheListener { static final Integer INVALID_BROKER_ID = -1 + private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class) private JsonSlurper json private TreeCache cache private final String BROKERS_PATH = '/brokers/ids' @@ -61,7 +64,6 @@ class BrokerTreeWatcher implements TreeCacheListener { } if (event.type != TreeCacheEvent.Type.NODE_ADDED) { - println event return } diff --git a/src/main/resources/logback.groovy b/src/main/resources/logback.groovy new file mode 100644 index 0000000..545d110 --- /dev/null +++ b/src/main/resources/logback.groovy @@ -0,0 +1,14 @@ +scan() + +import ch.qos.logback.classic.encoder.PatternLayoutEncoder +import ch.qos.logback.core.ConsoleAppender + +import static ch.qos.logback.classic.Level.* + +appender("STDOUT", ConsoleAppender) { + encoder(PatternLayoutEncoder) { + pattern = "%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" + } +} +root(INFO, ["STDOUT"]) +