Rework StreamConsumer API

This commit is contained in:
Nikhil Benesch 2021-01-03 14:05:06 -05:00
parent d772e0e8fe
commit 6fb2c37cd0
17 changed files with 329 additions and 201 deletions

View File

@ -35,30 +35,6 @@ regex = "1.1.6"
smol = { version = "0.1.7" }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "time"] }
[[example]]
name = "asynchronous_processing"
required-features = ["tokio"]
[[example]]
name = "at_least_once"
required-features = ["tokio"]
[[example]]
name = "roundtrip"
required-features = ["tokio"]
[[example]]
name = "simple_consumer"
required-features = ["tokio"]
[[test]]
name = "test_high_consumers"
required-features = ["tokio"]
[[test]]
name = "test_metadata"
required-features = ["tokio"]
# These features are re-exports of the features that the rdkafka-sys crate
# provides. See the rdkafka-sys documentation for details.
[features]

View File

@ -13,22 +13,57 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
rust-rdkafka map to types in librdkafka as directly as possible. The
maintainers apologize for the difficulty in upgrading through this change.
* Support calling `StreamConsumer::start` or its variants multiple times on the
same `StreamConsumer`.
* **Breaking changes.** Rework the consumer APIs to fix several bugs and design
warts:
* **Breaking change.** Remove the `no_message_error` parameter from
`StreamConsumer::start_with` and `StreamConsumer::start_with_runtime`.
Use a combinator like `tokio_stream::StreamExt::timeout` if you require the
old behavior.
* Rename `StreamConsumer::start` to `StreamConsumer::stream`, though the
former name will be retained as a deprecated alias for one release to ease
the transition. The new name better reflects that the method is a cheap
operation that can be called repeatedly and in multiple threads
simultaneously.
* **Breaking change.** Remove the `Consumer::get_base_consumer` method, as
accessing the `BaseConsumer` that underlied a `StreamConsumer` was dangerous.
* Remove the `StreamConsumer::start_with` and
`StreamConsumer::start_with_runtime` methods.
* **Breaking change.** Return an `&Arc<C>` from `Client::context` rather than an
`&C`. This is expected to cause very little breakage in practice.
There is no replacement in rust-rdkafka itself for the `no_message_error`
parameter. If you need this message, use a downstream combinator like
`tokio_stream::StreamExt::timeout`.
* **Breaking change.** Move the `BaseConsumer::context` method to the `Consumer`
trait, so that it is available when using the `StreamConsumer` as well.
There is no longer a need for the `poll_interval` parameter to these
methods. Message delivery is now entirely event driven, so no time-based
polling occurs.
To specify an `AsyncRuntime` besides the default, specify the desired
runtime type as the new `R` parameter of `StreamConsumer` when you create
it.
* Remove the `Consumer::get_base_consumer` method, as
accessing the `BaseConsumer` that underlied a `StreamConsumer` was
dangerous.
* Return an `&Arc<C>` from `Client::context` rather than an
`&C`. This is expected to cause very little breakage in practice.
* Move the `BaseConsumer::context` method to the `Consumer`
trait, so that it is available when using the `StreamConsumer` as well.
* Fix stalls when using multiple `MessageStream`s simultaneously.
Thanks to [@Marwes] for discovering the issue and contributing the initial
fix.
* Add a convenience method, `StreamConsumer::next`, to yield the next message
from a stream.
Thanks again to [@Marwes].
* Add a new implementation of `AsyncRuntime` called `NaiveRuntime` that does not
depend on Tokio.
This runtime has poor performance, but is necessary to make the crate compile
when the `tokio` feature is disabled.
[@Marwes]: https://github.com/Marwes
<a name="0.24.0"></a>
## 0.24.0 (2020-07-08)

View File

@ -80,7 +80,7 @@ async fn run_async_processor(
.expect("Producer creation error");
// Create the outer pipeline on the message stream.
let stream_processor = consumer.start().try_for_each(|borrowed_message| {
let stream_processor = consumer.stream().try_for_each(|borrowed_message| {
let producer = producer.clone();
let output_topic = output_topic.to_string();
async move {

View File

@ -15,7 +15,6 @@ use std::time::Duration;
use clap::{App, Arg};
use futures::future;
use futures::stream::StreamExt;
use log::{info, warn};
use rdkafka::client::ClientContext;
@ -142,10 +141,8 @@ async fn main() {
let consumer = create_consumer(brokers, group_id, input_topic);
let producer = create_producer(brokers);
let mut stream = consumer.start();
while let Some(message) = stream.next().await {
match message {
loop {
match consumer.next().await {
Err(e) => {
warn!("Kafka error: {}", e);
}

View File

@ -2,7 +2,6 @@ use std::convert::TryInto;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use clap::{App, Arg};
use futures::stream::StreamExt;
use hdrhistogram::Histogram;
use rdkafka::config::ClientConfig;
@ -81,11 +80,10 @@ async fn main() {
});
let start = Instant::now();
let mut stream = consumer.start();
let mut latencies = Histogram::<u64>::new(5).unwrap();
println!("Warming up for 10s...");
while let Some(message) = stream.next().await {
let message = message.unwrap();
loop {
let message = consumer.next().await.unwrap();
let then = message.timestamp().to_millis().unwrap();
if start.elapsed() < Duration::from_secs(10) {
// Warming up.

View File

@ -1,5 +1,4 @@
use clap::{App, Arg};
use futures::StreamExt;
use log::{info, warn};
use rdkafka::client::ClientContext;
@ -58,12 +57,8 @@ async fn consume_and_print(brokers: &str, group_id: &str, topics: &[&str]) {
.subscribe(&topics.to_vec())
.expect("Can't subscribe to specified topics");
// consumer.start() returns a stream. The stream can be used ot chain together expensive steps,
// such as complex computations on a thread pool or asynchronous IO.
let mut message_stream = consumer.start();
while let Some(message) = message_stream.next().await {
match message {
loop {
match consumer.next().await {
Err(e) => warn!("Kafka error: {}", e),
Ok(m) => {
let payload = match m.payload_view::<str>() {

View File

@ -83,7 +83,7 @@ fn main() {
process::exit(1);
}
let consumer: StreamConsumer = ClientConfig::new()
let consumer: StreamConsumer<_, SmolRuntime> = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
@ -93,7 +93,7 @@ fn main() {
.expect("Consumer creation failed");
consumer.subscribe(&[&topic]).unwrap();
let mut stream = consumer.start_with_runtime::<SmolRuntime>(Duration::from_millis(100));
let mut stream = consumer.stream();
let message = stream.next().await;
match message {
Some(Ok(message)) => println!(

View File

@ -136,6 +136,10 @@ impl ClientConfig {
}
}
pub(crate) fn get(&self, key: &str) -> Option<&str> {
self.conf_map.get(key).map(|val| val.as_str())
}
/// Sets a new parameter in the configuration.
pub fn set<'a>(&'a mut self, key: &str, value: &str) -> &'a mut ClientConfig {
self.conf_map.insert(key.to_string(), value.to_string());

View File

@ -1,4 +1,4 @@
//! Low level consumer wrapper.
//! Low-level consumers.
use std::cmp;
use std::ffi::CString;
@ -77,9 +77,10 @@ unsafe fn enable_nonempty_callback<C: ConsumerContext>(queue: &NativeQueue, cont
)
}
/// Low level wrapper around the librdkafka consumer. This consumer must be
/// periodically polled to make progress on rebalancing, callbacks and to
/// receive messages.
/// A low-level consumer that requires manual polling.
///
/// This consumer must be periodically polled to make progress on rebalancing,
/// callbacks and to receive messages.
pub struct BaseConsumer<C = DefaultConsumerContext>
where
C: ConsumerContext,

View File

@ -21,7 +21,9 @@ pub mod base_consumer;
pub mod stream_consumer;
// Re-exports.
#[doc(inline)]
pub use self::base_consumer::BaseConsumer;
#[doc(inline)]
pub use self::stream_consumer::{MessageStream, StreamConsumer};
/// Rebalance information.
@ -126,7 +128,7 @@ pub trait ConsumerContext: ClientContext {
// StreamConsumerContext as well.
}
/// An empty consumer context that can be user when no customizations are
/// An inert [`ConsumerContext`] that can be used when no customizations are
/// needed.
#[derive(Clone, Default)]
pub struct DefaultConsumerContext;
@ -134,8 +136,8 @@ pub struct DefaultConsumerContext;
impl ClientContext for DefaultConsumerContext {}
impl ConsumerContext for DefaultConsumerContext {}
/// Specifies if the commit should be performed synchronously
/// or asynchronously.
/// Specifies whether a commit should be performed synchronously or
/// asynchronously.
#[derive(Clone, Copy, Debug)]
pub enum CommitMode {
/// Synchronous commit.
@ -159,7 +161,8 @@ where
/// Returns the [`Client`] underlying this consumer.
fn client(&self) -> &Client<C>;
/// Returns a reference to [`Context`] used to create this consumer.
/// Returns a reference to the [`ConsumerContext`] used to create this
/// consumer.
fn context(&self) -> &Arc<C> {
self.client().context()
}

View File

@ -1,12 +1,16 @@
//! Stream-based consumer implementation.
//! High-level consumers with a [`Stream`](futures::Stream) interface.
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use std::time::Duration;
use futures::{ready, Stream};
use futures::channel::oneshot;
use futures::future::FutureExt;
use futures::select;
use futures::stream::{Stream, StreamExt};
use log::trace;
use slab::Slab;
use rdkafka_sys as rdsys;
@ -22,15 +26,15 @@ use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::statistics::Statistics;
use crate::topic_partition_list::{Offset, TopicPartitionList};
#[cfg(feature = "tokio")]
use crate::util::TokioRuntime;
use crate::util::{AsyncRuntime, NativePtr, Timeout};
use crate::util::{AsyncRuntime, DefaultRuntime, NativePtr, Timeout};
/// The [`ConsumerContext`] used by the [`StreamConsumer`]. This context will
/// automatically wake up the message stream when new data is available.
/// A consumer context wrapper for a stream consumer.
///
/// This type is not intended to be used directly. It will be automatically
/// created by the `StreamConsumer` when necessary.
/// This context will automatically wake up the message stream when new data is
/// available.
///
/// This type is not intended to be used directly. The construction of a
/// `StreamConsumer` automatically wraps the underlying context in this type.
pub struct StreamConsumerContext<C>
where
C: ConsumerContext,
@ -49,6 +53,15 @@ where
wakers: Arc::new(Mutex::new(Slab::new())),
}
}
fn wake_all(&self) {
let mut wakers = self.wakers.lock().unwrap();
for (_, waker) in wakers.iter_mut() {
if let Some(waker) = waker.take() {
waker.wake();
}
}
}
}
impl<C> ClientContext for StreamConsumerContext<C>
@ -98,51 +111,33 @@ where
}
fn message_queue_nonempty_callback(&self) {
let mut wakers = self.wakers.lock().unwrap();
for (_, waker) in wakers.iter_mut() {
if let Some(waker) = waker.take() {
waker.wake();
}
}
self.wake_all();
self.inner.message_queue_nonempty_callback()
}
}
/// A Kafka consumer implementing [`futures::Stream`].
pub struct MessageStream<
'a,
C,
// Ugly, but this provides backwards compatibility when the `tokio` feature
// is enabled, as it is by default.
#[cfg(feature = "tokio")] R = TokioRuntime,
#[cfg(not(feature = "tokio"))] R,
> where
/// A stream of messages from a [`StreamConsumer`].
///
/// See the documentation of [`StreamConsumer::stream`] for details.
pub struct MessageStream<'a, C, R = DefaultRuntime>
where
C: ConsumerContext + 'static,
R: AsyncRuntime,
{
consumer: &'a StreamConsumer<C>,
interval: Duration,
delay: Pin<Box<Option<R::Delay>>>,
consumer: &'a StreamConsumer<C, R>,
slot: usize,
}
impl<'a, C, R> MessageStream<'a, C, R>
where
C: ConsumerContext + 'static,
R: AsyncRuntime,
{
fn new(consumer: &'a StreamConsumer<C>, interval: Duration) -> MessageStream<'a, C, R> {
fn new(consumer: &'a StreamConsumer<C, R>) -> MessageStream<'a, C, R> {
let slot = {
let context = consumer.base.context();
let mut wakers = context.wakers.lock().expect("lock poisoned");
wakers.insert(None)
};
MessageStream {
consumer,
interval,
delay: Box::pin(None),
slot,
}
MessageStream { consumer, slot }
}
fn context(&self) -> &StreamConsumerContext<C> {
@ -164,30 +159,15 @@ where
.map(|p| BorrowedMessage::from_consumer(p, self.consumer))
}
}
// SAFETY: All access to `self.delay` occurs via the following two
// functions. These functions are careful to never move out of `self.delay`.
// (They can *drop* the future stored in `self.delay`, but that is
// permitted.) They never return a non-pinned pointer to the contents of
// `self.delay`.
fn ensure_delay(&mut self, delay: R::Delay) -> Pin<&mut R::Delay> {
unsafe { Pin::new_unchecked(self.delay.as_mut().get_unchecked_mut().get_or_insert(delay)) }
}
fn clear_delay(&mut self) {
unsafe { *self.delay.as_mut().get_unchecked_mut() = None }
}
}
impl<'a, C, R> Stream for MessageStream<'a, C, R>
where
C: ConsumerContext + 'a,
R: AsyncRuntime,
{
type Item = KafkaResult<BorrowedMessage<'a>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Unconditionally store the waker so that we are woken up if the queue
// flips from non-empty to empty. We have to store the waker on every
// call to poll in case this future migrates between tasks. We also need
@ -197,15 +177,8 @@ where
self.set_waker(cx.waker().clone());
match self.poll() {
None => loop {
let delay = R::delay_for(self.interval);
ready!(self.ensure_delay(delay).poll(cx));
self.clear_delay();
},
Some(message) => {
self.clear_delay();
Poll::Ready(Some(message))
}
None => Poll::Pending,
Some(message) => Poll::Ready(Some(message)),
}
}
}
@ -213,7 +186,6 @@ where
impl<'a, C, R> Drop for MessageStream<'a, C, R>
where
C: ConsumerContext + 'static,
R: AsyncRuntime,
{
fn drop(&mut self) {
let mut wakers = self.context().wakers.lock().expect("lock poisoned");
@ -221,20 +193,143 @@ where
}
}
/// A Kafka consumer providing a [`futures::Stream`] interface.
/// A high-level consumer with a [`Stream`](futures::Stream) interface.
///
/// This consumer doesn't need to be polled explicitly since `await`ing the
/// stream returned by [`StreamConsumer::start`] will implicitly poll the
/// consumer.
/// This consumer doesn't need to be polled explicitly. Extracting an item from
/// the stream returned by the [`stream`](StreamConsumer::stream) will
/// implicitly poll the underlying Kafka consumer.
///
/// If you activate the consumer group protocol by calling
/// [`subscribe`](Consumer::subscribe), the stream consumer will integrate with
/// librdkafka's liveness detection as described in [KIP-62]. You must be sure
/// that you attempt to extract a message from the stream consumer at least
/// every `max.poll.interval.ms` milliseconds, or librdkafka will assume that
/// the processing thread is wedged and leave the consumer groups.
///
/// [KIP-62]: https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
#[must_use = "Consumer polling thread will stop immediately if unused"]
pub struct StreamConsumer<C = DefaultConsumerContext>
pub struct StreamConsumer<C = DefaultConsumerContext, R = DefaultRuntime>
where
C: ConsumerContext,
{
base: BaseConsumer<StreamConsumerContext<C>>,
_shutdown_trigger: oneshot::Sender<()>,
_runtime: PhantomData<R>,
}
impl<C> Consumer<StreamConsumerContext<C>> for StreamConsumer<C>
impl<R> FromClientConfig for StreamConsumer<DefaultConsumerContext, R>
where
R: AsyncRuntime,
{
fn from_config(config: &ClientConfig) -> KafkaResult<Self> {
StreamConsumer::from_config_and_context(config, DefaultConsumerContext)
}
}
/// Creates a new `StreamConsumer` starting from a [`ClientConfig`].
impl<C, R> FromClientConfigAndContext<C> for StreamConsumer<C, R>
where
C: ConsumerContext + 'static,
R: AsyncRuntime,
{
fn from_config_and_context(config: &ClientConfig, context: C) -> KafkaResult<Self> {
let context = StreamConsumerContext::new(context);
let base = BaseConsumer::from_config_and_context(config, context)?;
let native_ptr = base.client().native_ptr() as usize;
// Redirect rdkafka's main queue to the consumer queue so that we only
// need to listen to the consumer queue to observe events like
// rebalancings and stats.
unsafe { rdsys::rd_kafka_poll_set_consumer(base.client().native_ptr()) };
// We need to make sure we poll the consumer at least once every max
// poll interval, *unless* the processing task has wedged. To accomplish
// this, we launch a background task that sends spurious wakeup
// notifications at half the max poll interval. An unwedged processing
// task will wake up and poll the consumer with plenty of time to spare,
// while a wedged processing task will not.
//
// The default max poll interval is 5m, so there is essentially no
// performance impact to these spurious wakeups.
let (shutdown_trigger, shutdown_tripwire) = oneshot::channel();
let mut shutdown_tripwire = shutdown_tripwire.fuse();
let poll_interval = match config.get("max.poll.interval.ms") {
Some(millis) => {
let millis = millis.parse().expect("rdkafka validated config value");
Duration::from_millis(millis)
}
None => Duration::from_secs(300),
};
let context = base.context().clone();
R::spawn(async move {
trace!("Starting stream consumer wake loop: 0x{:x}", native_ptr);
loop {
select! {
_ = R::delay_for(poll_interval / 2).fuse() => context.wake_all(),
_ = shutdown_tripwire => break,
}
}
trace!("Shut down stream consumer wake loop: 0x{:x}", native_ptr);
});
Ok(StreamConsumer {
base,
_shutdown_trigger: shutdown_trigger,
_runtime: PhantomData,
})
}
}
impl<C, R> StreamConsumer<C, R>
where
C: ConsumerContext + 'static,
{
/// Constructs a stream that yields messages from this consumer.
///
/// It is legal to have multiple live message streams for the same consumer,
/// and to move those message streams across threads. Note, however, that
/// the message streams share the same underlying state. A message received
/// by the consumer will be delivered to only one of the live message
/// streams. If you seek the underlying consumer, all message streams
/// created from the consumer will begin to draw messages from the new
/// position of the consumer.
///
/// If you want multiple independent views of a Kafka topic, create multiple
/// consumers, not multiple message streams.
pub fn stream(&self) -> MessageStream<'_, C, R> {
MessageStream::new(self)
}
/// Constructs a stream that yields messages from this consumer.
#[deprecated = "use the more clearly named \"StreamConsumer::stream\" method instead"]
pub fn start(&self) -> MessageStream<'_, C, R> {
self.stream()
}
/// Yields the next message from the stream.
///
/// This method will block until the next message is available or an error
/// occurs. It is legal to call `next` from multiple threads simultaneously.
///
/// Note that this method is exactly as efficient as constructing a
/// single-use message stream and extracting one message from it:
///
/// ```
/// use futures::future::StreamExt;
///
/// # async fn example(consumer: StreamConsumer) {
/// consumer.stream().next().await.expect("MessageStream never returns None");
/// # }
/// ```
pub async fn next(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
self.stream()
.next()
.await
.expect("kafka streams never terminate")
}
}
impl<C, R> Consumer<StreamConsumerContext<C>> for StreamConsumer<C, R>
where
C: ConsumerContext,
{
@ -380,57 +475,3 @@ where
self.base.resume(partitions)
}
}
impl FromClientConfig for StreamConsumer {
fn from_config(config: &ClientConfig) -> KafkaResult<StreamConsumer> {
StreamConsumer::from_config_and_context(config, DefaultConsumerContext)
}
}
/// Creates a new `StreamConsumer` starting from a [`ClientConfig`].
impl<C: ConsumerContext> FromClientConfigAndContext<C> for StreamConsumer<C> {
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<StreamConsumer<C>> {
let context = StreamConsumerContext::new(context);
let stream_consumer = StreamConsumer {
base: BaseConsumer::from_config_and_context(config, context)?,
};
unsafe { rdsys::rd_kafka_poll_set_consumer(stream_consumer.base.client().native_ptr()) };
Ok(stream_consumer)
}
}
impl<C: ConsumerContext> StreamConsumer<C> {
/// Starts the stream consumer with default configuration (100ms polling
/// interval and no `NoMessageReceived` notifications).
///
/// **Note:** this method must be called from within the context of a Tokio
/// runtime.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub fn start(&self) -> MessageStream<'_, C, TokioRuntime> {
self.start_with(Duration::from_millis(100))
}
/// Starts the stream consumer with the specified poll interval.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
pub fn start_with(&self, poll_interval: Duration) -> MessageStream<'_, C, TokioRuntime> {
// TODO: verify called once
self.start_with_runtime(poll_interval)
}
/// Like [`StreamConsumer::start_with`], but with a customizable
/// asynchronous runtime.
///
/// See the [`AsyncRuntime`] trait for the details on the interface the
/// runtime must satisfy.
pub fn start_with_runtime<R>(&self, poll_interval: Duration) -> MessageStream<'_, C, R>
where
R: AsyncRuntime,
{
MessageStream::new(self, poll_interval)
}
}

View File

@ -240,6 +240,7 @@
#![forbid(missing_docs)]
#![deny(rust_2018_idioms)]
#![allow(clippy::type_complexity)]
#![cfg_attr(docsrs, feature(doc_cfg))]
pub use rdkafka_sys::types;

View File

@ -10,8 +10,11 @@ use std::ptr;
use std::ptr::NonNull;
use std::slice;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::channel::oneshot;
use futures::future::{FutureExt, Map};
use log::trace;
use rdkafka_sys as rdsys;
@ -182,8 +185,13 @@ impl ErrBuf {
self.buf.as_mut_ptr() as *mut c_char
}
pub fn filled(&self) -> &[u8] {
let i = self.buf.iter().position(|c| *c == 0).unwrap();
&self.buf[..i + 1]
}
pub fn len(&self) -> usize {
self.buf.len()
self.filled().len()
}
}
@ -198,7 +206,7 @@ impl fmt::Display for ErrBuf {
write!(
f,
"{}",
CStr::from_bytes_with_nul(&self.buf)
CStr::from_bytes_with_nul(self.filled())
.unwrap()
.to_string_lossy()
)
@ -316,7 +324,7 @@ where
///
/// [smol]: https://docs.rs/smol
/// [smol_runtime]: https://github.com/fede1024/rust-rdkafka/tree/master/examples/smol_runtime.rs
pub trait AsyncRuntime {
pub trait AsyncRuntime: Send + Sync + 'static {
/// The type of the future returned by
/// [`delay_for`](AsyncRuntime::delay_for).
type Delay: Future<Output = ()> + Send;
@ -333,6 +341,48 @@ pub trait AsyncRuntime {
fn delay_for(duration: Duration) -> Self::Delay;
}
/// The default [`AsyncRuntime`] used when one is not explicitly specified.
///
/// This is defined to be the [`TokioRuntime`] when the `tokio` feature is
/// enabled, and the [`NaiveRuntime`] otherwise.
#[cfg(not(feature = "tokio"))]
pub type DefaultRuntime = NaiveRuntime;
/// The default [`AsyncRuntime`] used when one is not explicitly specified.
///
/// This is defined to be the [`TokioRuntime`] when the `tokio` feature is
/// enabled, and the [`NaiveRuntime`] otherwise.
#[cfg(feature = "tokio")]
pub type DefaultRuntime = TokioRuntime;
/// An [`AsyncRuntime`] implementation backed by the executor in the
/// [futures](futures) crate.
///
/// This runtime should not be used when performance is a concern, as it makes
/// heavy use of threads to compenstate for the lack of a timer in the futures
/// executor.
pub struct NaiveRuntime;
impl AsyncRuntime for NaiveRuntime {
type Delay = Map<oneshot::Receiver<()>, fn(Result<(), oneshot::Canceled>)>;
fn spawn<T>(task: T)
where
T: Future<Output = ()> + Send + 'static,
{
thread::spawn(|| futures::executor::block_on(task));
}
fn delay_for(duration: Duration) -> Self::Delay {
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
thread::sleep(duration);
tx.send(())
});
rx.map(|_| ())
}
}
/// An [`AsyncRuntime`] implementation backed by [Tokio](tokio).
///
/// This runtime is used by default throughout the crate, unless the `tokio`

View File

@ -13,6 +13,7 @@ use rdkafka::error::KafkaError;
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use rdkafka::util::current_time_millis;
use rdkafka::{Message, Timestamp};
use rdkafka_sys::types::RDKafkaConfRes;
use crate::utils::*;
@ -30,16 +31,40 @@ fn create_stream_consumer(
create_stream_consumer_with_context(group_id, config_overrides, cons_context)
}
fn create_stream_consumer_with_context<C: ConsumerContext>(
fn create_stream_consumer_with_context<C>(
group_id: &str,
config_overrides: Option<HashMap<&str, &str>>,
context: C,
) -> StreamConsumer<C> {
) -> StreamConsumer<C>
where
C: ConsumerContext + 'static,
{
consumer_config(group_id, config_overrides)
.create_with_context(context)
.expect("Consumer creation failed")
}
#[tokio::test]
async fn test_invalid_max_poll_interval() {
let res: Result<StreamConsumer, _> = consumer_config(
&rand_test_group(),
Some(map!("max.poll.interval.ms" => "-1")),
)
.create();
match res {
Err(KafkaError::ClientConfig(RDKafkaConfRes::RD_KAFKA_CONF_INVALID, desc, key, value)) => {
assert_eq!(desc, "");
assert_eq!(key, "max.poll.interval.ms");
assert_eq!(value, "-1");
}
Ok(_) => panic!("invalid max poll interval configuration accepted"),
Err(e) => panic!(
"incorrect error returned for invalid max poll interval: {:?}",
e
),
}
}
// All produced messages should be consumed.
#[tokio::test(flavor = "multi_thread")]
async fn test_produce_consume_base() {
@ -52,7 +77,7 @@ async fn test_produce_consume_base() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
let _consumer_future = consumer
.start()
.stream()
.take(100)
.for_each(|message| {
match message {
@ -91,7 +116,7 @@ async fn test_produce_consume_base_concurrent() {
let consumer = consumer.clone();
tokio::spawn(async move {
consumer
.start_with(Duration::from_secs(60 * 60 * 24 * 365 /* 1 year */))
.stream()
.take(20)
.for_each(|message| match message {
Ok(_) => future::ready(()),
@ -125,7 +150,7 @@ async fn test_produce_consume_base_assign() {
let mut partition_count = vec![0, 0, 0];
let _consumer_future = consumer
.start()
.stream()
.take(19)
.for_each(|message| {
match message {
@ -151,7 +176,7 @@ async fn test_produce_consume_with_timestamp() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
let _consumer_future = consumer
.start()
.stream()
.take(100)
.for_each(|message| {
match message {
@ -193,7 +218,7 @@ async fn test_consumer_commit_message() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
let _consumer_future = consumer
.start()
.stream()
.take(33)
.for_each(|message| {
match message {
@ -256,7 +281,7 @@ async fn test_consumer_store_offset_commit() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
let _consumer_future = consumer
.start()
.stream()
.take(36)
.for_each(|message| {
match message {

View File

@ -325,11 +325,12 @@ fn test_fatal_errors() {
assert_eq!(producer.client().fatal_error(), None);
let msg = CString::new("fake error").unwrap();
unsafe {
rdkafka_sys::rd_kafka_test_fatal_error(
producer.client().native_ptr(),
RDKafkaRespErr::RD_KAFKA_RESP_ERR_OUT_OF_ORDER_SEQUENCE_NUMBER,
CString::new("fake error").unwrap().as_ptr(),
msg.as_ptr(),
);
}

View File

@ -96,7 +96,7 @@ async fn test_subscription() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
// Make sure the consumer joins the group.
let _consumer_future = consumer.start().next().await;
let _consumer_future = consumer.next().await;
let mut tpl = TopicPartitionList::new();
tpl.add_topic_unassigned(&topic_name);
@ -116,7 +116,7 @@ async fn test_group_membership() {
consumer.subscribe(&[topic_name.as_str()]).unwrap();
// Make sure the consumer joins the group.
let _consumer_future = consumer.start().next().await;
let _consumer_future = consumer.next().await;
let group_list = consumer
.fetch_group_list(None, Duration::from_secs(5))

View File

@ -17,6 +17,7 @@ use rdkafka::error::KafkaResult;
use rdkafka::message::ToBytes;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::statistics::Statistics;
use rdkafka::util::DefaultRuntime;
use rdkafka::TopicPartitionList;
#[macro_export]
@ -133,7 +134,7 @@ where
.map(|id| {
let future = async move {
producer
.send(
.send_with_runtime::<DefaultRuntime, _, _, _>(
FutureRecord {
topic: topic_name,
payload: Some(&value_fn(id)),