Merge pull request #188 from stjepang/simplify

Make smol truly live up to its name
Stjepan Glavina 2020-07-15 14:46:39 +02:00 committed by GitHub
commit 80461663c4
43 changed files with 427 additions and 5009 deletions


@ -1,47 +0,0 @@
name: Cross compile
on:
push:
branches:
- master
pull_request:
jobs:
cross:
name: Cross compile
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, macos-latest]
steps:
- uses: actions/checkout@master
- name: Install nightly
uses: actions-rs/toolchain@v1
with:
toolchain: nightly
override: true
- name: Install docker
if: startsWith(matrix.os, 'ubuntu')
run: sudo apt install docker
- name: Install cross
run: cargo install cross
- name: Android
if: startsWith(matrix.os, 'ubuntu')
run: cross test --target arm-linux-androideabi
- name: NetBSD
if: startsWith(matrix.os, 'ubuntu')
run: cross build --target x86_64-unknown-netbsd
- name: FreeBSD
if: startsWith(matrix.os, 'ubuntu')
run: cross build --target x86_64-unknown-freebsd
- name: iOS
if: startsWith(matrix.os, 'macos')
run: cross build --target aarch64-apple-ios


@ -11,7 +11,6 @@ documentation = "https://docs.rs/smol"
keywords = ["async", "await", "future", "io", "networking"]
categories = ["asynchronous", "concurrency", "network-programming"]
readme = "README.md"
autoexamples = false
[features]
# Optional feature for seamless integration with crates depending on tokio.
@ -25,17 +24,11 @@ autoexamples = false
tokio02 = ["tokio"]
[dependencies]
async-task = "3.0.0"
blocking = "0.4.6"
concurrent-queue = "1.1.1"
fastrand = "1.2.4"
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
libc = "0.2.71"
async-io = "0.1.1"
multitask = "0.2.0"
num_cpus = "1.13.0"
once_cell = "1.4.0"
scoped-tls = "1.0.0"
slab = "0.4.2"
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
blocking = "0.4.6"
[dependencies.tokio]
version = "0.2"
@ -43,23 +36,35 @@ default-features = false
features = ["rt-threaded"]
optional = true
[target.'cfg(windows)'.dependencies]
wepoll-sys-stjepang = "1.0.6"
winapi = { version = "0.3.8", features = ["ioapiset"] }
[dev-dependencies]
criterion = "0.3.2"
futures = { version = "0.3.5", default-features = false, features = ["std"] }
anyhow = "1.0.28"
async-channel = "1.1.1"
async-dup = "1.1.0"
async-h1 = "1.1.2"
async-native-tls = "0.3.3"
async-std = "1.5.0"
async-tungstenite = { version = "0.4.2", features = ["async-native-tls"] }
base64 = "0.12.0"
ctrlc = "3.1.4"
futures = "0.3.4"
http = "0.2.1"
http-types = "1.2.0"
hyper = { version = "0.13.5", default-features = false, features = ["stream"] }
native-tls = "0.2.4"
num_cpus = "1.13.0"
piper = "0.1.3"
reqwest = "0.10.4"
scraper = "0.11.0"
signal-hook = "0.1.13"
surf = { version = "2.0.0-alpha.1", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
tokio = { version = "0.2", default-features = false }
tungstenite = "0.10.1"
url = "2.1.1"
[workspace]
members = [
".",
"examples",
]
[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.8.2", default-features = false }
nix = "0.17.0"
timerfd = "1.1.1"
[[bench]]
name = "spawn"
harness = false
[target.'cfg(windows)'.dev-dependencies]
uds_windows = "0.1.4"


@ -11,12 +11,7 @@ https://docs.rs/smol)
[![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)](
https://discord.gg/x6m5Vvt)
A small and fast async runtime for Rust.
This runtime extends [the standard library][std] with async combinators
and is only 1500 lines of code long.
[std]: https://docs.rs/std
A small and fast executor.
Reading the [docs] or looking at the [examples] is a great way to start learning
async Rust.
@ -43,16 +38,6 @@ Read this [blog post](https://stjepang.github.io/2020/04/03/why-im-building-a-ne
* Tasks that support cancellation.
* Userspace timers.
## Examples
You need to be in the [examples] directory to run them:
```terminal
$ cd examples
$ ls
$ cargo run --example ctrl-c
```
## Compatibility
See [this example](./examples/other-runtimes.rs) for how to use smol with
@ -108,12 +93,6 @@ My personal crate recommendation list:
[ctrlc]: https://docs.rs/ctrlc
[signal-hook]: https://docs.rs/signal-hook
Crates useful with smol:
* Ergonomic async main wrapper that creates thread pools: [smol-potat]
[smol-potat]: https://github.com/wusyong/smol-potat
## TLS certificate
Some code examples are using TLS for authentication. The repository


@ -1,21 +0,0 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures::future;
use smol::Task;
pub fn spawn_benchmark(c: &mut Criterion) {
std::thread::spawn(|| smol::run(future::pending::<()>()));
c.bench_function("spawn time", |b| {
b.iter(|| {
let x = black_box(5);
smol::block_on(async {
Task::spawn(async move {
let _ = x + 1;
})
.await;
});
})
});
}
criterion_group!(benches, spawn_benchmark);
criterion_main!(benches);


@ -1,124 +0,0 @@
[package]
name = "smol-examples"
version = "0.0.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
publish = false
[target.'cfg(windows)'.dev-dependencies]
uds_windows = "0.1.4"
[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.8.2", default-features = false }
nix = "0.17.0"
timerfd = "1.1.1"
[dev-dependencies]
anyhow = "1.0.28"
async-h1 = "1.1.2"
async-native-tls = "0.3.3"
async-std = "1.5.0"
async-channel = "1.1.1"
async-tungstenite = { version = "0.4.2", features = ["async-native-tls"] }
base64 = "0.12.0"
ctrlc = "3.1.4"
futures = "0.3.4"
http = "0.2.1"
http-types = "1.2.0"
hyper = { version = "0.13.5", default-features = false, features = ["stream"] }
native-tls = "0.2.4"
num_cpus = "1.13.0"
piper = "0.1.1"
reqwest = "0.10.4"
scraper = "0.11.0"
signal-hook = "0.1.13"
smol = { path = "../", features = ["tokio02"] }
surf = { version = "2.0.0-alpha.1", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
tokio = { version = "0.2", default-features = false }
tungstenite = "0.10.1"
url = "2.1.1"
[[example]]
name = "async-h1-client"
path = "async-h1-client.rs"
[[example]]
name = "async-h1-server"
path = "async-h1-server.rs"
[[example]]
name = "chat-client"
path = "chat-client.rs"
[[example]]
name = "chat-server"
path = "chat-server.rs"
[[example]]
name = "ctrl-c"
path = "ctrl-c.rs"
[[example]]
name = "hyper-client"
path = "hyper-client.rs"
[[example]]
name = "hyper-server"
path = "hyper-server.rs"
[[example]]
name = "linux-inotify"
path = "linux-inotify.rs"
[[example]]
name = "linux-timerfd"
path = "linux-timerfd.rs"
[[example]]
name = "other-runtimes"
path = "other-runtimes.rs"
[[example]]
name = "simple-client"
path = "simple-client.rs"
[[example]]
name = "simple-server"
path = "simple-server.rs"
[[example]]
name = "tcp-client"
path = "tcp-client.rs"
[[example]]
name = "tcp-server"
path = "tcp-server.rs"
[[example]]
name = "tls-client"
path = "tls-client.rs"
[[example]]
name = "tls-server"
path = "tls-server.rs"
[[example]]
name = "unix-signal"
path = "unix-signal.rs"
[[example]]
name = "web-crawler"
path = "web-crawler.rs"
[[example]]
name = "websocket-client"
path = "websocket-client.rs"
[[example]]
name = "websocket-server"
path = "websocket-server.rs"
[[example]]
name = "windows-uds"
path = "windows-uds.rs"


@ -1,12 +0,0 @@
## smol examples
You need to be in this directory to run examples:
```
$ cd examples
$ ls
$ cargo run --example ctrl-c
```
If you've got an example you'd like to see here,
please feel free to open an issue or submit a PR!


@ -3,16 +3,16 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example async-h1-client
//! ```
use std::net::TcpStream;
use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Error, Result};
use async_io::Async;
use blocking::{block_on, unblock};
use futures::prelude::*;
use http_types::{Method, Request, Response};
use smol::Async;
use url::Url;
/// Sends a request and fetches the response.
@ -25,8 +25,13 @@ async fn fetch(req: Request) -> Result<Response> {
.context("cannot guess port")?;
// Connect to the host.
let addr = format!("{}:{}", host, port);
let stream = Async::<TcpStream>::connect(addr).await?;
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
// Send the request and wait for the response.
let resp = match req.url().scheme() {
@ -42,7 +47,7 @@ async fn fetch(req: Request) -> Result<Response> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Create a request.
let addr = "https://www.rust-lang.org";
let req = Request::new(Method::Get, Url::parse(addr)?);
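Note: the connect change above is a pattern repeated throughout this PR (also in `simple-client`, `hyper-client`, and `websocket-client` below): `Async::<TcpStream>::connect` no longer takes a host string, so the host is resolved on the blocking thread pool first and the resulting `SocketAddr` is passed to `connect`. A condensed sketch of that pattern, assuming the `blocking` 0.4 and `async-io` 0.1 APIs used above; the `resolve_and_connect` helper is illustrative, not part of the diff:

```rust
use std::net::{SocketAddr, TcpStream, ToSocketAddrs};

use anyhow::Context as _;
use async_io::Async;
use blocking::unblock;

// Illustrative helper mirroring the diffs above: resolve the address off
// the async threads, then connect with the already-resolved `SocketAddr`.
async fn resolve_and_connect(host: String, port: u16) -> anyhow::Result<Async<TcpStream>> {
    let addr: SocketAddr = unblock!((host.as_str(), port).to_socket_addrs())?
        .next()
        .context("cannot resolve address")?;
    Ok(Async::<TcpStream>::connect(addr).await?)
}
```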


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example async-h1-server
//! ```
//!
@ -15,14 +14,14 @@
//! Refer to `README.md` to see how the TLS certificate was generated.
use std::net::TcpListener;
use std::thread;
use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor};
use blocking::block_on;
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use piper::{Arc, Mutex};
use smol::{Async, Task};
use smol::Task;
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
@ -51,7 +50,7 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
// Spawn a background task serving this connection.
let task = match &tls {
None => {
let stream = Arc::new(stream);
let stream = async_dup::Arc::new(stream);
Task::spawn(async move {
if let Err(err) = async_h1::accept(&host, stream, serve).await {
println!("Connection error: {:#?}", err);
@ -62,7 +61,7 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
// In case of HTTPS, establish a secure TLS connection first.
match tls.accept(stream).await {
Ok(stream) => {
let stream = Arc::new(Mutex::new(stream));
let stream = async_dup::Arc::new(async_dup::Mutex::new(stream));
Task::spawn(async move {
if let Err(err) = async_h1::accept(&host, stream, serve).await {
println!("Connection error: {:#?}", err);
@ -87,16 +86,13 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
let num_threads = num_cpus::get().max(1);
for _ in 0..num_threads {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::block_on(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
Some(tls),
);
future::try_join(http, https).await?;
Ok(())
})
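Another recurring substitution above is `piper::Arc`/`piper::Mutex` becoming `async_dup::Arc`/`async_dup::Mutex`: the I/O handle is wrapped so that every clone is an independent `AsyncRead + AsyncWrite` handle, which is roughly what `async_h1::accept` needs from its stream. TLS streams, which cannot be read and written through a plain shared reference, get the `async_dup::Mutex` variant instead (see the TLS and websocket diffs below). A minimal sketch of the `Arc` wrapping, talking to the plain-HTTP server above and assuming the async-dup 1.1 API from the dev-dependencies:

```rust
use std::net::TcpStream;

use async_dup::Arc;
use async_io::Async;
use blocking::block_on;
use futures::prelude::*;

fn main() -> std::io::Result<()> {
    block_on(async {
        // Talk to the plain-HTTP server above (127.0.0.1:8000).
        let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 8000)).await?;

        // Every clone of `async_dup::Arc` is an independent AsyncRead +
        // AsyncWrite handle to the same underlying connection.
        let mut reader = Arc::new(stream);
        let mut writer = reader.clone();

        writer
            .write_all(b"GET / HTTP/1.1\r\nHost: 127.0.0.1\r\nConnection: close\r\n\r\n")
            .await?;

        let mut response = String::new();
        reader.read_to_string(&mut response).await?;
        println!("{}", response);
        Ok(())
    })
}
```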


@ -3,43 +3,41 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-server
//! ```
//!
//! Then start clients:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-client
//! ```
use std::net::TcpStream;
use async_io::Async;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use smol::Async;
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect("127.0.0.1:6000").await?;
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 6000)).await?;
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Intro messages.
println!("Connected to {}", stream.get_ref().peer_addr()?);
println!("My nickname: {}", stream.get_ref().local_addr()?);
println!("Type a message and hit enter!\n");
// References to `Async<T>` also implement `AsyncRead` and `AsyncWrite`.
let stream_r = &stream;
let mut stream_w = &stream;
let reader = &stream;
let mut writer = &stream;
// Wait until the standard input is closed or the connection is closed.
futures::select! {
_ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"),
_ = io::copy(stream_r, &mut stdout).fuse() => println!("Server disconnected!"),
_ = io::copy(stdin, &mut writer).fuse() => println!("Quit!"),
_ = io::copy(reader, &mut stdout).fuse() => println!("Server disconnected!"),
}
Ok(())


@ -3,31 +3,30 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-server
//! ```
//!
//! Then start clients:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-client
//! ```
use std::collections::HashMap;
use std::net::{SocketAddr, TcpListener, TcpStream};
use async_channel::{bounded, Receiver, Sender};
use async_dup::Arc;
use async_io::Async;
use blocking::block_on;
use futures::io::{self, BufReader};
use futures::prelude::*;
use piper::{Arc, Receiver, Sender};
use smol::{Async, Task};
type Client = Arc<Async<TcpStream>>;
use smol::Task;
/// An event on the chat server.
enum Event {
/// A client has joined.
Join(SocketAddr, Client),
Join(SocketAddr, Arc<Async<TcpStream>>),
/// A client has left.
Leave(SocketAddr),
@ -39,10 +38,10 @@ enum Event {
/// Dispatches events to clients.
async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
// Currently active clients.
let mut map = HashMap::<SocketAddr, Client>::new();
let mut map = HashMap::<SocketAddr, Arc<Async<TcpStream>>>::new();
// Receive incoming events.
while let Some(event) = receiver.recv().await {
while let Ok(event) = receiver.recv().await {
// Process the event and format a message to send to clients.
let output = match event {
Event::Join(addr, stream) => {
@ -69,29 +68,29 @@ async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
}
/// Reads messages from the client and forwards them to the dispatcher task.
async fn read_messages(sender: Sender<Event>, client: Client) -> io::Result<()> {
async fn read_messages(sender: Sender<Event>, client: Arc<Async<TcpStream>>) -> io::Result<()> {
let addr = client.get_ref().peer_addr()?;
let mut lines = BufReader::new(client).lines();
while let Some(line) = lines.next().await {
let line = line?;
sender.send(Event::Message(addr, line)).await;
let _ = sender.send(Event::Message(addr, line)).await;
}
Ok(())
}
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create a listener for incoming client connections.
let listener = Async::<TcpListener>::bind("127.0.0.1:6000")?;
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 6000))?;
// Intro messages.
println!("Listening on {}", listener.get_ref().local_addr()?);
println!("Start a chat client now!\n");
// Spawn a background task that dispatches events to clients.
let (sender, receiver) = piper::chan(100);
Task::spawn(dispatch(receiver)).unwrap().detach();
let (sender, receiver) = bounded(100);
Task::spawn(dispatch(receiver)).detach();
loop {
// Accept the next connection.
@ -102,13 +101,13 @@ fn main() -> io::Result<()> {
// Spawn a background task reading messages from the client.
Task::spawn(async move {
// Client starts with a `Join` event.
sender.send(Event::Join(addr, client.clone())).await;
let _ = sender.send(Event::Join(addr, client.clone())).await;
// Read messages from the client and ignore I/O errors when the client quits.
let _ = read_messages(sender.clone(), client).await;
// Client ends with a `Leave` event.
sender.send(Event::Leave(addr)).await;
let _ = sender.send(Event::Leave(addr)).await;
})
.detach();
}
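The channel changes above (repeated in `ctrl-c` and `web-crawler` below) swap `piper::chan` for `async-channel`, whose `send` and `recv` return `Result`s once the other side has been dropped, hence the `let _ =` and `while let Ok(...)` rewrites. A minimal stand-alone sketch of that API, assuming async-channel 1.1 as in the dev-dependencies:

```rust
use async_channel::bounded;
use blocking::block_on;

fn main() {
    block_on(async {
        // A bounded channel with capacity 100, as in the chat server above.
        let (sender, receiver) = bounded::<String>(100);

        // `send` only fails if every receiver has been dropped.
        let _ = sender.send("hello".to_string()).await;
        drop(sender);

        // `recv` returns Err once the channel is closed and drained,
        // which is what ends this loop.
        while let Ok(msg) = receiver.recv().await {
            println!("got: {}", msg);
        }
    });
}
```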


@ -3,25 +3,25 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example ctrl-c
//! ```
use blocking::block_on;
use futures::prelude::*;
fn main() {
// Set a handler that sends a message through a channel.
let (s, ctrl_c) = piper::chan(100);
let (s, ctrl_c) = async_channel::bounded(100);
let handle = move || {
let _ = s.send(()).now_or_never();
};
ctrlc::set_handler(handle).unwrap();
smol::run(async {
block_on(async {
println!("Waiting for Ctrl-C...");
// Receive a message that indicates the Ctrl-C signal occurred.
ctrl_c.recv().await;
let _ = ctrl_c.recv().await;
println!("Done!");
})


@ -3,21 +3,23 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example hyper-client
//! ```
use std::io;
use std::net::{Shutdown, TcpStream};
use std::net::Shutdown;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Error, Result};
use async_io::Async;
use async_native_tls::TlsStream;
use blocking::{block_on, unblock};
use futures::prelude::*;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{Async, Task};
use smol::Task;
/// Sends a request and fetches the response.
async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
@ -29,7 +31,7 @@ async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Create a request.
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?;
@ -80,14 +82,26 @@ impl hyper::service::Service<Uri> for SmolConnector {
match uri.scheme_str() {
Some("http") => {
let addr = format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap_or(80));
let stream = Async::<TcpStream>::connect(addr).await?;
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(80);
unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
Ok(SmolStream::Plain(stream))
}
Some("https") => {
// In case of HTTPS, establish a secure TLS connection first.
let addr = format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap_or(443));
let stream = Async::<TcpStream>::connect(addr).await?;
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(443);
unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(SmolStream::Tls(stream))
}


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example hyper-server
//! ```
//!
@ -15,17 +14,19 @@
//! Refer to `README.md` to see how the TLS certificate was generated.
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::net::Shutdown;
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Error, Result};
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use blocking::block_on;
use futures::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use smol::{Async, Task};
use smol::Task;
/// Serves a request and returns a response.
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
@ -59,15 +60,13 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::block_on(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
Some(tls),
);
future::try_join(http, https).await?;
Ok(())
})


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example linux-inotify
//! ```
@ -12,8 +11,9 @@ fn main() -> std::io::Result<()> {
use std::ffi::OsString;
use std::io;
use async_io::Async;
use blocking::block_on;
use inotify::{EventMask, Inotify, WatchMask};
use smol::Async;
type Event = (OsString, EventMask);
@ -34,7 +34,7 @@ fn main() -> std::io::Result<()> {
}
}
smol::run(async {
block_on(async {
// Watch events in the current directory.
let mut inotify = Async::new(Inotify::init()?)?;
inotify.get_mut().add_watch(".", WatchMask::ALL_EVENTS)?;


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example linux-timerfd
//! ```
@ -13,7 +12,8 @@ fn main() -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};
use smol::Async;
use async_io::Async;
use blocking::block_on;
use timerfd::{SetTimeFlags, TimerFd, TimerState};
/// Converts a [`nix::Error`] into [`std::io::Error`].
@ -37,7 +37,7 @@ fn main() -> std::io::Result<()> {
Ok(())
}
smol::run(async {
block_on(async {
let start = Instant::now();
println!("Sleeping...");


@ -13,16 +13,16 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example other-runtimes
//! ```
use std::time::{Duration, Instant};
use anyhow::{Error, Result};
use blocking::block_on;
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Sleep using async-std.
let start = Instant::now();
println!("Sleeping using async-std...");


@ -6,11 +6,12 @@
//! cargo run --example simple-client
//! ```
use std::net::TcpStream;
use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Result};
use async_io::Async;
use blocking::{block_on, unblock};
use futures::prelude::*;
use smol::Async;
use url::Url;
/// Sends a GET request and fetches the response.
@ -32,7 +33,13 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
);
// Connect to the host.
let mut stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
let mut stream = Async::<TcpStream>::connect(socket_addr).await?;
// Send the request and wait for the response.
let mut resp = Vec::new();
@ -54,7 +61,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
let addr = "https://www.rust-lang.org";
let resp = fetch(addr).await?;
println!("{}", String::from_utf8_lossy(&resp));


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example simple-server
//! ```
//!
@ -15,12 +14,13 @@
//! Refer to `README.md` to see how the TLS certificate was generated.
use std::net::{TcpListener, TcpStream};
use std::thread;
use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor};
use blocking::block_on;
use futures::prelude::*;
use smol::{Async, Task};
use smol::Task;
const RESPONSE: &[u8] = br#"
HTTP/1.1 200 OK
@ -82,16 +82,13 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
let num_threads = num_cpus::get().max(1);
for _ in 0..num_threads {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::run(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
Some(tls),
);
future::try_join(http, https).await?;
Ok(())
})


@ -3,31 +3,30 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-client
//! ```
use std::net::TcpStream;
use async_io::Async;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use smol::Async;
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create async stdin and stdout handles.
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Connect to the server.
let stream = Async::<TcpStream>::connect("127.0.0.1:7000").await?;
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 7000)).await?;
println!("Connected to {}", stream.get_ref().peer_addr()?);
println!("Type a message and hit enter!\n");


@ -3,21 +3,21 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-client
//! ```
use std::net::{TcpListener, TcpStream};
use async_io::Async;
use blocking::block_on;
use futures::io;
use smol::{Async, Task};
use smol::Task;
/// Echoes messages from the client back to it.
async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
@ -26,9 +26,9 @@ async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
}
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create a listener.
let listener = Async::<TcpListener>::bind("127.0.0.1:7000")?;
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7000))?;
println!("Listening on {}", listener.get_ref().local_addr()?);
println!("Now start a TCP client.");
@ -38,7 +38,7 @@ fn main() -> io::Result<()> {
println!("Accepted client: {}", peer_addr);
// Spawn a task that echoes messages from the client back to it.
Task::spawn(echo(stream)).unwrap().detach();
Task::spawn(echo(stream)).detach();
}
})
}


@ -3,25 +3,23 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-client
//! ```
use std::net::TcpStream;
use anyhow::Result;
use async_io::Async;
use async_native_tls::{Certificate, TlsConnector};
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use piper::Mutex;
use smol::Async;
fn main() -> Result<()> {
// Initialize TLS with the local certificate.
@ -29,19 +27,19 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
block_on(async {
// Create async stdin and stdout handles.
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Connect to the server.
let stream = Async::<TcpStream>::connect("127.0.0.1:7001").await?;
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 7001)).await?;
let stream = tls.connect("127.0.0.1", stream).await?;
println!("Connected to {}", stream.get_ref().get_ref().peer_addr()?);
println!("Type a message and hit enter!\n");
// Pipe messages from stdin to the server and pipe messages from the server to stdout.
let stream = Mutex::new(stream);
let stream = async_dup::Mutex::new(stream);
future::try_join(
io::copy(stdin, &mut &stream),
io::copy(&stream, &mut stdout),


@ -3,28 +3,27 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-client
//! ```
use std::net::{TcpListener, TcpStream};
use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use blocking::block_on;
use futures::io;
use piper::Mutex;
use smol::{Async, Task};
use smol::Task;
/// Echoes messages from the client back to it.
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
let stream = Mutex::new(stream);
let stream = async_dup::Mutex::new(stream);
io::copy(&stream, &mut &stream).await?;
Ok(())
}
@ -34,9 +33,9 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
smol::run(async {
block_on(async {
// Create a listener.
let listener = Async::<TcpListener>::bind("127.0.0.1:7001")?;
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7001))?;
println!("Listening on {}", listener.get_ref().local_addr()?);
println!("Now start a TLS client.");
@ -50,7 +49,7 @@ fn main() -> Result<()> {
);
// Spawn a task that echoes messages from the client back to it.
Task::spawn(echo(stream)).unwrap().detach();
Task::spawn(echo(stream)).detach();
}
})
}


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example unix-signal
//! ```
@ -11,10 +10,11 @@
fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream;
use async_io::Async;
use blocking::block_on;
use futures::prelude::*;
use smol::Async;
smol::run(async {
block_on(async {
// Create a Unix stream that receives a byte on each signal occurrence.
let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::pipe::register(signal_hook::SIGINT, a)?;


@ -3,14 +3,14 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example web-crawler
//! ```
use std::collections::{HashSet, VecDeque};
use anyhow::Result;
use async_channel::Sender;
use async_channel::{bounded, Sender};
use blocking::block_on;
use scraper::{Html, Selector};
use smol::Task;
@ -20,7 +20,7 @@ const ROOT: &str = "https://www.rust-lang.org";
async fn fetch(url: String, sender: Sender<String>) {
let body = surf::get(&url).recv_string().await;
let body = body.unwrap_or_default();
sender.send(body).await.unwrap();
let _ = sender.send(body).await;
}
/// Extracts links from an HTML body.
@ -35,13 +35,13 @@ fn links(body: String) -> Vec<String> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
let mut seen = HashSet::new();
let mut queue = VecDeque::new();
seen.insert(ROOT.to_string());
queue.push_back(ROOT.to_string());
let (s, r) = async_channel::bounded(200);
let (s, r) = bounded(200);
let mut tasks = 0;
// Loop while the queue is not empty or tasks are fetching pages.


@ -3,26 +3,25 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-client
//! ```
use std::net::TcpStream;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result};
use async_io::Async;
use async_native_tls::{Certificate, TlsConnector, TlsStream};
use async_tungstenite::WebSocketStream;
use blocking::{block_on, unblock};
use futures::prelude::*;
use smol::Async;
use tungstenite::handshake::client::Response;
use tungstenite::Message;
use url::Url;
@ -34,16 +33,24 @@ async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)>
let host = url.host_str().context("cannot parse host")?.to_string();
let port = url.port_or_known_default().context("cannot guess port")?;
// Resolve the address.
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
// Connect to the address.
match url.scheme() {
"ws" => {
let stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Plain(stream), resp))
}
"wss" => {
// In case of WSS, establish a secure TLS connection first.
let stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = tls.connect(host, stream).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Tls(stream), resp))
@ -58,7 +65,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
block_on(async {
// Connect to the server.
let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?;
dbg!(resp);


@ -3,27 +3,26 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-client
//! ```
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Context as _, Result};
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream;
use blocking::block_on;
use futures::prelude::*;
use smol::{Async, Task};
use smol::Task;
use tungstenite::Message;
/// Echoes messages from the client back to it.
@ -49,13 +48,13 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
match &tls {
None => {
let stream = WsStream::Plain(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
Task::spawn(echo(stream)).detach();
}
Some(tls) => {
// In case of WSS, establish a secure TLS connection first.
let stream = tls.accept(stream).await?;
let stream = WsStream::Tls(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
Task::spawn(echo(stream)).detach();
}
}
}
@ -66,15 +65,13 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start WS and WSS servers.
smol::block_on(async {
let ws = listen(Async::<TcpListener>::bind("127.0.0.1:9000")?, None);
let wss = listen(Async::<TcpListener>::bind("127.0.0.1:9001")?, Some(tls));
block_on(async {
let ws = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?, None);
let wss = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 9001))?,
Some(tls),
);
future::try_join(ws, wss).await?;
Ok(())
})


@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example windows-uds
//! ```
@ -11,9 +10,11 @@
fn main() -> std::io::Result<()> {
use std::path::PathBuf;
use async_io::Async;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use smol::{Async, Task};
use smol::Task;
use tempfile::tempdir;
use uds_windows::{UnixListener, UnixStream};
@ -23,7 +24,7 @@ fn main() -> std::io::Result<()> {
println!("Connected to {:?}", stream.get_ref().peer_addr()?);
// Pipe the stream to stdout.
let mut stdout = smol::writer(std::io::stdout());
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
Ok(())
}
@ -31,7 +32,7 @@ fn main() -> std::io::Result<()> {
let dir = tempdir()?;
let path = dir.path().join("socket");
smol::run(async {
block_on(async {
// Create a listener.
let listener = Async::new(UnixListener::bind(&path)?)?;
println!("Listening on {:?}", listener.get_ref().local_addr()?);

File diff suppressed because it is too large.


@ -1,45 +0,0 @@
//! Implementation of [`block_on()`].
//!
//! This is equivalent to [`futures::executor::block_on()`], but slightly more efficient.
//!
//! The implementation is explained in detail in [*Build your own block_on()*][blog-post].
//!
//! [`futures::executor::block_on()`]: https://docs.rs/futures/0.3/futures/executor/fn.block_on.html
//! [blog-post]: https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
use std::future::Future;
use crate::context;
/// Blocks on a single future.
///
/// This function polls the future in a loop, parking the current thread after each step to wait
/// until its waker is woken.
///
/// Unlike [`run()`], it does not run executors or poll the reactor!
///
/// You can think of it as the easiest and most efficient way of turning an async operation into a
/// blocking operation.
///
/// # Examples
///
/// ```
/// use futures::future;
/// use smol::{Async, Timer};
/// use std::thread;
/// use std::time::Duration;
///
/// // Run executors and the reactor on a separate thread, forever.
/// thread::spawn(|| smol::run(future::pending::<()>()));
///
/// smol::block_on(async {
/// // Sleep for a second.
/// // This timer only works because there's a thread calling `run()`.
/// Timer::after(Duration::from_secs(1)).await;
/// })
/// ```
///
/// [`run()`]: `crate::run()`
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
context::enter(|| blocking::block_on(future))
}
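The removed module above only wrapped `blocking::block_on` (plus the tokio `enter` shim), but its docs describe the underlying technique: poll the future in a loop and park the thread until the waker fires. A minimal sketch of that idea using only today's standard library (`std::task::Wake`); this is an illustration of the technique, not the code that was deleted:

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// A waker that unparks the thread that created it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Blocks the current thread until `future` completes.
fn block_on<T>(future: impl Future<Output = T>) -> T {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);

    loop {
        match future.as_mut().poll(&mut cx) {
            Poll::Ready(output) => return output,
            // Nothing to do yet: park until the waker unparks this thread.
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    assert_eq!(block_on(async { 1 + 2 }), 3);
}
```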


@ -1,314 +0,0 @@
//! The blocking executor.
//!
//! Tasks created by [`Task::blocking()`] go into this executor. This executor is independent of
//! [`run()`][`crate::run()`] - it does not need to be driven.
//!
//! Blocking tasks are allowed to block without restrictions. However, the executor puts a limit on
//! the number of concurrently running tasks. Once that limit is hit, a task will need to complete
//! or yield in order for others to run.
//!
//! In idle state, this executor has no threads and consumes no resources. Once tasks are spawned,
//! new threads will get started, as many as is needed to keep up with the present amount of work.
//! When threads are idle, they wait for some time for new work to come in and shut down after a
//! certain timeout.
//!
//! This module also implements convenient adapters:
//!
//! - [`blocking!`] as syntax sugar around [`Task::blocking()`]
//! - [`iter()`] converts an [`Iterator`] into a [`Stream`]
//! - [`reader()`] converts a [`Read`] into an [`AsyncRead`]
//! - [`writer()`] converts a [`Write`] into an [`AsyncWrite`]
use std::collections::VecDeque;
use std::future::Future;
use std::io::{Read, Write};
use std::panic;
use std::sync::{Condvar, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::stream::Stream;
use once_cell::sync::Lazy;
use crate::context;
use crate::task::{Runnable, Task};
/// The blocking executor.
pub(crate) struct BlockingExecutor {
/// The current state of the executor.
state: Mutex<State>,
/// Used to put idle threads to sleep and wake them up when new work comes in.
cvar: Condvar,
}
/// Current state of the blocking executor.
struct State {
/// Number of idle threads in the pool.
///
/// Idle threads are sleeping, waiting to get a task to run.
idle_count: usize,
/// Total number of threads in the pool.
///
/// This is the number of idle threads + the number of active threads.
thread_count: usize,
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
}
impl BlockingExecutor {
/// Returns a reference to the blocking executor.
pub fn get() -> &'static BlockingExecutor {
static EXECUTOR: Lazy<BlockingExecutor> = Lazy::new(|| BlockingExecutor {
state: Mutex::new(State {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
}),
cvar: Condvar::new(),
});
&EXECUTOR
}
/// Spawns a future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, move |r| self.schedule(r), ());
runnable.schedule();
Task(Some(handle))
}
/// Runs the main loop on the current thread.
///
/// This function runs blocking tasks until it becomes idle and times out.
fn main_loop(&'static self) {
let mut state = self.state.lock().unwrap();
loop {
// This thread is not idle anymore because it's going to run tasks.
state.idle_count -= 1;
// Run tasks in the queue.
while let Some(runnable) = state.queue.pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(state);
// Run the task.
let _ = panic::catch_unwind(|| runnable.run());
// Re-lock the state and continue.
state = self.state.lock().unwrap();
}
// This thread is now becoming idle.
state.idle_count += 1;
// Put the thread to sleep until another task is scheduled.
let timeout = Duration::from_millis(500);
let (s, res) = self.cvar.wait_timeout(state, timeout).unwrap();
state = s;
// If there are no tasks after a while, stop this thread.
if res.timed_out() && state.queue.is_empty() {
state.idle_count -= 1;
state.thread_count -= 1;
break;
}
}
}
/// Schedules a runnable task for execution.
fn schedule(&'static self, runnable: Runnable) {
let mut state = self.state.lock().unwrap();
state.queue.push_back(runnable);
// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
self.grow_pool(state);
}
/// Spawns more blocking threads if the pool is overloaded with work.
fn grow_pool(&'static self, mut state: MutexGuard<'static, State>) {
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while state.queue.len() > state.idle_count * 5 && state.thread_count < 500 {
// The new thread starts in idle state.
state.idle_count += 1;
state.thread_count += 1;
// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();
// Spawn the new thread.
thread::spawn(move || {
// If enabled, set up tokio before the main loop begins.
context::enter(|| self.main_loop())
});
}
}
}
/// Spawns blocking code onto a thread.
///
/// Note that `blocking!(expr)` is just syntax sugar for
/// `Task::blocking(async move { expr }).await`.
///
/// # Examples
///
/// Read a file into a string:
///
/// ```no_run
/// use smol::blocking;
/// use std::fs;
///
/// # smol::run(async {
/// let contents = blocking!(fs::read_to_string("file.txt"))?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use smol::blocking;
/// use std::process::Command;
///
/// # smol::run(async {
/// let out = blocking!(Command::new("dir").output())?;
/// # std::io::Result::Ok(()) });
/// ```
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
$crate::Task::blocking(async move { $($expr)* }).await
};
}
/// Creates a stream that iterates on a thread.
///
/// This adapter converts any kind of synchronous iterator into an asynchronous stream by running
/// it on the blocking executor and sending items back over a channel.
///
/// # Examples
///
/// List files in the current directory:
///
/// ```no_run
/// use futures::stream::StreamExt;
/// use smol::{blocking, iter};
/// use std::fs;
///
/// # smol::run(async {
/// // Load a directory.
/// let mut dir = blocking!(fs::read_dir("."))?;
/// let mut dir = iter(dir);
///
/// // Iterate over the contents of the directory.
/// while let Some(res) = dir.next().await {
/// println!("{}", res?.file_name().to_string_lossy());
/// }
/// # std::io::Result::Ok(()) });
/// ```
pub fn iter<T: Send + 'static>(
iter: impl Iterator<Item = T> + Send + 'static,
) -> impl Stream<Item = T> + Send + Unpin + 'static {
blocking::Unblock::new(iter)
}
/// Creates an async reader that runs on a thread.
///
/// This adapter converts any kind of synchronous reader into an asynchronous reader by running it
/// on the blocking executor and sending bytes back over a pipe.
///
/// # Examples
///
/// Read from a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, reader};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for reading.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = reader(file);
///
/// // Read the whole file.
/// let mut contents = Vec::new();
/// file.read_to_end(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Read output from a process:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::reader;
/// use std::process::{Command, Stdio};
///
/// # smol::run(async {
/// // Spawn a child process and make an async reader for its stdout.
/// let child = Command::new("dir").stdout(Stdio::piped()).spawn()?;
/// let mut child_stdout = reader(child.stdout.unwrap());
///
/// // Read the entire output.
/// let mut output = String::new();
/// child_stdout.read_to_string(&mut output).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
blocking::Unblock::new(reader)
}
/// Creates an async writer that runs on a thread.
///
/// This adapter converts any kind of synchronous writer into an asynchronous writer by running it
/// on the blocking executor and receiving bytes over a pipe.
///
/// **Note:** Don't forget to flush the writer at the end, or some written bytes might get lost!
///
/// # Examples
///
/// Write into a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, writer};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for writing.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = writer(file);
///
/// // Write some bytes into the file and flush.
/// file.write_all(b"hello").await?;
/// file.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Write into standard output:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::writer;
///
/// # smol::run(async {
/// // Create an async writer to stdout.
/// let mut stdout = writer(std::io::stdout());
///
/// // Write a message and flush.
/// stdout.write_all(b"hello").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
blocking::Unblock::new(writer)
}
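With this module gone, the `blocking!`/`iter()`/`reader()`/`writer()` adapters documented above are replaced by the `blocking` crate used directly: the updated examples in this PR wrap handles in `blocking::Unblock` instead of calling `smol::reader`/`smol::writer`. A minimal sketch of the equivalent usage, assuming the blocking 0.4 API imported elsewhere in this diff:

```rust
use blocking::{block_on, Unblock};
use futures::prelude::*;

fn main() -> std::io::Result<()> {
    block_on(async {
        // `Unblock` plays the role of the removed `writer()` adapter: it
        // drives the wrapped blocking handle on the blocking thread pool.
        let mut stdout = Unblock::new(std::io::stdout());

        // As the removed docs note, don't forget to flush at the end.
        stdout.write_all(b"hello\n").await?;
        stdout.flush().await?;
        Ok(())
    })
}
```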


@ -1,36 +0,0 @@
//! Task context common to all executors.
//!
//! Before running an executor, we "enter" the task context by setting up some necessary thread-locals.
/// Enters the tokio context if the `tokio` feature is enabled.
pub(crate) fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
{
use once_cell::sync::Lazy;
use std::cell::Cell;
use tokio::runtime::Runtime;
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always has a
/// runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}


@ -1,67 +1,61 @@
//! A small and fast async runtime.
//! A small and fast executor.
//!
//! # Executors
//! This crate runs a global executor thread pool and only has one type, [`Task`]. Despite the
//! trivially simple codebase, this executor and its related crates offer performance and features
//! comparable to more complex frameworks like [tokio].
//!
//! There are three executors that poll futures:
//! Related async crates:
//!
//! 1. Thread-local executor for tasks created by [`Task::local()`].
//! 2. Work-stealing executor for tasks created by [`Task::spawn()`].
//! 3. Blocking executor for tasks created by [`Task::blocking()`], [`blocking!`], [`iter()`],
//! [`reader()`] and [`writer()`].
//! * For async I/O and timers, use [`async-io`].
//! * For higher-level networking primitives, use [`async-net`].
//! * For executors, use [`multitask`].
//! * To call blocking code from async code or the other way around, use [`blocking`].
//!
//! Blocking executor is the only one that spawns threads on its own.
//! [`async-io`]: https://docs.rs/async-io
//! [`async-net`]: https://docs.rs/async-net
//! [`blocking`]: https://docs.rs/blocking
//! [`multitask`]: https://docs.rs/multitask
//! [`tokio`]: https://docs.rs/tokio
//!
//! See [here](fn.run.html#examples) for how to run executors on a single thread or on a thread
//! pool.
//! # TCP server
//!
//! # Reactor
//!
//! To wait for the next I/O event, the reactor calls [epoll] on Linux/Android, [kqueue] on
//! macOS/iOS/BSD, and [wepoll] on Windows.
//!
//! The [`Async`] type registers I/O handles in the reactor and is able to convert their blocking
//! operations into async operations.
//!
//! The [`Timer`] type registers timers in the reactor that will fire at the chosen points in
//! time.
//!
//! # Running
//!
//! Function [`run()`] simultaneously runs the thread-local executor, runs the work-stealing
//! executor, and polls the reactor for I/O events and timers. At least one thread has to be
//! calling [`run()`] in order for futures waiting on I/O and timers to get notified.
//!
//! If you want a multithreaded runtime, just call [`run()`] from multiple threads. See
//! [here](fn.run.html#examples) for an example.
//!
//! There is also [`block_on()`], which blocks the current thread until a future completes, but it
//! doesn't poll the reactor or run executors. When using [`block_on()`], make sure at least one
//! thread is calling [`run()`], or else I/O and timers will not work!
//!
//! Blocking tasks run in the background on a dedicated thread pool.
//!
//! # Examples
//!
//! Connect to a HTTP website, make a GET request, and pipe the response to the standard output:
//! A simple TCP server that prints messages received from clients:
//!
//! ```no_run
//! use futures::prelude::*;
//! use smol::Async;
//! use std::net::TcpStream;
//! use async_io::Async;
//! use blocking::{block_on, Unblock};
//! use smol::Task;
//! use std::net::TcpListener;
//!
//! fn main() -> std::io::Result<()> {
//! smol::run(async {
//! let mut stream = Async::<TcpStream>::connect("example.com:80").await?;
//! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
//! stream.write_all(req).await?;
//! block_on(async {
//! // Start listening on port 9000.
//! let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?;
//!
//! let mut stdout = smol::writer(std::io::stdout());
//! futures::io::copy(&stream, &mut stdout).await?;
//! Ok(())
//! loop {
//! // Accept a new client.
//! let (stream, _) = listener.accept().await?;
//!
//! // Spawn a task handling this client.
//! let task = Task::spawn(async move {
//! // Create an async stdio handle.
//! let mut stdout = Unblock::new(std::io::stdout());
//!
//! // Copy data received from the client into stdout.
//! futures::io::copy(&stream, &mut stdout).await
//! });
//!
//! // Keep running the task in the background.
//! task.detach();
//! }
//! })
//! }
//! ```
//!
//! To interact with the server, run `nc 127.0.0.1 9000` and type a few lines of text.
//!
//! # Examples
//!
//! Look inside the [examples] directory for more:
//! a [web crawler][web-crawler],
//! a [Ctrl-C handler][ctrl-c],
@ -79,13 +73,10 @@
//! Finally, there's an [example][other-runtimes] showing how to use smol with
//! [async-std], [tokio], [surf], and [reqwest].
//!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [wepoll]: https://github.com/piscisaureus/wepoll
//!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! [examples]: https://github.com/stjepang/smol/tree/master/examples/
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [hyper]: https://docs.rs/tokio
//! [async-std]: https://docs.rs/async-std
//! [tokio]: https://docs.rs/tokio
//! [surf]: https://docs.rs/surf
@ -113,23 +104,179 @@
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
mod async_io;
mod block_on;
mod blocking;
mod context;
mod multitask;
mod parking;
mod reactor;
mod run;
mod sys;
mod task;
mod timer;
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
pub use self::blocking::{iter, reader, writer};
pub use async_io::Async;
pub use block_on::block_on;
pub use run::run;
pub use task::Task;
pub use timer::Timer;
use multitask::Executor;
use once_cell::sync::Lazy;
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
/// Spawns a future.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
pub fn spawn<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| {
enter(|| {
let (p, u) = async_io::parking::pair();
let ticker = EXECUTOR.ticker(move || u.unpark());
loop {
if let Ok(false) = catch_unwind(|| ticker.tick()) {
p.park();
}
}
})
});
}
Executor::new()
});
Task(EXECUTOR.spawn(future))
}
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(self) {
self.0.detach();
}
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::new(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
{
use std::cell::Cell;
use tokio::runtime::Runtime;
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
/// The global tokio runtime.
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}
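// A minimal sketch of the nesting behavior under the `tokio02` feature (illustrative only):
//
//     enter(|| {
//         // NESTING was 0, so this closure runs inside `RT.enter(..)`.
//         enter(|| {
//             // NESTING was 1 here, so no second `RT.enter(..)` call is made.
//         });
//     });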

View File

@ -1,508 +0,0 @@
use std::cell::Cell;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, ThreadId};
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use scoped_tls::scoped_thread_local;
use slab::Slab;
use crate::task::{Runnable, Task};
scoped_thread_local! {
static WORKER: Worker
}
/// State shared between [`Queue`] and [`Worker`].
struct Global {
/// The global queue.
queue: ConcurrentQueue<Runnable>,
/// Shards of the global queue created by workers.
shards: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
/// Set to `true` when a sleeping worker is notified or no workers are sleeping.
notified: AtomicBool,
/// A list of sleeping workers.
sleepers: Mutex<Sleepers>,
}
impl Global {
/// Notifies a sleeping worker.
fn notify(&self) {
if !self
.notified
.compare_and_swap(false, true, Ordering::SeqCst)
{
let callback = self.sleepers.lock().unwrap().notify();
if let Some(cb) = callback {
cb.call();
}
}
}
}
/// A list of sleeping workers.
struct Sleepers {
/// Number of sleeping workers (both notified and unnotified).
count: usize,
/// Callbacks of sleeping unnotified workers.
///
/// A sleeping worker is notified when its callback is missing from this list.
callbacks: Vec<Callback>,
}
impl Sleepers {
/// Inserts a new sleeping worker.
fn insert(&mut self, callback: &Callback) {
self.count += 1;
self.callbacks.push(callback.clone());
}
/// Re-inserts a sleeping worker's callback if it was notified.
///
/// Returns `true` if the worker was notified.
fn update(&mut self, callback: &Callback) -> bool {
if self.callbacks.iter().all(|cb| cb != callback) {
self.callbacks.push(callback.clone());
true
} else {
false
}
}
/// Removes a previously inserted sleeping worker.
fn remove(&mut self, callback: &Callback) {
self.count -= 1;
for i in (0..self.callbacks.len()).rev() {
if &self.callbacks[i] == callback {
self.callbacks.remove(i);
return;
}
}
}
/// Returns `true` if a sleeping worker is notified or no workers are sleeping.
fn is_notified(&self) -> bool {
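// `count > callbacks.len()` means some sleeping worker had its callback removed by
// `notify()`, i.e. it has already been notified.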
self.count == 0 || self.count > self.callbacks.len()
}
/// Returns notification callback for a sleeping worker.
///
/// If a worker was notified already or there are no workers, `None` will be returned.
fn notify(&mut self) -> Option<Callback> {
if self.callbacks.len() == self.count {
self.callbacks.pop()
} else {
None
}
}
}
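// A short walk-through of the bookkeeping above, assuming three workers `a`, `b` and `c`
// (illustrative only):
//
//     insert(&a); insert(&b); insert(&c); // count = 3, callbacks = [a, b, c]
//     notify();                           // pops `c`: count = 3, callbacks = [a, b]
//     // `c` is now sleeping and notified; is_notified() == true because 3 > 2.
//     update(&c);                         // returns true and re-inserts `c`'s callback
//     remove(&a);                         // `a` wakes up: count = 2, callbacks = [b, c]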
/// A queue for spawning tasks.
pub(crate) struct Queue {
global: Arc<Global>,
}
impl Queue {
/// Creates a new queue for spawning tasks.
pub fn new() -> Queue {
Queue {
global: Arc::new(Global {
queue: ConcurrentQueue::unbounded(),
shards: RwLock::new(Slab::new()),
notified: AtomicBool::new(true),
sleepers: Mutex::new(Sleepers {
count: 0,
callbacks: Vec::new(),
}),
}),
}
}
/// Spawns a future onto this queue.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
let global = self.global.clone();
// The function that schedules a runnable task when it gets woken up.
let schedule = move |runnable| {
if WORKER.is_set() {
WORKER.with(|w| {
if Arc::ptr_eq(&global, &w.global) {
if let Err(err) = w.shard.push(runnable) {
global.queue.push(err.into_inner()).unwrap();
}
} else {
global.queue.push(runnable).unwrap();
}
});
} else {
global.queue.push(runnable).unwrap();
}
global.notify();
};
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Registers a new worker.
///
/// The worker will automatically deregister itself when dropped.
pub fn worker(&self, notify: impl Fn() + Send + Sync + 'static) -> Worker {
let mut shards = self.global.shards.write().unwrap();
let vacant = shards.vacant_entry();
// Create a worker and put its stealer handle into the executor.
let worker = Worker {
key: vacant.key(),
global: Arc::new(self.global.clone()),
shard: SlotQueue {
slot: Cell::new(None),
queue: Arc::new(ConcurrentQueue::bounded(512)),
},
local: SlotQueue {
slot: Cell::new(None),
queue: Arc::new(ConcurrentQueue::unbounded()),
},
callback: Callback::new(notify),
sleeping: Cell::new(false),
ticker: Cell::new(0),
};
vacant.insert(worker.shard.queue.clone());
worker
}
}
impl Default for Queue {
fn default() -> Queue {
Queue::new()
}
}
/// A worker that participates in the work-stealing executor.
///
/// Each invocation of `run()` creates its own worker.
pub(crate) struct Worker {
/// The ID of this worker obtained during registration.
key: usize,
/// The global queue.
global: Arc<Arc<Global>>,
/// A shard of the global queue.
shard: SlotQueue<Runnable>,
/// Local queue for `!Send` tasks.
local: SlotQueue<Runnable>,
/// Callback invoked to wake this worker up.
callback: Callback,
/// Set to `true` when in sleeping state.
///
/// States a worker can be in:
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: Cell<bool>,
/// Bumped every time a task is run.
ticker: Cell<usize>,
}
impl Worker {
/// Spawns a local future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn_local<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let queue = self.local.queue.clone();
let callback = self.callback.clone();
let id = thread_id();
// The function that schedules a runnable task when it gets woken up.
let schedule = move |runnable| {
if thread_id() == id && WORKER.is_set() {
WORKER.with(|w| {
if Arc::ptr_eq(&queue, &w.local.queue) {
w.local.push(runnable).unwrap();
} else {
queue.push(runnable).unwrap();
}
});
} else {
queue.push(runnable).unwrap();
}
callback.call();
};
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Moves the worker into sleeping and unnotified state.
///
/// Returns `true` if the worker was already sleeping and unnotified.
fn sleep(&self) -> bool {
let mut sleepers = self.global.sleepers.lock().unwrap();
if self.sleeping.get() {
// Already sleeping, check if notified.
if !sleepers.update(&self.callback) {
return false;
}
} else {
// Move to sleeping state.
sleepers.insert(&self.callback);
}
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
self.sleeping.set(true);
true
}
/// Moves the worker into woken state.
fn wake(&self) -> bool {
if self.sleeping.get() {
let mut sleepers = self.global.sleepers.lock().unwrap();
sleepers.remove(&self.callback);
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
self.sleeping.replace(false)
}
/// Runs a single task and returns `true` if one was found.
pub fn tick(&self) -> bool {
loop {
match self.search() {
None => {
// Move to sleeping and unnotified state.
if !self.sleep() {
// If already sleeping and unnotified, return.
return false;
}
}
Some(r) => {
// Wake up.
if !self.wake() {
// If already woken, notify another worker.
self.global.notify();
}
// Bump the ticker.
let ticker = self.ticker.get();
self.ticker.set(ticker.wrapping_add(1));
// Flush slots to ensure fair task scheduling.
if ticker % 16 == 0 {
if let Err(err) = self.shard.flush() {
self.global.queue.push(err.into_inner()).unwrap();
self.global.notify();
}
self.local.flush().unwrap();
}
// Steal tasks from the global queue to ensure fair task scheduling.
if ticker % 64 == 0 {
self.shard.steal(&self.global.queue);
}
// Run the task.
if WORKER.set(self, || r.run()) {
// The task was woken while it was running, which means it got
// scheduled the moment running completed. Therefore, it is now inside
// the slot and would be the next task to run.
//
// Instead of re-running the task in the next iteration, let's flush
// the slot in order to give other tasks a chance to run.
//
// This is a necessary step to ensure task yielding works as expected.
// If a task wakes itself and returns `Poll::Pending`, we don't want it
// to run immediately after that because that'd defeat the whole
// purpose of yielding.
if let Err(err) = self.shard.flush() {
self.global.queue.push(err.into_inner()).unwrap();
self.global.notify();
}
self.local.flush().unwrap();
}
return true;
}
}
}
}
/// Finds the next task to run.
fn search(&self) -> Option<Runnable> {
if self.ticker.get() % 2 == 0 {
// On even ticks, look into the local queue and then into the shard.
if let Ok(r) = self.local.pop().or_else(|_| self.shard.pop()) {
return Some(r);
}
} else {
// On odd ticks, look into the shard and then into the local queue.
if let Ok(r) = self.shard.pop().or_else(|_| self.local.pop()) {
return Some(r);
}
}
// Try stealing from the global queue.
self.shard.steal(&self.global.queue);
if let Ok(r) = self.shard.pop() {
return Some(r);
}
// Try stealing from other shards.
let shards = self.global.shards.read().unwrap();
// Pick a random starting point in the iterator list and rotate the list.
let n = shards.len();
let start = fastrand::usize(..n);
let iter = shards.iter().chain(shards.iter()).skip(start).take(n);
// Remove this worker's shard.
let iter = iter.filter(|(key, _)| *key != self.key);
let iter = iter.map(|(_, shard)| shard);
// Try stealing from each shard in the list.
for shard in iter {
self.shard.steal(shard);
if let Ok(r) = self.shard.pop() {
return Some(r);
}
}
None
}
}
impl Drop for Worker {
fn drop(&mut self) {
// Wake and unregister the worker.
self.wake();
self.global.shards.write().unwrap().remove(self.key);
// Re-schedule remaining tasks in the shard.
while let Ok(r) = self.shard.pop() {
r.schedule();
}
// Notify another worker to start searching for tasks.
self.global.notify();
// TODO(stjepang): Close the local queue and empty it.
}
}
/// A queue with a single-item slot in front of it.
struct SlotQueue<T> {
slot: Cell<Option<T>>,
queue: Arc<ConcurrentQueue<T>>,
}
impl<T> SlotQueue<T> {
/// Pushes an item into the slot, overflowing the old item into the queue.
fn push(&self, t: T) -> Result<(), PushError<T>> {
match self.slot.replace(Some(t)) {
None => Ok(()),
Some(t) => self.queue.push(t),
}
}
/// Pops an item from the slot, or queue if the slot is empty.
fn pop(&self) -> Result<T, PopError> {
match self.slot.take() {
None => self.queue.pop(),
Some(t) => Ok(t),
}
}
/// Flushes the slot into the queue.
fn flush(&self) -> Result<(), PushError<T>> {
match self.slot.take() {
None => Ok(()),
Some(t) => self.queue.push(t),
}
}
/// Steals some items from another queue.
fn steal(&self, from: &ConcurrentQueue<T>) {
// Flush the slot before stealing.
if let Err(err) = self.flush() {
self.slot.set(Some(err.into_inner()));
return;
}
// Half of `from`'s length rounded up.
let mut count = (from.len() + 1) / 2;
if count > 0 {
// Don't steal more than fits into the queue.
if let Some(cap) = self.queue.capacity() {
count = count.min(cap - self.queue.len());
}
// Steal tasks.
for _ in 0..count {
if let Ok(t) = from.pop() {
assert!(self.queue.push(t).is_ok());
} else {
break;
}
}
}
}
}
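// A minimal sketch of how the slot interacts with the queue, assuming `T = i32`:
//
//     let q = SlotQueue {
//         slot: Cell::new(None),
//         queue: Arc::new(ConcurrentQueue::unbounded()),
//     };
//     q.push(1).unwrap();              // 1 goes into the slot
//     q.push(2).unwrap();              // 1 overflows into the queue, 2 takes the slot
//     assert_eq!(q.pop().unwrap(), 2); // the slot is drained first
//     assert_eq!(q.pop().unwrap(), 1); // then the queue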
/// Same as `std::thread::current().id()`, but more efficient.
fn thread_id() -> ThreadId {
thread_local! {
static ID: ThreadId = thread::current().id();
}
ID.try_with(|id| *id)
.unwrap_or_else(|_| thread::current().id())
}
#[derive(Clone)]
struct Callback(Arc<Box<dyn Fn() + Send + Sync>>);
impl Callback {
fn new(f: impl Fn() + Send + Sync + 'static) -> Callback {
Callback(Arc::new(Box::new(f)))
}
fn call(&self) {
(self.0)();
}
}
impl PartialEq for Callback {
fn eq(&self, other: &Callback) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for Callback {}

View File

@ -1,268 +0,0 @@
use std::cell::Cell;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::reactor::Reactor;
static REGISTRY: Lazy<Mutex<Slab<Unparker>>> = Lazy::new(|| Mutex::new(Slab::new()));
/// Parks a thread.
pub(crate) struct Parker {
key: Cell<Option<usize>>,
unparker: Unparker,
}
impl Parker {
/// Creates a new [`Parker`].
pub fn new() -> Parker {
Parker {
key: Cell::new(None),
unparker: Unparker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
cvar: Condvar::new(),
}),
},
}
}
/// Blocks the current thread until the token is made available.
pub fn park(&self) {
self.register();
self.unparker.inner.park(None);
}
/// Blocks the current thread until the token is made available or the timeout is reached.
pub fn park_timeout(&self, timeout: Duration) -> bool {
self.register();
self.unparker.inner.park(Some(timeout))
}
// /// Blocks the current thread until the token is made available or the deadline is reached.
// pub fn park_deadline(&self, deadline: Instant) -> bool {
// self.register();
// self.unparker
// .inner
// .park(Some(deadline.saturating_duration_since(Instant::now())))
// }
//
// /// Atomically makes the token available if it is not already.
// pub fn unpark(&self) {
// self.unparker.unpark()
// }
/// Returns a handle for unparking.
pub fn unparker(&self) -> Unparker {
self.unparker.clone()
}
fn register(&self) {
if self.key.get().is_none() {
let mut reg = REGISTRY.lock().unwrap();
let key = reg.insert(self.unparker.clone());
self.key.set(Some(key));
}
}
fn unregister(&self) {
if let Some(key) = self.key.take() {
let mut reg = REGISTRY.lock().unwrap();
reg.remove(key);
// Notify another parker to make sure the reactor keeps getting polled.
if let Some((_, u)) = reg.iter().next() {
u.unpark();
}
}
}
}
impl Drop for Parker {
fn drop(&mut self) {
self.unregister();
}
}
impl fmt::Debug for Parker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Parker { .. }")
}
}
/// Unparks a thread.
pub(crate) struct Unparker {
inner: Arc<Inner>,
}
impl Unparker {
/// Atomically makes the token available if it is not already.
pub fn unpark(&self) {
self.inner.unpark()
}
}
impl fmt::Debug for Unparker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Unparker { .. }")
}
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
const EMPTY: usize = 0;
const PARKED: usize = 1;
const POLLING: usize = 2;
const NOTIFIED: usize = 3;
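// An illustrative summary of the park/unpark state machine:
//
//     EMPTY    --park()-->   PARKED (waiting on `cvar`) or POLLING (holding the reactor lock)
//     any      --unpark()--> NOTIFIED
//     NOTIFIED --park()-->   EMPTY (the pending notification is consumed without blocking)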
struct Inner {
state: AtomicUsize,
lock: Mutex<()>,
cvar: Condvar,
}
impl Inner {
fn park(&self, timeout: Option<Duration>) -> bool {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// Process available I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
}
return true;
}
// If the timeout is zero, then there is no need to actually block.
if let Some(dur) = timeout {
if dur == Duration::from_millis(0) {
// Process available I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
}
return false;
}
}
// Otherwise we need to coordinate going to sleep.
let mut reactor_lock = Reactor::get().try_lock();
let state = match reactor_lock {
None => PARKED,
Some(_) => POLLING,
};
let mut m = self.lock.lock().unwrap();
match self.state.compare_exchange(EMPTY, state, SeqCst, SeqCst) {
Ok(_) => {}
// Consume this notification to avoid spurious wakeups in the next park.
Err(NOTIFIED) => {
// We must read `state` here, even though we know it will be `NOTIFIED`. This is
// because `unpark` may have been called again since we read `NOTIFIED` in the
// `compare_exchange` above. We must perform an acquire operation that synchronizes
// with that `unpark` to observe any writes it made before the call to `unpark`. To
// do that we must read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return true;
}
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}
match timeout {
None => {
loop {
// Block the current thread on the conditional variable.
match &mut reactor_lock {
None => m = self.cvar.wait(m).unwrap(),
Some(reactor_lock) => {
drop(m);
reactor_lock.react(None).expect("failure while polling I/O");
m = self.lock.lock().unwrap();
}
}
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
Ok(_) => return true, // got a notification
Err(_) => {} // spurious wakeup, go back to sleep
}
}
}
Some(timeout) => {
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
// notification we just want to unconditionally set `state` back to `EMPTY`, either
// consuming a notification or un-flagging ourselves as parked.
let _m = match reactor_lock.as_mut() {
None => self.cvar.wait_timeout(m, timeout).unwrap().0,
Some(reactor_lock) => {
drop(m);
let deadline = Instant::now() + timeout;
loop {
reactor_lock
.react(Some(deadline.saturating_duration_since(Instant::now())))
.expect("failure while polling I/O");
if Instant::now() >= deadline {
break;
}
}
self.lock.lock().unwrap()
}
};
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => true, // got a notification
PARKED | POLLING => false, // no notification
n => panic!("inconsistent park_timeout state: {}", n),
}
}
}
}
pub fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before this call, we must
// perform a release operation that `park` can synchronize with. To do that we must write
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
// than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
let state = match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
state => state, // gotta go wake someone up
};
// There is a period between when the parked thread sets `state` to `PARKED` (or last
// checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
// If we were to notify during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
// stage so we can acquire `lock` to wait until it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
// it doesn't get woken only to have to wait for us to release `lock`.
drop(self.lock.lock().unwrap());
if state == PARKED {
self.cvar.notify_one();
} else {
Reactor::get().notify();
}
}
}

View File

@ -1,881 +0,0 @@
//! The reactor notifying [`Async`][`crate::Async`] and [`Timer`][`crate::Timer`].
//!
//! There is a single global reactor that contains all registered I/O handles and timers. The
//! reactor is polled by the executor, i.e. the [`run()`][`crate::run()`] function.
#[cfg(not(any(
target_os = "linux", // epoll
target_os = "android", // epoll
target_os = "illumos", // epoll
target_os = "macos", // kqueue
target_os = "ios", // kqueue
target_os = "freebsd", // kqueue
target_os = "netbsd", // kqueue
target_os = "openbsd", // kqueue
target_os = "dragonfly", // kqueue
target_os = "windows", // wepoll
)))]
compile_error!("reactor does not support this target OS");
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use futures_util::future;
use once_cell::sync::Lazy;
use slab::Slab;
/// The reactor.
///
/// Every async I/O handle and every timer is registered here. Invocations of
/// [`run()`][`crate::run()`] poll the reactor to check for new events every now and then.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
/// Raw bindings to epoll/kqueue/wepoll.
sys: sys::Reactor,
/// Ticker bumped before polling.
ticker: AtomicUsize,
/// Registered sources.
sources: Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
events: Mutex<sys::Events>,
/// An ordered map of registered timers.
///
/// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
/// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
/// timer.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
/// A queue of timer operations (insert and remove).
///
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
/// Returns a reference to the reactor.
pub fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0),
sources: Mutex::new(Slab::new()),
events: Mutex::new(sys::Events::new()),
timers: Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(1000),
});
&REACTOR
}
/// Notifies the thread blocked on the reactor.
pub fn notify(&self) {
self.sys.notify().expect("failed to notify reactor");
}
/// Registers an I/O source in the reactor.
pub fn insert_io(
&self,
#[cfg(unix)] raw: RawFd,
#[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
let mut sources = self.sources.lock().unwrap();
let vacant = sources.vacant_entry();
// Create a source and register it.
let key = vacant.key();
self.sys.register(raw, key)?;
let source = Arc::new(Source {
raw,
key,
wakers: Mutex::new(Wakers {
tick_readable: 0,
tick_writable: 0,
readers: Vec::new(),
writers: Vec::new(),
}),
});
Ok(vacant.insert(source).clone())
}
/// Deregisters an I/O source from the reactor.
pub fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock().unwrap();
sources.remove(source.key);
self.sys.deregister(source.raw)
}
/// Registers a timer in the reactor.
///
/// Returns the inserted timer's ID.
pub fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
// Generate a new timer ID.
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
// Push an insert operation.
while self
.timer_ops
.push(TimerOp::Insert(when, id, waker.clone()))
.is_err()
{
// Fire timers to drain the queue.
self.fire_timers();
}
// Notify that a timer was added.
self.notify();
id
}
/// Deregisters a timer from the reactor.
pub fn remove_timer(&self, when: Instant, id: usize) {
// Push a remove operation.
while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
// Fire timers to drain the queue.
self.fire_timers();
}
}
/// Attempts to lock the reactor.
pub fn try_lock(&self) -> Option<ReactorLock<'_>> {
self.events.try_lock().ok().map(|events| {
let reactor = self;
ReactorLock { reactor, events }
})
}
/// Fires ready timers.
///
/// Returns the duration until the next scheduled timer, or `None` if there are no timers. A
/// zero duration is returned if some timers were ready and have just been fired.
fn fire_timers(&self) -> Option<Duration> {
let mut timers = self.timers.lock().unwrap();
// Process timer operations, but no more than the queue capacity because otherwise we could
// keep popping operations forever.
for _ in 0..self.timer_ops.capacity().unwrap() {
match self.timer_ops.pop() {
Ok(TimerOp::Insert(when, id, waker)) => {
timers.insert((when, id), waker);
}
Ok(TimerOp::Remove(when, id)) => {
timers.remove(&(when, id));
}
Err(_) => break,
}
}
let now = Instant::now();
// Split timers into ready and pending timers.
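// `split_off(&(now, 0))` keeps timers with `when < now` in `timers` (these are ready to
// fire); the rest is swapped back in below as the new set of registered timers.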
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
// Calculate the duration until the next event.
let dur = if ready.is_empty() {
// Duration until the next timer.
timers
.keys()
.next()
.map(|(when, _)| when.saturating_duration_since(now))
} else {
// Timers are about to fire right now.
Some(Duration::from_secs(0))
};
// Drop the lock before waking.
drop(timers);
// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}
dur
}
}
/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor,
events: MutexGuard<'a, sys::Events>,
}
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
pub fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
// Fire timers.
let next_timer = self.reactor.fire_timers();
// Compute the timeout for blocking on I/O events.
let timeout = match (next_timer, timeout) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.min(b)),
};
// Bump the ticker before polling I/O.
let tick = self
.reactor
.ticker
.fetch_add(1, Ordering::SeqCst)
.wrapping_add(1);
// Block on I/O events.
match self.reactor.sys.wait(&mut self.events, timeout) {
// No I/O events occurred.
Ok(0) => {
if timeout != Some(Duration::from_secs(0)) {
// The non-zero timeout was hit so fire ready timers.
self.reactor.fire_timers();
}
Ok(())
}
// At least one I/O event occurred.
Ok(_) => {
// Iterate over sources in the event list.
let sources = self.reactor.sources.lock().unwrap();
let mut ready = Vec::new();
for ev in self.events.iter() {
// Check if there is a source in the table with this key.
if let Some(source) = sources.get(ev.key) {
let mut wakers = source.wakers.lock().unwrap();
// Wake readers if a readability event was emitted.
if ev.readable {
wakers.tick_readable = tick;
ready.append(&mut wakers.readers);
}
// Wake writers if a writability event was emitted.
if ev.writable {
wakers.tick_writable = tick;
ready.append(&mut wakers.writers);
}
// Re-register if there are still writers or
// readers. This can happen if, e.g., we were
// previously interested in both readability and
// writability, but only one of them was emitted.
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
self.reactor.sys.reregister(
source.raw,
source.key,
!wakers.readers.is_empty(),
!wakers.writers.is_empty(),
)?;
}
}
}
// Drop the lock before waking.
drop(sources);
// Wake up tasks waiting on I/O.
for waker in ready {
waker.wake();
}
Ok(())
}
// The syscall was interrupted.
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
// An actual error occurred.
Err(err) => Err(err),
}
}
}
/// A single timer operation.
enum TimerOp {
Insert(Instant, usize, Waker),
Remove(Instant, usize),
}
/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
/// Raw file descriptor on Unix platforms.
#[cfg(unix)]
pub(crate) raw: RawFd,
/// Raw socket handle on Windows.
#[cfg(windows)]
pub(crate) raw: RawSocket,
/// The key of this source obtained during registration.
key: usize,
/// Tasks interested in events on this source.
wakers: Mutex<Wakers>,
}
/// Tasks interested in events on a source.
#[derive(Debug)]
struct Wakers {
/// Last reactor tick that delivered a readability event.
tick_readable: usize,
/// Last reactor tick that delivered a writability event.
tick_writable: usize,
/// Tasks waiting for the next readability event.
readers: Vec<Waker>,
/// Tasks waiting for the next writability event.
writers: Vec<Waker>,
}
impl Source {
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a readability event.
if let Some((a, b)) = ticks {
// If `tick_readable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a readability event.
if wakers.tick_readable != a && wakers.tick_readable != b {
return Poll::Ready(Ok(()));
}
}
// If there are no other readers, re-register in the reactor.
if wakers.readers.is_empty() {
Reactor::get().sys.reregister(
self.raw,
self.key,
true,
!wakers.writers.is_empty(),
)?;
}
// Register the current task's waker if not present already.
if wakers.readers.iter().all(|w| !w.will_wake(cx.waker())) {
wakers.readers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_readable,
));
}
Poll::Pending
})
.await
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a writability event.
if let Some((a, b)) = ticks {
// If `tick_writable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a writability event.
if wakers.tick_writable != a && wakers.tick_writable != b {
return Poll::Ready(Ok(()));
}
}
// If there are no other writers, re-register in the reactor.
if wakers.writers.is_empty() {
Reactor::get().sys.reregister(
self.raw,
self.key,
!wakers.readers.is_empty(),
true,
)?;
}
// Register the current task's waker if not present already.
if wakers.writers.iter().all(|w| !w.will_wake(cx.waker())) {
wakers.writers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_writable,
));
}
Poll::Pending
})
.await
}
}
/// Raw bindings to epoll (Linux, Android, illumos).
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::RawFd;
use std::time::Duration;
use crate::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
};
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
epoll_fd: RawFd,
event_fd: RawFd,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let epoll_fd = epoll_create1()?;
let event_fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let reactor = Reactor { epoll_fd, event_fd };
reactor.register(event_fd, !0)?;
reactor.reregister(event_fd, !0, true, false)?;
Ok(reactor)
}
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
let ev = &mut EpollEvent::new(0, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
if read {
flags |= read_flags();
}
if write {
flags |= write_flags();
}
let ev = &mut EpollEvent::new(flags, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, Some(ev))
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None)
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.map(|t| {
if t == Duration::from_millis(0) {
t
} else {
t.max(Duration::from_millis(1))
}
})
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
events.len = epoll_wait(self.epoll_fd, &mut events.list, timeout_ms)?;
let mut buf = [0u8; 8];
let _ = syscall!(read(
self.event_fd,
&mut buf[0] as *mut u8 as *mut libc::c_void,
buf.len()
));
self.reregister(self.event_fd, !0, true, false)?;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
self.event_fd,
&buf[0] as *const u8 as *const libc::c_void,
buf.len()
));
Ok(())
}
}
fn read_flags() -> EpollFlags {
libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
}
fn write_flags() -> EpollFlags {
libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR
}
pub struct Events {
list: Box<[EpollEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let list = vec![EpollEvent::empty(); 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: (ev.events() & read_flags()) != 0,
writable: (ev.events() & write_flags()) != 0,
key: ev.data() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}
/// Raw bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
mod sys {
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::time::Duration;
use crate::sys::event::{kevent_ts, kqueue, KEvent};
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
kqueue_fd: RawFd,
read_stream: UnixStream,
write_stream: UnixStream,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let kqueue_fd = kqueue()?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
let reactor = Reactor {
kqueue_fd,
read_stream,
write_stream,
};
reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
Ok(reactor)
}
pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
let mut write_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
if read {
read_flags |= libc::EV_ADD;
} else {
read_flags |= libc::EV_DELETE;
}
if write {
write_flags |= libc::EV_ADD;
} else {
write_flags |= libc::EV_DELETE;
}
let udata = key as _;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_READ, read_flags, 0, 0, udata),
KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, 0, 0, udata),
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR) != 0
&& data != 0
&& data != libc::ENOENT as _
&& data != libc::EPIPE as _
{
return Err(io::Error::from_raw_os_error(data as _));
}
}
Ok(())
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let flags = libc::EV_DELETE | libc::EV_RECEIPT;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_WRITE, flags, 0, 0, 0),
KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0),
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR) != 0 && data != 0 && data != libc::ENOENT as _ {
return Err(io::Error::from_raw_os_error(data as _));
}
}
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
let _ = (&self.write_stream).write(&[1]);
Ok(())
}
}
pub struct Events {
list: Box<[KEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let flags = 0;
let event = KEvent::new(0, 0, flags, 0, 0, 0);
let list = vec![event; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
// On some platforms, closing the read end of a pipe wakes up writers, but the
// event is reported as EVFILT_READ with the EV_EOF flag.
//
// https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4
self.list[..self.len].iter().map(|ev| Event {
readable: ev.filter() == libc::EVFILT_READ,
writable: ev.filter() == libc::EVFILT_WRITE
|| (ev.filter() == libc::EVFILT_READ && (ev.flags() & libc::EV_EOF) != 0),
key: ev.udata() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}
/// Raw bindings to wepoll (Windows).
#[cfg(target_os = "windows")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_sys_stjepang as we;
use winapi::um::winsock2;
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { we::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
handle: we::HANDLE,
}
unsafe impl Send for Reactor {}
unsafe impl Sync for Reactor {}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let handle = unsafe { we::epoll_create1(0) };
if handle.is_null() {
return Err(io::Error::last_os_error());
}
Ok(Reactor { handle })
}
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
unsafe {
let mut nonblocking = true as libc::c_ulong;
let res = winsock2::ioctlsocket(
sock as winsock2::SOCKET,
winsock2::FIONBIO,
&mut nonblocking,
);
if res != 0 {
return Err(io::Error::last_os_error());
}
}
let mut ev = we::epoll_event {
events: 0,
data: we::epoll_data { u64: key as u64 },
};
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_ADD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn reregister(
&self,
sock: RawSocket,
key: usize,
read: bool,
write: bool,
) -> io::Result<()> {
let mut flags = we::EPOLLONESHOT;
if read {
flags |= READ_FLAGS;
}
if write {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: key as u64 },
};
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_MOD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_DEL as libc::c_int,
sock as we::SOCKET,
0 as *mut we::epoll_event,
))?;
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = match timeout {
None => -1,
Some(t) => {
if t == Duration::from_millis(0) {
0
} else {
t.max(Duration::from_millis(1))
.as_millis()
.try_into()
.unwrap_or(libc::c_int::max_value())
}
}
};
events.len = syscall!(epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as libc::c_int,
timeout_ms,
))? as usize;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
unsafe {
// This errors if a notification has already been posted, but that's okay.
winapi::um::ioapiset::PostQueuedCompletionStatus(
self.handle as winapi::um::winnt::HANDLE,
0,
0,
0 as *mut _,
);
}
Ok(())
}
}
struct As(RawSocket);
impl AsRawSocket for As {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
const READ_FLAGS: u32 =
we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI;
const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR;
pub struct Events {
list: Box<[we::epoll_event]>,
len: usize,
}
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
impl Events {
pub fn new() -> Events {
let ev = we::epoll_event {
events: 0,
data: we::epoll_data { u64: 0 },
};
Events {
list: vec![ev; 1000].into_boxed_slice(),
len: 0,
}
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: (ev.events & READ_FLAGS) != 0,
writable: (ev.events & WRITE_FLAGS) != 0,
key: unsafe { ev.data.u64 } as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}

View File

@ -1,135 +0,0 @@
//! Implementation of [`run()`].
//!
//! This function is the entry point to the smol executor.
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
use once_cell::sync::Lazy;
use crate::context;
use crate::multitask;
use crate::parking::Parker;
use scoped_tls::scoped_thread_local;
/// The global task queue.
pub(crate) static QUEUE: Lazy<multitask::Queue> = Lazy::new(|| multitask::Queue::new());
scoped_thread_local! {
/// Thread-local worker queue.
pub(crate) static WORKER: multitask::Worker
}
/// Runs executors and polls the reactor.
///
/// This function simultaneously runs the thread-local executor, runs the work-stealing
/// executor, and polls the reactor for I/O events and timers. At least one thread has to be
/// calling [`run()`] in order for futures waiting on I/O and timers to get notified.
///
/// # Examples
///
/// Single-threaded executor:
///
/// ```
/// // Run the thread-local and work-stealing executor on the current thread.
/// smol::run(async {
/// println!("Hello from the smol executor!");
/// });
/// ```
///
/// Multi-threaded executor:
///
/// ```no_run
/// use futures::future;
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // Run the thread-local and work-stealing executor on a thread pool.
/// for _ in 0..num_threads {
/// // A pending future is one that simply yields forever.
/// thread::spawn(|| smol::run(future::pending::<()>()));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
/// Task::spawn(async {
/// println!("Hello from an executor thread!");
/// })
/// .await;
/// });
/// ```
///
/// Stoppable multi-threaded executor:
///
/// ```
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // A channel that sends the shutdown signal.
/// let (s, r) = piper::chan::<()>(0);
/// let mut threads = Vec::new();
///
/// // Create an executor thread pool.
/// for _ in 0..num_threads {
/// // Spawn an executor thread that waits for the shutdown signal.
/// let r = r.clone();
/// threads.push(thread::spawn(move || smol::run(r.recv())));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
/// Task::spawn(async {
/// println!("Hello from an executor thread!");
/// })
/// .await;
/// });
///
/// // Send a shutdown signal.
/// drop(s);
///
/// // Wait for threads to finish.
/// for t in threads {
/// t.join().unwrap();
/// }
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
let parker = Parker::new();
let unparker = parker.unparker();
let worker = QUEUE.worker(move || unparker.unpark());
// Create a waker that unparks this thread whenever the main future is woken.
let unparker = parker.unparker();
let waker = async_task::waker_fn(move || unparker.unpark());
let cx = &mut Context::from_waker(&waker);
futures_util::pin_mut!(future);
// Set up tokio if enabled.
context::enter(|| {
WORKER.set(&worker, || {
'start: loop {
// Poll the main future.
if let Poll::Ready(val) = future.as_mut().poll(cx) {
return val;
}
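// Run a batch of up to 200 queued tasks before checking the main future and the
// reactor again.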
for _ in 0..200 {
if !worker.tick() {
parker.park();
continue 'start;
}
}
// Process ready I/O events without blocking.
parker.park_timeout(Duration::from_secs(0));
}
})
})
}

View File

@ -1,277 +0,0 @@
#[cfg(unix)]
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
if res == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(res)
}
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
/// Kqueue.
pub mod event {
use super::check_err;
use std::os::unix::io::RawFd;
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::c_int;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::size_t;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_event_filter = u32;
#[cfg(not(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_event_filter = i16;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_udata = *mut libc::c_void;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
#[allow(non_camel_case_types)]
type type_of_data = libc::intptr_t;
#[cfg(any(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_udata = libc::intptr_t;
#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
#[allow(non_camel_case_types)]
type type_of_data = libc::int64_t;
#[derive(Clone, Copy)]
#[repr(C)]
pub struct KEvent(libc::kevent);
unsafe impl Send for KEvent {}
impl KEvent {
pub fn new(
ident: libc::uintptr_t,
filter: EventFilter,
flags: EventFlag,
fflags: FilterFlag,
data: libc::intptr_t,
udata: libc::intptr_t,
) -> KEvent {
KEvent(libc::kevent {
ident,
filter: filter as type_of_event_filter,
flags,
fflags,
data: data as type_of_data,
udata: udata as type_of_udata,
})
}
pub fn filter(&self) -> EventFilter {
unsafe { std::mem::transmute(self.0.filter as type_of_event_filter) }
}
pub fn flags(&self) -> EventFlag {
self.0.flags
}
pub fn data(&self) -> libc::intptr_t {
self.0.data as libc::intptr_t
}
pub fn udata(&self) -> libc::intptr_t {
self.0.udata as libc::intptr_t
}
}
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
pub type EventFlag = u16;
#[cfg(any(target_os = "netbsd"))]
pub type EventFlag = u32;
pub type FilterFlag = u32;
#[cfg(target_os = "netbsd")]
pub type EventFilter = u32;
#[cfg(not(target_os = "netbsd"))]
pub type EventFilter = i16;
pub fn kqueue() -> Result<RawFd, std::io::Error> {
let res = unsafe { libc::kqueue() };
check_err(res)
}
pub fn kevent_ts(
kq: RawFd,
changelist: &[KEvent],
eventlist: &mut [KEvent],
timeout_opt: Option<libc::timespec>,
) -> Result<usize, std::io::Error> {
let res = unsafe {
libc::kevent(
kq,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as type_of_nchanges,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as type_of_nchanges,
if let Some(ref timeout) = timeout_opt {
timeout as *const libc::timespec
} else {
std::ptr::null()
},
)
};
check_err(res).map(|r| r as usize)
}
}
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
/// Epoll.
pub mod epoll {
use super::check_err;
use std::os::unix::io::RawFd;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(i32)]
pub enum EpollOp {
EpollCtlAdd = libc::EPOLL_CTL_ADD,
EpollCtlDel = libc::EPOLL_CTL_DEL,
EpollCtlMod = libc::EPOLL_CTL_MOD,
}
pub type EpollFlags = libc::c_int;
pub fn epoll_create1() -> Result<RawFd, std::io::Error> {
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < 21.
// But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, so we use it instead.
#[cfg(target_os = "android")]
const CLOEXEC: libc::c_int = libc::O_CLOEXEC;
#[cfg(not(target_os = "android"))]
const CLOEXEC: libc::c_int = libc::EPOLL_CLOEXEC;
let fd = unsafe {
// Check if the `epoll_create1` symbol is available on this platform.
let ptr = libc::dlsym(
libc::RTLD_DEFAULT,
"epoll_create1\0".as_ptr() as *const libc::c_char,
);
if ptr.is_null() {
// If not, use `epoll_create` and manually set `CLOEXEC`.
let fd = check_err(libc::epoll_create(1024))?;
let flags = libc::fcntl(fd, libc::F_GETFD);
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
fd
} else {
// Use `epoll_create1` with `CLOEXEC`.
let epoll_create1 = std::mem::transmute::<
*mut libc::c_void,
unsafe extern "C" fn(libc::c_int) -> libc::c_int,
>(ptr);
check_err(epoll_create1(CLOEXEC))?
}
};
Ok(fd)
}
pub fn epoll_ctl<'a, T>(
epfd: RawFd,
op: EpollOp,
fd: RawFd,
event: T,
) -> Result<(), std::io::Error>
where
T: Into<Option<&'a mut EpollEvent>>,
{
let mut event: Option<&mut EpollEvent> = event.into();
if event.is_none() && op != EpollOp::EpollCtlDel {
Err(std::io::Error::from_raw_os_error(libc::EINVAL))
} else {
let res = unsafe {
if let Some(ref mut event) = event {
libc::epoll_ctl(epfd, op as libc::c_int, fd, &mut event.event)
} else {
libc::epoll_ctl(epfd, op as libc::c_int, fd, std::ptr::null_mut())
}
};
check_err(res).map(drop)
}
}
pub fn epoll_wait(
epfd: RawFd,
events: &mut [EpollEvent],
timeout_ms: isize,
) -> Result<usize, std::io::Error> {
let res = unsafe {
libc::epoll_wait(
epfd,
events.as_mut_ptr() as *mut libc::epoll_event,
events.len() as libc::c_int,
timeout_ms as libc::c_int,
)
};
check_err(res).map(|r| r as usize)
}
#[derive(Clone, Copy)]
#[repr(transparent)]
pub struct EpollEvent {
event: libc::epoll_event,
}
impl EpollEvent {
pub fn new(events: EpollFlags, data: u64) -> Self {
EpollEvent {
event: libc::epoll_event {
events: events as u32,
u64: data,
},
}
}
pub fn empty() -> Self {
unsafe { std::mem::zeroed::<EpollEvent>() }
}
pub fn events(&self) -> EpollFlags {
self.event.events as libc::c_int
}
pub fn data(&self) -> u64 {
self.event.u64
}
}
}

View File

@ -1,283 +0,0 @@
//! The task system.
//!
//! A [`Task`] handle represents a spawned future that is run by the executor.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::blocking::BlockingExecutor;
use crate::run::{QUEUE, WORKER};
/// A runnable future, ready for execution.
///
/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`,
/// we get back two values:
///
/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable`
/// 2. an `async_task::JoinHandle<T, ()>`, which is wrapped inside a `Task<T>`
///
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
/// queue in an executor.
pub(crate) type Runnable = async_task::Task<()>;
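// A minimal sketch of the two halves, using a schedule function that runs the task on the
// spot (purely illustrative; the real schedule functions push into executor queues):
//
//     let (runnable, handle) = async_task::spawn(async { 1 + 2 }, |r: Runnable| { r.run(); }, ());
//     runnable.schedule();           // the future is polled right here
//     let task = Task(Some(handle)); // `task.await` now yields 3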
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, it gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// If the future panics, the panic will be unwound into the [`run()`] invocation that polled it.
/// However, this does not apply to the blocking executor - it will simply ignore panics and
/// continue running.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// [`run()`]: `crate::run()`
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
impl<T: 'static> Task<T> {
/// Spawns a future onto the thread-local executor.
///
/// Panics if the current thread is not inside an invocation of [`run()`].
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let task = Task::local(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # })
/// ```
///
/// [`run()`]: `crate::run()`
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
if WORKER.is_set() {
WORKER.with(|w| w.spawn_local(future))
} else {
panic!("cannot spawn a thread-local task if not inside an executor")
}
}
}
impl<T: Send + 'static> Task<T> {
/// Spawns a future onto the work-stealing executor.
///
/// This future may be stolen and polled by any thread calling [`run()`].
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// [`run()`]: `crate::run()`
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
if WORKER.is_set() {
WORKER.with(|w| w.spawn_local(future))
} else {
QUEUE.spawn(future)
}
}
/// Spawns a future onto the blocking executor.
///
/// This future is allowed to block for an indefinite length of time.
///
/// For convenience, there is also the [`blocking!`] macro that spawns a blocking task and
/// immediately awaits it.
///
/// # Examples
///
/// Read a line from the standard input:
///
/// ```no_run
/// use smol::Task;
/// use std::io::stdin;
///
/// # smol::block_on(async {
/// let line = Task::blocking(async {
/// let mut line = String::new();
/// std::io::stdin().read_line(&mut line).unwrap();
/// line
/// })
/// .await;
/// # });
/// ```
///
/// See also examples for [`blocking!`], [`iter()`], [`reader()`], and [`writer()`].
///
/// [`iter()`]: `crate::iter()`
/// [`reader()`]: `crate::reader()`
/// [`writer()`]: `crate::writer()`
pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
BlockingExecutor::get().spawn(future)
}
}
impl<T, E> Task<Result<T, E>>
where
T: Send + 'static,
E: Debug + Send + 'static,
{
/// Spawns a new task that awaits and unwraps the result.
///
/// The new task will panic if the original task results in an error.
///
/// # Examples
///
/// ```
/// use smol::{Async, Task};
/// use std::net::TcpStream;
///
/// # smol::run(async {
/// let stream = Task::spawn(async {
/// Async::<TcpStream>::connect("example.com:80").await
/// })
/// .unwrap()
/// .await;
/// # })
/// ```
pub fn unwrap(self) -> Task<T> {
Task::spawn(async { self.await.unwrap() })
}
/// Spawns a new task that awaits and unwraps the result.
///
/// The new task will panic with the provided message if the original task results in an error.
///
/// # Examples
///
/// ```
/// use smol::{Async, Task};
/// use std::net::TcpStream;
///
/// # smol::run(async {
/// let stream = Task::spawn(async {
/// Async::<TcpStream>::connect("example.com:80").await
/// })
/// .expect("cannot connect")
/// .await;
/// # })
/// ```
pub fn expect(self, msg: &str) -> Task<T> {
let msg = msg.to_owned();
Task::spawn(async move { self.await.expect(&msg) })
}
}
impl Task<()> {
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(mut self) {
self.0.take().unwrap();
}
}
impl<T> Task<T> {
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or `None` if it
/// didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::after(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
// There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
// do `{ self }` here to avoid marking `self` as mutable.
let handle = { self }.0.take().unwrap();
handle.cancel();
handle.await
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
if let Some(handle) = &self.0 {
handle.cancel();
}
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task has failed")),
}
}
}
impl<T> Into<async_task::JoinHandle<T, ()>> for Task<T> {
fn into(mut self) -> async_task::JoinHandle<T, ()> {
self.0
.take()
.expect("task was already canceled or has failed")
}
}

View File

@ -1,139 +0,0 @@
//! Implementation of [`Timer`].
//!
//! Timers are futures that fire at a predefined point in time.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::reactor::Reactor;
/// Fires at the chosen point in time.
///
/// Timers are futures that output the [`Instant`] at which they fired.
///
/// # Examples
///
/// Sleep for 1 second:
///
/// ```
/// use smol::Timer;
/// use std::time::Duration;
///
/// async fn sleep(dur: Duration) {
/// Timer::after(dur).await;
/// }
///
/// # smol::run(async {
/// sleep(Duration::from_secs(1)).await;
/// # });
/// ```
///
/// Set a timeout on an I/O operation:
///
/// ```
/// use futures::future::Either;
/// use futures::io::{self, BufReader};
/// use futures::prelude::*;
/// use smol::Timer;
/// use std::time::Duration;
///
/// async fn timeout<T>(
/// dur: Duration,
/// f: impl Future<Output = io::Result<T>>,
/// ) -> io::Result<T> {
/// futures::pin_mut!(f);
/// match future::select(f, Timer::after(dur)).await {
/// Either::Left((out, _)) => out,
/// Either::Right(_) => Err(io::ErrorKind::TimedOut.into()),
/// }
/// }
///
/// # smol::run(async {
/// // Create a buffered stdin reader.
/// let mut stdin = BufReader::new(smol::reader(std::io::stdin()));
///
/// // Read a line within 5 seconds.
/// let mut line = String::new();
/// timeout(Duration::from_secs(5), stdin.read_line(&mut line)).await?;
/// # io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Timer {
/// This timer's ID.
///
/// When this field is set to `None`, this timer is not registered in the reactor.
id: Option<usize>,
/// When this timer fires.
when: Instant,
}
impl Timer {
/// Fires after the specified duration of time.
///
/// # Examples
///
/// ```
/// use smol::Timer;
/// use std::time::Duration;
///
/// # smol::run(async {
/// Timer::after(Duration::from_secs(1)).await;
/// # });
/// ```
pub fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
/// Fires at the specified instant in time.
///
/// # Examples
///
/// ```
/// use smol::Timer;
/// use std::time::{Duration, Instant};
///
/// # smol::run(async {
/// let now = Instant::now();
/// let when = now + Duration::from_secs(1);
/// Timer::at(when).await;
/// # });
/// ```
pub fn at(when: Instant) -> Timer {
let id = None;
Timer { id, when }
}
}
impl Drop for Timer {
fn drop(&mut self) {
if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
Poll::Ready(self.when)
} else {
if self.id.is_none() {
// Register the timer in the reactor.
self.id = Some(Reactor::get().insert_timer(self.when, cx.waker()));
}
Poll::Pending
}
}
}
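// Editorial sketch (not part of the original file): registration is lazy, so
// a `Timer` costs nothing until it is first polled and found pending. A crude
// periodic loop can be built from fresh one-shot timers:
#[allow(dead_code)]
async fn tick(times: u32, period: Duration) {
    for _ in 0..times {
        Timer::after(period).await;
    }
}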

View File

@ -1,332 +0,0 @@
use std::future::Future;
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream, UdpSocket};
#[cfg(unix)]
use std::os::unix::net::{UnixDatagram, UnixListener, UnixStream};
use std::sync::Arc;
use std::time::Duration;
use futures::{AsyncReadExt, AsyncWriteExt, StreamExt};
use smol::{Async, Task, Timer};
#[cfg(unix)]
use tempfile::tempdir;
const LOREM_IPSUM: &[u8] = b"
Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Donec pretium ante erat, vitae sodales mi varius quis.
Etiam vestibulum lorem vel urna tempor, eu fermentum odio aliquam.
Aliquam consequat urna vitae ipsum pulvinar, in blandit purus eleifend.
";
/// Runs the future inside a local task.
///
/// The main future passed to `smol::run()` is sometimes polled even though it was not woken,
/// e.g. when the executor was waiting on the reactor and then wakes up for some other reason.
/// Wrapping the future in a local task ensures it is only polled when its waker is invoked.
fn run<T: 'static>(future: impl Future<Output = T> + 'static) -> T {
smol::run(async { Task::local(async { future.await }).await })
}
#[test]
fn tcp_connect() -> io::Result<()> {
run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let addr = listener.get_ref().local_addr()?;
let task = Task::local(async move { listener.accept().await });
let stream2 = Async::<TcpStream>::connect(&addr).await?;
let stream1 = task.await?.0;
assert_eq!(
stream1.get_ref().peer_addr()?,
stream2.get_ref().local_addr()?,
);
assert_eq!(
stream2.get_ref().peer_addr()?,
stream1.get_ref().local_addr()?,
);
// Now that the listener is closed, connect should fail.
let err = Async::<TcpStream>::connect(&addr).await.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
Ok(())
})
}
#[test]
fn tcp_peek_read() -> io::Result<()> {
run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let addr = listener.get_ref().local_addr()?;
let mut stream = Async::<TcpStream>::connect(addr).await?;
stream.write_all(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let mut incoming = listener.incoming();
let mut stream = incoming.next().await.unwrap()?;
let n = stream.peek(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = stream.read(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
})
}
#[test]
fn tcp_reader_hangup() -> io::Result<()> {
run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let addr = listener.get_ref().local_addr()?;
let task = Task::local(async move { listener.accept().await });
let mut stream2 = Async::<TcpStream>::connect(&addr).await?;
let stream1 = task.await?.0;
let task = Task::local(async move {
Timer::after(Duration::from_secs(1)).await;
drop(stream1);
});
while stream2.write_all(LOREM_IPSUM).await.is_ok() {}
task.await;
Ok(())
})
}
#[test]
fn tcp_writer_hangup() -> io::Result<()> {
run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let addr = listener.get_ref().local_addr()?;
let task = Task::local(async move { listener.accept().await });
let mut stream2 = Async::<TcpStream>::connect(&addr).await?;
let stream1 = task.await?.0;
let task = Task::local(async move {
Timer::after(Duration::from_secs(1)).await;
drop(stream1);
});
let mut v = vec![];
stream2.read_to_end(&mut v).await?;
assert!(v.is_empty());
task.await;
Ok(())
})
}
#[test]
fn udp_send_recv() -> io::Result<()> {
run(async {
let socket1 = Async::<UdpSocket>::bind("127.0.0.1:0")?;
let socket2 = Async::<UdpSocket>::bind("127.0.0.1:0")?;
socket1.get_ref().connect(socket2.get_ref().local_addr()?)?;
let mut buf = [0u8; 1024];
socket1.send(LOREM_IPSUM).await?;
let n = socket2.peek(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = socket2.recv(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
socket2
.send_to(LOREM_IPSUM, socket1.get_ref().local_addr()?)
.await?;
let n = socket1.peek_from(&mut buf).await?.0;
assert_eq!(&buf[..n], LOREM_IPSUM);
let n = socket1.recv_from(&mut buf).await?.0;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_write_read() -> io::Result<()> {
run(async {
let dir = tempdir()?;
let path = dir.path().join("socket");
let listener = Async::<UnixListener>::bind(&path)?;
let mut stream = Async::<UnixStream>::connect(&path).await?;
stream.write_all(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let mut incoming = listener.incoming();
let mut stream = incoming.next().await.unwrap()?;
let n = stream.read(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_connect() -> io::Result<()> {
run(async {
let dir = tempdir()?;
let path = dir.path().join("socket");
let listener = Async::<UnixListener>::bind(&path)?;
let addr = listener.get_ref().local_addr()?;
let task = Task::local(async move { listener.accept().await });
let stream2 = Async::<UnixStream>::connect(addr.as_pathname().unwrap()).await?;
let stream1 = task.await?.0;
assert_eq!(
stream1.get_ref().peer_addr()?.as_pathname(),
stream2.get_ref().local_addr()?.as_pathname(),
);
assert_eq!(
stream2.get_ref().peer_addr()?.as_pathname(),
stream1.get_ref().local_addr()?.as_pathname(),
);
// Now that the listener is closed, connect should fail.
let err = Async::<UnixStream>::connect(addr.as_pathname().unwrap())
.await
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::ConnectionRefused);
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_send_recv() -> io::Result<()> {
run(async {
let (socket1, socket2) = Async::<UnixDatagram>::pair()?;
socket1.send(LOREM_IPSUM).await?;
let mut buf = [0; 1024];
let n = socket2.recv(&mut buf).await?;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_send_to_recv_from() -> io::Result<()> {
run(async {
let dir = tempdir()?;
let path = dir.path().join("socket");
let socket1 = Async::<UnixDatagram>::bind(&path)?;
let socket2 = Async::<UnixDatagram>::unbound()?;
socket2.send_to(LOREM_IPSUM, &path).await?;
let mut buf = [0; 1024];
let n = socket1.recv_from(&mut buf).await?.0;
assert_eq!(&buf[..n], LOREM_IPSUM);
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_reader_hangup() -> io::Result<()> {
run(async {
let (socket1, mut socket2) = Async::<UnixStream>::pair()?;
let task = Task::local(async move {
Timer::after(Duration::from_secs(1)).await;
drop(socket1);
});
while socket2.write_all(LOREM_IPSUM).await.is_ok() {}
task.await;
Ok(())
})
}
#[cfg(unix)]
#[test]
fn uds_writer_hangup() -> io::Result<()> {
run(async {
let (socket1, mut socket2) = Async::<UnixStream>::pair()?;
let task = Task::local(async move {
Timer::after(Duration::from_secs(1)).await;
drop(socket1);
});
let mut v = vec![];
socket2.read_to_end(&mut v).await?;
assert!(v.is_empty());
task.await;
Ok(())
})
}
// Test that interest is correctly re-registered when we were previously interested in both
// readable and writable events and only one of them fires (interest in the other must be
// re-registered).
#[test]
fn tcp_duplex() -> io::Result<()> {
run(async {
let listener = Async::<TcpListener>::bind("127.0.0.1:0")?;
let stream0 =
Arc::new(Async::<TcpStream>::connect(listener.get_ref().local_addr()?).await?);
let stream1 = Arc::new(listener.accept().await?.0);
async fn do_read(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let mut buf = vec![0u8; 4096];
loop {
let len = (&*s).read(&mut buf).await?;
if len == 0 {
return Ok(());
}
}
}
async fn do_write(s: Arc<Async<TcpStream>>) -> io::Result<()> {
let buf = vec![0u8; 4096];
for _ in 0..4096 {
(&*s).write_all(&buf).await?;
}
s.get_ref().shutdown(Shutdown::Write)?;
Ok(())
}
// Read from and write to stream0.
let r0 = Task::local(do_read(stream0.clone()));
let w0 = Task::local(do_write(stream0));
// Sleep a bit, so that reading and writing are both blocked.
smol::Timer::after(Duration::from_millis(5)).await;
// Start reading stream1, make stream0 writable.
let r1 = Task::local(do_read(stream1.clone()));
// Finish writing to stream0.
w0.await?;
r1.await?;
// Start writing to stream1, make stream0 readable.
let w1 = Task::local(do_write(stream1));
// Will r0 be correctly woken?
r0.await?;
w1.await?;
Ok(())
})
}

View File

@ -1,37 +0,0 @@
use futures_util::future;
#[test]
fn smoke() {
std::thread::spawn(|| {
smol::run(future::pending::<()>());
});
let res = smol::block_on(async { 1 + 2 });
assert_eq!(res, 3);
}
#[test]
#[should_panic(expected = "boom")]
fn panic() {
std::thread::spawn(|| {
smol::run(future::pending::<()>());
});
smol::block_on(async {
// The panic should propagate out of `block_on` into the calling thread.
panic!("boom");
});
}
#[test]
fn nested_block_on() {
std::thread::spawn(|| {
smol::run(future::pending::<()>());
});
let x = smol::block_on(async {
let a = smol::block_on(async { smol::block_on(async { future::ready(3).await }) });
let b = smol::block_on(async { smol::block_on(async { future::ready(2).await }) });
a + b
});
assert_eq!(x, 3 + 2);
}
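// Editorial note (not part of the original file): `block_on` polls the given
// future on the calling thread; the background `smol::run(future::pending())`
// threads above keep an executor/reactor thread alive while it does so.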

View File

@ -1,23 +0,0 @@
#[test]
fn spawn() {
assert_eq!(42, smol::run(smol::Task::spawn(async { 42 })));
}
#[test]
fn spawn_detach() {
let (s, r) = piper::chan(1);
smol::Task::spawn(async move { s.send(()).await }).detach();
assert_eq!(Some(()), smol::run(r.recv()));
}
#[test]
fn blocking() {
assert_eq!(42, smol::run(smol::Task::blocking(async { 42 })));
}
#[test]
fn blocking_detach() {
let (s, r) = piper::chan(1);
smol::Task::blocking(async move { s.send(()).await }).detach();
assert_eq!(Some(()), smol::run(r.recv()));
}
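// Editorial note (not part of the original file): `Task::spawn` schedules the
// future on the async executor, while `Task::blocking` runs it on the thread
// pool reserved for blocking work; both kinds of task can be `detach`ed.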

View File

@ -1,25 +0,0 @@
use smol::{self, Timer};
use std::time::{Duration, Instant};
#[test]
fn timer_at() {
let before = smol::run(async {
let now = Instant::now();
let when = now + Duration::from_secs(1);
Timer::at(when).await;
now
});
assert!(before.elapsed() >= Duration::from_secs(1));
}
#[test]
fn timer_after() {
let before = smol::run(async {
let now = Instant::now();
Timer::after(Duration::from_secs(1)).await;
now
});
assert!(before.elapsed() >= Duration::from_secs(1));
}