diff --git a/README.adoc b/README.adoc index c623fd6..43afb7f 100644 --- a/README.adoc +++ b/README.adoc @@ -44,24 +44,52 @@ Subscribers, e.g.g .Consumer.java [source, java] ---- -/* Using Java 8 Lambda syntax for conciseness' sake */ -LocateBrokers.zookeeper("localhost:2181") - .map(broker -> TopicSubscription("some-topic")) +/* + * Prototype code showing how a typical end-user might use Beetle + */ + +Brokers.fromZookeeper("localhost:2181") + /* assuming a custom subscribe() operator exists in Beetle */ + .subscribe("some-topic") /* assuming a custom consume() operator exists in Beetle */ .consume(message -> doSomethingWithMessage(message)) .map(message -> message.commitOffset()); ---- + .ZookeeperlessConsumer.java [source, java] ---- -Broker.just('localhost:6667') - .map(broker -> TopicSubscription("some-topic") +/* + * Assuming we already know which broker we want to talk to and + * we don't care at all about leader changes or committing offsets + */ +long startOffset = 0; + +Brokers.just('localhost:6667') + .subscribe("some-topic", startOffset) .consume(m -> doSomethingWithMessage(m)) - .map(m -> m.commitOffset()); ---- + +.LowLevelBeetle.java +---- +/* + * The following is a prototype of some ideas for how the above examples + * might be implemented internally + */ +CuratorFramework cf = CuratorFrameworkFactory.newClient("localhost:2181"); + +BrokersObserver.observe(cf) + .subscribe(brokers -> TopicsObserver.observe(cf, brokers)) + .subscribe(partitions -> ConsumerObserver.observe(cf, partitions)) + .map(message -> + doCustomBehaviorWith(message)) + .map(message -> m.commitOffsetTo(cf)); +---- + + == Similar Projects . link:https://github.com/cjdev/kafka-rx[kafka-rx]: Scala-based client which