diff --git a/Cargo.toml b/Cargo.toml index 7877b6f..c58c8a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,26 +13,20 @@ categories = ["asynchronous", "concurrency", "network-programming"] readme = "README.md" [features] -# Optional feature for seamless integration with crates depending on tokio. -# It creates a global tokio runtime and sets up its context inside smol. -# -# Enable the feature as follows: -# ``` -# [dependencies] -# smol = { version = "0.2", features = ["tokio02"] } -# ``` -tokio02 = ["tokio"] +default = [] +tokio02 = ["tokio"] # Optional feature for compatibility with tokio-based libraries. [dependencies] -async-io = "0.1.3" -blocking = "0.4.7" -futures-lite = "0.1.5" -multitask = "0.2.0" +async-channel = "1.1.1" +async-executor = "0.1.1" +async-io = "0.1.5" +blocking = "0.5.0" +easy-parallel = "3.1.0" +futures-lite = "0.1.8" num_cpus = "1.13.0" -once_cell = "1.4.0" [dependencies.tokio] -version = "0.2.21" +version = "0.2.22" default-features = false features = ["rt-threaded"] optional = true @@ -43,6 +37,7 @@ async-channel = "1.1.1" async-dup = "1.2.1" async-h1 = "2.1.0" async-native-tls = "0.3.3" +async-net = "0.1.1" async-std = "1.6.2" async-tungstenite = { version = "0.7.1", features = ["async-native-tls"] } base64 = "0.12.3" @@ -56,11 +51,13 @@ num_cpus = "1.13.0" reqwest = "0.10.6" scraper = "0.12.0" signal-hook = "0.1.16" +smol = { path = ".", features = ["tokio02"] } surf = { version = "2.0.0-alpha.4", default-features = false, features = ["h1-client"] } tempfile = "3.1.0" -tokio = { version = "0.2.21", default-features = false } +tide = "0.12.0" tungstenite = "0.11.0" url = "2.1.1" +warp = "0.2.4" [target.'cfg(target_os = "linux")'.dev-dependencies] inotify = { version = "0.8.3", default-features = false } diff --git a/README.md b/README.md index 57752ed..dcf7323 100644 --- a/README.md +++ b/README.md @@ -11,87 +11,55 @@ https://docs.rs/smol) [![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)]( https://discord.gg/x6m5Vvt) -A small and fast executor. +A small and fast async runtime. -Reading the [docs] or looking at the [examples] is a great way to start learning -async Rust. +## Examples -[docs]: https://docs.rs/smol -[examples]: ./examples +Connect to an HTTP website, make a GET request, and pipe the response to the standard output: -Async I/O is implemented using [epoll] on Linux/Android, [kqueue] on -macOS/iOS/BSD, and [wepoll] on Windows. +```rust +use async_net::TcpStream; +use smol::{io, prelude::*, Unblock}; -What makes smol different from [async-std] and [tokio]? -Read this [blog post](https://stjepang.github.io/2020/04/03/why-im-building-a-new-async-runtime.html). +fn main() -> io::Result<()> { + smol::run(async { + let mut stream = TcpStream::connect("example.com:80").await?; + let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"; + stream.write_all(req).await?; -[epoll]: https://en.wikipedia.org/wiki/Epoll -[kqueue]: https://en.wikipedia.org/wiki/Kqueue -[wepoll]: https://github.com/piscisaureus/wepoll + let mut stdout = Unblock::new(std::io::stdout()); + io::copy(&stream, &mut stdout).await?; + Ok(()) + }) +} +``` -## Features +This example uses [`async-net`] for networking, but you can also use the primitive `Async` +type. See the [full code][get-request]. -* Async TCP, UDP, Unix domain sockets, and custom file descriptors. -* Thread-local executor for `!Send` futures. -* Work-stealing executor that adapts to uneven workloads. -* Blocking executor for files, processes, and standard I/O. -* Tasks that support cancellation. -* Userspace timers. +Look inside the [examples] directory for more. + +[`async-net`]: https://docs.rs/async-net +[examples]: https://github.com/stjepang/smol/tree/master/examples +[get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs ## Compatibility -See [this example](./examples/other-runtimes.rs) for how to use smol with -[async-std], [tokio], [surf], and [reqwest]. +All async libraries work with smol out of the box. -There is an optional feature for seamless integration with crates depending -on tokio. It creates a global tokio runtime and sets up its context inside smol. -Enable the feature as follows: +However, [tokio] is a special case due to its generally hostile attitude towards non-tokio +libraries, insistence on non-standard I/O traits, and lack of documentation on integration +with the larger Rust ecosystem. Fortunately, there are ways around it. + +Enable the `tokio02` feature flag and `smol::run()` will create a minimal +tokio runtime for its libraries: ```toml [dependencies] smol = { version = "0.2", features = ["tokio02"] } ``` -[async-std]: https://docs.rs/async-std [tokio]: https://docs.rs/tokio -[surf]: https://docs.rs/surf -[reqwest]: https://docs.rs/reqwest - -## Documentation - -You can read the docs [here][docs], or generate them on your own. - -If you'd like to explore the implementation in more depth, the following -command generates docs for the whole crate, including private modules: - -``` -cargo doc --document-private-items --no-deps --open -``` - -[docs]: https://docs.rs/smol - -## Other crates - -My personal crate recommendation list: - -* Channels, pipes, and mutexes: [piper] -* HTTP clients: [surf], [isahc], [reqwest] -* HTTP servers: [async-h1], [hyper] -* WebSockets: [async-tungstenite] -* TLS authentication: [async-native-tls] -* Signals: [ctrlc], [signal-hook] - -[piper]: https://docs.rs/piper -[surf]: https://docs.rs/surf -[isahc]: https://docs.rs/isahc -[reqwest]: https://docs.rs/reqwest -[async-h1]: https://docs.rs/async-h1 -[hyper]: https://docs.rs/hyper -[async-tungstenite]: https://docs.rs/async-tungstenite -[async-native-tls]: https://docs.rs/async-native-tls -[native-tls]: https://docs.rs/native-tls -[ctrlc]: https://docs.rs/ctrlc -[signal-hook]: https://docs.rs/signal-hook ## TLS certificate diff --git a/examples/async-h1-client.rs b/examples/async-h1-client.rs index 09a5bc5..fe04b4e 100644 --- a/examples/async-h1-client.rs +++ b/examples/async-h1-client.rs @@ -10,7 +10,7 @@ use std::net::{TcpStream, ToSocketAddrs}; use anyhow::{bail, Context as _, Error, Result}; use http_types::{Method, Request, Response}; -use smol::{block_on, io::AsyncReadExt, unblock, Async}; +use smol::{prelude::*, Async}; use url::Url; /// Sends a request and fetches the response. @@ -25,7 +25,7 @@ async fn fetch(req: Request) -> Result { // Connect to the host. let socket_addr = { let host = host.clone(); - unblock!((host.as_str(), port).to_socket_addrs())? + smol::unblock!((host.as_str(), port).to_socket_addrs())? .next() .context("cannot resolve address")? }; @@ -45,7 +45,7 @@ async fn fetch(req: Request) -> Result { } fn main() -> Result<()> { - block_on(async { + smol::run(async { // Create a request. let addr = "https://www.rust-lang.org"; let req = Request::new(Method::Get, Url::parse(addr)?); diff --git a/examples/async-h1-server.rs b/examples/async-h1-server.rs index df9dc7a..8ab13a2 100644 --- a/examples/async-h1-server.rs +++ b/examples/async-h1-server.rs @@ -18,7 +18,7 @@ use std::net::TcpListener; use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor}; use http_types::{Request, Response, StatusCode}; -use smol::{block_on, future, Async, Task}; +use smol::{future, Async, Task}; /// Serves a request and returns a response. async fn serve(req: Request) -> http_types::Result { @@ -83,7 +83,7 @@ fn main() -> Result<()> { let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); // Start HTTP and HTTPS servers. - block_on(async { + smol::run(async { let http = listen(Async::::bind(([127, 0, 0, 1], 8000))?, None); let https = listen( Async::::bind(([127, 0, 0, 1], 8001))?, diff --git a/examples/chat-client.rs b/examples/chat-client.rs index 8b51c81..a0dbab9 100644 --- a/examples/chat-client.rs +++ b/examples/chat-client.rs @@ -14,10 +14,10 @@ use std::net::TcpStream; -use smol::{block_on, future, io, Async, Unblock}; +use smol::{future, io, Async, Unblock}; fn main() -> io::Result<()> { - block_on(async { + smol::run(async { // Connect to the server and create async stdin and stdout. let stream = Async::::connect(([127, 0, 0, 1], 6000)).await?; let stdin = Unblock::new(std::io::stdin()); diff --git a/examples/chat-server.rs b/examples/chat-server.rs index 3668ff3..e671f28 100644 --- a/examples/chat-server.rs +++ b/examples/chat-server.rs @@ -17,7 +17,7 @@ use std::net::{SocketAddr, TcpListener, TcpStream}; use async_channel::{bounded, Receiver, Sender}; use async_dup::Arc; -use smol::{block_on, io, io::AsyncBufReadExt, io::AsyncWriteExt, stream::StreamExt, Async, Task}; +use smol::{io, prelude::*, Async, Task}; /// An event on the chat server. enum Event { @@ -76,7 +76,7 @@ async fn read_messages(sender: Sender, client: Arc>) -> } fn main() -> io::Result<()> { - block_on(async { + smol::run(async { // Create a listener for incoming client connections. let listener = Async::::bind(([127, 0, 0, 1], 6000))?; diff --git a/examples/ctrl-c.rs b/examples/ctrl-c.rs index d593c2c..523c792 100644 --- a/examples/ctrl-c.rs +++ b/examples/ctrl-c.rs @@ -6,7 +6,7 @@ //! cargo run --example ctrl-c //! ``` -use smol::{block_on, future}; +use smol::future; fn main() { // Set a handler that sends a message through a channel. @@ -16,7 +16,7 @@ fn main() { }; ctrlc::set_handler(handle).unwrap(); - block_on(async { + smol::run(async { println!("Waiting for Ctrl-C..."); // Receive a message that indicates the Ctrl-C signal occurred. diff --git a/examples/get-request.rs b/examples/get-request.rs new file mode 100644 index 0000000..4f8ceae --- /dev/null +++ b/examples/get-request.rs @@ -0,0 +1,28 @@ +//! Connect to an HTTP website, make a GET request, and pipes the response to the standard output. +//! +//! Run with: +//! +//! ``` +//! cargo run --example get-request +//! ``` + +use smol::{io, prelude::*, Async, Unblock}; +use std::net::{TcpStream, ToSocketAddrs}; + +fn main() -> io::Result<()> { + smol::run(async { + // Connect to http://example.com + let mut addrs = smol::unblock!(("example.com", 80).to_socket_addrs())?; + let addr = addrs.next().unwrap(); + let mut stream = Async::::connect(addr).await?; + + // Send an HTTP GET request. + let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"; + stream.write_all(req).await?; + + // Read the response and pipe it to the standard output. + let mut stdout = Unblock::new(std::io::stdout()); + io::copy(&stream, &mut stdout).await?; + Ok(()) + }) +} diff --git a/examples/hyper-client.rs b/examples/hyper-client.rs index 1604d05..7263e61 100644 --- a/examples/hyper-client.rs +++ b/examples/hyper-client.rs @@ -15,10 +15,7 @@ use anyhow::{bail, Context as _, Error, Result}; use async_native_tls::TlsStream; use http::Uri; use hyper::{Body, Client, Request, Response}; -use smol::{ - block_on, future::Future, io, io::AsyncRead, io::AsyncWrite, stream::StreamExt, unblock, Async, - Task, -}; +use smol::{io, prelude::*, Async, Task}; /// Sends a request and fetches the response. async fn fetch(req: Request) -> Result> { @@ -30,7 +27,7 @@ async fn fetch(req: Request) -> Result> { } fn main() -> Result<()> { - block_on(async { + smol::run(async { // Create a request. let req = Request::get("https://www.rust-lang.org").body(Body::empty())?; @@ -84,7 +81,7 @@ impl hyper::service::Service for SmolConnector { let socket_addr = { let host = host.to_string(); let port = uri.port_u16().unwrap_or(80); - unblock!((host.as_str(), port).to_socket_addrs())? + smol::unblock!((host.as_str(), port).to_socket_addrs())? .next() .context("cannot resolve address")? }; @@ -96,7 +93,7 @@ impl hyper::service::Service for SmolConnector { let socket_addr = { let host = host.to_string(); let port = uri.port_u16().unwrap_or(443); - unblock!((host.as_str(), port).to_socket_addrs())? + smol::unblock!((host.as_str(), port).to_socket_addrs())? .next() .context("cannot resolve address")? }; diff --git a/examples/hyper-server.rs b/examples/hyper-server.rs index 8b2a553..a3e5e02 100644 --- a/examples/hyper-server.rs +++ b/examples/hyper-server.rs @@ -21,10 +21,7 @@ use anyhow::{Error, Result}; use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Request, Response, Server}; -use smol::{ - block_on, future, future::Future, io, io::AsyncRead, io::AsyncWrite, ready, stream::Stream, - Async, Task, -}; +use smol::{future, io, prelude::*, Async, Task}; /// Serves a request and returns a response. async fn serve(req: Request, host: String) -> Result> { @@ -59,7 +56,7 @@ fn main() -> Result<()> { let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); // Start HTTP and HTTPS servers. - block_on(async { + smol::run(async { let http = listen(Async::::bind(([127, 0, 0, 1], 8000))?, None); let https = listen( Async::::bind(([127, 0, 0, 1], 8001))?, @@ -97,11 +94,11 @@ impl hyper::server::accept::Accept for SmolListener { type Error = Error; fn poll_accept( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context, ) -> Poll>> { let poll = Pin::new(&mut self.listener.incoming()).poll_next(cx); - let stream = ready!(poll).unwrap()?; + let stream = smol::ready!(poll).unwrap()?; let stream = match &self.tls { None => SmolStream::Plain(stream), @@ -150,7 +147,7 @@ impl tokio::io::AsyncRead for SmolStream { SmolStream::Plain(s) => return Pin::new(s).poll_read(cx, buf), SmolStream::Tls(s) => return Pin::new(s).poll_read(cx, buf), SmolStream::Handshake(f) => { - let s = ready!(f.as_mut().poll(cx))?; + let s = smol::ready!(f.as_mut().poll(cx))?; *self = SmolStream::Tls(s); } } @@ -169,7 +166,7 @@ impl tokio::io::AsyncWrite for SmolStream { SmolStream::Plain(s) => return Pin::new(s).poll_write(cx, buf), SmolStream::Tls(s) => return Pin::new(s).poll_write(cx, buf), SmolStream::Handshake(f) => { - let s = ready!(f.as_mut().poll(cx))?; + let s = smol::ready!(f.as_mut().poll(cx))?; *self = SmolStream::Tls(s); } } diff --git a/examples/linux-inotify.rs b/examples/linux-inotify.rs index f874996..cc36109 100644 --- a/examples/linux-inotify.rs +++ b/examples/linux-inotify.rs @@ -11,7 +11,7 @@ fn main() -> std::io::Result<()> { use std::ffi::OsString; use inotify::{EventMask, Inotify, WatchMask}; - use smol::{block_on, io, Async}; + use smol::{io, Async}; type Event = (OsString, EventMask); @@ -32,7 +32,7 @@ fn main() -> std::io::Result<()> { } } - block_on(async { + smol::run(async { // Watch events in the current directory. let mut inotify = Async::new(Inotify::init()?)?; inotify.get_mut().add_watch(".", WatchMask::ALL_EVENTS)?; diff --git a/examples/linux-timerfd.rs b/examples/linux-timerfd.rs index f2622a2..cb76204 100644 --- a/examples/linux-timerfd.rs +++ b/examples/linux-timerfd.rs @@ -11,7 +11,7 @@ fn main() -> std::io::Result<()> { use std::os::unix::io::AsRawFd; use std::time::{Duration, Instant}; - use smol::{block_on, io, Async}; + use smol::{io, Async}; use timerfd::{SetTimeFlags, TimerFd, TimerState}; /// Converts a [`nix::Error`] into [`std::io::Error`]. @@ -35,7 +35,7 @@ fn main() -> std::io::Result<()> { Ok(()) } - block_on(async { + smol::run(async { let start = Instant::now(); println!("Sleeping..."); diff --git a/examples/other-runtimes.rs b/examples/other-runtimes.rs index 00a2df5..fa5460d 100644 --- a/examples/other-runtimes.rs +++ b/examples/other-runtimes.rs @@ -1,9 +1,6 @@ //! Demonstrates how to use `async-std`, `tokio`, `surf`, and `request`. //! -//! There is an optional feature for seamless integration with crates depending on tokio. -//! It creates a global tokio runtime and sets up its context inside smol. -//! -//! Enable the feature as follows: +//! For compatibility with tokio-based libraries, enable the `tokio02` feature flag: //! //! ```toml //! [dependencies] @@ -19,10 +16,9 @@ use std::time::{Duration, Instant}; use anyhow::{Error, Result}; -use smol::block_on; fn main() -> Result<()> { - block_on(async { + smol::run(async { // Sleep using async-std. let start = Instant::now(); println!("Sleeping using async-std..."); diff --git a/examples/simple-client.rs b/examples/simple-client.rs index 44b98cd..fdfc167 100644 --- a/examples/simple-client.rs +++ b/examples/simple-client.rs @@ -9,7 +9,7 @@ use std::net::{TcpStream, ToSocketAddrs}; use anyhow::{bail, Context as _, Result}; -use smol::{block_on, io::AsyncReadExt, io::AsyncWriteExt, unblock, Async}; +use smol::{prelude::*, Async}; use url::Url; /// Sends a GET request and fetches the response. @@ -33,7 +33,7 @@ async fn fetch(addr: &str) -> Result> { // Connect to the host. let socket_addr = { let host = host.clone(); - unblock!((host.as_str(), port).to_socket_addrs())? + smol::unblock!((host.as_str(), port).to_socket_addrs())? .next() .context("cannot resolve address")? }; @@ -59,7 +59,7 @@ async fn fetch(addr: &str) -> Result> { } fn main() -> Result<()> { - block_on(async { + smol::run(async { let addr = "https://www.rust-lang.org"; let resp = fetch(addr).await?; println!("{}", String::from_utf8_lossy(&resp)); diff --git a/examples/simple-server.rs b/examples/simple-server.rs index e2c2c40..0ec477f 100644 --- a/examples/simple-server.rs +++ b/examples/simple-server.rs @@ -17,7 +17,7 @@ use std::net::{TcpListener, TcpStream}; use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor}; -use smol::{block_on, future, io::AsyncWriteExt, Async, Task}; +use smol::{future, prelude::*, Async, Task}; const RESPONSE: &[u8] = br#" HTTP/1.1 200 OK @@ -80,7 +80,7 @@ fn main() -> Result<()> { let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); // Start HTTP and HTTPS servers. - block_on(async { + smol::run(async { let http = listen(Async::::bind(([127, 0, 0, 1], 8000))?, None); let https = listen( Async::::bind(([127, 0, 0, 1], 8001))?, diff --git a/examples/tcp-client.rs b/examples/tcp-client.rs index 09b3a08..d98d532 100644 --- a/examples/tcp-client.rs +++ b/examples/tcp-client.rs @@ -14,10 +14,10 @@ use std::net::TcpStream; -use smol::{block_on, future, io, Async, Unblock}; +use smol::{future, io, Async, Unblock}; fn main() -> io::Result<()> { - block_on(async { + smol::run(async { // Create async stdin and stdout handles. let stdin = Unblock::new(std::io::stdin()); let mut stdout = Unblock::new(std::io::stdout()); diff --git a/examples/tcp-server.rs b/examples/tcp-server.rs index 5842093..5f37fa7 100644 --- a/examples/tcp-server.rs +++ b/examples/tcp-server.rs @@ -14,7 +14,7 @@ use std::net::{TcpListener, TcpStream}; -use smol::{block_on, io, Async, Task}; +use smol::{io, Async, Task}; /// Echoes messages from the client back to it. async fn echo(stream: Async) -> io::Result<()> { @@ -23,7 +23,7 @@ async fn echo(stream: Async) -> io::Result<()> { } fn main() -> io::Result<()> { - block_on(async { + smol::run(async { // Create a listener. let listener = Async::::bind(([127, 0, 0, 1], 7000))?; println!("Listening on {}", listener.get_ref().local_addr()?); diff --git a/examples/tls-client.rs b/examples/tls-client.rs index ae77c79..6c4d956 100644 --- a/examples/tls-client.rs +++ b/examples/tls-client.rs @@ -16,7 +16,7 @@ use std::net::TcpStream; use anyhow::Result; use async_native_tls::{Certificate, TlsConnector}; -use smol::{block_on, future, io, Async, Unblock}; +use smol::{future, io, Async, Unblock}; fn main() -> Result<()> { // Initialize TLS with the local certificate. @@ -24,7 +24,7 @@ fn main() -> Result<()> { builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?); let tls = TlsConnector::from(builder); - block_on(async { + smol::run(async { // Create async stdin and stdout handles. let stdin = Unblock::new(std::io::stdin()); let mut stdout = Unblock::new(std::io::stdout()); diff --git a/examples/tls-server.rs b/examples/tls-server.rs index 459e490..20431db 100644 --- a/examples/tls-server.rs +++ b/examples/tls-server.rs @@ -16,7 +16,7 @@ use std::net::{TcpListener, TcpStream}; use anyhow::Result; use async_native_tls::{Identity, TlsAcceptor, TlsStream}; -use smol::{block_on, io, Async, Task}; +use smol::{io, Async, Task}; /// Echoes messages from the client back to it. async fn echo(stream: TlsStream>) -> Result<()> { @@ -30,7 +30,7 @@ fn main() -> Result<()> { let identity = Identity::from_pkcs12(include_bytes!("identity.pfx"), "password")?; let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); - block_on(async { + smol::run(async { // Create a listener. let listener = Async::::bind(([127, 0, 0, 1], 7001))?; println!("Listening on {}", listener.get_ref().local_addr()?); diff --git a/examples/unix-signal.rs b/examples/unix-signal.rs index 228c317..2483c43 100644 --- a/examples/unix-signal.rs +++ b/examples/unix-signal.rs @@ -10,9 +10,9 @@ fn main() -> std::io::Result<()> { use std::os::unix::net::UnixStream; - use smol::{block_on, io::AsyncReadExt, Async}; + use smol::{prelude::*, Async}; - block_on(async { + smol::run(async { // Create a Unix stream that receives a byte on each signal occurrence. let (a, mut b) = Async::::pair()?; signal_hook::pipe::register(signal_hook::SIGINT, a)?; diff --git a/examples/web-crawler.rs b/examples/web-crawler.rs index c4f360e..93b4649 100644 --- a/examples/web-crawler.rs +++ b/examples/web-crawler.rs @@ -11,7 +11,7 @@ use std::collections::{HashSet, VecDeque}; use anyhow::Result; use async_channel::{bounded, Sender}; use scraper::{Html, Selector}; -use smol::{block_on, Task}; +use smol::Task; const ROOT: &str = "https://www.rust-lang.org"; @@ -34,7 +34,7 @@ fn links(body: String) -> Vec { } fn main() -> Result<()> { - block_on(async { + smol::run(async { let mut seen = HashSet::new(); let mut queue = VecDeque::new(); seen.insert(ROOT.to_string()); diff --git a/examples/websocket-client.rs b/examples/websocket-client.rs index f523b71..8ddbc77 100644 --- a/examples/websocket-client.rs +++ b/examples/websocket-client.rs @@ -20,7 +20,7 @@ use anyhow::{bail, Context as _, Result}; use async_native_tls::{Certificate, TlsConnector, TlsStream}; use async_tungstenite::WebSocketStream; use futures::sink::{Sink, SinkExt}; -use smol::{block_on, stream::Stream, stream::StreamExt, unblock, Async}; +use smol::{prelude::*, Async}; use tungstenite::handshake::client::Response; use tungstenite::Message; use url::Url; @@ -35,7 +35,7 @@ async fn connect(addr: &str, tls: TlsConnector) -> Result<(WsStream, Response)> // Resolve the address. let socket_addr = { let host = host.clone(); - unblock!((host.as_str(), port).to_socket_addrs())? + smol::unblock!((host.as_str(), port).to_socket_addrs())? .next() .context("cannot resolve address")? }; @@ -64,7 +64,7 @@ fn main() -> Result<()> { builder.add_root_certificate(Certificate::from_pem(include_bytes!("certificate.pem"))?); let tls = TlsConnector::from(builder); - block_on(async { + smol::run(async { // Connect to the server. let (mut stream, resp) = connect("wss://127.0.0.1:9001", tls).await?; dbg!(resp); diff --git a/examples/websocket-server.rs b/examples/websocket-server.rs index 687cfc4..36e7de7 100644 --- a/examples/websocket-server.rs +++ b/examples/websocket-server.rs @@ -20,7 +20,7 @@ use anyhow::{Context as _, Result}; use async_native_tls::{Identity, TlsAcceptor, TlsStream}; use async_tungstenite::WebSocketStream; use futures::sink::{Sink, SinkExt}; -use smol::{block_on, future, stream::Stream, stream::StreamExt, Async, Task}; +use smol::{future, prelude::*, Async, Task}; use tungstenite::Message; /// Echoes messages from the client back to it. @@ -64,7 +64,7 @@ fn main() -> Result<()> { let tls = TlsAcceptor::from(native_tls::TlsAcceptor::new(identity)?); // Start WS and WSS servers. - block_on(async { + smol::run(async { let ws = listen(Async::::bind(([127, 0, 0, 1], 9000))?, None); let wss = listen( Async::::bind(([127, 0, 0, 1], 9001))?, diff --git a/examples/windows-uds.rs b/examples/windows-uds.rs index e97164a..5079552 100644 --- a/examples/windows-uds.rs +++ b/examples/windows-uds.rs @@ -10,7 +10,7 @@ fn main() -> std::io::Result<()> { use std::path::PathBuf; - use smol::{block_on, io, io::AsyncWriteExt, Async, Task, Unblock}; + use smol::{io, prelude::*, Async, Task, Unblock}; use tempfile::tempdir; use uds_windows::{UnixListener, UnixStream}; @@ -28,7 +28,7 @@ fn main() -> std::io::Result<()> { let dir = tempdir()?; let path = dir.path().join("socket"); - block_on(async { + smol::run(async { // Create a listener. let listener = Async::new(UnixListener::bind(&path)?)?; println!("Listening on {:?}", listener.get_ref().local_addr()?); diff --git a/src/lib.rs b/src/lib.rs index c507179..306e48f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,284 +1,156 @@ //! A small and fast async runtime. //! -//! This library provides: -//! -//! * Tools for working with [`future`]s, [`stream`]s, and async [I/O][`io`]. -//! * Hooks into epoll/kqueue/wepoll for [`Async`] I/O and [`Timer`]s. -//! * Glue between async and blocking code: [`block_on()`], [`BlockOn`], [`unblock()`], [`Unblock`]. -//! * An executor for spawning [`Task`]s. -//! -//! The whole implementation is a trivial amount of code - it mostly reexports types and functions -//! from other small and independent crates. -//! -//! The focus of the crate is on simplicity -//! //! # Examples //! -//! A simple TCP server that prints messages received from clients: +//! Connect to an HTTP website, make a GET request, and pipe the response to the standard output: //! -//! ```no_run -//! use smol::{Async, Task, Unblock}; -//! use std::net::TcpListener; +//! ``` +//! use async_net::TcpStream; +//! use smol::{io, prelude::*, Unblock}; //! -//! fn main() -> std::io::Result<()> { -//! smol::block_on(async { -//! // Start listening on port 9000. -//! let listener = Async::::bind(([127, 0, 0, 1], 9000))?; +//! fn main() -> io::Result<()> { +//! smol::run(async { +//! let mut stream = TcpStream::connect("example.com:80").await?; +//! let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"; +//! stream.write_all(req).await?; //! -//! loop { -//! // Accept a new client. -//! let (stream, _) = listener.accept().await?; -//! -//! // Spawn a task handling this client. -//! let task = Task::spawn(async move { -//! // Create an async stdio handle. -//! let mut stdout = Unblock::new(std::io::stdout()); -//! -//! // Copy data received from the client into stdout. -//! smol::io::copy(&stream, &mut stdout).await -//! }); -//! -//! // Keep running the task in the background. -//! task.detach(); -//! } +//! let mut stdout = Unblock::new(std::io::stdout()); +//! io::copy(&stream, &mut stdout).await?; +//! Ok(()) //! }) //! } //! ``` //! -//! To interact with the server, run `nc 127.0.0.1 9000` and type a few lines of text. +//! This example uses [`async-net`] for networking, but you can also use the primitive [`Async`] +//! type. See the [full code][get-request]. //! -//! ### More examples +//! Look inside the [examples] directory for more. //! -//! Look inside the [examples] directory for more: -//! a [web crawler][web-crawler], -//! a [Ctrl-C handler][ctrl-c], -//! a TCP [client][tcp-client]/[server][tcp-server], -//! a TCP chat [client][chat-client]/[server][chat-server], -//! a TLS [client][tls-client]/[server][tls-server], -//! an HTTP+TLS [client][simple-client]/[server][simple-server], -//! an [async-h1] [client][async-h1-client]/[server][async-h1-server], -//! a [hyper] [client][hyper-client]/[server][hyper-server], -//! and a WebSocket+TLS [client][websocket-client]/[server][websocket-server]. +//! [`async-net`]: https://docs.rs/async-net +//! [examples]: https://github.com/stjepang/smol/tree/master/examples +//! [get-request]: https://github.com/stjepang/smol/blob/master/examples/get-request.rs //! -//! It's also possible to plug non-async libraries into the runtime: see -//! [inotify], [timerfd], [signal-hook], and [uds_windows]. +//! # Compatibility //! -//! Finally, there's an [example][other-runtimes] showing how to use smol with -//! [async-std], [tokio], [surf], and [reqwest]. +//! All async libraries work with smol out of the box. +//! +//! However, [tokio] is a special case due to its generally hostile attitude towards non-tokio +//! libraries, insistence on non-standard I/O traits, and lack of documentation on integration +//! with the larger Rust ecosystem. Fortunately, there are ways around it. +//! +//! Enable the `tokio02` feature flag and [`smol::run()`][`crate::run()`] will create a minimal +//! tokio runtime for its libraries: +//! +//! ```toml +//! [dependencies] +//! smol = { version = "0.2", features = ["tokio02"] } +//! ``` //! -//! [examples]: https://github.com/stjepang/smol/tree/master/examples/! -//! [async-h1]: https://docs.rs/async-h1 -//! [hyper]: https://docs.rs/hyper -//! [hyper]: https://docs.rs/tokio -//! [async-std]: https://docs.rs/async-std //! [tokio]: https://docs.rs/tokio -//! [surf]: https://docs.rs/surf -//! [reqwest]: https://docs.rs/reqwest -//! -//! [async-h1-client]: https://github.com/stjepang/smol/blob/master/examples/async-h1-client.rs -//! [async-h1-server]: https://github.com/stjepang/smol/blob/master/examples/async-h1-server.rs -//! [chat-client]: https://github.com/stjepang/smol/blob/master/examples/chat-client.rs -//! [chat-server]: https://github.com/stjepang/smol/blob/master/examples/chat-server.rs -//! [ctrl-c]: https://github.com/stjepang/smol/blob/master/examples/ctrl-c.rs -//! [hyper-client]: https://github.com/stjepang/smol/blob/master/examples/hyper-client.rs -//! [hyper-server]: https://github.com/stjepang/smol/blob/master/examples/hyper-server.rs -//! [inotify]: https://github.com/stjepang/smol/blob/master/examples/linux-inotify.rs -//! [other-runtimes]: https://github.com/stjepang/smol/blob/master/examples/other-runtimes.rs -//! [signal-hook]: https://github.com/stjepang/smol/blob/master/examples/unix-signal.rs -//! [simple-client]: https://github.com/stjepang/smol/blob/master/examples/simple-client.rs -//! [simple-server]: https://github.com/stjepang/smol/blob/master/examples/simple-server.rs -//! [tcp-client]: https://github.com/stjepang/smol/blob/master/examples/tcp-client.rs -//! [tcp-server]: https://github.com/stjepang/smol/blob/master/examples/tcp-server.rs -//! [timerfd]: https://github.com/stjepang/smol/blob/master/examples/linux-timerfd.rs -//! [tls-client]: https://github.com/stjepang/smol/blob/master/examples/tls-client.rs -//! [tls-server]: https://github.com/stjepang/smol/blob/master/examples/tls-server.rs -//! [uds_windows]: https://github.com/stjepang/smol/blob/master/examples/windows-uds.rs -//! [web-crawler]: https://github.com/stjepang/smol/blob/master/examples/web-crawler.rs -//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs -//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs #![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] use std::future::Future; -use std::panic::catch_unwind; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::thread; -use multitask::Executor; -use once_cell::sync::Lazy; +use async_executor::{Executor, LocalExecutor}; +use easy_parallel::Parallel; +#[doc(inline)] pub use { + async_executor::Task, async_io::Async, async_io::Timer, - blocking::{block_on, BlockOn}, blocking::{unblock, Unblock}, futures_lite::{future, io, stream}, futures_lite::{pin, ready}, }; -/// A spawned future. +/// Async traits and their extensions. /// -/// Tasks are also futures themselves and yield the output of the spawned future. +/// # Examples /// -/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit -/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method. +/// ``` +/// use smol::prelude::*; +/// ``` +pub mod prelude { + #[doc(no_inline)] + pub use futures_lite::{ + future::Future, + io::{AsyncBufRead, AsyncBufReadExt}, + io::{AsyncRead, AsyncReadExt}, + io::{AsyncSeek, AsyncSeekExt}, + io::{AsyncWrite, AsyncWriteExt}, + stream::{Stream, StreamExt}, + }; +} + +/// Starts an executor and runs the future on it. /// -/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. +/// This function runs two executors at the same time: +/// +/// 1. The current thread runs a [`LocalExecutor`] and the main `future` on it. +/// 2. A thread pool runs an [`Executor`] until the main `future` completes. +/// +/// The number of spawned threads matches the number of logical CPU cores on the system. /// /// # Examples /// /// ``` /// use smol::Task; /// -/// # blocking::block_on(async { -/// // Spawn a task onto the work-stealing executor. -/// let task = Task::spawn(async { -/// println!("Hello from a task!"); -/// 1 + 2 -/// }); -/// -/// // Wait for the task to complete. -/// assert_eq!(task.await, 3); -/// # }); +/// smol::run(async { +/// let task = Task::spawn(async { +/// println!("Hello world"); +/// }); +/// task.await; +/// }) /// ``` -#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -#[derive(Debug)] -pub struct Task(multitask::Task); - -impl Task { - /// Spawns a future. - /// - /// # Examples - /// - /// ``` - /// use smol::Task; - /// - /// # blocking::block_on(async { - /// let task = Task::spawn(async { 1 + 2 }); - /// assert_eq!(task.await, 3); - /// # }); - /// ``` - pub fn spawn(future: F) -> Task - where - F: Future + Send + 'static, - T: Send + 'static, - { - static EXECUTOR: Lazy = Lazy::new(|| { - for _ in 0..num_cpus::get().max(1) { - thread::spawn(|| { - enter(|| { - let (p, u) = async_io::parking::pair(); - let ticker = EXECUTOR.ticker(move || u.unpark()); - - loop { - if let Ok(false) = catch_unwind(|| ticker.tick()) { - p.park(); - } - } - }) - }); - } - - Executor::new() - }); - - Task(EXECUTOR.spawn(future)) - } - - /// Detaches the task to let it keep running in the background. - /// - /// # Examples - /// - /// ```no_run - /// use async_io::Timer; - /// use smol::Task; - /// use std::time::Duration; - /// - /// # blocking::block_on(async { - /// Task::spawn(async { - /// loop { - /// println!("I'm a daemon task looping forever."); - /// Timer::new(Duration::from_secs(1)).await; - /// } - /// }) - /// .detach(); - /// # }) - /// ``` - pub fn detach(self) { - self.0.detach(); - } - - /// Cancels the task and waits for it to stop running. - /// - /// Returns the task's output if it was completed just before it got canceled, or [`None`] if - /// it didn't complete. - /// - /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of - /// canceling because it also waits for the task to stop running. - /// - /// # Examples - /// - /// ``` - /// use async_io::Timer; - /// use smol::Task; - /// use std::time::Duration; - /// - /// # blocking::block_on(async { - /// let task = Task::spawn(async { - /// loop { - /// println!("Even though I'm in an infinite loop, you can still cancel me!"); - /// Timer::new(Duration::from_secs(1)).await; - /// } - /// }); - /// - /// Timer::new(Duration::from_secs(3)).await; - /// task.cancel().await; - /// # }) - /// ``` - pub async fn cancel(self) -> Option { - self.0.cancel().await - } +pub fn run(future: impl Future) -> T { + setup(num_cpus::get(), future) } -impl Future for Task { - type Output = T; +#[cfg(not(feature = "tokio02"))] +fn setup(num_threads: usize, future: impl Future) -> T { + let ex = Executor::new(); + let local_ex = LocalExecutor::new(); - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx) - } + // A channel that coordinates shutdown when the main future completes. + let (trigger, shutdown) = async_channel::unbounded::<()>(); + let future = async move { + let _trigger = trigger; // Dropped at the end of this async block. + future.await + }; + + Parallel::new() + .each(0..num_threads, |_| ex.run(shutdown.recv())) + .finish(|| ex.enter(|| local_ex.run(future))) + .1 } -/// Enters the tokio context if the `tokio` feature is enabled. -fn enter(f: impl FnOnce() -> T) -> T { - #[cfg(not(feature = "tokio02"))] - return f(); +#[cfg(feature = "tokio02")] +fn setup(num_threads: usize, future: impl Future) -> T { + let ex = Executor::new(); + let local_ex = LocalExecutor::new(); - #[cfg(feature = "tokio02")] - { - use std::cell::Cell; - use tokio::runtime::Runtime; + // A channel that signals shutdown to the thread pool when the main future completes. + let (s, shutdown) = async_channel::unbounded::<()>(); + let future = async move { + let _s = s; // Drops sender at the end of this async block. + future.await + }; - thread_local! { - /// The level of nested `enter` calls we are in, to ensure that the outermost always - /// has a runtime spawned. - static NESTING: Cell = Cell::new(0); - } + // A minimal tokio runtime. + let mut rt = tokio::runtime::Builder::new() + .enable_all() + .basic_scheduler() + .build() + .expect("cannot start tokio runtime"); + let handle = rt.handle().clone(); - /// The global tokio runtime. - static RT: Lazy = Lazy::new(|| Runtime::new().expect("cannot initialize tokio")); - - NESTING.with(|nesting| { - let res = if nesting.get() == 0 { - nesting.replace(1); - RT.enter(f) - } else { - nesting.replace(nesting.get() + 1); - f() - }; - nesting.replace(nesting.get() - 1); - res - }) - } + Parallel::new() + .add(|| ex.enter(|| rt.block_on(shutdown.recv()))) + .each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv()))) + .finish(|| handle.enter(|| ex.enter(|| local_ex.run(future)))) + .1 }