diff --git a/build.gradle b/build.gradle index 79a7a69..8f53b4d 100644 --- a/build.gradle +++ b/build.gradle @@ -3,6 +3,7 @@ plugins { id 'com.github.johnrengelman.shadow' version '1.2.0' id "org.ajoberstar.github-pages" version "1.2.0" id "org.asciidoctor.gradle.asciidoctor" version "1.5.1" + id 'codenarc' id 'groovy' id 'application' } @@ -63,6 +64,8 @@ dependencies { /* Logback is to be used for logging through the app */ compile 'ch.qos.logback:logback-classic:1.1.2+' + codenarc "org.codenarc:CodeNarc:0.24" + testCompile 'org.spockframework:spock-core:0.7-groovy-2.0' testCompile 'cglib:cglib-nodep:2.2.+' } @@ -92,6 +95,14 @@ githubPages { //////////////////////////////////////////////////////////////////////////////// +codenarc { + configFile file("${projectDir}/gradle/codenarc.rules") + sourceSets = [sourceSets.main] +} +codenarcMain { + exclude '**/Main.groovy' +} + shadowJar { exclude 'META-INF/*.RSA', 'META-INF/*.DSA' manifest { diff --git a/gradle.properties b/gradle.properties index ee1405f..46ceb0c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,4 +1,5 @@ -org.gradle.daemon=true +# Disabling daemon mode since it makes PermGen leaks with codenarc more common +org.gradle.daemon=false bintrayUser= bintrayKey= sourceCompatibility=1.7 diff --git a/gradle/codenarc.rules b/gradle/codenarc.rules new file mode 100644 index 0000000..c1edcd2 --- /dev/null +++ b/gradle/codenarc.rules @@ -0,0 +1,379 @@ +ruleset { + + description ''' + A Sample Groovy RuleSet containing all CodeNarc Rules + You can use this as a template for your own custom RuleSet. + Just delete the rules that you don't want to include. + ''' + + AbcComplexity // DEPRECATED: Use the AbcMetric rule instead. Requires the GMetrics jar + AbcMetric // Requires the GMetrics jar + AbstractClassName + AbstractClassWithPublicConstructor + AbstractClassWithoutAbstractMethod + AddEmptyString + AssertWithinFinallyBlock + AssignCollectionSort + AssignCollectionUnique + AssignmentInConditional + AssignmentToStaticFieldFromInstanceMethod + BigDecimalInstantiation + BitwiseOperatorInConditional + BlankLineBeforePackage + BooleanGetBoolean + BooleanMethodReturnsNull + BracesForClass + BracesForForLoop + BracesForIfElse + BracesForMethod + BracesForTryCatchFinally + BrokenNullCheck + BrokenOddnessCheck + BuilderMethodWithSideEffects + BusyWait + CatchArrayIndexOutOfBoundsException + CatchError + CatchException + CatchIllegalMonitorStateException + CatchIndexOutOfBoundsException + CatchNullPointerException + CatchRuntimeException + CatchThrowable + ChainedTest + ClassForName + ClassJavadoc + ClassName + ClassNameSameAsFilename + ClassNameSameAsSuperclass + ClassSize + CloneWithoutCloneable + CloneableWithoutClone + CloseWithoutCloseable + ClosureAsLastMethodParameter + ClosureStatementOnOpeningLineOfMultipleLineClosure + CollectAllIsDeprecated + CompareToWithoutComparable + ComparisonOfTwoConstants + ComparisonWithSelf + ConfusingClassNamedException + ConfusingMethodName + ConfusingMultipleReturns + ConfusingTernary + ConsecutiveBlankLines + ConsecutiveLiteralAppends + ConsecutiveStringConcatenation + ConstantAssertExpression + ConstantIfExpression + ConstantTernaryExpression + ConstantsOnlyInterface + CouldBeElvis + CoupledTestCase + CrapMetric // Requires the GMetrics jar and a Cobertura coverage file + CyclomaticComplexity // Requires the GMetrics jar + DeadCode + DirectConnectionManagement + DoubleCheckedLocking + DoubleNegative + DuplicateCaseStatement + DuplicateImport + DuplicateListLiteral + DuplicateMapKey + DuplicateMapLiteral + /* + This is a pointless check + DuplicateNumberLiteral + /* + DuplicateSetValue + DuplicateStringLiteral + ElseBlockBraces + EmptyCatchBlock + EmptyClass + EmptyElseBlock + EmptyFinallyBlock + EmptyForStatement + EmptyIfStatement + EmptyInstanceInitializer + EmptyMethod + EmptyMethodInAbstractClass + EmptyStaticInitializer + EmptySwitchStatement + EmptySynchronizedStatement + EmptyTryBlock + EmptyWhileStatement + EnumCustomSerializationIgnored + EqualsAndHashCode + EqualsOverloaded + ExceptionExtendsError + ExceptionExtendsThrowable + ExceptionNotThrown + ExplicitArrayListInstantiation + ExplicitCallToAndMethod + ExplicitCallToCompareToMethod + ExplicitCallToDivMethod + ExplicitCallToEqualsMethod + ExplicitCallToGetAtMethod + ExplicitCallToLeftShiftMethod + ExplicitCallToMinusMethod + ExplicitCallToModMethod + ExplicitCallToMultiplyMethod + ExplicitCallToOrMethod + ExplicitCallToPlusMethod + ExplicitCallToPowerMethod + ExplicitCallToRightShiftMethod + ExplicitCallToXorMethod + ExplicitGarbageCollection + ExplicitHashMapInstantiation + ExplicitHashSetInstantiation + ExplicitLinkedHashMapInstantiation + ExplicitLinkedListInstantiation + ExplicitStackInstantiation + ExplicitTreeSetInstantiation + FactoryMethodName + FieldName + FileCreateTempFile + FileEndsWithoutNewline + FinalClassWithProtectedMember + ForLoopShouldBeWhileLoop + ForStatementBraces + GStringAsMapKey + GStringExpressionWithinString + GetterMethodCouldBeProperty + GrailsDomainHasEquals + GrailsDomainHasToString + GrailsDomainReservedSqlKeywordName + GrailsDomainWithServiceReference + GrailsDuplicateConstraint + GrailsDuplicateMapping + GrailsMassAssignment + GrailsPublicControllerMethod + GrailsServletContextReference + GrailsSessionReference // DEPRECATED + GrailsStatelessService + GroovyLangImmutable + HardCodedWindowsFileSeparator + HardCodedWindowsRootDirectory + HashtableIsObsolete + IfStatementBraces + /* + "Uh yes, we'd like you to make your code more unreadable" + IfStatementCouldBeTernary + */ + IllegalClassMember + IllegalClassReference + IllegalPackageReference + IllegalRegex + IllegalString + IllegalSubclass + ImplementationAsType + ImportFromSamePackage + ImportFromSunPackages + InconsistentPropertyLocking + InconsistentPropertySynchronization + InsecureRandom + /* + instanceof is useful in Groovy, not sure why this check is here + Instanceof + */ + IntegerGetInteger + InterfaceName + InterfaceNameSameAsSuperInterface + InvertedIfElse + JUnitAssertAlwaysFails + JUnitAssertAlwaysSucceeds + JUnitAssertEqualsConstantActualValue + JUnitFailWithoutMessage + JUnitLostTest + JUnitPublicField + JUnitPublicNonTestMethod + JUnitPublicProperty + JUnitSetUpCallsSuper + JUnitStyleAssertions + JUnitTearDownCallsSuper + JUnitTestMethodWithoutAssert + JUnitUnnecessarySetUp + JUnitUnnecessaryTearDown + JUnitUnnecessaryThrowsException + JavaIoPackageAccess + JdbcConnectionReference + JdbcResultSetReference + JdbcStatementReference + LineLength + LocaleSetDefault + LoggerForDifferentClass + LoggerWithWrongModifiers + LoggingSwallowsStacktrace + LongLiteralWithLowerCaseL + MethodCount + MethodName + MethodSize + MisorderedStaticImports + MissingBlankLineAfterImports + MissingBlankLineAfterPackage + MissingNewInThrowStatement + MultipleLoggers + MultipleUnaryOperators + NestedBlockDepth + NestedForLoop + NestedSynchronization + NoDef + NoWildcardImports + NonFinalPublicField + NonFinalSubclassOfSensitiveInterface + ObjectFinalize + ObjectOverrideMisspelledMethodName + PackageName + PackageNameMatchesFilePath + ParameterCount + ParameterName + ParameterReassignment + PrintStackTrace + Println + PrivateFieldCouldBeFinal + PropertyName + PublicFinalizeMethod + PublicInstanceField + RandomDoubleCoercedToZero + RemoveAllOnSelf + RequiredRegex + RequiredString + ReturnFromFinallyBlock + ReturnNullFromCatchBlock + ReturnsNullInsteadOfEmptyArray + ReturnsNullInsteadOfEmptyCollection + SerialPersistentFields + SerialVersionUID + /* Not needed + SerializableClassMustDefineSerialVersionUID + */ + SimpleDateFormatMissingLocale + SpaceAfterCatch + SpaceAfterClosingBrace + SpaceAfterComma + SpaceAfterFor + SpaceAfterIf + SpaceAfterOpeningBrace + SpaceAfterSemicolon + SpaceAfterSwitch + SpaceAfterWhile + SpaceAroundClosureArrow + /* + I prefer a little extra space + SpaceAroundMapEntryColon + */ + SpaceAroundOperator + SpaceBeforeClosingBrace + SpaceBeforeOpeningBrace + SpockIgnoreRestUsed + StatelessClass + StatelessSingleton + StaticCalendarField + StaticConnection + StaticDateFormatField + StaticMatcherField + StaticSimpleDateFormatField + SwallowThreadDeath + SynchronizedMethod + SynchronizedOnBoxedPrimitive + SynchronizedOnGetClass + SynchronizedOnReentrantLock + SynchronizedOnString + SynchronizedOnThis + SynchronizedReadObjectMethod + SystemErrPrint + SystemExit + SystemOutPrint + SystemRunFinalizersOnExit + TernaryCouldBeElvis + ThisReferenceEscapesConstructor + ThreadGroup + ThreadLocalNotStaticFinal + ThreadYield + ThrowError + ThrowException + ThrowExceptionFromFinallyBlock + ThrowNullPointerException + ThrowRuntimeException + ThrowThrowable + ToStringReturnsNull + TrailingWhitespace + UnnecessaryBigDecimalInstantiation + UnnecessaryBigIntegerInstantiation + UnnecessaryBooleanExpression + UnnecessaryBooleanInstantiation + UnnecessaryCallForLastElement + UnnecessaryCallToSubstring + UnnecessaryCast + UnnecessaryCatchBlock + UnnecessaryCollectCall + UnnecessaryCollectionCall + UnnecessaryConstructor + UnnecessaryDefInFieldDeclaration + UnnecessaryDefInMethodDeclaration + UnnecessaryDefInVariableDeclaration + UnnecessaryDotClass + UnnecessaryDoubleInstantiation + UnnecessaryElseStatement + UnnecessaryFail + UnnecessaryFinalOnPrivateMethod + UnnecessaryFloatInstantiation + UnnecessaryGString + UnnecessaryGetter + UnnecessaryGroovyImport + UnnecessaryIfStatement + UnnecessaryInstanceOfCheck + UnnecessaryInstantiationToGetClass + UnnecessaryIntegerInstantiation + UnnecessaryLongInstantiation + UnnecessaryModOne + UnnecessaryNullCheck + UnnecessaryNullCheckBeforeInstanceOf + UnnecessaryObjectReferences + UnnecessaryOverridingMethod + UnnecessaryPackageReference + UnnecessaryParenthesesForMethodCallWithClosure + UnnecessaryPublicModifier + /* + I quite like explicit return estatements + UnnecessaryReturnKeyword + */ + UnnecessarySafeNavigationOperator + UnnecessarySelfAssignment + UnnecessarySemicolon + UnnecessaryStringInstantiation + UnnecessarySubstring + UnnecessaryTernaryExpression + UnnecessaryToString + UnnecessaryTransientModifier + UnsafeArrayDeclaration + UnsafeImplementationAsMap + UnusedArray + UnusedImport + /* + Some interfaces require no use for the method parameter + UnusedMethodParameter + */ + UnusedObject + UnusedPrivateField + UnusedPrivateMethod + UnusedPrivateMethodParameter + UnusedVariable + UseAssertEqualsInsteadOfAssertTrue + UseAssertFalseInsteadOfNegation + UseAssertNullInsteadOfAssertEquals + UseAssertSameInsteadOfAssertTrue + UseAssertTrueInsteadOfAssertEquals + UseAssertTrueInsteadOfNegation + UseCollectMany + UseCollectNested + UseOfNotifyMethod + VariableName + VectorIsObsolete + VolatileArrayField + VolatileLongOrDoubleField + WaitOutsideOfWhileLoop + WhileStatementBraces +} + +doNotApplyToClassNames = 'Main' + +// vim: ft=groovy diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy index c22434e..93053d7 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaBroker.groovy @@ -4,9 +4,9 @@ package com.github.lookout.verspaetung * POJO containing the necessary information to model a Kafka broker */ class KafkaBroker { - private String host - private Integer port - private Integer brokerId + final private String host + final private Integer port + final private Integer brokerId KafkaBroker(Object jsonObject, Integer brokerId) { this.host = jsonObject.host diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy index a7db815..b2f51bf 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaConsumer.groovy @@ -39,13 +39,9 @@ class KafkaConsumer { return false } - if ((this.topic == compared.topic) && + return (this.topic == compared.topic) && (this.partition == compared.partition) && - (this.name == compared.name)) { - return true - } - - return false + (this.name == compared.name) } } diff --git a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy index 45acaaa..27b3f90 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/KafkaPoller.groovy @@ -1,8 +1,5 @@ package com.github.lookout.verspaetung -import groovy.transform.TypeChecked - -import java.util.concurrent.ConcurrentHashMap import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -11,24 +8,28 @@ import kafka.client.ClientUtils import kafka.consumer.SimpleConsumer import kafka.common.TopicAndPartition import kafka.common.KafkaException -import kafka.javaapi.* /* UGH */ import scala.collection.JavaConversions +/** + * KafkaPoller is a relatively simple class which contains a runloop for periodically + * contacting the Kafka brokers defined in Zookeeper and fetching the latest topic + * meta-data for them + */ class KafkaPoller extends Thread { - private final Integer POLLER_DELAY = (1 * 1000) - private final String KAFKA_CLIENT_ID = 'VerspaetungClient' - private final Integer KAFKA_TIMEOUT = (5 * 1000) - private final Integer KAFKA_BUFFER = (100 * 1024) - private final Logger logger = LoggerFactory.getLogger(KafkaPoller.class) + private static final Integer POLLER_DELAY = (1 * 1000) + private static final String KAFKA_CLIENT_ID = 'VerspaetungClient' + private static final Integer KAFKA_TIMEOUT = (5 * 1000) + private static final Integer KAFKA_BUFFER = (100 * 1024) + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPoller) private Boolean keepRunning = true private Boolean shouldReconnect = false - private ConcurrentHashMap brokerConsumerMap - private AbstractMap topicOffsetMap + private final AbstractMap brokerConsumerMap + private final AbstractMap topicOffsetMap + private final List onDelta + private final AbstractSet currentTopics private List brokers - private List onDelta - private AbstractSet currentTopics KafkaPoller(AbstractMap map, AbstractSet topicSet) { this.topicOffsetMap = map @@ -38,11 +39,18 @@ class KafkaPoller extends Thread { this.onDelta = [] } + /* There are a number of cases where we intentionally swallow stacktraces + * to ensure that we're not unnecessarily spamming logs with useless stacktraces + * + * For CatchException, we're explicitly gobbling up all potentially crashing exceptions + * here + */ + @SuppressWarnings(['LoggingSwallowsStacktrace', 'CatchException']) void run() { - logger.info("Starting wait loop") + LOGGER.info('Starting wait loop') while (keepRunning) { - logger.debug("poll loop") + LOGGER.debug('poll loop') if (shouldReconnect) { reconnect() @@ -56,10 +64,10 @@ class KafkaPoller extends Thread { dumpMetadata() } catch (KafkaException kex) { - logger.error("Failed to interact with Kafka: ${kex.message}") + LOGGER.error('Failed to interact with Kafka: {}', kex.message) } catch (Exception ex) { - logger.error("Failed to fetch and dump Kafka metadata", ex) + LOGGER.error('Failed to fetch and dump Kafka metadata', ex) } } @@ -67,21 +75,23 @@ class KafkaPoller extends Thread { } } + @SuppressWarnings(['CatchException']) void dumpMetadata() { - logger.debug("dumping meta-data") + LOGGER.debug('dumping meta-data') - def metadata = fetchMetadataForCurrentTopics() + Object metadata = fetchMetadataForCurrentTopics() withTopicsAndPartitions(metadata) { tp, p -> try { captureLatestOffsetFor(tp, p) } catch (Exception ex) { - logger.error("Failed to fetch latest for ${tp.topic}:${tp.partition}", ex) + LOGGER.error('Failed to fetch latest for {}:{}', tp.topic, tp.partition, ex) + } } - logger.debug("finished dumping meta-data") + LOGGER.debug('finished dumping meta-data') } /** @@ -100,7 +110,6 @@ class KafkaPoller extends Thread { } } - /** * Fetch the leader metadata and update our data structures */ @@ -120,7 +129,7 @@ class KafkaPoller extends Thread { * we might not have gotten valid data back from Zookeeper */ if (!(consumer instanceof SimpleConsumer)) { - logger.warn("Attempted to the leaderId: ${leaderId} (${topic}/${partition}") + LOGGER.warn('Attempted to the leaderId: {} ({}/{})', leaderId, topic, partition) return 0 } TopicAndPartition topicAndPart = new TopicAndPartition(topic, partition) @@ -137,7 +146,7 @@ class KafkaPoller extends Thread { */ void reconnect() { disconnectConsumers() - logger.info("Creating SimpleConsumer connections for brokers") + LOGGER.info('Creating SimpleConsumer connections for brokers') this.brokers.each { Broker b -> SimpleConsumer consumer = new SimpleConsumer(b.host, b.port, @@ -160,7 +169,7 @@ class KafkaPoller extends Thread { private void disconnectConsumers() { this.brokerConsumerMap.each { Integer brokerId, SimpleConsumer client -> - logger.info("Disconnecting ${client}") + LOGGER.info('Disconnecting {}', client) client?.disconnect() } } @@ -190,7 +199,6 @@ class KafkaPoller extends Thread { return JavaConversions.asScalaSet(set) } - private Object fetchMetadataForCurrentTopics() { return ClientUtils.fetchTopicMetadata( toScalaSet(currentTopics), @@ -199,6 +207,4 @@ class KafkaPoller extends Thread { KAFKA_TIMEOUT, 0) } - - } diff --git a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy index 586fb91..c8b031c 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/Main.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/Main.groovy @@ -4,18 +4,16 @@ import com.github.lookout.verspaetung.zk.BrokerTreeWatcher import com.github.lookout.verspaetung.zk.KafkaSpoutTreeWatcher import com.github.lookout.verspaetung.zk.StandardTreeWatcher import com.github.lookout.verspaetung.metrics.ConsumerGauge +import com.github.lookout.verspaetung.metrics.HeartbeatGauge -import java.util.AbstractMap import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentSkipListSet import java.util.concurrent.TimeUnit -import groovy.transform.TypeChecked import org.apache.commons.cli.* import org.apache.curator.retry.ExponentialBackoffRetry import org.apache.curator.framework.CuratorFrameworkFactory import org.apache.curator.framework.CuratorFramework -import org.apache.curator.framework.recipes.cache.TreeCache import org.coursera.metrics.datadog.DatadogReporter import org.coursera.metrics.datadog.transport.UdpTransport import org.slf4j.Logger @@ -23,13 +21,16 @@ import org.slf4j.LoggerFactory import com.codahale.metrics.* - +/** + * Main entry point for running the verspaetung application + */ +@SuppressWarnings class Main { private static final String METRICS_PREFIX = 'verspaetung' - private static Logger logger - private static ScheduledReporter reporter + private static final Logger logger = LoggerFactory.getLogger(Main) private static final MetricRegistry registry = new MetricRegistry() + private static ScheduledReporter reporter static void main(String[] args) { String statsdPrefix = METRICS_PREFIX @@ -56,7 +57,6 @@ class Main { delayInSeconds = cli.getOptionValue('d').toInteger() } - logger = LoggerFactory.getLogger(Main.class) logger.info("Running with: ${args}") logger.warn("Using: zookeepers={} statsd={}:{}", zookeeperHosts, statsdHost, statsdPort) logger.info("Reporting every {} seconds", delayInSeconds) @@ -65,9 +65,8 @@ class Main { statsdPrefix = "${cli.getOptionValue('prefix')}.${METRICS_PREFIX}" } - registry.register(MetricRegistry.name(Main.class, 'heartbeat'), - new metrics.HeartbeatGauge()) + new HeartbeatGauge()) ExponentialBackoffRetry retry = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperHosts, retry) diff --git a/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy b/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy index 83312d1..3d72359 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/TopicPartition.groovy @@ -4,8 +4,8 @@ package com.github.lookout.verspaetung * Simple container for Kafka topic names and partition IDs */ class TopicPartition { - private String topic - private Integer partition + private final String topic + private final Integer partition TopicPartition(String topic, Integer partition) { this.topic = topic @@ -27,12 +27,8 @@ class TopicPartition { return false } - if ((this.topic == compared.topic) && - (this.partition == compared.partition)) { - return true - } - - return false + return (this.topic == compared.topic) && + (this.partition == compared.partition) } @Override diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy index 2abf1af..c39dd10 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/ConsumerGauge.groovy @@ -1,6 +1,5 @@ package com.github.lookout.verspaetung.metrics -import java.util.AbstractMap import com.codahale.metrics.Gauge import groovy.transform.TypeChecked import org.coursera.metrics.datadog.Tagged @@ -8,9 +7,6 @@ import org.coursera.metrics.datadog.Tagged import com.github.lookout.verspaetung.KafkaConsumer import com.github.lookout.verspaetung.TopicPartition -import org.slf4j.Logger -import org.slf4j.LoggerFactory - /** * Dropwizard Metrics Gauge for reporting the value of a given KafkaConsumer */ @@ -19,9 +15,7 @@ class ConsumerGauge implements Gauge, Tagged { protected KafkaConsumer consumer protected AbstractMap consumers protected AbstractMap topics - private TopicPartition topicPartition - - private Logger logger = LoggerFactory.getLogger(ConsumerGauge.class) + private final TopicPartition topicPartition ConsumerGauge(KafkaConsumer consumer, AbstractMap consumers, @@ -55,7 +49,7 @@ class ConsumerGauge implements Gauge, Tagged { return ["partition:${this.consumer.partition}", "topic:${this.consumer.topic}", "consumer-group:${this.consumer.name}" - ].collect { s -> s.toString() } + ]*.toString() } /** diff --git a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy index 603864a..03679f7 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/metrics/HeartbeatGauge.groovy @@ -1,6 +1,6 @@ package com.github.lookout.verspaetung.metrics -import com.codahale.metrics.* +import com.codahale.metrics.Gauge /** * A simple gauge that will always just return 1 indicating that the process is diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy index 2a3e91a..cb9b102 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractConsumerTreeWatcher.groovy @@ -1,22 +1,22 @@ package com.github.lookout.verspaetung.zk import com.github.lookout.verspaetung.KafkaConsumer -import com.github.lookout.verspaetung.TopicPartition -import java.util.concurrent.CopyOnWriteArrayList import groovy.transform.TypeChecked import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.TreeCacheEvent +/** + */ @TypeChecked abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher { protected AbstractMap consumerOffsets protected AbstractSet watchedTopics protected List onConsumerData = [] - AbstractConsumerTreeWatcher(CuratorFramework client, + protected AbstractConsumerTreeWatcher(CuratorFramework client, AbstractSet topics, AbstractMap offsets) { super(client) @@ -86,12 +86,8 @@ abstract class AbstractConsumerTreeWatcher extends AbstractTreeWatcher { * we're interested in */ Boolean isNodeEvent(TreeCacheEvent event) { - if ((event?.type == TreeCacheEvent.Type.NODE_ADDED) || - (event?.type == TreeCacheEvent.Type.NODE_UPDATED)) { - return true - } - return false + return (event?.type == TreeCacheEvent.Type.NODE_ADDED) || + (event?.type == TreeCacheEvent.Type.NODE_UPDATED) } } - diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy index d9e3445..81dcab5 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/AbstractTreeWatcher.groovy @@ -16,19 +16,21 @@ import org.slf4j.LoggerFactory * further down the pipeline */ @TypeChecked +@SuppressWarnings(['ThisReferenceEscapesConstructor']) abstract class AbstractTreeWatcher implements TreeCacheListener { protected List onInitComplete protected Logger logger protected CuratorFramework client protected TreeCache cache - AbstractTreeWatcher(CuratorFramework client) { - this.logger = LoggerFactory.getLogger(this.class) - this.client = client - this.onInitComplete = [] + protected AbstractTreeWatcher(CuratorFramework curatorClient) { + logger = LoggerFactory.getLogger(this.class) + client = curatorClient + onInitComplete = [] - this.cache = new TreeCache(client, zookeeperPath()) - this.cache.listenable.addListener(this) + cache = new TreeCache(client, zookeeperPath()) + /* this may introduce a race condition, need to figure out a better way to handle it */ + cache.listenable.addListener(this) } /** diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy index b0ccb28..310551a 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/BrokerTreeWatcher.groovy @@ -4,15 +4,10 @@ import com.github.lookout.verspaetung.KafkaBroker import groovy.json.JsonSlurper import groovy.transform.TypeChecked -import java.util.Collections import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.recipes.cache.ChildData -import org.apache.curator.framework.recipes.cache.TreeCache -import org.apache.curator.framework.recipes.cache.TreeCacheListener import org.apache.curator.framework.recipes.cache.TreeCacheEvent -import org.slf4j.Logger -import org.slf4j.LoggerFactory /** * The BrokerTreeWatcher is a kind of watcher whose sole purpose is @@ -24,11 +19,10 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { static final Integer INVALID_BROKER_ID = -1 private static final String BROKERS_PATH = '/brokers/ids' - private final Logger logger = LoggerFactory.getLogger(BrokerTreeWatcher.class) - private JsonSlurper json - private List onBrokerUpdates private Boolean isTreeInitialized = false - private List brokers + private final JsonSlurper json + private final List onBrokerUpdates + private final List brokers BrokerTreeWatcher(CuratorFramework client) { super(client) @@ -38,7 +32,6 @@ class BrokerTreeWatcher extends AbstractTreeWatcher { this.onBrokerUpdates = [] } - String zookeeperPath() { return BROKERS_PATH } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy index c956bd6..8ce5aba 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/ConsumerOffset.groovy @@ -1,16 +1,13 @@ package com.github.lookout.verspaetung.zk -import org.apache.curator.framework.recipes.cache.ChildData - /** * POJO representing data from Zookeeper for a consumer, topic and offset */ class ConsumerOffset { - private String topic - private String groupName - private Integer offset - private Integer partition - private ChildData rawData + String topic + String groupName + Integer offset + Integer partition ConsumerOffset() { } @@ -22,7 +19,7 @@ class ConsumerOffset { } String toString() { - return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}" + return "ConsumerOffset[${hashCode()}] ${topic}:${partition} ${groupName} is at ${offset}".toString() } } diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy index eba5030..2b01a21 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/KafkaSpoutTreeWatcher.groovy @@ -15,7 +15,7 @@ import org.apache.curator.framework.recipes.cache.ChildData @InheritConstructors class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { private static final String ZK_PATH = '/kafka_spout' - private JsonSlurper json + private final JsonSlurper json KafkaSpoutTreeWatcher(CuratorFramework client, AbstractSet topics, @@ -25,12 +25,15 @@ class KafkaSpoutTreeWatcher extends AbstractConsumerTreeWatcher { this.json = new JsonSlurper() } - String zookeeperPath() { return ZK_PATH } + String zookeeperPath() { + return ZK_PATH + } /* skipping type checking since Groovy's JsonSlurper gives us a pretty * loose Object to deal with */ @TypeChecked(TypeCheckingMode.SKIP) + @SuppressWarnings(['LineLength']) ConsumerOffset processChildData(ChildData nodeData) { Object offsetData = json.parseText(new String(nodeData.data, 'UTF-8')) /* diff --git a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy index dd5e209..cf6c4a4 100644 --- a/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy +++ b/src/main/groovy/com/github/lookout/verspaetung/zk/StandardTreeWatcher.groovy @@ -13,12 +13,15 @@ import org.apache.curator.framework.recipes.cache.ChildData class StandardTreeWatcher extends AbstractConsumerTreeWatcher { private static final String ZK_PATH = '/consumers' - String zookeeperPath() { return ZK_PATH } + String zookeeperPath() { + return ZK_PATH + } /** * Extract the necessary information from a standard (i.e. high-level Kafka * consumer) tree of offsets */ + @SuppressWarnings(['LineLength']) ConsumerOffset processChildData(ChildData data) { /* ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473,8595174478,1416808804928,1416808805262,1,0,0,0,1,0,8595174473, data=[48]} @@ -40,15 +43,13 @@ ChildData{path='/consumers/offtopic-spock-test/offsets/topic/7', stat=8595174473 } catch (NumberFormatException ex) { logger.error("Failed to parse an Integer: ${data}") - return null + offset = null } return offset } - Boolean isOffsetSubtree(String path) { return (path =~ /\/consumers\/(.*)\/offsets\/(.*)/) } - }