Compare commits

...

6 Commits

Author SHA1 Message Date
Arthur Le Moigne 04b88a45ef
Merge 6493043232 into e69c2aa40c 2024-01-19 19:56:31 +01:00
David Blewett e69c2aa40c
Merge pull request #587 from Magister/fix/native_config_get
Fix panic on getting config value from NativeConfig
2024-01-19 09:59:15 -05:00
David Blewett eacf17389e
Merge pull request #639 from Swatinem/to-string
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
2024-01-19 09:58:34 -05:00
Arpad Borsos 15044da8d9
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
In some error cases, the `Base{Consumer,Producer}` were eagerly copying strings,
and `unwrap`ping UTF-8 validation, just to print an error message.

This will avoid the allocation in the common case, and be panic-safe in the presumably unreachable case of invalid utf-8.
2023-12-19 12:30:11 +01:00
Arthur LE MOIGNE 6493043232 Better implementation of Debug trait for TopicPartitionList 2023-10-02 14:15:04 +02:00
Misha Padalka 353812ff95 Fix panic on getting config value from NativeConfig
Kafka can return a string with multiple \0 chars (seen on Windows x64),
and CStr::from_bytes_with_nul panics in that case.
String::from_utf8_lossy() handles that fine.
2023-06-13 17:08:38 +03:00
6 changed files with 47 additions and 33 deletions

View File

@@ -150,10 +150,7 @@ impl NativeClientConfig {
         }
         // Convert the C string to a Rust string.
-        Ok(CStr::from_bytes_with_nul(&buf)
-            .unwrap()
-            .to_string_lossy()
-            .into())
+        Ok(String::from_utf8_lossy(&buf).to_string())
     }
 }
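
A note on the `NativeClientConfig` hunk above (commit 353812ff95): `CStr::from_bytes_with_nul` requires exactly one NUL byte, at the very end of the buffer, so the extra `\0` bytes seen on Windows x64 made it return an error that the old `.unwrap()` turned into a panic, while `String::from_utf8_lossy` accepts NUL bytes and never fails. A minimal standalone sketch of the difference (the buffer contents are hypothetical, not taken from librdkafka):

use std::ffi::CStr;

fn main() {
    // Hypothetical config value followed by more than one trailing `\0`,
    // as reported for config lookups on Windows x64.
    let buf: &[u8] = b"SASL_PLAINTEXT\0\0";

    // Old path: exactly one terminating NUL is required, so this returns
    // an error and the `.unwrap()` in the removed code panicked.
    assert!(CStr::from_bytes_with_nul(buf).is_err());

    // New path: NUL bytes are valid UTF-8, so the lossy conversion always
    // succeeds; the NUL bytes are simply kept in the resulting `String`.
    let value = String::from_utf8_lossy(buf).to_string();
    assert_eq!(value.trim_end_matches('\0'), "SASL_PLAINTEXT");
}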

View File

@@ -155,12 +155,11 @@ where
                 }
             }
             _ => {
-                let buf = unsafe {
+                let evname = unsafe {
                     let evname = rdsys::rd_kafka_event_name(event.ptr());
-                    CStr::from_ptr(evname).to_bytes()
+                    CStr::from_ptr(evname).to_string_lossy()
                 };
-                let evname = String::from_utf8(buf.to_vec()).unwrap();
-                warn!("Ignored event '{}' on consumer poll", evname);
+                warn!("Ignored event '{evname}' on consumer poll");
             }
         }
     }
@@ -197,13 +196,12 @@ where
                 self.context().rebalance(self, err, &mut tpl);
             }
             _ => {
-                let buf = unsafe {
+                let err = unsafe {
                     let err_name =
                         rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
-                    CStr::from_ptr(err_name).to_bytes()
+                    CStr::from_ptr(err_name).to_string_lossy()
                 };
-                let err = String::from_utf8(buf.to_vec()).unwrap();
-                warn!("invalid rebalance event: {:?}", err);
+                warn!("invalid rebalance event: {err}");
             }
         }
     }
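
These consumer hunks (and the matching producer hunk below) do what commit 15044da8d9 describes: `CStr::to_string_lossy` returns a `Cow<str>` that borrows the C string when it is already valid UTF-8, so the common path allocates nothing, and it substitutes U+FFFD for invalid bytes instead of needing an `unwrap`. A small standalone sketch of that behaviour (the event names are made up):

use std::borrow::Cow;
use std::ffi::CStr;

fn main() {
    // Common case: the event name is valid UTF-8, so `to_string_lossy`
    // borrows the bytes and performs no allocation.
    let ok = CStr::from_bytes_with_nul(b"FETCH\0").unwrap();
    assert!(matches!(ok.to_string_lossy(), Cow::Borrowed("FETCH")));

    // Presumably unreachable case: invalid UTF-8 is replaced with U+FFFD
    // rather than panicking the way `String::from_utf8(...).unwrap()` did.
    let bad = CStr::from_bytes_with_nul(&[0xFF, b'E', b'T', b'C', b'H', 0]).unwrap();
    assert_eq!(bad.to_string_lossy(), "\u{FFFD}ETCH");
}

The `warn!` calls in the consumer hunks also switch to inline format arguments (`{evname}`, `{err}`), which changes only the formatting syntax, not the output.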

View File

@@ -137,7 +137,7 @@ where
     pub fn bootstrap_servers(&self) -> String {
         let bootstrap =
             unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
-        bootstrap.to_string_lossy().to_string()
+        bootstrap.to_string_lossy().into_owned()
     }

     /// Clear the cluster's error state for the given ApiKey.
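
The one-line `bootstrap_servers` change is the same idea in miniature: calling `.to_string()` on a `Cow<str>` always builds a fresh `String` from its contents, while `.into_owned()` moves an already owned `String` out and only copies when the value is still borrowed. A tiny standalone sketch (the address is a placeholder):

use std::borrow::Cow;

fn main() {
    let borrowed: Cow<'_, str> = Cow::Borrowed("localhost:9092");
    let owned: Cow<'_, str> = Cow::Owned(String::from("localhost:9092"));

    // `into_owned` hands back the existing allocation for `Cow::Owned`
    // and only copies for `Cow::Borrowed`; the resulting `String` is the same.
    assert_eq!(borrowed.into_owned(), "localhost:9092");
    assert_eq!(owned.into_owned(), "localhost:9092");
}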

View File

@@ -368,11 +368,10 @@ where
         match evtype {
             rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
             _ => {
-                let buf = unsafe {
+                let evname = unsafe {
                     let evname = rdsys::rd_kafka_event_name(ev.ptr());
-                    CStr::from_ptr(evname).to_bytes()
+                    CStr::from_ptr(evname).to_string_lossy()
                 };
-                let evname = String::from_utf8(buf.to_vec()).unwrap();
                 warn!("Ignored event '{}' on base producer poll", evname);
             }
         }

View File

@@ -78,7 +78,6 @@ impl Offset {
     }
 }

-// TODO: implement Debug
 /// One element of the topic partition list.
 pub struct TopicPartitionListElem<'a> {
     ptr: &'a mut RDKafkaTopicPartition,
@@ -165,6 +164,18 @@ impl<'a> PartialEq for TopicPartitionListElem<'a> {
     }
 }

+impl fmt::Debug for TopicPartitionListElem<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("TopicPartitionListElem")
+            .field("topic", &self.topic())
+            .field("partition", &self.partition())
+            .field("offset", &self.offset())
+            .field("metadata", &self.metadata())
+            .field("error", &self.error())
+            .finish()
+    }
+}
+
 /// A structure to store and manipulate a list of topics and partitions with optional offsets.
 pub struct TopicPartitionList {
     ptr: NativePtr<RDKafkaTopicPartitionList>,
@@ -380,22 +391,7 @@ impl Default for TopicPartitionList {
 impl fmt::Debug for TopicPartitionList {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "TPL {{")?;
-        for (i, elem) in self.elements().iter().enumerate() {
-            if i > 0 {
-                write!(f, "; ")?;
-            }
-            write!(
-                f,
-                "{}/{}: offset={:?} metadata={:?}, error={:?}",
-                elem.topic(),
-                elem.partition(),
-                elem.offset(),
-                elem.metadata(),
-                elem.error(),
-            )?;
-        }
-        write!(f, "}}")
+        f.debug_list().entries(self.elements()).finish()
     }
 }

View File

@@ -0,0 +1,24 @@
+use rdkafka::{Offset, TopicPartitionList};
+
+/// Test topic partition list API and wrappers.
+#[test]
+fn test_fmt_debug() {
+    {
+        let tpl = TopicPartitionList::new();
+        assert_eq!(format!("{tpl:?}"), "[]");
+    }
+
+    {
+        let mut tpl = TopicPartitionList::new();
+        tpl.add_topic_unassigned("foo");
+        tpl.add_partition("bar", 8);
+        tpl.add_partition_offset("bar", 7, Offset::Offset(42))
+            .unwrap();
+        assert_eq!(
+            format!("{tpl:?}"),
+            "[TopicPartitionListElem { topic: \"foo\", partition: -1, offset: Invalid, metadata: \"\", error: Ok(()) }, \
+             TopicPartitionListElem { topic: \"bar\", partition: 8, offset: Invalid, metadata: \"\", error: Ok(()) }, \
+             TopicPartitionListElem { topic: \"bar\", partition: 7, offset: Offset(42), metadata: \"\", error: Ok(()) }]");
+    }
+}