mirror of https://github.com/fede1024/rust-rdkafka
Compare commits
6 Commits
ff8acb8b3a
...
04b88a45ef
Author | SHA1 | Date |
---|---|---|
Arthur Le Moigne | 04b88a45ef | |
David Blewett | e69c2aa40c | |
David Blewett | eacf17389e | |
Arpad Borsos | 15044da8d9 | |
Arthur LE MOIGNE | 6493043232 | |
Misha Padalka | 353812ff95 |
|
@ -150,10 +150,7 @@ impl NativeClientConfig {
|
|||
}
|
||||
|
||||
// Convert the C string to a Rust string.
|
||||
Ok(CStr::from_bytes_with_nul(&buf)
|
||||
.unwrap()
|
||||
.to_string_lossy()
|
||||
.into())
|
||||
Ok(String::from_utf8_lossy(&buf).to_string())
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -155,12 +155,11 @@ where
|
|||
}
|
||||
}
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let evname = unsafe {
|
||||
let evname = rdsys::rd_kafka_event_name(event.ptr());
|
||||
CStr::from_ptr(evname).to_bytes()
|
||||
CStr::from_ptr(evname).to_string_lossy()
|
||||
};
|
||||
let evname = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("Ignored event '{}' on consumer poll", evname);
|
||||
warn!("Ignored event '{evname}' on consumer poll");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -197,13 +196,12 @@ where
|
|||
self.context().rebalance(self, err, &mut tpl);
|
||||
}
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let err = unsafe {
|
||||
let err_name =
|
||||
rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
|
||||
CStr::from_ptr(err_name).to_bytes()
|
||||
CStr::from_ptr(err_name).to_string_lossy()
|
||||
};
|
||||
let err = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("invalid rebalance event: {:?}", err);
|
||||
warn!("invalid rebalance event: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -137,7 +137,7 @@ where
|
|||
pub fn bootstrap_servers(&self) -> String {
|
||||
let bootstrap =
|
||||
unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
|
||||
bootstrap.to_string_lossy().to_string()
|
||||
bootstrap.to_string_lossy().into_owned()
|
||||
}
|
||||
|
||||
/// Clear the cluster's error state for the given ApiKey.
|
||||
|
|
|
@ -368,11 +368,10 @@ where
|
|||
match evtype {
|
||||
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
|
||||
_ => {
|
||||
let buf = unsafe {
|
||||
let evname = unsafe {
|
||||
let evname = rdsys::rd_kafka_event_name(ev.ptr());
|
||||
CStr::from_ptr(evname).to_bytes()
|
||||
CStr::from_ptr(evname).to_string_lossy()
|
||||
};
|
||||
let evname = String::from_utf8(buf.to_vec()).unwrap();
|
||||
warn!("Ignored event '{}' on base producer poll", evname);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,7 +78,6 @@ impl Offset {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: implement Debug
|
||||
/// One element of the topic partition list.
|
||||
pub struct TopicPartitionListElem<'a> {
|
||||
ptr: &'a mut RDKafkaTopicPartition,
|
||||
|
@ -165,6 +164,18 @@ impl<'a> PartialEq for TopicPartitionListElem<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for TopicPartitionListElem<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("TopicPartitionListElem")
|
||||
.field("topic", &self.topic())
|
||||
.field("partition", &self.partition())
|
||||
.field("offset", &self.offset())
|
||||
.field("metadata", &self.metadata())
|
||||
.field("error", &self.error())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
/// A structure to store and manipulate a list of topics and partitions with optional offsets.
|
||||
pub struct TopicPartitionList {
|
||||
ptr: NativePtr<RDKafkaTopicPartitionList>,
|
||||
|
@ -380,22 +391,7 @@ impl Default for TopicPartitionList {
|
|||
|
||||
impl fmt::Debug for TopicPartitionList {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "TPL {{")?;
|
||||
for (i, elem) in self.elements().iter().enumerate() {
|
||||
if i > 0 {
|
||||
write!(f, "; ")?;
|
||||
}
|
||||
write!(
|
||||
f,
|
||||
"{}/{}: offset={:?} metadata={:?}, error={:?}",
|
||||
elem.topic(),
|
||||
elem.partition(),
|
||||
elem.offset(),
|
||||
elem.metadata(),
|
||||
elem.error(),
|
||||
)?;
|
||||
}
|
||||
write!(f, "}}")
|
||||
f.debug_list().entries(self.elements()).finish()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
use rdkafka::{Offset, TopicPartitionList};
|
||||
|
||||
/// Test topic partition list API and wrappers.
|
||||
|
||||
#[test]
|
||||
fn test_fmt_debug() {
|
||||
{
|
||||
let tpl = TopicPartitionList::new();
|
||||
assert_eq!(format!("{tpl:?}"), "[]");
|
||||
}
|
||||
|
||||
{
|
||||
let mut tpl = TopicPartitionList::new();
|
||||
tpl.add_topic_unassigned("foo");
|
||||
tpl.add_partition("bar", 8);
|
||||
tpl.add_partition_offset("bar", 7, Offset::Offset(42))
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
format!("{tpl:?}"),
|
||||
"[TopicPartitionListElem { topic: \"foo\", partition: -1, offset: Invalid, metadata: \"\", error: Ok(()) }, \
|
||||
TopicPartitionListElem { topic: \"bar\", partition: 8, offset: Invalid, metadata: \"\", error: Ok(()) }, \
|
||||
TopicPartitionListElem { topic: \"bar\", partition: 7, offset: Offset(42), metadata: \"\", error: Ok(()) }]");
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue