From b5cc4c433dd64a0d9056abcfc0f9339ac86f445a Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 19 Jun 2020 17:03:33 +0200 Subject: [PATCH 1/4] Fix lost reader wakeups --- src/reactor.rs | 17 ++++-- tests/async_io.rs | 145 +++++++++++++++++++++++++++++++++++++--------- 2 files changed, 130 insertions(+), 32 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index e30c6e5..af4b0f7 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -478,10 +478,10 @@ mod sys { } } fn read_flags() -> EpollFlags { - libc::EPOLLIN | libc::EPOLLRDHUP + libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI } fn write_flags() -> EpollFlags { - libc::EPOLLOUT + libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLER } pub struct Events { @@ -570,7 +570,7 @@ mod sys { Ok(()) } pub fn deregister(&self, fd: RawFd) -> io::Result<()> { - let flags = libc::EV_RECEIPT | libc::EV_DELETE; + let flags = libc::EV_DELETE | libc::EV_RECEIPT; let changelist = [ KEvent::new(fd as _, libc::EVFILT_WRITE, flags, 0, 0, 0), KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0), @@ -608,9 +608,14 @@ mod sys { Events { list, len } } pub fn iter(&self) -> impl Iterator + '_ { + // On some platforms, closing the read end of a pipe wakes up writers, but the + // event is reported as EVFILT_READ with the EV_EOF flag. + // + // https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4 self.list[..self.len].iter().map(|ev| Event { readable: ev.filter() == libc::EVFILT_READ, - writable: ev.filter() == libc::EVFILT_WRITE, + writable: ev.filter() == libc::EVFILT_WRITE + || (ev.filter() == libc::EVFILT_READ && (ev.flags() & libc::EV_EOF) != 0), key: ev.udata() as usize, }) } @@ -677,10 +682,10 @@ mod sys { } } fn read_flags() -> EventFlag { - EventFlag::IN | EventFlag::RDHUP + EventFlag::IN | EventFlag::RDHUP | EventFlag::HUP | EventFlag::ERR | EventFlag::PRI } fn write_flags() -> EventFlag { - EventFlag::OUT + EventFlag::OUT | EventFlag::HUP | EventFlag::ERR } pub struct Events(wepoll_binding::Events); diff --git a/tests/async_io.rs b/tests/async_io.rs index 95c3631..88fe96a 100644 --- a/tests/async_io.rs +++ b/tests/async_io.rs @@ -1,14 +1,13 @@ +use std::future::Future; +use std::io; +use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket}; #[cfg(unix)] use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream}; -use std::{ - io, - net::{Shutdown, TcpListener, TcpStream, UdpSocket}, - sync::Arc, - time::Duration, -}; +use std::sync::Arc; +use std::time::Duration; use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; -use smol::{Async, Task}; +use smol::{Async, Task, Timer}; #[cfg(unix)] use tempfile::tempdir; @@ -19,12 +18,21 @@ Etiam vestibulum lorem vel urna tempor, eu fermentum odio aliquam. Aliquam consequat urna vitae ipsum pulvinar, in blandit purus eleifend. "; +/// Runs future inside a local task. +/// +/// The main future passed to `smol::run()` is sometimes polled even if it was not woken - e.g. +/// this can happen when the executor is waiting on the reactor and then wakes up for whatever +/// reason. +fn run(future: impl Future + 'static) -> T { + smol::run(async { Task::local(async { future.await }).await }) +} + #[test] fn tcp_connect() -> io::Result<()> { - smol::run(async { - let listener = Async::::bind("127.0.0.1:12300")?; + run(async { + let listener = Async::::bind("127.0.0.1:0")?; let addr = listener.get_ref().local_addr()?; - let task = Task::spawn(async move { listener.accept().await }); + let task = Task::local(async move { listener.accept().await }); let stream2 = Async::::connect(&addr).await?; let stream1 = task.await?.0; @@ -48,10 +56,11 @@ fn tcp_connect() -> io::Result<()> { #[test] fn tcp_peek_read() -> io::Result<()> { - smol::run(async { - let listener = Async::::bind("127.0.0.1:12301")?; + run(async { + let listener = Async::::bind("127.0.0.1:0")?; + let addr = listener.get_ref().local_addr()?; - let mut stream = Async::::connect("127.0.0.1:12301").await?; + let mut stream = Async::::connect(addr).await?; stream.write_all(LOREM_IPSUM).await?; let mut buf = [0; 1024]; @@ -67,11 +76,57 @@ fn tcp_peek_read() -> io::Result<()> { }) } +#[test] +fn tcp_reader_hangup() -> io::Result<()> { + run(async { + let listener = Async::::bind("127.0.0.1:0")?; + let addr = listener.get_ref().local_addr()?; + let task = Task::local(async move { listener.accept().await }); + + let mut stream2 = Async::::connect(&addr).await?; + let stream1 = task.await?.0; + + let task = Task::local(async move { + Timer::after(Duration::from_secs(1)).await; + drop(stream1); + }); + + while stream2.write_all(LOREM_IPSUM).await.is_ok() {} + task.await; + + Ok(()) + }) +} + +#[test] +fn tcp_writer_hangup() -> io::Result<()> { + run(async { + let listener = Async::::bind("127.0.0.1:0")?; + let addr = listener.get_ref().local_addr()?; + let task = Task::local(async move { listener.accept().await }); + + let mut stream2 = Async::::connect(&addr).await?; + let stream1 = task.await?.0; + + let task = Task::local(async move { + Timer::after(Duration::from_secs(1)).await; + drop(stream1); + }); + + let mut v = vec![]; + stream2.read_to_end(&mut v).await?; + assert!(v.is_empty()); + + task.await; + Ok(()) + }) +} + #[test] fn udp_send_recv() -> io::Result<()> { - smol::run(async { - let socket1 = Async::::bind("127.0.0.1:12302")?; - let socket2 = Async::::bind("127.0.0.1:12303")?; + run(async { + let socket1 = Async::::bind("127.0.0.1:0")?; + let socket2 = Async::::bind("127.0.0.1:0")?; socket1.get_ref().connect(socket2.get_ref().local_addr()?)?; let mut buf = [0u8; 1024]; @@ -97,7 +152,7 @@ fn udp_send_recv() -> io::Result<()> { #[cfg(unix)] #[test] fn udp_connect() -> io::Result<()> { - smol::run(async { + run(async { let dir = tempdir()?; let path = dir.path().join("socket"); @@ -120,13 +175,13 @@ fn udp_connect() -> io::Result<()> { #[cfg(unix)] #[test] fn uds_connect() -> io::Result<()> { - smol::run(async { + run(async { let dir = tempdir()?; let path = dir.path().join("socket"); let listener = Async::::bind(&path)?; let addr = listener.get_ref().local_addr()?; - let task = Task::spawn(async move { listener.accept().await }); + let task = Task::local(async move { listener.accept().await }); let stream2 = Async::::connect(addr.as_pathname().unwrap()).await?; let stream1 = task.await?.0; @@ -153,7 +208,7 @@ fn uds_connect() -> io::Result<()> { #[cfg(unix)] #[test] fn uds_send_recv() -> io::Result<()> { - smol::run(async { + run(async { let (socket1, socket2) = Async::::pair()?; socket1.send(LOREM_IPSUM).await?; @@ -168,7 +223,7 @@ fn uds_send_recv() -> io::Result<()> { #[cfg(unix)] #[test] fn uds_send_to_recv_from() -> io::Result<()> { - smol::run(async { + run(async { let dir = tempdir()?; let path = dir.path().join("socket"); let socket1 = Async::::bind(&path)?; @@ -183,12 +238,50 @@ fn uds_send_to_recv_from() -> io::Result<()> { }) } +#[cfg(unix)] +#[test] +fn uds_reader_hangup() -> io::Result<()> { + run(async { + let (socket1, mut socket2) = Async::::pair()?; + + let task = Task::local(async move { + Timer::after(Duration::from_secs(1)).await; + drop(socket1); + }); + + while socket2.write_all(LOREM_IPSUM).await.is_ok() {} + task.await; + + Ok(()) + }) +} + +#[cfg(unix)] +#[test] +fn uds_writer_hangup() -> io::Result<()> { + run(async { + let (socket1, mut socket2) = Async::::pair()?; + + let task = Task::local(async move { + Timer::after(Duration::from_secs(1)).await; + drop(socket1); + }); + + let mut v = vec![]; + socket2.read_to_end(&mut v).await?; + assert!(v.is_empty()); + + task.await; + Ok(()) + }) +} + // Test that we correctly re-register interests when we are previously // interested in both readable and writable events and then we get only one of // them. (we need to re-register interest on the other.) #[test] fn tcp_duplex() -> io::Result<()> { - smol::run(async { + run(async { let listener = Async::::bind("127.0.0.1:0")?; let stream0 = Arc::new(Async::::connect(listener.get_ref().local_addr()?).await?); @@ -214,21 +307,21 @@ fn tcp_duplex() -> io::Result<()> { } // Read from and write to stream0. - let r0 = Task::spawn(do_read(stream0.clone())); - let w0 = Task::spawn(do_write(stream0)); + let r0 = Task::local(do_read(stream0.clone())); + let w0 = Task::local(do_write(stream0)); // Sleep a bit, so that reading and writing are both blocked. smol::Timer::after(Duration::from_millis(5)).await; // Start reading stream1, make stream0 writable. - let r1 = Task::spawn(do_read(stream1.clone())); + let r1 = Task::local(do_read(stream1.clone())); // Finish writing to stream0. w0.await?; r1.await?; // Start writing to stream1, make stream0 readable. - let w1 = Task::spawn(do_write(stream1)); + let w1 = Task::local(do_write(stream1)); // Will r0 be correctly woken? r0.await?; From ab9c9d9e4eedba34534320291bcc1aab9aabb286 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Fri, 19 Jun 2020 17:06:33 +0200 Subject: [PATCH 2/4] Fix compilation error --- src/reactor.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reactor.rs b/src/reactor.rs index af4b0f7..ac9c5e3 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -254,7 +254,7 @@ impl ReactorLock<'_> { // The timeout was hit so fire ready timers. Ok(0) => { self.reactor.fire_timers(); - return Ok(()); + Ok(()) } // At least one I/O event occured. @@ -481,7 +481,7 @@ mod sys { libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI } fn write_flags() -> EpollFlags { - libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLER + libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR } pub struct Events { From f40acf640c16733db0fae8c725f3b19b3afc75fa Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 20 Jun 2020 15:54:22 +0200 Subject: [PATCH 3/4] Fix lost wakeups in the executor --- src/multitask.rs | 58 ++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/multitask.rs b/src/multitask.rs index 1d1ec9e..a8b97b8 100644 --- a/src/multitask.rs +++ b/src/multitask.rs @@ -46,10 +46,12 @@ impl Global { /// A list of sleeping workers. struct Sleepers { - /// Number of sleeping workers. + /// Number of sleeping workers (both notified and unnotified). count: usize, /// Callbacks of sleeping unnotified workers. + /// + /// A sleeping worker is notified when its callback is missing from this list. callbacks: Vec, } @@ -60,14 +62,19 @@ impl Sleepers { self.callbacks.push(callback.clone()); } - /// Updates the callback of an already inserted worker. - fn update(&mut self, callback: &Callback) { + /// Re-inserts a sleeping worker's callback if it was notified. + /// + /// Returns `true` if the worker was notified. + fn update(&mut self, callback: &Callback) -> bool { if self.callbacks.iter().all(|cb| cb != callback) { self.callbacks.push(callback.clone()); + true + } else { + false } } - /// Removes a previously inserted worker. + /// Removes a previously inserted sleeping worker. fn remove(&mut self, callback: &Callback) { self.count -= 1; for i in (0..self.callbacks.len()).rev() { @@ -205,6 +212,11 @@ pub(crate) struct Worker { callback: Callback, /// Set to `true` when in sleeping state. + /// + /// States a worker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. sleeping: Cell, /// Bumped every time a task is run. @@ -243,24 +255,28 @@ impl Worker { Task(Some(handle)) } - /// Moves the worker into sleeping state. + /// Moves the worker into sleeping and unnotified state. + /// + /// Returns `true` if the worker was already sleeping and unnotified. fn sleep(&self) -> bool { let mut sleepers = self.global.sleepers.lock().unwrap(); if self.sleeping.get() { - sleepers.update(&self.callback); - self.global - .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); - false + // Already sleeping, check if notified. + if !sleepers.update(&self.callback) { + return false; + } } else { + // Move to sleeping state. sleepers.insert(&self.callback); - self.global - .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); - self.sleeping.set(true); - true } + + self.global + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + self.sleeping.set(true); + true } /// Moves the worker into woken state. @@ -268,14 +284,13 @@ impl Worker { if self.sleeping.get() { let mut sleepers = self.global.sleepers.lock().unwrap(); sleepers.remove(&self.callback); + self.global .notified .swap(sleepers.is_notified(), Ordering::SeqCst); - self.sleeping.set(false); - true - } else { - false } + + self.sleeping.replace(false) } /// Runs a single task and returns `true` if one was found. @@ -283,10 +298,9 @@ impl Worker { loop { match self.search() { None => { - // Go to sleep and then: - // - If already in sleeping state, return. - // - Otherwise, search again. + // Move to sleeping and unnotified state. if !self.sleep() { + // If already sleeping and unnotified, return. return false; } } From 12fe6b593831da1dfa0b4d11a4c4ea2744fa8561 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 20 Jun 2020 16:29:04 +0200 Subject: [PATCH 4/4] Fix a scheduling fairness issue --- examples/ctrl-c.rs | 2 +- examples/unix-signal.rs | 2 +- src/multitask.rs | 1 + src/reactor.rs | 9 ++++++--- 4 files changed, 9 insertions(+), 5 deletions(-) diff --git a/examples/ctrl-c.rs b/examples/ctrl-c.rs index d7f070f..d8decb8 100644 --- a/examples/ctrl-c.rs +++ b/examples/ctrl-c.rs @@ -20,7 +20,7 @@ fn main() { smol::run(async { println!("Waiting for Ctrl-C..."); - // Receive a message that indicates the Ctrl-C signal occured. + // Receive a message that indicates the Ctrl-C signal occurred. ctrl_c.recv().await; println!("Done!"); diff --git a/examples/unix-signal.rs b/examples/unix-signal.rs index b437e34..5c2874a 100644 --- a/examples/unix-signal.rs +++ b/examples/unix-signal.rs @@ -20,7 +20,7 @@ fn main() -> std::io::Result<()> { signal_hook::pipe::register(signal_hook::SIGINT, a)?; println!("Waiting for Ctrl-C..."); - // Receive a byte that indicates the Ctrl-C signal occured. + // Receive a byte that indicates the Ctrl-C signal occurred. b.read_exact(&mut [0]).await?; println!("Done!"); diff --git a/src/multitask.rs b/src/multitask.rs index a8b97b8..ca9cc3a 100644 --- a/src/multitask.rs +++ b/src/multitask.rs @@ -346,6 +346,7 @@ impl Worker { self.global.queue.push(err.into_inner()).unwrap(); self.global.notify(); } + self.local.flush().unwrap(); } return true; diff --git a/src/reactor.rs b/src/reactor.rs index ac9c5e3..7ab7b9b 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -251,13 +251,16 @@ impl ReactorLock<'_> { // Block on I/O events. match self.reactor.sys.wait(&mut self.events, timeout) { - // The timeout was hit so fire ready timers. + // No I/O events occurred. Ok(0) => { - self.reactor.fire_timers(); + if timeout != Some(Duration::from_secs(0)) { + // The non-zero timeout was hit so fire ready timers. + self.reactor.fire_timers(); + } Ok(()) } - // At least one I/O event occured. + // At least one I/O event occurred. Ok(_) => { // Iterate over sources in the event list. let sources = self.reactor.sources.lock();