declare private methods as such

helps to understand what methods could be used by other threads
This commit is contained in:
Christian Meier 2015-11-02 19:56:42 +01:00
parent 45d98cb5f6
commit 439e8f635e
1 changed files with 7 additions and 7 deletions

View File

@ -81,14 +81,14 @@ class KafkaPoller extends Thread {
} }
} }
void slower(Delay delay) { private void slower(Delay delay) {
if (delay.slower()) { if (delay.slower()) {
LOGGER.error('using ' + delay) LOGGER.error('using ' + delay)
} }
} }
@SuppressWarnings(['CatchException']) @SuppressWarnings(['CatchException'])
void dumpMetadata() { private void dumpMetadata() {
LOGGER.debug('dumping meta-data') LOGGER.debug('dumping meta-data')
Object metadata = fetchMetadataForCurrentTopics() Object metadata = fetchMetadataForCurrentTopics()
@ -113,7 +113,7 @@ class KafkaPoller extends Thread {
* The 'metadata' is the expected return from * The 'metadata' is the expected return from
* kafka.client.ClientUtils.fetchTopicMetadata * kafka.client.ClientUtils.fetchTopicMetadata
*/ */
void withTopicsAndPartitions(Object metadata, Closure closure) { private void withTopicsAndPartitions(Object metadata, Closure closure) {
withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f -> withScalaCollection(metadata.topicsMetadata).each { kafka.api.TopicMetadata f ->
withScalaCollection(f.partitionsMetadata).each { p -> withScalaCollection(f.partitionsMetadata).each { p ->
TopicPartition tp = new TopicPartition(f.topic, p.partitionId) TopicPartition tp = new TopicPartition(f.topic, p.partitionId)
@ -125,7 +125,7 @@ class KafkaPoller extends Thread {
/** /**
* Fetch the leader metadata and update our data structures * Fetch the leader metadata and update our data structures
*/ */
void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) { private void captureLatestOffsetFor(TopicPartition tp, Object partitionMetadata) {
Integer leaderId = partitionMetadata.leader.get()?.id Integer leaderId = partitionMetadata.leader.get()?.id
Integer partitionId = partitionMetadata.partitionId Integer partitionId = partitionMetadata.partitionId
@ -134,7 +134,7 @@ class KafkaPoller extends Thread {
this.topicOffsetMap[tp] = offset this.topicOffsetMap[tp] = offset
} }
Long latestFromLeader(Integer leaderId, String topic, Integer partition) { private Long latestFromLeader(Integer leaderId, String topic, Integer partition) {
SimpleConsumer consumer = this.brokerConsumerMap[leaderId] SimpleConsumer consumer = this.brokerConsumerMap[leaderId]
/* If we don't have a proper SimpleConsumer instance (e.g. null) then /* If we don't have a proper SimpleConsumer instance (e.g. null) then
@ -149,14 +149,14 @@ class KafkaPoller extends Thread {
return consumer.earliestOrLatestOffset(topicAndPart, -1, 0) return consumer.earliestOrLatestOffset(topicAndPart, -1, 0)
} }
Iterable withScalaCollection(scala.collection.Iterable iter) { private Iterable withScalaCollection(scala.collection.Iterable iter) {
return JavaConversions.asJavaIterable(iter) return JavaConversions.asJavaIterable(iter)
} }
/** /**
* Blocking reconnect to the Kafka brokers * Blocking reconnect to the Kafka brokers
*/ */
void reconnect() { private void reconnect() {
disconnectConsumers() disconnectConsumers()
LOGGER.info('Creating SimpleConsumer connections for brokers') LOGGER.info('Creating SimpleConsumer connections for brokers')
synchronized(this.brokers) { synchronized(this.brokers) {