= Beetle

image:https://travis-ci.org/reiseburo/beetle.svg?branch=master["Build Status", link="https://travis-ci.org/reiseburo/beetle"]

Beetle is a somewhat higher-level Java API on top of the client libraries
distributed with link:http://kafka.apache.org[Apache Kafka]. The goal of this
library is not to replace those libraries, but to wrap them in an
easier-to-use package.

== System Requirements

* JDK7 or later

== Hacking

This project uses link:http://gradle.org[Gradle], so building and testing
should be as easy as executing:

    % ./gradlew

== Design/Notes/Thoughts

NOTE: Right now this section is very much just a brain-dump/work-in-progress.

What is fundamentally missing from the upstream Kafka clients is an evented
view of the world. Although the Zookeeper and Kafka models are effectively
event-driven,
link:https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example[implementing a lower-level SimpleConsumer]
relies on busy-loops and rather disjointed logic for reconnects and error
handling.

The link:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example[higher-level consumer API]
is also awkward to use, both for receiving messages (through a blocking
iterator) and for handling parallel operations (the caller has to set up a
thread pool somewhere just for receiving).

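
The iterator-plus-thread-pool pattern described above can be hidden behind a
small push-style adapter. The following is a minimal sketch under stated
assumptions, not part of Beetle or of the Kafka clients: `IteratorPump` and
`pump()` are hypothetical names, a plain `java.util.Iterator` stands in for
the high-level consumer's message iterator, and Java 8 is assumed for the
lambdas.

.IteratorPump.java
[source, java]
----
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

/*
 * Hypothetical adapter (not a Beetle or Kafka class): turns a pull-style
 * iterator into push-style callbacks.
 */
public class IteratorPump {
    /*
     * Drains the iterator on the given executor, calling onNext for each
     * element and onCompleted once the iterator is exhausted. The returned
     * Future completes when the drain loop has finished.
     */
    public static <T> Future<?> pump(Iterator<T> source,
                                     ExecutorService executor,
                                     Consumer<T> onNext,
                                     Runnable onCompleted) {
        return executor.submit(() -> {
            while (source.hasNext()) {
                onNext.accept(source.next());
            }
            onCompleted.run();
        });
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        List<String> received = new CopyOnWriteArrayList<>();

        /* A fixed list stands in for a stream of Kafka messages */
        Iterator<String> messages = Arrays.asList("a", "b", "c").iterator();

        /* get() blocks until the drain loop has finished */
        pump(messages, executor, received::add,
             () -> System.out.println("stream completed")).get();

        executor.shutdown();
        System.out.println(received);
    }
}
----

The caller only supplies callbacks; the blocking loop and the thread it runs
on stay inside the adapter, which is roughly the shape an evented consumer API
would need to provide.
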
A high-level Kafka consumer API maps rather nicely to the
link:https://github.com/ReactiveX/RxJava[RxJava] usage model of Observers and
Subscribers, e.g.:

.Consumer.java
[source, java]
----
/*
 * Prototype code showing how a typical end-user might use Beetle
 */
Brokers.fromZookeeper("localhost:2181")
       /* assuming a custom subscribe() operator exists in Beetle */
       .subscribe("some-topic")
       /* assuming a custom consume() operator exists in Beetle */
       .consume(message -> doSomethingWithMessage(message))
       .map(message -> message.commitOffset());
----

.ZookeeperlessConsumer.java
[source, java]
----
/*
 * Assuming we already know which broker we want to talk to and
 * we don't care at all about leader changes or committing offsets
 */
long startOffset = 0;
Brokers.just("localhost:6667")
       .subscribe("some-topic", startOffset)
       .consume(m -> doSomethingWithMessage(m));
----

.LowLevelBeetle.java
[source, java]
----
/*
 * The following is a prototype of some ideas for how the above examples
 * might be implemented internally
 */
/* Curator's newClient() also requires a retry policy */
CuratorFramework cf = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
/* the client must be started before it can be used */
cf.start();

BrokersObserver.observe(cf)
               .subscribe(brokers -> TopicsObserver.observe(cf, brokers))
               .subscribe(partitions -> ConsumerObserver.observe(cf, partitions))
               .map(message -> doCustomBehaviorWith(message))
               .map(message -> message.commitOffsetTo(cf));
----

== Similar Projects

. link:https://github.com/cjdev/kafka-rx[kafka-rx]: Scala-based client which provides a push alternative to Kafka's pull-based stream.