Prefer callback lists instead of a single callback per object
This doesn't really matter but I prefer this approach from a style vantage point
This commit is contained in:
parent
8b792d72fd
commit
293ebb6fa9
|
@ -26,12 +26,12 @@ class Main {
|
|||
|
||||
KafkaPoller poller = new KafkaPoller()
|
||||
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers)
|
||||
consumerWatcher.onInitComplete = {
|
||||
consumerWatcher.onInitComplete << {
|
||||
println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples"
|
||||
}
|
||||
|
||||
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
|
||||
brokerWatcher.onBrokerUpdates = { brokers ->
|
||||
brokerWatcher.onBrokerUpdates << { brokers ->
|
||||
poller.refresh(brokers)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,8 @@ package com.github.lookout.verspaetung.zk
|
|||
|
||||
import com.github.lookout.verspaetung.TopicPartition
|
||||
|
||||
import groovy.transform.TypeChecked
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.recipes.cache.ChildData
|
||||
import org.apache.curator.framework.recipes.cache.TreeCacheListener
|
||||
|
@ -13,12 +15,14 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
|||
* watchers is to process events from the TreeCache and emit processed events
|
||||
* further down the pipeline
|
||||
*/
|
||||
@TypeChecked
|
||||
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||
protected AbstractMap consumersMap
|
||||
protected Closure onInitComplete
|
||||
protected AbstractMap<TopicPartition, List<ConsumerOffset>> consumersMap
|
||||
protected List<Closure> onInitComplete
|
||||
|
||||
AbstractTreeWatcher(AbstractMap consumers) {
|
||||
this.consumersMap = consumers
|
||||
this.onInitComplete = []
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -31,7 +35,9 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
|
|||
*/
|
||||
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
||||
if (event?.type == TreeCacheEvent.Type.INITIALIZED) {
|
||||
this.onInitComplete?.call()
|
||||
this.onInitComplete.each { Closure c ->
|
||||
c?.call()
|
||||
}
|
||||
}
|
||||
|
||||
/* bail out early if we don't care about the event */
|
||||
|
|
|
@ -24,7 +24,7 @@ class BrokerTreeWatcher implements TreeCacheListener {
|
|||
private JsonSlurper json
|
||||
private TreeCache cache
|
||||
private final String BROKERS_PATH = '/brokers/ids'
|
||||
private Closure onBrokerUpdates
|
||||
private List<Closure> onBrokerUpdates
|
||||
private Boolean isTreeInitialized = false
|
||||
private List<KafkaBroker> brokers
|
||||
|
||||
|
@ -33,6 +33,7 @@ class BrokerTreeWatcher implements TreeCacheListener {
|
|||
this.cache = new TreeCache(client, BROKERS_PATH)
|
||||
this.cache.listenable.addListener(this)
|
||||
this.brokers = []
|
||||
this.onBrokerUpdates = []
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -52,7 +53,10 @@ class BrokerTreeWatcher implements TreeCacheListener {
|
|||
*/
|
||||
if (event.type == TreeCacheEvent.Type.INITIALIZED) {
|
||||
this.isTreeInitialized = true
|
||||
this.onBrokerUpdates?.call(Collections.synchronizedList(this.brokers))
|
||||
List threadsafeBrokers = Collections.synchronizedList(this.brokers)
|
||||
this.onBrokerUpdates.each { Closure c ->
|
||||
c?.call(threadsafeBrokers)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue