This does not affect the StreamConsumer or any other wrapper consumer.
It will only incur on an extra Poll call when there's a rebalance event.
When using bindings built upon the rust-rdkafka ffi, the caller is
responsible for initiating the rebalance calls (*assign).
If a high timeout is specified, the rebalance handler will only be
triggered once the timeout period has elapsed.
This fixes it by always returning on rebalance events except when
Timeout::Never. Poll calls with timeout::Never are expected to return
a message.
In some error cases, the `Base{Consumer,Producer}` were eagerly copying strings,
and `unwrap`ing utf8 validation, just to print an error message.
This will avoid the allocation in the common case, and be panic-safe in the presumably unreachable case of invalid utf-8.
With the Event API we propagate generic client instance-level errors,
such as broker connection failures, authentication issues, etc.
However, fatal errors are also propagated via the Event API. These
indicates that the particular instance of the client (producer/consumer)
becomes non-functional.
If you have a consumer wrapping this one (FFI cases), the outer consumer must close
the queue and serve the events via Poll. Otherwise it will hang forever
as prior to calling close there's a rebalance & rdkafka awaits a
response before continuing.
rdkafka reports consumer errors via RD_KAFKA_EVENT_ERROR but producer errors gets
embedded on the ack returned via RD_KAFKA_EVENT_DR. Hence we need to return this event
for the consumer case in order to return the error to the user.
One poll call might not be enough to serve the delivery report
callbacks of the purged messages. The current flush impl will call
poll multiple times until the queue is empty or timeout.
I realize that there is a `stats_raw` trait method, but it would be better to not have to re-type out the statistics fields myself. This came about because I would like to act on many of the individual fields locally via some aggregations, but then output them back out (along with other fields in a larger status struct) as a JSON-encoded string, e.g.:
```rs
#[derive(Serialize)]
pub struct Status {
pub sample_field: u64,
pub sample_field_2: boolean,
pub statistics: Statistics,
}
```