Compare commits

...

6 Commits

Author SHA1 Message Date
Mykhailo Padalka bc87cea6f5
Merge 2e939b5ef8 into e69c2aa40c 2024-01-19 19:56:31 +01:00
David Blewett e69c2aa40c
Merge pull request #587 from Magister/fix/native_config_get
Fix panic on getting config value from NativeConfig
2024-01-19 09:59:15 -05:00
David Blewett eacf17389e
Merge pull request #639 from Swatinem/to-string
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
2024-01-19 09:58:34 -05:00
Arpad Borsos 15044da8d9
Use `CStr::to_string_lossy` in Base{Consumer,Producer}
In some error cases, the `Base{Consumer,Producer}` were eagerly copying strings,
and `unwrap`ing utf8 validation, just to print an error message.

This will avoid the allocation in the common case, and be panic-safe in the presumably unreachable case of invalid utf-8.
2023-12-19 12:30:11 +01:00
Misha Padalka 353812ff95 Fix panic on getting config value from NativeConfig
Kafka can return string with multiple \0 chars (seen on Windows x64),
and CStr::from_bytes_with_nul panics in that case.
String::from_utf8_lossy() handles that ok
2023-06-13 17:08:38 +03:00
Misha Padalka 2e939b5ef8 Snappy is not enabled by default in cmake build, enable it 2023-06-13 17:06:09 +03:00
5 changed files with 12 additions and 16 deletions

View File

@ -283,6 +283,8 @@ fn build_librdkafka() {
config.define("ENABLE_LZ4_EXT", "0");
}
config.define("WITH_SNAPPY", "1");
if let Ok(system_name) = env::var("CMAKE_SYSTEM_NAME") {
config.define("CMAKE_SYSTEM_NAME", system_name);
}

View File

@ -150,10 +150,7 @@ impl NativeClientConfig {
}
// Convert the C string to a Rust string.
Ok(CStr::from_bytes_with_nul(&buf)
.unwrap()
.to_string_lossy()
.into())
Ok(String::from_utf8_lossy(&buf).to_string())
}
}

View File

@ -155,12 +155,11 @@ where
}
}
_ => {
let buf = unsafe {
let evname = unsafe {
let evname = rdsys::rd_kafka_event_name(event.ptr());
CStr::from_ptr(evname).to_bytes()
CStr::from_ptr(evname).to_string_lossy()
};
let evname = String::from_utf8(buf.to_vec()).unwrap();
warn!("Ignored event '{}' on consumer poll", evname);
warn!("Ignored event '{evname}' on consumer poll");
}
}
}
@ -197,13 +196,12 @@ where
self.context().rebalance(self, err, &mut tpl);
}
_ => {
let buf = unsafe {
let err = unsafe {
let err_name =
rdsys::rd_kafka_err2name(rdsys::rd_kafka_event_error(event.ptr()));
CStr::from_ptr(err_name).to_bytes()
CStr::from_ptr(err_name).to_string_lossy()
};
let err = String::from_utf8(buf.to_vec()).unwrap();
warn!("invalid rebalance event: {:?}", err);
warn!("invalid rebalance event: {err}");
}
}
}

View File

@ -137,7 +137,7 @@ where
pub fn bootstrap_servers(&self) -> String {
let bootstrap =
unsafe { CStr::from_ptr(rdsys::rd_kafka_mock_cluster_bootstraps(self.mock_cluster)) };
bootstrap.to_string_lossy().to_string()
bootstrap.to_string_lossy().into_owned()
}
/// Clear the cluster's error state for the given ApiKey.

View File

@ -368,11 +368,10 @@ where
match evtype {
rdsys::RD_KAFKA_EVENT_DR => self.handle_delivery_report_event(ev),
_ => {
let buf = unsafe {
let evname = unsafe {
let evname = rdsys::rd_kafka_event_name(ev.ptr());
CStr::from_ptr(evname).to_bytes()
CStr::from_ptr(evname).to_string_lossy()
};
let evname = String::from_utf8(buf.to_vec()).unwrap();
warn!("Ignored event '{}' on base producer poll", evname);
}
}