Retry on EINTR

This commit is contained in:
Stjepan Glavina 2020-04-27 15:10:29 +02:00
parent 530f43203d
commit fce8487385
1 changed files with 37 additions and 32 deletions

View File

@ -150,7 +150,7 @@ impl Reactor {
self.fire_timers();
}
// Interrupt the reactor.
// Notify that a timer was added.
self.timer_event.notify();
id
@ -242,7 +242,7 @@ impl ReactorLock<'_> {
self.react(false)
}
/// Blocks until at least one event is processed or the syscall is interrupted.
/// Blocks until at least one event is processed.
pub fn wait(&mut self) -> io::Result<()> {
self.react(true)
}
@ -257,39 +257,44 @@ impl ReactorLock<'_> {
Some(Duration::from_secs(0))
};
// Block on I/O events.
match self.reactor.sys.wait(&mut self.events, timeout) {
// The timeout was hit.
Ok(0) => Ok(()),
// At least one I/O event occured.
Ok(_) => {
// Iterate over sources in the event list.
let sources = self.reactor.sources.lock();
for source in self.events.iter().filter_map(|i| sources.get(i)) {
// I/O events may deregister sources, so we need to re-register.
self.reactor.sys.reregister(source.raw, source.key)?;
// Bump the ticker.
let mut wakers = source.wakers.lock();
let tick = source.tick.load(Ordering::Acquire);
source.tick.store(tick.wrapping_add(1), Ordering::Release);
// Wake up tasks waiting on I/O.
for w in wakers.drain(..) {
w.wake();
}
loop {
// Block on I/O events.
match self.reactor.sys.wait(&mut self.events, timeout) {
// The timeout was hit so fire ready timers.
Ok(0) => {
self.reactor.fire_timers();
return Ok(());
}
Ok(())
// At least one I/O event occured.
Ok(_) => {
// Iterate over sources in the event list.
let sources = self.reactor.sources.lock();
for source in self.events.iter().filter_map(|i| sources.get(i)) {
// I/O events may deregister sources, so we need to re-register.
self.reactor.sys.reregister(source.raw, source.key)?;
// Bump the ticker.
let mut wakers = source.wakers.lock();
let tick = source.tick.load(Ordering::Acquire);
source.tick.store(tick.wrapping_add(1), Ordering::Release);
// Wake up tasks waiting on I/O.
for w in wakers.drain(..) {
w.wake();
}
}
return Ok(());
}
// The syscall was interrupted.
Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
// An actual error occureed.
Err(err) => return Err(err),
}
// The syscall was interrupted.
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
// An actual error occureed.
Err(err) => Err(err),
}
}
}