Properly use the passed in AbstractMap for the StandardTreeWatcher
This commit is contained in:
parent
eae5e7436e
commit
8b792d72fd
|
@ -2,6 +2,8 @@ package com.github.lookout.verspaetung
|
||||||
|
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
|
||||||
|
import kafka.client.ClientUtils
|
||||||
|
|
||||||
@TypeChecked
|
@TypeChecked
|
||||||
class KafkaPoller extends Thread {
|
class KafkaPoller extends Thread {
|
||||||
private Boolean keepRunning = true
|
private Boolean keepRunning = true
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package com.github.lookout.verspaetung
|
package com.github.lookout.verspaetung
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
|
import com.github.lookout.verspaetung.zk.BrokerTreeWatcher
|
||||||
|
import com.github.lookout.verspaetung.zk.StandardTreeWatcher
|
||||||
|
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
@ -8,11 +9,7 @@ import groovy.transform.TypeChecked
|
||||||
import org.apache.curator.retry.ExponentialBackoffRetry
|
import org.apache.curator.retry.ExponentialBackoffRetry
|
||||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.zookeeper.KeeperException
|
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCache
|
import org.apache.curator.framework.recipes.cache.TreeCache
|
||||||
import org.apache.curator.framework.recipes.cache.TreeCacheListener
|
|
||||||
|
|
||||||
import kafka.client.ClientUtils
|
|
||||||
|
|
||||||
//@TypeChecked
|
//@TypeChecked
|
||||||
class Main {
|
class Main {
|
||||||
|
@ -26,21 +23,26 @@ class Main {
|
||||||
client.start()
|
client.start()
|
||||||
|
|
||||||
TreeCache cache = new TreeCache(client, '/consumers')
|
TreeCache cache = new TreeCache(client, '/consumers')
|
||||||
cache.listenable.addListener(new zk.StandardTreeWatcher(consumers))
|
|
||||||
|
|
||||||
KafkaPoller poller = new KafkaPoller()
|
KafkaPoller poller = new KafkaPoller()
|
||||||
|
StandardTreeWatcher consumerWatcher = new StandardTreeWatcher(consumers)
|
||||||
|
consumerWatcher.onInitComplete = {
|
||||||
|
println "standard consumers initialized to ${consumers.size()} (topic, partition) tuples"
|
||||||
|
}
|
||||||
|
|
||||||
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
|
BrokerTreeWatcher brokerWatcher = new BrokerTreeWatcher(client)
|
||||||
brokerWatcher.onBrokerUpdates = { brokers ->
|
brokerWatcher.onBrokerUpdates = { brokers ->
|
||||||
poller.refresh(brokers)
|
poller.refresh(brokers)
|
||||||
}
|
}
|
||||||
|
|
||||||
poller.start()
|
cache.listenable.addListener(consumerWatcher)
|
||||||
|
|
||||||
|
poller.start()
|
||||||
brokerWatcher.start()
|
brokerWatcher.start()
|
||||||
cache.start()
|
cache.start()
|
||||||
println 'started..'
|
println 'started..'
|
||||||
|
|
||||||
Thread.sleep(5 * 1000)
|
Thread.sleep(9 * 1000)
|
||||||
|
|
||||||
println 'exiting..'
|
println 'exiting..'
|
||||||
poller.die()
|
poller.die()
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.lookout.verspaetung.zk
|
||||||
|
|
||||||
import com.github.lookout.verspaetung.TopicPartition
|
import com.github.lookout.verspaetung.TopicPartition
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
|
||||||
|
|
||||||
import org.apache.curator.framework.CuratorFramework
|
import org.apache.curator.framework.CuratorFramework
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
@ -15,10 +14,10 @@ import org.apache.curator.framework.recipes.cache.TreeCacheEvent
|
||||||
* further down the pipeline
|
* further down the pipeline
|
||||||
*/
|
*/
|
||||||
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||||
protected Map consumersMap = [:]
|
protected AbstractMap consumersMap
|
||||||
|
protected Closure onInitComplete
|
||||||
|
|
||||||
AbstractTreeWatcher() { }
|
AbstractTreeWatcher(AbstractMap consumers) {
|
||||||
AbstractTreeWatcher(Map consumers) {
|
|
||||||
this.consumersMap = consumers
|
this.consumersMap = consumers
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,6 +30,10 @@ abstract class AbstractTreeWatcher implements TreeCacheListener {
|
||||||
* Primary TreeCache event processing callback
|
* Primary TreeCache event processing callback
|
||||||
*/
|
*/
|
||||||
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
||||||
|
if (event?.type == TreeCacheEvent.Type.INITIALIZED) {
|
||||||
|
this.onInitComplete?.call()
|
||||||
|
}
|
||||||
|
|
||||||
/* bail out early if we don't care about the event */
|
/* bail out early if we don't care about the event */
|
||||||
if (!isNodeEvent(event)) {
|
if (!isNodeEvent(event)) {
|
||||||
return
|
return
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package com.github.lookout.verspaetung.zk
|
package com.github.lookout.verspaetung.zk
|
||||||
|
|
||||||
import groovy.transform.TypeChecked
|
import groovy.transform.TypeChecked
|
||||||
|
import groovy.transform.InheritConstructors
|
||||||
import org.apache.curator.framework.recipes.cache.ChildData
|
import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -8,6 +9,7 @@ import org.apache.curator.framework.recipes.cache.ChildData
|
||||||
* consumers
|
* consumers
|
||||||
*/
|
*/
|
||||||
@TypeChecked
|
@TypeChecked
|
||||||
|
@InheritConstructors
|
||||||
class StandardTreeWatcher extends AbstractTreeWatcher {
|
class StandardTreeWatcher extends AbstractTreeWatcher {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -10,6 +10,9 @@ class AbstractTreeWatcherSpec extends Specification {
|
||||||
private AbstractTreeWatcher watcher
|
private AbstractTreeWatcher watcher
|
||||||
|
|
||||||
class MockWatcher extends AbstractTreeWatcher {
|
class MockWatcher extends AbstractTreeWatcher {
|
||||||
|
MockWatcher() {
|
||||||
|
super([:])
|
||||||
|
}
|
||||||
ConsumerOffset processChildData(ChildData d) { }
|
ConsumerOffset processChildData(ChildData d) { }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,7 +41,6 @@ class AbstractTreeWatcherSpec extends Specification {
|
||||||
watcher.isNodeEvent(event) == true
|
watcher.isNodeEvent(event) == true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def "childEvent() not processChildData if the event is not to be processed"() {
|
def "childEvent() not processChildData if the event is not to be processed"() {
|
||||||
given:
|
given:
|
||||||
watcher = Spy(MockWatcher)
|
watcher = Spy(MockWatcher)
|
||||||
|
|
|
@ -8,7 +8,7 @@ class StandardTreeWatcherSpec extends Specification {
|
||||||
private StandardTreeWatcher watcher
|
private StandardTreeWatcher watcher
|
||||||
|
|
||||||
def setup() {
|
def setup() {
|
||||||
this.watcher = new StandardTreeWatcher()
|
this.watcher = new StandardTreeWatcher([:])
|
||||||
}
|
}
|
||||||
|
|
||||||
def "processChildData should return null if null is given"() {
|
def "processChildData should return null if null is given"() {
|
||||||
|
|
Loading…
Reference in New Issue