diff --git a/src/reactor.rs b/src/reactor.rs index c5cd338..11004fc 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -293,6 +293,19 @@ impl ReactorLock<'_> { if ev.writable { ready.append(&mut wakers.writers); } + + // Re-register if there are still writers or + // readers. The can happen if e.g. we were + // previously interested in both readability and + // writability, but only one of them was emitted. + if !(wakers.writers.is_empty() && wakers.readers.is_empty()) { + self.reactor.sys.reregister( + source.raw, + source.key, + !wakers.readers.is_empty(), + !wakers.writers.is_empty(), + )?; + } } } diff --git a/tests/async_io.rs b/tests/async_io.rs index 68c4826..95c3631 100644 --- a/tests/async_io.rs +++ b/tests/async_io.rs @@ -2,7 +2,9 @@ use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream}; use std::{ io, - net::{TcpListener, TcpStream, UdpSocket}, + net::{Shutdown, TcpListener, TcpStream, UdpSocket}, + sync::Arc, + time::Duration, }; use futures::{AsyncReadExt, AsyncWriteExt, StreamExt}; @@ -180,3 +182,58 @@ fn uds_send_to_recv_from() -> io::Result<()> { Ok(()) }) } + +// Test that we correctly re-register interests when we are previously +// interested in both readable and writable events and then we get only one of +// them. (we need to re-register interest on the other.) +#[test] +fn tcp_duplex() -> io::Result<()> { + smol::run(async { + let listener = Async::::bind("127.0.0.1:0")?; + let stream0 = + Arc::new(Async::::connect(listener.get_ref().local_addr()?).await?); + let stream1 = Arc::new(listener.accept().await?.0); + + async fn do_read(s: Arc>) -> io::Result<()> { + let mut buf = vec![0u8; 4096]; + loop { + let len = (&*s).read(&mut buf).await?; + if len == 0 { + return Ok(()); + } + } + } + + async fn do_write(s: Arc>) -> io::Result<()> { + let buf = vec![0u8; 4096]; + for _ in 0..4096 { + (&*s).write_all(&buf).await?; + } + s.get_ref().shutdown(Shutdown::Write)?; + Ok(()) + } + + // Read from and write to stream0. + let r0 = Task::spawn(do_read(stream0.clone())); + let w0 = Task::spawn(do_write(stream0)); + + // Sleep a bit, so that reading and writing are both blocked. + smol::Timer::after(Duration::from_millis(5)).await; + + // Start reading stream1, make stream0 writable. + let r1 = Task::spawn(do_read(stream1.clone())); + + // Finish writing to stream0. + w0.await?; + r1.await?; + + // Start writing to stream1, make stream0 readable. + let w1 = Task::spawn(do_write(stream1)); + + // Will r0 be correctly woken? + r0.await?; + w1.await?; + + Ok(()) + }) +}