Experimenting with observing ChildData and turning into a stream of Brokers

This commit is contained in:
R. Tyler Croy 2015-09-04 15:24:35 -07:00
parent f1a539541d
commit dfbb26399d
No known key found for this signature in database
GPG Key ID: 1426C7DC3F51E16F
5 changed files with 147 additions and 20 deletions

View File

@ -33,10 +33,15 @@ dependencies {
compile 'com.github.reiseburo:rx-curator:[0.1.0,1.0)'
compile 'com.fasterxml.jackson.core:jackson-core:[2.6.1,2.7)'
compile 'com.fasterxml.jackson.core:jackson-databind:[2.6.1,2.7)'
testCompile "org.spockframework:spock-core:1.0-groovy-2.4"
testCompile 'cglib:cglib-nodep:3.1'
testCompile 'org.apache.curator:curator-test:[2.7.1,2.8)'
experimentsCompile 'org.codehaus.groovy:groovy-all:[2.4.4,2.5)'
experimentsCompile sourceSets.main.output
}
idea {

View File

@ -1,9 +1,12 @@
package rx.curator
import com.fasterxml.jackson.databind.ObjectMapper
import com.github.reiseburo.beetle.Broker
import com.github.reiseburo.rx.curator.PathChildren
import org.apache.curator.RetryPolicy
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.cache.ChildData
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent
import org.apache.curator.retry.RetryNTimes
@ -19,14 +22,18 @@ class PathCacheObservable {
boolean waitFor = true
PathChildren.with(curator).watch('/brokers')
PathChildren.with(curator).watch('/brokers/ids')
.flatMap({ PathChildrenCacheEvent ev ->
if (ev.type == PathChildrenCacheEvent.Type.CHILD_ADDED) {
return Observable.from(ev.data)
}
})
.subscribe({
println "Rec: ${it}"
.flatMap({ ChildData data ->
Broker broker = Broker.fromJSON(new String(data.data))
return Observable.from(broker.inferIdFromPath(data.path))
})
.subscribe({ Broker b ->
println b
waitFor = false
})

View File

@ -1,31 +1,109 @@
package com.github.reiseburo.beetle;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import java.io.IOException;
/**
* Simple POJO containing informtaion about a Kafka broker
*/
@JsonSerialize
@JsonIgnoreProperties(ignoreUnknown=true)
public class Broker {
private String host;
private String brokerId;
private int port;
private int jmxPort;
@JsonProperty
public String getHost() {
return this.host;
}
@JsonProperty
public int getPort() {
return port;
}
@JsonProperty(value="jmx_port")
public int getJmxPort() {
return jmxPort;
}
@JsonProperty
public String getBrokerId() {
return brokerId;
}
@JsonProperty
public void setJmxPort(int jmxPort) {
this.jmxPort = jmxPort;
}
@JsonProperty
public void setHost(String host) {
this.host = host;
}
@JsonProperty
public void setBrokerId(String brokerId) {
this.brokerId = brokerId;
}
@JsonProperty
public void setPort(int port) {
this.port = port;
}
/**
* Empty constructor for jackson-databind
*/
public Broker() {
}
private Broker(Builder builder) {
this.brokerId = builder.brokerId;
this.host = builder.host;
this.port = builder.port;
this.jmxPort = builder.jmxPort;
}
public Broker inferIdFromPath(String znodePath) {
String[] parts = znodePath.split("/");
brokerId = parts[parts.length - 1];
return this;
}
/**
* @return Debugging representation of the Broker
*/
public String toString() {
return String.format("<Broker:%d (%s %s:%d [jmx:%d])>",
hashCode(), brokerId, host, port, jmxPort);
}
public static Builder builder() {
return new Builder();
}
/**
* Convert the JSON that Kafka stores in Zookeeper into a Broker representation
*
* @param jsonBuffer
* @return
* @throws IOException
*/
public static Broker fromJSON(String jsonBuffer) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(jsonBuffer, Broker.class);
}
/**
* Builder
*/
public static class Builder {
private int port;
private int jmxPort;
@ -56,20 +134,4 @@ public class Broker {
return new Broker(this);
}
}
public static Builder builder() {
return new Builder();
}
private Broker(Builder builder) {
this.brokerId = builder.brokerId;
this.host = builder.host;
this.port = builder.port;
this.jmxPort = builder.jmxPort;
}
public String toString() {
return String.format("<Broker:%d (%s %s:%d [jmx:%d])>",
hashCode(), brokerId, host, port, jmxPort);
}
}

View File

@ -5,6 +5,31 @@ import spock.lang.*
/**
*/
class BrokerSpec extends Specification {
def "fromJSON() with proper JSON should return the expected Broker"() {
given:
final String json = '{"jmx_port":9999,"timestamp":"1428168559585","host":"kafka.example.org","version":1,"port":6667}'
Broker broker
when:
broker = Broker.fromJSON(json)
then:
broker instanceof Broker
broker.host == 'kafka.example.org'
broker.port == 6667
broker.jmxPort == 9999
}
def "inferIdFromPath() should work properly"() {
given:
Broker broker = Broker.builder().build()
when:
broker.inferIdFromPath('/brokers/ids/123')
then:
broker.brokerId == '123'
}
}
class BrokerBuilderSpec extends Specification {

View File

@ -1,6 +1,9 @@
package com.github.reiseburo.beetle.internal
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.RetryOneTime
import org.apache.curator.test.TestingServer
import spock.lang.*
import rx.Observable
@ -46,3 +49,28 @@ class BrokersMonitorSpec extends Specification {
subscriber.assertError(NullPointerException.class)
}
}
class BrokersMonitorIntegrationSpec extends Specification {
CuratorFramework curator
TestingServer server
def setup() {
server = new TestingServer()
curator = CuratorFrameworkFactory.newClient(server.connectString, new RetryOneTime())
}
def cleanup() {
curator?.close()
server?.close()
}
@Ignore
def "foo"() {
given: '/brokers/ids exists'
final String brokersPath = '/brokers/ids'
curator.create().creatingParentsIfNeeded().forPath(brokersPath)
expect:
true
}
}