diff --git a/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java b/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java index b43a824..3ca8dba 100644 --- a/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java +++ b/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java @@ -44,7 +44,6 @@ public class PathChildren { return instance; } - public Observable watch(final String znodePath) { return Observable.create(new Observable.OnSubscribe() { @Override diff --git a/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy b/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy index f1c0ef7..4fb116d 100644 --- a/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy +++ b/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy @@ -41,35 +41,68 @@ class PathChildrenIntegrationSpec extends Specification { TestingServer server TestSubscriber subscriber PollingConditions pollingConditions + boolean eventReceived = false def setup() { server = new TestingServer(true) curator = CuratorFrameworkFactory.newClient(server.connectString, new RetryOneTime(1000)) curator.start() subscriber = new TestSubscriber() - pollingConditions = new PollingConditions(timeout: 3) + pollingConditions = new PollingConditions(timeout: 10) } def cleanup() { + eventReceived = false curator?.close() server?.close() } - def "an initialization event should be received"() { - given: - boolean received = false - Subscription subscription - Observable observable = PathChildren.with(curator).watch('/') + Observable createObservableFor(CuratorFramework curator, String path) { + return PathChildren.with(curator).watch(path) + } + def "an initialization event should be received"() { when: - subscription = observable.subscribe { PathChildrenCacheEvent ev -> + createObservableFor(curator, '/').subscribe { PathChildrenCacheEvent ev -> if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) { - received = true - subscription.unsubscribe() + eventReceived = true } } then: - pollingConditions.eventually { assert received } + pollingConditions.eventually { assert eventReceived } + } + + def "receive normal add events"() { + given: + createObservableFor(curator, '/').subscribe { PathChildrenCacheEvent ev -> + if (ev.type == PathChildrenCacheEvent.Type.CHILD_ADDED) { + eventReceived = true + } + } + + when: + curator.create().forPath('/spock') + + then: + pollingConditions.eventually { assert eventReceived } + } + + def "receive delete events"() { + given: + final String znode = '/spock' + curator.create().forPath(znode) + + createObservableFor(curator, '/').subscribe { PathChildrenCacheEvent ev -> + if (ev.type == PathChildrenCacheEvent.Type.CHILD_REMOVED) { + eventReceived = true + } + } + + when: + curator.delete().forPath(znode) + + then: + pollingConditions.eventually { assert eventReceived } } }