Big refactor

This commit is contained in:
Stjepan Glavina 2020-07-23 12:24:16 +02:00
parent 2f00e2c3d3
commit 9cffe01a92
25 changed files with 228 additions and 373 deletions

View File

@ -13,26 +13,20 @@ categories = ["asynchronous", "concurrency", "network-programming"]
readme = "README.md"
[features]
# Optional feature for seamless integration with crates depending on tokio.
# It creates a global tokio runtime and sets up its context inside smol.
#
# Enable the feature as follows:
# ```
# [dependencies]
# smol = { version = "0.2", features = ["tokio02"] }
# ```
tokio02 = ["tokio"]
default = []
tokio02 = ["tokio"] # Optional feature for compatibility with tokio-based libraries.
[dependencies]
async-io = "0.1.3"
blocking = "0.4.7"
futures-lite = "0.1.5"
multitask = "0.2.0"
async-channel = "1.1.1"
async-executor = "0.1.1"
async-io = "0.1.5"
blocking = "0.5.0"
easy-parallel = "3.1.0"
futures-lite = "0.1.8"
num_cpus = "1.13.0"
once_cell = "1.4.0"
[dependencies.tokio]
version = "0.2.21"
version = "0.2.22"
default-features = false
features = ["rt-threaded"]
optional = true
@ -43,6 +37,7 @@ async-channel = "1.1.1"
async-dup = "1.2.1"
async-h1 = "2.1.0"
async-native-tls = "0.3.3"
async-net = "0.1.1"
async-std = "1.6.2"
async-tungstenite = { version = "0.7.1", features = ["async-native-tls"] }
base64 = "0.12.3"
@ -56,11 +51,13 @@ num_cpus = "1.13.0"
reqwest = "0.10.6"
scraper = "0.12.0"
signal-hook = "0.1.16"
smol = { path = ".", features = ["tokio02"] }
surf = { version = "2.0.0-alpha.4", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0"
tokio = { version = "0.2.21", default-features = false }
tide = "0.12.0"
tungstenite = "0.11.0"
url = "2.1.1"
warp = "0.2.4"
[target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.8.3", default-features = false }

View File

@ -11,87 +11,55 @@ https://docs.rs/smol)
[![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)](
https://discord.gg/x6m5Vvt)
A small and fast executor.
A small and fast async runtime.
Reading the [docs] or looking at the [examples] is a great way to start learning
async Rust.
## Examples
[docs]: https://docs.rs/smol
[examples]: ./examples
Connect to an HTTP website, make a GET request, and pipe the response to the standard output:
Async I/O is implemented using [epoll] on Linux/Android, [kqueue] on
macOS/iOS/BSD, and [wepoll] on Windows.
```rust
use async_net::TcpStream;
use smol::{io, prelude::*, Unblock};
What makes smol different from [async-std] and [tokio]?
Read this [blog post](https://stjepang.github.io/2020/04/03/why-im-building-a-new-async-runtime.html).
fn main() -> io::Result<()> {
smol::run(async {
let mut stream = TcpStream::connect("example.com:80").await?;
let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
stream.write_all(req).await?;
[epoll]: https://en.wikipedia.org/wiki/Epoll
[kqueue]: https://en.wikipedia.org/wiki/Kqueue
[wepoll]: https://github.com/piscisaureus/wepoll
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
Ok(())
})
}
```
## Features
This example uses [`async-net`] for networking, but you can also use the primitive `Async`
type. See the [full code][get-request].
* Async TCP, UDP, Unix domain sockets, and custom file descriptors.
* Thread-local executor for `!Send` futures.
* Work-stealing executor that adapts to uneven workloads.
* Blocking executor for files, processes, and standard I/O.
* Tasks that support cancellation.
* Userspace timers.
Look inside the [examples] directory for more.
[`async-net`]: https://docs.rs/async-net
[examples]: https://github.com/stjepang/smol/tree/master/examples
[get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs
## Compatibility
See [this example](./examples/other-runtimes.rs) for how to use smol with
[async-std], [tokio], [surf], and [reqwest].
All async libraries work with smol out of the box.
There is an optional feature for seamless integration with crates depending
on tokio. It creates a global tokio runtime and sets up its context inside smol.
Enable the feature as follows:
However, [tokio] is a special case due to its generally hostile attitude towards non-tokio
libraries, insistence on non-standard I/O traits, and lack of documentation on integration
with the larger Rust ecosystem. Fortunately, there are ways around it.
Enable the `tokio02` feature flag and `smol::run()` will create a minimal
tokio runtime for its libraries:
```toml
[dependencies]
smol = { version = "0.2", features = ["tokio02"] }
```
[async-std]: https://docs.rs/async-std
[tokio]: https://docs.rs/tokio
[surf]: https://docs.rs/surf
[reqwest]: https://docs.rs/reqwest
## Documentation
You can read the docs [here][docs], or generate them on your own.
If you'd like to explore the implementation in more depth, the following
command generates docs for the whole crate, including private modules:
```
cargo doc --document-private-items --no-deps --open
```
[docs]: https://docs.rs/smol
## Other crates
My personal crate recommendation list:
* Channels, pipes, and mutexes: [piper]
* HTTP clients: [surf], [isahc], [reqwest]
* HTTP servers: [async-h1], [hyper]
* WebSockets: [async-tungstenite]
* TLS authentication: [async-native-tls]
* Signals: [ctrlc], [signal-hook]
[piper]: https://docs.rs/piper
[surf]: https://docs.rs/surf
[isahc]: https://docs.rs/isahc
[reqwest]: https://docs.rs/reqwest
[async-h1]: https://docs.rs/async-h1
[hyper]: https://docs.rs/hyper
[async-tungstenite]: https://docs.rs/async-tungstenite
[async-native-tls]: https://docs.rs/async-native-tls
[native-tls]: https://docs.rs/native-tls
[ctrlc]: https://docs.rs/ctrlc
[signal-hook]: https://docs.rs/signal-hook
## TLS certificate

View File

@ -10,7 +10,7 @@ use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Error, Result};
use http_types::{Method, Request, Response};
use smol::{block_on, io::AsyncReadExt, unblock, Async};
use smol::{prelude::*, Async};
use url::Url;
/// Sends a request and fetches the response.
@ -25,7 +25,7 @@ async fn fetch(req: Request) -> Result<Response> {
// Connect to the host.
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
@ -45,7 +45,7 @@ async fn fetch(req: Request) -> Result<Response> {
}
fn main() -> Result<()> {
block_on(async {
smol::run(async {
// Create a request.
let addr = "https://www.rust-lang.org";
let req = Request::new(Method::Get, Url::parse(addr)?);

View File

@ -18,7 +18,7 @@ use std::net::TcpListener;
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor};
use http_types::{Request, Response, StatusCode};
use smol::{block_on, future, Async, Task};
use smol::{future, Async, Task};
/// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> {
@ -83,7 +83,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers.
block_on(async {
smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,

View File

@ -14,10 +14,10 @@
use std::net::TcpStream;
use smol::{block_on, future, io, Async, Unblock};
use smol::{future, io, Async, Unblock};
fn main() -> io::Result<()> {
block_on(async {
smol::run(async {
// Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 6000)).await?;
let stdin = Unblock::new(std::io::stdin());

View File

@ -17,7 +17,7 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
use async_channel::{bounded, Receiver, Sender};
use async_dup::Arc;
use smol::{block_on, io, io::AsyncBufReadExt, io::AsyncWriteExt, stream::StreamExt, Async, Task};
use smol::{io, prelude::*, Async, Task};
/// An event on the chat server.
enum Event {
@ -76,7 +76,7 @@ async fn read_messages(sender: Sender<Event>, client: Arc<Async<TcpStream>>) ->
}
fn main() -> io::Result<()> {
block_on(async {
smol::run(async {
// Create a listener for incoming client connections.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 6000))?;

View File

@ -6,7 +6,7 @@
//! cargo run --example ctrl-c
//! ```
use smol::{block_on, future};
use smol::future;
fn main() {
// Set a handler that sends a message through a channel.
@ -16,7 +16,7 @@ fn main() {
};
ctrlc::set_handler(handle).unwrap();
block_on(async {
smol::run(async {
println!("Waiting for Ctrl-C...");
// Receive a message that indicates the Ctrl-C signal occurred.

28
examples/get-request.rs Normal file
View File

@ -0,0 +1,28 @@
//! Connect to an HTTP website, make a GET request, and pipes the response to the standard output.
//!
//! Run with:
//!
//! ```
//! cargo run --example get-request
//! ```
use smol::{io, prelude::*, Async, Unblock};
use std::net::{TcpStream, ToSocketAddrs};
fn main() -> io::Result<()> {
smol::run(async {
// Connect to http://example.com
let mut addrs = smol::unblock!(("example.com", 80).to_socket_addrs())?;
let addr = addrs.next().unwrap();
let mut stream = Async::<TcpStream>::connect(addr).await?;
// Send an HTTP GET request.
let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
stream.write_all(req).await?;
// Read the response and pipe it to the standard output.
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
Ok(())
})
}

View File

@ -15,10 +15,7 @@ use anyhow::{bail, Context as _, Error, Result};
use async_native_tls::TlsStream;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{
block_on, future::Future, io, io::AsyncRead, io::AsyncWrite, stream::StreamExt, unblock, Async,
Task,
};
use smol::{io, prelude::*, Async, Task};
/// Sends a request and fetches the response.
async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
@ -30,7 +27,7 @@ async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
}
fn main() -> Result<()> {
block_on(async {
smol::run(async {
// Create a request.
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?;
@ -84,7 +81,7 @@ impl hyper::service::Service<Uri> for SmolConnector {
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(80);
unblock!((host.as_str(), port).to_socket_addrs())?
smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
@ -96,7 +93,7 @@ impl hyper::service::Service<Uri> for SmolConnector {
let socket_addr = {
let host = host.to_string();
let port = uri.port_u16().unwrap_or(443);
unblock!((host.as_str(), port).to_socket_addrs())?
smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};

View File

@ -21,10 +21,7 @@ use anyhow::{Error, Result};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server};
use smol::{
block_on, future, future::Future, io, io::AsyncRead, io::AsyncWrite, ready, stream::Stream,
Async, Task,
};
use smol::{future, io, prelude::*, Async, Task};
/// Serves a request and returns a response.
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
@ -59,7 +56,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers.
block_on(async {
smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
@ -97,11 +94,11 @@ impl hyper::server::accept::Accept for SmolListener {
type Error = Error;
fn poll_accept(
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut Context,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let poll = Pin::new(&mut self.listener.incoming()).poll_next(cx);
let stream = ready!(poll).unwrap()?;
let stream = smol::ready!(poll).unwrap()?;
let stream = match &self.tls {
None => SmolStream::Plain(stream),
@ -150,7 +147,7 @@ impl tokio::io::AsyncRead for SmolStream {
SmolStream::Plain(s) => return Pin::new(s).poll_read(cx, buf),
SmolStream::Tls(s) => return Pin::new(s).poll_read(cx, buf),
SmolStream::Handshake(f) => {
let s = ready!(f.as_mut().poll(cx))?;
let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s);
}
}
@ -169,7 +166,7 @@ impl tokio::io::AsyncWrite for SmolStream {
SmolStream::Plain(s) => return Pin::new(s).poll_write(cx, buf),
SmolStream::Tls(s) => return Pin::new(s).poll_write(cx, buf),
SmolStream::Handshake(f) => {
let s = ready!(f.as_mut().poll(cx))?;
let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s);
}
}

View File

@ -11,7 +11,7 @@ fn main() -> std::io::Result<()> {
use std::ffi::OsString;
use inotify::{EventMask, Inotify, WatchMask};
use smol::{block_on, io, Async};
use smol::{io, Async};
type Event = (OsString, EventMask);
@ -32,7 +32,7 @@ fn main() -> std::io::Result<()> {
}
}
block_on(async {
smol::run(async {
// Watch events in the current directory.
let mut inotify = Async::new(Inotify::init()?)?;
inotify.get_mut().add_watch(".", WatchMask::ALL_EVENTS)?;

View File

@ -11,7 +11,7 @@ fn main() -> std::io::Result<()> {
use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant};
use smol::{block_on, io, Async};
use smol::{io, Async};
use timerfd::{SetTimeFlags, TimerFd, TimerState};
/// Converts a [`nix::Error`] into [`std::io::Error`].
@ -35,7 +35,7 @@ fn main() -> std::io::Result<()> {
Ok(())
}
block_on(async {
smol::run(async {
let start = Instant::now();
println!("Sleeping...");

View File

@ -1,9 +1,6 @@
//! Demonstrates how to use `async-std`, `tokio`, `surf`, and `request`.
//!
//! There is an optional feature for seamless integration with crates depending on tokio.
//! It creates a global tokio runtime and sets up its context inside smol.
//!
//! Enable the feature as follows:
//! For compatibility with tokio-based libraries, enable the `tokio02` feature flag:
//!
//! ```toml
//! [dependencies]
@ -19,10 +16,9 @@
use std::time::{Duration, Instant};
use anyhow::{Error, Result};
use smol::block_on;
fn main() -> Result<()> {
block_on(async {
smol::run(async {
// Sleep using async-std.
let start = Instant::now();
println!("Sleeping using async-std...");

View File

@ -9,7 +9,7 @@
use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Result};
use smol::{block_on, io::AsyncReadExt, io::AsyncWriteExt, unblock, Async};
use smol::{prelude::*, Async};
use url::Url;
/// Sends a GET request and fetches the response.
@ -33,7 +33,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
// Connect to the host.
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
@ -59,7 +59,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
}
fn main() -> Result<()> {
block_on(async {
smol::run(async {
let addr = "https://www.rust-lang.org";
let resp = fetch(addr).await?;
println!("{}", String::from_utf8_lossy(&resp));

View File

@ -17,7 +17,7 @@ use std::net::{TcpListener, TcpStream};
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor};
use smol::{block_on, future, io::AsyncWriteExt, Async, Task};
use smol::{future, prelude::*, Async, Task};
const RESPONSE: &[u8] = br#"
HTTP/1.1 200 OK
@ -80,7 +80,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers.
block_on(async {
smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,

View File

@ -14,10 +14,10 @@
use std::net::TcpStream;
use smol::{block_on, future, io, Async, Unblock};
use smol::{future, io, Async, Unblock};
fn main() -> io::Result<()> {
block_on(async {
smol::run(async {
// Create async stdin and stdout handles.
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());

View File

@ -14,7 +14,7 @@
use std::net::{TcpListener, TcpStream};
use smol::{block_on, io, Async, Task};
use smol::{io, Async, Task};
/// Echoes messages from the client back to it.
async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
@ -23,7 +23,7 @@ async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
}
fn main() -> io::Result<()> {
block_on(async {
smol::run(async {
// Create a listener.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7000))?;
println!("Listening on {}", listener.get_ref().local_addr()?);

View File

@ -16,7 +16,7 @@ use std::net::TcpStream;
use anyhow::Result;
use async_native_tls::{Certificate, TlsConnector};
use smol::{block_on, future, io, Async, Unblock};
use smol::{future, io, Async, Unblock};
fn main() -> Result<()> {
// Initialize TLS with the local certificate.
@ -24,7 +24,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
block_on(async {
smol::run(async {
// Create async stdin and stdout handles.
let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout());

View File

@ -16,7 +16,7 @@ use std::net::{TcpListener, TcpStream};
use anyhow::Result;
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use smol::{block_on, io, Async, Task};
use smol::{io, Async, Task};
/// Echoes messages from the client back to it.
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
@ -30,7 +30,7 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
block_on(async {
smol::run(async {
// Create a listener.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7001))?;
println!("Listening on {}", listener.get_ref().local_addr()?);

View File

@ -10,9 +10,9 @@
fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream;
use smol::{block_on, io::AsyncReadExt, Async};
use smol::{prelude::*, Async};
block_on(async {
smol::run(async {
// Create a Unix stream that receives a byte on each signal occurrence.
let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::pipe::register(signal_hook::SIGINT, a)?;

View File

@ -11,7 +11,7 @@ use std::collections::{HashSet, VecDeque};
use anyhow::Result;
use async_channel::{bounded, Sender};
use scraper::{Html, Selector};
use smol::{block_on, Task};
use smol::Task;
const ROOT: &str = "https://www.rust-lang.org";
@ -34,7 +34,7 @@ fn links(body: String) -> Vec<String> {
}
fn main() -> Result<()> {
block_on(async {
smol::run(async {
let mut seen = HashSet::new();
let mut queue = VecDeque::new();
seen.insert(ROOT.to_string());

View File

@ -20,7 +20,7 @@ use anyhow::{bail, Context as _, Result};
use async_native_tls::{Certificate, TlsConnector, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::sink::{Sink, SinkExt};
use smol::{block_on, stream::Stream, stream::StreamExt, unblock, Async};
use smol::{prelude::*, Async};
use tungstenite::handshake::client::Response;
use tungstenite::Message;
use url::Url;
@ -35,7 +35,7 @@ async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)>
// Resolve the address.
let socket_addr = {
let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())?
smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next()
.context("cannot resolve address")?
};
@ -64,7 +64,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder);
block_on(async {
smol::run(async {
// Connect to the server.
let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?;
dbg!(resp);

View File

@ -20,7 +20,7 @@ use anyhow::{Context as _, Result};
use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream;
use futures::sink::{Sink, SinkExt};
use smol::{block_on, future, stream::Stream, stream::StreamExt, Async, Task};
use smol::{future, prelude::*, Async, Task};
use tungstenite::Message;
/// Echoes messages from the client back to it.
@ -64,7 +64,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start WS and WSS servers.
block_on(async {
smol::run(async {
let ws = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?, None);
let wss = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 9001))?,

View File

@ -10,7 +10,7 @@
fn main() -> std::io::Result<()> {
use std::path::PathBuf;
use smol::{block_on, io, io::AsyncWriteExt, Async, Task, Unblock};
use smol::{io, prelude::*, Async, Task, Unblock};
use tempfile::tempdir;
use uds_windows::{UnixListener, UnixStream};
@ -28,7 +28,7 @@ fn main() -> std::io::Result<()> {
let dir = tempdir()?;
let path = dir.path().join("socket");
block_on(async {
smol::run(async {
// Create a listener.
let listener = Async::new(UnixListener::bind(&path)?)?;
println!("Listening on {:?}", listener.get_ref().local_addr()?);

View File

@ -1,284 +1,156 @@
//! A small and fast async runtime.
//!
//! This library provides:
//!
//! * Tools for working with [`future`]s, [`stream`]s, and async [I/O][`io`].
//! * Hooks into epoll/kqueue/wepoll for [`Async`] I/O and [`Timer`]s.
//! * Glue between async and blocking code: [`block_on()`], [`BlockOn`], [`unblock()`], [`Unblock`].
//! * An executor for spawning [`Task`]s.
//!
//! The whole implementation is a trivial amount of code - it mostly reexports types and functions
//! from other small and independent crates.
//!
//! The focus of the crate is on simplicity
//!
//! # Examples
//!
//! A simple TCP server that prints messages received from clients:
//! Connect to an HTTP website, make a GET request, and pipe the response to the standard output:
//!
//! ```no_run
//! use smol::{Async, Task, Unblock};
//! use std::net::TcpListener;
//! ```
//! use async_net::TcpStream;
//! use smol::{io, prelude::*, Unblock};
//!
//! fn main() -> std::io::Result<()> {
//! smol::block_on(async {
//! // Start listening on port 9000.
//! let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?;
//! fn main() -> io::Result<()> {
//! smol::run(async {
//! let mut stream = TcpStream::connect("example.com:80").await?;
//! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
//! stream.write_all(req).await?;
//!
//! loop {
//! // Accept a new client.
//! let (stream, _) = listener.accept().await?;
//!
//! // Spawn a task handling this client.
//! let task = Task::spawn(async move {
//! // Create an async stdio handle.
//! let mut stdout = Unblock::new(std::io::stdout());
//!
//! // Copy data received from the client into stdout.
//! smol::io::copy(&stream, &mut stdout).await
//! });
//!
//! // Keep running the task in the background.
//! task.detach();
//! }
//! let mut stdout = Unblock::new(std::io::stdout());
//! io::copy(&stream, &mut stdout).await?;
//! Ok(())
//! })
//! }
//! ```
//!
//! To interact with the server, run `nc 127.0.0.1 9000` and type a few lines of text.
//! This example uses [`async-net`] for networking, but you can also use the primitive [`Async`]
//! type. See the [full code][get-request].
//!
//! ### More examples
//! Look inside the [examples] directory for more.
//!
//! Look inside the [examples] directory for more:
//! a [web crawler][web-crawler],
//! a [Ctrl-C handler][ctrl-c],
//! a TCP [client][tcp-client]/[server][tcp-server],
//! a TCP chat [client][chat-client]/[server][chat-server],
//! a TLS [client][tls-client]/[server][tls-server],
//! an HTTP+TLS [client][simple-client]/[server][simple-server],
//! an [async-h1] [client][async-h1-client]/[server][async-h1-server],
//! a [hyper] [client][hyper-client]/[server][hyper-server],
//! and a WebSocket+TLS [client][websocket-client]/[server][websocket-server].
//! [`async-net`]: https://docs.rs/async-net
//! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! [get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs
//!
//! It's also possible to plug non-async libraries into the runtime: see
//! [inotify], [timerfd], [signal-hook], and [uds_windows].
//! # Compatibility
//!
//! Finally, there's an [example][other-runtimes] showing how to use smol with
//! [async-std], [tokio], [surf], and [reqwest].
//! All async libraries work with smol out of the box.
//!
//! However, [tokio] is a special case due to its generally hostile attitude towards non-tokio
//! libraries, insistence on non-standard I/O traits, and lack of documentation on integration
//! with the larger Rust ecosystem. Fortunately, there are ways around it.
//!
//! Enable the `tokio02` feature flag and [`smol::run()`][`crate::run()`] will create a minimal
//! tokio runtime for its libraries:
//!
//! ```toml
//! [dependencies]
//! smol = { version = "0.2", features = ["tokio02"] }
//! ```
//!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples/!
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [hyper]: https://docs.rs/tokio
//! [async-std]: https://docs.rs/async-std
//! [tokio]: https://docs.rs/tokio
//! [surf]: https://docs.rs/surf
//! [reqwest]: https://docs.rs/reqwest
//!
//! [async-h1-client]: https://github.com/stjepang/smol/blob/master/examples/async-h1-client.rs
//! [async-h1-server]: https://github.com/stjepang/smol/blob/master/examples/async-h1-server.rs
//! [chat-client]: https://github.com/stjepang/smol/blob/master/examples/chat-client.rs
//! [chat-server]: https://github.com/stjepang/smol/blob/master/examples/chat-server.rs
//! [ctrl-c]: https://github.com/stjepang/smol/blob/master/examples/ctrl-c.rs
//! [hyper-client]: https://github.com/stjepang/smol/blob/master/examples/hyper-client.rs
//! [hyper-server]: https://github.com/stjepang/smol/blob/master/examples/hyper-server.rs
//! [inotify]: https://github.com/stjepang/smol/blob/master/examples/linux-inotify.rs
//! [other-runtimes]: https://github.com/stjepang/smol/blob/master/examples/other-runtimes.rs
//! [signal-hook]: https://github.com/stjepang/smol/blob/master/examples/unix-signal.rs
//! [simple-client]: https://github.com/stjepang/smol/blob/master/examples/simple-client.rs
//! [simple-server]: https://github.com/stjepang/smol/blob/master/examples/simple-server.rs
//! [tcp-client]: https://github.com/stjepang/smol/blob/master/examples/tcp-client.rs
//! [tcp-server]: https://github.com/stjepang/smol/blob/master/examples/tcp-server.rs
//! [timerfd]: https://github.com/stjepang/smol/blob/master/examples/linux-timerfd.rs
//! [tls-client]: https://github.com/stjepang/smol/blob/master/examples/tls-client.rs
//! [tls-server]: https://github.com/stjepang/smol/blob/master/examples/tls-server.rs
//! [uds_windows]: https://github.com/stjepang/smol/blob/master/examples/windows-uds.rs
//! [web-crawler]: https://github.com/stjepang/smol/blob/master/examples/web-crawler.rs
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use multitask::Executor;
use once_cell::sync::Lazy;
use async_executor::{Executor, LocalExecutor};
use easy_parallel::Parallel;
#[doc(inline)]
pub use {
async_executor::Task,
async_io::Async,
async_io::Timer,
blocking::{block_on, BlockOn},
blocking::{unblock, Unblock},
futures_lite::{future, io, stream},
futures_lite::{pin, ready},
};
/// A spawned future.
/// Async traits and their extensions.
///
/// Tasks are also futures themselves and yield the output of the spawned future.
/// # Examples
///
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
/// ```
/// use smol::prelude::*;
/// ```
pub mod prelude {
#[doc(no_inline)]
pub use futures_lite::{
future::Future,
io::{AsyncBufRead, AsyncBufReadExt},
io::{AsyncRead, AsyncReadExt},
io::{AsyncSeek, AsyncSeekExt},
io::{AsyncWrite, AsyncWriteExt},
stream::{Stream, StreamExt},
};
}
/// Starts an executor and runs the future on it.
///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
/// This function runs two executors at the same time:
///
/// 1. The current thread runs a [`LocalExecutor`] and the main `future` on it.
/// 2. A thread pool runs an [`Executor`] until the main `future` completes.
///
/// The number of spawned threads matches the number of logical CPU cores on the system.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// // Spawn a task onto the work-stealing executor.
/// let task = Task::spawn(async {
/// println!("Hello from a task!");
/// 1 + 2
/// });
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// smol::run(async {
/// let task = Task::spawn(async {
/// println!("Hello world");
/// });
/// task.await;
/// })
/// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
#[derive(Debug)]
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
/// Spawns a future.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
pub fn spawn<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| {
enter(|| {
let (p, u) = async_io::parking::pair();
let ticker = EXECUTOR.ticker(move || u.unpark());
loop {
if let Ok(false) = catch_unwind(|| ticker.tick()) {
p.park();
}
}
})
});
}
Executor::new()
});
Task(EXECUTOR.spawn(future))
}
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(self) {
self.0.detach();
}
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::new(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
pub fn run<T>(future: impl Future<Output = T>) -> T {
setup(num_cpus::get(), future)
}
impl<T> Future for Task<T> {
type Output = T;
#[cfg(not(feature = "tokio02"))]
fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
}
// A channel that coordinates shutdown when the main future completes.
let (trigger, shutdown) = async_channel::unbounded::<()>();
let future = async move {
let _trigger = trigger; // Dropped at the end of this async block.
future.await
};
Parallel::new()
.each(0..num_threads, |_| ex.run(shutdown.recv()))
.finish(|| ex.enter(|| local_ex.run(future)))
.1
}
/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();
#[cfg(feature = "tokio02")]
fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
#[cfg(feature = "tokio02")]
{
use std::cell::Cell;
use tokio::runtime::Runtime;
// A channel that signals shutdown to the thread pool when the main future completes.
let (s, shutdown) = async_channel::unbounded::<()>();
let future = async move {
let _s = s; // Drops sender at the end of this async block.
future.await
};
thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}
// A minimal tokio runtime.
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
/// The global tokio runtime.
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));
NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
Parallel::new()
.add(|| ex.enter(|| rt.block_on(shutdown.recv())))
.each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv())))
.finish(|| handle.enter(|| ex.enter(|| local_ex.run(future))))
.1
}