Always re-register IoEvent

This commit is contained in:
Stjepan Glavina 2020-05-17 10:58:06 +02:00
parent 21e65af190
commit e46e410d86
3 changed files with 16 additions and 1 deletions

View File

@ -217,6 +217,13 @@ impl<T: IntoRawSocket> IntoRawSocket for Async<T> {
}
impl<T> Async<T> {
/// Reregisters the I/O handle is registered in the reactor.
///
/// This is a useful method when the reactor is used in oneshot mode.
pub(crate) fn reregister(&self) -> io::Result<()> {
self.source.reregister()
}
/// Gets a reference to the inner I/O handle.
///
/// # Examples

View File

@ -71,6 +71,7 @@ impl IoEvent {
// Read all available bytes from the receiving socket.
while self.0.reader.get_ref().read(&mut [0; 64]).is_ok() {}
let value = self.0.flag.swap(false, Ordering::SeqCst);
let _ = self.0.reader.reregister();
// Publish all in-memory changes after clearing the flag.
atomic::fence(Ordering::SeqCst);

View File

@ -324,6 +324,13 @@ pub(crate) struct Source {
}
impl Source {
/// Reregisters the I/O handle is registered in the reactor.
///
/// This is a useful method when the reactor is used in oneshot mode.
pub(crate) fn reregister(&self) -> io::Result<()> {
Reactor::get().sys.reregister(self.raw, self.key)
}
/// Attempts a non-blocking I/O operation and registers a waker if it errors with `WouldBlock`.
pub fn poll_io<R>(
&self,
@ -355,7 +362,7 @@ impl Source {
if self.tick.load(Ordering::Acquire) == tick {
if wakers.is_empty() {
// Re-register the I/O handle if it's in oneshot mode.
Reactor::get().sys.reregister(self.raw, self.key)?;
self.reregister()?;
}
wakers.push(cx.waker().clone());