mirror of https://github.com/stjepang/smol
Reregister if we get only readable or writable
but we are interested in both. Fix #161.
This commit is contained in:
parent
d43f850050
commit
e8d0febbc3
|
@ -293,6 +293,19 @@ impl ReactorLock<'_> {
|
|||
if ev.writable {
|
||||
ready.append(&mut wakers.writers);
|
||||
}
|
||||
|
||||
// Re-register if there are still writers or
|
||||
// readers. The can happen if e.g. we were
|
||||
// previously interested in both readability and
|
||||
// writability, but only one of them was emitted.
|
||||
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
|
||||
self.reactor.sys.reregister(
|
||||
source.raw,
|
||||
source.key,
|
||||
!wakers.readers.is_empty(),
|
||||
!wakers.writers.is_empty(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,7 +2,9 @@
|
|||
use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream};
|
||||
use std::{
|
||||
io,
|
||||
net::{TcpListener, TcpStream, UdpSocket},
|
||||
net::{Shutdown, TcpListener, TcpStream, UdpSocket},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
|
||||
|
@ -180,3 +182,58 @@ fn uds_send_to_recv_from() -> io::Result<()> {
|
|||
Ok(())
|
||||
})
|
||||
}
|
||||
|
||||
// Test that we correctly re-register interests when we are previously
|
||||
// interested in both readable and writable events and then we get only one of
|
||||
// them. (we need to re-register interest on the other.)
|
||||
#[test]
|
||||
fn tcp_duplex() -> io::Result<()> {
|
||||
smol::run(async {
|
||||
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
|
||||
let stream0 =
|
||||
Arc::new(Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?);
|
||||
let stream1 = Arc::new(listener.accept().await?.0);
|
||||
|
||||
async fn do_read(s: Arc<Async<TcpStream>>) -> io::Result<()> {
|
||||
let mut buf = vec![0u8; 4096];
|
||||
loop {
|
||||
let len = (&*s).read(&mut buf).await?;
|
||||
if len == 0 {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn do_write(s: Arc<Async<TcpStream>>) -> io::Result<()> {
|
||||
let buf = vec![0u8; 4096];
|
||||
for _ in 0..4096 {
|
||||
(&*s).write_all(&buf).await?;
|
||||
}
|
||||
s.get_ref().shutdown(Shutdown::Write)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Read from and write to stream0.
|
||||
let r0 = Task::spawn(do_read(stream0.clone()));
|
||||
let w0 = Task::spawn(do_write(stream0));
|
||||
|
||||
// Sleep a bit, so that reading and writing are both blocked.
|
||||
smol::Timer::after(Duration::from_millis(5)).await;
|
||||
|
||||
// Start reading stream1, make stream0 writable.
|
||||
let r1 = Task::spawn(do_read(stream1.clone()));
|
||||
|
||||
// Finish writing to stream0.
|
||||
w0.await?;
|
||||
r1.await?;
|
||||
|
||||
// Start writing to stream1, make stream0 readable.
|
||||
let w1 = Task::spawn(do_write(stream1));
|
||||
|
||||
// Will r0 be correctly woken?
|
||||
r0.await?;
|
||||
w1.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue