More docs

This commit is contained in:
Stjepan Glavina 2020-04-25 23:19:30 +02:00
parent e8f797bb9b
commit 6a02cbb054
10 changed files with 161 additions and 68 deletions

View File

@ -10,12 +10,10 @@ https://crates.io/crates/smol)
https://docs.rs/smol)
[![chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)](https://discord.gg/5RxMVnr)
**NOTE:** Still a work in progress.
A small and fast async runtime for Rust.
This runtime extends [the standard library][std] with async combinators
and is only 1500 lines of code.
and is only 1500 lines of code long.
[std]: https://docs.rs/std

View File

@ -1,5 +1,17 @@
//! The blocking executor.
//!
//! Tasks created by [`Task::blocking()`] go into this executor. This executor is independent of
//! [`run()`][`crate::run()`] - it does not need to be driven.
//!
//! Blocking tasks are allowed to block without restrictions. However, the executor puts a limit on
//! the number of concurrently running tasks. Once that limit is hit, a task will need to complete
//! or yield in order for others to run.
//!
//! In idle state, this executor has no threads and consumes no resources. Once tasks are spawned,
//! new threads will get started, as many as is needed to keep up with the present amount of work.
//! When threads are idle, they wait for some time for new work to come in and shut down after a
//! certain timeout.
//!
//! This module also implements convenient adapters:
//!
//! - [`blocking!`] as syntax sugar around [`Task::blocking()`]
@ -25,22 +37,28 @@ use crate::context;
use crate::task::{Runnable, Task};
use crate::throttle;
// TODO: docs
/// A thread pool for blocking tasks.
/// The blocking executor.
pub(crate) struct BlockingExecutor {
/// The current state of the executor.
state: Mutex<State>,
/// Used to put idle threads to sleep and wake them up when new work comes in.
cvar: Condvar,
}
/// Current state of the blocking executor.
struct State {
/// Number of sleeping threads in the pool.
/// Number of idle threads in the pool.
///
/// Idle threads are sleeping, waiting to get a task to run.
idle_count: usize,
/// Total number of thread in the pool.
///
/// This is the number of idle threads + the number of active threads.
thread_count: usize,
/// Runnable blocking tasks.
/// The queue of blocking tasks.
queue: VecDeque<Runnable>,
}
@ -60,7 +78,7 @@ impl BlockingExecutor {
/// Spawns a future onto this executor.
///
/// Returns a `Task` handle for the spawned task.
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
@ -72,26 +90,36 @@ impl BlockingExecutor {
}
/// Runs the main loop on the current thread.
///
/// This function runs blocking tasks until it becomes idle and times out.
fn main_loop(&'static self) {
let mut state = self.state.lock().unwrap();
loop {
// This thread is not idle anymore because it's going to run tasks.
state.idle_count -= 1;
// Run tasks in the queue.
while let Some(runnable) = state.queue.pop_front() {
// We have found a task - grow the pool if needed.
self.grow_pool(state);
// Run the task.
let _ = panic::catch_unwind(|| runnable.run());
// Re-lock the state and continue.
state = self.state.lock().unwrap();
}
// Put the thread to sleep until another task is scheduled.
// This thread is now becoming idle.
state.idle_count += 1;
// Put the thread to sleep until another task is scheduled.
let timeout = Duration::from_millis(500);
let (s, res) = self.cvar.wait_timeout(state, timeout).unwrap();
state = s;
// If there are no tasks after a while, stop this thread.
if res.timed_out() && state.queue.is_empty() {
// If there are no tasks after a while, stop this thread.
state.idle_count -= 1;
state.thread_count -= 1;
break;
@ -103,6 +131,7 @@ impl BlockingExecutor {
fn schedule(&'static self, runnable: Runnable) {
let mut state = self.state.lock().unwrap();
state.queue.push_back(runnable);
// Notify a sleeping thread and spawn more threads if needed.
self.cvar.notify_one();
self.grow_pool(state);
@ -113,10 +142,14 @@ impl BlockingExecutor {
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
// already, then be aggressive: wake all idle threads and spawn one more thread.
while state.queue.len() > state.idle_count * 5 && state.thread_count < 500 {
// The new thread starts in idle state.
state.idle_count += 1;
state.thread_count += 1;
// Notify all existing idle threads because we need to hurry up.
self.cvar.notify_all();
// Spawn the new thread.
thread::spawn(move || {
// If enabled, set up tokio before the main loop begins.
context::enter(|| self.main_loop())
@ -127,7 +160,8 @@ impl BlockingExecutor {
/// Spawns blocking code onto a thread.
///
/// TODO
/// Note that `blocking!(expr)` is just syntax sugar for
/// `Task::blocking(async move { foo }).await`.
///
/// # Examples
///
@ -148,19 +182,22 @@ macro_rules! blocking {
/// Creates a stream that iterates on a thread.
///
/// TODO
/// This adapter converts any kind of synchronous iterator into an asynchronous stream by running
/// it on the blocking executor and sending items back over a channel.
///
/// # Examples
///
/// List files in the current directory:
///
/// ```no_run
/// use smol::blocking;
/// use std::fs;
/// use futures::stream::StreamExt;
/// use smol::{blocking, iter};
/// use std::fs;
///
/// # smol::run(async {
/// let mut dir = smol::iter(blocking!(fs::read_dir("."))?);
/// //
/// let mut dir = blocking!(fs::read_dir("."))?;
/// let mut dir = iter(dir);
///
/// while let Some(res) = dir.next().await {
/// println!("{}", res?.file_name().to_string_lossy());
@ -232,6 +269,29 @@ pub fn iter<T: Send + 'static>(
}
/// Creates an async reader that runs on a thread.
///
/// This adapter converts any kind of synchronous reader into an asynchronous reader by running it
/// on the blocking executor and sending bytes back over a pipe.
///
/// # Examples
///
/// Create an async reader that reads a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, reader};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for reading.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = reader(file);
///
/// // Read the whole file.
/// let mut contents = Vec::new();
/// file.read_to_end(&mut contents).await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unpin + 'static {
/// Current state of the reader.
enum State<T> {
@ -301,9 +361,30 @@ pub fn reader(reader: impl Read + Send + 'static) -> impl AsyncRead + Send + Unp
/// Creates an async writer that runs on a thread.
///
/// Make sure to flush at the end.
/// This adapter converts any kind of synchronous writer into an asynchronous writer by running it
/// on the blocking executor and receiving bytes over a pipe.
///
/// TODO
/// **Note:** Don't forget to flush the writer at the end, or some written bytes might get lost!
///
/// # Examples
///
/// Create an async writer that writes into a file:
///
/// ```no_run
/// use futures::prelude::*;
/// use smol::{blocking, writer};
/// use std::fs::File;
///
/// # smol::run(async {
/// // Open a file for writing.
/// let file = blocking!(File::open("foo.txt"))?;
/// let mut file = writer(file);
///
/// // Write some bytes into the file and flush.
/// file.write_all(b"hello").await?;
/// file.flush().await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn writer(writer: impl Write + Send + 'static) -> impl AsyncWrite + Send + Unpin + 'static {
/// Current state of the writer.
enum State<T> {

View File

@ -40,7 +40,7 @@
//!
//! Connect to a HTTP website, make a GET request, and pipe the response to the standard output:
//!
//! ```
//! ```no_run
//! use futures::prelude::*;
//! use smol::Async;
//! use std::net::TcpStream;

View File

@ -43,8 +43,8 @@ use crate::throttle;
/// The reactor.
///
/// Every async I/O handle and every timer is registered here. Invocations of [`run()`] poll the
/// reactor to check for new events every now and then.
/// Every async I/O handle and every timer is registered here. Invocations of
/// [`run()`][`crate::run()`] poll the reactor to check for new events every now and then.
///
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
pub(crate) struct Reactor {
@ -354,7 +354,7 @@ impl Source {
}
}
/// Converts a `nix::Error` into `std::io::Error`.
/// Converts a [`nix::Error`] into [`io::Error`].
#[cfg(unix)]
fn io_err(err: nix::Error) -> io::Error {
match err {

View File

@ -1,4 +1,6 @@
//! TODO
//! Implementation of [`run()`].
//!
//! This function is the entry point to the smol executor.
use std::future::Future;
use std::task::{Context, Poll};

View File

@ -1,4 +1,6 @@
//! TODO
//! The task system.
//!
//! A [`Task`] handle represents a spawned future that is run by the executor.
use std::fmt::Debug;
use std::future::Future;

View File

@ -1,6 +1,8 @@
//! The thread-local executor.
//!
//! TODO
//! Tasks created by [`Task::local()`] go into this executor. Every thread calling
//! [`run()`][`crate::run()`] creates a thread-local executor. Tasks cannot be spawned onto a
//! thread-local executor if it is not running.
use std::cell::RefCell;
use std::collections::VecDeque;
@ -15,15 +17,17 @@ use crate::io_event::IoEvent;
use crate::task::{Runnable, Task};
use crate::throttle;
// The thread-local executor.
//
// This thread-local is only set while inside `ThreadLocalExecutor::enter()`.
scoped_thread_local!(static EXECUTOR: ThreadLocalExecutor);
scoped_thread_local! {
/// The thread-local executor.
///
/// This thread-local is only set while inside [`ThreadLocalExecutor::enter()`].
static EXECUTOR: ThreadLocalExecutor
}
/// An executor for thread-local tasks.
///
/// Thread-local tasks are spawned by calling `Task::local()` and their futures do not have to
/// implement `Send`. They can only be run by the same thread that created them.
/// Thread-local tasks are spawned by calling [`Task::local()`] and their futures do not have to
/// implement [`Send`]. They can only be run by the same thread that created them.
pub(crate) struct ThreadLocalExecutor {
/// The main task queue.
queue: RefCell<VecDeque<Runnable>>,
@ -60,7 +64,7 @@ impl ThreadLocalExecutor {
/// Spawns a future onto this executor.
///
/// Returns a `Task` handle for the spawned task.
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
if !EXECUTOR.is_set() {
panic!("cannot spawn a thread-local task if not inside an executor");

View File

@ -1,24 +1,28 @@
//! Throttle tasks if they poll too many I/O operations without yielding.
//!
//! TODO
//! This is used to prevent futures from running forever. Once a certain number of I/O operation is
//! hit in a single run, I/O operations will begin returning
//! [`Poll::Pending`][`std::task::Poll::Pending`] even if they're ready.
use std::cell::Cell;
use std::task::{Context, Poll};
use scoped_tls_hkt::scoped_thread_local;
// Number of times the current task is allowed to poll I/O operations.
//
// When this budget is used up, I/O operations will wake the current task and return
// `Poll::Pending`.
//
// This thread-local is set before running any task.
scoped_thread_local!(static BUDGET: Cell<u32>);
scoped_thread_local! {
/// Number of times the current task is allowed to poll I/O operations.
///
/// When this budget is used up, I/O operations will wake the current task and return
/// [`Poll::Pending`][`std::task::Poll::Pending`].
///
/// This thread-local is set before running any task.
static BUDGET: Cell<u32>
}
/// Sets an I/O budget for polling a future.
///
/// Once this budget is exceeded, polled I/O operations will always wake the current task and
/// return `Poll::Pending`.
/// return [`Poll::Pending`][`std::task::Poll::Pending`].
///
/// We throttle I/O this way in order to prevent futures from running for
/// too long and thus starving other futures.
@ -27,7 +31,9 @@ pub(crate) fn setup<T>(poll: impl FnOnce() -> T) -> T {
BUDGET.set(&Cell::new(200), poll)
}
/// Returns `Poll::Pending` if the I/O budget has been used up.
/// Returns [`Poll::Pending`] if the I/O budget has been used up.
///
/// [`Poll::Pending`]: `std::task::Poll::Pending`
pub(crate) fn poll(cx: &mut Context<'_>) -> Poll<()> {
// Decrement the budget and check if it was zero.
if BUDGET.is_set() && BUDGET.with(|b| b.replace(b.get().saturating_sub(1))) == 0 {

View File

@ -1,4 +1,6 @@
//! TODO
//! Implementation of [`Timer`].
//!
//! Timers are futures that fire at a predefined point in time.
use std::fmt::Debug;
use std::future::Future;

View File

@ -1,27 +1,22 @@
//! The work-stealing executor.
//!
//! Tasks created by [`Task::spawn()`] go into this executor. Any thread calling [`run()`]
//! initializes a `Worker` that participates in work stealing, which is allowed to run any task in
//! this executor or in other workers.
//!
//! Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
//! implement [`Send`].
//! Tasks created by [`Task::spawn()`] go into this executor. Every thread calling [`run()`]
//! initializes a [`Worker`] that participates in work stealing, which is allowed to run any task in
//! this executor or in other workers. Since tasks can be stolen by any worker and thus move from
//! thread to thread, their futures must implement [`Send`].
//!
//! There is only one global instance of this type, accessible by [`WorkStealingExecutor::get()`].
//!
//! Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
//! invocations of [`run()`] used the same global task queue all the time, they would constantly
//! "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
//! retrying queue operations in compare-and-swap loops.
//! [Work stealing] is a strategy that reduces contention in multi-threaded environments. If all
//! invocations of [`run()`] used the same global task queue all the time, they would contend on
//! the queue all the time, thus slowing the executor down.
//!
//! The solution is to have a separate queue in each invocation of [`run()`], called a "worker".
//! Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
//! either grab a batch of tasks from the main global queue ("injector"), or steal tasks from other
//! workers. Of course, work-stealing still causes contention in some cases, but much less often.
//!
//! More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
//! The solution is to have a separate queue for each invocation of [`run()`], called a "worker".
//! Each thread is primarily using its own worker. Once all tasks in the worker are exhausted, then
//! we look for tasks in the global queue, called "injector", or steal tasks from other workers.
//!
//! [`run()`]: crate::run()
//! [Work stealing]: https://en.wikipedia.org/wiki/Work_stealing
use std::cell::Cell;
use std::future::Future;
@ -38,17 +33,20 @@ use crate::io_event::IoEvent;
use crate::task::{Runnable, Task};
use crate::throttle;
// The current thread's worker.
//
// Other threads may steal tasks from this worker through its associated stealer that was
// registered in the work-stealing executor.
//
// This thread-local is only set while inside `WorkStealingExecutor::enter()`.
scoped_thread_local!(static WORKER: for<'a> &'a Worker<'a>);
scoped_thread_local! {
/// The current thread's worker.
///
/// Other threads may steal tasks from this worker through its associated stealer that was
/// registered in the work-stealing executor.
///
/// This thread-local is only set while inside [`Worker::enter()`].
static WORKER: for<'a> &'a Worker<'a>
}
/// The global work-stealing executor.
pub(crate) struct WorkStealingExecutor {
/// When a thread that is not inside `run()` spawns or wakes a task, it goes into this queue.
/// When a thread that is not inside [`run()`][`crate::run()`] spawns or wakes a task, it goes
/// into this queue.
injector: deque::Injector<Runnable>,
/// Registered handles for stealing tasks from workers.
@ -76,7 +74,7 @@ impl WorkStealingExecutor {
/// Spawns a future onto this executor.
///
/// Returns a `Task` handle for the spawned task.
/// Returns a [`Task`] handle for the spawned task.
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,