Merge pull request #162 from sopium/reregister-the-other-event

Reregister if we get only readable or writable
This commit is contained in:
Stjepan Glavina 2020-06-18 16:07:45 +02:00 committed by GitHub
commit 98ed9fb731
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 71 additions and 1 deletions

View File

@ -293,6 +293,19 @@ impl ReactorLock<'_> {
if ev.writable {
ready.append(&mut wakers.writers);
}
// Re-register if there are still writers or
// readers. The can happen if e.g. we were
// previously interested in both readability and
// writability, but only one of them was emitted.
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
self.reactor.sys.reregister(
source.raw,
source.key,
!wakers.readers.is_empty(),
!wakers.writers.is_empty(),
)?;
}
}
}

View File

@ -2,7 +2,9 @@
use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream};
use std::{
io,
net::{TcpListener, TcpStream, UdpSocket},
net::{Shutdown, TcpListener, TcpStream, UdpSocket},
sync::Arc,
time::Duration,
};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
@ -180,3 +182,58 @@ fn uds_send_to_recv_from() -> io::Result<()> {
Ok(())
})
}
// Test that we correctly re-register interests when we are previously
// interested in both readable and writable events and then we get only one of
// them. (we need to re-register interest on the other.)
#[test]
fn tcp_duplex() -> io::Result<()> {
smol::run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let stream0 =
Arc::new(Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?);
let stream1 = Arc::new(listener.accept().await?.0);
async fn do_read(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let mut buf = vec![0u8; 4096];
loop {
let len = (&*s).read(&mut buf).await?;
if len == 0 {
return Ok(());
}
}
}
async fn do_write(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let buf = vec![0u8; 4096];
for _ in 0..4096 {
(&*s).write_all(&buf).await?;
}
s.get_ref().shutdown(Shutdown::Write)?;
Ok(())
}
// Read from and write to stream0.
let r0 = Task::spawn(do_read(stream0.clone()));
let w0 = Task::spawn(do_write(stream0));
// Sleep a bit, so that reading and writing are both blocked.
smol::Timer::after(Duration::from_millis(5)).await;
// Start reading stream1, make stream0 writable.
let r1 = Task::spawn(do_read(stream1.clone()));
// Finish writing to stream0.
w0.await?;
r1.await?;
// Start writing to stream1, make stream0 readable.
let w1 = Task::spawn(do_write(stream1));
// Will r0 be correctly woken?
r0.await?;
w1.await?;
Ok(())
})
}