Merge pull request #192 from stjepang/reexports

Reexport other crates into smol
This commit is contained in:
Stjepan Glavina 2020-07-23 12:42:06 +02:00 committed by GitHub
commit b8f5364e6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 240 additions and 428 deletions

View File

@ -13,25 +13,20 @@ categories = ["asynchronous", "concurrency", "network-programming"]
readme = "README.md" readme = "README.md"
[features] [features]
# Optional feature for seamless integration with crates depending on tokio. default = []
# It creates a global tokio runtime and sets up its context inside smol. tokio02 = ["tokio"] # Optional feature for compatibility with tokio-based libraries.
#
# Enable the feature as follows:
# ```
# [dependencies]
# smol = { version = "0.2", features = ["tokio02"] }
# ```
tokio02 = ["tokio"]
[dependencies] [dependencies]
async-io = "0.1.3" async-channel = "1.1.1"
multitask = "0.2.0" async-executor = "0.1.1"
async-io = "0.1.5"
blocking = "0.5.0"
easy-parallel = "3.1.0"
futures-lite = "0.1.8"
num_cpus = "1.13.0" num_cpus = "1.13.0"
once_cell = "1.4.0"
blocking = "0.4.7"
[dependencies.tokio] [dependencies.tokio]
version = "0.2.21" version = "0.2.22"
default-features = false default-features = false
features = ["rt-threaded"] features = ["rt-threaded"]
optional = true optional = true
@ -42,12 +37,12 @@ async-channel = "1.1.1"
async-dup = "1.2.1" async-dup = "1.2.1"
async-h1 = "2.1.0" async-h1 = "2.1.0"
async-native-tls = "0.3.3" async-native-tls = "0.3.3"
async-net = "0.1.1"
async-std = "1.6.2" async-std = "1.6.2"
async-tungstenite = { version = "0.7.1", features = ["async-native-tls"] } async-tungstenite = { version = "0.7.1", features = ["async-native-tls"] }
base64 = "0.12.3" base64 = "0.12.3"
ctrlc = "3.1.5" ctrlc = "3.1.5"
futures = "0.3.5" futures = "0.3.5"
futures-lite = "0.1.5"
http = "0.2.1" http = "0.2.1"
http-types = "2.3.0" http-types = "2.3.0"
hyper = { version = "0.13.7", default-features = false, features = ["stream"] } hyper = { version = "0.13.7", default-features = false, features = ["stream"] }
@ -56,11 +51,13 @@ num_cpus = "1.13.0"
reqwest = "0.10.6" reqwest = "0.10.6"
scraper = "0.12.0" scraper = "0.12.0"
signal-hook = "0.1.16" signal-hook = "0.1.16"
smol = { path = ".", features = ["tokio02"] }
surf = { version = "2.0.0-alpha.4", default-features = false, features = ["h1-client"] } surf = { version = "2.0.0-alpha.4", default-features = false, features = ["h1-client"] }
tempfile = "3.1.0" tempfile = "3.1.0"
tokio = { version = "0.2.21", default-features = false } tide = "0.12.0"
tungstenite = "0.11.0" tungstenite = "0.11.0"
url = "2.1.1" url = "2.1.1"
warp = "0.2.4"
[target.'cfg(target_os = "linux")'.dev-dependencies] [target.'cfg(target_os = "linux")'.dev-dependencies]
inotify = { version = "0.8.3", default-features = false } inotify = { version = "0.8.3", default-features = false }

View File

