Properly handle unsubscriptions and close our PathChildrenCache object out

This commit is contained in:
R. Tyler Croy 2015-09-04 12:36:30 -07:00
parent a1c7147c5e
commit 10ada1646f
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
2 changed files with 20 additions and 3 deletions

View File

@ -6,6 +6,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import rx.Observable; import rx.Observable;
import rx.Subscriber; import rx.Subscriber;
import rx.functions.Action0;
import java.io.IOException;
/** /**
* PathChildren is an {@code Observable} which takes events from a {@code PathChildrenCache} * PathChildren is an {@code Observable} which takes events from a {@code PathChildrenCache}
@ -49,8 +52,7 @@ public class PathChildren {
cache = new PathChildrenCache(curatorFramework, znodePath, true); cache = new PathChildrenCache(curatorFramework, znodePath, true);
try { try {
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
} } catch (Exception ex) {
catch (Exception ex) {
subscriber.onError(ex); subscriber.onError(ex);
} }
@ -61,6 +63,19 @@ public class PathChildren {
} }
}); });
} }
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
/* properly close out our cache object when our subscriber leaves */
if (cache instanceof PathChildrenCache) {
try {
cache.close();
} catch (IOException exception) {
/* Swallow, we don't need this exception */
}
}
}
}); });
} }
} }

View File

@ -58,12 +58,14 @@ class PathChildrenIntegrationSpec extends Specification {
def "an initialization event should be received"() { def "an initialization event should be received"() {
given: given:
boolean received = false boolean received = false
Subscription subscription
Observable<PathChildrenCacheEvent> observable = PathChildren.with(curator).watch('/') Observable<PathChildrenCacheEvent> observable = PathChildren.with(curator).watch('/')
when: when:
observable.subscribe { PathChildrenCacheEvent ev -> subscription = observable.subscribe { PathChildrenCacheEvent ev ->
if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) { if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) {
received = true received = true
subscription.unsubscribe()
} }
} }