From d598899e95f69e03261aee949792bb4dedcd02c0 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 12 Dec 2018 18:18:22 -0500 Subject: [PATCH] Move draft from wg-net --- LICENSE | 2 +- src/TOC.md | 34 ++++ src/async_await/chapter.md | 154 ++++++++++++++++ src/execution/chapter.md | 13 ++ src/execution/executor.md | 183 +++++++++++++++++++ src/execution/future.md | 189 +++++++++++++++++++ src/execution/io.md | 135 ++++++++++++++ src/execution/wakeups.md | 142 +++++++++++++++ src/getting_started/async_await_primer.md | 103 +++++++++++ src/getting_started/chapter.md | 25 +++ src/getting_started/http_server_example.md | 199 +++++++++++++++++++++ src/getting_started/state_of_async_rust.md | 0 src/getting_started/why_async.md | 61 +++++++ src/pinning/chapter.md | 138 ++++++++++++++ src/streams/chapter.md | 104 +++++++++++ 15 files changed, 1481 insertions(+), 1 deletion(-) create mode 100644 src/TOC.md create mode 100644 src/async_await/chapter.md create mode 100644 src/execution/chapter.md create mode 100644 src/execution/executor.md create mode 100644 src/execution/future.md create mode 100644 src/execution/io.md create mode 100644 src/execution/wakeups.md create mode 100644 src/getting_started/async_await_primer.md create mode 100644 src/getting_started/chapter.md create mode 100644 src/getting_started/http_server_example.md create mode 100644 src/getting_started/state_of_async_rust.md create mode 100644 src/getting_started/why_async.md create mode 100644 src/pinning/chapter.md create mode 100644 src/streams/chapter.md diff --git a/LICENSE b/LICENSE index 5b3db54..5e5d5d7 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 The Rust Programming Language +Copyright (c) 2018 Aaron Turon Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/src/TOC.md b/src/TOC.md new file mode 100644 index 0000000..2396182 --- /dev/null +++ b/src/TOC.md @@ -0,0 +1,34 @@ +# Table of Contents + +- [Getting Started](getting_started/chapter.md) + - [What This Book Covers](getting_started/chapter.md#what-this-book-covers) + - [Why Async?](getting_started/why_async.md) + - [The State of Asynchronous Rust](getting_started/state_of_async_rust.md) + - [`async`/`await!` Primer](getting_started/async_await_primer.md) + - [Applied: HTTP Server](getting_started/http_server_example.md) +- [Under the Hood: Executing `Future`s and Tasks](execution/chapter.md) + - [The `Future` Trait](execution/future.md) + - [Task Wakeups with `LocalWaker` and `Waker`](execution/wakeups.md) + - [Applied: Build a Timer](execution/wakeups.md) + - [Applied: Build an Executor](execution/executor.md) + - [Executors and System IO](execution/io.md) +- [`async`/`await`]() + - [What and Why]() + - [`async` Blocks, Closures, and Functions]() + - [Applied: XXX]() +- [Pinning](pinning/chapter.md) + - [Practical Usage](pinning/chapter.md#how-to-use-pinning) +- [Streams](streams/chapter.md) + - [Patterns: Iteration and Concurrency]() +- [Executing Multiple Futures at a Time]() + - [`select!` and `join!`]() + - [Spawning]() + - [Cancellation and Timeouts]() + - [`FuturesUnordered`]() +- [I/O]() + - [`AsyncRead` and `AsyncWrite`]() +- [Asynchronous Design Patterns: Solutions and Suggestions]() + - [Modeling Servers and the Request/Response Pattern]() + - [Managing Shared State]() +- [The Ecosystem: Tokio and More]() + - Lots, lots more?... diff --git a/src/async_await/chapter.md b/src/async_await/chapter.md new file mode 100644 index 0000000..8b2eb23 --- /dev/null +++ b/src/async_await/chapter.md @@ -0,0 +1,154 @@ +# `async`/`await!` + +In [the first chapter], we took a brief look at `async`/`await!` and used +it to build a simple server. This chapter will discuss `async`/`await!` in +greater detail, explaining how it works and how `async` code differs from +traditional Rust programs. + +`async`/`await!` are special pieces of Rust syntax that make it possible to +yield control of the current thread rather than blocking, allowing other +code to make progress while waiting on an operation to complete. + +There are three main ways to use `async`: `async fn`, `async` blocks, and +`async` closures. Each returns a value that implements the `Future` trait: + +```rust +// `foo()` returns a type that implements `Future`. +// `await!(foo())` will result in a value of type `u8`. +async fn foo() -> u8 { 5 } + +fn bar() -> impl Future { + // This `async` block results in a type that implements + // `Future`. + async { + let x: u8 = await!(foo()); + x + 5 + } +} + +fn baz() -> impl Future { + // This `async` closure, when called, returns a type that + // implements `Future` + let closure = async |x: u8| { + await!(bar()) + x + }; + closure(5) +} +``` + +As we saw in the first chapter, `async` bodies and other futures are lazy: +they do nothing until they are run. The most common way to run a `Future` +is to `await!` it. When `await!` is called on a `Future`, it will attempt +to run it to completion. If the `Future` is blocked, it will yield control +of the current thread. When more progress can be made, the `Future` will be picked +up by the executor and will resume running, allowing the `await!` to resolve. + +## `async` Lifetimes + +Unlike traditional functions, `async fn`s which take references or other +non-`'static` arguments return a `Future` which is bounded by the lifetime of +the arguments: + +```rust +// This function: +async fn foo(x: &u8) -> u8 { *x } + +// Is equivalent ot this function: +fn foo<'a>(x: &'a u8) -> impl Future + 'a { + async { *x } +} +``` + +This means that the future returned from an `async fn` must be `await!`ed +while its non-`'static` arguments are still valid. In the common +case of `await!`ing the future immediately after calling the function +(like `await!(foo(&x))`) this is not an issue. However, if storing the future +or sending it over to another task or thread, this may be an issue. + +One common workaround for turning an `async fn` with references-as-arguments +into a `'static` future is to bundle the arguments with the call to the +`async fn` inside an `async` block: + +```rust +async fn foo(x: &u8) -> u8 { *x } + +fn bad() -> impl Future { + let x = 5; + foo(&x) // ERROR: `x` does not live long enough +} + +fn good() -> impl Future { + async { + let x = 5; + await!(foo(&x)) + } +} +``` + +By moving the argument into the `async` block, we extend its lifetime to match +that of the `Future` returned from the call to `foo`. + +## `async move` + +`async` blocks and closures allow the `move` keyword, much like normal +closures. An `async move` block will take ownership of the variables it +references, allowing it to outlive the current scope, but giving up the ability +to share those variables with other code: + +```rust +/// `async` block: +/// +/// Multiple different `async` blocks can access the same local variable +/// so long as they're executed within the variable's scope. +async fn foo() { + let my_string = "foo".to_string(); + + let future_one = async { + ... + println!("{}", my_string); + }; + + let future_two = async { + ... + println!("{}", my_string); + }; + + // Run both futures to completion, printing "foo" twice + let ((), ()) = join!(future_one, future_two); +} + +/// `async move` block: +/// +/// Only one `async` block can access captured variables, since they are +/// moved into the `Future` generated by the `async` block. However, +/// this allows the `Future` to outlive the original scope of the variable: +fn foo() -> impl Future { + let my_string = "foo".to_string(); + async move { + ... + println!("{}", my_string); + } +} +``` + +## `await!`ing on a Multithreaded Executor + +Note that, when using a multithreaded `Future` executor, a `Future` may move +between threads, so any variables used in `async` bodies must be able to travel +between threads, as any `await!` can potentially result in a switch to a new +thread. + +This means that it is not safe to use `Rc`, `&RefCell` or any other types +that don't implement the `Send` trait, including references to types that don't +implement the `Sync` trait. + +(Caveat: it is possible to use these types so long as they aren't in scope +during a call to `await!`.) + +Similarly, it isn't a good idea to hold a traditional non-futures-aware lock +across an `await!`, as it can cause the threadpool to lock up: one task could +take out a lock, `await!` and yield to the executor, allowing another task to +attempt to take the lock and cause a deadlock. To avoid this, use the `Mutex` +in `futures::lock` rather than the one from `std::sync`. + +[the first chapter]: TODO ../getting_started/async_await_primer.md diff --git a/src/execution/chapter.md b/src/execution/chapter.md new file mode 100644 index 0000000..7a5e9d4 --- /dev/null +++ b/src/execution/chapter.md @@ -0,0 +1,13 @@ +# Under the Hood: Executing `Future`s and Tasks + +In this section, we'll cover the underlying structure of how `Future`s and +asynchronous tasks are scheduled. If you're only interested in learning +how to write higher-level code that uses existing `Future` types and aren't +interested in the details of how `Future` types work, you can skip ahead to +the `async`/`await` chapter. However, several of the topics discussed in this +chapter are useful for understanding how `async`/`await` code works, +understanding the runtime and performance properties of `async`/`await` code, +and building new asynchronous primitives. If you decide to skip this section +now, you may want to bookmark it to revisit in the future. + +Now, with that out of the, way, let's talk about the `Future` trait. diff --git a/src/execution/executor.md b/src/execution/executor.md new file mode 100644 index 0000000..a2245a7 --- /dev/null +++ b/src/execution/executor.md @@ -0,0 +1,183 @@ +# Applied: Build an Executor + +`Future`s are lazy and must be actively driven to completion in order to do +anything. A common way to drive a future to completion is to `await!` it inside +an `async` function, but that just pushes the problem one level up: who will +run the futures returned from the top-level `async` functions? The answer is +that we need a `Future` executor. + +`Future` executors take a set of top-level `Future`s and run them to completion +by calling `poll` whenever the `Future` can make progress. Typically, an +executor will `poll` a future once to start off. When `Future`s indicate that +they are ready to make progress by calling `wake()`, they are placed back +onto a queue and `poll` is called again, repeating until the `Future` has +completed. + +In this section, we'll write our own simple executor capable of running a large +number of top-level futures to completion concurrently. + +For this one, we're going to have to include the `futures` crate in order to +get the `FutureObj` type, which is a dynamically-dispatched `Future`, similar +to `Box>`. `Cargo.toml` should look something like this: + +```toml +[package] +name = "xyz" +version = "0.1.0" +authors = ["XYZ Author"] +edition = "2018" + +[dependencies] +futures-preview = "0.3.0-alpha.9" +``` + +Next, we need the following imports at the top of `src/main.rs`: + +```rust +#![feature(arbitrary_self_types, async_await, await_macro, futures_api, pin)] + +use { + futures::future::FutureObj, + std::{ + future::Future, + pin::Pin, + sync::{Arc, Mutex}, + sync::mpsc::{sync_channel, SyncSender, Receiver}, + task::{ + local_waker_from_nonlocal, + Poll, Wake, + }, + }, +}; +``` + +Our executor will work by sending tasks to run over a channel. The executor +will pull events off of the channel and run them. When a task is ready to +do more work (is awoken), it can schedule itself to be polled again by +putting itself back onto the channel. + +In this design, the executor itself just needs the receiving end of the task +channel. The user will get a sending end so that they can spawn new futures. +Tasks themselves are just futures that can reschedule themselves, so we'll +store them as a future paired with a sender that the task can use to requeue +itself. + +```rust +/// Task executor that receives tasks off of a channel and runs them. +struct Executor { + ready_queue: Receiver>, +} + +/// `Spawner` spawns new futures onto the task channel. +#[derive(Clone)] +struct Spawner { + task_sender: SyncSender>, +} + +/// A future that can reschedule itself to be polled using a channel. +struct Task { + // In-progress future that should be pushed to completion + // + // The `Mutex` is not necessary for correctness, since we only have + // one thread executing tasks at once. However, `rustc` isn't smart + // enough to know that `future` is only mutated from one thread, + // so we use it in order to provide safety. A production executor would + // not need this, and could use `UnsafeCell` instead. + future: Mutex>>, + + // Handle to spawn tasks onto the task queue + task_sender: SyncSender>, +} + +fn new_executor_and_spawner() -> (Executor, Spawner) { + // Maximum number of tasks to allow queueing in the channel at once. + // This is just to make `sync_channel` happy, and wouldn't be present in + // a real executor. + const MAX_QUEUED_TASKS: usize = 10_000; + let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS); + (Executor { ready_queue }, Spawner { task_sender}) +} +``` + +Let's also add a method to spawner to make it easy to spawn new futures. +This method will take a future type, box it and put it in a FutureObj, +and create a new `Arc` with it inside which can be enqueued onto the +executor. + +```rust +impl Spawner { + fn spawn(&self, future: impl Future + 'static + Send) { + let future_obj = FutureObj::new(Box::new(future)); + let task = Arc::new(Task { + future: Mutex::new(Some(future_obj)), + task_sender: self.task_sender.clone(), + }); + self.task_sender.send(task).expect("too many tasks queued"); + } +} +``` + +In order poll futures, we'll also need to create a `LocalWaker` to provide to +poll. As discussed in the [task wakeups section], `LocalWaker`s are responsible +for scheduling a task to be polled again once `wake` is called. Remember that +`LocalWaker`s tell the executor exactly which task has become ready, allowing +them to poll just the futures that are ready to make progress. The easiest way +to create a new `LocalWaker` is by implementing the `Wake` trait and then using +the `local_waker_from_nonlocal` or `local_waker` functions to turn a `Arc` +into a `LocalWaker`. Let's implement `Wake` for our tasks to allow them to be +turned into `LocalWaker`s and awoken: + +```rust +impl Wake for Task { + fn wake(arc_self: &Arc) { + // Implement `wake` by sending this task back onto the task channel + // so that it will be polled again by the executor. + let cloned = arc_self.clone(); + arc_self.task_sender.send(cloned).expect("too many tasks queued"); + } +} +``` + +When a `LocalWaker` is created from an `Arc`, calling `wake()` on it will +cause a copy of the `Arc` to be sent onto the task channel. Our executor then +needs to pick up the task and poll it. Let's implement that: + +```rust +impl Executor { + fn run(&self) { + while let Ok(task) = self.ready_queue.recv() { + let mut future_slot = task.future.lock().unwrap(); + // Take the future, and if it has not yet completed (is still Some), + // poll it in an attempt to complete it. + if let Some(mut future) = future_slot.take() { + // Create a `LocalWaker` from the task itself + let lw = local_waker_from_nonlocal(task.clone()); + if let Poll::Pending = Pin::new(&mut future).poll(&lw) { + // We're not done processing the future, so put it + // back in its task to be run again in the future. + *future_slot = Some(future); + } + } + } + } +} +``` + +Congratulations! We now have a working futures executor. We can even use it +to run `async/await!` code and custom futures, such as the `TimerFuture` we +wrote earlier: + +```rust +fn main() { + let (executor, spawner) = new_executor_and_spawner(); + spawner.spawn(async { + println!("howdy!"); + // Wait for our timer future to complete after two seconds. + await!(TimerFuture::new(Duration::new(2, 0))); + println!("done!"); + }); + executor.run(); +} +``` + +[task wakeups section]: TODO diff --git a/src/execution/future.md b/src/execution/future.md new file mode 100644 index 0000000..249aa76 --- /dev/null +++ b/src/execution/future.md @@ -0,0 +1,189 @@ +# The `Future` Trait + +The `Future` trait is at the center of asynchronous programming in Rust. +A `Future` is an asynchronous computation that can produce a value +(although that value may be empty, e.g. `()`). A *simplified* version of +the future trait might look something like this: + +```rust +trait SimpleFuture { + type Output; + fn poll(&mut self, wake: fn()) -> Poll; +} + +enum Poll { + Ready(T), + Pending, +} +``` + +Futures can be advanced by calling the `poll` function, which will drive the +future as far towards completion as possible. If the future completes, it +returns `Poll::Ready(result)`. If the future is not able to complete yet, it +returns `Poll::Pending` and arranges for the `wake()` function to be called +when the `Future` is ready to make more progress. When `wake()` is called, the +executor driving the `Future` will call `poll` again so that the `Future` can +make more progress. + +Without `wake()`, the executor would have no way of knowing when a particular +future could make progress, and would have to be constantly polling every +future. With `wake()`, the executor knows exactly which futures are ready to +be `poll`ed. + +For example, consider the case where we want to read from a socket that may +or may not have data available already. If there is data, we can read it +in and return `Poll::Ready(data)`, but if no data is ready, our future is +blocked and can no longer make progress. When no data is available, we +must register `wake` to be called when data becomes ready on the socket, +which will tell the executor that our future is ready to make progress. +A simple `SocketRead` future might look something like this: + +```rust +struct SocketRead<'a> { + socket: &'a Socket, +} + +impl SimpleFuture for SocketRead<'_> { + type Output = Vec; + + fn poll(&mut self, wake: fn()) -> Poll { + if self.socket.has_data_to_read() { + // The socket has data-- read it into a buffer and return it. + Poll::Ready(self.socket.read_buf()) + } else { + // The socket does not yet have data. + // + // Arrange for `wake` to be called once data is available. + // When data becomes available, `wake` will be called, and the + // user of this `Future` will know to call `poll` again and + // receive data. + self.socket.set_readable_callback(wake); + Poll::Pending + } + } +} +``` + +This model of `Future`s allows for composing together multiple asynchronous +operations without needing intermediate allocations. Running multiple futures +at once or chaining futures together can be implemented via allocation-free +state machines, like this: + +```rust +/// A SimpleFuture that runs two other futures to completion concurrently. +/// +/// Concurrency is achieved via the fact that calls to `poll` each future +/// may be interleaved, allowing each future to advance itself at its own pace. +struct Join2 { + // Each field may contain a future that should be run to completion. + // If the future has already completed, the field is set to `None`. + a: Option, + b: Option, +} + +impl SimpleFuture for Join2 { + type Output = (); + fn poll(&mut self, wake: fn()) -> Poll { + // Attempt to complete future `a`. + let finished_a = match &mut self.a { + Some(a) => { + match a.poll(wake) { + Poll::Ready(()) => true, + Poll::Pending => false, + } + } + None => true, + }; + if finished_a { self.a.take() } + + // Attempt to complete future `b`. + let finished_b = match &mut self.b { + Some(b) => { + match b.poll(wake) { + Poll::Ready(()) => true, + Poll::Pending => false, + } + } + None => true, + }; + if finished_b { self.b.take() } + + if finished_a && finished_b { + // Both futures have completed-- we can return successfully + Poll::Ready(()) + } else { + // One or both futures still have work to do, and will call + // `wake()` when progress can be made. + Poll::Pending + } + } +} +``` + +This shows how multiple futures can be run simultaneously without needing +separate allocations, allowing for more efficient asynchronous programs. +Similarly, multiple sequential futures can be run one after another, like this: + +```rust +/// A SimpleFuture that runs two futures to completion, one after another. +// +// Note: for the purposes of this simple example, `AndThenFut` assumes both +// the first and second futures are available at creation-time. The real +// `AndThen` combinator allows creating the second future based on the output +// of the first future, like `get_breakfast.and_then(|food| eat(food))`. +enum AndThenFut { + first: Option, + second: FutureB, +} + +impl SimpleFuture for AndThenFut { + type Output = (); + fn poll(&mut self, wake: fn()) -> Poll { + if let Some(first) = &mut self.first { + match first.poll(wake) { + // We've completed the first future-- remove it and start on + // the second! + Poll::Ready(()) => self.first.take(), + // We couldn't yet complete the first future. + Poll::Pending => return Poll::Pending, + } + } + // Now that the first future is done, attempt to complete the second. + second.poll(wake) + } +} +``` + +These examples show how the `Future` trait can be used to express asynchronous +control flow without requiring multiple allocated objects and deeply nested +callbacks. With the basic control-flow out of the way, let's talk about the +real `Future` trait and how it is different. + +```rust +trait Future { + type Output; + fn poll( + // note the change from `&mut self` to `Pin<&mut Self>` + self: Pin<&mut Self>, + lw: &LocalWaker, // note the change from `wake: fn()` + ) -> Poll; +} +``` + +The first change you'll notice is that our `self` type is no longer `&mut self`, +but has changed to `Pin<&mut Self>`. We'll talk more about pinning in [a later +section][pinning], but for now know that it allows us to create futures that +are immovable. Immovable objects can store pointers between their fields, +e.g. `struct MyFut { a: i32, ptr_to_a: *const i32 }`. This feature is necessary +in order to enable async/await. + +Secondly, `wake: fn()` has changed to `LocalWaker`. In `SimpleFuture`, we used +a call to a function pointer (`fn()`) to tell the future executor that the +future in question should be polled. However, since `fn()` is zero-sized, it +can't store any data about *which* `Future` called `wake`. +In a real-world scenario, a complex application like a web server may have +thousands of different connections whose wakeups should all be +managed separately. This is where `LocalWaker` and its sibling type `Waker` +come in. + +[pinning]: TODO diff --git a/src/execution/io.md b/src/execution/io.md new file mode 100644 index 0000000..89e359c --- /dev/null +++ b/src/execution/io.md @@ -0,0 +1,135 @@ +# Executors and System IO + +In the previous section on [The `Future` Trait], we discussed this example of +a future that performed an asynchronous read on a socket: + +```rust +struct SocketRead<'a> { + socket: &'a Socket, +} + +impl SimpleFuture for SocketRead<'_> { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll { + if self.socket.has_data_to_read() { + // The socket has data-- read it into a buffer and return it. + Poll::Ready(self.socket.read_buf()) + } else { + // The socket does not yet have data. + // + // Arrange for `wake` to be called once data is available. + // When data becomes available, `wake` will be called, and the + // user of this `Future` will know to call `poll` again and + // receive data. + self.socket.set_readable_callback(lw); + Poll::Pending + } + } +} +``` + +This future will read available data on a socket, and if no data is available, +it will yield to the executor, requesting that its task be awoken when the +socket becomes readable again. However, it's not clear from this example how +the `Socket` type is implemented, and in particular it isn't obvious how the +`set_readable_callback` function works. How can we arrange for `lw.wake()` +to be called once the socket becomes readable? One option would be to have +a thread that continually checks whether `socket` is readable, calling +`wake()` when appropriate. However, this would be quite inefficient, requiring +a separate thread for each blocked IO future. This would greatly reduce the +efficiency of our async code. + +In practice, this problem is solved through integration with an IO-aware +system blocking primitive, such as `epoll` on Linux, `kqueue` on FreeBSD and +Mac OS, IOCP on Windows, and `port`s on Fuchsia (all of which are exposed +through the cross-platform Rust crate [`mio`]). These primitives all allow +a thread to block on multiple asynchronous IO events, returning once one of +the events completes. In practice, these APIs usually look something like +this: + +```rust +struct IoBlocker { + ... +} + +struct Event { + // An ID uniquely identifying the event that occurred and was listened for. + id: usize, + + // A set of signals to wait for, or which occurred. + signals: Signals, +} + +impl IoBlocker { + /// Create a new collection of asynchronous IO events to block on. + fn new() -> Self { ... } + + /// Express an interest in a particular IO event. + fn add_io_event_interest( + &self, + + /// The object on which the event will occur + io_object: &IoObject, + + /// A set of signals that may appear on the `io_object` for + /// which an event should be triggered, paried with + /// an ID to give to events that result from this interest. + event: Event, + ) { ... } + + /// Block until one of the events occurs. + /// This will only trigget + fn block(&self) -> Event { ... } +} + +let mut io_blocker = IoBlocker::new(); +io_blocker.add_io_event_interest( + &socket_1, + Event { id: 1, signals: READABLE }, +); +io_blocker.add_io_event_interest( + &socket_2, + Event { id: 2, signals: READABLE | WRITABLE }, +); +let event = io_blocker.block(); + +// prints e.g. "Socket 1 is now READABLE" if socket one became readable. +println!("Socket {:?} is now {:?}", event.id, event.signals); +``` + +Futures executors can use these primitives to provide asynchronous IO objects +such as sockets that can configure callbacks to be run when a particular IO +event occurs. In the case of our `SocketRead` example above, the +`Socket::set_readable_callback` function might look like the following pseudocode: + +```rust +impl Socket { + fn set_readable_callback(&self, lw: &LocalWaker) { + // `local_executor` is a reference to the local executor. + // this could be provided at creation of the socket, but in practice + // many executor implementations pass it down through thread local + // storage for convenience. + let local_executor = self.local_executor; + + // Unique ID for this IO object. + let id = self.id; + + // Store the local waker in the executor's map so that it can be called + // once the IO event arrives. + local_executor.event_map.insert(id, lw.clone()); + local_executor.add_io_event_interest( + &self.socket_file_descriptor, + Event { id, signals: READABLE }, + ); + } +} +``` + +We can now have just one executor thread which can receive and dispatch any +IO event to the appropriate `LocalWaker`, which will wake up the corresponding +task, allowing the executor to drive more tasks to completion before returning +to check for more IO events (and the cycle continues...). + +[The `Future` Trait]: TODO +[`mio`]: https://github.com/carllerche/mio diff --git a/src/execution/wakeups.md b/src/execution/wakeups.md new file mode 100644 index 0000000..512f6da --- /dev/null +++ b/src/execution/wakeups.md @@ -0,0 +1,142 @@ +# Task Wakeups with `LocalWaker` and `Waker` + +It's common that futures aren't able to complete the first time they are +`poll`ed. When this happens, the future needs to ensure that it is polled +again once it is ready to make more progress. This is done with the +`LocalWaker` and `Waker` types. + +Each time a future is polled, it is polled as part of a "task". Tasks are +the top-level futures that have been submitted to an executor. + +`LocalWaker` and `Waker` each provide a `wake()` method that can be used to +tell the executor that their associated task should be awoken. When `wake()` is +called, the executor knows that the task associated with the `Waker` is ready to +make progress, and its future should be polled again. + +`LocalWaker` and `Waker` also implement `clone()` so that +they can be copied around and stored. The difference between the two is +thread-safety: `LocalWaker` is `!Send` and `!Sync`, and so cannot be used from +threads other than the one it was created from. This allows `LocalWaker` +implementations to perform special optimized behavior for the current thread. +`Waker`s, on the other hand, are `Send` and `Sync`, and so can be used across +multiple threads. A `LocalWaker` can be turned into a thread-safe `Waker` using +the `into_waker()` function. This function is free to call-- it doesn't +allocate at runtime or anything similar, but calling `wake()` on the resulting +`Waker` may be less performant than calling `wake()` on the original +`LocalWaker`. + +Let's try implementing a simple timer future using `Waker` and `LocalWaker`. + +## Applied: Build a Timer + +For the sake of the example, we'll just spin up a new thread when the timer +is created, sleep for the required time, and then signal the timer future +when the time window has elapsed. + +Here are the imports we'll need to get started: + +```rust +#![feature(arbitrary_self_types, futures_api, pin)] + +use std::{ + future::Future, + pin::{Pin, Unpin}, + sync::{Arc, Mutex}, + task::{LocalWaker, Poll, Waker}, + thread, + time::Duration, +}; +``` + +Let's start by defining the future type itself. Our future needs a way for the +thread to communicate that the timer has elapsed and the future should complete. +We'll use a shared `Arc>` value to communicate between the thread and +the future. + +```rust +struct TimerFuture { + shared_state: Arc>, +} + +/// Shared state between the future and the thread +struct SharedState { + /// Whether or not the sleep time has elapsed + completed: bool, + + /// The waker for the task that `TimerFuture` is running on. + /// The thread can use this after setting `completed = true` to tell + /// `TimerFuture`'s task to wake up, see that `completed = true`, and + /// move forward. + waker: Option, +} + +// Pinning will be covered later-- for now, it's enough to understand that our +// `TimerFuture` type doesn't require it, so it is `Unpin`. +impl Unpin for TimerFuture {} +``` + +Now, let's actually write the `Future` implementation! + +```rust +impl Future for TimerFuture { + type Output = (); + fn poll(self: Pin<&mut Self>, lw: &LocalWaker) + -> Poll + { + // Look at the shared state to see if the timer has already completed. + let mut shared_state = self.shared_state.lock().unwrap(); + if shared_state.completed { + Poll::Ready(()) + } else { + // Set waker so that the thread can wake up the current task + // when the timer has completed, ensuring that the future is polled + // again and sees that `completed = true`. + shared_state.waker = Some(lw.clone().into_waker()); + Poll::Pending + } + } +} +``` + +Pretty simple, right? If the thread has set `shared_state.completed = true`, +we're done! Otherwise, we clone the `LocalWaker` for the current task, +convert it into a `Waker`, and pass it to `shared_state.waker` so that the +thread can wake the task back up. + +Importantly, we have to update the `Waker` every time the future is polled +because the future may have moved to a different task with a different +`Waker`. This will happen when futures are passed around between tasks after +being polled. + +Finally, we need the API to actually construct the timer and start the thread: + +```rust +impl TimerFuture { + /// Create a new `TimerFuture` which will complete after the provided + /// timeout. + pub fn new(duration: Duration) -> Self { + let shared_state = Arc::new(Mutex::new(SharedState { + completed: false, + waker: None, + })); + + // Spawn the new thread + let thread_shared_state = shared_state.clone(); + thread::spawn(move || { + thread::sleep(duration); + let mut shared_state = thread_shared_state.lock().unwrap(); + // Signal that the timer has completed and wake up the last + // task on which the future was polled, if one exists. + shared_state.completed = true; + if let Some(waker) = &shared_state.waker { + waker.wake(); + } + }); + + TimerFuture { shared_state } + } +} +``` + +Woot! That's all we need to build a simple timer future. Now, if only we had +an executor to run the future on... diff --git a/src/getting_started/async_await_primer.md b/src/getting_started/async_await_primer.md new file mode 100644 index 0000000..02c7302 --- /dev/null +++ b/src/getting_started/async_await_primer.md @@ -0,0 +1,103 @@ +# `async`/`await!` Primer + +`async`/`await!` is Rust's built-in tool for writing asynchronous functions +that look like synchronous code. `async` transforms a block of code into a +state machine that implements a trait called `Future`. Whereas calling a +blocking function in a synchronous method would block the whole thread, +blocked `Future`s will yield control of the thread, allowing other +`Future`s to run. + +To create an asynchronous function, you can use the `async fn` syntax: + +```rust +async fn do_something() { ... } +``` + +The value returned by `async fn` is a `Future` that needs to be run on +an executor in order for anything to happen: + +```rust +// `block_on` blocks the current thread until the provided future has run to +// completion. Other executors provide more complex behavior, like scheudling +// multiple futures onto the same thread. +use futures::executor::block_on; + +async fn hello_world() { + println!("hello, world!"); +} + +fn main() { + let future = hello_world(); // Nothing is printed + block_on(future); // `future` is run and "hello, world!" is printed +} +``` + +Inside an `async fn`, you can use `await!` to wait for the completion of +another type that implements the `Future` trait, such as the output of +another `async fn`. Unlike `block_on`, `await!` doesn't block the current +thread, but instead asynchronously waits for the future to complete, allowing +other tasks to run if the future is currently unable to make progress. + +For example, imagine that we have three `async fn`: `learn_song`, `sing_song`, +and `dance`: + +```rust +async fn learn_song() -> Song { ... } +async fn sing_song(song: Song) { ... } +async fn dance() { ... } +``` + +One way to do learn, sing, and dance would be to block on each of these +individually: + +```rust +fn main() { + let song = block_on(learn_song()); + block_on(sing_song(song)); + block_on(dance); +} +``` + +However, we're not giving the best performance possible this way-- we're +only ever doing one thing at once! Clearly we have to learn the song before +we can sing it, but it's possible to dance at the same time as learning and +singing the song. To do this, we can create two separate `async fn` which +can be run concurrently: + +```rust +async fn learn_and_sing() { + // Wait until the song has been learned before singing it. + // We use `await!` here rather than `block_on` to prevent blocking the + // thread, which makes it possible to `dance` at the same time. + let song = await!(learn_song()); + await!(sing_song(song)); +} + +async fn async_main() { + let f1 = learn_and_sing(); + let f2 = dance(); + + // `join!` is like `await!` but can wait for multiple futures concurrently. + // If we're temporarily blocked in the `learn_and_sing` future, the `dance` + // future will take over the current thread. If `dance` becomes blocked, + // `learn_and_sing` can take back over. If both futures are blocked, then + // `async_main` is blocked and will yield to the executor. + join!(f1, f2) +} + +fn main() { + block_on(async_main()); +} +``` + +In this example, learning the song must happen before singing the song, but +both learning and singing can happen at the same time as dancing. If we used +`block_on(learn_song())` rather than `await!(learn_song())` in `learn_and_sing`, +the thread wouldn't be able to do anything else while `learn_song` was running. +This would make it impossible to dance at the same time. By `await!`ing +the `learn_song` future, we allow other tasks to take over the current thread +if `learn_song` is blocked. This makes it possible to run multiple futures +to completion concurrently on the same thread. + +Now that you've learned the basics of `async`/`await!`, let's try out an +example. diff --git a/src/getting_started/chapter.md b/src/getting_started/chapter.md new file mode 100644 index 0000000..b8a837d --- /dev/null +++ b/src/getting_started/chapter.md @@ -0,0 +1,25 @@ +# Getting Started + +Welcome to Asynchronous Programming in Rust! If you're looking to start writing +asynchronous Rust code, you've come to the right place. Whether you're building +a web server, a database, or an operating system, this book will show you +how to use Rust's asynchronous programming tools to get the most out of your +hardware. + +## What This Book Covers + +This book aims to be a comprehensive, up-to-date guide to using Rust's async +language features and libraries, appropriate for beginners and old hands alike. + +- The early chapters provide an introduction to async programming in general, +and to Rust's particular take on it. + +- The middle chapters discuss key utilities and control-flow tools you can use +when writing async code, and describe best-practices for structuring libraries +and applications to maximize performance and reusability. + +- The last section of the book covers the broader async ecosystem, and provides +a number of examples of how to accomplish common tasks. + +With that out of the way, let's explore the exciting world of Asynchronous +Programming in Rust! diff --git a/src/getting_started/http_server_example.md b/src/getting_started/http_server_example.md new file mode 100644 index 0000000..1494cd5 --- /dev/null +++ b/src/getting_started/http_server_example.md @@ -0,0 +1,199 @@ +# Applied: Simple HTTP Server + +Let's use `async`/`await!` to build an echo server! + +To start, run `rustup update nightly` to make sure you've got the latest and +greatest copy of Rust-- we're working with bleeding-edge features, so it's +essential to stay up-to-date. Once you've done that, run +`cargo +nightly new async-await-echo` to create a new project, and open up +the resulting `async-await-echo` folder. + +Let's add some dependencies to the `Cargo.toml` file: + +```toml +[dependencies] + +# The latest version of the "futures" library, which has lots of utilities +# for writing async code. Enable the "tokio-compat" feature to include the +# functions for using futures 0.3 and async/await with the Tokio library. +futures-preview = { version = "0.3.0-alpha.9", features = ["tokio-compat"] } + +# Hyper is an asynchronous HTTP library. We'll use it to power our HTTP +# server and to make HTTP requests. +hyper = "0.12.9" + +# Tokio is a runtime for asynchronous I/O applications. Hyper uses +# it for the default server runtime. The `tokio` crate also provides an +# an `await!` macro similar to the one in `std`, but it supports `await!`ing +# both futures 0.1 futures (the kind used by Hyper and Tokio) and +# futures 0.3 futures (the kind produced by the new `async`/`await!` language +# feature). +tokio = { version = "0.1.11", features = ["async-await-preview"] } +``` + +Now that we've got our dependencies out of the way, let's start writing some +code. Open up `src/main.rs` and enable the following features at the top of +the file: + +```rust +#![feature(async_await, await_macro, futures_api)] +``` + +- `async_await` adds support for the `async fn` syntax. +- `await_macro` adds support for the `await!` macro. +- `futures_api` adds support for the nightly `std::future` and `std::task` +modules which define the core `Future` trait and dependent types. + +Additionally, we have some imports to add: + +```rust +use { + hyper::{ + // Miscellaneous types from Hyper for working with HTTP. + Body, Client, Request, Response, Server, Uri, + + // This function turns a closure which returns a future into an + // implementation of the the Hyper `Service` trait, which is an + // asynchronous function from a generic `Request` to a `Response`. + service::service_fn, + + // A function which runs a future to completion using the Hyper runtime. + rt::run, + }, + futures::{ + // `TokioDefaultSpawner` tells futures 0.3 futures how to spawn tasks + // onto the Tokio runtime. + compat::TokioDefaultSpawner, + + // Extension traits providing additional methods on futures. + // `FutureExt` adds methods that work for all futures, whereas + // `TryFutureExt` adds methods to futures that return `Result` types. + future::{FutureExt, TryFutureExt}, + }, + std::net::SocketAddr, + + // This is the redefinition of the await! macro which supports both + // futures 0.1 (used by Hyper and Tokio) and futures 0.3 (the new API + // exposed by `std::future` and implemented by `async fn` syntax). + tokio::await, +}; +``` + +Once the imports are out of the way, we can start putting together the +boilerplate to allow us to serve requests: + +```rust +async fn serve_req(req: Request) -> Result, hyper::Error> { + unimplemented!() +} + +async fn run_server(addr: SocketAddr) { + println!("Listening on http://{}", addr); + + // Create a server bound on the provided address + let serve_future = Server::bind(&addr) + // Serve requests using our `async serve_req` function. + // `serve` takes a closure which returns a type implementing the + // `Service` trait. `service_fn` returns a value implementing the + // `Service` trait, and accepts a closure which goes from request + // to a future of the response. In order to use our `serve_req` + // function with Hyper, we have to box it and put it in a compatability + // wrapper to go from a futures 0.3 future (the kind returned by + // `async fn`) to a futures 0.1 future (the kind used by Hyper). + .serve(|| service_fn(|req| + serve_req(req).boxed().compat(TokioDefaultSpawner) + )); + + // Wait for the server to complete serving or exit with an error. + // If an error occurred, print it to stderr. + if let Err(e) = await!(serve_future) { + eprintln!("server error: {}", e); + } +} + +fn main() { + // Set the address to run our socket on. + let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); + + // Call our run_server function, which returns a future. + // As with every `async fn`, we need to run that future in order for + // `run_server` to do anything. Additionally, since `run_server` is an + // `async fn`, we need to convert it from a futures 0.3 future into a + // futures 0.1 future. + let futures_03_future = run_server(addr); + let futures_01_future = + futures_03_future.unit_error().boxed().compat(TokioDefaultSpawner); + + // Finally, we can run the future to completion using the `run` function + // provided by Hyper. + run(futures_01_future); +} +``` + +If you `cargo run` now, you should see the message "Listening on +http://127.0.0.1:300" printed on your terminal. If you open that URL in your +browser of choice, you'll see "thread ... panicked at 'not yet implemented'." +Great! Now we just need to actually handle requests. To start, let's just +return a static message: + +```rust +async fn serve_req(req: Request) -> Result, hyper::Error> { + // Always return successfully with a response containing a body with + // a friendly greeting ;) + Ok(Response::new(Body::from("hello, world!"))) +} +``` + +If you `cargo run` again and refresh the page, you should see "hello, world!" +appear in your browser. Congratulations! You just wrote your first asynchronous +webserver in Rust. + +You can also inspect the request itself, which contains information such as +the request URI, HTTP version, headers, and other metadata. For example, we +can print out the URI of the request like this: + +```rust +println!("Got request at {:?}", req.uri()); +``` + +You may have noticed that we're not yet doing +anything asynchronous when handling the request-- we just respond immediately, +so we're not taking advantage of the flexibility that `async fn` gives us. +Rather than just returning a static message, let's try proxying the user's +request to another website using Hyper's HTTP client. + +We start by parsing out the URL we want to request: + +```rust +let url_str = "http://www.rust-lang.org/en-US/"; +let url = url_str.parse::().expect("failed to parse URL"); +``` + +Then we can create a new `hyper::Client` and use it to make a `GET` request, +returning the response to the user: + +```rust +let res = await!(Client::new().get(url)); +// Return the result of the request directly to the user +println!("request finished --returning response"); +res +``` + +`Client::get` returns a `hyper::client::FutureResponse`, which implements +`Future>` +(or `Future` in futures 0.1 terms). +When we `await!` that future, an HTTP request is sent out, the current task +is suspended, and the task is queued to be continued once a response has +become available. + +Now, if you `cargo run` and open `http://127.0.0.1:3000/foo` in your browser, +you'll see the Rust homepage, and the following terminal output: + +``` +Listening on http://127.0.0.1:3000 +Got request at /foo +making request to http://www.rust-lang.org/en-US/ +request finished-- returning response +``` + +Congratulations! You just proxied an HTTP request. diff --git a/src/getting_started/state_of_async_rust.md b/src/getting_started/state_of_async_rust.md new file mode 100644 index 0000000..e69de29 diff --git a/src/getting_started/why_async.md b/src/getting_started/why_async.md new file mode 100644 index 0000000..a92970d --- /dev/null +++ b/src/getting_started/why_async.md @@ -0,0 +1,61 @@ +## Why Async? + +We all love how Rust allows us to write fast, safe software. But why write +asynchronous code? + +Asynchonous code allows us to run multiple tasks concurrently on the same OS +thread. In a typical threaded application, if you wanted to download two +different webpages at the same time, you would spread the work across two +different threads, like this: + +```rust +fn get_two_sites() { + // Spawn two threads to do work. + let thread_one = thread::spawn(|| download("https:://www.foo.com")); + let thread_two = thread::spawn(|| download("https:://www.bar.com")); + + // Wait for both threads to complete. + thread_one.join(); + thread_two.join(); +} +``` + +This works fine for many applications-- after, all threads were designed +to do just this: run multiple different tasks at once. However, they also +come with some limitations. There's a lot of overhead involved in the +process of switching between different threads and sharing data between +threads. Even a thread which just sits and does nothing uses up valuable +system resources. These are the costs that asynchronous code is designed +to eliminate. We can rewrite the function above using Rust's +`async`/`await!` notation, which will allow us to run multiple tasks at +once without creating multiple threads: + +```rust +async fn get_two_sites() { + // Create a two different "futures" which, when run to completion, + // will asynchronously download the webpages. + let future_one = download_async("https:://www.foo.com"); + let future_two = download_async("https:://www.bar.com"); + + // Run both futures to completion at the same time. + join!(future_one, future_two); +} +``` + +Overall, asynchronous applications have the potential to be much faster and +use fewer resources than a corresponding threaded implementation. However, +there is a cost. Threads are natively supported by the operating system, +and using them doesn't require any special programming model-- any function +can create a thread, and calling a function that uses threads is usually +just as easy as calling any normal function. However, asynchronous functions +require special support from the language or libraries in order to work. +In Rust, `async fn` creates an asynchronous function which, when called, +will return a future which needs to be run to completion in order for the +body of the function to execute. + +It's important to remember that traditional threaded applications can be quite +effective, and that Rust's small memory footprint and predictability mean that +you can get far without ever using `async`. The increased complexity of the +asynchronous programming model isn't always worth it, and it's important to +consider whether your application would be better served by using a simpler +threaded model. diff --git a/src/pinning/chapter.md b/src/pinning/chapter.md new file mode 100644 index 0000000..9a99963 --- /dev/null +++ b/src/pinning/chapter.md @@ -0,0 +1,138 @@ +# Pinning + +In order to poll futures, they must be pinned using a special type called +`Pin`. If you read the explanation of [the `Future` trait] in the +previous section ["Executing `Future`s and Tasks"], you'll recognise +`Pin` from the `self: Pin<&mut Self>` in the `Future:poll` method's definition. +But what does it mean, and why do we need it? + +## Why Pinning + +Pinning makes it possible to guarantee that an object won't ever be moved. +To understand why this is necessary, we need remember how `async`/`await!` +works. Consider the following code: + +```rust +async { + await!(fut_one); + await!(fut_two); +} +``` + +Under the hood, this creates an anonymous type that implements `Future`, +providing a `poll` method that looks something like this: + +```rust +// The `Future` type generated by our `async { ... }` block +struct AsyncFuture { + fut_one: FutOne, + fut_two: FutTwo, + state: State, +} + +// List of states our `async` block can be in +enum State { + AwaitingFutOne, + AwaitingFutTwo, + Done, +} + +impl AsyncFuture { + fn poll(...) -> Poll<()> { + loop { + match self.state { + State::AwaitingFutOne => match self.fut_one.poll(..) { + Poll::Ready(()) => self.state = State::AwaitingFutTwo, + Poll::Pending => return Poll::Pending, + } + State::AwaitingFutTwo => match self.fut_two.poll(..) { + Poll::Ready(()) => self.state = State::Done, + Poll::Pending => return Poll::Pending, + } + State::Done => return Poll::Ready(()), + } + } + } +} +``` + + +When `poll` is first called, it will poll `fut_one`. If `fut_one` can't +complete, `AsyncFuture::poll` will return. Future calls to `poll` will pick +up where the previous one left off. This process continues until the future +is able to successfully complete. + +However, what happens if we have an `async` block that uses references? +For example: + +```rust +async { + let mut x = [0; 128]; + let read_into_buf_fut = read_into_buf(&mut x); + await!(read_into_buf_fut); + println!("{:?}", x); +} +``` + +What struct does this compile down to? + +```rust +struct ReadIntoBuf<'a> { + buf: &'a mut [u8], // points to `x` below +} + +struct AsyncFuture { + x: [u8; 128], + read_into_buf_fut: ReadIntoBuf<'what_lifetime?>, +} +``` + +Here, the `ReadIntoBuf` future holds a reference into the other field of our +structure, `x`. However, if `AsyncFuture` is moved, the location of `x` will +move as well, invalidating the pointer stored in `read_into_buf_fut.buf`. + +Pinning futures to a particular spot in memory prevents this problem, making +it safe to create references to values inside an `async` block. + +## How to Use Pinning + +The `Pin` type wraps pointer types, guaranteeing that the values behind the +pointer won't be moved. For example, `Pin<&mut T>`, `Pin<&T>`, +`Pin>` all guarantee that `T` won't be moved. + +Most types don't have a problem being moved. These types implement a trait +called `Unpin`. Pointers to `Unpin` types can be freely placed into or taken +out of `Pin`. For example, `u8` is `Unpin`, so `Pin<&mut T>` behaves just like +a normal `&mut T`. + +Some functions require the futures they work with to be `Unpin`. To use a +`Future` or `Stream` that isn't `Unpin` with a function that requires +`Unpin` types, you'll first have to pin the value using either +`Box::pinned` (to create a `Pin>`) or the `pin_utils::pin_mut!` macro +(to create a `Pin<&mut T>`). `Pin>` and `Pin<&mut Fut>` can both be +used as futures, and both implement `Unpin`. + +For example: + +```rust +use pin_utils::pin_mut; // `pin_utils` is a handy crate available on crates.io + +// A function which takes a `Future` that implements `Unpin`. +fn execute_unpin_future(x: impl Future + Unpin) { ... } + +let fut = async { ... }; +execute_unpin_future(fut); // Error: `fut` does not implement `Unpin` trait + +// Pinning with `Box`: +let fut = async { ... }; +let fut = Box::pinned(fut); +execute_unpin_future(fut); // OK + +// Pinning with `pin_mut!`: +let fut = async { ... }; +pin_mut!(fut); +execute_unpin_future(fut); // OK +``` + +["Executing `Future`s and Tasks"]: TODO +[the `Future` trait]: TODO diff --git a/src/streams/chapter.md b/src/streams/chapter.md new file mode 100644 index 0000000..bf5d4b8 --- /dev/null +++ b/src/streams/chapter.md @@ -0,0 +1,104 @@ +# The `Stream` Trait + +The `Stream` trait is similar to `Future` but can yield multiple values before +completing, similar to the `Iterator` trait from the standard library: + +```rust +trait Stream { + /// The type of value yielded by the stream. + type Item; + + /// Attempt to resolve the next item in the stream. + /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value + /// is ready, and `Poll::Ready(None)` if the stream has completed. + fn poll_next(self: Pin<&mut Self>, lw: &LocalWaker) + -> Poll>; +} +``` + +One common example of a `Stream` is the `Receiver` for the channel type from +the `futures` crate. It will yield `Some(val)` every time a value is sent +from the `Sender` end, and will yield `None` once the `Sender` has been +dropped and all pending messages have been received: + +```rust +use futures::channel::mpsc; +use futures::prelude::*; + +let fut = async { + let (tx, rx) = mpsc::channel(BUFFER_SIZE); + await!(tx.send(1)).unwrap(); + await!(tx.send(2)).unwrap(); + drop(tx); + + // `StreamExt::next` is similar to `Iterator::next`, but returns a + // type that implements `Future>`. + assert_eq!(Some(1), await!(rx.next())); + assert_eq!(Some(2), await!(rx.next())); + assert_eq!(None, await!(rx.next())); +}; +``` + +## Patterns: Iteration and Concurrency + +Similar to synchronous `Iterator`s, there are many different ways to iterate +over and process the values in a `Stream`. There are combinator-style methods +such as `map`, `filter`, and `fold`, and their early-exit-on-error cousins +`try_map`, `try_filter`, and `try_fold`. + +Unfortunately, `for` loops are not yet usable with `Stream`s, but for +imperative-style code, `while let` and `.for_each` are available: + +```rust +use futures::prelude::*; + +let fut = async { + let mut stream: impl Stream> = ...; + + // processing with `try_for_each`: + await!(stream.try_for_each(async |item| { + // handle `item` + Ok(()) + }))?; + + // processing with `while let`: + while let Some(item) = await!(stream.try_next())? { + // handle `item` + } + + ... + + Ok(()) +}; +``` + +However, if we're just processing one element at a time, we're potentially +leaving behind opportunity for concurrency, which is, after all, why we're +writing async code in the first place. To process multiple items from a stream +concurrently, use the `for_each_concurrent` and `try_for_each_concurrent` +methods: + +```rust +use futures::prelude::*; + +let fut = async { + let mut stream: impl Stream> = ...; + + await!(stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, async |num| { + await!(jump_n_times(num))?; + await!(report_jumps(num))?; + Ok(()) + })?; + + ... + Ok(()) +}; +``` + +This approach allows up to `MAX_CONCURRENT_JUMPERS` to all be jumping at once +(or performing any operation on the items, for that matter-- the API isn't +strictly tied to jumping). If you want to allow an unlimited number of +operations at once, you can use `None` rather than `MAX_CONCURRENT_...`, but +beware that if `stream` comes from untrusted user input, this can allow +badly behaved clients to overload the system with too many simultaneous +requests.