Add a simple PathChildrenCache experiment trying to tie Curator into RxJava

Running this against a running Kafka results in:

    9:14:42 AM: Executing external task 'runExperiment -t -PexperimentClass=rx.curator.PathCacheObservable'...
    :compileJava UP-TO-DATE
    :compileGroovy UP-TO-DATE
    :processResources UP-TO-DATE
    :classes UP-TO-DATE
    :compileExperimentsJava UP-TO-DATE
    :compileExperimentsGroovy
    :processExperimentsResources UP-TO-DATE
    :experimentsClasses
    :compileTestJava UP-TO-DATE
    :compileTestGroovy UP-TO-DATE
    :processTestResources UP-TO-DATE
    :testClasses UP-TO-DATE
    :test UP-TO-DATE
    :runExperiment
    Starting PatchCacheObservable experiment
    log4j:WARN No appenders could be found for logger (org.apache.curator.framework.imps.CuratorFrameworkImpl).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Starting dataset: []
    heard PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/brokers/topics', stat=4294967331,4294967331,1412043135590,1412043135590,0,27,0,0,0,27,824641831943
    , data=null}} from org.apache.curator.framework.imps.CuratorFrameworkImpl@5eb30746
    received: PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/brokers/topics', stat=4294967331,4294967331,1412043135590,1412043135590,0,27,0,0,0,27,824641831943
    , data=null}}
    heard PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/brokers/ids', stat=4294967330,4294967330,1412043135586,1412043135586,0,135,0,0,0,3,824659713847
    , data=null}} from org.apache.curator.framework.imps.CuratorFrameworkImpl@5eb30746
    received: PathChildrenCacheEvent{type=CHILD_ADDED, data=ChildData{path='/brokers/ids', stat=4294967330,4294967330,1412043135586,1412043135586,0,135,0,0,0,3,824659713847
    , data=null}}
    heard PathChildrenCacheEvent{type=INITIALIZED, data=null} from org.apache.curator.framework.imps.CuratorFrameworkImpl@5eb30746
    complete
    Final data: [ChildData{path='/brokers/ids', stat=4294967330,4294967330,1412043135586,1412043135586,0,135,0,0,0,3,824659713847
    , data=null}, ChildData{path='/brokers/topics', stat=4294967331,4294967331,1412043135590,1412043135590,0,27,0,0,0,27,824641831943
    , data=null}]

    BUILD SUCCESSFUL

    Total time: 9.821 secs

This doesn't really result in much interesting other than the events being sent
along to the subscriber properly, which I suppose is neat
This commit is contained in:
R. Tyler Croy 2015-09-04 09:15:42 -07:00
parent 65a6a19b6d
commit 4eee010938
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
1 changed files with 63 additions and 0 deletions

View File

@ -0,0 +1,63 @@
package rx.curator
import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.PathChildrenCache
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
import org.apache.curator.retry.RetryNTimes
import rx.Observable
import rx.Observer
import rx.Subscriber
import rx.observables.BlockingObservable
class PathCacheObservable {
static void main(String[] arguments) {
println "Starting PatchCacheObservable experiment"
RetryPolicy retryPolicy = new RetryNTimes(3, 1000)
CuratorFramework curator = CuratorFrameworkFactory.newClient('localhost:2181', retryPolicy)
curator.start()
PathChildrenCache cache = new PathChildrenCache(curator, '/brokers', true)
println "Starting dataset: ${cache.currentData}"
Observable observable = Observable.create { Subscriber subscriber ->
cache.listenable.addListener({ CuratorFramework cf, PathChildrenCacheEvent ev ->
println "heard ${ev} from ${cf}"
/* If we've got our terminate event, properly close the cache and
* tell our subscriber, if it exists, to complete
*/
if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) {
cache.close()
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted()
}
}
else {
/* emit each event that isn't our terminator */
subscriber.onNext(ev)
}
} as PathChildrenCacheListener)
/* Starting with a post-initialized event so we know when to terminate */
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)
}
boolean waitFor = true
observable.subscribe([
onError: { Throwable e -> println "error: ${e}"; e.printStackTrace() },
onCompleted: { waitFor = false; println "complete"; println "Final data: ${cache.currentData}" },
onNext: { println "received: ${it}" }
] as Observer<PathChildrenCacheEvent>)
/* since this is evented, we need something to sit around and wait for us */
synchronized (this) {
while (waitFor) { wait(500) }
}
}
}