Merge branch 'master' into refactor

This commit is contained in:
Stjepan Glavina 2020-06-18 16:10:54 +02:00
commit ebc1f4fb34
6 changed files with 86 additions and 5 deletions

View File

@ -1,3 +1,14 @@
# Version 0.1.12
- Fix a bug in `Async::<UdpSocket>::recv()`.
# Version 0.1.11
- Update `wepoll-binding`.
- Reduce dependencies.
- Replace `nix` with `libc`.
- Set minimum required `tokio` version to 0.2.
# Version 0.1.10
- Fix incorrectly reported error kind when connecting fails.

View File

@ -1,6 +1,6 @@
[package]
name = "smol"
version = "0.1.10"
version = "0.1.12"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
description = "A small and fast async runtime"
@ -38,7 +38,7 @@ slab = "0.4.2"
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
[dependencies.tokio]
version = "0.2.19"
version = "0.2"
default-features = false
features = ["rt-threaded"]
optional = true

View File

@ -34,7 +34,7 @@ signal-hook = "0.1.13"
smol = { path = "../", features = ["tokio02"] }
surf = { version = "2.0.0-alpha.1", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
tokio = { version = "0.2.18", default-features = false }
tokio = { version = "0.2", default-features = false }
tungstenite = "0.10.1"
url = "2.1.1"

View File

@ -852,7 +852,7 @@ impl Async<UdpSocket> {
/// # std::io::Result::Ok(()) });
/// ```
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.write_with(|io| io.recv(buf)).await
self.read_with(|io| io.recv(buf)).await
}
/// Receives a single datagram message from the connected peer without removing it from the

View File

@ -277,6 +277,19 @@ impl ReactorLock<'_> {
if ev.writable {
ready.append(&mut wakers.writers);
}
// Re-register if there are still writers or
// readers. The can happen if e.g. we were
// previously interested in both readability and
// writability, but only one of them was emitted.
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
self.reactor.sys.reregister(
source.raw,
source.key,
!wakers.readers.is_empty(),
!wakers.writers.is_empty(),
)?;
}
}
}

View File

@ -2,7 +2,9 @@
use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream};
use std::{
io,
net::{TcpListener, TcpStream, UdpSocket},
net::{Shutdown, TcpListener, TcpStream, UdpSocket},
sync::Arc,
time::Duration,
};
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
@ -180,3 +182,58 @@ fn uds_send_to_recv_from() -> io::Result<()> {
Ok(())
})
}
// Test that we correctly re-register interests when we are previously
// interested in both readable and writable events and then we get only one of
// them. (we need to re-register interest on the other.)
#[test]
fn tcp_duplex() -> io::Result<()> {
smol::run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let stream0 =
Arc::new(Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?);
let stream1 = Arc::new(listener.accept().await?.0);
async fn do_read(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let mut buf = vec![0u8; 4096];
loop {
let len = (&*s).read(&mut buf).await?;
if len == 0 {
return Ok(());
}
}
}
async fn do_write(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let buf = vec![0u8; 4096];
for _ in 0..4096 {
(&*s).write_all(&buf).await?;
}
s.get_ref().shutdown(Shutdown::Write)?;
Ok(())
}
// Read from and write to stream0.
let r0 = Task::spawn(do_read(stream0.clone()));
let w0 = Task::spawn(do_write(stream0));
// Sleep a bit, so that reading and writing are both blocked.
smol::Timer::after(Duration::from_millis(5)).await;
// Start reading stream1, make stream0 writable.
let r1 = Task::spawn(do_read(stream1.clone()));
// Finish writing to stream0.
w0.await?;
r1.await?;
// Start writing to stream1, make stream0 readable.
let w1 = Task::spawn(do_write(stream1));
// Will r0 be correctly woken?
r0.await?;
w1.await?;
Ok(())
})
}