Compare commits


18 Commits

Author SHA1 Message Date
Greg Orzell f1246fe513 Merge branch 'master' of github.com:gorzell/rust-rdkafka into gorzell/add-timestamp 2024-01-30 11:57:24 +01:00
David Blewett e69c2aa40c
Merge pull request #587 from Magister/fix/native_config_get
Fix panic on getting config value from NativeConfig
2024-01-19 09:59:15 -05:00
David Blewett eacf17389e
Merge pull request #639 from Swatinem/to-string
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
2024-01-19 09:58:34 -05:00
David Blewett bb76b5bd40 Release v0.36.2. 2024-01-16 17:00:49 -05:00
David Blewett ed293167f7
Merge pull request #646 from fede1024/davidblewett/kafka-36
Add Kafka 3.6 to the integration test matrix.
2024-01-16 14:18:59 -05:00
David Blewett 957aef1fb1 Add Kafka 3.6 to the integration test matrix. 2024-01-11 17:51:01 -05:00
David Blewett 85539acd38
Merge pull request #636 from getsentry/base-consumer
Pass BaseConsumer to ConsumerContext::rebalance
2024-01-11 17:36:05 -05:00
David Blewett 6154c84a7c Release v0.36.1. 2024-01-11 17:35:00 -05:00
David Blewett 9b44abacae
Merge pull request #644 from fede1024/scanterog/return-none-on-rebalance
Return back to the caller on rebalance events
2024-01-11 17:24:14 -05:00
Samuel Cantero d6ec6e1596 Return back to the caller on rebalance events
This does not affect the StreamConsumer or any other wrapper consumer.
It only incurs an extra poll call when there is a rebalance event.

When using bindings built on the rust-rdkafka FFI, the caller is
responsible for issuing the rebalance calls (*assign). If a high
timeout is specified, the rebalance handler is only triggered once the
timeout period has elapsed.

This commit fixes that by always returning on rebalance events, except
when the timeout is Timeout::Never; poll calls with Timeout::Never are
expected to return a message.
2024-01-11 16:35:43 -05:00
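The behavioral change is easiest to see from the caller's side. Below is a minimal sketch of a `BaseConsumer` poll loop under the new behavior; the broker address, group, and topic are illustrative, not from this changeset:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};

fn main() {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // illustrative address
        .set("group.id", "example-group")
        .create()
        .expect("consumer creation failed");
    consumer.subscribe(&["example-topic"]).unwrap();

    loop {
        // After this change, poll() returns None not only on timeout but
        // also right after a rebalance or offset-commit event has been
        // handled, so FFI-level callers regain control promptly.
        match consumer.poll(Duration::from_secs(5)) {
            Some(Ok(_message)) => { /* process the message */ }
            Some(Err(e)) => eprintln!("kafka error: {e}"),
            None => { /* timeout, rebalance, or offset commit; poll again */ }
        }
    }
}
```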
David Blewett 738590b9f0
Merge pull request #645 from fede1024/davidblewett/tune-docker-compose
Disable valgrind in CI temporarily
2024-01-11 16:34:48 -05:00
David Blewett 321c04078a Avoid topic pollution by prefixing with test name. 2024-01-11 15:30:10 -05:00
David Blewett e347f9aece Disable valgrind for now, and start up kafka/zk before starting the build 2024-01-11 14:58:07 -05:00
David Blewett 8f37c49553 Use a short timeout instead of now_or_never. 2024-01-11 14:57:47 -05:00
Sebastian Zivota 95f6cd8431 Fix simple_consumer example 2024-01-08 11:46:57 +01:00
Arpad Borsos 15044da8d9
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
In some error cases, the `Base{Consumer,Producer}` were eagerly copying strings
and `unwrap`ping UTF-8 validation just to print an error message.

This avoids the allocation in the common case, and is panic-safe in the presumably unreachable case of invalid UTF-8.
2023-12-19 12:30:11 +01:00
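A minimal sketch of the pattern this commit adopts (the helper is hypothetical, condensed from the diffs below):

```rust
use std::ffi::CStr;

// Hypothetical helper showing the before/after of this change.
fn log_event_name(raw: &CStr) {
    // Before: copied the bytes into a String and panicked on invalid UTF-8:
    //     let name = String::from_utf8(raw.to_bytes().to_vec()).unwrap();
    // After: to_string_lossy() borrows when the bytes are valid UTF-8
    // (Cow::Borrowed, no allocation) and substitutes U+FFFD otherwise.
    let name = raw.to_string_lossy();
    eprintln!("Ignored event '{name}'");
}

fn main() {
    log_event_name(CStr::from_bytes_with_nul(b"fetch\0").unwrap());
}
```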
Sebastian Zivota c811175785 Pass BaseConsumer to ConsumerContext::rebalance 2023-11-28 16:43:56 +01:00
Misha Padalka 353812ff95 Fix panic on getting config value from NativeConfig
Kafka can return a string containing multiple \0 characters (seen on
Windows x64), and unwrapping the result of CStr::from_bytes_with_nul
panics in that case. String::from_utf8_lossy() handles it gracefully.
2023-06-13 17:08:38 +03:00
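A small standalone reproduction of the failure mode (the buffer contents are made up):

```rust
use std::ffi::CStr;

fn main() {
    // librdkafka can hand back a buffer containing more than one NUL
    // (observed on Windows x64); the exact bytes here are illustrative.
    let buf = b"all\0\0";

    // from_bytes_with_nul() requires exactly one trailing NUL, so the
    // old .unwrap() on its Result is what panicked.
    assert!(CStr::from_bytes_with_nul(buf).is_err());

    // The fix: from_utf8_lossy() accepts the bytes unconditionally.
    let value = String::from_utf8_lossy(buf);
    assert_eq!(value.trim_end_matches('\0'), "all");
}
```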
19 changed files with 126 additions and 103 deletions

View File

@@ -69,6 +69,8 @@ jobs:
       fail-fast: false
       matrix:
         include:
+          - confluent-version: 7.5.1
+            kafka-version: 3.6
           - confluent-version: 7.5.1
             kafka-version: 3.5
           - confluent-version: 5.3.1

Cargo.lock (generated)
View File

@@ -1070,7 +1070,7 @@ dependencies = [
 [[package]]
 name = "rdkafka"
-version = "0.36.0"
+version = "0.36.2"
 dependencies = [
  "async-std",
  "backoff",

View File

@@ -1,6 +1,6 @@
 [package]
 name = "rdkafka"
-version = "0.36.0"
+version = "0.36.2"
 authors = ["Federico Giraud <giraud.federico@gmail.com>"]
 repository = "https://github.com/fede1024/rust-rdkafka"
 readme = "README.md"

View File

@@ -4,6 +4,11 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

 ## Unreleased

+## 0.36.2 (2024-01-16)
+
+* Update `BaseConsumer::poll` to return `None` when handling rebalance
+  or offset commit events.
+
 ## 0.36.0 (2023-11-08)

 * Pivot the library from using librdkafka's callback interface to using

View File

@@ -4,7 +4,7 @@ use log::{info, warn};
 use rdkafka::client::ClientContext;
 use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
 use rdkafka::consumer::stream_consumer::StreamConsumer;
-use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
+use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance};
 use rdkafka::error::KafkaResult;
 use rdkafka::message::{Headers, Message};
 use rdkafka::topic_partition_list::TopicPartitionList;
@@ -22,11 +22,11 @@ struct CustomContext;

 impl ClientContext for CustomContext {}

 impl ConsumerContext for CustomContext {
-    fn pre_rebalance(&self, rebalance: &Rebalance) {
+    fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
         info!("Pre rebalance {:?}", rebalance);
     }

-    fn post_rebalance(&self, rebalance: &Rebalance) {
+    fn post_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
         info!("Post rebalance {:?}", rebalance);
     }

View File

@@ -150,10 +150,7 @@ impl NativeClientConfig {
         }

         // Convert the C string to a Rust string.
-        Ok(CStr::from_bytes_with_nul(&buf)
-            .unwrap()
-            .to_string_lossy()
-            .into())
+        Ok(String::from_utf8_lossy(&buf).to_string())
     }
 }

View File

@@ -11,7 +11,7 @@ use log::{error, warn};
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;

-use crate::client::{Client, NativeQueue};
+use crate::client::{Client, NativeClient, NativeQueue};
 use crate::config::{
     ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
 };
@@ -144,17 +144,22 @@ where
             }
             rdsys::RD_KAFKA_EVENT_REBALANCE => {
                 self.handle_rebalance_event(event);
+                if timeout != Timeout::Never {
+                    return None;
+                }
             }
             rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
                 self.handle_offset_commit_event(event);
+                if timeout != Timeout::Never {
+                    return None;
+                }
             }
             _ => {
-                let buf = unsafe {
+                let evname = unsafe {
                     let evname = rdsys::rd_kafka_event_name(event.ptr());
-                    CStr::from_ptr(evname).to_bytes()
+                    CStr::from_ptr(evname).to_string_lossy()
                 };
-                let evname = String::from_utf8(buf.to_vec()).unwrap();
-                warn!("Ignored event '{}' on consumer poll", evname);
+                warn!("Ignored event '{evname}' on consumer poll");
             }
         }
     }
@@ -188,17 +193,15 @@ where
                 // The TPL is owned by the Event and will be destroyed when the event is destroyed.
                 // Dropping it here will lead to double free.
                 let mut tpl = ManuallyDrop::new(tpl);
-                self.context()
-                    .rebalance(self.client.native_client(), err, &mut tpl);
+                self.context().rebalance(self, err, &mut tpl);
             }
             _ => {
-                let buf = unsafe {
+                let err = unsafe {
                     let err_name =
                         rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
-                    CStr::from_ptr(err_name).to_bytes()
+                    CStr::from_ptr(err_name).to_string_lossy()
                 };
-                let err = String::from_utf8(buf.to_vec()).unwrap();
-                warn!("invalid rebalance event: {:?}", err);
+                warn!("invalid rebalance event: {err}");
             }
         }
     }
@@ -353,6 +356,10 @@ where
     pub fn closed(&self) -> bool {
         unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
     }
+
+    pub(crate) fn native_client(&self) -> &NativeClient {
+        self.client.native_client()
+    }
 }

 impl<C> Consumer<C> for BaseConsumer<C>

View File

@@ -7,7 +7,7 @@ use std::time::Duration;
 use rdkafka_sys as rdsys;
 use rdkafka_sys::types::*;

-use crate::client::{Client, ClientContext, NativeClient};
+use crate::client::{Client, ClientContext};
 use crate::error::{KafkaError, KafkaResult};
 use crate::groups::GroupList;
 use crate::log::{error, trace};
@@ -43,7 +43,7 @@ pub enum Rebalance<'a> {
 /// be specified.
 ///
 /// See also the [`ClientContext`] trait.
-pub trait ConsumerContext: ClientContext {
+pub trait ConsumerContext: ClientContext + Sized {
     /// Implements the default rebalancing strategy and calls the
     /// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
     /// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
@@ -51,7 +51,7 @@ pub trait ConsumerContext: ClientContext {
     /// if needed.
     fn rebalance(
         &self,
-        native_client: &NativeClient,
+        base_consumer: &BaseConsumer<Self>,
         err: RDKafkaRespErr,
         tpl: &mut TopicPartitionList,
     ) {
@@ -66,9 +66,10 @@ pub trait ConsumerContext: ClientContext {
         };

         trace!("Running pre-rebalance with {:?}", rebalance);
-        self.pre_rebalance(&rebalance);
+        self.pre_rebalance(base_consumer, &rebalance);

         trace!("Running rebalance with {:?}", rebalance);
+        let native_client = base_consumer.native_client();

         // Execute rebalance
         unsafe {
             match err {
@@ -93,18 +94,18 @@ pub trait ConsumerContext: ClientContext {
         }

         trace!("Running post-rebalance with {:?}", rebalance);
-        self.post_rebalance(&rebalance);
+        self.post_rebalance(base_consumer, &rebalance);
     }

     /// Pre-rebalance callback. This method will run before the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
+    fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}

     /// Post-rebalance callback. This method will run after the rebalance and
     /// should terminate its execution quickly.
     #[allow(unused_variables)]
-    fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
+    fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}

     // TODO: convert pointer to structure

     /// Post commit callback. This method will run after a group of offsets was
View File

@@ -137,7 +137,7 @@ where
     pub fn bootstrap_servers(&self) -> String {
         let bootstrap =
             unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
-        bootstrap.to_string_lossy().to_string()
+        bootstrap.to_string_lossy().into_owned()
     }

     /// Clear the cluster's error state for the given ApiKey.
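As an aside, `into_owned()` vs `to_string()` on a `Cow<str>` is a small but real difference; a standalone sketch:

```rust
use std::borrow::Cow;

fn main() {
    let cow: Cow<'_, str> = Cow::Borrowed("localhost:9092");

    // to_string() goes through Display and always allocates a fresh
    // String, even when the Cow already owns its data.
    let s1: String = cow.to_string();

    // into_owned() moves the existing String out when the Cow is Owned,
    // and clones only in the Borrowed case.
    let s2: String = cow.into_owned();

    assert_eq!(s1, s2);
}
```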

View File

@@ -368,11 +368,10 @@ where
             match evtype {
                 rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
                 _ => {
-                    let buf = unsafe {
+                    let evname = unsafe {
                         let evname = rdsys::rd_kafka_event_name(ev.ptr());
-                        CStr::from_ptr(evname).to_bytes()
+                        CStr::from_ptr(evname).to_string_lossy()
                     };
-                    let evname = String::from_utf8(buf.to_vec()).unwrap();
                     warn!("Ignored event '{}' on base producer poll", evname);
                 }
             }

View File

@@ -29,34 +29,36 @@ run_with_valgrind() {
 # Initialize.

 git submodule update --init
-cargo test --no-run
+docker-compose up -d
+cargo test

 # Run unit tests.

-echo_good "*** Run unit tests ***"
-for test_file in target/debug/deps/rdkafka-*
-do
-    if [[ -x "$test_file" ]]
-    then
-        echo_good "Executing "$test_file""
-        run_with_valgrind "$test_file"
-    fi
-done
-echo_good "*** Unit tests succeeded ***"
-
-# Run integration tests.
-
-echo_good "*** Run unit tests ***"
-for test_file in target/debug/deps/test_*
-do
-    if [[ -x "$test_file" ]]
-    then
-        echo_good "Executing "$test_file""
-        run_with_valgrind "$test_file"
-    fi
-done
-echo_good "*** Integration tests succeeded ***"
+#echo_good "*** Run unit tests ***"
+#for test_file in target/debug/deps/rdkafka-*
+#do
+#    if [[ -x "$test_file" ]]
+#    then
+#        echo_good "Executing "$test_file""
+#        run_with_valgrind "$test_file"
+#    fi
+#done
+#echo_good "*** Unit tests succeeded ***"
+#
+## Run integration tests.
+#
+#echo_good "*** Run integration tests ***"
+#for test_file in target/debug/deps/test_*
+#do
+#    if [[ -x "$test_file" ]]
+#    then
+#        #echo_good "*** Restarting kafka/zk ***"
+#        #docker-compose restart --timeout 30
+#        echo_good "Executing "$test_file""
+#        run_with_valgrind "$test_file"
+#    fi
+#done
+#echo_good "*** Integration tests succeeded ***"

 # Run smol runtime example.

View File

@@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {
 async fn create_consumer_group(consumer_group_name: &str) {
     let admin_client = create_admin_client();
-    let topic_name = &rand_test_topic();
+    let topic_name = &rand_test_topic(consumer_group_name);
     let consumer: BaseConsumer = create_config()
         .set("group.id", consumer_group_name.clone())
         .create()
@@ -124,8 +124,8 @@ async fn test_topics() {
     // Verify that topics are created as specified, and that they can later
     // be deleted.
     {
-        let name1 = rand_test_topic();
-        let name2 = rand_test_topic();
+        let name1 = rand_test_topic("test_topics");
+        let name2 = rand_test_topic("test_topics");

         // Test both the builder API and the literal construction.
         let topic1 =
@@ -254,7 +254,7 @@ async fn test_topics() {
     // Verify that incorrect replication configurations are ignored when
     // creating partitions.
     {
-        let name = rand_test_topic();
+        let name = rand_test_topic("test_topics");
         let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));

         let res = admin_client
@@ -291,7 +291,7 @@ async fn test_topics() {
     // Verify that deleting a non-existent topic fails.
     {
-        let name = rand_test_topic();
+        let name = rand_test_topic("test_topics");
         let res = admin_client
             .delete_topics(&[&name], &opts)
             .await
@@ -305,8 +305,8 @@ async fn test_topics() {
     // Verify that mixed-success operations properly report the successful and
     // failing operators.
     {
-        let name1 = rand_test_topic();
-        let name2 = rand_test_topic();
+        let name1 = rand_test_topic("test_topics");
+        let name2 = rand_test_topic("test_topics");
         let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
         let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));

View File

@@ -4,7 +4,7 @@ use std::collections::HashMap;
 use std::error::Error;
 use std::sync::Arc;

-use futures::future::{self, FutureExt};
+use futures::future;
 use futures::stream::StreamExt;
 use maplit::hashmap;
 use rdkafka_sys::RDKafkaErrorCode;
@@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
     let _r = env_logger::try_init();

     let start_time = current_time_millis();
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base");
     let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
     let consumer = create_stream_consumer(&rand_test_group(), None);
     consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
 async fn test_produce_consume_base_concurrent() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
     populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;

     let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
@@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
 async fn test_produce_consume_base_assign() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_assign");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
 async fn test_produce_consume_base_unassign() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_unassign");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
 async fn test_produce_consume_base_incremental_assign_and_unassign() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
 async fn test_produce_consume_with_timestamp() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_with_timestamp");
     let message_map =
         populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await;
     let consumer = create_stream_consumer(&rand_test_group(), None);
@@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
 async fn test_consumer_commit_message() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_commit_message");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
 async fn test_consumer_store_offset_commit() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_store_offset_commit");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
 async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
     let _ = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_commit_metadata");
     let group_name = rand_test_group();
     populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

@@ -491,11 +491,11 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
     Ok(())
 }

-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
 async fn test_consume_partition_order() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consume_partition_order");
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
@@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
         let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap();

         let mut i = 0;
-        while i < 12 {
-            if let Some(m) = consumer.recv().now_or_never() {
+        while i < 5 {
+            if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await {
                 // retry on transient errors until we get a message
                 let m = match m {
                     Err(KafkaError::MessageConsumption(
@@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
                 let partition: i32 = m.partition();
                 assert!(partition == 0 || partition == 2);
                 i += 1;
+            } else {
+                panic!("Timeout receiving message");
             }

-            if let Some(m) = partition1.recv().now_or_never() {
+            if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await {
                 // retry on transient errors until we get a message
                 let m = match m {
                     Err(KafkaError::MessageConsumption(
@@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
                 };
                 assert_eq!(m.partition(), 1);
                 i += 1;
+            } else {
+                panic!("Timeout receiving message");
             }
         }
     }
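The pattern that replaces `now_or_never()` above, sketched standalone (assuming a multi-threaded tokio runtime, matching the `flavor = "multi_thread"` change):

```rust
use std::time::Duration;

use rdkafka::consumer::stream_consumer::StreamConsumer;
use tokio::time;

// Bound each recv() with a real deadline instead of now_or_never(), so a
// message that is in flight but not yet delivered doesn't fail the test.
async fn recv_or_panic(consumer: &StreamConsumer) {
    match time::timeout(Duration::from_millis(1000), consumer.recv()).await {
        Ok(Ok(_message)) => { /* got a message within the deadline */ }
        Ok(Err(e)) => eprintln!("kafka error: {e}"),
        Err(_elapsed) => panic!("Timeout receiving message"),
    }
}
```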

View File

@@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<Defa
 #[tokio::test]
 async fn test_future_producer_send() {
     let producer = future_producer(HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_future_producer_send");

     let results: FuturesUnordered<_> = (0..10)
         .map(|_| {
@@ -60,7 +60,7 @@ async fn test_future_producer_send_full() {
     config.insert("message.timeout.ms", "5000");
     config.insert("queue.buffering.max.messages", "1");
     let producer = &future_producer(config);
-    let topic_name = &rand_test_topic();
+    let topic_name = &rand_test_topic("test_future_producer_send_full");

     // Fill up the queue.
     producer

View File

@@ -31,7 +31,7 @@ fn create_base_consumer(
 async fn test_produce_consume_seek() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_seek");
     populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await;
     let consumer = create_base_consumer(&rand_test_group(), None);
     consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -96,7 +96,7 @@ async fn test_produce_consume_seek() {
 async fn test_produce_consume_seek_partitions() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_seek_partitions");
     populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;

     let consumer = create_base_consumer(&rand_test_group(), None);
@@ -158,7 +158,7 @@ async fn test_produce_consume_iter() {
     let _r = env_logger::try_init();

     let start_time = current_time_millis();
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_iter");
     let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
     let consumer = create_base_consumer(&rand_test_group(), None);
     consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_pause_resume_consumer_iter");
     populate_topic(
         &topic_name,
         MESSAGE_COUNT,
@@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() {
 async fn test_consume_partition_order() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consume_partition_order");
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
@@ -357,7 +357,7 @@ async fn test_consume_partition_order() {
 async fn test_produce_consume_message_queue_nonempty_callback() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback");

     create_topic(&topic_name, 1).await;

View File

@@ -191,7 +191,7 @@ where
 #[test]
 fn test_base_producer_queue_full() {
     let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" });
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_queue_full");

     let results = (0..30)
         .map(|id| {
@@ -235,7 +235,7 @@ fn test_base_producer_timeout() {
             "bootstrap.servers" => "1.2.3.4"
         },
     );
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_timeout");

     let results_count = (0..10)
         .map(|id| {
@@ -346,7 +346,7 @@ fn test_base_producer_headers() {
         ids: ids_set.clone(),
     };
     let producer = base_producer_with_context(context, HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_headers");

     let results_count = (0..10)
         .map(|id| {
@@ -387,7 +387,7 @@ fn test_base_producer_headers() {
 fn test_threaded_producer_send() {
     let context = CollectingContext::new();
     let producer = threaded_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_threaded_producer_send");

     let results_count = (0..10)
         .map(|id| {
@@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
     let shared_count = Arc::new(Mutex::new(0));
     let context = OpaqueArcContext {};
     let producer = base_producer_with_context(context, HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_opaque_arc");

     let results_count = (0..10)
         .map(|_| {
@@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
     let producer = base_producer_with_context(context.clone(), config_overrides);

     producer
-        .send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload(""))
+        .send(
+            BaseRecord::<(), str, usize>::with_opaque_to(
+                &rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"),
+                0,
+            )
+            .payload(""),
+        )
         .unwrap();
     producer.flush(Duration::from_secs(10)).unwrap();
@@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
 fn test_custom_partitioner_base_producer() {
     let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
     let producer = base_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_custom_partitioner_base_producer");

     let results_count = (0..10)
         .map(|id| {
@@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() {
 fn test_custom_partitioner_threaded_producer() {
     let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
     let producer = threaded_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer");

     let results_count = (0..10)
         .map(|id| {

View File

@@ -31,7 +31,7 @@ fn create_consumer(group_id: &str) -> StreamConsumer {
 async fn test_metadata() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_metadata");
     populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
     populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
@@ -92,7 +92,7 @@ async fn test_metadata() {
 async fn test_subscription() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_subscription");
     populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

     let consumer = create_consumer(&rand_test_group());
     consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -109,7 +109,7 @@ async fn test_subscription() {
 async fn test_group_membership() {
     let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_group_membership");
     let group_name = rand_test_group();
     populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
     populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;

View File

@@ -64,8 +64,8 @@ fn count_records(topic: &str, iso: IsolationLevel) -> Result<usize, KafkaError>
 #[tokio::test]
 async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
-    let consume_topic = rand_test_topic();
-    let produce_topic = rand_test_topic();
+    let consume_topic = rand_test_topic("test_transaction_abort");
+    let produce_topic = rand_test_topic("test_transaction_abort");

     populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;

@@ -132,8 +132,8 @@ async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
 #[tokio::test]
 async fn test_transaction_commit() -> Result<(), Box<dyn Error>> {
-    let consume_topic = rand_test_topic();
-    let produce_topic = rand_test_topic();
+    let consume_topic = rand_test_topic("test_transaction_commit");
+    let produce_topic = rand_test_topic("test_transaction_commit");

     populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;

View File

@@ -17,12 +17,12 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
 use rdkafka::statistics::Statistics;
 use rdkafka::TopicPartitionList;

-pub fn rand_test_topic() -> String {
+pub fn rand_test_topic(test_name: &str) -> String {
     let id = rand::thread_rng()
         .gen_ascii_chars()
         .take(10)
         .collect::<String>();
-    format!("__test_{}", id)
+    format!("__{}_{}", test_name, id)
 }

 pub fn rand_test_group() -> String {
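For illustration, the same helper written against the current `rand` API (the repo itself still uses the older `gen_ascii_chars`); the assertion shows the new naming scheme:

```rust
use rand::{distributions::Alphanumeric, Rng};

// Each test now passes its own name, so concurrently running tests no
// longer share (and pollute) each other's topics.
fn rand_test_topic(test_name: &str) -> String {
    let id: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(10)
        .map(char::from)
        .collect();
    format!("__{}_{}", test_name, id)
}

fn main() {
    let topic = rand_test_topic("test_metadata");
    assert!(topic.starts_with("__test_metadata_"));
}
```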
@@ -170,7 +170,7 @@ mod tests {
     #[tokio::test]
     async fn test_populate_topic() {
-        let topic_name = rand_test_topic();
+        let topic_name = rand_test_topic("test_populate_topic");
         let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;

         let total_messages = message_map