This commit is contained in:
Stjepan Glavina 2020-07-14 21:19:01 +02:00
commit f61db4fcbe
38 changed files with 400 additions and 4602 deletions

View File

@ -11,7 +11,6 @@ documentation = "https://docs.rs/smol"
keywords = ["async", "await", "future", "io", "networking"]
categories = ["asynchronous", "concurrency", "network-programming"]
readme = "README.md"
autoexamples = false
[features]
# Optional feature for seamless integration with crates depending on tokio.
@ -25,17 +24,11 @@ autoexamples = false
tokio02 = ["tokio"]
[dependencies]
async-task = "3.0.0"
blocking = "0.4.6"
concurrent-queue = "1.1.1"
fastrand = "1.2.4"
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
libc = "0.2.71"
async-io = { path = "../async-io" }
multitask = "0.2.0"
num_cpus = "1.13.0"
once_cell = "1.4.0"
scoped-tls = "1.0.0"
slab = "0.4.2"
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
blocking = "0.4.6"
[dependencies.tokio]
version = "0.2"
@ -43,23 +36,28 @@ default-features = false
features = ["rt-threaded"]
optional = true
[target.'cfg(windows)'.dependencies]
wepoll-sys-stjepang = "1.0.6"
winapi = { version = "0.3.8", features = ["ioapiset"] }
[dev-dependencies]
criterion = "0.3.2"
futures = { version = "0.3.5", default-features = false, features = ["std"] }
anyhow = "1.0.28"
async-channel = "1.1.1"
async-dup = "1.1.0"
async-h1 = "1.1.2"
async-native-tls = "0.3.3"
async-net = { path = "../async-net" }
async-std = "1.5.0"
async-tungstenite = { version = "0.4.2", features = ["async-native-tls"] }
base64 = "0.12.0"
ctrlc = "3.1.4"
futures = "0.3.4"
http = "0.2.1"
http-types = "1.2.0"
hyper = { version = "0.13.5", default-features = false, features = ["stream"] }
native-tls = "0.2.4"
num_cpus = "1.13.0"
piper = "0.1.3"
reqwest = "0.10.4"
scraper = "0.11.0"
signal-hook = "0.1.13"
surf = { version = "2.0.0-alpha.1", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
[workspace]
members = [
".",
"examples",
]
[[bench]]
name = "spawn"
harness = false
tokio = { version = "0.2", default-features = false }
tungstenite = "0.10.1"
url = "2.1.1"

View File

@ -11,12 +11,7 @@ https://docs.rs/smol)
[![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)](
https://discord.gg/x6m5Vvt)
A small and fast async runtime for Rust.
This runtime extends [the standard library][std] with async combinators
and is only 1500 lines of code long.
[std]: https://docs.rs/std
A small and fast executor.
Reading the [docs] or looking at the [examples] is a great way to start learning
async Rust.
@ -43,16 +38,6 @@ Read this [blog post](https://stjepang.github.io/2020/04/03/why-im-building-a-ne
* Tasks that support cancellation.
* Userspace timers.
## Examples
You need to be in the [examples] directory to run them:
```terminal
$ cd examples
$ ls
$ cargo run --example ctrl-c
```
## Compatibility
See [this example](./examples/other-runtimes.rs) for how to use smol with

View File

@ -1,21 +0,0 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use futures::future;
use smol::Task;
pub fn spawn_benchmark(c: &mut Criterion) {
std::thread::spawn(|| smol::run(future::pending::<()>()));
c.bench_function("spawn time", |b| {
b.iter(|| {
let x = black_box(5);
smol::block_on(async {
Task::spawn(async move {
let _ = x + 1;
})
.await;
});
})
});
}
criterion_group!(benches, spawn_benchmark);
criterion_main!(benches);

View File

@ -1,123 +0,0 @@
[package]
name = "smol-examples"
version = "0.0.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
publish = false
[target.'cfg(windows)'.dev-dependencies]
uds_windows = "0.1.4"
[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.8.2", default-features = false }
nix = "0.17.0"
timerfd = "1.1.1"
[dev-dependencies]
anyhow = "1.0.28"
async-h1 = "1.1.2"
async-native-tls = "0.3.3"
async-std = "1.5.0"
async-tungstenite = { version = "0.4.2", features = ["async-native-tls"] }
base64 = "0.12.0"
ctrlc = "3.1.4"
futures = "0.3.4"
http = "0.2.1"
http-types = "1.2.0"
hyper = { version = "0.13.5", default-features = false, features = ["stream"] }
native-tls = "0.2.4"
num_cpus = "1.13.0"
piper = "0.1.1"
reqwest = "0.10.4"
scraper = "0.11.0"
signal-hook = "0.1.13"
smol = { path = "../", features = ["tokio02"] }
surf = { version = "2.0.0-alpha.1", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
tokio = { version = "0.2", default-features = false }
tungstenite = "0.10.1"
url = "2.1.1"
[[example]]
name = "async-h1-client"
path = "async-h1-client.rs"
[[example]]
name = "async-h1-server"
path = "async-h1-server.rs"
[[example]]
name = "chat-client"
path = "chat-client.rs"
[[example]]
name = "chat-server"
path = "chat-server.rs"
[[example]]
name = "ctrl-c"
path = "ctrl-c.rs"
[[example]]
name = "hyper-client"
path = "hyper-client.rs"
[[example]]
name = "hyper-server"
path = "hyper-server.rs"
[[example]]
name = "linux-inotify"
path = "linux-inotify.rs"
[[example]]
name = "linux-timerfd"
path = "linux-timerfd.rs"
[[example]]
name = "other-runtimes"
path = "other-runtimes.rs"
[[example]]
name = "simple-client"
path = "simple-client.rs"
[[example]]
name = "simple-server"
path = "simple-server.rs"
[[example]]
name = "tcp-client"
path = "tcp-client.rs"
[[example]]
name = "tcp-server"
path = "tcp-server.rs"
[[example]]
name = "tls-client"
path = "tls-client.rs"
[[example]]
name = "tls-server"
path = "tls-server.rs"
[[example]]
name = "unix-signal"
path = "unix-signal.rs"
[[example]]
name = "web-crawler"
path = "web-crawler.rs"
[[example]]
name = "websocket-client"
path = "websocket-client.rs"
[[example]]
name = "websocket-server"
path = "websocket-server.rs"
[[example]]
name = "windows-uds"
path = "windows-uds.rs"

View File

@ -1,12 +0,0 @@
## smol examples
You need to be in this directory to run examples:
```
$ cd examples
$ ls
$ cargo run --example ctrl-c
```
If you've got an example you'd like to see here,
please feel free to open an issue or submit a PR!

View File

@ -3,16 +3,14 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example async-h1-client
//! ```
use std::net::TcpStream;
use anyhow::{bail, Context as _, Error, Result};
use async_net::TcpStream;
use blocking::block_on;
use futures::prelude::*;
use http_types::{Method, Request, Response};
use smol::Async;
use url::Url;
/// Sends a request and fetches the response.
@ -26,7 +24,7 @@ async fn fetch(req: Request) -> Result<Response> {
// Connect to the host.
let addr = format!("{}:{}", host, port);
let stream = Async::<TcpStream>::connect(addr).await?;
let stream = TcpStream::connect(addr).await?;
// Send the request and wait for the response.
let resp = match req.url().scheme() {
@ -42,7 +40,7 @@ async fn fetch(req: Request) -> Result<Response> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Create a request.
let addr = "https://www.rust-lang.org";
let req = Request::new(Method::Get, Url::parse(addr)?);

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example async-h1-server
//! ```
//!
@ -14,15 +13,12 @@
//!
//! Refer to `README.md` to see how to the TLS certificate was generated.
use std::net::TcpListener;
use std::thread;
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor};
use async_net::TcpListener;
use blocking::block_on;
use futures::prelude::*;
use http_types::{Request, Response, StatusCode};
use piper::{Arc, Mutex};
use smol::{Async, Task};
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
@ -35,11 +31,11 @@ async fn serve(req: Request) -> http_types::Result<Response> {
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
async fn listen(listener: TcpListener, tls: Option<TlsAcceptor>) -> Result<()> {
// Format the full host address.
let host = match &tls {
None => format!("http://{}", listener.get_ref().local_addr()?),
Some(_) => format!("https://{}", listener.get_ref().local_addr()?),
None => format!("http://{}", listener.local_addr()?),
Some(_) => format!("https://{}", listener.local_addr()?),
};
println!("Listening on {}", host);
@ -51,8 +47,8 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
// Spawn a background task serving this connection.
let task = match &tls {
None => {
let stream = Arc::new(stream);
Task::spawn(async move {
let stream = async_dup::Arc::new(stream);
smol::spawn(async move {
if let Err(err) = async_h1::accept(&host, stream, serve).await {
println!("Connection error: {:#?}", err);
}
@ -62,8 +58,8 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
// In case of HTTPS, establish a secure TLS connection first.
match tls.accept(stream).await {
Ok(stream) => {
let stream = Arc::new(Mutex::new(stream));
Task::spawn(async move {
let stream = async_dup::Arc::new(async_dup::Mutex::new(stream));
smol::spawn(async move {
if let Err(err) = async_h1::accept(&host, stream, serve).await {
println!("Connection error: {:#?}", err);
}
@ -87,16 +83,10 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
let num_threads = num_cpus::get().max(1);
for _ in 0..num_threads {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::block_on(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(TcpListener::bind("127.0.0.1:8000").await?, None);
let https = listen(TcpListener::bind("127.0.0.1:8001").await?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})

View File

@ -3,43 +3,39 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-server
//! ```
//!
//! Then start clients:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-client
//! ```
use std::net::TcpStream;
use async_net::TcpStream;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use smol::Async;
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect("127.0.0.1:6000").await?;
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stream = TcpStream::connect("127.0.0.1:6000").await?;
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Intro messages.
println!("Connected to {}", stream.get_ref().peer_addr()?);
println!("My nickname: {}", stream.get_ref().local_addr()?);
println!("Connected to {}", stream.peer_addr()?);
println!("My nickname: {}", stream.local_addr()?);
println!("Type a message and hit enter!\n");
// References to `Async<T>` also implement `AsyncRead` and `AsyncWrite`.
let stream_r = &stream;
let mut stream_w = &stream;
let reader = &stream;
let mut writer = stream.clone();
// Wait until the standard input is closed or the connection is closed.
futures::select! {
_ = io::copy(stdin, &mut stream_w).fuse() => println!("Quit!"),
_ = io::copy(stream_r, &mut stdout).fuse() => println!("Server disconnected!"),
_ = io::copy(stdin, &mut writer).fuse() => println!("Quit!"),
_ = io::copy(reader, &mut stdout).fuse() => println!("Server disconnected!"),
}
Ok(())

View File

@ -3,31 +3,28 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-server
//! ```
//!
//! Then start clients:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example chat-client
//! ```
use std::collections::HashMap;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::net::SocketAddr;
use async_channel::{bounded, Receiver, Sender};
use async_net::{TcpListener, TcpStream};
use blocking::block_on;
use futures::io::{self, BufReader};
use futures::prelude::*;
use piper::{Arc, Receiver, Sender};
use smol::{Async, Task};
type Client = Arc<Async<TcpStream>>;
/// An event on the chat server.
enum Event {
/// A client has joined.
Join(SocketAddr, Client),
Join(SocketAddr, TcpStream),
/// A client has left.
Leave(SocketAddr),
@ -39,10 +36,10 @@ enum Event {
/// Dispatches events to clients.
async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
// Currently active clients.
let mut map = HashMap::<SocketAddr, Client>::new();
let mut map = HashMap::<SocketAddr, TcpStream>::new();
// Receive incoming events.
while let Some(event) = receiver.recv().await {
while let Ok(event) = receiver.recv().await {
// Process the event and format a message to send to clients.
let output = match event {
Event::Join(addr, stream) => {
@ -69,46 +66,46 @@ async fn dispatch(receiver: Receiver<Event>) -> io::Result<()> {
}
/// Reads messages from the client and forwards them to the dispatcher task.
async fn read_messages(sender: Sender<Event>, client: Client) -> io::Result<()> {
let addr = client.get_ref().peer_addr()?;
async fn read_messages(sender: Sender<Event>, client: TcpStream) -> io::Result<()> {
let addr = client.peer_addr()?;
let mut lines = BufReader::new(client).lines();
while let Some(line) = lines.next().await {
let line = line?;
sender.send(Event::Message(addr, line)).await;
let _ = sender.send(Event::Message(addr, line)).await;
}
Ok(())
}
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create a listener for incoming client connections.
let listener = Async::<TcpListener>::bind("127.0.0.1:6000")?;
let listener = TcpListener::bind("127.0.0.1:6000").await?;
// Intro messages.
println!("Listening on {}", listener.get_ref().local_addr()?);
println!("Listening on {}", listener.local_addr()?);
println!("Start a chat client now!\n");
// Spawn a background task that dispatches events to clients.
let (sender, receiver) = piper::chan(100);
Task::spawn(dispatch(receiver)).unwrap().detach();
let (sender, receiver) = bounded(100);
smol::spawn(dispatch(receiver)).detach();
loop {
// Accept the next connection.
let (stream, addr) = listener.accept().await?;
let client = Arc::new(stream);
let client = stream;
let sender = sender.clone();
// Spawn a background task reading messages from the client.
Task::spawn(async move {
smol::spawn(async move {
// Client starts with a `Join` event.
sender.send(Event::Join(addr, client.clone())).await;
let _ = sender.send(Event::Join(addr, client.clone())).await;
// Read messages from the client and ignore I/O errors when the client quits.
let _ = read_messages(sender.clone(), client).await;
// Client ends with a `Leave` event.
sender.send(Event::Leave(addr)).await;
let _ = sender.send(Event::Leave(addr)).await;
})
.detach();
}

View File

@ -3,25 +3,25 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example ctrl-c
//! ```
use blocking::block_on;
use futures::prelude::*;
fn main() {
// Set a handler that sends a message through a channel.
let (s, ctrl_c) = piper::chan(100);
let (s, ctrl_c) = async_channel::bounded(100);
let handle = move || {
let _ = s.send(()).now_or_never();
};
ctrlc::set_handler(handle).unwrap();
smol::run(async {
block_on(async {
println!("Waiting for Ctrl-C...");
// Receive a message that indicates the Ctrl-C signal occurred.
ctrl_c.recv().await;
let _ = ctrl_c.recv().await;
println!("Done!");
})

View File

@ -3,21 +3,21 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example hyper-client
//! ```
use std::io;
use std::net::{Shutdown, TcpStream};
use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Error, Result};
use async_native_tls::TlsStream;
use async_net::TcpStream;
use blocking::block_on;
use futures::prelude::*;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{Async, Task};
/// Sends a request and fetches the response.
async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
@ -29,7 +29,7 @@ async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Create a request.
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?;
@ -57,7 +57,7 @@ struct SmolExecutor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for SmolExecutor {
fn execute(&self, fut: F) {
Task::spawn(async { drop(fut.await) }).detach();
smol::spawn(async { drop(fut.await) }).detach();
}
}
@ -81,13 +81,13 @@ impl hyper::service::Service<Uri> for SmolConnector {
match uri.scheme_str() {
Some("http") => {
let addr = format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap_or(80));
let stream = Async::<TcpStream>::connect(addr).await?;
let stream = TcpStream::connect(addr).await?;
Ok(SmolStream::Plain(stream))
}
Some("https") => {
// In case of HTTPS, establish a secure TLS connection first.
let addr = format!("{}:{}", uri.host().unwrap(), uri.port_u16().unwrap_or(443));
let stream = Async::<TcpStream>::connect(addr).await?;
let stream = TcpStream::connect(addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(SmolStream::Tls(stream))
}
@ -100,10 +100,10 @@ impl hyper::service::Service<Uri> for SmolConnector {
/// A TCP or TCP+TLS connection.
enum SmolStream {
/// A plain TCP connection.
Plain(Async<TcpStream>),
Plain(TcpStream),
/// A TCP connection secured by TLS.
Tls(TlsStream<Async<TcpStream>>),
Tls(TlsStream<TcpStream>),
}
impl hyper::client::connect::Connection for SmolStream {
@ -147,7 +147,7 @@ impl tokio::io::AsyncWrite for SmolStream {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
SmolStream::Plain(s) => {
s.get_ref().shutdown(Shutdown::Write)?;
s.shutdown(Shutdown::Write)?;
Poll::Ready(Ok(()))
}
SmolStream::Tls(s) => Pin::new(s).poll_close(cx),

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example hyper-server
//! ```
//!
@ -15,17 +14,17 @@
//! Refer to `README.md` to see how to the TLS certificate was generated.
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::net::Shutdown;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Error, Result};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_net::{TcpListener, TcpStream};
use blocking::block_on;
use futures::prelude::*;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use smol::{Async, Task};
/// Serves a request and returns a response.
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
@ -34,11 +33,11 @@ async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
async fn listen(listener: TcpListener, tls: Option<TlsAcceptor>) -> Result<()> {
// Format the full host address.
let host = &match tls {
None => format!("http://{}", listener.get_ref().local_addr()?),
Some(_) => format!("https://{}", listener.get_ref().local_addr()?),
None => format!("http://{}", listener.local_addr()?),
Some(_) => format!("https://{}", listener.local_addr()?),
};
println!("Listening on {}", host);
@ -59,15 +58,10 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::block_on(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(TcpListener::bind("127.0.0.1:8000").await?, None);
let https = listen(TcpListener::bind("127.0.0.1:8001").await?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})
@ -79,18 +73,18 @@ struct SmolExecutor;
impl<F: Future + Send + 'static> hyper::rt::Executor<F> for SmolExecutor {
fn execute(&self, fut: F) {
Task::spawn(async { drop(fut.await) }).detach();
smol::spawn(async { drop(fut.await) }).detach();
}
}
/// Listens for incoming connections.
struct SmolListener {
listener: Async<TcpListener>,
listener: TcpListener,
tls: Option<TlsAcceptor>,
}
impl SmolListener {
fn new(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Self {
fn new(listener: TcpListener, tls: Option<TlsAcceptor>) -> Self {
Self { listener, tls }
}
}
@ -127,13 +121,13 @@ impl hyper::server::accept::Accept for SmolListener {
/// A TCP or TCP+TLS connection.
enum SmolStream {
/// A plain TCP connection.
Plain(Async<TcpStream>),
Plain(TcpStream),
/// A TCP connection secured by TLS.
Tls(TlsStream<Async<TcpStream>>),
Tls(TlsStream<TcpStream>),
/// A TCP connection that is in process of getting secured by TLS.
Handshake(future::BoxFuture<'static, io::Result<TlsStream<Async<TcpStream>>>>),
Handshake(future::BoxFuture<'static, io::Result<TlsStream<TcpStream>>>),
}
impl hyper::client::connect::Connection for SmolStream {
@ -190,7 +184,7 @@ impl tokio::io::AsyncWrite for SmolStream {
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
match &mut *self {
SmolStream::Plain(s) => {
s.get_ref().shutdown(Shutdown::Write)?;
s.shutdown(Shutdown::Write)?;
Poll::Ready(Ok(()))
}
SmolStream::Tls(s) => Pin::new(s).poll_close(cx),

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example linux-inotify
//! ```

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example linux-timerfd
//! ```

View File

@ -13,16 +13,16 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example other-runtimes
//! ```
use std::time::{Duration, Instant};
use anyhow::{Error, Result};
use blocking::block_on;
fn main() -> Result<()> {
smol::run(async {
block_on(async {
// Sleep using async-std.
let start = Instant::now();
println!("Sleeping using async-std...");

View File

@ -6,11 +6,10 @@
//! cargo run --example simple-client
//! ```
use std::net::TcpStream;
use anyhow::{bail, Context as _, Result};
use async_net::TcpStream;
use blocking::block_on;
use futures::prelude::*;
use smol::Async;
use url::Url;
/// Sends a GET request and fetches the response.
@ -32,7 +31,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
);
// Connect to the host.
let mut stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let mut stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
// Send the request and wait for the response.
let mut resp = Vec::new();
@ -54,7 +53,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
let addr = "https://www.rust-lang.org";
let resp = fetch(addr).await?;
println!("{}", String::from_utf8_lossy(&resp));

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example simple-server
//! ```
//!
@ -14,13 +13,11 @@
//!
//! Refer to `README.md` to see how to the TLS certificate was generated.
use std::net::{TcpListener, TcpStream};
use std::thread;
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor};
use async_net::{TcpListener, TcpStream};
use blocking::block_on;
use futures::prelude::*;
use smol::{Async, Task};
const RESPONSE: &[u8] = br#"
HTTP/1.1 200 OK
@ -31,14 +28,14 @@ Content-Length: 47
"#;
/// Reads a request from the client and sends it a response.
async fn serve(mut stream: Async<TcpStream>, tls: Option<TlsAcceptor>) -> Result<()> {
async fn serve(mut stream: TcpStream, tls: Option<TlsAcceptor>) -> Result<()> {
match tls {
None => {
println!("Serving http://{}", stream.get_ref().local_addr()?);
println!("Serving http://{}", stream.local_addr()?);
stream.write_all(RESPONSE).await?;
}
Some(tls) => {
println!("Serving https://{}", stream.get_ref().local_addr()?);
println!("Serving https://{}", stream.local_addr()?);
// In case of HTTPS, establish a secure TLS connection first.
match tls.accept(stream).await {
@ -55,11 +52,11 @@ async fn serve(mut stream: Async<TcpStream>, tls: Option<TlsAcceptor>) -> Result
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
async fn listen(listener: TcpListener, tls: Option<TlsAcceptor>) -> Result<()> {
// Display the full host address.
match &tls {
None => println!("Listening on http://{}", listener.get_ref().local_addr()?),
Some(_) => println!("Listening on https://{}", listener.get_ref().local_addr()?),
None => println!("Listening on http://{}", listener.local_addr()?),
Some(_) => println!("Listening on https://{}", listener.local_addr()?),
}
loop {
@ -68,7 +65,7 @@ async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Resul
let tls = tls.clone();
// Spawn a background task serving this connection.
Task::spawn(async move {
smol::spawn(async move {
if let Err(err) = serve(stream, tls).await {
println!("Connection error: {:#?}", err);
}
@ -82,16 +79,10 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
let num_threads = num_cpus::get().max(1);
for _ in 0..num_threads {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start HTTP and HTTPS servers.
smol::run(async {
let http = listen(Async::<TcpListener>::bind("127.0.0.1:8000")?, None);
let https = listen(Async::<TcpListener>::bind("127.0.0.1:8001")?, Some(tls));
block_on(async {
let http = listen(TcpListener::bind("127.0.0.1:8000").await?, None);
let https = listen(TcpListener::bind("127.0.0.1:8001").await?, Some(tls));
future::try_join(http, https).await?;
Ok(())
})

View File

@ -3,32 +3,29 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-client
//! ```
use std::net::TcpStream;
use async_net::TcpStream;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use smol::Async;
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create async stdin and stdout handles.
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Connect to the server.
let stream = Async::<TcpStream>::connect("127.0.0.1:7000").await?;
println!("Connected to {}", stream.get_ref().peer_addr()?);
let stream = TcpStream::connect("127.0.0.1:7000").await?;
println!("Connected to {}", stream.peer_addr()?);
println!("Type a message and hit enter!\n");
// Pipe messages from stdin to the server and pipe messages from the server to stdout.

View File

@ -3,33 +3,30 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tcp-client
//! ```
use std::net::{TcpListener, TcpStream};
use async_net::{TcpListener, TcpStream};
use blocking::block_on;
use futures::io;
use smol::{Async, Task};
/// Echoes messages from the client back to it.
async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
io::copy(&stream, &mut &stream).await?;
async fn echo(mut stream: TcpStream) -> io::Result<()> {
io::copy(stream.clone(), &mut stream).await?;
Ok(())
}
fn main() -> io::Result<()> {
smol::run(async {
block_on(async {
// Create a listener.
let listener = Async::<TcpListener>::bind("127.0.0.1:7000")?;
println!("Listening on {}", listener.get_ref().local_addr()?);
let listener = TcpListener::bind("127.0.0.1:7000").await?;
println!("Listening on {}", listener.local_addr()?);
println!("Now start a TCP client.");
// Accept clients in a loop.
@ -38,7 +35,7 @@ fn main() -> io::Result<()> {
println!("Accepted client: {}", peer_addr);
// Spawn a task that echoes messages from the client back to it.
Task::spawn(echo(stream)).unwrap().detach();
smol::spawn(echo(stream)).detach();
}
})
}

View File

@ -3,25 +3,21 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-client
//! ```
use std::net::TcpStream;
use anyhow::Result;
use async_native_tls::{Certificate, TlsConnector};
use async_net::TcpStream;
use blocking::{block_on, Unblock};
use futures::io;
use futures::prelude::*;
use piper::Mutex;
use smol::Async;
fn main() -> Result<()> {
// Initialize TLS with the local certificate.
@ -29,19 +25,19 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
block_on(async {
// Create async stdin and stdout handles.
let stdin = smol::reader(std::io::stdin());
let mut stdout = smol::writer(std::io::stdout());
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());
// Connect to the server.
let stream = Async::<TcpStream>::connect("127.0.0.1:7001").await?;
let stream = TcpStream::connect("127.0.0.1:7001").await?;
let stream = tls.connect("127.0.0.1", stream).await?;
println!("Connected to {}", stream.get_ref().get_ref().peer_addr()?);
println!("Connected to {}", stream.get_ref().peer_addr()?);
println!("Type a message and hit enter!\n");
// Pipe messages from stdin to the server and pipe messages from the server to stdout.
let stream = Mutex::new(stream);
let stream = async_dup::Mutex::new(stream);
future::try_join(
io::copy(stdin, &mut &stream),
io::copy(&stream, &mut stdout),

View File

@ -3,28 +3,24 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example tls-client
//! ```
use std::net::{TcpListener, TcpStream};
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_net::{TcpListener, TcpStream};
use blocking::block_on;
use futures::io;
use piper::Mutex;
use smol::{Async, Task};
/// Echoes messages from the client back to it.
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
let stream = Mutex::new(stream);
async fn echo(stream: TlsStream<TcpStream>) -> Result<()> {
let stream = async_dup::Mutex::new(stream);
io::copy(&stream, &mut &stream).await?;
Ok(())
}
@ -34,23 +30,20 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
smol::run(async {
block_on(async {
// Create a listener.
let listener = Async::<TcpListener>::bind("127.0.0.1:7001")?;
println!("Listening on {}", listener.get_ref().local_addr()?);
let listener = TcpListener::bind("127.0.0.1:7001").await?;
println!("Listening on {}", listener.local_addr()?);
println!("Now start a TLS client.");
// Accept clients in a loop.
loop {
let (stream, _) = listener.accept().await?;
let stream = tls.accept(stream).await?;
println!(
"Accepted client: {}",
stream.get_ref().get_ref().peer_addr()?
);
println!("Accepted client: {}", stream.get_ref().peer_addr()?);
// Spawn a task that echoes messages from the client back to it.
Task::spawn(echo(stream)).unwrap().detach();
smol::spawn(echo(stream)).detach();
}
})
}

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example unix-signal
//! ```
@ -11,10 +10,11 @@
fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream;
use async_io::Async;
use blocking::block_on;
use futures::prelude::*;
use smol::Async;
smol::run(async {
block_on(async {
// Create a Unix stream that receives a byte on each signal occurrence.
let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::pipe::register(signal_hook::SIGINT, a)?;

View File

@ -3,16 +3,15 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example web-crawler
//! ```
use std::collections::{HashSet, VecDeque};
use anyhow::Result;
use piper::Sender;
use async_channel::{bounded, Sender};
use blocking::block_on;
use scraper::{Html, Selector};
use smol::Task;
const ROOT: &str = "https://www.rust-lang.org";
@ -20,7 +19,7 @@ const ROOT: &str = "https://www.rust-lang.org";
async fn fetch(url: String, sender: Sender<String>) {
let body = surf::get(&url).recv_string().await;
let body = body.unwrap_or_default();
sender.send(body).await;
let _ = sender.send(body).await;
}
/// Extracts links from a HTML body.
@ -35,26 +34,26 @@ fn links(body: String) -> Vec<String> {
}
fn main() -> Result<()> {
smol::run(async {
block_on(async {
let mut seen = HashSet::new();
let mut queue = VecDeque::new();
seen.insert(ROOT.to_string());
queue.push_back(ROOT.to_string());
let (s, r) = piper::chan(200);
let (s, r) = bounded(200);
let mut tasks = 0;
// Loop while the queue is not empty or tasks are fetching pages.
while queue.len() + tasks > 0 {
// Limit the number of concurrent tasks.
while tasks < s.capacity() {
while tasks < s.capacity().unwrap() {
// Process URLs in the queue and fetch more pages.
match queue.pop_front() {
None => break,
Some(url) => {
println!("{}", url);
tasks += 1;
Task::spawn(fetch(url, s.clone())).detach();
smol::spawn(fetch(url, s.clone())).detach();
}
}
}

View File

@ -3,26 +3,24 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-client
//! ```
use std::net::TcpStream;
use std::pin::Pin;
use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result};
use async_native_tls::{Certificate, TlsConnector, TlsStream};
use async_net::TcpStream;
use async_tungstenite::WebSocketStream;
use blocking::block_on;
use futures::prelude::*;
use smol::Async;
use tungstenite::handshake::client::Response;
use tungstenite::Message;
use url::Url;
@ -37,13 +35,13 @@ async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)>
// Connect to the address.
match url.scheme() {
"ws" => {
let stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Plain(stream), resp))
}
"wss" => {
// In case of WSS, establish a secure TLS connection first.
let stream = Async::<TcpStream>::connect(format!("{}:{}", host, port)).await?;
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
let stream = tls.connect(host, stream).await?;
let (stream, resp) = async_tungstenite::client_async(addr, stream).await?;
Ok((WsStream::Tls(stream), resp))
@ -58,7 +56,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
smol::run(async {
block_on(async {
// Connect to the server.
let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?;
dbg!(resp);
@ -74,10 +72,10 @@ fn main() -> Result<()> {
/// A WebSocket or WebSocket+TLS connection.
enum WsStream {
/// A plain WebSocket connection.
Plain(WebSocketStream<Async<TcpStream>>),
Plain(WebSocketStream<TcpStream>),
/// A WebSocket connection secured by TLS.
Tls(WebSocketStream<TlsStream<Async<TcpStream>>>),
Tls(WebSocketStream<TlsStream<TcpStream>>),
}
impl Sink<Message> for WsStream {

View File

@ -3,27 +3,24 @@
//! First start a server:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-server
//! ```
//!
//! Then start a client:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example websocket-client
//! ```
use std::net::{TcpListener, TcpStream};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use anyhow::{Context as _, Result};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_net::{TcpListener, TcpStream};
use async_tungstenite::WebSocketStream;
use blocking::block_on;
use futures::prelude::*;
use smol::{Async, Task};
use tungstenite::Message;
/// Echoes messages from the client back to it.
@ -34,28 +31,28 @@ async fn echo(mut stream: WsStream) -> Result<()> {
}
/// Listens for incoming connections and serves them.
async fn listen(listener: Async<TcpListener>, tls: Option<TlsAcceptor>) -> Result<()> {
async fn listen(listener: TcpListener, tls: Option<TlsAcceptor>) -> Result<()> {
let host = match &tls {
None => format!("ws://{}", listener.get_ref().local_addr()?),
Some(_) => format!("wss://{}", listener.get_ref().local_addr()?),
None => format!("ws://{}", listener.local_addr()?),
Some(_) => format!("wss://{}", listener.local_addr()?),
};
println!("Listening on {}", host);
loop {
// Accept the next connection.
let (stream, _) = listener.accept().await?;
println!("Accepted client: {}", stream.get_ref().peer_addr()?);
println!("Accepted client: {}", stream.peer_addr()?);
match &tls {
None => {
let stream = WsStream::Plain(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
smol::spawn(echo(stream)).detach();
}
Some(tls) => {
// In case of WSS, establish a secure TLS connection first.
let stream = tls.accept(stream).await?;
let stream = WsStream::Tls(async_tungstenite::accept_async(stream).await?);
Task::spawn(echo(stream)).unwrap().detach();
smol::spawn(echo(stream)).detach();
}
}
}
@ -66,15 +63,10 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Create an executor thread pool.
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| smol::run(future::pending::<()>()));
}
// Start WS and WSS servers.
smol::block_on(async {
let ws = listen(Async::<TcpListener>::bind("127.0.0.1:9000")?, None);
let wss = listen(Async::<TcpListener>::bind("127.0.0.1:9001")?, Some(tls));
block_on(async {
let ws = listen(TcpListener::bind("127.0.0.1:9000").await?, None);
let wss = listen(TcpListener::bind("127.0.0.1:9001").await?, Some(tls));
future::try_join(ws, wss).await?;
Ok(())
})
@ -83,10 +75,10 @@ fn main() -> Result<()> {
/// A WebSocket or WebSocket+TLS connection.
enum WsStream {
/// A plain WebSocket connection.
Plain(WebSocketStream<Async<TcpStream>>),
Plain(WebSocketStream<TcpStream>),
/// A WebSocket connection secured by TLS.
Tls(WebSocketStream<TlsStream<Async<TcpStream>>>),
Tls(WebSocketStream<TlsStream<TcpStream>>),
}
impl Sink<Message> for WsStream {

View File

@ -3,7 +3,6 @@
//! Run with:
//!
//! ```
//! cd examples # make sure to be in this directory
//! cargo run --example windows-uds
//! ```

File diff suppressed because it is too large Load Diff

View File

@ -1,45 +0,0 @@
//! Implementation of [`block_on()`].
//!
//! This is equivalent to [`futures::executor::block_on()`], but slightly more efficient.
//!
//! The implementation is explained in detail in [*Build your own block_on()*][blog-post].
//!
//! [`futures::executor::block_on()`]: https://docs.rs/futures/0.3/futures/executor/fn.block_on.html
//! [blog-post]: https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
use std::future::Future;
use crate::context;
/// Blocks on a single future.
///
/// This function polls the future in a loop, parking the current thread after each step to wait
/// until its waker is woken.
///
/// Unlike [`run()`], it does not run executors or poll the reactor!
///
/// You can think of it as the easiest and most efficient way of turning an async operation into a
/// blocking operation.
///
/// # Examples
///
/// ```
/// use futures::future;
/// use smol::{Async, Timer};
/// use std::thread;
/// use std::time::Duration;
///
/// // Run executors and the reactor on a separeate thread, forever.
/// thread::spawn(|| smol::run(future::pending::<()>()));
///
/// smol::block_on(async {
/// // Sleep for a second.
/// // This timer only works because there's a thread calling `run()`.
/// Timer::after(Duration::from_secs(1)).await;
/// })
/// ```
///
/// [`run()`]: `crate::run()`
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
context::enter(|| blocking::block_on(future))
}

View File

@ -1,314 +0,0 @@
//! The blocking executor.
//!
//! Tasks created by [`Task::blocking()`] go into this executor. This executor is independent of
//! [`run()`][`crate::run()`] - it does not need to be driven.
//!
//! Blocking tasks are allowed to block without restrictions. However, the executor puts a limit on
//! the number of concurrently running tasks. Once that limit is hit, a task will need to complete
//! or yield in order for others to run.
//!
//! In idle state, this executor has no threads and consumes no resources. Once tasks are spawned,
//! new threads will get started, as many as is needed to keep up with the present amount of work.
//! When threads are idle, they wait for some time for new work to come in and shut down after a
//! certain timeout.
//!
//! This module also implements convenient adapters:
//!
//! - [`blocking!`] as syntax sugar around [`Task::blocking()`]
//! - [`iter()`] converts an [`Iterator`] into a [`Stream`]
//! - [`reader()`] converts a [`Read`] into an [`AsyncRead`]
//! - [`writer()`] converts a [`Write`] into an [`AsyncWrite`]
use std::collections::VecDeque;
use std::future::Future;
use std::io::{Read, Write};
use std::panic;
use std::sync::{Condvar, Mutex, MutexGuard};
use std::thread;
use std::time::Duration;
use futures_util::io::{AsyncRead, AsyncWrite};
use futures_util::stream::Stream;
use once_cell::sync::Lazy;
use crate::context;
use crate::task::{Runnable, Task};
/// The blocking executor.
pub(crate) struct BlockingExecutor {
/// The current state of the executor.
state: Mutex<State>,
/// Used to put idle threads to sleep and wake them up when new work comes in.
cvar: Condvar,
}
/// Current state of the blocking executor.
struct State {
/// Number of idle threads in the pool.
///
/// Idle threads are sleeping, waiting to get a task to run.
idle_count: usize,
/// Total number of thread in the pool.
///
/// This is the number of idle threads + the number of active threads.
thread_count: usize,
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
}
impl BlockingExecutor {
/// Returns a reference to the blocking executor.
pub fn get() -> &'static BlockingExecutor {
static EXECUTOR: Lazy<BlockingExecutor> = Lazy::new(|| BlockingExecutor {
state: Mutex::new(State {
idle_count: 0,
thread_count: 0,
queue: VecDeque::new(),
}),
cvar: Condvar::new(),
});
&EXECUTOR
}
/// Spawns a future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, move |r| self.schedule(r), ());
runnable.schedule();
Task(Some(handle))
}
/// Runs the main loop on the current thread.
///
/// This function runs blocking tasks until it becomes idle and times out.
fn main_loop(&'static self) {
let mut state = self.state.lock().unwrap();
loop {
// This thread is not idle anymore because it's going to run tasks.
state.idle_count -= 1;
// Run tasks in the queue.
while let Some(runnable) = state.queue.pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(state);
// Run the task.
let _ = panic::catch_unwind(|| runnable.run());
// Re-lock the state and continue.
state = self.state.lock().unwrap();
}
// This thread is now becoming idle.
state.idle_count += 1;
// Put the thread to sleep until another task is scheduled.
let timeout = Duration::from_millis(500);
let (s, res) = self.cvar.wait_timeout(state, timeout).unwrap();
state = s;
// If there are no tasks after a while, stop this thread.
if res.timed_out() && state.queue.is_empty() {
state.idle_count -= 1;
state.thread_count -= 1;
break;
}
}
}
/// Schedules a runnable task for execution.
fn schedule(&'static self, runnable: Runnable) {
let mut state = self.state.lock().unwrap();
state.queue.push_back(runnable);
// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
self.grow_pool(state);
}
/// Spawns more blocking threads if the pool is overloaded with work.
fn grow_pool(&'static self, mut state: MutexGuard<'static, State>) {
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while state.queue.len() > state.idle_count * 5 && state.thread_count < 500 {
// The new thread starts in idle state.
state.idle_count += 1;
state.thread_count += 1;
// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();
// Spawn the new thread.
thread::spawn(move || {
// If enabled, set up tokio before the main loop begins.
context::enter(|| self.main_loop())
});
}
}
}
/// Spawns blocking code onto a thread.
///
/// Note that `blocking!(expr)` is just syntax sugar for
/// `Task::blocking(async move { expr }).await`.
///
/// # Examples
///
/// Read a file into a string:
///
/// ```no_run
/// use smol::blocking;
/// use std::fs;
///
/// # smol::run(async {
/// let contents = blocking!(fs::read_to_string("file.txt"))?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Spawn a process:
///
/// ```no_run
/// use smol::blocking;
/// use std::process::Command;
///
/// # smol::run(async {
/// let out = blocking!(Command::new("dir").output())?;
/// # std::io::Result::Ok(()) });
/// ```
#[macro_export]
macro_rules! blocking {
($($expr:tt)*) => {
$crate::Task::blocking(async move { $($expr)* }).await
};
}
/// Creates a stream that iterates on a thread.
///
/// This adapter converts any kind of synchronous iterator into an asynchronous stream by running
/// it on the blocking executor and sending items back over a channel.
///
/// # Examples
///
/// List files in the current directory:
///
/// ```no_run
/// use futures::stream::StreamExt;
/// use smol::{blocking, iter};
/// use std::fs;
///
/// # smol::run(async {
/// // Load a directory.
/// let mut dir = blocking!(fs::read_dir("."))?;
/// let mut dir = iter(dir);
///
/// // Iterate over the contents of the directory.
/// while let Some(res) = dir.next().await {
/// println!("{}", res?.file_name().to_string_lossy());
/// }
/// # std::io::Result::Ok(()) });
/// ```
pub fn iter<T: Send + 'static>(
iter: impl Iterator<Item = T> + Send + 'static,
) -> impl Stream<Item = T> + Send + Unpin + 'static {
blocking::Unblock::new(iter)
}
/// Creates an async reader that runs on a thread.
///
/// This adapter converts any kind of synchronous reader into an asynchronous reader by running it
/// on the blocking executor and sending bytes back over a pipe.
///
/// # Examples
///
/// Read from a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, reader};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for reading.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = reader(file);
///
/// // Read the whole file.
/// let mut contents = Vec::new();
/// file.read_to_end(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Read output from a process:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::reader;
/// use std::process::{Command, Stdio};
///
/// # smol::run(async {
/// // Spawn a child process and make an async reader for its stdout.
/// let child = Command::new("dir").stdout(Stdio::piped()).spawn()?;
/// let mut child_stdout = reader(child.stdout.unwrap());
///
/// // Read the entire output.
/// let mut output = String::new();
/// child_stdout.read_to_string(&mut output).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
blocking::Unblock::new(reader)
}
/// Creates an async writer that runs on a thread.
///
/// This adapter converts any kind of synchronous writer into an asynchronous writer by running it
/// on the blocking executor and receiving bytes over a pipe.
///
/// **Note:** Don't forget to flush the writer at the end, or some written bytes might get lost!
///
/// # Examples
///
/// Write into a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, writer};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for writing.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = writer(file);
///
/// // Write some bytes into the file and flush.
/// file.write_all(b"hello").await?;
/// file.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
///
/// Write into standard output:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::writer;
///
/// # smol::run(async {
/// // Create an async writer to stdout.
/// let mut stdout = writer(std::io::stdout());
///
/// // Write a message and flush.
/// stdout.write_all(b"hello").await?;
/// stdout.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
blocking::Unblock::new(writer)
}

View File

@ -1,36 +0,0 @@
//! Task context common to all executors.
//!
//! Before executor, we "enter" it by setting up some necessary thread-locals.
/// Enters the tokio context if the `tokio` feature is enabled.
pub(crate) fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
{
use once_cell::sync::Lazy;
use std::cell::Cell;
use tokio::runtime::Runtime;
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outer most always has a
/// runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}

View File

@ -1,67 +1,58 @@
//! A small and fast async runtime.
//! A small and fast executor.
//!
//! # Executors
//! This crate runs a global executor thread pool and only has one type, [`Task`]. Despite the
//! trivially simple codebase, this executor and its related crates offer performance and features
//! comparable to more complex frameworks like [tokio].
//!
//! There are three executors that poll futures:
//! Related async crates:
//!
//! 1. Thread-local executor for tasks created by [`Task::local()`].
//! 2. Work-stealing executor for tasks created by [`Task::spawn()`].
//! 3. Blocking executor for tasks created by [`Task::blocking()`], [`blocking!`], [`iter()`],
//! [`reader()`] and [`writer()`].
//! * For async I/O and timers, use [`async-io`].
//! * For higher-level networking primitives, use [`async-net`].
//! * To call blocking code from async code or the other way around, use [`blocking`].
//!
//! Blocking executor is the only one that spawns threads on its own.
//! [`async-io`]: https://docs.rs/async-io
//! [`async-net`]: https://docs.rs/async-net
//! [`blocking`]: https://docs.rs/blocking
//! [`tokio`]: https://docs.rs/tokio
//!
//! See [here](fn.run.html#examples) for how to run executors on a single thread or on a thread
//! pool.
//! # TCP server
//!
//! # Reactor
//!
//! To wait for the next I/O event, the reactor calls [epoll] on Linux/Android, [kqueue] on
//! macOS/iOS/BSD, and [wepoll] on Windows.
//!
//! The [`Async`] type registers I/O handles in the reactor and is able to convert their blocking
//! operations into async operations.
//!
//! The [`Timer`] type registers timers in the reactor that will fire at the chosen points in
//! time.
//!
//! # Running
//!
//! Function [`run()`] simultaneously runs the thread-local executor, runs the work-stealing
//! executor, and polls the reactor for I/O events and timers. At least one thread has to be
//! calling [`run()`] in order for futures waiting on I/O and timers to get notified.
//!
//! If you want a multithreaded runtime, just call [`run()`] from multiple threads. See
//! [here](fn.run.html#examples) for an example.
//!
//! There is also [`block_on()`], which blocks the current thread until a future completes, but it
//! doesn't poll the reactor or run executors. When using [`block_on()`], make sure at least one
//! thread is calling [`run()`], or else I/O and timers will not work!
//!
//! Blocking tasks run in the background on a dedicated thread pool.
//!
//! # Examples
//!
//! Connect to a HTTP website, make a GET request, and pipe the response to the standard output:
//! A simple TCP server that prints messages received from clients:
//!
//! ```no_run
//! use futures::prelude::*;
//! use smol::Async;
//! use std::net::TcpStream;
//! use async_net::TcpListener;
//! use blocking::{block_on, Unblock};
//! use smol::Task;
//!
//! fn main() -> std::io::Result<()> {
//! smol::run(async {
//! let mut stream = Async::<TcpStream>::connect("example.com:80").await?;
//! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
//! stream.write_all(req).await?;
//! block_on(async {
//! // Start listening on port 9000.
//! let listener = TcpListener::bind("127.0.0.1:9000").await?;
//!
//! let mut stdout = smol::writer(std::io::stdout());
//! futures::io::copy(&stream, &mut stdout).await?;
//! Ok(())
//! loop {
//! // Accept a new client.
//! let (stream, _) = listener.accept().await?;
//!
//! // Spawn a task handling this client.
//! let task = Task::spawn(async move {
//! // Create an async stdio handle.
//! let mut stdout = Unblock::new(std::io::stdout());
//!
//! // Copy data received from the client into stdout.
//! futures::io::copy(&stream, &mut stdout).await
//! });
//!
//! // Keep running the task in the background.
//! task.detach();
//! }
//! })
//! }
//! ```
//!
//! To interact with the server, run `nc 127.0.0.1 9000` and type a few lines of text.
//!
//! # Examples
//!
//! Look inside the [examples] directory for more:
//! a [web crawler][web-crawler],
//! a [Ctrl-C handler][ctrl-c],
@ -79,13 +70,10 @@
//! Finally, there's an [example][other-runtimes] showing how to use smol with
//! [async-std], [tokio], [surf], and [reqwest].
//!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [wepoll]: https://github.com/piscisaureus/wepoll
//!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! [examples]: https://github.com/stjepang/smol/tree/master/examples/!
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [hyper]: https://docs.rs/tokio
//! [async-std]: https://docs.rs/async-std
//! [tokio]: https://docs.rs/tokio
//! [surf]: https://docs.rs/surf
@ -113,23 +101,180 @@
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
mod async_io;
mod block_on;
mod blocking;
mod context;
mod multitask;
mod parking;
mod reactor;
mod run;
mod sys;
mod task;
mod timer;
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
pub use self::blocking::{iter, reader, writer};
pub use async_io::Async;
pub use block_on::block_on;
pub use run::run;
pub use task::Task;
pub use timer::Timer;
use multitask::Executor;
use once_cell::sync::Lazy;
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
/// Spawns a future.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
pub fn spawn<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| {
enter(|| {
let (p, u) = async_io::parking::pair();
let ticker = EXECUTOR.ticker(move || u.unpark());
loop {
if let Ok(false) = catch_unwind(|| ticker.tick()) {
p.park();
}
}
})
});
}
Executor::new()
});
Task(EXECUTOR.spawn(future))
}
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(self) {
self.0.detach();
}
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::new(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
}
/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
{
use once_cell::sync::Lazy;
use std::cell::Cell;
use tokio::runtime::Runtime;
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
/// The global tokio runtime.
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}

View File

@ -1,508 +0,0 @@
use std::cell::Cell;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread::{self, ThreadId};
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use scoped_tls::scoped_thread_local;
use slab::Slab;
use crate::task::{Runnable, Task};
scoped_thread_local! {
static WORKER: Worker
}
/// State shared between [`Queue`] and [`Worker`].
struct Global {
/// The global queue.
queue: ConcurrentQueue<Runnable>,
/// Shards of the global queue created by workers.
shards: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
/// Set to `true` when a sleeping worker is notified or no workers are sleeping.
notified: AtomicBool,
/// A list of sleeping workers.
sleepers: Mutex<Sleepers>,
}
impl Global {
/// Notifies a sleeping worker.
fn notify(&self) {
if !self
.notified
.compare_and_swap(false, true, Ordering::SeqCst)
{
let callback = self.sleepers.lock().unwrap().notify();
if let Some(cb) = callback {
cb.call();
}
}
}
}
/// A list of sleeping workers.
struct Sleepers {
/// Number of sleeping workers (both notified and unnotified).
count: usize,
/// Callbacks of sleeping unnotified workers.
///
/// A sleeping worker is notified when its callback is missing from this list.
callbacks: Vec<Callback>,
}
impl Sleepers {
/// Inserts a new sleeping worker.
fn insert(&mut self, callback: &Callback) {
self.count += 1;
self.callbacks.push(callback.clone());
}
/// Re-inserts a sleeping worker's callback if it was notified.
///
/// Returns `true` if the worker was notified.
fn update(&mut self, callback: &Callback) -> bool {
if self.callbacks.iter().all(|cb| cb != callback) {
self.callbacks.push(callback.clone());
true
} else {
false
}
}
/// Removes a previously inserted sleeping worker.
fn remove(&mut self, callback: &Callback) {
self.count -= 1;
for i in (0..self.callbacks.len()).rev() {
if &self.callbacks[i] == callback {
self.callbacks.remove(i);
return;
}
}
}
/// Returns `true` if a sleeping worker is notified or no workers are sleeping.
fn is_notified(&self) -> bool {
self.count == 0 || self.count > self.callbacks.len()
}
/// Returns notification callback for a sleeping worker.
///
/// If a worker was notified already or there are no workers, `None` will be returned.
fn notify(&mut self) -> Option<Callback> {
if self.callbacks.len() == self.count {
self.callbacks.pop()
} else {
None
}
}
}
/// A queue for spawning tasks.
pub(crate) struct Queue {
global: Arc<Global>,
}
impl Queue {
/// Creates a new queue for spawning tasks.
pub fn new() -> Queue {
Queue {
global: Arc::new(Global {
queue: ConcurrentQueue::unbounded(),
shards: RwLock::new(Slab::new()),
notified: AtomicBool::new(true),
sleepers: Mutex::new(Sleepers {
count: 0,
callbacks: Vec::new(),
}),
}),
}
}
/// Spawns a future onto this queue.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
let global = self.global.clone();
// The function that schedules a runnable task when it gets woken up.
let schedule = move |runnable| {
if WORKER.is_set() {
WORKER.with(|w| {
if Arc::ptr_eq(&global, &w.global) {
if let Err(err) = w.shard.push(runnable) {
global.queue.push(err.into_inner()).unwrap();
}
} else {
global.queue.push(runnable).unwrap();
}
});
} else {
global.queue.push(runnable).unwrap();
}
global.notify();
};
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Registers a new worker.
///
/// The worker will automatically deregister itself when dropped.
pub fn worker(&self, notify: impl Fn() + Send + Sync + 'static) -> Worker {
let mut shards = self.global.shards.write().unwrap();
let vacant = shards.vacant_entry();
// Create a worker and put its stealer handle into the executor.
let worker = Worker {
key: vacant.key(),
global: Arc::new(self.global.clone()),
shard: SlotQueue {
slot: Cell::new(None),
queue: Arc::new(ConcurrentQueue::bounded(512)),
},
local: SlotQueue {
slot: Cell::new(None),
queue: Arc::new(ConcurrentQueue::unbounded()),
},
callback: Callback::new(notify),
sleeping: Cell::new(false),
ticker: Cell::new(0),
};
vacant.insert(worker.shard.queue.clone());
worker
}
}
impl Default for Queue {
fn default() -> Queue {
Queue::new()
}
}
/// A worker that participates in the work-stealing executor.
///
/// Each invocation of `run()` creates its own worker.
pub(crate) struct Worker {
/// The ID of this worker obtained during registration.
key: usize,
/// The global queue.
global: Arc<Arc<Global>>,
/// A shard of the global queue.
shard: SlotQueue<Runnable>,
/// Local queue for `!Send` tasks.
local: SlotQueue<Runnable>,
/// Callback invoked to wake this worker up.
callback: Callback,
/// Set to `true` when in sleeping state.
///
/// States a worker can be in:
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: Cell<bool>,
/// Bumped every time a task is run.
ticker: Cell<usize>,
}
impl Worker {
/// Spawns a local future onto this executor.
///
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn_local<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let queue = self.local.queue.clone();
let callback = self.callback.clone();
let id = thread_id();
// The function that schedules a runnable task when it gets woken up.
let schedule = move |runnable| {
if thread_id() == id && WORKER.is_set() {
WORKER.with(|w| {
if Arc::ptr_eq(&queue, &w.local.queue) {
w.local.push(runnable).unwrap();
} else {
queue.push(runnable).unwrap();
}
});
} else {
queue.push(runnable).unwrap();
}
callback.call();
};
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Moves the worker into sleeping and unnotified state.
///
/// Returns `true` if the worker was already sleeping and unnotified.
fn sleep(&self) -> bool {
let mut sleepers = self.global.sleepers.lock().unwrap();
if self.sleeping.get() {
// Already sleeping, check if notified.
if !sleepers.update(&self.callback) {
return false;
}
} else {
// Move to sleeping state.
sleepers.insert(&self.callback);
}
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
self.sleeping.set(true);
true
}
/// Moves the worker into woken state.
fn wake(&self) -> bool {
if self.sleeping.get() {
let mut sleepers = self.global.sleepers.lock().unwrap();
sleepers.remove(&self.callback);
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
self.sleeping.replace(false)
}
/// Runs a single task and returns `true` if one was found.
pub fn tick(&self) -> bool {
loop {
match self.search() {
None => {
// Move to sleeping and unnotified state.
if !self.sleep() {
// If already sleeping and unnotified, return.
return false;
}
}
Some(r) => {
// Wake up.
if !self.wake() {
// If already woken, notify another worker.
self.global.notify();
}
// Bump the ticker.
let ticker = self.ticker.get();
self.ticker.set(ticker.wrapping_add(1));
// Flush slots to ensure fair task scheduling.
if ticker % 16 == 0 {
if let Err(err) = self.shard.flush() {
self.global.queue.push(err.into_inner()).unwrap();
self.global.notify();
}
self.local.flush().unwrap();
}
// Steal tasks from the global queue to ensure fair task scheduling.
if ticker % 64 == 0 {
self.shard.steal(&self.global.queue);
}
// Run the task.
if WORKER.set(self, || r.run()) {
// The task was woken while it was running, which means it got
// scheduled the moment running completed. Therefore, it is now inside
// the slot and would be the next task to run.
//
// Instead of re-running the task in the next iteration, let's flush
// the slot in order to give other tasks a chance to run.
//
// This is a necessary step to ensure task yielding works as expected.
// If a task wakes itself and returns `Poll::Pending`, we don't want it
// to run immediately after that because that'd defeat the whole
// purpose of yielding.
if let Err(err) = self.shard.flush() {
self.global.queue.push(err.into_inner()).unwrap();
self.global.notify();
}
self.local.flush().unwrap();
}
return true;
}
}
}
}
/// Finds the next task to run.
fn search(&self) -> Option<Runnable> {
if self.ticker.get() % 2 == 0 {
// On even ticks, look into the local queue and then into the shard.
if let Ok(r) = self.local.pop().or_else(|_| self.shard.pop()) {
return Some(r);
}
} else {
// On odd ticks, look into the shard and then into the local queue.
if let Ok(r) = self.shard.pop().or_else(|_| self.local.pop()) {
return Some(r);
}
}
// Try stealing from the global queue.
self.shard.steal(&self.global.queue);
if let Ok(r) = self.shard.pop() {
return Some(r);
}
// Try stealing from other shards.
let shards = self.global.shards.read().unwrap();
// Pick a random starting point in the iterator list and rotate the list.
let n = shards.len();
let start = fastrand::usize(..n);
let iter = shards.iter().chain(shards.iter()).skip(start).take(n);
// Remove this worker's shard.
let iter = iter.filter(|(key, _)| *key != self.key);
let iter = iter.map(|(_, shard)| shard);
// Try stealing from each shard in the list.
for shard in iter {
self.shard.steal(shard);
if let Ok(r) = self.shard.pop() {
return Some(r);
}
}
None
}
}
impl Drop for Worker {
fn drop(&mut self) {
// Wake and unregister the worker.
self.wake();
self.global.shards.write().unwrap().remove(self.key);
// Re-schedule remaining tasks in the shard.
while let Ok(r) = self.shard.pop() {
r.schedule();
}
// Notify another worker to start searching for tasks.
self.global.notify();
// TODO(stjepang): Close the local queue and empty it.
}
}
/// A queue with a single-item slot in front of it.
struct SlotQueue<T> {
slot: Cell<Option<T>>,
queue: Arc<ConcurrentQueue<T>>,
}
impl<T> SlotQueue<T> {
/// Pushes an item into the slot, overflowing the old item into the queue.
fn push(&self, t: T) -> Result<(), PushError<T>> {
match self.slot.replace(Some(t)) {
None => Ok(()),
Some(t) => self.queue.push(t),
}
}
/// Pops an item from the slot, or queue if the slot is empty.
fn pop(&self) -> Result<T, PopError> {
match self.slot.take() {
None => self.queue.pop(),
Some(t) => Ok(t),
}
}
/// Flushes the slot into the queue.
fn flush(&self) -> Result<(), PushError<T>> {
match self.slot.take() {
None => Ok(()),
Some(t) => self.queue.push(t),
}
}
/// Steals some items from another queue.
fn steal(&self, from: &ConcurrentQueue<T>) {
// Flush the slot before stealing.
if let Err(err) = self.flush() {
self.slot.set(Some(err.into_inner()));
return;
}
// Half of `from`'s length rounded up.
let mut count = (from.len() + 1) / 2;
if count > 0 {
// Don't steal more than fits into the queue.
if let Some(cap) = self.queue.capacity() {
count = count.min(cap - self.queue.len());
}
// Steal tasks.
for _ in 0..count {
if let Ok(t) = from.pop() {
assert!(self.queue.push(t).is_ok());
} else {
break;
}
}
}
}
}
/// Same as `std::thread::current().id()`, but more efficient.
fn thread_id() -> ThreadId {
thread_local! {
static ID: ThreadId = thread::current().id();
}
ID.try_with(|id| *id)
.unwrap_or_else(|_| thread::current().id())
}
#[derive(Clone)]
struct Callback(Arc<Box<dyn Fn() + Send + Sync>>);
impl Callback {
fn new(f: impl Fn() + Send + Sync + 'static) -> Callback {
Callback(Arc::new(Box::new(f)))
}
fn call(&self) {
(self.0)();
}
}
impl PartialEq for Callback {
fn eq(&self, other: &Callback) -> bool {
Arc::ptr_eq(&self.0, &other.0)
}
}
impl Eq for Callback {}

View File

@ -1,268 +0,0 @@
use std::cell::Cell;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use once_cell::sync::Lazy;
use slab::Slab;
use crate::reactor::Reactor;
static REGISTRY: Lazy<Mutex<Slab<Unparker>>> = Lazy::new(|| Mutex::new(Slab::new()));
/// Parks a thread.
pub(crate) struct Parker {
key: Cell<Option<usize>>,
unparker: Unparker,
}
impl Parker {
/// Creates a new [`Parker`].
pub fn new() -> Parker {
Parker {
key: Cell::new(None),
unparker: Unparker {
inner: Arc::new(Inner {
state: AtomicUsize::new(EMPTY),
lock: Mutex::new(()),
cvar: Condvar::new(),
}),
},
}
}
/// Blocks the current thread until the token is made available.
pub fn park(&self) {
self.register();
self.unparker.inner.park(None);
}
/// Blocks the current thread until the token is made available or the timeout is reached.
pub fn park_timeout(&self, timeout: Duration) -> bool {
self.register();
self.unparker.inner.park(Some(timeout))
}
// /// Blocks the current thread until the token is made available or the deadline is reached.
// pub fn park_deadline(&self, deadline: Instant) -> bool {
// self.register();
// self.unparker
// .inner
// .park(Some(deadline.saturating_duration_since(Instant::now())))
// }
//
// /// Atomically makes the token available if it is not already.
// pub fn unpark(&self) {
// self.unparker.unpark()
// }
/// Returns a handle for unparking.
pub fn unparker(&self) -> Unparker {
self.unparker.clone()
}
fn register(&self) {
if self.key.get().is_none() {
let mut reg = REGISTRY.lock().unwrap();
let key = reg.insert(self.unparker.clone());
self.key.set(Some(key));
}
}
fn unregister(&self) {
if let Some(key) = self.key.take() {
let mut reg = REGISTRY.lock().unwrap();
reg.remove(key);
// Notify another parker to make sure the reactor keeps getting polled.
if let Some((_, u)) = reg.iter().next() {
u.unpark();
}
}
}
}
impl Drop for Parker {
fn drop(&mut self) {
self.unregister();
}
}
impl fmt::Debug for Parker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Parker { .. }")
}
}
/// Unparks a thread.
pub(crate) struct Unparker {
inner: Arc<Inner>,
}
impl Unparker {
/// Atomically makes the token available if it is not already.
pub fn unpark(&self) {
self.inner.unpark()
}
}
impl fmt::Debug for Unparker {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("Unparker { .. }")
}
}
impl Clone for Unparker {
fn clone(&self) -> Unparker {
Unparker {
inner: self.inner.clone(),
}
}
}
const EMPTY: usize = 0;
const PARKED: usize = 1;
const POLLING: usize = 2;
const NOTIFIED: usize = 3;
struct Inner {
state: AtomicUsize,
lock: Mutex<()>,
cvar: Condvar,
}
impl Inner {
fn park(&self, timeout: Option<Duration>) -> bool {
// If we were previously notified then we consume this notification and return quickly.
if self
.state
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
.is_ok()
{
// Process available I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
}
return true;
}
// If the timeout is zero, then there is no need to actually block.
if let Some(dur) = timeout {
if dur == Duration::from_millis(0) {
// Process available I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
reactor_lock
.react(Some(Duration::from_secs(0)))
.expect("failure while polling I/O");
}
return false;
}
}
// Otherwise we need to coordinate going to sleep.
let mut reactor_lock = Reactor::get().try_lock();
let state = match reactor_lock {
None => PARKED,
Some(_) => POLLING,
};
let mut m = self.lock.lock().unwrap();
match self.state.compare_exchange(EMPTY, state, SeqCst, SeqCst) {
Ok(_) => {}
// Consume this notification to avoid spurious wakeups in the next park.
Err(NOTIFIED) => {
// We must read `state` here, even though we know it will be `NOTIFIED`. This is
// because `unpark` may have been called again since we read `NOTIFIED` in the
// `compare_exchange` above. We must perform an acquire operation that synchronizes
// with that `unpark` to observe any writes it made before the call to `unpark`. To
// do that we must read from the write it made to `state`.
let old = self.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return true;
}
Err(n) => panic!("inconsistent park_timeout state: {}", n),
}
match timeout {
None => {
loop {
// Block the current thread on the conditional variable.
match &mut reactor_lock {
None => m = self.cvar.wait(m).unwrap(),
Some(reactor_lock) => {
drop(m);
reactor_lock.react(None).expect("failure while polling I/O");
m = self.lock.lock().unwrap();
}
}
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
Ok(_) => return true, // got a notification
Err(_) => {} // spurious wakeup, go back to sleep
}
}
}
Some(timeout) => {
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
// notification we just want to unconditionally set `state` back to `EMPTY`, either
// consuming a notification or un-flagging ourselves as parked.
let _m = match reactor_lock.as_mut() {
None => self.cvar.wait_timeout(m, timeout).unwrap().0,
Some(reactor_lock) => {
drop(m);
let deadline = Instant::now() + timeout;
loop {
reactor_lock
.react(Some(deadline.saturating_duration_since(Instant::now())))
.expect("failure while polling I/O");
if Instant::now() >= deadline {
break;
}
}
self.lock.lock().unwrap()
}
};
match self.state.swap(EMPTY, SeqCst) {
NOTIFIED => true, // got a notification
PARKED | POLLING => false, // no notification
n => panic!("inconsistent park_timeout state: {}", n),
}
}
}
}
pub fn unpark(&self) {
// To ensure the unparked thread will observe any writes we made before this call, we must
// perform a release operation that `park` can synchronize with. To do that we must write
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
// than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
let state = match self.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
state => state, // gotta go wake someone up
};
// There is a period between when the parked thread sets `state` to `PARKED` (or last
// checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
// If we were to notify during this period it would be ignored and then when the parked
// thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
// stage so we can acquire `lock` to wait until it is ready to receive the notification.
//
// Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
// it doesn't get woken only to have to wait for us to release `lock`.
drop(self.lock.lock().unwrap());
if state == PARKED {
self.cvar.notify_one();
} else {
Reactor::get().notify();
}
}
}

View File

@ -1,881 +0,0 @@
//! The reactor notifying [`Async`][`crate::Async`] and [`Timer`][`crate::Timer`].
//!
//! There is a single global reactor that contains all registered I/O handles and timers. The
//! reactor is polled by the executor, i.e. the [`run()`][`crate::run()`] function.
#[cfg(not(any(
target_os = "linux", // epoll
target_os = "android", // epoll
target_os = "illumos", // epoll
target_os = "macos", // kqueue
target_os = "ios", // kqueue
target_os = "freebsd", // kqueue
target_os = "netbsd", // kqueue
target_os = "openbsd", // kqueue
target_os = "dragonfly", // kqueue
target_os = "windows", // wepoll
)))]
compile_error!("reactor does not support this target OS");
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::RawSocket;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Poll, Waker};
use std::time::{Duration, Instant};
use concurrent_queue::ConcurrentQueue;
use futures_util::future;
use once_cell::sync::Lazy;
use slab::Slab;
/// The reactor.
///
/// Every async I/O handle and every timer is registered here. Invocations of
/// [`run()`][`crate::run()`] poll the reactor to check for new events every now and then.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
/// Raw bindings to epoll/kqueue/wepoll.
sys: sys::Reactor,
/// Ticker bumped before polling.
ticker: AtomicUsize,
/// Registered sources.
sources: Mutex<Slab<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
events: Mutex<sys::Events>,
/// An ordered map of registered timers.
///
/// Timers are in the order in which they fire. The `usize` in this type is a timer ID used to
/// distinguish timers that fire at the same time. The `Waker` represents the task awaiting the
/// timer.
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
/// A queue of timer operations (insert and remove).
///
/// When inserting or removing a timer, we don't process it immediately - we just push it into
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
timer_ops: ConcurrentQueue<TimerOp>,
}
impl Reactor {
/// Returns a reference to the reactor.
pub fn get() -> &'static Reactor {
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
ticker: AtomicUsize::new(0),
sources: Mutex::new(Slab::new()),
events: Mutex::new(sys::Events::new()),
timers: Mutex::new(BTreeMap::new()),
timer_ops: ConcurrentQueue::bounded(1000),
});
&REACTOR
}
/// Notifies the thread blocked on the reactor.
pub fn notify(&self) {
self.sys.notify().expect("failed to notify reactor");
}
/// Registers an I/O source in the reactor.
pub fn insert_io(
&self,
#[cfg(unix)] raw: RawFd,
#[cfg(windows)] raw: RawSocket,
) -> io::Result<Arc<Source>> {
let mut sources = self.sources.lock().unwrap();
let vacant = sources.vacant_entry();
// Create a source and register it.
let key = vacant.key();
self.sys.register(raw, key)?;
let source = Arc::new(Source {
raw,
key,
wakers: Mutex::new(Wakers {
tick_readable: 0,
tick_writable: 0,
readers: Vec::new(),
writers: Vec::new(),
}),
});
Ok(vacant.insert(source).clone())
}
/// Deregisters an I/O source from the reactor.
pub fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock().unwrap();
sources.remove(source.key);
self.sys.deregister(source.raw)
}
/// Registers a timer in the reactor.
///
/// Returns the inserted timer's ID.
pub fn insert_timer(&self, when: Instant, waker: &Waker) -> usize {
// Generate a new timer ID.
static ID_GENERATOR: AtomicUsize = AtomicUsize::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
// Push an insert operation.
while self
.timer_ops
.push(TimerOp::Insert(when, id, waker.clone()))
.is_err()
{
// Fire timers to drain the queue.
self.fire_timers();
}
// Notify that a timer was added.
self.notify();
id
}
/// Deregisters a timer from the reactor.
pub fn remove_timer(&self, when: Instant, id: usize) {
// Push a remove operation.
while self.timer_ops.push(TimerOp::Remove(when, id)).is_err() {
// Fire timers to drain the queue.
self.fire_timers();
}
}
/// Attempts to lock the reactor.
pub fn try_lock(&self) -> Option<ReactorLock<'_>> {
self.events.try_lock().ok().map(|events| {
let reactor = self;
ReactorLock { reactor, events }
})
}
/// Fires ready timers.
///
/// Returns the duration until the next timer before this method was called.
fn fire_timers(&self) -> Option<Duration> {
let mut timers = self.timers.lock().unwrap();
// Process timer operations, but no more than the queue capacity because otherwise we could
// keep popping operations forever.
for _ in 0..self.timer_ops.capacity().unwrap() {
match self.timer_ops.pop() {
Ok(TimerOp::Insert(when, id, waker)) => {
timers.insert((when, id), waker);
}
Ok(TimerOp::Remove(when, id)) => {
timers.remove(&(when, id));
}
Err(_) => break,
}
}
let now = Instant::now();
// Split timers into ready and pending timers.
let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
// Calculate the duration until the next event.
let dur = if ready.is_empty() {
// Duration until the next timer.
timers
.keys()
.next()
.map(|(when, _)| when.saturating_duration_since(now))
} else {
// Timers are about to fire right now.
Some(Duration::from_secs(0))
};
// Drop the lock before waking.
drop(timers);
// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}
dur
}
}
/// A lock on the reactor.
pub(crate) struct ReactorLock<'a> {
reactor: &'a Reactor,
events: MutexGuard<'a, sys::Events>,
}
impl ReactorLock<'_> {
/// Processes new events, blocking until the first event or the timeout.
pub fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
// Fire timers.
let next_timer = self.reactor.fire_timers();
// compute the timeout for blocking on I/O events.
let timeout = match (next_timer, timeout) {
(None, None) => None,
(Some(t), None) | (None, Some(t)) => Some(t),
(Some(a), Some(b)) => Some(a.min(b)),
};
// Bump the ticker before polling I/O.
let tick = self
.reactor
.ticker
.fetch_add(1, Ordering::SeqCst)
.wrapping_add(1);
// Block on I/O events.
match self.reactor.sys.wait(&mut self.events, timeout) {
// No I/O events occurred.
Ok(0) => {
if timeout != Some(Duration::from_secs(0)) {
// The non-zero timeout was hit so fire ready timers.
self.reactor.fire_timers();
}
Ok(())
}
// At least one I/O event occurred.
Ok(_) => {
// Iterate over sources in the event list.
let sources = self.reactor.sources.lock().unwrap();
let mut ready = Vec::new();
for ev in self.events.iter() {
// Check if there is a source in the table with this key.
if let Some(source) = sources.get(ev.key) {
let mut wakers = source.wakers.lock().unwrap();
// Wake readers if a readability event was emitted.
if ev.readable {
wakers.tick_readable = tick;
ready.append(&mut wakers.readers);
}
// Wake writers if a writability event was emitted.
if ev.writable {
wakers.tick_writable = tick;
ready.append(&mut wakers.writers);
}
// Re-register if there are still writers or
// readers. The can happen if e.g. we were
// previously interested in both readability and
// writability, but only one of them was emitted.
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
self.reactor.sys.reregister(
source.raw,
source.key,
!wakers.readers.is_empty(),
!wakers.writers.is_empty(),
)?;
}
}
}
// Drop the lock before waking.
drop(sources);
// Wake up tasks waiting on I/O.
for waker in ready {
waker.wake();
}
Ok(())
}
// The syscall was interrupted.
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
// An actual error occureed.
Err(err) => Err(err),
}
}
}
/// A single timer operation.
enum TimerOp {
Insert(Instant, usize, Waker),
Remove(Instant, usize),
}
/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
/// Raw file descriptor on Unix platforms.
#[cfg(unix)]
pub(crate) raw: RawFd,
/// Raw socket handle on Windows.
#[cfg(windows)]
pub(crate) raw: RawSocket,
/// The key of this source obtained during registration.
key: usize,
/// Tasks interested in events on this source.
wakers: Mutex<Wakers>,
}
/// Tasks interested in events on a source.
#[derive(Debug)]
struct Wakers {
/// Last reactor tick that delivered a readability event.
tick_readable: usize,
/// Last reactor tick that delivered a writability event.
tick_writable: usize,
/// Tasks waiting for the next readability event.
readers: Vec<Waker>,
/// Tasks waiting for the next writability event.
writers: Vec<Waker>,
}
impl Source {
/// Waits until the I/O source is readable.
pub(crate) async fn readable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a readability event.
if let Some((a, b)) = ticks {
// If `tick_readable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a readability event.
if wakers.tick_readable != a && wakers.tick_readable != b {
return Poll::Ready(Ok(()));
}
}
// If there are no other readers, re-register in the reactor.
if wakers.readers.is_empty() {
Reactor::get().sys.reregister(
self.raw,
self.key,
true,
!wakers.writers.is_empty(),
)?;
}
// Register the current task's waker if not present already.
if wakers.readers.iter().all(|w| !w.will_wake(cx.waker())) {
wakers.readers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_readable,
));
}
Poll::Pending
})
.await
}
/// Waits until the I/O source is writable.
pub(crate) async fn writable(&self) -> io::Result<()> {
let mut ticks = None;
future::poll_fn(|cx| {
let mut wakers = self.wakers.lock().unwrap();
// Check if the reactor has delivered a writability event.
if let Some((a, b)) = ticks {
// If `tick_writable` has changed to a value other than the old reactor tick, that
// means a newer reactor tick has delivered a writability event.
if wakers.tick_writable != a && wakers.tick_writable != b {
return Poll::Ready(Ok(()));
}
}
// If there are no other writers, re-register in the reactor.
if wakers.writers.is_empty() {
Reactor::get().sys.reregister(
self.raw,
self.key,
!wakers.readers.is_empty(),
true,
)?;
}
// Register the current task's waker if not present already.
if wakers.writers.iter().all(|w| !w.will_wake(cx.waker())) {
wakers.writers.push(cx.waker().clone());
}
// Remember the current ticks.
if ticks.is_none() {
ticks = Some((
Reactor::get().ticker.load(Ordering::SeqCst),
wakers.tick_writable,
));
}
Poll::Pending
})
.await
}
}
/// Raw bindings to epoll (Linux, Android, illumos).
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::unix::io::RawFd;
use std::time::Duration;
use crate::sys::epoll::{
epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
};
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
epoll_fd: RawFd,
event_fd: RawFd,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let epoll_fd = epoll_create1()?;
let event_fd = syscall!(eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK))?;
let reactor = Reactor { epoll_fd, event_fd };
reactor.register(event_fd, !0)?;
reactor.reregister(event_fd, !0, true, false)?;
Ok(reactor)
}
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
let ev = &mut EpollEvent::new(0, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlAdd, fd, Some(ev))
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
if read {
flags |= read_flags();
}
if write {
flags |= write_flags();
}
let ev = &mut EpollEvent::new(flags, key as u64);
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlMod, fd, Some(ev))
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
epoll_ctl(self.epoll_fd, EpollOp::EpollCtlDel, fd, None)
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.map(|t| {
if t == Duration::from_millis(0) {
t
} else {
t.max(Duration::from_millis(1))
}
})
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
events.len = epoll_wait(self.epoll_fd, &mut events.list, timeout_ms)?;
let mut buf = [0u8; 8];
let _ = syscall!(read(
self.event_fd,
&mut buf[0] as *mut u8 as *mut libc::c_void,
buf.len()
));
self.reregister(self.event_fd, !0, true, false)?;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
self.event_fd,
&buf[0] as *const u8 as *const libc::c_void,
buf.len()
));
Ok(())
}
}
fn read_flags() -> EpollFlags {
libc::EPOLLIN | libc::EPOLLRDHUP | libc::EPOLLHUP | libc::EPOLLERR | libc::EPOLLPRI
}
fn write_flags() -> EpollFlags {
libc::EPOLLOUT | libc::EPOLLHUP | libc::EPOLLERR
}
pub struct Events {
list: Box<[EpollEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let list = vec![EpollEvent::empty(); 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: (ev.events() & read_flags()) != 0,
writable: (ev.events() & write_flags()) != 0,
key: ev.data() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}
/// Raw bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
mod sys {
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::time::Duration;
use crate::sys::event::{kevent_ts, kqueue, KEvent};
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { libc::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
kqueue_fd: RawFd,
read_stream: UnixStream,
write_stream: UnixStream,
}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let kqueue_fd = kqueue()?;
syscall!(fcntl(kqueue_fd, libc::F_SETFD, libc::FD_CLOEXEC))?;
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
let reactor = Reactor {
kqueue_fd,
read_stream,
write_stream,
};
reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
Ok(reactor)
}
pub fn register(&self, fd: RawFd, _key: usize) -> io::Result<()> {
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
Ok(())
}
pub fn reregister(&self, fd: RawFd, key: usize, read: bool, write: bool) -> io::Result<()> {
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
let mut write_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
if read {
read_flags |= libc::EV_ADD;
} else {
read_flags |= libc::EV_DELETE;
}
if write {
write_flags |= libc::EV_ADD;
} else {
write_flags |= libc::EV_DELETE;
}
let udata = key as _;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_READ, read_flags, 0, 0, udata),
KEvent::new(fd as _, libc::EVFILT_WRITE, write_flags, 0, 0, udata),
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
// Explanation for ignoring EPIPE: https://github.com/tokio-rs/mio/issues/582
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR) == 1
&& data != 0
&& data != libc::ENOENT as _
&& data != libc::EPIPE as _
{
return Err(io::Error::from_raw_os_error(data as _));
}
}
Ok(())
}
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
let flags = libc::EV_DELETE | libc::EV_RECEIPT;
let changelist = [
KEvent::new(fd as _, libc::EVFILT_WRITE, flags, 0, 0, 0),
KEvent::new(fd as _, libc::EVFILT_READ, flags, 0, 0, 0),
];
let mut eventlist = changelist;
kevent_ts(self.kqueue_fd, &changelist, &mut eventlist, None)?;
for ev in &eventlist {
let (flags, data) = (ev.flags(), ev.data());
if (flags & libc::EV_ERROR == 1) && data != 0 && data != libc::ENOENT as _ {
return Err(io::Error::from_raw_os_error(data as _));
}
}
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
let _ = (&self.write_stream).write(&[1]);
Ok(())
}
}
pub struct Events {
list: Box<[KEvent]>,
len: usize,
}
impl Events {
pub fn new() -> Events {
let flags = 0;
let event = KEvent::new(0, 0, flags, 0, 0, 0);
let list = vec![event; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
// On some platforms, closing the read end of a pipe wakes up writers, but the
// event is reported as EVFILT_READ with the EV_EOF flag.
//
// https://github.com/golang/go/commit/23aad448b1e3f7c3b4ba2af90120bde91ac865b4
self.list[..self.len].iter().map(|ev| Event {
readable: ev.filter() == libc::EVFILT_READ,
writable: ev.filter() == libc::EVFILT_WRITE
|| (ev.filter() == libc::EVFILT_READ && (ev.flags() & libc::EV_EOF) != 0),
key: ev.udata() as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}
/// Raw bindings to wepoll (Windows).
#[cfg(target_os = "windows")]
mod sys {
use std::convert::TryInto;
use std::io;
use std::os::windows::io::{AsRawSocket, RawSocket};
use std::time::Duration;
use wepoll_sys_stjepang as we;
use winapi::um::winsock2;
macro_rules! syscall {
($fn:ident $args:tt) => {{
let res = unsafe { we::$fn $args };
if res == -1 {
Err(std::io::Error::last_os_error())
} else {
Ok(res)
}
}};
}
pub struct Reactor {
handle: we::HANDLE,
}
unsafe impl Send for Reactor {}
unsafe impl Sync for Reactor {}
impl Reactor {
pub fn new() -> io::Result<Reactor> {
let handle = unsafe { we::epoll_create1(0) };
if handle.is_null() {
return Err(io::Error::last_os_error());
}
Ok(Reactor { handle })
}
pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
unsafe {
let mut nonblocking = true as libc::c_ulong;
let res = winsock2::ioctlsocket(
sock as winsock2::SOCKET,
winsock2::FIONBIO,
&mut nonblocking,
);
if res != 0 {
return Err(io::Error::last_os_error());
}
}
let mut ev = we::epoll_event {
events: 0,
data: we::epoll_data { u64: key as u64 },
};
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_ADD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn reregister(
&self,
sock: RawSocket,
key: usize,
read: bool,
write: bool,
) -> io::Result<()> {
let mut flags = we::EPOLLONESHOT;
if read {
flags |= READ_FLAGS;
}
if write {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: key as u64 },
};
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_MOD as libc::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
syscall!(epoll_ctl(
self.handle,
we::EPOLL_CTL_DEL as libc::c_int,
sock as we::SOCKET,
0 as *mut we::epoll_event,
))?;
Ok(())
}
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = match timeout {
None => -1,
Some(t) => {
if t == Duration::from_millis(0) {
0
} else {
t.max(Duration::from_millis(1))
.as_millis()
.try_into()
.unwrap_or(libc::c_int::max_value())
}
}
};
events.len = syscall!(epoll_wait(
self.handle,
events.list.as_mut_ptr(),
events.list.len() as libc::c_int,
timeout_ms,
))? as usize;
Ok(events.len)
}
pub fn notify(&self) -> io::Result<()> {
unsafe {
// This errors if a notification has already been posted, but that's okay.
winapi::um::ioapiset::PostQueuedCompletionStatus(
self.handle as winapi::um::winnt::HANDLE,
0,
0,
0 as *mut _,
);
}
Ok(())
}
}
struct As(RawSocket);
impl AsRawSocket for As {
fn as_raw_socket(&self) -> RawSocket {
self.0
}
}
const READ_FLAGS: u32 =
we::EPOLLIN | we::EPOLLRDHUP | we::EPOLLHUP | we::EPOLLERR | we::EPOLLPRI;
const WRITE_FLAGS: u32 = we::EPOLLOUT | we::EPOLLHUP | we::EPOLLERR;
pub struct Events {
list: Box<[we::epoll_event]>,
len: usize,
}
unsafe impl Send for Events {}
unsafe impl Sync for Events {}
impl Events {
pub fn new() -> Events {
let ev = we::epoll_event {
events: 0,
data: we::epoll_data { u64: 0 },
};
Events {
list: vec![ev; 1000].into_boxed_slice(),
len: 0,
}
}
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
readable: (ev.events & READ_FLAGS) != 0,
writable: (ev.events & WRITE_FLAGS) != 0,
key: unsafe { ev.data.u64 } as usize,
})
}
}
pub struct Event {
pub readable: bool,
pub writable: bool,
pub key: usize,
}
}

View File

@ -1,135 +0,0 @@
//! Implementation of [`run()`].
//!
//! This function is the entry point to the smol executor.
use std::future::Future;
use std::task::{Context, Poll};
use std::time::Duration;
use once_cell::sync::Lazy;
use crate::context;
use crate::multitask;
use crate::parking::Parker;
use scoped_tls::scoped_thread_local;
/// The global task queue.
pub(crate) static QUEUE: Lazy<multitask::Queue> = Lazy::new(|| multitask::Queue::new());
scoped_thread_local! {
/// Thread-local worker queue.
pub(crate) static WORKER: multitask::Worker
}
/// Runs executors and polls the reactor.
///
/// This function simultaneously runs the thread-local executor, runs the work-stealing
/// executor, and polls the reactor for I/O events and timers. At least one thread has to be
/// calling [`run()`] in order for futures waiting on I/O and timers to get notified.
///
/// # Examples
///
/// Single-threaded executor:
///
/// ```
/// // Run the thread-local and work-stealing executor on the current thread.
/// smol::run(async {
/// println!("Hello from the smol executor!");
/// });
/// ```
///
/// Multi-threaded executor:
///
/// ```no_run
/// use futures::future;
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // Run the thread-local and work-stealing executor on a thread pool.
/// for _ in 0..num_threads {
/// // A pending future is one that simply yields forever.
/// thread::spawn(|| smol::run(future::pending::<()>()));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
/// Task::spawn(async {
/// println!("Hello from an executor thread!");
/// })
/// .await;
/// });
/// ```
///
/// Stoppable multi-threaded executor:
///
/// ```
/// use smol::Task;
/// use std::thread;
///
/// // Same number of threads as there are CPU cores.
/// let num_threads = num_cpus::get().max(1);
///
/// // A channel that sends the shutdown signal.
/// let (s, r) = piper::chan::<()>(0);
/// let mut threads = Vec::new();
///
/// // Create an executor thread pool.
/// for _ in 0..num_threads {
/// // Spawn an executor thread that waits for the shutdown signal.
/// let r = r.clone();
/// threads.push(thread::spawn(move || smol::run(r.recv())));
/// }
///
/// // No need to `run()`, now we can just block on the main future.
/// smol::block_on(async {
/// Task::spawn(async {
/// println!("Hello from an executor thread!");
/// })
/// .await;
/// });
///
/// // Send a shutdown signal.
/// drop(s);
///
/// // Wait for threads to finish.
/// for t in threads {
/// t.join().unwrap();
/// }
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
let parker = Parker::new();
let unparker = parker.unparker();
let worker = QUEUE.worker(move || unparker.unpark());
// Create a waker that triggers an I/O event in the thread-local scheduler.
let unparker = parker.unparker();
let waker = async_task::waker_fn(move || unparker.unpark());
let cx = &mut Context::from_waker(&waker);
futures_util::pin_mut!(future);
// Set up tokio if enabled.
context::enter(|| {
WORKER.set(&worker, || {
'start: loop {
// Poll the main future.
if let Poll::Ready(val) = future.as_mut().poll(cx) {
return val;
}
for _ in 0..200 {
if !worker.tick() {
parker.park();
continue 'start;
}
}
// Process ready I/O events without blocking.
parker.park_timeout(Duration::from_secs(0));
}
})
})
}

View File

@ -1,277 +0,0 @@
#[cfg(unix)]
fn check_err(res: libc::c_int) -> Result<libc::c_int, std::io::Error> {
if res == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(res)
}
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "netbsd",
target_os = "openbsd",
target_os = "dragonfly",
))]
/// Kqueue.
pub mod event {
use super::check_err;
use std::os::unix::io::RawFd;
#[cfg(any(
target_os = "macos",
target_os = "ios",
target_os = "freebsd",
target_os = "dragonfly",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::c_int;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_nchanges = libc::size_t;
#[cfg(target_os = "netbsd")]
#[allow(non_camel_case_types)]
type type_of_event_filter = u32;
#[cfg(not(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_event_filter = i16;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
#[allow(non_camel_case_types)]
type type_of_udata = *mut libc::c_void;
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos"
))]
#[allow(non_camel_case_types)]
type type_of_data = libc::intptr_t;
#[cfg(any(target_os = "netbsd"))]
#[allow(non_camel_case_types)]
type type_of_udata = libc::intptr_t;
#[cfg(any(target_os = "netbsd", target_os = "openbsd"))]
#[allow(non_camel_case_types)]
type type_of_data = libc::int64_t;
#[derive(Clone, Copy)]
#[repr(C)]
pub struct KEvent(libc::kevent);
unsafe impl Send for KEvent {}
impl KEvent {
pub fn new(
ident: libc::uintptr_t,
filter: EventFilter,
flags: EventFlag,
fflags: FilterFlag,
data: libc::intptr_t,
udata: libc::intptr_t,
) -> KEvent {
KEvent(libc::kevent {
ident,
filter: filter as type_of_event_filter,
flags,
fflags,
data: data as type_of_data,
udata: udata as type_of_udata,
})
}
pub fn filter(&self) -> EventFilter {
unsafe { std::mem::transmute(self.0.filter as type_of_event_filter) }
}
pub fn flags(&self) -> EventFlag {
self.0.flags
}
pub fn data(&self) -> libc::intptr_t {
self.0.data as libc::intptr_t
}
pub fn udata(&self) -> libc::intptr_t {
self.0.udata as libc::intptr_t
}
}
#[cfg(any(
target_os = "dragonfly",
target_os = "freebsd",
target_os = "ios",
target_os = "macos",
target_os = "openbsd"
))]
pub type EventFlag = u16;
#[cfg(any(target_os = "netbsd"))]
pub type EventFlag = u32;
pub type FilterFlag = u32;
#[cfg(target_os = "netbsd")]
pub type EventFilter = u32;
#[cfg(not(target_os = "netbsd"))]
pub type EventFilter = i16;
pub fn kqueue() -> Result<RawFd, std::io::Error> {
let res = unsafe { libc::kqueue() };
check_err(res)
}
pub fn kevent_ts(
kq: RawFd,
changelist: &[KEvent],
eventlist: &mut [KEvent],
timeout_opt: Option<libc::timespec>,
) -> Result<usize, std::io::Error> {
let res = unsafe {
libc::kevent(
kq,
changelist.as_ptr() as *const libc::kevent,
changelist.len() as type_of_nchanges,
eventlist.as_mut_ptr() as *mut libc::kevent,
eventlist.len() as type_of_nchanges,
if let Some(ref timeout) = timeout_opt {
timeout as *const libc::timespec
} else {
std::ptr::null()
},
)
};
check_err(res).map(|r| r as usize)
}
}
#[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))]
/// Epoll.
pub mod epoll {
use super::check_err;
use std::os::unix::io::RawFd;
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(i32)]
pub enum EpollOp {
EpollCtlAdd = libc::EPOLL_CTL_ADD,
EpollCtlDel = libc::EPOLL_CTL_DEL,
EpollCtlMod = libc::EPOLL_CTL_MOD,
}
pub type EpollFlags = libc::c_int;
pub fn epoll_create1() -> Result<RawFd, std::io::Error> {
// According to libuv, `EPOLL_CLOEXEC` is not defined on Android API < 21.
// But `EPOLL_CLOEXEC` is an alias for `O_CLOEXEC` on that platform, so we use it instead.
#[cfg(target_os = "android")]
const CLOEXEC: libc::c_int = libc::O_CLOEXEC;
#[cfg(not(target_os = "android"))]
const CLOEXEC: libc::c_int = libc::EPOLL_CLOEXEC;
let fd = unsafe {
// Check if the `epoll_create1` symbol is available on this platform.
let ptr = libc::dlsym(
libc::RTLD_DEFAULT,
"epoll_create1\0".as_ptr() as *const libc::c_char,
);
if ptr.is_null() {
// If not, use `epoll_create` and manually set `CLOEXEC`.
let fd = check_err(libc::epoll_create(1024))?;
let flags = libc::fcntl(fd, libc::F_GETFD);
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
fd
} else {
// Use `epoll_create1` with `CLOEXEC`.
let epoll_create1 = std::mem::transmute::<
*mut libc::c_void,
unsafe extern "C" fn(libc::c_int) -> libc::c_int,
>(ptr);
check_err(epoll_create1(CLOEXEC))?
}
};
Ok(fd)
}
pub fn epoll_ctl<'a, T>(
epfd: RawFd,
op: EpollOp,
fd: RawFd,
event: T,
) -> Result<(), std::io::Error>
where
T: Into<Option<&'a mut EpollEvent>>,
{
let mut event: Option<&mut EpollEvent> = event.into();
if event.is_none() && op != EpollOp::EpollCtlDel {
Err(std::io::Error::from_raw_os_error(libc::EINVAL))
} else {
let res = unsafe {
if let Some(ref mut event) = event {
libc::epoll_ctl(epfd, op as libc::c_int, fd, &mut event.event)
} else {
libc::epoll_ctl(epfd, op as libc::c_int, fd, std::ptr::null_mut())
}
};
check_err(res).map(drop)
}
}
pub fn epoll_wait(
epfd: RawFd,
events: &mut [EpollEvent],
timeout_ms: isize,
) -> Result<usize, std::io::Error> {
let res = unsafe {
libc::epoll_wait(
epfd,
events.as_mut_ptr() as *mut libc::epoll_event,
events.len() as libc::c_int,
timeout_ms as libc::c_int,
)
};
check_err(res).map(|r| r as usize)
}
#[derive(Clone, Copy)]
#[repr(transparent)]
pub struct EpollEvent {
event: libc::epoll_event,
}
impl EpollEvent {
pub fn new(events: EpollFlags, data: u64) -> Self {
EpollEvent {
event: libc::epoll_event {
events: events as u32,
u64: data,
},
}
}
pub fn empty() -> Self {
unsafe { std::mem::zeroed::<EpollEvent>() }
}
pub fn events(&self) -> EpollFlags {
self.event.events as libc::c_int
}
pub fn data(&self) -> u64 {
self.event.u64
}
}
}

View File

@ -1,283 +0,0 @@
//! The task system.
//!
//! A [`Task`] handle represents a spawned future that is run by the executor.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::blocking::BlockingExecutor;
use crate::run::{QUEUE, WORKER};
/// A runnable future, ready for execution.
///
/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`,
/// we get back two values:
///
/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable`
/// 2. an `async_task::JoinHandle<T, ()>`, which is wrapped inside a `Task<T>`
///
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
/// queue in an executor.
pub(crate) type Runnable = async_task::Task<()>;
/// A spawned future.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
///
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
///
/// If the future panics, the panic will be unwound into the [`run()`] invocation that polled it.
/// However, this does not apply to the blocking executor - it will simply ignore panics and
/// continue running.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// [`run()`]: `crate::run()`
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
impl<T: 'static> Task<T> {
/// Spawns a future onto the thread-local executor.
///
/// Panics if the current thread is not inside an invocation of [`run()`].
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let task = Task::local(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # })
/// ```
///
/// [`run()`]: `crate::run()`
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
if WORKER.is_set() {
WORKER.with(|w| w.spawn_local(future))
} else {
panic!("cannot spawn a thread-local task if not inside an executor")
}
}
}
impl<T: Send + 'static> Task<T> {
/// Spawns a future onto the work-stealing executor.
///
/// This future may be stolen and polled by any thread calling [`run()`].
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # smol::run(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
///
/// [`run()`]: `crate::run()`
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
if WORKER.is_set() {
WORKER.with(|w| w.spawn_local(future))
} else {
QUEUE.spawn(future)
}
}
/// Spawns a future onto the blocking executor.
///
/// This future is allowed to block for an indefinite length of time.
///
/// For convenience, there is also the [`blocking!`] macro that spawns a blocking tasks and
/// immediately awaits it.
///
/// # Examples
///
/// Read a line from the standard input:
///
/// ```no_run
/// use smol::Task;
/// use std::io::stdin;
///
/// # smol::block_on(async {
/// let line = Task::blocking(async {
/// let mut line = String::new();
/// std::io::stdin().read_line(&mut line).unwrap();
/// line
/// })
/// .await;
/// # });
/// ```
///
/// See also examples for [`blocking!`], [`iter()`], [`reader()`], and [`writer()`].
///
/// [`iter()`]: `crate::iter()`
/// [`reader()`]: `crate::reader()`
/// [`writer()`]: `crate::writer()`
pub fn blocking(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
BlockingExecutor::get().spawn(future)
}
}
impl<T, E> Task<Result<T, E>>
where
T: Send + 'static,
E: Debug + Send + 'static,
{
/// Spawns a new task that awaits and unwraps the result.
///
/// The new task will panic if the original task results in an error.
///
/// # Examples
///
/// ```
/// use smol::{Async, Task};
/// use std::net::TcpStream;
///
/// # smol::run(async {
/// let stream = Task::spawn(async {
/// Async::<TcpStream>::connect("example.com:80").await
/// })
/// .unwrap()
/// .await;
/// # })
/// ```
pub fn unwrap(self) -> Task<T> {
Task::spawn(async { self.await.unwrap() })
}
/// Spawns a new task that awaits and unwraps the result.
///
/// The new task will panic with the provided message if the original task results in an error.
///
/// # Examples
///
/// ```
/// use smol::{Async, Task};
/// use std::net::TcpStream;
///
/// # smol::run(async {
/// let stream = Task::spawn(async {
/// Async::<TcpStream>::connect("example.com:80").await
/// })
/// .expect("cannot connect")
/// .await;
/// # })
/// ```
pub fn expect(self, msg: &str) -> Task<T> {
let msg = msg.to_owned();
Task::spawn(async move { self.await.expect(&msg) })
}
}
impl Task<()> {
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(mut self) {
self.0.take().unwrap();
}
}
impl<T> Task<T> {
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or `None` if it
/// didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use smol::{Task, Timer};
/// use std::time::Duration;
///
/// # smol::run(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::after(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::after(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
// There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just
// do `{ self }` here to avoid marking `self` as mutable.
let handle = { self }.0.take().unwrap();
handle.cancel();
handle.await
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
if let Some(handle) = &self.0 {
handle.cancel();
}
}
}
impl<T> Future for Task<T> {
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => Poll::Ready(output.expect("task has failed")),
}
}
}
impl<T> Into<async_task::JoinHandle<T, ()>> for Task<T> {
fn into(mut self) -> async_task::JoinHandle<T, ()> {
self.0
.take()
.expect("task was already canceled or has failed")
}
}

View File

@ -1,139 +0,0 @@
//! Implementation of [`Timer`].
//!
//! Timers are futures that fire at a predefined point in time.
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::reactor::Reactor;
/// Fires at the chosen point in time.
///
/// Timers are futures that output the [`Instant`] at which they fired.
///
/// # Examples
///
/// Sleep for 1 second:
///
/// ```
/// use smol::Timer;
/// use std::time::Duration;
///
/// async fn sleep(dur: Duration) {
/// Timer::after(dur).await;
/// }
///
/// # smol::run(async {
/// sleep(Duration::from_secs(1)).await;
/// # });
/// ```
///
/// Set a timeout on an I/O operation:
///
/// ```
/// use futures::future::Either;
/// use futures::io::{self, BufReader};
/// use futures::prelude::*;
/// use smol::Timer;
/// use std::time::Duration;
///
/// async fn timeout<T>(
/// dur: Duration,
/// f: impl Future<Output = io::Result<T>>,
/// ) -> io::Result<T> {
/// futures::pin_mut!(f);
/// match future::select(f, Timer::after(dur)).await {
/// Either::Left((out, _)) => out,
/// Either::Right(_) => Err(io::ErrorKind::TimedOut.into()),
/// }
/// }
///
/// # smol::run(async {
/// // Create a buffered stdin reader.
/// let mut stdin = BufReader::new(smol::reader(std::io::stdin()));
///
/// // Read a line within 5 seconds.
/// let mut line = String::new();
/// timeout(Duration::from_secs(5), stdin.read_line(&mut line)).await?;
/// # io::Result::Ok(()) });
/// ```
#[derive(Debug)]
pub struct Timer {
/// This timer's ID.
///
/// When this field is set to `None`, this timer is not registered in the reactor.
id: Option<usize>,
/// When this timer fires.
when: Instant,
}
impl Timer {
/// Fires after the specified duration of time.
///
/// # Examples
///
/// ```
/// use smol::Timer;
/// use std::time::Duration;
///
/// # smol::run(async {
/// Timer::after(Duration::from_secs(1)).await;
/// # });
/// ```
pub fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
/// Fires at the specified instant in time.
///
/// # Examples
///
/// ```
/// use smol::Timer;
/// use std::time::{Duration, Instant};
///
/// # smol::run(async {
/// let now = Instant::now();
/// let when = now + Duration::from_secs(1);
/// Timer::at(when).await;
/// # });
/// ```
pub fn at(when: Instant) -> Timer {
let id = None;
Timer { id, when }
}
}
impl Drop for Timer {
fn drop(&mut self) {
if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
Poll::Ready(self.when)
} else {
if self.id.is_none() {
// Register the timer in the reactor.
self.id = Some(Reactor::get().insert_timer(self.when, cx.waker()));
}
Poll::Pending
}
}
}