From dfbb26399d19fe1bf99bbda852d8253afb13e968 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 4 Sep 2015 15:24:35 -0700 Subject: [PATCH] Experimenting with observing ChildData and turning into a stream of Brokers --- build.gradle | 5 + .../rx/curator/PathCacheObservable.groovy | 13 ++- .../com/github/reiseburo/beetle/Broker.java | 96 +++++++++++++++---- .../github/reiseburo/beetle/BrokerSpec.groovy | 25 +++++ .../beetle/internal/BrokersMonitorSpec.groovy | 28 ++++++ 5 files changed, 147 insertions(+), 20 deletions(-) diff --git a/build.gradle b/build.gradle index 7b55fcd..a2932f0 100644 --- a/build.gradle +++ b/build.gradle @@ -33,10 +33,15 @@ dependencies { compile 'com.github.reiseburo:rx-curator:[0.1.0,1.0)' + compile 'com.fasterxml.jackson.core:jackson-core:[2.6.1,2.7)' + compile 'com.fasterxml.jackson.core:jackson-databind:[2.6.1,2.7)' + testCompile "org.spockframework:spock-core:1.0-groovy-2.4" testCompile 'cglib:cglib-nodep:3.1' + testCompile 'org.apache.curator:curator-test:[2.7.1,2.8)' experimentsCompile 'org.codehaus.groovy:groovy-all:[2.4.4,2.5)' + experimentsCompile sourceSets.main.output } idea { diff --git a/src/experiments/groovy/rx/curator/PathCacheObservable.groovy b/src/experiments/groovy/rx/curator/PathCacheObservable.groovy index d80d433..ebcb556 100644 --- a/src/experiments/groovy/rx/curator/PathCacheObservable.groovy +++ b/src/experiments/groovy/rx/curator/PathCacheObservable.groovy @@ -1,9 +1,12 @@ package rx.curator +import com.fasterxml.jackson.databind.ObjectMapper +import com.github.reiseburo.beetle.Broker import com.github.reiseburo.rx.curator.PathChildren import org.apache.curator.RetryPolicy import org.apache.curator.framework.CuratorFramework import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.framework.recipes.cache.ChildData import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent import org.apache.curator.retry.RetryNTimes @@ -19,14 +22,18 @@ class PathCacheObservable { boolean waitFor = true - PathChildren.with(curator).watch('/brokers') + PathChildren.with(curator).watch('/brokers/ids') .flatMap({ PathChildrenCacheEvent ev -> if (ev.type == PathChildrenCacheEvent.Type.CHILD_ADDED) { return Observable.from(ev.data) } }) - .subscribe({ - println "Rec: ${it}" + .flatMap({ ChildData data -> + Broker broker = Broker.fromJSON(new String(data.data)) + return Observable.from(broker.inferIdFromPath(data.path)) + }) + .subscribe({ Broker b -> + println b waitFor = false }) diff --git a/src/main/java/com/github/reiseburo/beetle/Broker.java b/src/main/java/com/github/reiseburo/beetle/Broker.java index 3ed975b..5b7b98d 100644 --- a/src/main/java/com/github/reiseburo/beetle/Broker.java +++ b/src/main/java/com/github/reiseburo/beetle/Broker.java @@ -1,31 +1,109 @@ package com.github.reiseburo.beetle; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; + +import java.io.IOException; + /** * Simple POJO containing informtaion about a Kafka broker */ +@JsonSerialize +@JsonIgnoreProperties(ignoreUnknown=true) public class Broker { private String host; - private String brokerId; private int port; private int jmxPort; + @JsonProperty public String getHost() { return this.host; } + @JsonProperty public int getPort() { return port; } + @JsonProperty(value="jmx_port") public int getJmxPort() { return jmxPort; } + @JsonProperty public String getBrokerId() { return brokerId; } + @JsonProperty + public void setJmxPort(int jmxPort) { + this.jmxPort = jmxPort; + } + + @JsonProperty + public void setHost(String host) { + this.host = host; + } + + @JsonProperty + public void setBrokerId(String brokerId) { + this.brokerId = brokerId; + } + + @JsonProperty + public void setPort(int port) { + this.port = port; + } + + /** + * Empty constructor for jackson-databind + */ + public Broker() { + } + + private Broker(Builder builder) { + this.brokerId = builder.brokerId; + this.host = builder.host; + this.port = builder.port; + this.jmxPort = builder.jmxPort; + } + + public Broker inferIdFromPath(String znodePath) { + String[] parts = znodePath.split("/"); + brokerId = parts[parts.length - 1]; + return this; + } + + /** + * @return Debugging representation of the Broker + */ + public String toString() { + return String.format("", + hashCode(), brokerId, host, port, jmxPort); + } + + public static Builder builder() { + return new Builder(); + } + + /** + * Convert the JSON that Kafka stores in Zookeeper into a Broker representation + * + * @param jsonBuffer + * @return + * @throws IOException + */ + public static Broker fromJSON(String jsonBuffer) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(jsonBuffer, Broker.class); + } + + /** + * Builder + */ public static class Builder { private int port; private int jmxPort; @@ -56,20 +134,4 @@ public class Broker { return new Broker(this); } } - - public static Builder builder() { - return new Builder(); - } - - private Broker(Builder builder) { - this.brokerId = builder.brokerId; - this.host = builder.host; - this.port = builder.port; - this.jmxPort = builder.jmxPort; - } - - public String toString() { - return String.format("", - hashCode(), brokerId, host, port, jmxPort); - } } diff --git a/src/test/groovy/com/github/reiseburo/beetle/BrokerSpec.groovy b/src/test/groovy/com/github/reiseburo/beetle/BrokerSpec.groovy index a87200f..96eac2c 100644 --- a/src/test/groovy/com/github/reiseburo/beetle/BrokerSpec.groovy +++ b/src/test/groovy/com/github/reiseburo/beetle/BrokerSpec.groovy @@ -5,6 +5,31 @@ import spock.lang.* /** */ class BrokerSpec extends Specification { + def "fromJSON() with proper JSON should return the expected Broker"() { + given: + final String json = '{"jmx_port":9999,"timestamp":"1428168559585","host":"kafka.example.org","version":1,"port":6667}' + Broker broker + + when: + broker = Broker.fromJSON(json) + + then: + broker instanceof Broker + broker.host == 'kafka.example.org' + broker.port == 6667 + broker.jmxPort == 9999 + } + + def "inferIdFromPath() should work properly"() { + given: + Broker broker = Broker.builder().build() + + when: + broker.inferIdFromPath('/brokers/ids/123') + + then: + broker.brokerId == '123' + } } class BrokerBuilderSpec extends Specification { diff --git a/src/test/groovy/com/github/reiseburo/beetle/internal/BrokersMonitorSpec.groovy b/src/test/groovy/com/github/reiseburo/beetle/internal/BrokersMonitorSpec.groovy index ef4180a..1614f62 100644 --- a/src/test/groovy/com/github/reiseburo/beetle/internal/BrokersMonitorSpec.groovy +++ b/src/test/groovy/com/github/reiseburo/beetle/internal/BrokersMonitorSpec.groovy @@ -1,6 +1,9 @@ package com.github.reiseburo.beetle.internal import org.apache.curator.framework.CuratorFramework +import org.apache.curator.framework.CuratorFrameworkFactory +import org.apache.curator.retry.RetryOneTime +import org.apache.curator.test.TestingServer import spock.lang.* import rx.Observable @@ -46,3 +49,28 @@ class BrokersMonitorSpec extends Specification { subscriber.assertError(NullPointerException.class) } } + +class BrokersMonitorIntegrationSpec extends Specification { + CuratorFramework curator + TestingServer server + + def setup() { + server = new TestingServer() + curator = CuratorFrameworkFactory.newClient(server.connectString, new RetryOneTime()) + } + + def cleanup() { + curator?.close() + server?.close() + } + + @Ignore + def "foo"() { + given: '/brokers/ids exists' + final String brokersPath = '/brokers/ids' + curator.create().creatingParentsIfNeeded().forPath(brokersPath) + + expect: + true + } +}