@ -11,87 +11,55 @@ https://docs.rs/smol)
[![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)]( [![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)](
https://discord.gg/x6m5Vvt) https://discord.gg/x6m5Vvt)
A small and fast executor. A small and fast async runtime.
Reading the [docs] or looking at the [examples] is a great way to start learning ## Examples
async Rust.
[docs]: https://docs.rs/smol Connect to an HTTP website, make a GET request, and pipe the response to the standard output:
[examples]: ./examples
Async I/O is implemented using [epoll] on Linux/Android, [kqueue] on ```rust
macOS/iOS/BSD, and [wepoll] on Windows. use async_net::TcpStream;
use smol::{io, prelude::*, Unblock};
What makes smol different from [async-std] and [tokio]? fn main() -> io::Result<()> {
Read this [blog post](https://stjepang.github.io/2020/04/03/why-im-building-a-new-async-runtime.html). smol::run(async {
let mut stream = TcpStream::connect("example.com:80").await?;
let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
stream.write_all(req).await?;
[epoll]: https://en.wikipedia.org/wiki/Epoll let mut stdout = Unblock::new(std::io::stdout());
[kqueue]: https://en.wikipedia.org/wiki/Kqueue io::copy(&stream, &mut stdout).await?;
[wepoll]: https://github.com/piscisaureus/wepoll Ok(())
})
}
```
## Features This example uses [`async-net`] for networking, but you can also use the primitive `Async`
type. See the [full code][get-request].
* Async TCP, UDP, Unix domain sockets, and custom file descriptors. Look inside the [examples] directory for more.
* Thread-local executor for `!Send` futures.
* Work-stealing executor that adapts to uneven workloads. [`async-net`]: https://docs.rs/async-net
* Blocking executor for files, processes, and standard I/O. [examples]: https://github.com/stjepang/smol/tree/master/examples
* Tasks that support cancellation. [get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs
* Userspace timers.
## Compatibility ## Compatibility
See [this example](./examples/other-runtimes.rs) for how to use smol with All async libraries work with smol out of the box.
[async-std], [tokio], [surf], and [reqwest].
There is an optional feature for seamless integration with crates depending However, [tokio] is generally hostile towards non-tokio libraries, insists on non-standard I/O
on tokio. It creates a global tokio runtime and sets up its context inside smol. traits, and deliberately lacks documentation on integration with the larger Rust ecosystem.
Enable the feature as follows: Fortunately, there are ways around it.
Enable the `tokio02` feature flag and `smol::run()` will create a minimal
tokio runtime for its libraries:
```toml ```toml
[dependencies] [dependencies]
smol = { version = "0.2", features = ["tokio02"] } smol = { version = "0.2", features = ["tokio02"] }
``` ```
[async-std]: https://docs.rs/async-std
[tokio]: https://docs.rs/tokio [tokio]: https://docs.rs/tokio
[surf]: https://docs.rs/surf
[reqwest]: https://docs.rs/reqwest
## Documentation
You can read the docs [here][docs], or generate them on your own.
If you'd like to explore the implementation in more depth, the following
command generates docs for the whole crate, including private modules:
```
cargo doc --document-private-items --no-deps --open
```
[docs]: https://docs.rs/smol
## Other crates
My personal crate recommendation list:
* Channels, pipes, and mutexes: [piper]
* HTTP clients: [surf], [isahc], [reqwest]
* HTTP servers: [async-h1], [hyper]
* WebSockets: [async-tungstenite]
* TLS authentication: [async-native-tls]
* Signals: [ctrlc], [signal-hook]
[piper]: https://docs.rs/piper
[surf]: https://docs.rs/surf
[isahc]: https://docs.rs/isahc
[reqwest]: https://docs.rs/reqwest
[async-h1]: https://docs.rs/async-h1
[hyper]: https://docs.rs/hyper
[async-tungstenite]: https://docs.rs/async-tungstenite
[async-native-tls]: https://docs.rs/async-native-tls
[native-tls]: https://docs.rs/native-tls
[ctrlc]: https://docs.rs/ctrlc
[signal-hook]: https://docs.rs/signal-hook
## TLS certificate ## TLS certificate

View File

@ -9,10 +9,8 @@
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Error, Result}; use anyhow::{bail, Context as _, Error, Result};
use async_io::Async;
use blocking::{block_on, unblock};
use futures_lite::*;
use http_types::{Method, Request, Response}; use http_types::{Method, Request, Response};
use smol::{prelude::*, Async};
use url::Url; use url::Url;
/// Sends a request and fetches the response. /// Sends a request and fetches the response.
@ -27,7 +25,7 @@ async fn fetch(req: Request) -> Result<Response> {
// Connect to the host. // Connect to the host.
let socket_addr = { let socket_addr = {
let host = host.clone(); let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())? smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next() .next()
.context("cannot resolve address")? .context("cannot resolve address")?
}; };
@ -47,7 +45,7 @@ async fn fetch(req: Request) -> Result<Response> {
} }
fn main() -> Result<()> { fn main() -> Result<()> {
block_on(async { smol::run(async {
// Create a request. // Create a request.
let addr = "https://www.rust-lang.org"; let addr = "https://www.rust-lang.org";
let req = Request::new(Method::Get, Url::parse(addr)?); let req = Request::new(Method::Get, Url::parse(addr)?);

View File

@ -16,12 +16,9 @@
use std::net::TcpListener; use std::net::TcpListener;
use anyhow::Result; use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor}; use async_native_tls::{Identity, TlsAcceptor};
use blocking::block_on;
use futures_lite::*;
use http_types::{Request, Response, StatusCode}; use http_types::{Request, Response, StatusCode};
use smol::Task; use smol::{future, Async, Task};
/// Serves a request and returns a response. /// Serves a request and returns a response.
async fn serve(req: Request) -> http_types::Result<Response> { async fn serve(req: Request) -> http_types::Result<Response> {
@ -86,7 +83,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers. // Start HTTP and HTTPS servers.
block_on(async { smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None); let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen( let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?, Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,

View File

@ -14,12 +14,10 @@
use std::net::TcpStream; use std::net::TcpStream;
use async_io::Async; use smol::{future, io, Async, Unblock};
use blocking::{block_on, Unblock};
use futures_lite::*;
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
block_on(async { smol::run(async {
// Connect to the server and create async stdin and stdout. // Connect to the server and create async stdin and stdout.
let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 6000)).await?; let stream = Async::<TcpStream>::connect(([127, 0, 0, 1], 6000)).await?;
let stdin = Unblock::new(std::io::stdin()); let stdin = Unblock::new(std::io::stdin());

View File

@ -17,10 +17,7 @@ use std::net::{SocketAddr, TcpListener, TcpStream};
use async_channel::{bounded, Receiver, Sender}; use async_channel::{bounded, Receiver, Sender};
use async_dup::Arc; use async_dup::Arc;
use async_io::Async; use smol::{io, prelude::*, Async, Task};
use blocking::block_on;
use futures_lite::*;
use smol::Task;
/// An event on the chat server. /// An event on the chat server.
enum Event { enum Event {
@ -79,7 +76,7 @@ async fn read_messages(sender: Sender<Event>, client: Arc<Async<TcpStream>>) ->
} }
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
block_on(async { smol::run(async {
// Create a listener for incoming client connections. // Create a listener for incoming client connections.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 6000))?; let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 6000))?;

View File

@ -6,8 +6,7 @@
//! cargo run --example ctrl-c //! cargo run --example ctrl-c
//! ``` //! ```
use blocking::block_on; use smol::future;
use futures_lite::*;
fn main() { fn main() {
// Set a handler that sends a message through a channel. // Set a handler that sends a message through a channel.
@ -17,7 +16,7 @@ fn main() {
}; };
ctrlc::set_handler(handle).unwrap(); ctrlc::set_handler(handle).unwrap();
block_on(async { smol::run(async {
println!("Waiting for Ctrl-C..."); println!("Waiting for Ctrl-C...");
// Receive a message that indicates the Ctrl-C signal occurred. // Receive a message that indicates the Ctrl-C signal occurred.

28
examples/get-request.rs Normal file
View File

@ -0,0 +1,28 @@
//! Connect to an HTTP website, make a GET request, and pipes the response to the standard output.
//!
//! Run with:
//!
//! ```
//! cargo run --example get-request
//! ```
use smol::{io, prelude::*, Async, Unblock};
use std::net::{TcpStream, ToSocketAddrs};
fn main() -> io::Result<()> {
smol::run(async {
// Connect to http://example.com
let mut addrs = smol::unblock!(("example.com", 80).to_socket_addrs())?;
let addr = addrs.next().unwrap();
let mut stream = Async::<TcpStream>::connect(addr).await?;
// Send an HTTP GET request.
let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
stream.write_all(req).await?;
// Read the response and pipe it to the standard output.
let mut stdout = Unblock::new(std::io::stdout());
io::copy(&stream, &mut stdout).await?;
Ok(())
})
}

View File

@ -6,20 +6,16 @@
//! cargo run --example hyper-client //! cargo run --example hyper-client
//! ``` //! ```
use std::io;
use std::net::Shutdown; use std::net::Shutdown;
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Error, Result}; use anyhow::{bail, Context as _, Error, Result};
use async_io::Async;
use async_native_tls::TlsStream; use async_native_tls::TlsStream;
use blocking::{block_on, unblock};
use futures_lite::*;
use http::Uri; use http::Uri;
use hyper::{Body, Client, Request, Response}; use hyper::{Body, Client, Request, Response};
use smol::Task; use smol::{io, prelude::*, Async, Task};
/// Sends a request and fetches the response. /// Sends a request and fetches the response.
async fn fetch(req: Request<Body>) -> Result<Response<Body>> { async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
@ -31,7 +27,7 @@ async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
} }
fn main() -> Result<()> { fn main() -> Result<()> {
block_on(async { smol::run(async {
// Create a request. // Create a request.
let req = Request::get("https://www.rust-lang.org").body(Body::empty())?; let req = Request::get("https://www.rust-lang.org").body(Body::empty())?;
@ -85,7 +81,7 @@ impl hyper::service::Service<Uri> for SmolConnector {
let socket_addr = { let socket_addr = {
let host = host.to_string(); let host = host.to_string();
let port = uri.port_u16().unwrap_or(80); let port = uri.port_u16().unwrap_or(80);
unblock!((host.as_str(), port).to_socket_addrs())? smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next() .next()
.context("cannot resolve address")? .context("cannot resolve address")?
}; };
@ -97,7 +93,7 @@ impl hyper::service::Service<Uri> for SmolConnector {
let socket_addr = { let socket_addr = {
let host = host.to_string(); let host = host.to_string();
let port = uri.port_u16().unwrap_or(443); let port = uri.port_u16().unwrap_or(443);
unblock!((host.as_str(), port).to_socket_addrs())? smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next() .next()
.context("cannot resolve address")? .context("cannot resolve address")?
}; };

View File

@ -13,20 +13,15 @@
//! //!
//! Refer to `README.md` to see how to the TLS certificate was generated. //! Refer to `README.md` to see how to the TLS certificate was generated.
use std::io; use std::net::{Shutdown, TcpListener, TcpStream};
use std::net::Shutdown;
use std::net::{TcpListener, TcpStream};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use blocking::block_on;
use futures_lite::*;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
use smol::Task; use smol::{future, io, prelude::*, Async, Task};
/// Serves a request and returns a response. /// Serves a request and returns a response.
async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> { async fn serve(req: Request<Body>, host: String) -> Result<Response<Body>> {
@ -61,7 +56,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers. // Start HTTP and HTTPS servers.
block_on(async { smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None); let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen( let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?, Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,
@ -99,11 +94,12 @@ impl hyper::server::accept::Accept for SmolListener {
type Error = Error; type Error = Error;
fn poll_accept( fn poll_accept(
mut self: Pin<&mut Self>, self: Pin<&mut Self>,
cx: &mut Context, cx: &mut Context,
) -> Poll<Option<Result<Self::Conn, Self::Error>>> { ) -> Poll<Option<Result<Self::Conn, Self::Error>>> {
let poll = Pin::new(&mut self.listener.incoming()).poll_next(cx); let incoming = self.listener.incoming();
let stream = ready!(poll).unwrap()?; smol::pin!(incoming);
let stream = smol::ready!(incoming.poll_next(cx)).unwrap()?;
let stream = match &self.tls { let stream = match &self.tls {
None => SmolStream::Plain(stream), None => SmolStream::Plain(stream),
@ -152,7 +148,7 @@ impl tokio::io::AsyncRead for SmolStream {
SmolStream::Plain(s) => return Pin::new(s).poll_read(cx, buf), SmolStream::Plain(s) => return Pin::new(s).poll_read(cx, buf),
SmolStream::Tls(s) => return Pin::new(s).poll_read(cx, buf), SmolStream::Tls(s) => return Pin::new(s).poll_read(cx, buf),
SmolStream::Handshake(f) => { SmolStream::Handshake(f) => {
let s = ready!(f.as_mut().poll(cx))?; let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s); *self = SmolStream::Tls(s);
} }
} }
@ -171,7 +167,7 @@ impl tokio::io::AsyncWrite for SmolStream {
SmolStream::Plain(s) => return Pin::new(s).poll_write(cx, buf), SmolStream::Plain(s) => return Pin::new(s).poll_write(cx, buf),
SmolStream::Tls(s) => return Pin::new(s).poll_write(cx, buf), SmolStream::Tls(s) => return Pin::new(s).poll_write(cx, buf),
SmolStream::Handshake(f) => { SmolStream::Handshake(f) => {
let s = ready!(f.as_mut().poll(cx))?; let s = smol::ready!(f.as_mut().poll(cx))?;
*self = SmolStream::Tls(s); *self = SmolStream::Tls(s);
} }
} }

View File

@ -9,11 +9,9 @@
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
use std::ffi::OsString; use std::ffi::OsString;
use std::io;
use async_io::Async;
use blocking::block_on;
use inotify::{EventMask, Inotify, WatchMask}; use inotify::{EventMask, Inotify, WatchMask};
use smol::{io, Async};
type Event = (OsString, EventMask); type Event = (OsString, EventMask);
@ -34,7 +32,7 @@ fn main() -> std::io::Result<()> {
} }
} }
block_on(async { smol::run(async {
// Watch events in the current directory. // Watch events in the current directory.
let mut inotify = Async::new(Inotify::init()?)?; let mut inotify = Async::new(Inotify::init()?)?;
inotify.get_mut().add_watch(".", WatchMask::ALL_EVENTS)?; inotify.get_mut().add_watch(".", WatchMask::ALL_EVENTS)?;

View File

@ -8,12 +8,10 @@
#[cfg(target_os = "linux")] #[cfg(target_os = "linux")]
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
use std::io;
use std::os::unix::io::AsRawFd; use std::os::unix::io::AsRawFd;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_io::Async; use smol::{io, Async};
use blocking::block_on;
use timerfd::{SetTimeFlags, TimerFd, TimerState}; use timerfd::{SetTimeFlags, TimerFd, TimerState};
/// Converts a [`nix::Error`] into [`std::io::Error`]. /// Converts a [`nix::Error`] into [`std::io::Error`].
@ -37,7 +35,7 @@ fn main() -> std::io::Result<()> {
Ok(()) Ok(())
} }
block_on(async { smol::run(async {
let start = Instant::now(); let start = Instant::now();
println!("Sleeping..."); println!("Sleeping...");

View File

@ -1,9 +1,6 @@
//! Demonstrates how to use `async-std`, `tokio`, `surf`, and `request`. //! Demonstrates how to use `async-std`, `tokio`, `surf`, and `request`.
//! //!
//! There is an optional feature for seamless integration with crates depending on tokio. //! For compatibility with tokio-based libraries, enable the `tokio02` feature flag:
//! It creates a global tokio runtime and sets up its context inside smol.
//!
//! Enable the feature as follows:
//! //!
//! ```toml //! ```toml
//! [dependencies] //! [dependencies]
@ -19,10 +16,9 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{Error, Result}; use anyhow::{Error, Result};
use blocking::block_on;
fn main() -> Result<()> { fn main() -> Result<()> {
block_on(async { smol::run(async {
// Sleep using async-std. // Sleep using async-std.
let start = Instant::now(); let start = Instant::now();
println!("Sleeping using async-std..."); println!("Sleeping using async-std...");

View File

@ -9,9 +9,7 @@
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_io::Async; use smol::{prelude::*, Async};
use blocking::{block_on, unblock};
use futures_lite::*;
use url::Url; use url::Url;
/// Sends a GET request and fetches the response. /// Sends a GET request and fetches the response.
@ -35,7 +33,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
// Connect to the host. // Connect to the host.
let socket_addr = { let socket_addr = {
let host = host.clone(); let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())? smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next() .next()
.context("cannot resolve address")? .context("cannot resolve address")?
}; };
@ -61,7 +59,7 @@ async fn fetch(addr: &str) -> Result<Vec<u8>> {
} }
fn main() -> Result<()> { fn main() -> Result<()> {
block_on(async { smol::run(async {
let addr = "https://www.rust-lang.org"; let addr = "https://www.rust-lang.org";
let resp = fetch(addr).await?; let resp = fetch(addr).await?;
println!("{}", String::from_utf8_lossy(&resp)); println!("{}", String::from_utf8_lossy(&resp));

View File

@ -16,11 +16,8 @@
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use anyhow::Result; use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor}; use async_native_tls::{Identity, TlsAcceptor};
use blocking::block_on; use smol::{future, prelude::*, Async, Task};
use futures_lite::*;
use smol::Task;
const RESPONSE: &[u8] = br#" const RESPONSE: &[u8] = br#"
HTTP/1.1 200 OK HTTP/1.1 200 OK
@ -83,7 +80,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start HTTP and HTTPS servers. // Start HTTP and HTTPS servers.
block_on(async { smol::run(async {
let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None); let http = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 8000))?, None);
let https = listen( let https = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?, Async::<TcpListener>::bind(([127, 0, 0, 1], 8001))?,

View File

@ -14,12 +14,10 @@
use std::net::TcpStream; use std::net::TcpStream;
use async_io::Async; use smol::{future, io, Async, Unblock};
use blocking::{block_on, Unblock};
use futures_lite::*;
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
block_on(async { smol::run(async {
// Create async stdin and stdout handles. // Create async stdin and stdout handles.
let stdin = Unblock::new(std::io::stdin()); let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout()); let mut stdout = Unblock::new(std::io::stdout());

View File

@ -14,10 +14,7 @@
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use async_io::Async; use smol::{io, Async, Task};
use blocking::block_on;
use futures_lite::*;
use smol::Task;
/// Echoes messages from the client back to it. /// Echoes messages from the client back to it.
async fn echo(stream: Async<TcpStream>) -> io::Result<()> { async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
@ -26,7 +23,7 @@ async fn echo(stream: Async<TcpStream>) -> io::Result<()> {
} }
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
block_on(async { smol::run(async {
// Create a listener. // Create a listener.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7000))?; let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7000))?;
println!("Listening on {}", listener.get_ref().local_addr()?); println!("Listening on {}", listener.get_ref().local_addr()?);

View File

@ -15,10 +15,8 @@
use std::net::TcpStream; use std::net::TcpStream;
use anyhow::Result; use anyhow::Result;
use async_io::Async;
use async_native_tls::{Certificate, TlsConnector}; use async_native_tls::{Certificate, TlsConnector};
use blocking::{block_on, Unblock}; use smol::{future, io, Async, Unblock};
use futures_lite::*;
fn main() -> Result<()> { fn main() -> Result<()> {
// Initialize TLS with the local certificate. // Initialize TLS with the local certificate.
@ -26,7 +24,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?); builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder); let tls = TlsConnector::from(builder);
block_on(async { smol::run(async {
// Create async stdin and stdout handles. // Create async stdin and stdout handles.
let stdin = Unblock::new(std::io::stdin()); let stdin = Unblock::new(std::io::stdin());
let mut stdout = Unblock::new(std::io::stdout()); let mut stdout = Unblock::new(std::io::stdout());

View File

@ -15,11 +15,8 @@
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use anyhow::Result; use anyhow::Result;
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use blocking::block_on; use smol::{io, Async, Task};
use futures_lite::*;
use smol::Task;
/// Echoes messages from the client back to it. /// Echoes messages from the client back to it.
async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> { async fn echo(stream: TlsStream<Async<TcpStream>>) -> Result<()> {
@ -33,7 +30,7 @@ fn main() -> Result<()> {
let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?; let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?;
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
block_on(async { smol::run(async {
// Create a listener. // Create a listener.
let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7001))?; let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 7001))?;
println!("Listening on {}", listener.get_ref().local_addr()?); println!("Listening on {}", listener.get_ref().local_addr()?);

View File

@ -10,11 +10,9 @@
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
use std::os::unix::net::UnixStream; use std::os::unix::net::UnixStream;
use async_io::Async; use smol::{prelude::*, Async};
use blocking::block_on;
use futures_lite::*;
block_on(async { smol::run(async {
// Create a Unix stream that receives a byte on each signal occurrence. // Create a Unix stream that receives a byte on each signal occurrence.
let (a, mut b) = Async::<UnixStream>::pair()?; let (a, mut b) = Async::<UnixStream>::pair()?;
signal_hook::pipe::register(signal_hook::SIGINT, a)?; signal_hook::pipe::register(signal_hook::SIGINT, a)?;

View File

@ -10,7 +10,6 @@ use std::collections::{HashSet, VecDeque};
use anyhow::Result; use anyhow::Result;
use async_channel::{bounded, Sender}; use async_channel::{bounded, Sender};
use blocking::block_on;
use scraper::{Html, Selector}; use scraper::{Html, Selector};
use smol::Task; use smol::Task;
@ -35,7 +34,7 @@ fn links(body: String) -> Vec<String> {
} }
fn main() -> Result<()> { fn main() -> Result<()> {
block_on(async { smol::run(async {
let mut seen = HashSet::new(); let mut seen = HashSet::new();
let mut queue = VecDeque::new(); let mut queue = VecDeque::new();
seen.insert(ROOT.to_string()); seen.insert(ROOT.to_string());

View File

@ -17,12 +17,10 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{bail, Context as _, Result}; use anyhow::{bail, Context as _, Result};
use async_io::Async;
use async_native_tls::{Certificate, TlsConnector, TlsStream}; use async_native_tls::{Certificate, TlsConnector, TlsStream};
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use blocking::{block_on, unblock};
use futures::sink::{Sink, SinkExt}; use futures::sink::{Sink, SinkExt};
use futures_lite::*; use smol::{prelude::*, Async};
use tungstenite::handshake::client::Response; use tungstenite::handshake::client::Response;
use tungstenite::Message; use tungstenite::Message;
use url::Url; use url::Url;
@ -37,7 +35,7 @@ async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)>
// Resolve the address. // Resolve the address.
let socket_addr = { let socket_addr = {
let host = host.clone(); let host = host.clone();
unblock!((host.as_str(), port).to_socket_addrs())? smol::unblock!((host.as_str(), port).to_socket_addrs())?
.next() .next()
.context("cannot resolve address")? .context("cannot resolve address")?
}; };
@ -66,7 +64,7 @@ fn main() -> Result<()> {
builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?); builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?);
let tls = TlsConnector::from(builder); let tls = TlsConnector::from(builder);
block_on(async { smol::run(async {
// Connect to the server. // Connect to the server.
let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?; let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?;
dbg!(resp); dbg!(resp);

View File

@ -17,13 +17,10 @@ use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use anyhow::{Context as _, Result}; use anyhow::{Context as _, Result};
use async_io::Async;
use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_native_tls::{Identity, TlsAcceptor, TlsStream};
use async_tungstenite::WebSocketStream; use async_tungstenite::WebSocketStream;
use blocking::block_on;
use futures::sink::{Sink, SinkExt}; use futures::sink::{Sink, SinkExt};
use futures_lite::*; use smol::{future, prelude::*, Async, Task};
use smol::Task;
use tungstenite::Message; use tungstenite::Message;
/// Echoes messages from the client back to it. /// Echoes messages from the client back to it.
@ -67,7 +64,7 @@ fn main() -> Result<()> {
let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?);
// Start WS and WSS servers. // Start WS and WSS servers.
block_on(async { smol::run(async {
let ws = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?, None); let ws = listen(Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?, None);
let wss = listen( let wss = listen(
Async::<TcpListener>::bind(([127, 0, 0, 1], 9001))?, Async::<TcpListener>::bind(([127, 0, 0, 1], 9001))?,

View File

@ -10,10 +10,7 @@
fn main() -> std::io::Result<()> { fn main() -> std::io::Result<()> {
use std::path::PathBuf; use std::path::PathBuf;
use async_io::Async; use smol::{io, prelude::*, Async, Task, Unblock};
use blocking::{block_on, Unblock};
use futures_lite::*;
use smol::Task;
use tempfile::tempdir; use tempfile::tempdir;
use uds_windows::{UnixListener, UnixStream}; use uds_windows::{UnixListener, UnixStream};
@ -31,7 +28,7 @@ fn main() -> std::io::Result<()> {
let dir = tempdir()?; let dir = tempdir()?;
let path = dir.path().join("socket"); let path = dir.path().join("socket");
block_on(async { smol::run(async {
// Create a listener. // Create a listener.
let listener = Async::new(UnixListener::bind(&path)?)?; let listener = Async::new(UnixListener::bind(&path)?)?;
println!("Listening on {:?}", listener.get_ref().local_addr()?); println!("Listening on {:?}", listener.get_ref().local_addr()?);

View File

@ -1,284 +1,156 @@
//! A small and fast executor. //! A small and fast async runtime.
//! //!
//! This crate runs a global executor thread pool and only has one type, [`Task`]. Despite the //! # Examples
//! trivially simple codebase, this executor and its related crates offer performance and features
//! comparable to more complex frameworks like [tokio].
//! //!
//! Related async crates: //! Connect to an HTTP website, make a GET request, and pipe the response to the standard output:
//! //!
//! * For async I/O and timers, use [`async-io`]. //! ```
//! * For higher-level networking primitives, use [`async-net`]. //! use async_net::TcpStream;
//! * For executors, use [`multitask`]. //! use smol::{io, prelude::*, Unblock};
//! * To call blocking code from async code or the other way around, use [`blocking`].
//! * For async traits and combinators, use [`futures-lite`].
//! //!
//! [`async-io`]: https://docs.rs/async-io //! fn main() -> io::Result<()> {
//! [`async-net`]: https://docs.rs/async-net //! smol::run(async {
//! [`blocking`]: https://docs.rs/blocking //! let mut stream = TcpStream::connect("example.com:80").await?;
//! [`futures-lite`]: https://docs.rs/futures-lite //! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n";
//! [`multitask`]: https://docs.rs/multitask //! stream.write_all(req).await?;
//! [`tokio`]: https://docs.rs/tokio
//! //!
//! # TCP server //! let mut stdout = Unblock::new(std::io::stdout());
//! //! io::copy(&stream, &mut stdout).await?;
//! A simple TCP server that prints messages received from clients: //! Ok(())
//!
//! ```no_run
//! use async_io::Async;
//! use blocking::{block_on, Unblock};
//! use smol::Task;
//! use std::net::TcpListener;
//!
//! fn main() -> std::io::Result<()> {
//! block_on(async {
//! // Start listening on port 9000.
//! let listener = Async::<TcpListener>::bind(([127, 0, 0, 1], 9000))?;
//!
//! loop {
//! // Accept a new client.
//! let (stream, _) = listener.accept().await?;
//!
//! // Spawn a task handling this client.
//! let task = Task::spawn(async move {
//! // Create an async stdio handle.
//! let mut stdout = Unblock::new(std::io::stdout());
//!
//! // Copy data received from the client into stdout.
//! futures::io::copy(&stream, &mut stdout).await
//! });
//!
//! // Keep running the task in the background.
//! task.detach();
//! }
//! }) //! })
//! } //! }
//! ``` //! ```
//! //!
//! To interact with the server, run `nc 127.0.0.1 9000` and type a few lines of text. //! This example uses [`async-net`] for networking, but you can also use the primitive [`Async`]
//! type. See the [full code][get-request].
//! //!
//! # Examples //! Look inside the [examples] directory for more.
//! //!
//! Look inside the [examples] directory for more: //! [`async-net`]: https://docs.rs/async-net
//! a [web crawler][web-crawler], //! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! a [Ctrl-C handler][ctrl-c], //! [get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs
//! a TCP [client][tcp-client]/[server][tcp-server],
//! a TCP chat [client][chat-client]/[server][chat-server],
//! a TLS [client][tls-client]/[server][tls-server],
//! an HTTP+TLS [client][simple-client]/[server][simple-server],
//! an [async-h1] [client][async-h1-client]/[server][async-h1-server],
//! a [hyper] [client][hyper-client]/[server][hyper-server],
//! and a WebSocket+TLS [client][websocket-client]/[server][websocket-server].
//! //!
//! It's also possible to plug non-async libraries into the runtime: see //! # Compatibility
//! [inotify], [timerfd], [signal-hook], and [uds_windows].
//! //!
//! Finally, there's an [example][other-runtimes] showing how to use smol with //! All async libraries work with smol out of the box.
//! [async-std], [tokio], [surf], and [reqwest]. //!
//! However, [tokio] is generally hostile towards non-tokio libraries, insists on non-standard I/O
//! traits, and deliberately lacks documentation on integration with the larger Rust ecosystem.
//! Fortunately, there are ways around it.
//!
//! Enable the `tokio02` feature flag and [`smol::run()`][`crate::run()`] will create a minimal
//! tokio runtime for its libraries:
//!
//! ```toml
//! [dependencies]
//! smol = { version = "0.2", features = ["tokio02"] }
//! ```
//! //!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples/!
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [hyper]: https://docs.rs/tokio
//! [async-std]: https://docs.rs/async-std
//! [tokio]: https://docs.rs/tokio //! [tokio]: https://docs.rs/tokio
//! [surf]: https://docs.rs/surf
//! [reqwest]: https://docs.rs/reqwest
//!
//! [async-h1-client]: https://github.com/stjepang/smol/blob/master/examples/async-h1-client.rs
//! [async-h1-server]: https://github.com/stjepang/smol/blob/master/examples/async-h1-server.rs
//! [chat-client]: https://github.com/stjepang/smol/blob/master/examples/chat-client.rs
//! [chat-server]: https://github.com/stjepang/smol/blob/master/examples/chat-server.rs
//! [ctrl-c]: https://github.com/stjepang/smol/blob/master/examples/ctrl-c.rs
//! [hyper-client]: https://github.com/stjepang/smol/blob/master/examples/hyper-client.rs
//! [hyper-server]: https://github.com/stjepang/smol/blob/master/examples/hyper-server.rs
//! [inotify]: https://github.com/stjepang/smol/blob/master/examples/linux-inotify.rs
//! [other-runtimes]: https://github.com/stjepang/smol/blob/master/examples/other-runtimes.rs
//! [signal-hook]: https://github.com/stjepang/smol/blob/master/examples/unix-signal.rs
//! [simple-client]: https://github.com/stjepang/smol/blob/master/examples/simple-client.rs
//! [simple-server]: https://github.com/stjepang/smol/blob/master/examples/simple-server.rs
//! [tcp-client]: https://github.com/stjepang/smol/blob/master/examples/tcp-client.rs
//! [tcp-server]: https://github.com/stjepang/smol/blob/master/examples/tcp-server.rs
//! [timerfd]: https://github.com/stjepang/smol/blob/master/examples/linux-timerfd.rs
//! [tls-client]: https://github.com/stjepang/smol/blob/master/examples/tls-client.rs
//! [tls-server]: https://github.com/stjepang/smol/blob/master/examples/tls-server.rs
//! [uds_windows]: https://github.com/stjepang/smol/blob/master/examples/windows-uds.rs
//! [web-crawler]: https://github.com/stjepang/smol/blob/master/examples/web-crawler.rs
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
#![forbid(unsafe_code)] #![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::future::Future; use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use multitask::Executor; use async_executor::{Executor, LocalExecutor};
use once_cell::sync::Lazy; use easy_parallel::Parallel;
/// A spawned future. #[doc(inline)]
pub use {
async_executor::Task,
async_io::Async,
async_io::Timer,
blocking::{unblock, Unblock},
futures_lite::{future, io, stream},
futures_lite::{pin, ready},
};
/// Async traits and their extensions.
/// ///
/// Tasks are also futures themselves and yield the output of the spawned future. /// # Examples
/// ///
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit /// ```
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method. /// use smol::prelude::*;
/// ```
pub mod prelude {
#[doc(no_inline)]
pub use futures_lite::{
future::Future,
io::{AsyncBufRead, AsyncBufReadExt},
io::{AsyncRead, AsyncReadExt},
io::{AsyncSeek, AsyncSeekExt},
io::{AsyncWrite, AsyncWriteExt},
stream::{Stream, StreamExt},
};
}
/// Starts an executor and runs the future on it.
/// ///
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. /// This function runs two executors at the same time:
///
/// 1. The current thread runs a [`LocalExecutor`] and the main `future` on it.
/// 2. A thread pool runs an [`Executor`] until the main `future` completes.
///
/// The number of spawned threads matches the number of logical CPU cores on the system.
/// ///
/// # Examples /// # Examples
/// ///
/// ``` /// ```
/// use smol::Task; /// use smol::Task;
/// ///
/// # blocking::block_on(async { /// smol::run(async {
/// // Spawn a task onto the work-stealing executor. /// let task = Task::spawn(async {
/// let task = Task::spawn(async { /// println!("Hello world");
/// println!("Hello from a task!"); /// });
/// 1 + 2 /// task.await;
/// }); /// })
///
/// // Wait for the task to complete.
/// assert_eq!(task.await, 3);
/// # });
/// ``` /// ```
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] pub fn run<T>(future: impl Future<Output = T>) -> T {
#[derive(Debug)] setup(num_cpus::get(), future)
pub struct Task<T>(multitask::Task<T>);
impl<T> Task<T> {
/// Spawns a future.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async { 1 + 2 });
/// assert_eq!(task.await, 3);
/// # });
/// ```
pub fn spawn<F>(future: F) -> Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
static EXECUTOR: Lazy<Executor> = Lazy::new(|| {
for _ in 0..num_cpus::get().max(1) {
thread::spawn(|| {
enter(|| {
let (p, u) = async_io::parking::pair();
let ticker = EXECUTOR.ticker(move || u.unpark());
loop {
if let Ok(false) = catch_unwind(|| ticker.tick()) {
p.park();
}
}
})
});
}
Executor::new()
});
Task(EXECUTOR.spawn(future))
}
/// Detaches the task to let it keep running in the background.
///
/// # Examples
///
/// ```no_run
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// Task::spawn(async {
/// loop {
/// println!("I'm a daemon task looping forever.");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// })
/// .detach();
/// # })
/// ```
pub fn detach(self) {
self.0.detach();
}
/// Cancels the task and waits for it to stop running.
///
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
/// it didn't complete.
///
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
/// canceling because it also waits for the task to stop running.
///
/// # Examples
///
/// ```
/// use async_io::Timer;
/// use smol::Task;
/// use std::time::Duration;
///
/// # blocking::block_on(async {
/// let task = Task::spawn(async {
/// loop {
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
/// Timer::new(Duration::from_secs(1)).await;
/// }
/// });
///
/// Timer::new(Duration::from_secs(3)).await;
/// task.cancel().await;
/// # })
/// ```
pub async fn cancel(self) -> Option<T> {
self.0.cancel().await
}
} }
impl<T> Future for Task<T> { #[cfg(not(feature = "tokio02"))]
type Output = T; fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // A channel that coordinates shutdown when the main future completes.
Pin::new(&mut self.0).poll(cx) let (trigger, shutdown) = async_channel::unbounded::<()>();
} let future = async move {
let _trigger = trigger; // Dropped at the end of this async block.
future.await
};
Parallel::new()
.each(0..num_threads, |_| ex.run(shutdown.recv()))
.finish(|| ex.enter(|| local_ex.run(future)))
.1
} }
/// Enters the tokio context if the `tokio` feature is enabled. #[cfg(feature = "tokio02")]
fn enter<T>(f: impl FnOnce() -> T) -> T { fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
#[cfg(not(feature = "tokio02"))] let ex = Executor::new();
return f(); let local_ex = LocalExecutor::new();
#[cfg(feature = "tokio02")] // A channel that signals shutdown to the thread pool when the main future completes.
{ let (s, shutdown) = async_channel::unbounded::<()>();
use std::cell::Cell; let future = async move {
use tokio::runtime::Runtime; let _s = s; // Drops sender at the end of this async block.
future.await
};
thread_local! { // A minimal tokio runtime.
/// The level of nested `enter` calls we are in, to ensure that the outermost always let mut rt = tokio::runtime::Builder::new()
/// has a runtime spawned. .enable_all()
static NESTING: Cell<usize> = Cell::new(0); .basic_scheduler()
} .build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
/// The global tokio runtime. Parallel::new()
static RT: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("cannot initialize tokio")); .add(|| ex.enter(|| rt.block_on(shutdown.recv())))
.each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv())))
NESTING.with(|nesting| { .finish(|| handle.enter(|| ex.enter(|| local_ex.run(future))))
let res = if nesting.get() == 0 { .1
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
} }