Checkpoint some experimenting with a BrokerMonitor

Still figuring out how to map the PathCache concept from Curator into RxJava
observables
This commit is contained in:
R. Tyler Croy 2015-09-04 08:01:12 -07:00
parent f4e96ebace
commit 7e2d1691ca
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
4 changed files with 129 additions and 0 deletions

View File

@ -0,0 +1,51 @@
package com.github.reiseburo.beetle.internal;
import org.apache.curator.framework.CuratorFramework;
import rx.Observable;
import rx.Subscriber;
/**
* BrokersMonitor exists largely create an Observable<BrokersMonitor> which
* will properly watch Zookeeper for changes in the /brokers subtree
*/
public class BrokersMonitor {
/**
* Internal constructor for created a BrokersMonitor object.
*
* Use BrokersMonitor.observe() to subscribe to instances instead
*
* @param curator
*/
protected BrokersMonitor(CuratorFramework curator) {
if (curator == null) {
throw new NullPointerException("`curator` cannot be null");
}
}
/**
* Create an {@code Observable<T>} for watching the Zookeeper connection
* associated with the given {@code CuratorFramework} object.
*
* Subscribers will receive onError if the BrokersMonitor could not be constructed
*
* Subscribers will never receive onCompleted
*
* @param curator
* @return
*/
public static Observable<ZKBrokerSet> observe(final CuratorFramework curator) {
return Observable.create(new Observable.OnSubscribe<ZKBrokerSet>() {
@Override
public void call(Subscriber<? super ZKBrokerSet> subscriber) {
try {
BrokersMonitor monitor = new BrokersMonitor(curator);
subscriber.onNext(null);
}
catch (NullPointerException npe) {
subscriber.onError(npe);
}
}
});
}
}

View File

@ -0,0 +1,22 @@
package com.github.reiseburo.beetle.internal;
import com.github.reiseburo.beetle.Broker;
import org.apache.curator.framework.CuratorFramework;
import java.util.Set;
/**
*/
public class ZKBrokerSet {
protected Set<Broker> brokers;
protected CuratorFramework activeCurator;
public Set<Broker> getBrokers() {
return brokers;
}
public CuratorFramework getCurator() {
return activeCurator;
}
}

View File

@ -0,0 +1,48 @@
package com.github.reiseburo.beetle.internal
import org.apache.curator.framework.CuratorFramework
import spock.lang.*
import rx.Observable
import rx.observers.TestSubscriber
/**
*/
class BrokersMonitorSpec extends Specification {
CuratorFramework curator
TestSubscriber subscriber
def setup() {
curator = Mock(CuratorFramework)
subscriber = new TestSubscriber()
}
def "observe(curator) gives me an Observable"() {
expect:
BrokersMonitor.observe(curator) instanceof Observable<BrokersMonitor>
}
@Ignore('in progress')
def "observe(curator) gives the subscriber a ZKBrokerSet"() {
given:
ZKBrokerSet brokers
Observable o = BrokersMonitor.observe(curator)
when:
o.subscribe { ZKBrokerSet set -> brokers = set }
then:
brokers instanceof ZKBrokerSet
}
def "observe(curator) with a null will call the subscribers onError"() {
given:
Observable o = BrokersMonitor.observe(null)
when:
o.subscribe(subscriber)
then:
subscriber.assertError(NullPointerException.class)
}
}

View File

@ -0,0 +1,8 @@
package com.github.reiseburo.beetle.internal
import spock.lang.*
/**
*/
class ZKBrokerSetSpec extends Specification {
}