Implement an end-to-end test with the observer

The way the PathChildrenCache works by default is by calling the callback on a
separate thread, so we need to wait for the results to get to our subscriber
and periodically check things.

Considering a means of adding the ability to execute on "my" thread
This commit is contained in:
R. Tyler Croy 2015-09-04 11:48:35 -07:00
parent 1bf5311770
commit a1c7147c5e
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
2 changed files with 74 additions and 2 deletions

View File

@ -1,8 +1,11 @@
package com.github.reiseburo.rx.curator;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import rx.Observable;
import rx.Subscriber;
/**
* PathChildren is an {@code Observable} which takes events from a {@code PathChildrenCache}
@ -10,6 +13,10 @@ import rx.Observable;
*/
public class PathChildren {
protected CuratorFramework curatorFramework;
protected PathChildrenCache cache;
protected PathChildren() {
}
public CuratorFramework getCuratorFramework() {
return curatorFramework;
@ -35,7 +42,25 @@ public class PathChildren {
}
public Observable<PathChildrenCacheEvent> watch(String znodePath) {
return Observable.never();
public Observable<PathChildrenCacheEvent> watch(final String znodePath) {
return Observable.create(new Observable.OnSubscribe<PathChildrenCacheEvent>() {
@Override
public void call(final Subscriber<? super PathChildrenCacheEvent> subscriber) {
cache = new PathChildrenCache(curatorFramework, znodePath, true);
try {
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
catch (Exception ex) {
subscriber.onError(ex);
}
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
subscriber.onNext(event);
}
});
}
});
}
}

View File

@ -1,9 +1,16 @@
package com.github.reiseburo.rx.curator
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
import org.apache.curator.retry.RetryOneTime
import org.apache.curator.test.TestingServer
import rx.Observable
import rx.Subscription
import rx.observers.TestSubscriber
import rx.schedulers.Schedulers
import spock.lang.*
import spock.util.concurrent.PollingConditions
/**
*/
@ -24,3 +31,43 @@ class PathChildrenSpec extends Specification {
PathChildren.with(curatorFramework).watch('/brokers') instanceof Observable<PathChildrenCacheEvent>
}
}
/**
* This test will spin up an in-process Zookeeper server for testing events that
* get fired off through the PathChildren observer
*/
class PathChildrenIntegrationSpec extends Specification {
CuratorFramework curator
TestingServer server
TestSubscriber subscriber
PollingConditions pollingConditions
def setup() {
server = new TestingServer(true)
curator = CuratorFrameworkFactory.newClient(server.connectString, new RetryOneTime(1000))
curator.start()
subscriber = new TestSubscriber()
pollingConditions = new PollingConditions(timeout: 3)
}
def cleanup() {
curator?.close()
server?.close()
}
def "an initialization event should be received"() {
given:
boolean received = false
Observable<PathChildrenCacheEvent> observable = PathChildren.with(curator).watch('/')
when:
observable.subscribe { PathChildrenCacheEvent ev ->
if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) {
received = true
}
}
then:
pollingConditions.eventually { assert received }
}
}