mirror of https://github.com/reiseburo/beetle
Convert PathCacheObservable experiment to use code refactored into rx-curator
This commit is contained in:
parent
f46fb4f0f4
commit
f1a539541d
|
@ -1,17 +1,13 @@
|
|||
package rx.curator
|
||||
|
||||
import com.github.reiseburo.rx.curator.PathChildren
|
||||
import org.apache.curator.RetryPolicy
|
||||
import org.apache.curator.framework.CuratorFramework
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCache
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
|
||||
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener
|
||||
import org.apache.curator.retry.RetryNTimes
|
||||
|
||||
import rx.Observable
|
||||
import rx.Observer
|
||||
import rx.Subscriber
|
||||
import rx.observables.BlockingObservable
|
||||
|
||||
class PathCacheObservable {
|
||||
static void main(String[] arguments) {
|
||||
|
@ -20,39 +16,19 @@ class PathCacheObservable {
|
|||
RetryPolicy retryPolicy = new RetryNTimes(3, 1000)
|
||||
CuratorFramework curator = CuratorFrameworkFactory.newClient('localhost:2181', retryPolicy)
|
||||
curator.start()
|
||||
PathChildrenCache cache = new PathChildrenCache(curator, '/brokers', true)
|
||||
println "Starting dataset: ${cache.currentData}"
|
||||
|
||||
Observable observable = Observable.create { Subscriber subscriber ->
|
||||
cache.listenable.addListener({ CuratorFramework cf, PathChildrenCacheEvent ev ->
|
||||
println "heard ${ev} from ${cf}"
|
||||
|
||||
/* If we've got our terminate event, properly close the cache and
|
||||
* tell our subscriber, if it exists, to complete
|
||||
*/
|
||||
if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) {
|
||||
cache.close()
|
||||
if (!subscriber.isUnsubscribed()) {
|
||||
subscriber.onCompleted()
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* emit each event that isn't our terminator */
|
||||
subscriber.onNext(ev)
|
||||
}
|
||||
} as PathChildrenCacheListener)
|
||||
|
||||
/* Starting with a post-initialized event so we know when to terminate */
|
||||
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT)
|
||||
}
|
||||
|
||||
boolean waitFor = true
|
||||
|
||||
observable.subscribe([
|
||||
onError: { Throwable e -> println "error: ${e}"; e.printStackTrace() },
|
||||
onCompleted: { waitFor = false; println "complete"; println "Final data: ${cache.currentData}" },
|
||||
onNext: { println "received: ${it}" }
|
||||
] as Observer<PathChildrenCacheEvent>)
|
||||
PathChildren.with(curator).watch('/brokers')
|
||||
.flatMap({ PathChildrenCacheEvent ev ->
|
||||
if (ev.type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
|
||||
return Observable.from(ev.data)
|
||||
}
|
||||
})
|
||||
.subscribe({
|
||||
println "Rec: ${it}"
|
||||
waitFor = false
|
||||
})
|
||||
|
||||
/* since this is evented, we need something to sit around and wait for us */
|
||||
synchronized (this) {
|
||||
|
|
Loading…
Reference in New Issue