From 10ada1646f0cc79d775be2ffb6b06a1bb0989872 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 4 Sep 2015 12:36:30 -0700 Subject: [PATCH] Properly handle unsubscriptions and close our PathChildrenCache object out --- .../reiseburo/rx/curator/PathChildren.java | 19 +++++++++++++++++-- .../rx/curator/PathChildrenSpec.groovy | 4 +++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java b/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java index 5c305f5..222eb78 100644 --- a/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java +++ b/src/main/java/com/github/reiseburo/rx/curator/PathChildren.java @@ -6,6 +6,9 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import rx.Observable; import rx.Subscriber; +import rx.functions.Action0; + +import java.io.IOException; /** * PathChildren is an {@code Observable} which takes events from a {@code PathChildrenCache} @@ -49,8 +52,7 @@ public class PathChildren { cache = new PathChildrenCache(curatorFramework, znodePath, true); try { cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); - } - catch (Exception ex) { + } catch (Exception ex) { subscriber.onError(ex); } @@ -61,6 +63,19 @@ public class PathChildren { } }); } + }).doOnUnsubscribe(new Action0() { + @Override + public void call() { + /* properly close out our cache object when our subscriber leaves */ + if (cache instanceof PathChildrenCache) { + try { + cache.close(); + } catch (IOException exception) { + /* Swallow, we don't need this exception */ + } + } + + } }); } } diff --git a/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy b/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy index 973efa1..f1c0ef7 100644 --- a/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy +++ b/src/test/groovy/com/github/reiseburo/rx/curator/PathChildrenSpec.groovy @@ -58,12 +58,14 @@ class PathChildrenIntegrationSpec extends Specification { def "an initialization event should be received"() { given: boolean received = false + Subscription subscription Observable observable = PathChildren.with(curator).watch('/') when: - observable.subscribe { PathChildrenCacheEvent ev -> + subscription = observable.subscribe { PathChildrenCacheEvent ev -> if (ev.type == PathChildrenCacheEvent.Type.INITIALIZED) { received = true + subscription.unsubscribe() } }