mirror of https://github.com/reiseburo/beetle
Sketching out some more example prototypes
This commit is contained in:
parent
450d0bd0c9
commit
36654bd721
40
README.adoc
40
README.adoc
|
@ -44,24 +44,52 @@ Subscribers, e.g.g
|
||||||
.Consumer.java
|
.Consumer.java
|
||||||
[source, java]
|
[source, java]
|
||||||
----
|
----
|
||||||
/* Using Java 8 Lambda syntax for conciseness' sake */
|
/*
|
||||||
LocateBrokers.zookeeper("localhost:2181")
|
* Prototype code showing how a typical end-user might use Beetle
|
||||||
.map(broker -> TopicSubscription("some-topic"))
|
*/
|
||||||
|
|
||||||
|
Brokers.fromZookeeper("localhost:2181")
|
||||||
|
/* assuming a custom subscribe() operator exists in Beetle */
|
||||||
|
.subscribe("some-topic")
|
||||||
/* assuming a custom consume() operator exists in Beetle */
|
/* assuming a custom consume() operator exists in Beetle */
|
||||||
.consume(message -> doSomethingWithMessage(message))
|
.consume(message -> doSomethingWithMessage(message))
|
||||||
.map(message -> message.commitOffset());
|
.map(message -> message.commitOffset());
|
||||||
----
|
----
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
.ZookeeperlessConsumer.java
|
.ZookeeperlessConsumer.java
|
||||||
[source, java]
|
[source, java]
|
||||||
----
|
----
|
||||||
Broker.just('localhost:6667')
|
/*
|
||||||
.map(broker -> TopicSubscription("some-topic")
|
* Assuming we already know which broker we want to talk to and
|
||||||
|
* we don't care at all about leader changes or committing offsets
|
||||||
|
*/
|
||||||
|
long startOffset = 0;
|
||||||
|
|
||||||
|
Brokers.just('localhost:6667')
|
||||||
|
.subscribe("some-topic", startOffset)
|
||||||
.consume(m -> doSomethingWithMessage(m))
|
.consume(m -> doSomethingWithMessage(m))
|
||||||
.map(m -> m.commitOffset());
|
|
||||||
----
|
----
|
||||||
|
|
||||||
|
|
||||||
|
.LowLevelBeetle.java
|
||||||
|
----
|
||||||
|
/*
|
||||||
|
* The following is a prototype of some ideas for how the above examples
|
||||||
|
* might be implemented internally
|
||||||
|
*/
|
||||||
|
CuratorFramework cf = CuratorFrameworkFactory.newClient("localhost:2181");
|
||||||
|
|
||||||
|
BrokersObserver.observe(cf)
|
||||||
|
.subscribe(brokers -> TopicsObserver.observe(cf, brokers))
|
||||||
|
.subscribe(partitions -> ConsumerObserver.observe(cf, partitions))
|
||||||
|
.map(message ->
|
||||||
|
doCustomBehaviorWith(message))
|
||||||
|
.map(message -> m.commitOffsetTo(cf));
|
||||||
|
----
|
||||||
|
|
||||||
|
|
||||||
== Similar Projects
|
== Similar Projects
|
||||||
|
|
||||||
. link:https://github.com/cjdev/kafka-rx[kafka-rx]: Scala-based client which
|
. link:https://github.com/cjdev/kafka-rx[kafka-rx]: Scala-based client which
|
||||||
|
|
Loading…
Reference in New Issue