mirror of https://github.com/fede1024/rust-rdkafka
Compare commits
18 Commits
129b9ea00a
...
f1246fe513
Author | SHA1 | Date |
---|---|---|
Greg Orzell | f1246fe513 | |
David Blewett | e69c2aa40c | |
David Blewett | eacf17389e | |
David Blewett | bb76b5bd40 | |
David Blewett | ed293167f7 | |
David Blewett | 957aef1fb1 | |
David Blewett | 85539acd38 | |
David Blewett | 6154c84a7c | |
David Blewett | 9b44abacae | |
Samuel Cantero | d6ec6e1596 | |
David Blewett | 738590b9f0 | |
David Blewett | 321c04078a | |
David Blewett | e347f9aece | |
David Blewett | 8f37c49553 | |
Sebastian Zivota | 95f6cd8431 | |
Arpad Borsos | 15044da8d9 | |
Sebastian Zivota | c811175785 | |
Misha Padalka | 353812ff95 |
|
@ -69,6 +69,8 @@ jobs:
|
|||
fail-fast: false
|
||||
matrix:
|
||||
include:
|
||||
- confluent-version: 7.5.1
|
||||
kafka-version: 3.6
|
||||
- confluent-version: 7.5.1
|
||||
kafka-version: 3.5
|
||||
- confluent-version: 5.3.1
|
||||
|
|
|
@ -1070,7 +1070,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rdkafka"
|
||||
version = "0.36.0"
|
||||
version = "0.36.2"
|
||||
dependencies = [
|
||||
"async-std",
|
||||
"backoff",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
[package]
|
||||
name = "rdkafka"
|
||||
version = "0.36.0"
|
||||
version = "0.36.2"
|
||||
authors = ["Federico Giraud <giraud.federico@gmail.com>"]
|
||||
repository = "https://github.com/fede1024/rust-rdkafka"
|
||||
readme = "README.md"
|
||||
|
|
|
@ -4,6 +4,11 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).
|
|||
|
||||
## Unreleased
|
||||
|
||||
## 0.36.2 (2024-01-16)
|
||||
|
||||
* Update `BaseConsumer::poll` to return `None` when handling rebalance
|
||||
or offset commit events.
|
||||
|
||||
## 0.36.0 (2023-11-08)
|
||||
|
||||
* Pivot the library from using librdkafka's callback interface to using
|
||||
|
|
|
@ -4,7 +4,7 @@ use log::{info, warn};
|
|||
use rdkafka::client::ClientContext;
|
||||
use rdkafka::config::{ClientConfig, RDKafkaLogLevel};
|
||||
use rdkafka::consumer::stream_consumer::StreamConsumer;
|
||||
use rdkafka::consumer::{CommitMode, Consumer, ConsumerContext, Rebalance};
|
||||
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance};
|
||||
use rdkafka::error::KafkaResult;
|
||||
use rdkafka::message::{Headers, Message};
|
||||
use rdkafka::topic_partition_list::TopicPartitionList;
|
||||
|
@ -22,11 +22,11 @@ struct CustomContext;
|
|||
impl ClientContext for CustomContext {}
|
||||
|
||||
impl ConsumerContext for CustomContext {
|
||||
fn pre_rebalance(&self, rebalance: &Rebalance) {
|
||||
fn pre_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
|
||||
info!("Pre rebalance {:?}", rebalance);
|
||||
}
|
||||
|
||||
fn post_rebalance(&self, rebalance: &Rebalance) {
|
||||
fn post_rebalance(&self, _: &BaseConsumer<Self>, rebalance: &Rebalance) {
|
||||
info!("Post rebalance {:?}", rebalance);
|
||||
}
|
||||
|
||||
|
|
|
@ -150,10 +150,7 @@ impl NativeClientConfig {
|
|||
}
|
||||
|
||||
// Convert the C string to a Rust string.
|
||||
Ok(CStr::from_bytes_with_nul(&buf)
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.into())
|
||||
Ok(String::from_utf8_lossy(&buf).to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ use log::{error, warn};
|
|||
use rdkafka_sys as rdsys;
|
||||
use rdkafka_sys::types::*;
|
||||
|
||||
use crate::client::{Client, NativeQueue};
|
||||
use crate::client::{Client, NativeClient, NativeQueue};
|
||||
use crate::config::{
|
||||
ClientConfig, FromClientConfig, FromClientConfigAndContext, NativeClientConfig,
|
||||
};
|
||||
|
@ -144,17 +144,22 @@ where
|
|||
}
|
||||
rdsys::RD_KAFKA_EVENT_REBALANCE => {
|
||||
self.handle_rebalance_event(event);
|
||||
if timeout != Timeout::Never {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
rdsys::RD_KAFKA_EVENT_OFFSET_COMMIT => {
|
||||
self.handle_offset_commit_event(event);
|
||||
if timeout != Timeout::Never {
|
||||
return None;
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let evname = unsafe {
|
||||
let evname = rdsys::rd_kafka_event_name(event.ptr());
|
||||
CStr::from_ptr(evname).to_bytes()
|
||||
CStr::from_ptr(evname).to_string_lossy()
|
||||
};
|
||||
let evname = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("Ignored event '{}' on consumer poll", evname);
|
||||
warn!("Ignored event '{evname}' on consumer poll");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -188,17 +193,15 @@ where
|
|||
// The TPL is owned by the Event and will be destroyed when the event is destroyed.
|
||||
// Dropping it here will lead to double free.
|
||||
let mut tpl = ManuallyDrop::new(tpl);
|
||||
self.context()
|
||||
.rebalance(self.client.native_client(), err, &mut tpl);
|
||||
self.context().rebalance(self, err, &mut tpl);
|
||||
}
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let err = unsafe {
|
||||
let err_name =
|
||||
rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
|
||||
CStr::from_ptr(err_name).to_bytes()
|
||||
CStr::from_ptr(err_name).to_string_lossy()
|
||||
};
|
||||
let err = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("invalid rebalance event: {:?}", err);
|
||||
warn!("invalid rebalance event: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -353,6 +356,10 @@ where
|
|||
pub fn closed(&self) -> bool {
|
||||
unsafe { rdsys::rd_kafka_consumer_closed(self.client.native_ptr()) == 1 }
|
||||
}
|
||||
|
||||
pub(crate) fn native_client(&self) -> &NativeClient {
|
||||
self.client.native_client()
|
||||
}
|
||||
}
|
||||
|
||||
impl<C> Consumer<C> for BaseConsumer<C>
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::time::Duration;
|
|||
use rdkafka_sys as rdsys;
|
||||
use rdkafka_sys::types::*;
|
||||
|
||||
use crate::client::{Client, ClientContext, NativeClient};
|
||||
use crate::client::{Client, ClientContext};
|
||||
use crate::error::{KafkaError, KafkaResult};
|
||||
use crate::groups::GroupList;
|
||||
use crate::log::{error, trace};
|
||||
|
@ -43,7 +43,7 @@ pub enum Rebalance<'a> {
|
|||
/// be specified.
|
||||
///
|
||||
/// See also the [`ClientContext`] trait.
|
||||
pub trait ConsumerContext: ClientContext {
|
||||
pub trait ConsumerContext: ClientContext + Sized {
|
||||
/// Implements the default rebalancing strategy and calls the
|
||||
/// [`pre_rebalance`](ConsumerContext::pre_rebalance) and
|
||||
/// [`post_rebalance`](ConsumerContext::post_rebalance) methods. If this
|
||||
|
@ -51,7 +51,7 @@ pub trait ConsumerContext: ClientContext {
|
|||
/// if needed.
|
||||
fn rebalance(
|
||||
&self,
|
||||
native_client: &NativeClient,
|
||||
base_consumer: &BaseConsumer<Self>,
|
||||
err: RDKafkaRespErr,
|
||||
tpl: &mut TopicPartitionList,
|
||||
) {
|
||||
|
@ -66,9 +66,10 @@ pub trait ConsumerContext: ClientContext {
|
|||
};
|
||||
|
||||
trace!("Running pre-rebalance with {:?}", rebalance);
|
||||
self.pre_rebalance(&rebalance);
|
||||
self.pre_rebalance(base_consumer, &rebalance);
|
||||
|
||||
trace!("Running rebalance with {:?}", rebalance);
|
||||
let native_client = base_consumer.native_client();
|
||||
// Execute rebalance
|
||||
unsafe {
|
||||
match err {
|
||||
|
@ -93,18 +94,18 @@ pub trait ConsumerContext: ClientContext {
|
|||
}
|
||||
}
|
||||
trace!("Running post-rebalance with {:?}", rebalance);
|
||||
self.post_rebalance(&rebalance);
|
||||
self.post_rebalance(base_consumer, &rebalance);
|
||||
}
|
||||
|
||||
/// Pre-rebalance callback. This method will run before the rebalance and
|
||||
/// should terminate its execution quickly.
|
||||
#[allow(unused_variables)]
|
||||
fn pre_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
|
||||
fn pre_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
|
||||
|
||||
/// Post-rebalance callback. This method will run after the rebalance and
|
||||
/// should terminate its execution quickly.
|
||||
#[allow(unused_variables)]
|
||||
fn post_rebalance<'a>(&self, rebalance: &Rebalance<'a>) {}
|
||||
fn post_rebalance<'a>(&self, base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance<'a>) {}
|
||||
|
||||
// TODO: convert pointer to structure
|
||||
/// Post commit callback. This method will run after a group of offsets was
|
||||
|
|
|
@ -137,7 +137,7 @@ where
|
|||
pub fn bootstrap_servers(&self) -> String {
|
||||
let bootstrap =
|
||||
unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
|
||||
bootstrap.to_string_lossy().to_string()
|
||||
bootstrap.to_string_lossy().into_owned()
|
||||
}
|
||||
|
||||
/// Clear the cluster's error state for the given ApiKey.
|
||||
|
|
|
@ -368,11 +368,10 @@ where
|
|||
match evtype {
|
||||
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let evname = unsafe {
|
||||
let evname = rdsys::rd_kafka_event_name(ev.ptr());
|
||||
CStr::from_ptr(evname).to_bytes()
|
||||
CStr::from_ptr(evname).to_string_lossy()
|
||||
};
|
||||
let evname = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("Ignored event '{}' on base producer poll", evname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,34 +29,36 @@ run_with_valgrind() {
|
|||
# Initialize.
|
||||
|
||||
git submodule update --init
|
||||
cargo test --no-run
|
||||
docker-compose up -d
|
||||
cargo test
|
||||
|
||||
# Run unit tests.
|
||||
|
||||
echo_good "*** Run unit tests ***"
|
||||
for test_file in target/debug/deps/rdkafka-*
|
||||
do
|
||||
if [[ -x "$test_file" ]]
|
||||
then
|
||||
echo_good "Executing "$test_file""
|
||||
run_with_valgrind "$test_file"
|
||||
fi
|
||||
done
|
||||
echo_good "*** Unit tests succeeded ***"
|
||||
|
||||
# Run integration tests.
|
||||
|
||||
echo_good "*** Run unit tests ***"
|
||||
for test_file in target/debug/deps/test_*
|
||||
do
|
||||
if [[ -x "$test_file" ]]
|
||||
then
|
||||
echo_good "Executing "$test_file""
|
||||
run_with_valgrind "$test_file"
|
||||
fi
|
||||
done
|
||||
echo_good "*** Integration tests succeeded ***"
|
||||
#echo_good "*** Run unit tests ***"
|
||||
#for test_file in target/debug/deps/rdkafka-*
|
||||
#do
|
||||
# if [[ -x "$test_file" ]]
|
||||
# then
|
||||
# echo_good "Executing "$test_file""
|
||||
# run_with_valgrind "$test_file"
|
||||
# fi
|
||||
#done
|
||||
#echo_good "*** Unit tests succeeded ***"
|
||||
#
|
||||
## Run integration tests.
|
||||
#
|
||||
#echo_good "*** Run integration tests ***"
|
||||
#for test_file in target/debug/deps/test_*
|
||||
#do
|
||||
# if [[ -x "$test_file" ]]
|
||||
# then
|
||||
# #echo_good "*** Restarting kafka/zk ***"
|
||||
# #docker-compose restart --timeout 30
|
||||
# echo_good "Executing "$test_file""
|
||||
# run_with_valgrind "$test_file"
|
||||
# fi
|
||||
#done
|
||||
#echo_good "*** Integration tests succeeded ***"
|
||||
|
||||
# Run smol runtime example.
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {
|
|||
|
||||
async fn create_consumer_group(consumer_group_name: &str) {
|
||||
let admin_client = create_admin_client();
|
||||
let topic_name = &rand_test_topic();
|
||||
let topic_name = &rand_test_topic(consumer_group_name);
|
||||
let consumer: BaseConsumer = create_config()
|
||||
.set("group.id", consumer_group_name.clone())
|
||||
.create()
|
||||
|
@ -124,8 +124,8 @@ async fn test_topics() {
|
|||
// Verify that topics are created as specified, and that they can later
|
||||
// be deleted.
|
||||
{
|
||||
let name1 = rand_test_topic();
|
||||
let name2 = rand_test_topic();
|
||||
let name1 = rand_test_topic("test_topics");
|
||||
let name2 = rand_test_topic("test_topics");
|
||||
|
||||
// Test both the builder API and the literal construction.
|
||||
let topic1 =
|
||||
|
@ -254,7 +254,7 @@ async fn test_topics() {
|
|||
// Verify that incorrect replication configurations are ignored when
|
||||
// creating partitions.
|
||||
{
|
||||
let name = rand_test_topic();
|
||||
let name = rand_test_topic("test_topics");
|
||||
let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));
|
||||
|
||||
let res = admin_client
|
||||
|
@ -291,7 +291,7 @@ async fn test_topics() {
|
|||
|
||||
// Verify that deleting a non-existent topic fails.
|
||||
{
|
||||
let name = rand_test_topic();
|
||||
let name = rand_test_topic("test_topics");
|
||||
let res = admin_client
|
||||
.delete_topics(&[&name], &opts)
|
||||
.await
|
||||
|
@ -305,8 +305,8 @@ async fn test_topics() {
|
|||
// Verify that mixed-success operations properly report the successful and
|
||||
// failing operators.
|
||||
{
|
||||
let name1 = rand_test_topic();
|
||||
let name2 = rand_test_topic();
|
||||
let name1 = rand_test_topic("test_topics");
|
||||
let name2 = rand_test_topic("test_topics");
|
||||
|
||||
let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
|
||||
let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::collections::HashMap;
|
|||
use std::error::Error;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::future::{self, FutureExt};
|
||||
use futures::future;
|
||||
use futures::stream::StreamExt;
|
||||
use maplit::hashmap;
|
||||
use rdkafka_sys::RDKafkaErrorCode;
|
||||
|
@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
|
|||
let _r = env_logger::try_init();
|
||||
|
||||
let start_time = current_time_millis();
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_base");
|
||||
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
|
||||
let consumer = create_stream_consumer(&rand_test_group(), None);
|
||||
consumer.subscribe(&[topic_name.as_str()]).unwrap();
|
||||
|
@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
|
|||
async fn test_produce_consume_base_concurrent() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
|
||||
populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
|
||||
|
||||
let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
|
||||
|
@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
|
|||
async fn test_produce_consume_base_assign() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_base_assign");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
|
|||
async fn test_produce_consume_base_unassign() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_base_unassign");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
|
|||
async fn test_produce_consume_base_incremental_assign_and_unassign() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
|
|||
async fn test_produce_consume_with_timestamp() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_with_timestamp");
|
||||
let message_map =
|
||||
populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await;
|
||||
let consumer = create_stream_consumer(&rand_test_group(), None);
|
||||
|
@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
|
|||
async fn test_consumer_commit_message() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_consumer_commit_message");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
|
|||
async fn test_consumer_store_offset_commit() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_consumer_store_offset_commit");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
|
|||
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
|
||||
let _ = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_consumer_commit_metadata");
|
||||
let group_name = rand_test_group();
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
|
||||
|
||||
|
@ -491,11 +491,11 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[tokio::test(flavor = "multi_thread")]
|
||||
async fn test_consume_partition_order() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_consume_partition_order");
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
|
|||
let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap();
|
||||
|
||||
let mut i = 0;
|
||||
while i < 12 {
|
||||
if let Some(m) = consumer.recv().now_or_never() {
|
||||
while i < 5 {
|
||||
if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await {
|
||||
// retry on transient errors until we get a message
|
||||
let m = match m {
|
||||
Err(KafkaError::MessageConsumption(
|
||||
|
@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
|
|||
let partition: i32 = m.partition();
|
||||
assert!(partition == 0 || partition == 2);
|
||||
i += 1;
|
||||
} else {
|
||||
panic!("Timeout receiving message");
|
||||
}
|
||||
|
||||
if let Some(m) = partition1.recv().now_or_never() {
|
||||
if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await {
|
||||
// retry on transient errors until we get a message
|
||||
let m = match m {
|
||||
Err(KafkaError::MessageConsumption(
|
||||
|
@ -583,6 +585,8 @@ async fn test_consume_partition_order() {
|
|||
};
|
||||
assert_eq!(m.partition(), 1);
|
||||
i += 1;
|
||||
} else {
|
||||
panic!("Timeout receiving message");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<Defa
|
|||
#[tokio::test]
|
||||
async fn test_future_producer_send() {
|
||||
let producer = future_producer(HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_future_producer_send");
|
||||
|
||||
let results: FuturesUnordered<_> = (0..10)
|
||||
.map(|_| {
|
||||
|
@ -60,7 +60,7 @@ async fn test_future_producer_send_full() {
|
|||
config.insert("message.timeout.ms", "5000");
|
||||
config.insert("queue.buffering.max.messages", "1");
|
||||
let producer = &future_producer(config);
|
||||
let topic_name = &rand_test_topic();
|
||||
let topic_name = &rand_test_topic("test_future_producer_send_full");
|
||||
|
||||
// Fill up the queue.
|
||||
producer
|
||||
|
|
|
@ -31,7 +31,7 @@ fn create_base_consumer(
|
|||
async fn test_produce_consume_seek() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_seek");
|
||||
populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await;
|
||||
let consumer = create_base_consumer(&rand_test_group(), None);
|
||||
consumer.subscribe(&[topic_name.as_str()]).unwrap();
|
||||
|
@ -96,7 +96,7 @@ async fn test_produce_consume_seek() {
|
|||
async fn test_produce_consume_seek_partitions() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_seek_partitions");
|
||||
populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;
|
||||
|
||||
let consumer = create_base_consumer(&rand_test_group(), None);
|
||||
|
@ -158,7 +158,7 @@ async fn test_produce_consume_iter() {
|
|||
let _r = env_logger::try_init();
|
||||
|
||||
let start_time = current_time_millis();
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_iter");
|
||||
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
|
||||
let consumer = create_base_consumer(&rand_test_group(), None);
|
||||
consumer.subscribe(&[topic_name.as_str()]).unwrap();
|
||||
|
@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() {
|
|||
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_pause_resume_consumer_iter");
|
||||
populate_topic(
|
||||
&topic_name,
|
||||
MESSAGE_COUNT,
|
||||
|
@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() {
|
|||
async fn test_consume_partition_order() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_consume_partition_order");
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -357,7 +357,7 @@ async fn test_consume_partition_order() {
|
|||
async fn test_produce_consume_message_queue_nonempty_callback() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback");
|
||||
|
||||
create_topic(&topic_name, 1).await;
|
||||
|
||||
|
|
|
@ -191,7 +191,7 @@ where
|
|||
#[test]
|
||||
fn test_base_producer_queue_full() {
|
||||
let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" });
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_base_producer_queue_full");
|
||||
|
||||
let results = (0..30)
|
||||
.map(|id| {
|
||||
|
@ -235,7 +235,7 @@ fn test_base_producer_timeout() {
|
|||
"bootstrap.servers" => "1.2.3.4"
|
||||
},
|
||||
);
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_base_producer_timeout");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|id| {
|
||||
|
@ -346,7 +346,7 @@ fn test_base_producer_headers() {
|
|||
ids: ids_set.clone(),
|
||||
};
|
||||
let producer = base_producer_with_context(context, HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_base_producer_headers");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|id| {
|
||||
|
@ -387,7 +387,7 @@ fn test_base_producer_headers() {
|
|||
fn test_threaded_producer_send() {
|
||||
let context = CollectingContext::new();
|
||||
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_threaded_producer_send");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|id| {
|
||||
|
@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
|
|||
let shared_count = Arc::new(Mutex::new(0));
|
||||
let context = OpaqueArcContext {};
|
||||
let producer = base_producer_with_context(context, HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_base_producer_opaque_arc");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|_| {
|
||||
|
@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
|
|||
let producer = base_producer_with_context(context.clone(), config_overrides);
|
||||
|
||||
producer
|
||||
.send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload(""))
|
||||
.send(
|
||||
BaseRecord::<(), str, usize>::with_opaque_to(
|
||||
&rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"),
|
||||
0,
|
||||
)
|
||||
.payload(""),
|
||||
)
|
||||
.unwrap();
|
||||
producer.flush(Duration::from_secs(10)).unwrap();
|
||||
|
||||
|
@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
|
|||
fn test_custom_partitioner_base_producer() {
|
||||
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
|
||||
let producer = base_producer_with_context(context.clone(), HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_custom_partitioner_base_producer");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|id| {
|
||||
|
@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() {
|
|||
fn test_custom_partitioner_threaded_producer() {
|
||||
let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
|
||||
let producer = threaded_producer_with_context(context.clone(), HashMap::new());
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer");
|
||||
|
||||
let results_count = (0..10)
|
||||
.map(|id| {
|
||||
|
|
|
@ -31,7 +31,7 @@ fn create_consumer(group_id: &str) -> StreamConsumer {
|
|||
async fn test_metadata() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_metadata");
|
||||
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
|
||||
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(2), None).await;
|
||||
|
@ -92,7 +92,7 @@ async fn test_metadata() {
|
|||
async fn test_subscription() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_subscription");
|
||||
populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;
|
||||
let consumer = create_consumer(&rand_test_group());
|
||||
consumer.subscribe(&[topic_name.as_str()]).unwrap();
|
||||
|
@ -109,7 +109,7 @@ async fn test_subscription() {
|
|||
async fn test_group_membership() {
|
||||
let _r = env_logger::try_init();
|
||||
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_group_membership");
|
||||
let group_name = rand_test_group();
|
||||
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(0), None).await;
|
||||
populate_topic(&topic_name, 1, &value_fn, &key_fn, Some(1), None).await;
|
||||
|
|
|
@ -64,8 +64,8 @@ fn count_records(topic: &str, iso: IsolationLevel) -> Result<usize, KafkaError>
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
|
||||
let consume_topic = rand_test_topic();
|
||||
let produce_topic = rand_test_topic();
|
||||
let consume_topic = rand_test_topic("test_transaction_abort");
|
||||
let produce_topic = rand_test_topic("test_transaction_abort");
|
||||
|
||||
populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;
|
||||
|
||||
|
@ -132,8 +132,8 @@ async fn test_transaction_abort() -> Result<(), Box<dyn Error>> {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_transaction_commit() -> Result<(), Box<dyn Error>> {
|
||||
let consume_topic = rand_test_topic();
|
||||
let produce_topic = rand_test_topic();
|
||||
let consume_topic = rand_test_topic("test_transaction_commit");
|
||||
let produce_topic = rand_test_topic("test_transaction_commit");
|
||||
|
||||
populate_topic(&consume_topic, 30, &value_fn, &key_fn, Some(0), None).await;
|
||||
|
||||
|
|
|
@ -17,12 +17,12 @@ use rdkafka::producer::{FutureProducer, FutureRecord};
|
|||
use rdkafka::statistics::Statistics;
|
||||
use rdkafka::TopicPartitionList;
|
||||
|
||||
pub fn rand_test_topic() -> String {
|
||||
pub fn rand_test_topic(test_name: &str) -> String {
|
||||
let id = rand::thread_rng()
|
||||
.gen_ascii_chars()
|
||||
.take(10)
|
||||
.collect::<String>();
|
||||
format!("__test_{}", id)
|
||||
format!("__{}_{}", test_name, id)
|
||||
}
|
||||
|
||||
pub fn rand_test_group() -> String {
|
||||
|
@ -170,7 +170,7 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_populate_topic() {
|
||||
let topic_name = rand_test_topic();
|
||||
let topic_name = rand_test_topic("test_populate_topic");
|
||||
let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), None).await;
|
||||
|
||||
let total_messages = message_map
|
||||
|
|
Loading…
Reference in New Issue