Rework documentation

Enforce a more consistent style on the docs. Also correct a number of
typos and stale information.
Nikhil Benesch 2020-05-16 01:44:31 -04:00
parent d11dea5956
commit 95f9bc2f98
25 changed files with 1148 additions and 883 deletions

CONTRIBUTING.md (new file)

@ -0,0 +1,49 @@
# Maintainer and contributor instructions
## Compiling from source
To compile from source, you'll have to initialize the submodule containing
librdkafka:
```bash
git submodule update --init
```
and then compile using `cargo`, selecting the features that you want.
Example:
```bash
cargo build --features "ssl gssapi"
```
## Tests
### Unit tests
The unit tests can run without a Kafka broker present:
```bash
cargo test --lib
```
### Automatic testing
rust-rdkafka contains a suite of integration tests which is automatically
executed by Travis in docker-compose. Because rust-rdkafka interacts with C
code, the tests are executed under valgrind to check for memory errors and
leaks.
To run the full suite using docker-compose:
```bash
./test_suite.sh
```
To run the tests locally instead:
```bash
KAFKA_HOST="kafka_server:9092" cargo test
```
In this case, a broker is expected to be running on `KAFKA_HOST`.
The broker must be configured with a default of 3 partitions per topic and
with topic autocreation enabled in order for the tests to succeed.

README.md

@ -6,14 +6,16 @@
[![coverage](https://codecov.io/gh/fede1024/rust-rdkafka/graphs/badge.svg?branch=master)](https://codecov.io/gh/fede1024/rust-rdkafka/)
[![Join the chat at https://gitter.im/rust-rdkafka/Lobby](https://badges.gitter.im/rust-rdkafka/Lobby.svg)](https://gitter.im/rust-rdkafka/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
A fully asynchronous, [futures]-based Kafka client library for Rust based on [librdkafka].
A fully asynchronous, [futures]-enabled [Apache Kafka] client
library for Rust based on [librdkafka].
## The library
`rust-rdkafka` provides a safe Rust interface to librdkafka. The master branch is currently based on librdkafka 1.3.0.
`rust-rdkafka` provides a safe Rust interface to librdkafka. The master
branch is currently based on librdkafka 1.4.2.
### Documentation
- [Current master branch](https://fede1024.github.io/rust-rdkafka/)
- [Latest release](https://docs.rs/rdkafka/)
- [Changelog](https://github.com/fede1024/rust-rdkafka/blob/master/changelog.md)
@ -21,23 +23,32 @@ A fully asynchronous, [futures]-based Kafka client library for Rust based on [li
The main features provided at the moment are:
- Support for all Kafka versions since 0.8.x. For more information about broker compatibility options, check the [librdkafka documentation].
- Support for all Kafka versions since 0.8.x. For more information about
broker compatibility options, check the [librdkafka
documentation][broker-compat].
- Consume from single or multiple topics.
- Automatic consumer rebalancing.
- Customizable rebalance, with pre and post rebalance callbacks.
- Synchronous or asynchronous message production.
- Customizable offset commit.
- Access to cluster metadata (list of topic-partitions, replicas, active brokers etc).
- Access to group metadata (list groups, list members of groups, hostnames etc).
- Create and delete topics and add and edit partitions.
- Alter broker and topic configurations.
- Access to cluster metadata (list of topic-partitions, replicas, active
brokers etc).
- Access to group metadata (list groups, list members of groups, hostnames,
etc.).
- Access to producer and consumer metrics, errors and callbacks.
[librdkafka documentation]: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
### One million messages per second
`rust-rdkafka` is designed to be easy and safe to use thanks to the abstraction layer written in Rust, while at the same time being extremely fast thanks to the librdkafka C library.
`rust-rdkafka` is designed to be easy and safe to use thanks to the
abstraction layer written in Rust, while at the same time being extremely
fast thanks to the librdkafka C library.
Here are some benchmark results using the rust-rdkafka `BaseProducer`, sending data to a single Kafka 0.11 process running in localhost (default configuration, 3 partitions). Hardware: Dell laptop, with Intel Core i7-4712HQ @ 2.30GHz.
Here are some benchmark results using the [`BaseProducer`],
sending data to a single Kafka 0.11 process running in localhost (default
configuration, 3 partitions). Hardware: Dell laptop, with Intel Core
i7-4712HQ @ 2.30GHz.
- Scenario: produce 5 million messages, 10 bytes each, wait for all of them to be acked
- 1045413 messages/s, 9.970 MB/s (average over 5 runs)
@ -45,58 +56,75 @@ Here are some benchmark results using the rust-rdkafka `BaseProducer`, sending d
- Scenario: produce 100000 messages, 10 KB each, wait for all of them to be acked
- 24623 messages/s, 234.826 MB/s (average over 5 runs)
For more numbers, check out the [kafka-benchmark](https://github.com/fede1024/kafka-benchmark) project.
For more numbers, check out the [kafka-benchmark] project.
### Client types
`rust-rdkafka` provides low level and high level consumers and producers. Low level:
`rust-rdkafka` provides low level and high level consumers and producers.
* [`BaseConsumer`]: simple wrapper around the librdkafka consumer. It requires to be periodically `poll()`ed in order to execute callbacks, rebalances and to receive messages.
* [`BaseProducer`]: simple wrapper around the librdkafka producer. As in the consumer case, the user must call `poll()` periodically to execute delivery callbacks.
* [`ThreadedProducer`]: `BaseProducer` with a separate thread dedicated to polling the producer.
Low level:
* [`BaseConsumer`]: a simple wrapper around the librdkafka consumer. It
must be periodically `poll()`ed in order to execute callbacks, rebalances
and to receive messages.
* [`BaseProducer`]: a simple wrapper around the librdkafka producer. As in
the consumer case, the user must call `poll()` periodically to execute
delivery callbacks.
* [`ThreadedProducer`]: a `BaseProducer` with a separate thread dedicated to
polling the producer.
High level:
* [`StreamConsumer`]: it returns a [`stream`] of messages and takes care of polling the consumer internally.
* [`FutureProducer`]: it returns a [`future`] that will be completed once the message is delivered to Kafka (or failed).
* [`StreamConsumer`]: a [`Stream`] of messages that takes care of
polling the consumer automatically.
* [`FutureProducer`]: a [`Future`] that will be completed once
the message is delivered to Kafka (or failed).
For more information about consumers and producers, refer to their module-level documentation.
For more information about consumers and producers, refer to their
module-level documentation.
*Warning*: the library is under active development and the APIs are likely to change.
*Warning*: the library is under active development and the APIs are likely
to change.
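As a rough, hedged sketch of the polling requirement described above (the
broker address and topic name are placeholders, and exact signatures may vary
slightly between releases), driving a [`BaseProducer`] looks something like
this:
```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};

fn main() {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .create()
        .expect("producer creation failed");

    if producer
        .send(BaseRecord::to("example_topic").key("key").payload("payload"))
        .is_err()
    {
        eprintln!("failed to enqueue message");
    }

    // The BaseProducer has no background thread: poll() must be called
    // periodically to serve delivery callbacks.
    producer.poll(Duration::from_millis(100));

    // Wait for outstanding messages to be delivered before exiting.
    producer.flush(Duration::from_secs(10));
}
```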
### Asynchronous data processing with Tokio
[Tokio] is a platform for fast processing of asynchronous events in Rust. The interfaces exposed by the [`StreamConsumer`] and the [`FutureProducer`] allow rust-rdkafka users to easily integrate Kafka consumers and producers within the Tokio platform, and write asynchronous message processing code. Note that rust-rdkafka can be used without Tokio.
To see rust-rdkafka in action with Tokio, check out the [asynchronous processing example] in the examples folder.
[Tokio] is a platform for fast processing of asynchronous events in Rust.
The interfaces exposed by the [`StreamConsumer`] and the [`FutureProducer`]
allow rust-rdkafka users to easily integrate Kafka consumers and producers
within the Tokio platform, and write asynchronous message processing code.
Note that rust-rdkafka can be used without Tokio.
[Tokio]: https://tokio.rs/
[asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs
To see rust-rdkafka in action with Tokio, check out the
[asynchronous processing example] in the examples folder.
### At-least-once delivery
At-least-once delivery semantic is common in many streaming applications: every message is guaranteed to be processed at least once; in case of temporary failure, the message can be re-processed and/or re-delivered, but no message will be lost.
At-least-once delivery semantics are common in many streaming applications:
every message is guaranteed to be processed at least once; in case of
temporary failure, the message can be re-processed and/or re-delivered,
but no message will be lost.
In order to implement at-least-once delivery the stream processing application has to carefully commit the offset only once the message has been processed. Committing the offset too early, instead, might cause message loss, since upon recovery the consumer will start from the next message, skipping the one where the failure occurred.
In order to implement at-least-once delivery the stream processing
application has to carefully commit the offset only once the message has
been processed. Committing the offset too early, instead, might cause
message loss, since upon recovery the consumer will start from the next
message, skipping the one where the failure occurred.
To see how to implement at-least-once delivery with `rdkafka`, check out the [at-least-once delivery example] in the examples folder. To know more about delivery semantics, check the [message delivery semantics] chapter in the Kafka documentation.
[at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs
[message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
To see how to implement at-least-once delivery with `rdkafka`, check out the
[at-least-once delivery example] in the examples folder. To know more about
delivery semantics, check the [message delivery semantics] chapter in the
Kafka documentation.
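As an illustrative, hedged sketch of the commit-after-processing pattern with
the low-level consumer (broker address, group id, and topic are placeholders,
and real code would also handle errors and shutdown):
```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::Message;

fn process(payload: &[u8]) {
    // Placeholder for the application's processing logic.
    let _ = payload;
}

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder address
        .set("group.id", "example_group")
        .set("enable.auto.commit", "false") // commit manually, after processing
        .create()
        .expect("consumer creation failed");

    consumer
        .subscribe(&["example_topic"])
        .expect("subscription failed");

    loop {
        if let Some(Ok(message)) = consumer.poll(Duration::from_secs(1)) {
            if let Some(payload) = message.payload() {
                process(payload);
            }
            // Commit only after processing: a crash before this point causes
            // redelivery rather than message loss.
            consumer
                .commit_message(&message, CommitMode::Sync)
                .expect("commit failed");
        }
    }
}
```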
### Users
Here are some of the projects using rust-rdkafka:
- [timely-dataflow]: a modular implementation of timely dataflow in Rust (you can also check the [blog post]).
- [timely-dataflow]: a distributed data-parallel compute engine. See also
the [blog post][timely-blog] announcing its Kafka integration.
- [kafka-view]: a web interface for Kafka clusters.
- [kafka-benchmark]: a high performance benchmarking tool for Kafka.
*If you are using rust-rdkafka, please let me know!*
[timely-dataflow]: https://github.com/frankmcsherry/timely-dataflow
[kafka-view]: https://github.com/fede1024/kafka-view
[kafka-benchmark]: https://github.com/fede1024/kafka-benchmark
[blog post]: https://github.com/frankmcsherry/blog/blob/master/posts/2017-11-08.md
*If you are using rust-rdkafka, please let us know!*
## Installation
@ -107,8 +135,8 @@ Add this to your `Cargo.toml`:
rdkafka = { version = "0.23", features = ["cmake-build"] }
```
This crate will compile librdkafka from sources and link it statically to your
executable. To compile librdkafka you'll need:
This crate will compile librdkafka from sources and link it statically to
your executable. To compile librdkafka you'll need:
* the GNU toolchain
* GNU `make`
@ -120,13 +148,12 @@ executable. To compile librdkafka you'll need:
* `libzstd-dev`: optional, *not* included by default (feature: `zstd-pkg-config`)
Note that using the CMake build system, via the `cmake-build` feature, is
**strongly** encouraged. The default build system has a [known
issue](rdkafka-sys/README.md#known-issues) that can cause corrupted builds.
encouraged if you can take the dependency on CMake.
By default a submodule with the librdkafka sources pinned to a specific commit
will be used to compile and statically link the library. The `dynamic-linking`
feature can be used to instead dynamically link rdkafka to the system's version
of librdkafka. Example:
By default a submodule with the librdkafka sources pinned to a specific
commit will be used to compile and statically link the library. The
`dynamic-linking` feature can be used to instead dynamically link rdkafka to
the system's version of librdkafka. Example:
```toml
[dependencies]
@ -134,103 +161,75 @@ rdkafka = { version = "0.23", features = ["dynamic-linking"] }
```
For a full listing of features, consult the [rdkafka-sys crate's
documentation](rdkafka-sys/README.md#features). All of rdkafka-sys features are
documentation][rdkafka-sys-features]. All of rdkafka-sys features are
re-exported as rdkafka features.
### Tokio dependency
The [`StreamConsumer`] and [`FutureProducer`] depend on Tokio, which can be a
heavyweight dependency for users who only intend to use the non-async/await
consumers or producers.
The `tokio` feature is enabled by default. To disable it, turn off default
features like so:
Some features of the [`StreamConsumer`] and [`FutureProducer`] depend on
Tokio, which can be a heavyweight dependency for users who only intend to
use the low-level consumers and producers. The Tokio integration is
enabled by default, but can be disabled by turning off default features:
```toml
[dependencies]
rdkafka = { version = "0.23", default-features = false }
```
## Compiling from sources
To compile from sources, you'll have to update the submodule containing librdkafka:
```bash
git submodule update --init
```
and then compile using `cargo`, selecting the features that you want. Example:
```bash
cargo build --features "ssl gssapi"
```
## Examples
You can find examples in the `examples` folder. To run them:
You can find examples in the [`examples`] folder. To run them:
```bash
cargo run --example <example_name> -- <example_args>
```
## Tests
### Unit tests
The unit tests can run without a Kafka broker present:
```bash
cargo test --lib
```
### Automatic testing
rust-rdkafka contains a suite of integration tests which is automatically
executed by Travis in docker-compose. Because rust-rdkafka interacts with C
code, the tests are executed under valgrind to check for memory errors and
leaks.
To run the full suite using docker-compose:
```bash
./test_suite.sh
```
To run the tests locally instead:
```bash
KAFKA_HOST="kafka_server:9092" cargo test
```
In this case, a broker is expected to be running on `KAFKA_HOST`.
The broker must be configured with a default of 3 partitions per topic and with
topic autocreation enabled in order for the tests to succeed.
## Debugging
rust-rdkafka uses the `log` and `env_logger` crates to handle logging. Logging can be enabled
using the `RUST_LOG` environment variable, for example:
rust-rdkafka uses the [`log`] and [`env_logger`] crates to handle logging.
Logging can be enabled using the `RUST_LOG` environment variable, for
example:
```bash
RUST_LOG="librdkafka=trace,rdkafka::client=debug" cargo test
```
This will configure the logging level of librdkafka to trace, and the level of the client
module of the Rust client to debug. To actually receive logs from librdkafka, you also have to
set the `debug` option in the producer or consumer configuration (see librdkafka
[configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)).
This will configure the logging level of librdkafka to trace, and the level
of the client module of the Rust client to debug. To actually receive logs
from librdkafka, you also have to set the `debug` option in the producer or
consumer configuration (see librdkafka
[configuration][librdkafka-config]).
To enable debugging in your project, make sure you initialize the logger with
`env_logger::init()` or equivalent.
To enable debugging in your project, make sure you initialize the logger
with `env_logger::init()`, or the equivalent for any `log`-compatible
logging framework.
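For instance (a minimal sketch; `env_logger` is just one of several
`log`-compatible options):
```rust
fn main() {
    // Honors the RUST_LOG filter shown above, e.g.
    // RUST_LOG="librdkafka=trace,rdkafka::client=debug".
    env_logger::init();

    // ... construct consumers and producers as usual ...
}
```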
[`BaseConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/base_consumer/struct.BaseConsumer.html
[`BaseProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.BaseProducer.html
[`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html
[`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html
[`Future`]: https://doc.rust-lang.org/stable/std/future/trait.Future.html
[`FutureProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/future_producer/struct.FutureProducer.html
[`Stream`]: https://docs.rs/futures/*/futures/stream/trait.Stream.html
[`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html
[`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html
[`log`]: https://docs.rs/log
[`env_logger`]: https://docs.rs/env_logger
[Apache Kafka]: https://kafka.apache.org
[asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs
[at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs
[broker-compat]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility
[`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
[futures]: https://github.com/rust-lang/futures-rs
[kafka-benchmark]: https://github.com/fede1024/kafka-benchmark
[kafka-view]: https://github.com/fede1024/kafka-view
[librdkafka]: https://github.com/edenhill/librdkafka
[futures]: https://github.com/alexcrichton/futures-rs
[`future`]: https://docs.rs/futures/0.1.3/futures/trait.Future.html
[`stream`]: https://docs.rs/futures/0.1.3/futures/stream/trait.Stream.html
[librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
[message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
[rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features
[rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues
[timely-blog]: https://github.com/frankmcsherry/blog/blob/master/posts/2017-11-08.md
[timely-dataflow]: https://github.com/frankmcsherry/timely-dataflow
[Tokio]: https://tokio.rs/
## rdkafka-sys


@ -27,10 +27,14 @@ def parse_template_file(path):
if line.startswith(INCLUDE_MARKER)][0]
except IndexError:
raise Exception("Missing include marker")
include_info = content[marker_position].split('$')
include_info = content[marker_position].strip().split('$')
doc_path = include_info[1]
start = None
if len(include_info) > 2:
start = include_info[2]
return Template(
header=content[0:marker_position], footer=content[marker_position+1:],
doc_path=include_info[1], start=include_info[2],
doc_path=include_info[1], start=start,
)
@ -42,10 +46,11 @@ output = sys.stdout
for line in template.header:
output.write(line)
for line in doc:
if line.startswith(template.start):
output.write(line)
break
if template.start:
for line in doc:
if line.startswith(template.start):
output.write(line)
break
for line in doc:
output.write(line)


@ -0,0 +1,17 @@
# Maintainer and contributor instructions
## Bindings
To regenerate the bindings:
``` bash
git submodule update --init
cargo install bindgen
./update-bindings.sh
```
## Updating
To upgrade librdkafka: update the git submodule in `librdkafka`, check whether
new errors need to be added to `helpers::primive_to_rd_kafka_resp_err_t`, and
update the version in `Cargo.toml`.


@ -1,25 +1,19 @@
# rdkafka-sys
Low level bindings to [librdkafka](https://github.com/edenhill/librdkafka).
Low level bindings to [librdkafka](https://github.com/edenhill/librdkafka),
a C library for the [Apache Kafka] protocol with producer, consumer, and
admin clients.
## Bindings
To regenerate the bindings:
``` bash
git submodule update --init
cargo install bindgen
./update-bindings.sh
```
For a safe wrapper, see the [rdkafka] crate.
## Version
The rdkafka-sys version number is in the format `X.Y.Z+RX.RY.RZ`, where `X.Y.Z`
is the version of this crate and follows SemVer conventions, while `RX.RY.RZ`
is the version of the bundled librdkafka.
The rdkafka-sys version number is in the format `X.Y.Z+RX.RY.RZ`, where
`X.Y.Z` is the version of this crate and follows SemVer conventions, while
`RX.RY.RZ` is the version of the bundled librdkafka.
Note that versions before v2.0.0+1.4.2 did not follow this convention, and
instead directly corresponded to the bundled librdkafka version.
instead directly correspond to the bundled librdkafka version.
## Build
@ -27,68 +21,66 @@ instead directly corresponded to the bundled librdkafka version.
* When any of librdkafka's optional dependencies are enabled, like libz or
OpenSSL, if you have multiple versions of that library installed upon your
system, librdkafka's build system may disagree with Cargo about which version
of the library to use! **This can result in subtly broken builds,** if
librdkafka compiles against the headers for one version but Cargo links
against a different version. For complete confidence when building release
binaries, use an environment like a Docker container or a chroot jail where
you can guarantee that only one version of each dependency is present.
Unfortunately, the current design of Cargo makes this nearly impossible to
fix.
* librdkafka's default build system, which uses a bespoke tool called mklove, is
somewhat unreliable, as enabling optional features is best-effort. If a
required dependency for an optional feature is not found, the feature will be
silently disabled ([details][mklove-bug]). **Using the CMake based build
system is strongly encouraged**, if you can take the dependency on CMake.
system, librdkafka's build system may disagree with Cargo about which
version of the library to use! **This can result in subtly broken
builds,** if librdkafka compiles against the headers for one version but
Cargo links against a different version. For complete confidence when
building release binaries, use an environment like a Docker container or a
chroot jail where you can guarantee that only one version of each
dependency is present. The current design of Cargo unfortunately makes
this nearly impossible to fix.
### Features
By default a submodule with the librdkafka sources pinned to a specific commit
will be used to compile and statically link the library.
By default a submodule with the librdkafka sources will be used to compile
and statically link the library.
The **`dynamic-linking`** feature can be used to link rdkafka to a locally
installed version of librdkafka: if the feature is enabled, the build script
will use `pkg-config` to check the version of the library installed in the
system, and it will configure the compiler to dynamically link against it.
The system version of librdkafka must exactly match the version of
librdkafka bundled with this crate.
The **`cmake-build`** feature builds librdkafka with its [CMake] build system,
rather than its default [mklove]-based build system. This feature requires that
CMake is installed on the build machine.
The **`cmake-build`** feature builds librdkafka with its [CMake] build
system, rather than its default [mklove]-based build system. This feature
requires that CMake is installed on the build machine.
The following features directly correspond to librdkafka features (i.e., flags
you would pass to `configure` if you were compiling manually).
The following features directly correspond to librdkafka features (i.e.,
flags you would pass to `configure` if you were compiling manually).
* The **`ssl`** feature enables SSL support. By default, the system's OpenSSL
library is dynamically linked, but static linking of the version bundled
with the openssl-sys crate can be requested with the `ssl-vendored` feature.
* The **`gssapi`** feature enables SASL GSSAPI support with Cyrus libsasl2.
By default the system's libsasl2 is dynamically linked, but static linking
of the version bundled with the sasl2-sys crate can be requested with the
`gssapi-vendored` feature.
* The **`ssl`** feature enables SSL support. By default, the system's
OpenSSL library is dynamically linked, but static linking of the version
bundled with the [openssl-sys] crate can be requested with the
`ssl-vendored` feature.
* The **`gssapi`** feature enables SASL GSSAPI support with Cyrus
libsasl2. By default the system's libsasl2 is dynamically linked, but
static linking of the version bundled with the [sasl2-sys] crate can be
requested with the `gssapi-vendored` feature.
* The **`libz`** feature enables support for zlib compression. This
feature is enabled by default. By default, the system's libz is dynamically
linked, but static linking of the version bundled with the libz-sys crate
can be requested with the `libz-static` feature.
feature is enabled by default. By default, the system's libz is
dynamically linked, but static linking of the version bundled with the
[libz-sys] crate can be requested with the `libz-static` feature.
* The **`zstd`** feature enables support for ZSTD compression. By default,
this builds and statically links the version bundled with the zstd-sys
crate, but dynamic linking of the system's version can be requested with the
`zstd-pkg-config` feature.
* The **`external-lz4`** feature statically links against the copy of liblz4
bundled with the lz4-sys crate. By default, librdkafka statically links
against its own bundled version of liblz4. Due to limitations with lz4-sys,
it is not yet possible to dynamically link against the system's version of
liblz4.
this builds and statically links the version bundled with the [zstd-sys]
crate, but dynamic linking of the system's version can be requested with
the `zstd-pkg-config` feature.
* The **`external-lz4`** feature statically links against the copy of
liblz4 bundled with the [lz4-sys] crate. By default, librdkafka
statically links against its own bundled version of liblz4. Due to
limitations with lz4-sys, it is not yet possible to dynamically link
against the system's version of liblz4.
All features are disabled by default unless noted otherwise above. The build
process is defined in [`build.rs`](build.rs).
## Updating
To upgrade change the git submodule in `librdkafka`, check if new errors
need to be added to `helpers::primive_to_rd_kafka_resp_err_t` and update
the version in `Cargo.toml`.
process is defined in [`build.rs`].
[`build.rs`]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/build.rs
[Apache Kafka]: https://kafka.apache.org
[CMake]: https://cmake.org
[libz-sys]: https://crates.io/crates/libz-sys
[lz4-sys]: https://crates.io/crates/lz4-sys
[mklove]: https://github.com/edenhill/mklove
[mklove-bug]: https://github.com/edenhill/librdkafka/pull/2640
[openssl-sys]: https://crates.io/crates/openssl-sys
[rdkafka]: https://docs.rs/rdkafka
[sasl2-sys]: https://docs.rs/sasl2-sys
[zstd-sys]: https://crates.io/crates/zstd-sys


@ -1 +1,3 @@
__INCLUDE_RUST_DOC__$src/lib.rs$# rdkafka-sys
# rdkafka-sys
__INCLUDE_RUST_DOC__$src/lib.rs


@ -1,3 +1,5 @@
//! Utility functions.
use crate::types::RDKafkaError;
use crate::types::RDKafkaError::*;
use crate::types::RDKafkaRespErr;


@ -1,23 +1,17 @@
//! Low level bindings to [librdkafka](https://github.com/edenhill/librdkafka).
//! Low level bindings to [librdkafka](https://github.com/edenhill/librdkafka),
//! a C library for the [Apache Kafka] protocol with producer, consumer, and
//! admin clients.
//!
//! ## Bindings
//!
//! To regenerate the bindings:
//!
//! ``` bash
//! git submodule update --init
//! cargo install bindgen
//! ./update-bindings.sh
//! ```
//! For a safe wrapper, see the [rdkafka] crate.
//!
//! ## Version
//!
//! The rdkafka-sys version number is in the format `X.Y.Z+RX.RY.RZ`, where `X.Y.Z`
//! is the version of this crate and follows SemVer conventions, while `RX.RY.RZ`
//! is the version of the bundled librdkafka.
//! The rdkafka-sys version number is in the format `X.Y.Z+RX.RY.RZ`, where
//! `X.Y.Z` is the version of this crate and follows SemVer conventions, while
//! `RX.RY.RZ` is the version of the bundled librdkafka.
//!
//! Note that versions before v2.0.0+1.4.2 did not follow this convention, and
//! instead directly corresponded to the bundled librdkafka version.
//! instead directly correspond to the bundled librdkafka version.
//!
//! ## Build
//!
@ -25,73 +19,69 @@
//!
//! * When any of librdkafka's optional dependencies are enabled, like libz or
//! OpenSSL, if you have multiple versions of that library installed upon your
//! system, librdkafka's build system may disagree with Cargo about which version
//! of the library to use! **This can result in subtly broken builds,** if
//! librdkafka compiles against the headers for one version but Cargo links
//! against a different version. For complete confidence when building release
//! binaries, use an environment like a Docker container or a chroot jail where
//! you can guarantee that only one version of each dependency is present.
//! Unfortunately, the current design of Cargo makes this nearly impossible to
//! fix.
//!
//! * librdkafka's default build system, which uses a bespoke tool called mklove, is
//! somewhat unreliable, as it does not support out-of-tree builds. This means
//! that if you have multiple projects that depend on the same version of
//! `librdkafka`, they will share a build directory in `~/.cargo/registry`, and
//! builds from one project may corrupt builds from the other. **Using the CMake
//! based build system is strongly encouraged**, if you can take the dependency on
//! CMake.
//! system, librdkafka's build system may disagree with Cargo about which
//! version of the library to use! **This can result in subtly broken
//! builds,** if librdkafka compiles against the headers for one version but
//! Cargo links against a different version. For complete confidence when
//! building release binaries, use an environment like a Docker container or a
//! chroot jail where you can guarantee that only one version of each
//! dependency is present. The current design of Cargo unfortunately makes
//! this nearly impossible to fix.
//!
//! ### Features
//!
//! By default a submodule with the librdkafka sources pinned to a specific commit
//! will be used to compile and statically link the library.
//! By default a submodule with the librdkafka sources will be used to compile
//! and statically link the library.
//!
//! The **`dynamic-linking`** feature can be used to link rdkafka to a locally
//! installed version of librdkafka: if the feature is enabled, the build script
//! will use `pkg-config` to check the version of the library installed in the
//! system, and it will configure the compiler to dynamically link against it.
//! The system version of librdkafka must exactly match the version of
//! librdkafka bundled with this crate.
//!
//! The **`cmake-build`** feature builds librdkafka with its [CMake] build system,
//! rather than its default [mklove]-based build system. This feature requires that
//! CMake is installed on the build machine.
//! The **`cmake-build`** feature builds librdkafka with its [CMake] build
//! system, rather than its default [mklove]-based build system. This feature
//! requires that CMake is installed on the build machine.
//!
//! The following features directly correspond to librdkafka features (i.e., flags
//! you would pass to `configure` if you were compiling manually).
//! The following features directly correspond to librdkafka features (i.e.,
//! flags you would pass to `configure` if you were compiling manually).
//!
//! * The **`ssl`** feature enables SSL support. By default, the system's OpenSSL
//! library is dynamically linked, but static linking of the version bundled
//! with the openssl-sys crate can be requested with the `ssl-vendored` feature.
//! * The **`gssapi`** feature enables SASL GSSAPI support with Cyrus libsasl2.
//! This feature requires that libsasl2 is installed on the system, as there is
//! not yet a libsasl2-sys crate that can build and link against a bundled
//! copy of the library.
//! * The **`ssl`** feature enables SSL support. By default, the system's
//! OpenSSL library is dynamically linked, but static linking of the version
//! bundled with the [openssl-sys] crate can be requested with the
//! `ssl-vendored` feature.
//! * The **`gssapi`** feature enables SASL GSSAPI support with Cyrus
//! libsasl2. By default the system's libsasl2 is dynamically linked, but
//! static linking of the version bundled with the [sasl2-sys] crate can be
//! requested with the `gssapi-vendored` feature.
//! * The **`libz`** feature enables support for zlib compression. This
//! feature is enabled by default. By default, the system's libz is dynamically
//! linked, but static linking of the version bundled with the libz-sys crate
//! can be requested with the `libz-static` feature.
//! feature is enabled by default. By default, the system's libz is
//! dynamically linked, but static linking of the version bundled with the
//! [libz-sys] crate can be requested with the `libz-static` feature.
//! * The **`zstd`** feature enables support for ZSTD compression. By default,
//! this builds and statically links the version bundled with the zstd-sys
//! crate, but dynamic linking of the system's version can be requested with the
//! `zstd-pkg-config` feature.
//! * The **`external-lz4`** feature statically links against the copy of liblz4
//! bundled with the lz4-sys crate. By default, librdkafka statically links
//! against its own bundled version of liblz4. Due to limitations with lz4-sys,
//! it is not yet possible to dynamically link against the system's version of
//! liblz4.
//! this builds and statically links the version bundled with the [zstd-sys]
//! crate, but dynamic linking of the system's version can be requested with
//! the `zstd-pkg-config` feature.
//! * The **`external-lz4`** feature statically links against the copy of
//! liblz4 bundled with the [lz4-sys] crate. By default, librdkafka
//! statically links against its own bundled version of liblz4. Due to
//! limitations with lz4-sys, it is not yet possible to dynamically link
//! against the system's version of liblz4.
//!
//! All features are disabled by default unless noted otherwise above. The build
//! process is defined in [`build.rs`].
//!
//! ## Updating
//!
//! To upgrade change the git submodule in `librdkafka`, check if new errors
//! need to be added to `helpers::primive_to_rd_kafka_resp_err_t` and update
//! the version in `Cargo.toml`.
//!
//! [CMake]: https://cmake.org
//! [mklove]: https://github.com/edenhill/mklove
//! [`build.rs`]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/build.rs
//! [Apache Kafka]: https://kafka.apache.org
//! [CMake]: https://cmake.org
//! [libz-sys]: https://crates.io/crates/libz-sys
//! [lz4-sys]: https://crates.io/crates/lz4-sys
//! [mklove]: https://github.com/edenhill/mklove
//! [openssl-sys]: https://crates.io/crates/openssl-sys
//! [rdkafka]: https://docs.rs/rdkafka
//! [sasl2-sys]: https://docs.rs/sasl2-sys
//! [zstd-sys]: https://crates.io/crates/zstd-sys
#[cfg(feature = "openssl-sys")]
extern crate openssl_sys;
@ -108,6 +98,10 @@ extern crate zstd_sys;
#[cfg(feature = "lz4-sys")]
extern crate lz4_sys;
/// FFI bindings.
///
/// These bindings are automatically generated
/// with [bindgen](https://github.com/rust-lang/rust-bindgen).
#[allow(
non_camel_case_types,
non_upper_case_globals,


@ -1,4 +1,4 @@
//! This module contains type aliases for types defined in the auto-generated bindings.
//! Aliases for types defined in the auto-generated bindings.
use std::convert::TryFrom;
use std::ffi::CStr;
@ -9,391 +9,392 @@ use crate::helpers;
// TYPES
/// Native rdkafka client
/// Native rdkafka client.
pub type RDKafka = bindings::rd_kafka_t;
/// Native rdkafka configuration
/// Native rdkafka configuration.
pub type RDKafkaConf = bindings::rd_kafka_conf_t;
/// Native rdkafka message
/// Native rdkafka message.
pub type RDKafkaMessage = bindings::rd_kafka_message_t;
/// Native rdkafka topic
/// Native rdkafka topic.
pub type RDKafkaTopic = bindings::rd_kafka_topic_t;
/// Native rdkafka topic configuration
/// Native rdkafka topic configuration.
pub type RDKafkaTopicConf = bindings::rd_kafka_topic_conf_t;
/// Native rdkafka topic partition
/// Native rdkafka topic partition.
pub type RDKafkaTopicPartition = bindings::rd_kafka_topic_partition_t;
/// Native rdkafka topic partition list
/// Native rdkafka topic partition list.
pub type RDKafkaTopicPartitionList = bindings::rd_kafka_topic_partition_list_t;
/// Native rdkafka metadata container
/// Native rdkafka metadata container.
pub type RDKafkaMetadata = bindings::rd_kafka_metadata_t;
/// Native rdkafka topic information
/// Native rdkafka topic information.
pub type RDKafkaMetadataTopic = bindings::rd_kafka_metadata_topic_t;
/// Native rdkafka partition information
/// Native rdkafka partition information.
pub type RDKafkaMetadataPartition = bindings::rd_kafka_metadata_partition_t;
/// Native rdkafka broker information
/// Native rdkafka broker information.
pub type RDKafkaMetadataBroker = bindings::rd_kafka_metadata_broker_t;
/// Native rdkafka state
/// Native rdkafka state.
pub type RDKafkaState = bindings::rd_kafka_s;
/// Native rdkafka list of groups
/// Native rdkafka list of groups.
pub type RDKafkaGroupList = bindings::rd_kafka_group_list;
/// Native rdkafka group information
/// Native rdkafka group information.
pub type RDKafkaGroupInfo = bindings::rd_kafka_group_info;
/// Native rdkafka group member information
/// Native rdkafka group member information.
pub type RDKafkaGroupMemberInfo = bindings::rd_kafka_group_member_info;
/// Native rdkafka group member information
/// Native rdkafka group member information.
pub type RDKafkaHeaders = bindings::rd_kafka_headers_t;
/// Native rdkafka queue
/// Native rdkafka queue.
pub type RDKafkaQueue = bindings::rd_kafka_queue_t;
// Native rdkafka new topic object
/// Native rdkafka new topic object.
pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
// Native rdkafka delete topic object
/// Native rdkafka delete topic object.
pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;
// Native rdkafka new partitions object
/// Native rdkafka new partitions object.
pub type RDKafkaNewPartitions = bindings::rd_kafka_NewPartitions_t;
// Native rdkafka config resource
/// Native rdkafka config resource.
pub type RDKafkaConfigResource = bindings::rd_kafka_ConfigResource_t;
// Native rdkafka event
/// Native rdkafka event.
pub type RDKafkaEvent = bindings::rd_kafka_event_t;
// Native rdkafka admin options
/// Native rdkafka admin options.
pub type RDKafkaAdminOptions = bindings::rd_kafka_AdminOptions_t;
// Native rdkafka topic result
/// Native rdkafka topic result.
pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
// ENUMS
/// Client types
/// Client types.
pub use bindings::rd_kafka_type_t as RDKafkaType;
/// Configuration result
/// Configuration result.
pub use bindings::rd_kafka_conf_res_t as RDKafkaConfRes;
/// Response error
/// Response error.
pub use bindings::rd_kafka_resp_err_t as RDKafkaRespErr;
/// Admin operation
/// Admin operation.
pub use bindings::rd_kafka_admin_op_t as RDKafkaAdminOp;
/// Config resource type
/// Config resource type.
pub use bindings::rd_kafka_ResourceType_t as RDKafkaResourceType;
/// Config source
/// Config source.
pub use bindings::rd_kafka_ConfigSource_t as RDKafkaConfigSource;
/// Errors enum
// Errors enum
/// Error from the underlying rdkafka library.
/// Native rdkafka error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RDKafkaError {
#[doc(hidden)]
Begin = -200,
/// Received message is incorrect
/// Received message is incorrect.
BadMessage = -199,
/// Bad/unknown compression
/// Bad/unknown compression.
BadCompression = -198,
/// Broker is going away
/// Broker is going away.
BrokerDestroy = -197,
/// Generic failure
/// Generic failure.
Fail = -196,
/// Broker transport failure
/// Broker transport failure.
BrokerTransportFailure = -195,
/// Critical system resource
/// Critical system resource.
CriticalSystemResource = -194,
/// Failed to resolve broker
/// Failed to resolve broker.
Resolve = -193,
/// Produced message timed out
/// Produced message timed out.
MessageTimedOut = -192,
/// Reached the end of the topic+partition queue on the broker. Not really an error.
PartitionEOF = -191,
/// Permanent: Partition does not exist in cluster.
UnknownPartition = -190,
/// File or filesystem error
/// File or filesystem error.
FileSystem = -189,
/// Permanent: Topic does not exist in cluster.
UnknownTopic = -188,
/// All broker connections are down.
AllBrokersDown = -187,
/// Invalid argument, or invalid configuration
/// Invalid argument, or invalid configuration.
InvalidArgument = -186,
/// Operation timed out
/// Operation timed out.
OperationTimedOut = -185,
/// Queue is full
/// Queue is full.
QueueFull = -184,
/// ISR count < required.acks
/// ISR count < required.acks.
ISRInsufficient = -183,
/// Broker node update
/// Broker node update.
NodeUpdate = -182,
/// SSL error
/// SSL error.
SSL = -181,
/// Waiting for coordinator to become available.
WaitingForCoordinator = -180,
/// Unknown client group
/// Unknown client group.
UnknownGroup = -179,
/// Operation in progress
/// Operation in progress.
InProgress = -178,
/// Previous operation in progress, wait for it to finish.
PreviousInProgress = -177,
/// This operation would interfere with an existing subscription
/// This operation would interfere with an existing subscription.
ExistingSubscription = -176,
/// Assigned partitions (rebalance_cb)
/// Assigned partitions (rebalance_cb).
AssignPartitions = -175,
/// Revoked partitions (rebalance_cb)
/// Revoked partitions (rebalance_cb).
RevokePartitions = -174,
/// Conflicting use
/// Conflicting use.
Conflict = -173,
/// Wrong state
/// Wrong state.
State = -172,
/// Unknown protocol
/// Unknown protocol.
UnknownProtocol = -171,
/// Not implemented
/// Not implemented.
NotImplemented = -170,
/// Authentication failure
/// Authentication failure.
Authentication = -169,
/// No stored offset
/// No stored offset.
NoOffset = -168,
/// Outdated
/// Outdated.
Outdated = -167,
/// Timed out in queue
/// Timed out in queue.
TimedOutQueue = -166,
/// Feature not supported by broker
/// Feature not supported by broker.
UnsupportedFeature = -165,
/// Awaiting cache update
/// Awaiting cache update.
WaitCache = -164,
/// Operation interrupted (e.g., due to yield))
/// Operation interrupted (e.g., due to yield).
Interrupted = -163,
/// Key serialization error
/// Key serialization error.
KeySerialization = -162,
/// Value serialization error
/// Value serialization error.
ValueSerialization = -161,
/// Key deserialization error
/// Key deserialization error.
KeyDeserialization = -160,
/// Value deserialization error
/// Value deserialization error.
ValueDeserialization = -159,
/// Partial response
/// Partial response.
Partial = -158,
/// Modification attempted on read-only object
/// Modification attempted on read-only object.
ReadOnly = -157,
/// No such entry or item not found
/// No such entry or item not found.
NoEnt = -156,
/// Read underflow
/// Read underflow.
Underflow = -155,
/// Invalid type
/// Invalid type.
InvalidType = -154,
/// Retry operation
/// Retry operation.
Retry = -153,
/// Purged in queue
/// Purged in queue.
PurgeQueue = -152,
/// Purged in flight
/// Purged in flight.
PurgeInflight = -151,
/// Fatal error: see rd_kafka_fatal_error()
/// Fatal error: see rd_kafka_fatal_error().
Fatal = -150,
/// Inconsistent state
/// Inconsistent state.
Inconsistent = -149,
/// Gap-less ordering would not be guaranteed if proceeding
/// Gap-less ordering would not be guaranteed if proceeding.
GaplessGuarantee = -148,
/// Maximum poll interval exceeded
/// Maximum poll interval exceeded.
PollExceeded = -147,
/// Unknown broker
/// Unknown broker.
UnknownBroker = -146,
/// Functionality not configured
/// Functionality not configured.
NotConfigured,
/// Instance has been fenced
/// Instance has been fenced.
Fenced,
/// Application generated error
/// Application generated error.
Application,
#[doc(hidden)]
End = -100,
/// Unknown broker error
/// Unknown broker error.
Unknown = -1,
/// Success
/// Success.
NoError = 0,
/// Offset out of range
/// Offset out of range.
OffsetOutOfRange = 1,
/// Invalid message
/// Invalid message.
InvalidMessage = 2,
/// Unknown topic or partition
/// Unknown topic or partition.
UnknownTopicOrPartition = 3,
/// Invalid message size
/// Invalid message size.
InvalidMessageSize = 4,
/// Leader not available
/// Leader not available.
LeaderNotAvailable = 5,
/// Not leader for partition
/// Not leader for partition.
NotLeaderForPartition = 6,
/// Request timed out
/// Request timed out.
RequestTimedOut = 7,
/// Broker not available
/// Broker not available.
BrokerNotAvailable = 8,
/// Replica not available
/// Replica not available.
ReplicaNotAvailable = 9,
/// Message size too large
/// Message size too large.
MessageSizeTooLarge = 10,
/// Stale controller epoch code
/// Stale controller epoch code.
StaleControllerEpoch = 11,
/// Offset metadata string too large
/// Offset metadata string too large.
OffsetMetadataTooLarge = 12,
/// Broker disconnected before response received
/// Broker disconnected before response received.
NetworkException = 13,
/// Coordinator load in progress
/// Coordinator load in progress.
CoordinatorLoadInProgress = 14,
/// Coordinator not available
/// Coordinator not available.
CoordinatorNotAvailable = 15,
/// Not coordinator
/// Not coordinator.
NotCoordinator = 16,
/// Invalid topic
/// Invalid topic.
InvalidTopic = 17,
/// Message batch larger than configured server segment size
/// Message batch larger than configured server segment size.
MessageBatchTooLarge = 18,
/// Not enough in-sync replicas
/// Not enough in-sync replicas.
NotEnoughReplicas = 19,
/// Message(s) written to insufficient number of in-sync replicas
/// Message(s) written to insufficient number of in-sync replicas.
NotEnoughReplicasAfterAppend = 20,
/// Invalid required acks value
/// Invalid required acks value.
InvalidRequiredAcks = 21,
/// Specified group generation id is not valid
/// Specified group generation id is not valid.
IllegalGeneration = 22,
/// Inconsistent group protocol
/// Inconsistent group protocol.
InconsistentGroupProtocol = 23,
/// Invalid group.id
/// Invalid group.id.
InvalidGroupId = 24,
/// Unknown member
/// Unknown member.
UnknownMemberId = 25,
/// Invalid session timeout
/// Invalid session timeout.
InvalidSessionTimeout = 26,
/// Group rebalance in progress
/// Group rebalance in progress.
RebalanceInProgress = 27,
/// Commit offset data size is not valid
/// Commit offset data size is not valid.
InvalidCommitOffsetSize = 28,
/// Topic authorization failed
/// Topic authorization failed.
TopicAuthorizationFailed = 29,
/// Group authorization failed
/// Group authorization failed.
GroupAuthorizationFailed = 30,
/// Cluster authorization failed
/// Cluster authorization failed.
ClusterAuthorizationFailed = 31,
/// Invalid timestamp
/// Invalid timestamp.
InvalidTimestamp = 32,
/// Unsupported SASL mechanism
/// Unsupported SASL mechanism.
UnsupportedSASLMechanism = 33,
/// Illegal SASL state
/// Illegal SASL state.
IllegalSASLState = 34,
/// Unsupported version
/// Unsupported version.
UnsupportedVersion = 35,
/// Topic already exists
/// Topic already exists.
TopicAlreadyExists = 36,
/// Invalid number of partitions
/// Invalid number of partitions.
InvalidPartitions = 37,
/// Invalid replication factor
/// Invalid replication factor.
InvalidReplicationFactor = 38,
/// Invalid replica assignment
/// Invalid replica assignment.
InvalidReplicaAssignment = 39,
/// Invalid config */
/// Invalid config.
InvalidConfig = 40,
/// Not controller for cluster
/// Not controller for cluster.
NotController = 41,
/// Invalid request
/// Invalid request.
InvalidRequest = 42,
/// Message format on broker does not support request
/// Message format on broker does not support request.
UnsupportedForMessageFormat = 43,
/// Policy violation
/// Policy violation.
PolicyViolation = 44,
/// Broker received an out of order sequence number
/// Broker received an out of order sequence number.
OutOfOrderSequenceNumber = 45,
/// Broker received a duplicate sequence number
/// Broker received a duplicate sequence number.
DuplicateSequenceNumber = 46,
/// Producer attempted an operation with an old epoch
/// Producer attempted an operation with an old epoch.
InvalidProducerEpoch = 47,
/// Producer attempted a transactional operation in an invalid state
/// Producer attempted a transactional operation in an invalid state.
InvalidTransactionalState = 48,
/// Producer attempted to use a producer id which is currently assigned to its transactional id
/// Producer attempted to use a producer id which is currently assigned to
/// its transactional id.
InvalidProducerIdMapping = 49,
/// Transaction timeout is larger than the maxi value allowed by the broker's
/// max.transaction.timeout.ms
/// Transaction timeout is larger than the maximum value allowed by the
/// broker's max.transaction.timeout.ms.
InvalidTransactionTimeout = 50,
/// Producer attempted to update a transaction while another concurrent operation on the same
/// transaction was ongoing
/// Producer attempted to update a transaction while another concurrent
/// operation on the same transaction was ongoing.
ConcurrentTransactions = 51,
/// Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current
/// coordinator for a given producer
/// Indicates that the transaction coordinator sending a WriteTxnMarker is
/// no longer the current coordinator for a given producer.
TransactionCoordinatorFenced = 52,
/// Transactional Id authorization failed
/// Transactional Id authorization failed.
TransactionalIdAuthorizationFailed = 53,
/// Security features are disabled
/// Security features are disabled.
SecurityDisabled = 54,
/// Operation not attempted
/// Operation not attempted.
OperationNotAttempted = 55,
/// Disk error when trying to access log file on the disk
/// Disk error when trying to access log file on the disk.
KafkaStorageError = 56,
/// The user-specified log directory is not found in the broker config
/// The user-specified log directory is not found in the broker config.
LogDirNotFound = 57,
/// SASL Authentication failed
/// SASL Authentication failed.
SaslAuthenticationFailed = 58,
/// Unknown Producer Id
/// Unknown Producer Id.
UnknownProducerId = 59,
/// Partition reassignment is in progress
/// Partition reassignment is in progress.
ReassignmentInProgress = 60,
/// Delegation Token feature is not enabled
/// Delegation Token feature is not enabled.
DelegationTokenAuthDisabled = 61,
/// Delegation Token is not found on server
/// Delegation Token is not found on server.
DelegationTokenNotFound = 62,
/// Specified Principal is not valid Owner/Renewer
/// Specified Principal is not valid Owner/Renewer.
DelegationTokenOwnerMismatch = 63,
/// Delegation Token requests are not allowed on this connection
/// Delegation Token requests are not allowed on this connection.
DelegationTokenRequestNotAllowed = 64,
/// Delegation Token authorization failed
/// Delegation Token authorization failed.
DelegationTokenAuthorizationFailed = 65,
/// Delegation Token is expired
/// Delegation Token is expired.
DelegationTokenExpired = 66,
/// Supplied principalType is not supported
/// Supplied principalType is not supported.
InvalidPrincipalType = 67,
/// The group is not empty
/// The group is not empty.
NonEmptyGroup = 68,
/// The group id does not exist
/// The group id does not exist.
GroupIdNotFound = 69,
/// The fetch session ID was not found
/// The fetch session ID was not found.
FetchSessionIdNotFound = 70,
/// The fetch session epoch is invalid
/// The fetch session epoch is invalid.
InvalidFetchSessionEpoch = 71,
/// No matching listener
/// No matching listener.
ListenerNotFound = 72,
/// Topic deletion is disabled
/// Topic deletion is disabled.
TopicDeletionDisabled = 73,
/// Leader epoch is older than broker epoch
/// Leader epoch is older than broker epoch.
FencedLeaderEpoch = 74,
/// Leader epoch is newer than broker epoch
/// Leader epoch is newer than broker epoch.
UnknownLeaderEpoch = 75,
/// Unsupported compression type
/// Unsupported compression type.
UnsupportedCompressionType = 76,
/// Broker epoch has changed
/// Broker epoch has changed.
StaleBrokerEpoch = 77,
/// Leader high watermark is not caught up
/// Leader high watermark is not caught up.
OffsetNotAvailable = 78,
/// Group member needs a valid member ID
/// Group member needs a valid member ID.
MemberIdRequired = 79,
/// Preferred leader was not available
/// Preferred leader was not available.
PreferredLeaderNotAvailable = 80,
/// Consumer group has reached maximum size
/// Consumer group has reached maximum size.
GroupMaxSizeReached = 81,
/// Static consumer fenced by other consumer with same group.instance.id
/// Static consumer fenced by other consumer with same group.instance.id.
FencedInstanceId = 82,
#[doc(hidden)]
EndAll,


@ -6,9 +6,8 @@
[![coverage](https://codecov.io/gh/fede1024/rust-rdkafka/graphs/badge.svg?branch=master)](https://codecov.io/gh/fede1024/rust-rdkafka/)
[![Join the chat at https://gitter.im/rust-rdkafka/Lobby](https://badges.gitter.im/rust-rdkafka/Lobby.svg)](https://gitter.im/rust-rdkafka/Lobby?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
A fully asynchronous, [futures]-based Kafka client library for Rust based on [librdkafka].
__INCLUDE_RUST_DOC__$src/lib.rs
__INCLUDE_RUST_DOC__$src/lib.rs$## The library
## rdkafka-sys
See [rdkafka-sys](https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys).


@ -1,4 +1,15 @@
//! Common client functionalities.
//! Common client functionality.
//!
//! In librdkafka parlance, a client is either a consumer or a producer. This
//! module's [`Client`] type provides the functionality that is common to both
//! consumers and producers.
//!
//! Typically you will not want to construct a client directly. Construct one of
//! the consumers in the [`consumer`] module or one of the producers in the
//! [`producer`] module instead.
//!
//! [`consumer`]: crate::consumer
//! [`producer`]: crate::producer
use std::convert::TryFrom;
use std::ffi::{CStr, CString};
@ -21,16 +32,27 @@ use crate::metadata::Metadata;
use crate::statistics::Statistics;
use crate::util::{ErrBuf, KafkaDrop, NativePtr, Timeout};
/// Client-level context
/// Client-level context.
///
/// Each client (consumers and producers included) has a context object that can be used to
/// customize its behavior. Implementing `ClientContext` enables the customization of
/// methods common to all clients, while `ProducerContext` and `ConsumerContext` are specific to
/// producers and consumers. Refer to the list of methods to see which callbacks can currently
/// be overridden. Implementations of `ClientContext` must be thread safe, as they might be owned by
/// multiple threads.
/// Each client (consumers and producers included) has a context object that can
/// be used to customize its behavior. Implementing `ClientContext` enables the
/// customization of methods common to all clients, while [`ProducerContext`]
/// and [`ConsumerContext`] are specific to producers and consumers. Refer to
/// the list of methods to see which callbacks can currently be overridden.
///
/// **Important**: implementations of `ClientContext` must be thread safe, as
/// they might be shared between multiple threads.
///
/// [`ConsumerContext`]: crate::consumer::ConsumerContext
/// [`ProducerContext`]: crate::producer::ProducerContext
pub trait ClientContext: Send + Sync {
/// Receives log lines from librdkafka.
///
/// The default implementation forwards the log lines to the appropriate
/// [`log`] crate macro. Consult the [`RDKafkaLogLevel`] documentation for
/// details about the log level mapping.
///
/// [`log`]: https://docs.rs/log
fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
match level {
RDKafkaLogLevel::Emerg
@ -55,12 +77,16 @@ pub trait ClientContext: Send + Sync {
}
/// Receives the statistics of the librdkafka client. To enable, the
/// "statistics.interval.ms" configuration parameter must be specified.
/// `statistics.interval.ms` configuration parameter must be specified.
///
/// The default implementation logs the statistics at the `info` log level.
fn stats(&self, statistics: Statistics) {
info!("Client stats: {:?}", statistics);
}
/// Receives global errors from the librdkafka client.
///
/// The default implementation logs the error at the `error` log level.
fn error(&self, error: KafkaError, reason: &str) {
error!("librdkafka: {}: {}", error, reason);
}
@ -70,8 +96,10 @@ pub trait ClientContext: Send + Sync {
// https://github.com/rust-lang/rfcs/pull/1406 will maybe help in the future.
}
/// An empty `ClientContext` that can be used when no context is needed. Default
/// callback implementations will be used.
/// An empty [`ClientContext`] that can be used when no customizations are
/// needed.
///
/// Uses the default callback implementations provided by `ClientContext`.
#[derive(Clone, Default)]
pub struct DefaultClientContext;
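For illustration, here is a minimal sketch of a custom context that overrides the callbacks described above. The `CountingContext` name and its behavior are placeholders, not part of the crate; any callback that is not overridden keeps its default implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

use rdkafka::client::ClientContext;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::error::KafkaError;
use rdkafka::statistics::Statistics;

// Hypothetical context that counts client-level errors and prints statistics.
struct CountingContext {
    error_count: AtomicU64,
}

impl ClientContext for CountingContext {
    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        println!("[{:?}] {}: {}", level, fac, log_message);
    }

    fn stats(&self, statistics: Statistics) {
        println!("stats snapshot: {:?}", statistics);
    }

    fn error(&self, error: KafkaError, reason: &str) {
        self.error_count.fetch_add(1, Ordering::Relaxed);
        eprintln!("librdkafka error: {}: {}", error, reason);
    }
}
```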
@ -81,8 +109,9 @@ impl ClientContext for DefaultClientContext {}
// ********** CLIENT **********
//
/// A native rdkafka-sys client. This struct shouldn't be used directly. Use higher level `Client`
/// or producers and consumers.
/// A native rdkafka-sys client. This struct shouldn't be used directly. Use
/// higher level `Client` or producers and consumers.
// TODO(benesch): this should be `pub(crate)`.
pub struct NativeClient {
ptr: NativePtr<RDKafka>,
}
@ -110,9 +139,17 @@ impl NativeClient {
}
}
/// A low level rdkafka client. This client shouldn't be used directly. The producer and consumer modules
/// provide different producer and consumer implementations based on top of `Client` that can be
/// used instead.
/// A low-level rdkafka client.
///
/// This type is the basis of the consumers and producers in the [`consumer`]
/// and [`producer`] modules, respectively.
///
/// Typically you do not want to construct a `Client` directly, but instead
/// construct a consumer or producer. A `Client` can be used, however, when
/// only access to cluster metadata and watermarks is required.
///
/// [`consumer`]: crate::consumer
/// [`producer`]: crate::producer
pub struct Client<C: ClientContext = DefaultClientContext> {
native: NativeClient,
context: Box<C>,

View File

@ -2,21 +2,25 @@
//!
//! ## C library configuration
//!
//! The Rust library will forward all the configuration to the C library. The most frequently
//! used parameters are listed here.
//! The Rust library will forward all the configuration to the C library. The
//! most frequently used parameters are listed here.
//!
//! ### Frequently used parameters
//!
//! For producer-specific and consumer-specific parameters check the producer and consumer modules
//! documentation. The full list of available parameters is available in the [librdkafka
//! documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
//! For producer-specific and consumer-specific parameters check the producer
//! and consumer modules documentation. The full list of available parameters is
//! available in the [librdkafka documentation][librdkafka-config].
//!
//! - `client.id` (rdkafka): Client identifier.
//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host or host:port.
//! - `message.max.bytes` (1000000): Maximum message size.
//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to print all the debugging information.
//! - `statistics.interval.ms` (0 - disabled): how often the statistic callback specified in the `Context` will be called.
//! - `client.id`: Client identifier. Default: `rdkafka`.
//! - `bootstrap.servers`: Initial list of brokers as a CSV list of broker host
//! or host:port. Default: empty.
//! - `message.max.bytes`: Maximum message size. Default: 1000000.
//! - `debug`: A comma-separated list of debug contexts to enable. Use 'all' to
//! print all the debugging information. Default: empty (off).
//! - `statistics.interval.ms`: How often the statistics callback
//! specified in the [`ClientContext`] will be called. Default: 0 (disabled).
//!
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
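A minimal sketch of setting these parameters through `ClientConfig`; the broker address and group id below are placeholders. All keys are forwarded to librdkafka, so any parameter from the list above can be set this way.

```rust
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
use rdkafka::consumer::BaseConsumer;

fn main() {
    let _consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .set("statistics.interval.ms", "30000")
        .set_log_level(RDKafkaLogLevel::Debug)
        .create()
        .expect("consumer creation failed");
}
```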
use std::collections::HashMap;
use std::ffi::CString;
@ -34,21 +38,25 @@ use crate::util::{ErrBuf, KafkaDrop, NativePtr};
/// The log levels supported by librdkafka.
#[derive(Copy, Clone, Debug)]
pub enum RDKafkaLogLevel {
/// Higher priority then Level::Error from the log crate.
/// Higher priority than [`Level::Error`](log::Level::Error) from the log
/// crate.
Emerg = 0,
/// Higher priority then Level::Error from the log crate.
/// Higher priority than [`Level::Error`](log::Level::Error) from the log
/// crate.
Alert = 1,
/// Higher priority then Level::Error from the log crate.
/// Higher priority than [`Level::Error`](log::Level::Error) from the log
/// crate.
Critical = 2,
/// Equivalent to Level::Error from the log crate.
/// Equivalent to [`Level::Error`](log::Level::Error) from the log crate.
Error = 3,
/// Equivalent to Level::Warning from the log crate.
/// Equivalent to [`Level::Warn`](log::Level::Warn) from the log crate.
Warning = 4,
/// Higher priority then Level::Info from the log crate.
/// Higher priority than [`Level::Info`](log::Level::Info) from the log
/// crate.
Notice = 5,
/// Equivalent to Level::Info from the log crate.
/// Equivalent to [`Level::Info`](log::Level::Info) from the log crate.
Info = 6,
/// Equivalent to Level::Debug from the log crate.
/// Equivalent to [`Level::Debug`](log::Level::Debug) from the log crate.
Debug = 7,
}
@ -108,7 +116,8 @@ impl NativeClientConfig {
#[derive(Clone, Debug)]
pub struct ClientConfig {
conf_map: HashMap<String, String>,
/// The librdkafka logging level. Refer to `RDKafkaLogLevel` for the list of available levels.
/// The librdkafka logging level. Refer to [`RDKafkaLogLevel`] for the list
/// of available levels.
pub log_level: RDKafkaLogLevel,
}
@ -198,12 +207,13 @@ fn log_level_from_global_config() -> RDKafkaLogLevel {
/// Create a new client based on the provided configuration.
pub trait FromClientConfig: Sized {
/// Create a client from client configuration. The default client context will be used.
/// Creates a client from a client configuration. The default client context
/// will be used.
fn from_config(_: &ClientConfig) -> KafkaResult<Self>;
}
/// Create a new client based on the provided configuration and context.
pub trait FromClientConfigAndContext<C: ClientContext>: Sized {
/// Create a client from client configuration and a client context.
/// Creates a client from a client configuration and a client context.
fn from_config_and_context(_: &ClientConfig, _: C) -> KafkaResult<Self>;
}

View File

@ -1,4 +1,4 @@
//! Base trait and common functionality for all consumers.
//! Kafka consumers.
use std::ptr;
use std::time::Duration;
@ -38,12 +38,19 @@ pub enum Rebalance<'a> {
Error(String),
}
/// Consumer specific Context. This user-defined object can be used to provide custom callbacks to
/// consumer events. Refer to the list of methods to check which callbacks can be specified.
/// Consumer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// consumer events. Refer to the list of methods to check which callbacks can
/// be specified.
///
/// See also the [`ClientContext`] trait.
pub trait ConsumerContext: ClientContext {
/// Implements the default rebalancing strategy and calls the `pre_rebalance` and
/// `post_rebalance` methods. If this method is overridden, it will be responsibility
/// of the user to call them if needed.
/// Implements the default rebalancing strategy and calls the
/// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
/// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
/// method is overridden, it will be the responsibility of the user to call them
/// if needed.
fn rebalance(
&self,
native_client: &NativeClient,
@ -80,34 +87,34 @@ pub trait ConsumerContext: ClientContext {
self.post_rebalance(&rebalance);
}
/// Pre-rebalance callback. This method will run before the rebalance and should
/// terminate its execution quickly.
/// Pre-rebalance callback. This method will run before the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
/// Post-rebalance callback. This method will run after the rebalance and should
/// terminate its execution quickly.
/// Post-rebalance callback. This method will run after the rebalance and
/// should terminate its execution quickly.
#[allow(unused_variables)]
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
// TODO: convert pointer to structure
/// Post commit callback. This method will run after a group of offsets was committed to the
/// offset store.
/// Post commit callback. This method will run after a group of offsets was
/// committed to the offset store.
#[allow(unused_variables)]
fn commit_callback(&self, result: KafkaResult<()>, offsets: &TopicPartitionList) {}
/// Returns the minimum interval at which to poll the main queue, which
/// services the logging, stats, and error callbacks.
///
/// The main queue is polled once whenever [`Consumer.poll`] is called. If
/// `Consumer.poll` is called with a timeout that is larger than this
/// interval, then the main queue will be polled at that interval while the
/// consumer queue is blocked.
/// The main queue is polled once whenever [`BaseConsumer::poll`] is called.
/// If `poll` is called with a timeout that is larger than this interval,
/// then the main queue will be polled at that interval while the consumer
/// queue is blocked.
///
/// For example, if the main queue's minimum poll interval is 200ms and
/// `Consumer.poll` is called with a timeout of 1s, then `Consumer.poll` may
/// block for up to 1s waiting for a message, but it will poll the main
/// queue every 200ms while it is waiting.
/// `poll` is called with a timeout of 1s, then `poll` may block for up to
/// 1s waiting for a message, but it will poll the main queue every 200ms
/// while it is waiting.
///
/// By default, the minimum poll interval for the main queue is 1s.
fn main_queue_min_poll_interval(&self) -> Timeout {
@ -122,7 +129,8 @@ pub trait ConsumerContext: ClientContext {
// StreamConsumerContext as well.
}
/// An empty consumer context that can be user when no context is needed.
/// An empty consumer context that can be used when no customizations are
/// needed.
#[derive(Clone, Default)]
pub struct DefaultConsumerContext;
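As a sketch of a custom consumer context, the following overrides only the commit callback and leaves the default rebalance handling in place. The `LoggingConsumerContext` name, broker address, and group id are placeholders.

```rust
use rdkafka::client::ClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, ConsumerContext};
use rdkafka::error::KafkaResult;
use rdkafka::topic_partition_list::TopicPartitionList;

// Hypothetical context that logs every offset commit.
struct LoggingConsumerContext;

impl ClientContext for LoggingConsumerContext {}

impl ConsumerContext for LoggingConsumerContext {
    fn commit_callback(&self, result: KafkaResult<()>, _offsets: &TopicPartitionList) {
        match result {
            Ok(()) => println!("offsets committed"),
            Err(e) => eprintln!("offset commit failed: {}", e),
        }
    }
}

fn main() {
    let _consumer: BaseConsumer<LoggingConsumerContext> = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .create_with_context(LoggingConsumerContext)
        .expect("consumer creation failed");
}
```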

View File

@ -26,7 +26,8 @@ use crate::util::{NativePtr, OnDrop, Timeout};
/// The [`ConsumerContext`] used by the [`StreamConsumer`]. This context will
/// automatically wake up the message stream when new data is available.
///
/// This type is not intended to be used directly.
/// This type is not intended to be used directly. It will be automatically
/// created by the `StreamConsumer` when necessary.
pub struct StreamConsumerContext<C: ConsumerContext + 'static> {
inner: C,
waker: Arc<Mutex<Option<Waker>>>,
@ -162,7 +163,7 @@ impl<'a, C: ConsumerContext + 'a> Stream for MessageStream<'a, C> {
}
}
/// A Kafka Consumer providing a [`futures::Stream`] interface.
/// A Kafka consumer providing a [`futures::Stream`] interface.
///
/// This consumer doesn't need to be polled manually since
/// [`StreamConsumer::start`] will launch a background polling task.
@ -200,7 +201,7 @@ impl<C: ConsumerContext> FromClientConfigAndContext<C> for StreamConsumer<C> {
}
impl<C: ConsumerContext> StreamConsumer<C> {
/// Starts the `StreamConsumer` with default configuration (100ms polling
/// Starts the stream consumer with default configuration (100ms polling
/// interval and no `NoMessageReceived` notifications).
///
/// **Note:** this method must be called from within the context of a Tokio
@ -209,7 +210,7 @@ impl<C: ConsumerContext> StreamConsumer<C> {
self.start_with(Duration::from_millis(100), false)
}
/// Starts the `StreamConsumer` with the specified poll interval.
/// Starts the stream consumer with the specified poll interval.
///
/// If `no_message_error` is set to true, the returned `MessageStream` will
/// yield an error of type `KafkaError::NoMessageReceived` every time the
@ -235,7 +236,7 @@ impl<C: ConsumerContext> StreamConsumer<C> {
MessageStream::new(self, no_message_interval)
}
/// Stops the `StreamConsumer`.
/// Stops the stream consumer.
pub fn stop(&self) {
self.should_stop.store(true, Ordering::Relaxed);
}
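A minimal sketch of consuming through the stream interface, assuming the `futures` crate and a Tokio runtime are available; the broker address, group id, and topic name are placeholders.

```rust
use futures::StreamExt;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .create()
        .expect("consumer creation failed");
    consumer.subscribe(&["events"]).expect("subscribe failed");

    // `start` launches the background polling task and returns the stream.
    let mut stream = consumer.start();
    while let Some(message) = stream.next().await {
        match message {
            Ok(m) => println!("partition {} offset {}", m.partition(), m.offset()),
            Err(e) => eprintln!("kafka error: {}", e),
        }
    }
}
```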

View File

@ -12,8 +12,10 @@ pub use rdsys::types::RDKafkaError;
pub type KafkaResult<T> = Result<T, KafkaError>;
/// Verify if the value represents an error condition.
///
/// Some librdkafka codes are informational, rather than true errors.
pub trait IsError {
/// Return true if the value represents an error.
/// Reports whether the value represents an error.
fn is_error(self) -> bool;
}
@ -31,7 +33,9 @@ impl IsError for RDKafkaConfRes {
// TODO: consider using macro
/// Represents all Kafka errors. Check the underlying `RDKafkaError` to get details.
/// Represents all possible Kafka errors.
///
/// If applicable, check the underlying [`RDKafkaError`] to get details.
#[derive(Clone, PartialEq, Eq)]
pub enum KafkaError {
/// Creation of admin operation failed.

View File

@ -13,7 +13,7 @@ use crate::util::{KafkaDrop, NativePtr};
pub struct GroupMemberInfo(RDKafkaGroupMemberInfo);
impl GroupMemberInfo {
/// Return the id of the member.
/// Returns the ID of the member.
pub fn id(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.member_id)
@ -22,7 +22,7 @@ impl GroupMemberInfo {
}
}
/// Return the client id of the member.
/// Returns the client ID of the member.
pub fn client_id(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.client_id)
@ -40,7 +40,7 @@ impl GroupMemberInfo {
}
}
/// Return the metadata of the member
/// Returns the metadata of the member.
pub fn metadata(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_metadata.is_null() {
@ -54,7 +54,7 @@ impl GroupMemberInfo {
}
}
/// Return the assignment of the member
/// Returns the partition assignment of the member.
pub fn assignment(&self) -> Option<&[u8]> {
unsafe {
if self.0.member_assignment.is_null() {
@ -73,7 +73,7 @@ impl GroupMemberInfo {
pub struct GroupInfo(RDKafkaGroupInfo);
impl GroupInfo {
/// Return the name of the group.
/// Returns the name of the group.
pub fn name(&self) -> &str {
unsafe {
CStr::from_ptr(self.0.group)
@ -126,8 +126,10 @@ impl fmt::Debug for GroupInfo {
}
}
/// List of groups. This structure wraps the pointer returned by rdkafka-sys, and deallocates all
/// the native resources when dropped.
/// List of groups.
///
/// This structure wraps the pointer returned by rdkafka-sys, and deallocates
/// all the native resources when dropped.
pub struct GroupList(NativePtr<RDKafkaGroupList>);
unsafe impl KafkaDrop for RDKafkaGroupList {

View File

@ -1,13 +1,13 @@
//! # rust-rdkafka
//!
//! A fully asynchronous, [futures]-based Kafka client library for Rust based on [librdkafka].
//! A fully asynchronous, [futures]-enabled [Apache Kafka] client
//! library for Rust based on [librdkafka].
//!
//! ## The library
//! `rust-rdkafka` provides a safe Rust interface to librdkafka. The master branch is currently based on librdkafka 1.3.0.
//!
//! `rust-rdkafka` provides a safe Rust interface to librdkafka. The master
//! branch is currently based on librdkafka 1.4.2.
//!
//! ### Documentation
//!
//! - [Current master branch](https://fede1024.github.io/rust-rdkafka/)
//! - [Latest release](https://docs.rs/rdkafka/)
//! - [Changelog](https://github.com/fede1024/rust-rdkafka/blob/master/changelog.md)
//!
@ -15,23 +15,32 @@
//!
//! The main features provided at the moment are:
//!
//! - Support for all Kafka versions since 0.8.x. For more information about broker compatibility options, check the [librdkafka documentation].
//! - Support for all Kafka versions since 0.8.x. For more information about
//! broker compatibility options, check the [librdkafka
//! documentation][broker-compat].
//! - Consume from single or multiple topics.
//! - Automatic consumer rebalancing.
//! - Customizable rebalance, with pre and post rebalance callbacks.
//! - Synchronous or asynchronous message production.
//! - Customizable offset commit.
//! - Access to cluster metadata (list of topic-partitions, replicas, active brokers etc).
//! - Access to group metadata (list groups, list members of groups, hostnames etc).
//! - Create and delete topics and add and edit partitions.
//! - Alter broker and topic configurations.
//! - Access to cluster metadata (list of topic-partitions, replicas, active
//! brokers etc).
//! - Access to group metadata (list groups, list members of groups, hostnames,
//! etc.).
//! - Access to producer and consumer metrics, errors and callbacks.
//!
//! [librdkafka documentation]: https://github.com/edenhill/librdkafka/wiki/Broker-version-compatibility
//!
//! ### One million messages per second
//!
//! `rust-rdkafka` is designed to be easy and safe to use thanks to the abstraction layer written in Rust, while at the same time being extremely fast thanks to the librdkafka C library.
//! `rust-rdkafka` is designed to be easy and safe to use thanks to the
//! abstraction layer written in Rust, while at the same time being extremely
//! fast thanks to the librdkafka C library.
//!
//! Here are some benchmark results using the rust-rdkafka `BaseProducer`, sending data to a single Kafka 0.11 process running in localhost (default configuration, 3 partitions). Hardware: Dell laptop, with Intel Core i7-4712HQ @ 2.30GHz.
//! Here are some benchmark results using the [`BaseProducer`],
//! sending data to a single Kafka 0.11 process running in localhost (default
//! configuration, 3 partitions). Hardware: Dell laptop, with Intel Core
//! i7-4712HQ @ 2.30GHz.
//!
//! - Scenario: produce 5 million messages, 10 bytes each, wait for all of them to be acked
//! - 1045413 messages/s, 9.970 MB/s (average over 5 runs)
@ -39,58 +48,75 @@
//! - Scenario: produce 100000 messages, 10 KB each, wait for all of them to be acked
//! - 24623 messages/s, 234.826 MB/s (average over 5 runs)
//!
//! For more numbers, check out the [kafka-benchmark](https://github.com/fede1024/kafka-benchmark) project.
//! For more numbers, check out the [kafka-benchmark] project.
//!
//! ### Client types
//!
//! `rust-rdkafka` provides low level and high level consumers and producers. Low level:
//! `rust-rdkafka` provides low level and high level consumers and producers.
//!
//! * [`BaseConsumer`]: simple wrapper around the librdkafka consumer. It requires to be periodically `poll()`ed in order to execute callbacks, rebalances and to receive messages.
//! * [`BaseProducer`]: simple wrapper around the librdkafka producer. As in the consumer case, the user must call `poll()` periodically to execute delivery callbacks.
//! * [`ThreadedProducer`]: `BaseProducer` with a separate thread dedicated to polling the producer.
//! Low level:
//!
//! * [`BaseConsumer`]: a simple wrapper around the librdkafka consumer. It
//! must be periodically `poll()`ed in order to execute callbacks and
//! rebalances, and to receive messages.
//! * [`BaseProducer`]: a simple wrapper around the librdkafka producer. As in
//! the consumer case, the user must call `poll()` periodically to execute
//! delivery callbacks.
//! * [`ThreadedProducer`]: a `BaseProducer` with a separate thread dedicated to
//! polling the producer.
//!
//! High level:
//!
//! * [`StreamConsumer`]: it returns a [`stream`] of messages and takes care of polling the consumer internally.
//! * [`FutureProducer`]: it returns a [`future`] that will be completed once the message is delivered to Kafka (or failed).
//! * [`StreamConsumer`]: a [`Stream`] of messages that takes care of
//! polling the consumer automatically.
//! * [`FutureProducer`]: sending a message returns a [`Future`] that will be
//! completed once the message is delivered to Kafka (or has failed).
//!
//! For more information about consumers and producers, refer to their module-level documentation.
//! For more information about consumers and producers, refer to their
//! module-level documentation.
//!
//! *Warning*: the library is under active development and the APIs are likely to change.
//! *Warning*: the library is under active development and the APIs are likely
//! to change.
//!
//! ### Asynchronous data processing with Tokio
//! [Tokio] is a platform for fast processing of asynchronous events in Rust. The interfaces exposed by the [`StreamConsumer`] and the [`FutureProducer`] allow rust-rdkafka users to easily integrate Kafka consumers and producers within the Tokio platform, and write asynchronous message processing code. Note that rust-rdkafka can be used without Tokio.
//!
//! To see rust-rdkafka in action with Tokio, check out the [asynchronous processing example] in the examples folder.
//! [Tokio] is a platform for fast processing of asynchronous events in Rust.
//! The interfaces exposed by the [`StreamConsumer`] and the [`FutureProducer`]
//! allow rust-rdkafka users to easily integrate Kafka consumers and producers
//! within the Tokio platform, and write asynchronous message processing code.
//! Note that rust-rdkafka can be used without Tokio.
//!
//! [Tokio]: https://tokio.rs/
//! [asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs
//! To see rust-rdkafka in action with Tokio, check out the
//! [asynchronous processing example] in the examples folder.
//!
//! ### At-least-once delivery
//!
//! At-least-once delivery semantic is common in many streaming applications: every message is guaranteed to be processed at least once; in case of temporary failure, the message can be re-processed and/or re-delivered, but no message will be lost.
//! At-least-once delivery semantics are common in many streaming applications:
//! every message is guaranteed to be processed at least once; in case of
//! temporary failure, the message can be re-processed and/or re-delivered,
//! but no message will be lost.
//!
//! In order to implement at-least-once delivery the stream processing application has to carefully commit the offset only once the message has been processed. Committing the offset too early, instead, might cause message loss, since upon recovery the consumer will start from the next message, skipping the one where the failure occurred.
//! In order to implement at-least-once delivery the stream processing
//! application has to carefully commit the offset only once the message has
//! been processed. Committing the offset too early, instead, might cause
//! message loss, since upon recovery the consumer will start from the next
//! message, skipping the one where the failure occurred.
//!
//! To see how to implement at-least-once delivery with `rdkafka`, check out the [at-least-once delivery example] in the examples folder. To know more about delivery semantics, check the [message delivery semantics] chapter in the Kafka documentation.
//!
//! [at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs
//! [message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
//! To see how to implement at-least-once delivery with `rdkafka`, check out the
//! [at-least-once delivery example] in the examples folder. To know more about
//! delivery semantics, check the [message delivery semantics] chapter in the
//! Kafka documentation.
//!
//! ### Users
//!
//! Here are some of the projects using rust-rdkafka:
//!
//! - [timely-dataflow]: a modular implementation of timely dataflow in Rust (you can also check the [blog post]).
//! - [timely-dataflow]: a distributed data-parallel compute engine. See also
//! the [blog post][timely-blog] announcing its Kafka integration.
//! - [kafka-view]: a web interface for Kafka clusters.
//! - [kafka-benchmark]: a high performance benchmarking tool for Kafka.
//!
//! *If you are using rust-rdkafka, please let me know!*
//!
//! [timely-dataflow]: https://github.com/frankmcsherry/timely-dataflow
//! [kafka-view]: https://github.com/fede1024/kafka-view
//! [kafka-benchmark]: https://github.com/fede1024/kafka-benchmark
//! [blog post]: https://github.com/frankmcsherry/blog/blob/master/posts/2017-11-08.md
//! *If you are using rust-rdkafka, please let us know!*
//!
//! ## Installation
//!
@ -101,8 +127,8 @@
//! rdkafka = { version = "0.23", features = ["cmake-build"] }
//! ```
//!
//! This crate will compile librdkafka from sources and link it statically to your
//! executable. To compile librdkafka you'll need:
//! This crate will compile librdkafka from sources and link it statically to
//! your executable. To compile librdkafka you'll need:
//!
//! * the GNU toolchain
//! * GNU `make`
@ -114,13 +140,12 @@
//! * `libzstd-dev`: optional, *not* included by default (feature: `zstd-pkg-config`)
//!
//! Note that using the CMake build system, via the `cmake-build` feature, is
//! **strongly** encouraged. The default build system has a [known
//! issue](rdkafka-sys/README.md#known-issues) that can cause corrupted builds.
//! encouraged if you can take the dependency on CMake.
//!
//! By default a submodule with the librdkafka sources pinned to a specific commit
//! will be used to compile and statically link the library. The `dynamic-linking`
//! feature can be used to instead dynamically link rdkafka to the system's version
//! of librdkafka. Example:
//! By default a submodule with the librdkafka sources pinned to a specific
//! commit will be used to compile and statically link the library. The
//! `dynamic-linking` feature can be used to instead dynamically link rdkafka to
//! the system's version of librdkafka. Example:
//!
//! ```toml
//! [dependencies]
@ -128,103 +153,75 @@
//! ```
//!
//! For a full listing of features, consult the [rdkafka-sys crate's
//! documentation](rdkafka-sys/README.md#features). All of rdkafka-sys features are
//! documentation][rdkafka-sys-features]. All of rdkafka-sys's features are
//! re-exported as rdkafka features.
//!
//! ### Tokio dependency
//!
//! The [`StreamConsumer`] and [`FutureProducer`] depend on Tokio, which can be a
//! heavyweight dependency for users who only intend to use the non-async/await
//! consumers or producers.
//!
//! The `tokio` feature is enabled by default. To disable it, turn off default
//! features like so:
//! Some features of the [`StreamConsumer`] and [`FutureProducer`] depend on
//! Tokio, which can be a heavyweight dependency for users who only intend to
//! use the low-level consumers and producers. The Tokio integration is
//! enabled by default, but can be disabled by turning off default features:
//!
//! ```toml
//! [dependencies]
//! rdkafka = { version = "0.23", default-features = false }
//! ```
//!
//! ## Compiling from sources
//!
//! To compile from sources, you'll have to update the submodule containing librdkafka:
//!
//! ```bash
//! git submodule update --init
//! ```
//!
//! and then compile using `cargo`, selecting the features that you want. Example:
//!
//! ```bash
//! cargo build --features "ssl gssapi"
//! ```
//!
//! ## Examples
//!
//! You can find examples in the `examples` folder. To run them:
//! You can find examples in the [`examples`] folder. To run them:
//!
//! ```bash
//! cargo run --example <example_name> -- <example_args>
//! ```
//!
//! ## Tests
//!
//! ### Unit tests
//!
//! The unit tests can run without a Kafka broker present:
//!
//! ```bash
//! cargo test --lib
//! ```
//!
//! ### Automatic testing
//!
//! rust-rdkafka contains a suite of tests which is automatically executed by travis in
//! docker-compose. Given the interaction with C code that rust-rdkafka has to do, tests
//! are executed in valgrind to check eventual memory errors and leaks.
//!
//! To run the full suite using docker-compose:
//!
//! ```bash
//! ./test_suite.sh
//! ```
//!
//! To run locally, instead:
//!
//! ```bash
//! KAFKA_HOST="kafka_server:9092" cargo test
//! ```
//!
//! In this case there is a broker expected to be running on `KAFKA_HOST`.
//! The broker must be configured with default partition number 3 and topic autocreation in order
//! for the tests to succeed.
//!
//! ## Debugging
//!
//! rust-rdkafka uses the `log` and `env_logger` crates to handle logging. Logging can be enabled
//! using the `RUST_LOG` environment variable, for example:
//! rust-rdkafka uses the [`log`] and [`env_logger`] crates to handle logging.
//! Logging can be enabled using the `RUST_LOG` environment variable, for
//! example:
//!
//! ```bash
//! RUST_LOG="librdkafka=trace,rdkafka::client=debug" cargo test
//! ```
//!
//! This will configure the logging level of librdkafka to trace, and the level of the client
//! module of the Rust client to debug. To actually receive logs from librdkafka, you also have to
//! set the `debug` option in the producer or consumer configuration (see librdkafka
//! [configuration](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)).
//! This will configure the logging level of librdkafka to trace, and the level
//! of the client module of the Rust client to debug. To actually receive logs
//! from librdkafka, you also have to set the `debug` option in the producer or
//! consumer configuration (see librdkafka
//! [configuration][librdkafka-config]).
//!
//! To enable debugging in your project, make sure you initialize the logger with
//! `env_logger::init()` or equivalent.
//! To enable debugging in your project, make sure you initialize the logger
//! with `env_logger::init()`, or the equivalent for any `log`-compatible
//! logging framework.
//!
//! [`BaseConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/base_consumer/struct.BaseConsumer.html
//! [`BaseProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.BaseProducer.html
//! [`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html
//! [`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html
//! [`Future`]: https://doc.rust-lang.org/stable/std/future/trait.Future.html
//! [`FutureProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/future_producer/struct.FutureProducer.html
//! [`Stream`]: https://docs.rs/futures/*/futures/stream/trait.Stream.html
//! [`StreamConsumer`]: https://docs.rs/rdkafka/*/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html
//! [`ThreadedProducer`]: https://docs.rs/rdkafka/*/rdkafka/producer/base_producer/struct.ThreadedProducer.html
//! [`log`]: https://docs.rs/log
//! [`env_logger`]: https://docs.rs/env_logger
//! [Apache Kafka]: https://kafka.apache.org
//! [asynchronous processing example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/asynchronous_processing.rs
//! [at-least-once delivery example]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/at_least_once.rs
//! [broker-compat]: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#broker-version-compatibility
//! [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
//! [futures]: https://github.com/rust-lang/futures-rs
//! [kafka-benchmark]: https://github.com/fede1024/kafka-benchmark
//! [kafka-benchmark]: https://github.com/fede1024/kafka-benchmark
//! [kafka-view]: https://github.com/fede1024/kafka-view
//! [librdkafka]: https://github.com/edenhill/librdkafka
//! [futures]: https://github.com/alexcrichton/futures-rs
//! [`future`]: https://docs.rs/futures/0.1.3/futures/trait.Future.html
//! [`stream`]: https://docs.rs/futures/0.1.3/futures/stream/trait.Stream.html
//! [librdkafka-config]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
//! [message delivery semantics]: https://kafka.apache.org/0101/documentation.html#semantics
//! [rdkafka-sys-features]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#features
//! [rdkafka-sys-known-issues]: https://github.com/fede1024/rust-rdkafka/tree/master/rdkafka-sys/README.md#known-issues
//! [timely-blog]: https://github.com/frankmcsherry/blog/blob/master/posts/2017-11-08.md
//! [timely-dataflow]: https://github.com/frankmcsherry/timely-dataflow
//! [Tokio]: https://tokio.rs/
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
@ -244,7 +241,7 @@ pub mod statistics;
pub mod topic_partition_list;
pub mod util;
// Re-export
// Re-exports.
pub use crate::client::ClientContext;
pub use crate::config::ClientConfig;
pub use crate::message::{Message, Timestamp};

View File

@ -14,14 +14,14 @@ use rdkafka_sys::types::*;
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
/// Timestamp of a message
/// Timestamp of a Kafka message.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum Timestamp {
/// Timestamp not available
/// Timestamp not available.
NotAvailable,
/// Message creation time
/// Message creation time.
CreateTime(i64),
/// Log append time
/// Log append time.
LogAppendTime(i64),
}
@ -61,35 +61,40 @@ impl From<SystemTime> for Timestamp {
// }
//}
/// Message headers trait
/// A generic representation of Kafka message headers.
///
/// A trait to represent readable message headers. Headers are key-value pairs that can be sent
/// alongside every message. Only read-only methods are provided by this trait, as the underlying
/// storage might not allow modification.
/// This trait represents readable message headers. Headers are key-value pairs
/// that can be sent alongside every message. Only read-only methods are
/// provided by this trait, as the underlying storage might not allow
/// modification.
pub trait Headers {
/// Return the number of defined headers.
/// Returns the number of contained headers.
fn count(&self) -> usize;
/// Get the specified header (the first header corresponds to index 0). If the index is
/// out of bounds, None is returned.
/// Gets the specified header, where the first header corresponds to index
/// 0. If the index is out of bounds, returns `None`.
fn get(&self, idx: usize) -> Option<(&str, &[u8])>;
/// Same as [Headers::get], but the value of the header will be converted to the specified type.
/// If the conversion fails, an error will be returned instead.
/// Like [`Headers::get`], but the value of the header will be converted to
/// the specified type. If the conversion fails, returns an error.
fn get_as<V: FromBytes + ?Sized>(&self, idx: usize) -> Option<(&str, Result<&V, V::Error>)> {
self.get(idx)
.map(|(name, value)| (name, V::from_bytes(value)))
}
}
/// The `Message` trait provides access to the fields of a generic Kafka message.
/// A generic representation of a Kafka message.
///
/// Only read-only methods are provided by this trait, as the underlying storage
/// might not allow modification.
pub trait Message {
/// Represent the type of headers that this message contains.
/// The type of headers that this message contains.
type Headers: Headers;
/// Returns the key of the message, or None if there is no key.
/// Returns the key of the message, or `None` if there is no key.
fn key(&self) -> Option<&[u8]>;
/// Returns the payload of the message, or None if there is no payload.
/// Returns the payload of the message, or `None` if there is no payload.
fn payload(&self) -> Option<&[u8]>;
/// Returns the source topic of the message.
@ -98,32 +103,34 @@ pub trait Message {
/// Returns the partition number where the message is stored.
fn partition(&self) -> i32;
/// Returns the offset of the message.
/// Returns the offset of the message within the partition.
fn offset(&self) -> i64;
/// Returns the message timestamp for a consumed message if available.
/// Returns the message timestamp.
fn timestamp(&self) -> Timestamp;
/// Converts the raw bytes of the payload to a reference of the specified type, that points to the
/// same data inside the message and without performing any memory allocation
/// Converts the raw bytes of the payload to a reference of the specified
/// type, pointing to the same data inside the message, without performing
/// any memory allocation.
fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>> {
self.payload().map(P::from_bytes)
}
/// Converts the raw bytes of the key to a reference of the specified type, that points to the
/// same data inside the message and without performing any memory allocation
/// Converts the raw bytes of the key to a reference of the specified type,
/// pointing to the same data inside the message, without performing any
/// memory allocation.
fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>> {
self.key().map(K::from_bytes)
}
/// Returns the headers of the message, if available.
/// Returns the headers of the message, or `None` if there are no headers.
fn headers(&self) -> Option<&Self::Headers>;
}
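A minimal sketch of the zero-copy accessors, using an `OwnedMessage` built by hand (in practice messages come from a consumer; the topic name and contents here are placeholders):

```rust
use rdkafka::message::{Message, OwnedMessage, Timestamp};

fn main() {
    let message = OwnedMessage::new(
        Some(b"hello".to_vec()),     // payload
        Some(b"key-1".to_vec()),     // key
        "example_topic".to_string(), // topic
        Timestamp::NotAvailable,     // timestamp
        0,                           // partition
        42,                          // offset
        None,                        // headers
    );

    // `payload_view` reinterprets the payload bytes without copying them.
    match message.payload_view::<str>() {
        Some(Ok(text)) => println!("payload: {}", text),
        Some(Err(e)) => eprintln!("payload is not valid UTF-8: {}", e),
        None => println!("no payload"),
    }
}
```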
/// Borrowed message headers
/// A zero-copy collection of Kafka message headers.
///
/// The `BorrowedHeaders` struct provides a read-only access to headers owned by [OwnedMessage]
/// struct or by a [OwnedHeaders] struct.
/// Provides a read-only access to headers owned by a Kafka consumer or producer
/// or by an [`OwnedHeaders`] struct.
pub struct BorrowedHeaders;
impl BorrowedHeaders {
@ -138,8 +145,10 @@ impl BorrowedHeaders {
self as *const BorrowedHeaders as *const RDKafkaHeaders
}
/// Clones the content of `BorrowedHeaders` and returns an `OwnedMessage`,
/// that can outlive the consumer. This operation requires memory allocation and can be expensive.
/// Clones the content of `BorrowedHeaders` and returns an [`OwnedHeaders`]
/// that can outlive the consumer.
///
/// This operation requires memory allocation and can be expensive.
pub fn detach(&self) -> OwnedHeaders {
OwnedHeaders {
ptr: unsafe {
@ -182,14 +191,20 @@ impl Headers for BorrowedHeaders {
/// A zero-copy Kafka message.
///
/// The content of the message is stored in the receiving buffer of the consumer or the producer. As
/// such, `BorrowedMessage` cannot outlive the consumer or producer it belongs to.
/// The content of the message is stored in the receiving buffer of the
/// consumer or the producer. As such, `BorrowedMessage` cannot outlive the
/// consumer or producer it belongs to.
///
/// ## Consumers
/// `BorrowedMessage`s coming from consumers are removed from the consumer buffer once they are
/// dropped. Holding references to too many messages will cause the memory of the consumer to fill
/// up and the consumer to block until some of the `BorrowedMessage`s are dropped.
///
/// `BorrowedMessage`s coming from consumers are removed from the consumer
/// buffer once they are dropped. Holding references to too many messages will
/// cause the memory of the consumer to fill up and the consumer to block until
/// some of the `BorrowedMessage`s are dropped.
///
/// ## Conversion to owned
/// To transform a `BorrowedMessage` into a `OwnedMessage`, use the `detach` method.
///
/// To transform a `BorrowedMessage` into an [`OwnedMessage`], use the
/// [`detach`](BorrowedMessage::detach) method.
pub struct BorrowedMessage<'a> {
ptr: NativePtr<RDKafkaMessage>,
_owner: PhantomData<&'a u8>,
@ -200,17 +215,6 @@ unsafe impl KafkaDrop for RDKafkaMessage {
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy;
}
/// The result of a message production.
///
/// If message production is successful `DeliveryResult` will contain the sent message, that can be
/// used to find which partition and offset the message was sent to. If message production is not
/// successful, the `DeliveryReport` will contain an error and the message that failed to be sent.
/// The partition and offset, in this case, will default to -1 and 0 respectively.
/// ## Lifetimes
/// In both success or failure scenarios, the payload of the message resides in the buffer of the
/// producer and will be automatically removed once the `delivery` callback finishes.
pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
impl<'a> fmt::Debug for BorrowedMessage<'a> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Message {{ ptr: {:?} }}", self.ptr())
@ -218,10 +222,12 @@ impl<'a> fmt::Debug for BorrowedMessage<'a> {
}
impl<'a> BorrowedMessage<'a> {
/// Creates a new `BorrowedMessage` that wraps the native Kafka message pointer returned by a
/// consumer. The lifetime of the message will be bound to the lifetime of the consumer passed
/// as parameter. This method should only be used with messages coming from consumers. If the
/// message contains an error, only the error is returned and the message structure is freed.
/// Creates a new `BorrowedMessage` that wraps the native Kafka message
/// pointer returned by a consumer. The lifetime of the message will be
/// bound to the lifetime of the consumer passed as parameter. This method
/// should only be used with messages coming from consumers. If the message
/// contains an error, only the error is returned and the message structure
/// is freed.
pub(crate) unsafe fn from_consumer<C>(
ptr: NativePtr<RDKafkaMessage>,
_consumer: &'a C,
@ -242,10 +248,11 @@ impl<'a> BorrowedMessage<'a> {
}
}
/// Creates a new `BorrowedMessage` that wraps the native Kafka message pointer returned by the
/// delivery callback of a producer. The lifetime of the message will be bound to the lifetime
/// of the reference passed as parameter. This method should only be used with messages coming
/// from the delivery callback. The message will not be freed in any circumstance.
/// Creates a new `BorrowedMessage` that wraps the native Kafka message
/// pointer returned by the delivery callback of a producer. The lifetime of
/// the message will be bound to the lifetime of the reference passed as
/// parameter. This method should only be used with messages coming from the
/// delivery callback. The message will not be freed in any circumstance.
pub(crate) unsafe fn from_dr_callback<O>(
ptr: *mut RDKafkaMessage,
_owner: &'a O,
@ -264,12 +271,12 @@ impl<'a> BorrowedMessage<'a> {
}
}
/// Returns a pointer to the RDKafkaMessage.
/// Returns a pointer to the [`RDKafkaMessage`].
pub fn ptr(&self) -> *mut RDKafkaMessage {
self.ptr.ptr()
}
/// Returns a pointer to the message's RDKafkaTopic
/// Returns a pointer to the message's [`RDKafkaTopic`].
pub fn topic_ptr(&self) -> *mut RDKafkaTopic {
self.ptr.rkt
}
@ -284,8 +291,10 @@ impl<'a> BorrowedMessage<'a> {
self.ptr.len
}
/// Clones the content of the `BorrowedMessage` and returns an `OwnedMessage`, that can
/// outlive the consumer. This operation requires memory allocation and can be expensive.
/// Clones the content of the `BorrowedMessage` and returns an
/// [`OwnedMessage`] that can outlive the consumer.
///
/// This operation requires memory allocation and can be expensive.
pub fn detach(&self) -> OwnedMessage {
OwnedMessage {
key: self.key().map(|k| k.to_vec()),
@ -369,11 +378,11 @@ unsafe impl<'a> Sync for BorrowedMessage<'a> {}
// ********** OWNED MESSAGE **********
//
/// Owned message headers
/// A collection of Kafka message headers that owns its backing data.
///
/// Kafka supports associating an array of key-value pairs to every message, called message headers.
/// The `OwnedHeaders` can be used to create the desired headers and to pass them to the producer.
/// See also [BorrowedHeaders].
/// Kafka supports associating an array of key-value pairs to every message,
/// called message headers. The `OwnedHeaders` can be used to create the desired
/// headers and to pass them to the producer. See also [`BorrowedHeaders`].
#[derive(Debug)]
pub struct OwnedHeaders {
ptr: NativePtr<RDKafkaHeaders>,
@ -388,13 +397,13 @@ unsafe impl Send for OwnedHeaders {}
unsafe impl Sync for OwnedHeaders {}
impl OwnedHeaders {
/// Create a new `OwnedHeaders` struct with initial capacity 5.
/// Creates a new `OwnedHeaders` struct with initial capacity 5.
pub fn new() -> OwnedHeaders {
OwnedHeaders::new_with_capacity(5)
}
/// Create a new `OwnedHeaders` struct with the desired initial capacity. The structure
/// is automatically resized as more headers are added.
/// Creates a new `OwnedHeaders` struct with the desired initial capacity.
/// The structure is automatically resized as more headers are added.
pub fn new_with_capacity(initial_capacity: usize) -> OwnedHeaders {
OwnedHeaders {
ptr: unsafe {
@ -403,7 +412,7 @@ impl OwnedHeaders {
}
}
/// Add a new header to the structure.
/// Adds a new header.
pub fn add<V: ToBytes + ?Sized>(self, name: &str, value: &V) -> OwnedHeaders {
let name_cstring = CString::new(name.to_owned()).unwrap();
let value_bytes = value.to_bytes();
@ -425,7 +434,7 @@ impl OwnedHeaders {
self.ptr.ptr()
}
/// Generate a read-only [BorrowedHeaders] reference.
/// Generates a read-only [`BorrowedHeaders`] reference.
pub fn as_borrowed(&self) -> &BorrowedHeaders {
unsafe { &*(self.ptr() as *mut RDKafkaHeaders as *mut BorrowedHeaders) }
}
@ -455,9 +464,11 @@ impl Clone for OwnedHeaders {
}
}
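A minimal sketch of building headers and reading them back through the borrowed view; the header names and values are placeholders.

```rust
use rdkafka::message::{Headers, OwnedHeaders};

fn main() {
    let headers = OwnedHeaders::new()
        .add("source", "example-service")
        .add("attempt", "1");

    let view = headers.as_borrowed();
    for idx in 0..view.count() {
        if let Some((name, value)) = view.get(idx) {
            println!("{} = {}", name, String::from_utf8_lossy(value));
        }
    }
}
```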
/// An [OwnedMessage] can be created from a [BorrowedMessage] using the [BorrowedMessage::detach]
/// method. [OwnedMessage]s don't hold any reference to the consumer and don't use any memory inside the
/// consumer buffer.
/// A Kafka message that owns its backing data.
///
/// An `OwnedMessage` can be created from a [`BorrowedMessage`] using the
/// [`BorrowedMessage::detach`] method. `OwnedMessage`s don't hold any reference
/// to the consumer and don't use any memory inside the consumer buffer.
#[derive(Debug, Clone)]
pub struct OwnedMessage {
payload: Option<Vec<u8>>,
@ -470,7 +481,9 @@ pub struct OwnedMessage {
}
impl OwnedMessage {
/// Create a new message with the specified content. Mainly useful for writing tests.
/// Creates a new message with the specified content.
///
/// This function is mainly useful in tests of `rust-rdkafka` itself.
pub fn new(
payload: Option<Vec<u8>>,
key: Option<Vec<u8>>,
@ -530,11 +543,30 @@ impl Message for OwnedMessage {
}
}
/// Given a reference to a byte array, returns a different view of the same data.
/// No allocation is performed, however the underlying data might be checked for
/// correctness (for example when converting to `str`).
/// The result of a message production.
///
/// If message production is successful `DeliveryResult` will contain the sent
/// message, which can be used to find which partition and offset the message
/// was sent to. If message production is not successful, the `DeliveryResult`
/// will contain an error and the message that failed to be sent. The partition
/// and offset, in this case, will default to -1 and 0 respectively.
///
/// ## Lifetimes
///
/// In both success or failure scenarios, the payload of the message resides in
/// the buffer of the producer and will be automatically removed once the
/// `delivery` callback finishes.
pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
/// A cheap conversion from a byte slice to typed data.
///
/// Given a reference to a byte slice, returns a different view of the same
/// data. No allocation is performed, however the underlying data might be
/// checked for correctness (for example when converting to `str`).
///
/// See also the [`ToBytes`] trait.
pub trait FromBytes {
/// The error type that will be used whenever the conversion fails.
/// The error type that will be returned if the conversion fails.
type Error;
/// Tries to convert the provided byte slice into a different type.
fn from_bytes(_: &[u8]) -> Result<&Self, Self::Error>;
@ -554,10 +586,14 @@ impl FromBytes for str {
}
}
/// A cheap conversion from typed data to a byte slice.
///
/// Given some data, returns the byte representation of that data.
/// No copy of the data should be performed.
///
/// See also the [`FromBytes`] trait.
pub trait ToBytes {
/// Convert the provided data to bytes.
/// Converts the provided data to bytes.
fn to_bytes(&self) -> &[u8];
}
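As a sketch, a hypothetical newtype only needs to expose its byte representation to be usable directly as a record key or payload:

```rust
use rdkafka::message::ToBytes;

// Hypothetical wrapper type; not part of the crate.
struct RequestId(String);

impl ToBytes for RequestId {
    fn to_bytes(&self) -> &[u8] {
        self.0.as_bytes()
    }
}
```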

View File

@ -48,7 +48,8 @@ impl MetadataPartition {
}
// TODO: return result?
/// Returns the metadata error for the partition, or None if there is no error.
/// Returns the metadata error for the partition, or `None` if there is no
/// error.
pub fn error(&self) -> Option<RDKafkaRespErr> {
if self.0.err.is_error() {
Some(self.0.err)
@ -57,12 +58,12 @@ impl MetadataPartition {
}
}
/// Returns the broker ids of the replicas.
/// Returns the broker IDs of the replicas.
pub fn replicas(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.replicas, self.0.replica_cnt as usize) }
}
/// Returns the broker ids of the in sync replicas.
/// Returns the broker IDs of the in-sync replicas.
pub fn isr(&self) -> &[i32] {
unsafe { slice::from_raw_parts(self.0.isrs, self.0.isr_cnt as usize) }
}
@ -91,7 +92,8 @@ impl MetadataTopic {
}
}
/// Returns the metadata error, or None if there was no error.
/// Returns the metadata error for the topic, or `None` if there was no
/// error.
pub fn error(&self) -> Option<RDKafkaRespErr> {
if self.0.err.is_error() {
Some(self.0.err)
@ -101,8 +103,10 @@ impl MetadataTopic {
}
}
/// Metadata container. This structure wraps the metadata pointer returned by rdkafka-sys,
/// and deallocates all the native resources when dropped.
/// Metadata container.
///
/// This structure wraps the metadata pointer returned by rdkafka-sys, and
/// deallocates all the native resources when dropped.
pub struct Metadata(NativePtr<RDKafkaMetadata>);
unsafe impl KafkaDrop for RDKafkaMetadata {
@ -120,7 +124,7 @@ impl Metadata {
Metadata(NativePtr::from_ptr(ptr as *mut _).unwrap())
}
/// Returns the id of the broker originating this metadata.
/// Returns the ID of the broker originating this metadata.
pub fn orig_broker_id(&self) -> i32 {
self.0.orig_broker_id
}
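A minimal sketch of fetching and inspecting metadata through a consumer; the broker address and group id are placeholders.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "example-group")
        .create()
        .expect("consumer creation failed");

    let metadata = consumer
        .fetch_metadata(None, Duration::from_secs(5))
        .expect("failed to fetch metadata");

    println!("originating broker: {}", metadata.orig_broker_id());
    for topic in metadata.topics() {
        println!("{}: {} partitions", topic.name(), topic.partitions().len());
    }
}
```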

View File

@ -1,38 +1,45 @@
//! Low level Kafka producers.
//! Low-level Kafka producers.
//!
//! For more information about the producers provided in rdkafka, refer to the module level documentation.
//! For more information about the producers provided in rdkafka, refer to the
//! [`producer`](super) module documentation.
//!
//! ## `BaseProducer`
//!
//! The `BaseProducer` is a low level Kafka producer designed to be as similar as possible to
//! the underlying C librdkafka producer, while maintaining a safe Rust interface.
//! The [`BaseProducer`] is a low level Kafka producer designed to be as similar
//! as possible to the underlying C librdkafka producer, while maintaining a
//! safe Rust interface.
//!
//! Production of messages is fully asynchronous. The librdkafka producer will take care of buffering
//! requests together according to configuration, and to send them efficiently. Once a message has
//! been produced, or the retry count reached, a callback function called delivery callback will be
//! called.
//! Production of messages is fully asynchronous. The librdkafka producer will
//! take care of buffering requests together according to configuration, and of
//! sending them efficiently. Once a message has been produced, or the retry
//! count has been reached, a delivery callback will be invoked.
//!
//! The `BaseProducer` requires a `ProducerContext` which will be used to specify the delivery callback
//! and the `DeliveryOpaque`. The `DeliveryOpaque` is a user-defined type that the user can pass to the
//! `send` method of the producer, and that the producer will then forward to the delivery
//! callback of the corresponding message. The `DeliveryOpaque` is useful in case the delivery
//! callback requires additional information about the message (such as message id for example).
//! The `BaseProducer` requires a [`ProducerContext`] which will be used to
//! specify the delivery callback and the
//! [`DeliveryOpaque`](ProducerContext::DeliveryOpaque). The `DeliveryOpaque` is
//! a user-defined type that the user can pass to the `send` method of the
//! producer, and that the producer will then forward to the delivery callback
//! of the corresponding message. The `DeliveryOpaque` is useful in case the
//! delivery callback requires additional information about the message (such
//! as the message id).
//!
//! ### Calling poll
//!
//! To execute delivery callbacks the `poll` method of the producer should be called regularly.
//! If `poll` is not called, or not often enough, a `RDKafkaError::QueueFull` error will be returned.
//! To execute delivery callbacks the `poll` method of the producer should be
//! called regularly. If `poll` is not called, or not often enough, a
//! [`RDKafkaError::QueueFull`] error will be returned.
//!
//! ## `ThreadedProducer`
//! The `ThreadedProducer` is a wrapper around the `BaseProducer` which spawns a thread
//! dedicated to calling `poll` on the producer at regular intervals, so that the user doesn't
//! have to. The thread is started when the producer is created, and it will be terminated
//! once the producer goes out of scope.
//!
//! A `RDKafkaError::QueueFull` error can still be returned in case the polling thread is not
//! fast enough or Kafka is not able to receive data and acknowledge messages quickly enough.
//! If this error is returned, the producer should wait and try again.
//! The `ThreadedProducer` is a wrapper around the `BaseProducer` which spawns a
//! thread dedicated to calling `poll` on the producer at regular intervals, so
//! that the user doesn't have to. The thread is started when the producer is
//! created, and it will be terminated once the producer goes out of scope.
//!
//! A [`RDKafkaError::QueueFull`] error can still be returned in case the
//! polling thread is not fast enough or Kafka is not able to receive data and
//! acknowledge messages quickly enough. If this error is returned, the caller
//! should wait and try again.
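A minimal sketch of the produce-then-poll pattern described above; the broker address and topic name are placeholders.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseProducer, BaseRecord};

fn main() {
    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("producer creation failed");

    if let Err((e, _record)) = producer.send(
        BaseRecord::to("example_topic")
            .key("key-1")
            .payload("hello"),
    ) {
        eprintln!("failed to enqueue message: {}", e);
    }

    // Serve delivery callbacks and wait for outstanding messages before exiting.
    producer.poll(Duration::from_millis(100));
    producer.flush(Duration::from_secs(1));
}
```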
use std::ffi::CString;
use std::mem;
@ -61,21 +68,30 @@ pub use crate::message::DeliveryResult;
// ********** PRODUCER CONTEXT **********
//
/// A `ProducerContext` is an object that can be used during the creation of a producer to
/// customizer its behavior. In particular, it can be used to specify the `delivery` callback
/// that will be called when the ack from a delivered message is received.
/// Producer-specific context.
///
/// This user-defined object can be used to provide custom callbacks for
/// producer events. Refer to the list of methods to check which callbacks can
/// be specified.
///
/// In particular, it can be used to specify the `delivery` callback that will
/// be called when the acknowledgement for a delivered message is received.
///
/// See also the [`ClientContext`] trait.
pub trait ProducerContext: ClientContext {
/// A `DeliveryOpaque` is a user-defined structure that will be passed to the producer when
/// producing a message, and returned to the `delivery` method once the message has been
/// delivered, or failed to.
/// A `DeliveryOpaque` is a user-defined structure that will be passed to
/// the producer when producing a message, and returned to the `delivery`
/// method once the message has been delivered or has failed to be delivered.
type DeliveryOpaque: IntoOpaque;
/// This method will be called once the message has been delivered (or failed to). The
/// `DeliveryOpaque` will be the one provided by the user when calling send.
/// This method will be called once the message has been delivered (or has
/// failed to be delivered). The `DeliveryOpaque` will be the one provided by
/// the user when calling `send`.
fn delivery(&self, delivery_result: &DeliveryResult, delivery_opaque: Self::DeliveryOpaque);
}
/// Default producer context that can be use when a custom context is not required.
/// An empty producer context that can be used when customizations are not
/// required.
#[derive(Clone)]
pub struct DefaultProducerContext;
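As a sketch of a custom producer context that logs every delivery report, using `()` as the delivery opaque (the `LoggingProducerContext` name and output are illustrative):

```rust
use rdkafka::client::ClientContext;
use rdkafka::message::DeliveryResult;
use rdkafka::producer::ProducerContext;
use rdkafka::Message;

// Hypothetical context that logs every delivery report.
struct LoggingProducerContext;

impl ClientContext for LoggingProducerContext {}

impl ProducerContext for LoggingProducerContext {
    type DeliveryOpaque = ();

    fn delivery(&self, delivery_result: &DeliveryResult, _opaque: Self::DeliveryOpaque) {
        match delivery_result {
            Ok(message) => println!(
                "delivered to partition {} at offset {}",
                message.partition(),
                message.offset()
            ),
            Err((error, _message)) => eprintln!("delivery failed: {}", error),
        }
    }
}
```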
@ -111,15 +127,16 @@ unsafe extern "C" fn delivery_cb<C: ProducerContext>(
// ********** BASE PRODUCER **********
//
/// Producer record for the base producer
/// A record for the [`BaseProducer`] and [`ThreadedProducer`].
///
/// The `BaseRecord` is a structure that can be used to provide a new record to
/// [BaseProducer::send]. Since most fields are optional, a `BaseRecord` can be constructed
/// using the builder pattern.
/// [`BaseProducer::send`] or [`ThreadedProducer::send`]. Since most fields are
/// optional, a `BaseRecord` can be constructed using the builder pattern.
///
/// # Examples
///
/// This example will create a `BaseRecord` with no [DeliveryOpaque](ProducerContext::DeliveryOpaque):
/// This example will create a `BaseRecord` with no
/// [`DeliveryOpaque`](ProducerContext::DeliveryOpaque):
///
/// ```rust,no_run
/// # use rdkafka::producer::BaseRecord;
@ -130,16 +147,16 @@ unsafe extern "C" fn delivery_cb<C: ProducerContext>(
/// .partition(5); // target partition
/// ```
///
/// The following example will build a similar record, but it will use a number as `DeliveryOpaque`
/// for the message:
/// The following example will build a similar record, but it will use a number
/// as the `DeliveryOpaque` for the message:
///
/// ```rust,no_run
/// # use rdkafka::producer::BaseRecord;
/// # use rdkafka::message::ToBytes;
/// let record = BaseRecord::with_opaque_to("topic_name", 123) // destination topic and message id
/// .key(&[1, 2, 3, 4]) // message key
/// .payload("content") // message payload
/// .partition(5); // target partition
/// .key(&[1, 2, 3, 4]) // message key
/// .payload("content") // message payload
/// .partition(5); // target partition
/// ```
#[derive(Debug)]
pub struct BaseRecord<
@ -148,27 +165,27 @@ pub struct BaseRecord<
P: ToBytes + ?Sized + 'a = (),
D: IntoOpaque = (),
> {
/// Required destination topic
/// Required destination topic.
pub topic: &'a str,
/// Optional destination partition
/// Optional destination partition.
pub partition: Option<i32>,
/// Optional payload
/// Optional payload.
pub payload: Option<&'a P>,
/// Optional key
/// Optional key.
pub key: Option<&'a K>,
/// Optional timestamp
/// Optional timestamp.
///
/// Note that Kafka represents timestamps as the number of milliseconds
/// since the Unix epoch.
pub timestamp: Option<i64>,
/// Optional message headers
/// Optional message headers.
pub headers: Option<OwnedHeaders>,
/// Required delivery opaque (defaults to `()` if not required)
/// Required delivery opaque (defaults to `()` if not required).
pub delivery_opaque: D,
}
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a, K, P, D> {
/// Create a new record with the specified topic name and delivery opaque.
/// Creates a new record with the specified topic name and delivery opaque.
pub fn with_opaque_to(topic: &'a str, delivery_opaque: D) -> BaseRecord<'a, K, P, D> {
BaseRecord {
topic,
@ -181,25 +198,25 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a,
}
}
/// Set the destination partition of the record.
/// Sets the destination partition of the record.
pub fn partition(mut self, partition: i32) -> BaseRecord<'a, K, P, D> {
self.partition = Some(partition);
self
}
/// Set the payload of the record.
/// Sets the payload of the record.
pub fn payload(mut self, payload: &'a P) -> BaseRecord<'a, K, P, D> {
self.payload = Some(payload);
self
}
/// Set the key of the record.
/// Sets the key of the record.
pub fn key(mut self, key: &'a K) -> BaseRecord<'a, K, P, D> {
self.key = Some(key);
self
}
/// Set the timestamp of the record.
/// Sets the timestamp of the record.
///
/// Note that Kafka represents timestamps as the number of milliseconds
/// since the Unix epoch.
@ -208,7 +225,7 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a,
self
}
/// Set the headers of the record.
/// Sets the headers of the record.
pub fn headers(mut self, headers: OwnedHeaders) -> BaseRecord<'a, K, P, D> {
self.headers = Some(headers);
self
@ -216,7 +233,7 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized, D: IntoOpaque> BaseRecord<'a,
}
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> BaseRecord<'a, K, P, ()> {
/// Create a new record with the specified topic name.
/// Creates a new record with the specified topic name.
pub fn to(topic: &'a str) -> BaseRecord<'a, K, P, ()> {
BaseRecord {
topic,
@ -238,7 +255,8 @@ impl FromClientConfig for BaseProducer<DefaultProducerContext> {
}
impl<C: ProducerContext> FromClientConfigAndContext<C> for BaseProducer<C> {
/// Creates a new `BaseProducer` starting from a configuration and a context.
/// Creates a new `BaseProducer` starting from a configuration and a
/// context.
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<BaseProducer<C>> {
let native_config = config.create_native_config()?;
unsafe { rdsys::rd_kafka_conf_set_dr_msg_cb(native_config.ptr(), Some(delivery_cb::<C>)) };
@ -252,17 +270,19 @@ impl<C: ProducerContext> FromClientConfigAndContext<C> for BaseProducer<C> {
}
}
/// Low level Kafka producer.
/// Lowest level Kafka producer.
///
/// The `BaseProducer` needs to be polled at regular intervals in order to serve queued delivery
/// report callbacks (for more information, refer to the module-level documentation). This producer
/// can be cheaply cloned to create a new reference to the same underlying producer.
/// The `BaseProducer` needs to be polled at regular intervals in order to serve
/// queued delivery report callbacks (for more information, refer to the
/// module-level documentation). This producer can be cheaply cloned to create a
/// new reference to the same underlying producer.
///
/// # Example usage
///
/// This code will send a message to Kafka. No custom [ProducerContext] is specified, so the
/// [DefaultProducerContext] will be used. To see how to use a producer context, refer to the
/// examples in the examples folder.
/// This code will send a message to Kafka. No custom [`ProducerContext`] is
/// specified, so the [`DefaultProducerContext`] will be used. To see how to use
/// a producer context, refer to the examples in the [`examples`] folder.
///
/// ```rust
/// use rdkafka::config::ClientConfig;
/// use rdkafka::producer::{BaseProducer, BaseRecord};
@ -287,6 +307,8 @@ impl<C: ProducerContext> FromClientConfigAndContext<C> for BaseProducer<C> {
/// // And/or flush the producer before dropping it.
/// producer.flush(Duration::from_secs(1));
/// ```
///
/// [`examples`]: https://github.com/fede1024/rust-rdkafka/blob/master/examples/
pub struct BaseProducer<C: ProducerContext = DefaultProducerContext> {
client_arc: Arc<Client<C>>,
}
@ -299,8 +321,10 @@ impl<C: ProducerContext> BaseProducer<C> {
}
}
/// Polls the producer. Regular calls to `poll` are required to process the events
/// and execute the message delivery callbacks. Returns the number of events served.
/// Polls the producer, returning the number of events served.
///
/// Regular calls to `poll` are required to process the events and execute
/// the message delivery callbacks.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) -> i32 {
unsafe { rdsys::rd_kafka_poll(self.native_ptr(), timeout.into().as_millis()) }
}
@ -310,17 +334,21 @@ impl<C: ProducerContext> BaseProducer<C> {
self.client_arc.native_ptr()
}
/// Produce a message to Kafka. Message fields such as key, payload, partition, timestamp etc.
/// are provided to this method via a [BaseRecord]. If the message is correctly enqueued in the
/// producer's memory buffer, the method will take ownership of the record and return
/// immediately; in case of failure to enqueue, the original record is returned, alongside an
/// error code. If the message fails to be produced after being enqueued in the buffer, the
/// [ProducerContext::delivery] method will be called asynchronously, with the provided
/// [ProducerContext::DeliveryOpaque].
/// Sends a message to Kafka.
///
/// When no partition is specified the underlying Kafka library picks a partition based on a
/// hash of the key. If no key is specified, a random partition will be used. To correctly
/// handle errors, the delivery callback should be implemented.
/// Message fields such as key, payload, partition, timestamp etc. are
/// provided to this method via a [`BaseRecord`]. If the message is
/// correctly enqueued in the producer's memory buffer, the method will take
/// ownership of the record and return immediately; in case of failure to
/// enqueue, the original record is returned, alongside an error code. If
/// the message fails to be produced after being enqueued in the buffer, the
/// [`ProducerContext::delivery`] method will be called asynchronously, with
/// the provided [`ProducerContext::DeliveryOpaque`].
///
/// When no partition is specified the underlying Kafka library picks a
/// partition based on a hash of the key. If no key is specified, a random
/// partition will be used. To correctly handle errors, the delivery
/// callback should be implemented.
///
/// Note that this method will never block.
// Simplifying the return type requires generic associated types, which are
@ -380,13 +408,16 @@ impl<C: ProducerContext> BaseProducer<C> {
}
}
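To make the enqueue/poll interaction concrete, here is a rough sketch of a send loop that retries while the queue is full. It assumes a `BaseProducer` with the default context and that the error-code enum is reachable as `rdkafka::error::RDKafkaError`, as the docs above suggest; the retry policy itself is purely illustrative.

```rust
use std::time::Duration;

use rdkafka::error::{KafkaError, RDKafkaError};
use rdkafka::producer::{BaseProducer, BaseRecord};

// Keep retrying while the internal queue is full, serving delivery
// callbacks in between so that space frees up.
fn send_with_retry(producer: &BaseProducer, topic: &str, key: &str, payload: &str) {
    let mut record = BaseRecord::to(topic).key(key).payload(payload);
    loop {
        match producer.send(record) {
            Ok(()) => break,
            Err((KafkaError::MessageProduction(RDKafkaError::QueueFull), rec)) => {
                // Queue full: poll to run pending delivery callbacks, then
                // retry with the record that was handed back to us.
                producer.poll(Duration::from_millis(100));
                record = rec;
            }
            Err((e, _)) => panic!("unrecoverable produce error: {}", e),
        }
    }
}
```

In real code you would likely bound the number of retries or surface the error to the caller instead of panicking.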
/// Flushes the producer. Should be called before termination. This method will call `poll()`
/// internally.
/// Flushes any pending messages.
///
/// This method should be called before termination to ensure delivery of
/// all enqueued messages. It will call `poll()` internally.
pub fn flush<T: Into<Timeout>>(&self, timeout: T) {
unsafe { rdsys::rd_kafka_flush(self.native_ptr(), timeout.into().as_millis()) };
}
/// Returns the number of messages waiting to be sent, or sent but not acknowledged yet.
/// Returns the number of messages that are either waiting to be sent or are
/// sent but are waiting to be acknowledged.
pub fn in_flight_count(&self) -> i32 {
unsafe { rdsys::rd_kafka_outq_len(self.native_ptr()) }
}
@ -409,11 +440,12 @@ impl<C: ProducerContext> Clone for BaseProducer<C> {
// ********** THREADED PRODUCER **********
//
/// A producer with a separate thread for event handling.
/// A low-level Kafka producer with a separate thread for event handling.
///
/// The `ThreadedProducer` is a `BaseProducer` with a separate thread dedicated to calling `poll` at
/// regular intervals in order to execute any queued event, such as delivery notifications. The
/// thread will be automatically stopped when the producer is dropped.
/// The `ThreadedProducer` is a [`BaseProducer`] with a separate thread
/// dedicated to calling `poll` at regular intervals in order to execute any
/// queued events, such as delivery notifications. The thread will be
/// automatically stopped when the producer is dropped.
#[must_use = "The threaded producer will stop immediately if unused"]
pub struct ThreadedProducer<C: ProducerContext + 'static> {
producer: BaseProducer<C>,
@ -466,7 +498,9 @@ impl<C: ProducerContext + 'static> FromClientConfigAndContext<C> for ThreadedPro
}
impl<C: ProducerContext + 'static> ThreadedProducer<C> {
/// Sends a message to Kafka. See the documentation in `BaseProducer`.
/// Sends a message to Kafka.
///
/// See the documentation for [`BaseProducer::send`] for details.
// Simplifying the return type requires generic associated types, which are
// unstable.
#[allow(clippy::type_complexity)]
@ -481,18 +515,24 @@ impl<C: ProducerContext + 'static> ThreadedProducer<C> {
self.producer.send(record)
}
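A brief usage sketch, assuming a broker at `localhost:9092` and a topic named `topic_name` (both placeholders), and assuming a `ThreadedProducer` can be created directly from a `ClientConfig` just like a `BaseProducer`; the context type is spelled out via the public `base_producer` module to avoid relying on re-exports.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::base_producer::DefaultProducerContext;
use rdkafka::producer::{BaseRecord, ThreadedProducer};

// The background thread calls `poll` for us, so only `flush` is needed
// before shutdown to drain any outstanding messages.
let producer: ThreadedProducer<DefaultProducerContext> = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .create()
    .expect("producer creation failed");

producer
    .send(BaseRecord::to("topic_name").key("key").payload("payload"))
    .expect("failed to enqueue message");

producer.flush(Duration::from_secs(1));
```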
/// Polls the internal producer. This is not normally required since the `ThreadedProducer` had
/// a thread dedicated to calling `poll` regularly.
/// Polls the internal producer.
///
/// This is not normally required since the `ThreadedProducer` has a thread
/// dedicated to calling `poll` regularly.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
self.producer.poll(timeout);
}
/// Flushes the producer. Should be called before termination.
/// Flushes any pending messages.
///
/// This method should be called before termination to ensure delivery of
/// all enqueued messages.
pub fn flush<T: Into<Timeout>>(&self, timeout: T) {
self.producer.flush(timeout);
}
/// Returns the number of messages waiting to be sent, or send but not acknowledged yet.
/// Returns the number of messages that are either waiting to be sent or are
/// sent but are waiting to be acknowledged.
pub fn in_flight_count(&self) -> i32 {
self.producer.in_flight_count()
}
@ -521,8 +561,9 @@ impl<C: ProducerContext + 'static> Drop for ThreadedProducer<C> {
#[cfg(test)]
mod tests {
// Just test that there are no panics, and that each struct implements the expected
// traits (Clone, Send, Sync etc.). Behavior is tested in the integrations tests.
// Just test that there are no panics, and that each struct implements the
// expected traits (Clone, Send, Sync etc.). Behavior is tested in the
// integration tests.
use super::*;
use crate::config::ClientConfig;

View File

@ -1,6 +1,6 @@
//! Future producer
//! High-level, futures-enabled Kafka producer.
//!
//! A high level producer that returns a Future for every produced message.
//! See the [`FutureProducer`] for details.
// TODO: extend docs
use std::future::Future;
@ -24,26 +24,29 @@ use crate::util::{IntoOpaque, Timeout};
// ********** FUTURE PRODUCER **********
//
/// Same as [BaseRecord] but specific to the [FutureProducer]. The only difference is that
/// the [FutureRecord] doesn't provide custom delivery opaque object.
/// A record for the future producer.
///
/// Like [`BaseRecord`], but specific to the [`FutureProducer`]. The only
/// difference is that the [`FutureRecord`] doesn't provide a custom delivery
/// opaque object.
#[derive(Debug)]
pub struct FutureRecord<'a, K: ToBytes + ?Sized + 'a, P: ToBytes + ?Sized + 'a> {
/// Required destination topic
/// Required destination topic.
pub topic: &'a str,
/// Optional destination partition
/// Optional destination partition.
pub partition: Option<i32>,
/// Optional payload
/// Optional payload.
pub payload: Option<&'a P>,
/// Optional key
/// Optional key.
pub key: Option<&'a K>,
/// Optional timestamp
/// Optional timestamp.
pub timestamp: Option<i64>,
/// Optional message headers
/// Optional message headers.
pub headers: Option<OwnedHeaders>,
}
impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
/// Create a new record with the specified topic name.
/// Creates a new record with the specified topic name.
pub fn to(topic: &'a str) -> FutureRecord<'a, K, P> {
FutureRecord {
topic,
@ -68,31 +71,31 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
}
}
/// Set the destination partition of the record.
/// Sets the destination partition of the record.
pub fn partition(mut self, partition: i32) -> FutureRecord<'a, K, P> {
self.partition = Some(partition);
self
}
/// Set the destination payload of the record.
/// Sets the destination payload of the record.
pub fn payload(mut self, payload: &'a P) -> FutureRecord<'a, K, P> {
self.payload = Some(payload);
self
}
/// Set the destination key of the record.
/// Sets the destination key of the record.
pub fn key(mut self, key: &'a K) -> FutureRecord<'a, K, P> {
self.key = Some(key);
self
}
/// Set the destination timestamp of the record.
/// Sets the destination timestamp of the record.
pub fn timestamp(mut self, timestamp: i64) -> FutureRecord<'a, K, P> {
self.timestamp = Some(timestamp);
self
}
/// Set the headers of the record.
/// Sets the headers of the record.
pub fn headers(mut self, headers: OwnedHeaders) -> FutureRecord<'a, K, P> {
self.headers = Some(headers);
self
@ -111,18 +114,22 @@ impl<'a, K: ToBytes + ?Sized, P: ToBytes + ?Sized> FutureRecord<'a, K, P> {
}
}
/// The `ProducerContext` used by the `FutureProducer`. This context will use a Future as its
/// `DeliveryOpaque` and will complete the future when the message is delivered (or failed to).
/// The [`ProducerContext`] used by the [`FutureProducer`].
///
/// This context will use a [`Future`] as its `DeliveryOpaque` and will complete
/// the future when the message is delivered (or failed to).
#[derive(Clone)]
pub struct FutureProducerContext<C: ClientContext + 'static> {
wrapped_context: C,
}
/// Represents the result of message production as performed from the `FutureProducer`.
/// Represents the result of message production as performed from the
/// `FutureProducer`.
///
/// If message delivery was successful, `OwnedDeliveryResult` will return the partition and offset
/// of the message. If the message failed to be delivered an error will be returned, together with
/// an owned copy of the original message.
/// If message delivery was successful, `OwnedDeliveryResult` will return the
/// partition and offset of the message. If the message failed to be delivered,
/// an error will be returned, together with an owned copy of the original
/// message.
type OwnedDeliveryResult = Result<(i32, i64), (KafkaError, OwnedMessage)>;
// Delegates all the methods calls to the wrapped context.
@ -156,13 +163,16 @@ impl<C: ClientContext + 'static> ProducerContext for FutureProducerContext<C> {
}
}
/// A producer that returns a `Future` for every message being produced.
/// A producer that returns a [`Future`] for every message being produced.
///
/// Since message production in rdkafka is asynchronous, the caller cannot immediately know if the
/// delivery of the message was successful or not. The `FutureProducer` provides this information in
/// a `Future`, that will be completed once the information becomes available. This producer has an
/// internal polling thread and as such it doesn't need to be polled. It can be cheaply cloned to
/// get a reference to the same underlying producer. The internal will be terminated once the
/// Since message production in rdkafka is asynchronous, the caller cannot
/// immediately know if the delivery of the message was successful or not. The
/// `FutureProducer` provides this information in a [`Future`], which will be
/// completed once the information becomes available.
///
/// This producer has an internal polling thread and as such it doesn't need to
/// be polled. It can be cheaply cloned to get a reference to the same
/// underlying producer. The internal polling thread will be terminated when the
/// `FutureProducer` goes out of scope.
#[must_use = "Producer polling thread will stop immediately if unused"]
pub struct FutureProducer<C: ClientContext + 'static = DefaultClientContext> {
@ -215,11 +225,13 @@ impl Future for DeliveryFuture {
}
impl<C: ClientContext + 'static> FutureProducer<C> {
/// Sends the provided [FutureRecord]. Returns a [DeliveryFuture] that will eventually contain the
/// result of the send. The `block_ms` parameter will control for how long the producer
/// is allowed to block if the queue is full. Set it to -1 to block forever, or 0 to never block.
/// If `block_ms` is reached and the queue is still full, a [RDKafkaError::QueueFull] will be
/// reported in the [DeliveryFuture].
/// Sends a message to Kafka, returning a [`DeliveryFuture`] that will
/// resolve with the result of the send.
///
/// The `block_ms` parameter controls for how long the producer is allowed
/// to block if the queue is full. Set it to -1 to block forever, or 0 to
/// never block. If `block_ms` is reached and the queue is still full, an
/// [`RDKafkaError::QueueFull`] error will be reported in the [`DeliveryFuture`].
pub fn send<K, P>(&self, record: FutureRecord<K, P>, block_ms: i64) -> DeliveryFuture
where
K: ToBytes + ?Sized,
@ -263,8 +275,8 @@ impl<C: ClientContext + 'static> FutureProducer<C> {
}
}
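For illustration, sending one record and awaiting its delivery report might look like the sketch below; the `produce_one` helper and the topic name are assumptions for this example, and the awaited value's shape is the `OwnedDeliveryResult` described above, wrapped in the future's own error type.

```rust
use rdkafka::producer::{FutureProducer, FutureRecord};

// Enqueue one record and wait for its delivery report. `block_ms = 0` means
// the call fails immediately with QueueFull instead of blocking when the
// internal queue is full.
async fn produce_one(producer: &FutureProducer) {
    let record = FutureRecord::to("topic_name")
        .key("some key")
        .payload("some payload");
    let delivery = producer.send(record, 0).await;
    println!("delivery result: {:?}", delivery);
}
```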
/// Same as [FutureProducer::send], with the only difference that if enqueuing fails, an
/// error will be returned immediately, alongside the [FutureRecord] provided.
/// Like [`FutureProducer::send`], but if enqueuing fails, an error will be
/// returned immediately, alongside the [`FutureRecord`] provided.
pub fn send_result<'a, K, P>(
&self,
record: FutureRecord<'a, K, P>,
@ -281,18 +293,24 @@ impl<C: ClientContext + 'static> FutureProducer<C> {
.map_err(|(e, record)| (e, FutureRecord::from_base_record(record)))
}
/// Polls the internal producer. This is not normally required since the `ThreadedProducer` had
/// a thread dedicated to calling `poll` regularly.
/// Polls the internal producer.
///
/// This is not normally required since the `FutureProducer` has a thread
/// dedicated to calling `poll` regularly.
pub fn poll<T: Into<Timeout>>(&self, timeout: T) {
self.producer.poll(timeout);
}
/// Flushes the producer. Should be called before termination.
/// Flushes any pending messages.
///
/// This method should be called before termination to ensure delivery of
/// all enqueued messages.
pub fn flush<T: Into<Timeout>>(&self, timeout: T) {
self.producer.flush(timeout);
}
/// Returns the number of messages waiting to be sent, or send but not acknowledged yet.
/// Returns the number of messages that are either waiting to be sent or are
/// sent but are waiting to be acknowledged.
pub fn in_flight_count(&self) -> i32 {
self.producer.in_flight_count()
}

View File

@ -1,77 +1,116 @@
//! Low level and high level rdkafka producers.
//! Kafka producers.
//!
//! ## The C librdkafka producer
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka, so in order to understand how
//! the Rust producers work it is important to understand the basics of the C one as well.
//!
//! Rust-rdkafka relies on the C librdkafka producer to communicate with Kafka,
//! so in order to understand how the Rust producers work it is important to
//! understand the basics of the C one as well.
//!
//! ### Async
//! The librdkafka producer is completely asynchronous: it maintains a memory buffer where messages
//! waiting to be sent or currently in flight are stored. Once a message is delivered or an error
//! occurred and the maximum number of retries has been reached, the producer will enqueue a delivery
//! event with the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at regular intervals to
//! process those events; the thread calling `poll` will be the one executing the user-specified
//! delivery callback for every delivery event. If `poll` is not called, or not frequently
//! enough, the producer will return a `RDKafkaError::QueueFull` error and it won't be able to send any other
//! message until more delivery event are processed via `poll`. The `QueueFull` error can also be
//! returned if Kafka is not able to receive the messages quickly enough.
//! The librdkafka producer is completely asynchronous: it maintains a memory
//! buffer where messages waiting to be sent or currently in flight are stored.
//! Once a message is delivered, or has failed and the maximum number of
//! retries has been reached, the producer will enqueue a delivery event with
//! the appropriate delivery result into an internal event queue.
//!
//! The librdkafka user is responsible for calling the `poll` function at
//! regular intervals to process those events; the thread calling `poll` will be
//! the one executing the user-specified delivery callback for every delivery
//! event. If `poll` is not called, or not frequently enough, the producer will
//! return an `RDKafkaError::QueueFull` error and it won't be able to send any
//! other messages until more delivery events are processed via `poll`. The
//! `QueueFull` error can also be returned if Kafka is not able to receive the
//! messages quickly enough.
//!
//! ### Error reporting
//! The C library will try deal with all the transient errors such as broker disconnection,
//! timeouts etc. These errors, called global errors, are automatically logged in rust-rdkafka, but
//! they normally don't require any handling as they are automatically handled internally.
//! To see the logs, make sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported in the delivery callback.
//! The C library will try to deal with all transient errors such as broker
//! disconnections, timeouts, etc. These errors, called global errors, are
//! automatically logged in rust-rdkafka, but they normally don't require any
//! handling as they are automatically handled internally. To see the logs, make
//! sure you initialize the logger.
//!
//! As mentioned earlier, errors specific to message production will be reported
//! in the delivery callback.
//!
//! ### Buffering
//! Buffering is done automatically by librdkafka. When `send` is called, the message is enqueued
//! internally and once enough messages have been enqueued, or when enough time has passed, they will be
//! sent to Kafka as a single batch. You can control the behavior of the buffer by configuring the
//! the `queue.buffering.max.*` parameters listed below.
//!
//! ## Rust-rdkafka producers
//! Rust-rdkafka (rdkafka for brevity) provides two sets of producers: low level and high level.
//! Buffering is done automatically by librdkafka. When `send` is called, the
//! message is enqueued internally and once enough messages have been enqueued,
//! or when enough time has passed, they will be sent to Kafka as a single
//! batch. You can control the behavior of the buffer by configuring the
//! `queue.buffering.max.*` parameters listed below.
//!
//! ### Low level producers
//! The lowest level producer provided by rdkafka is called `BaseProducer`. The goal of the
//! `BaseProducer` is to be as close as possible to the C one while maintaining a safe Rust interface.
//! In particular, the `BaseProducer` needs to be polled at regular intervals to execute
//! any delivery callback that might be waiting and to make sure the queue doesn't fill up.
//! ## `rust-rdkafka` producers
//!
//! Another low lever producer is the `ThreadedProducer`, which is a `BaseProducer` with
//! a dedicated thread for polling.
//! `rust-rdkafka` (rdkafka for brevity) provides two sets of producers:
//! low-level and high-level.
//!
//! The delivery callback can be defined using a `ProducerContext`. More information in the
//! `base_producer` module.
//! ### Low-level producers
//!
//! ### High level producer
//! At the moment the only high level producer implemented is the `FutureProducer`. The
//! `FutureProducer` doesn't rely on user-defined callbacks to notify the delivery or failure of
//! a message; instead, this information will be returned in a Future. The `FutureProducer` also
//! uses an internal thread that is used for polling, which makes calling poll explicitly not necessary.
//! The returned future will contain information about the delivered message in case of success,
//! or a copy of the original message in case of failure. Additional computation can be chained
//! to the returned future, and it will executed by the future executor once the value is available
//! (for more information, check the documentation of the futures crate).
//! The lowest level producer provided by rdkafka is called [`BaseProducer`].
//! The goal of the `BaseProducer` is to be as close as possible to the C one
//! while maintaining a safe Rust interface. In particular, the `BaseProducer`
//! needs to be polled at regular intervals to execute any delivery callback
//! that might be waiting and to make sure the queue doesn't fill up.
//!
//! Another low-level producer is the [`ThreadedProducer`], which is a
//! `BaseProducer` with a dedicated thread for polling.
//!
//! The delivery callback can be defined using a `ProducerContext`. See the
//! [`base_producer`] module for more information.
//!
//! ### High-level producer
//!
//! At the moment the only high-level producer implemented is the
//! [`FutureProducer`]. The `FutureProducer` doesn't rely on user-defined
//! callbacks to notify the delivery or failure of a message; instead, this
//! information will be returned in a `Future`. The `FutureProducer` also uses
//! an internal polling thread, which makes explicit calls to `poll`
//! unnecessary. The returned future will contain information about the
//! delivered message in case of success, or a copy of the original message in
//! case of failure. Additional computation can be chained to the returned
//! future, and it will be executed by the futures executor once the value is
//! available (for more information, check the documentation of the futures
//! crate).
//!
//! ## Configuration
//!
//! ### Producer configuration
//!
//! For the configuration parameters common to both producers and consumers, refer to the
//! documentation in the `config` module. Here are listed the most commonly used producer
//! configuration. Click [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the full list.
//! For the configuration parameters common to both producers and consumers,
//! refer to the documentation in the `config` module. The most commonly used
//! producer configuration parameters are listed below, and a short example of
//! setting them follows the list; click
//! [here](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md)
//! for the full list.
//!
//! - `queue.buffering.max.messages` (100000): Maximum number of messages allowed on the producer queue.
//! - `queue.buffering.max.kbytes` (4000000): Maximum total message size sum allowed on the producer queue. This property has higher priority than queue.buffering.max.messages.
//! - `queue.buffering.max.ms` (0): Delay in milliseconds to wait for messages in the producer queue to accumulate before sending a request to the brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
//! - `message.send.max.retries` (2): How many times to retry sending a failing batch. Note: retrying may cause reordering.
//! - `compression.codec` (none): Compression codec to use for compressing message sets.
//! - `request.required.acks` (1): This field indicates how many acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, 1=Only the leader broker will need to ack the message, -1 or all=broker will block until message is committed by all in sync replicas (ISRs) or broker's in.sync.replicas setting before sending response.
//! - `request.timeout.ms` (5000): The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on request.required.acks being != 0.
//! - `message.timeout.ms` (300000): Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite.
//! - `queue.buffering.max.messages`: Maximum number of messages allowed on the
//! producer queue. Default: 100000.
//! - `queue.buffering.max.kbytes`: Maximum total message size sum allowed on
//! the producer queue. This property has higher priority than
//! `queue.buffering.max.messages`. Default: 4000000.
//! - `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in
//! the producer queue to accumulate before sending a request to the brokers.
//! A higher value allows larger and more effective (less overhead, improved
//! compression) batches of messages to accumulate at the expense of increased
//! message delivery latency. Default: 0.
//! - `message.send.max.retries`: How many times to retry sending a failing
//! batch. Note: retrying may cause reordering. Default: 2.
//! - `compression.codec`: Compression codec to use for compressing message
//! sets. Default: none.
//! - `request.required.acks`: This field indicates how many acknowledgements
//! the leader broker must receive from ISR brokers before responding to the
//! request: 0=Broker does not send any response/ack to client, 1=Only the
//! leader broker will need to ack the message, -1 or all=broker will block
//! until the message is committed by all in-sync replicas (ISRs) or the
//! broker's `in.sync.replicas` setting before sending a response. Default: 1.
//! - `request.timeout.ms`: The ack timeout of the producer request in
//! milliseconds. This value is only enforced by the broker and relies on
//! `request.required.acks` being != 0. Default: 5000.
//! - `message.timeout.ms`: Local message timeout. This value is only enforced
//! locally and limits the time a produced message waits for successful
//! delivery. A time of 0 is infinite. Default: 300000.
//!
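As a short sketch of setting a few of these parameters (the broker address and the chosen values are placeholders):

```rust
use rdkafka::config::ClientConfig;
use rdkafka::producer::BaseProducer;

// Batch up to 100 ms worth of messages and give up on delivery after 30 s.
let producer: BaseProducer = ClientConfig::new()
    .set("bootstrap.servers", "localhost:9092")
    .set("queue.buffering.max.ms", "100")
    .set("message.timeout.ms", "30000")
    .create()
    .expect("producer creation failed");
```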
pub mod base_producer;

View File

@ -1,3 +1,5 @@
//! Client and broker statistics.
// TODO: add documentation for all fields
#![allow(missing_docs)]

View File

@ -1,5 +1,6 @@
//! A data structure representing topic, partitions and offsets, compatible with the
//! `RDKafkaTopicPartitionList` exported by `rdkafka-sys`.
//! Data structures representing topics, partitions, and offsets.
//!
//! Compatible with the `RDKafkaTopicPartitionList` exported by `rdkafka-sys`.
use std::collections::HashMap;
use std::ffi::{CStr, CString};
@ -19,7 +20,7 @@ const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
/// A librdkafka offset.
/// A Kafka offset.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Offset {
/// Start consuming from the beginning of the partition.
@ -35,7 +36,8 @@ pub enum Offset {
}
impl Offset {
/// Converts the integer representation of an offset use by librdkafka to an `Offset`.
/// Converts the integer representation of an offset used by librdkafka to
/// an `Offset`.
pub fn from_raw(raw_offset: i64) -> Offset {
match raw_offset {
OFFSET_BEGINNING => Offset::Beginning,
@ -46,7 +48,8 @@ impl Offset {
}
}
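For example, a concrete offset round-trips through the raw representation unchanged, while the special variants map to librdkafka's reserved sentinel values:

```rust
use rdkafka::topic_partition_list::Offset;

assert_eq!(Offset::from_raw(42), Offset::Offset(42));
assert_eq!(Offset::Offset(42).to_raw(), 42);
assert_eq!(Offset::from_raw(Offset::End.to_raw()), Offset::End);
```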
/// Converts the `Offset` to the internal integer representation used by librdkafka.
/// Converts the `Offset` to the internal integer representation used by
/// librdkafka.
pub fn to_raw(&self) -> i64 {
match *self {
Offset::Beginning => OFFSET_BEGINNING,
@ -65,7 +68,7 @@ pub struct TopicPartitionListElem<'a> {
}
impl<'a> TopicPartitionListElem<'a> {
// _owner_list serves as a marker so that the lifetime isn't to long
// _owner_list serves as a marker so that the lifetime isn't too long
fn from_ptr(
_owner_list: &'a TopicPartitionList,
ptr: &'a mut RDKafkaTopicPartition,

View File

@ -1,4 +1,4 @@
//! Utility functions
//! Utility functions and types.
use std::ffi::CStr;
use std::fmt;
@ -15,8 +15,8 @@ use log::trace;
use rdkafka_sys as rdsys;
/// Return a tuple representing the version of `librdkafka` in
/// hexadecimal and string format.
/// Returns a tuple representing the version of `librdkafka` in hexadecimal and
/// string format.
pub fn get_rdkafka_version() -> (u16, String) {
let version_number = unsafe { rdsys::rd_kafka_version() } as u16;
let c_str = unsafe { CStr::from_ptr(rdsys::rd_kafka_version_str()) };
@ -67,12 +67,12 @@ impl From<Option<Duration>> for Timeout {
}
}
/// Converts a Duration into milliseconds
/// Converts a [`Duration`] into milliseconds.
pub fn duration_to_millis(duration: Duration) -> u64 {
duration.as_secs() * 1000 + u64::from(duration.subsec_nanos()) / 1_000_000
}
/// Converts the given time to milliseconds since unix epoch.
/// Converts the given time to the number of milliseconds since the Unix epoch.
pub fn millis_to_epoch(time: SystemTime) -> i64 {
duration_to_millis(
time.duration_since(UNIX_EPOCH)
@ -80,13 +80,13 @@ pub fn millis_to_epoch(time: SystemTime) -> i64 {
) as i64
}
/// Returns the current time in millis since unix epoch.
/// Returns the current time in milliseconds since the Unix epoch.
pub fn current_time_millis() -> i64 {
millis_to_epoch(SystemTime::now())
}
/// Converts a pointer to an array to an optional slice. If the pointer is null, `None` will
/// be returned.
/// Converts a pointer to an array to an optional slice. If the pointer is null,
/// returns `None`.
pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) -> Option<&'a [T]> {
if ptr.is_null() {
None
@ -95,8 +95,8 @@ pub(crate) unsafe fn ptr_to_opt_slice<'a, T>(ptr: *const c_void, size: usize) ->
}
}
/// Converts a pointer to an array to a slice. If the pointer is null or the size is zero,
/// a zero-length slice will be returned.
/// Converts a pointer to an array to a slice. If the pointer is null or the
/// size is zero, returns a zero-length slice.
pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a [T] {
if ptr.is_null() || size == 0 {
&[][..]
@ -105,11 +105,14 @@ pub(crate) unsafe fn ptr_to_slice<'a, T>(ptr: *const c_void, size: usize) -> &'a
}
}
/// A trait for the conversion of Rust data to raw pointers. This conversion is used
/// to pass opaque objects to the C library and vice versa.
/// Converts Rust data to and from raw pointers.
///
/// This conversion is used to pass opaque objects to the C library and vice
/// versa.
pub trait IntoOpaque: Send + Sync {
/// Converts the object into a raw pointer.
fn as_ptr(&self) -> *mut c_void;
/// Converts the raw pointer back to the original Rust object.
unsafe fn from_ptr(_: *mut c_void) -> Self;
}
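For illustration, a pointer-sized identifier can implement the trait without allocating by encoding its value directly in the pointer; the `RequestId` type here is made up for this sketch.

```rust
use std::os::raw::c_void;

use rdkafka::util::IntoOpaque;

// A small, Copy identifier that fits in a pointer, so no heap allocation is
// needed to round-trip it through the C library.
#[derive(Clone, Copy, Debug)]
struct RequestId(usize);

impl IntoOpaque for RequestId {
    fn as_ptr(&self) -> *mut c_void {
        // Encode the id directly in the pointer value.
        self.0 as *mut c_void
    }

    unsafe fn from_ptr(ptr: *mut c_void) -> Self {
        RequestId(ptr as usize)
    }
}
```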
@ -153,14 +156,14 @@ impl<T: Send + Sync> IntoOpaque for Arc<T> {
}
// TODO: check if the implementation returns a copy of the data and update the documentation
/// Converts a byte array representing a C string into a String.
/// Converts a byte array representing a C string into a [`String`].
pub unsafe fn bytes_cstr_to_owned(bytes_cstr: &[c_char]) -> String {
CStr::from_ptr(bytes_cstr.as_ptr() as *const c_char)
.to_string_lossy()
.into_owned()
}
/// Converts a C string into a String.
/// Converts a C string into a [`String`].
pub unsafe fn cstr_to_owned(cstr: *const c_char) -> String {
CStr::from_ptr(cstr as *const c_char)
.to_string_lossy()