mirror of https://github.com/smol-rs/async-task
Docs and comments
This commit is contained in:
parent
77a70b5f5d
commit
2404b1b32e
|
@ -18,8 +18,7 @@ std = []
|
|||
|
||||
[dev-dependencies]
|
||||
atomic-waker = "1.0.0"
|
||||
concurrent-queue = "1.2.2"
|
||||
easy-parallel = "3.1.0"
|
||||
flume = { version = "0.9.0", default-features = false }
|
||||
futures-lite = "1.7.0"
|
||||
once_cell = "1.4.1"
|
||||
smol = "1.0.1"
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
|
||||
extern crate test;
|
||||
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::rc::Rc;
|
|||
use async_task::{Runnable, Task};
|
||||
|
||||
thread_local! {
|
||||
// A channel that holds scheduled tasks.
|
||||
// A queue that holds scheduled tasks.
|
||||
static QUEUE: (flume::Sender<Runnable>, flume::Receiver<Runnable>) = flume::unbounded();
|
||||
}
|
||||
|
||||
|
@ -17,11 +17,11 @@ where
|
|||
F: Future<Output = T> + 'static,
|
||||
T: 'static,
|
||||
{
|
||||
// Create a task that is scheduled by sending itself into the channel.
|
||||
// Create a task that is scheduled by pushing itself into the queue.
|
||||
let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap());
|
||||
let (runnable, task) = async_task::spawn_local(future, schedule);
|
||||
|
||||
// Schedule the task by sending it into the queue.
|
||||
// Schedule the task by pushing it into the queue.
|
||||
runnable.schedule();
|
||||
|
||||
task
|
||||
|
|
|
@ -5,7 +5,7 @@ use std::sync::Arc;
|
|||
use std::thread;
|
||||
|
||||
use async_task::Task;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
/// Spawns a future on a new dedicated thread.
|
||||
///
|
||||
|
@ -27,7 +27,7 @@ where
|
|||
future.await
|
||||
};
|
||||
|
||||
// Create a task that is scheduled by sending itself into the channel.
|
||||
// Create a task that is scheduled by sending it into the channel.
|
||||
let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
|
||||
let (runnable, task) = async_task::spawn(future, schedule);
|
||||
|
||||
|
|
|
@ -5,8 +5,8 @@ use std::panic::catch_unwind;
|
|||
use std::thread;
|
||||
|
||||
use async_task::{Runnable, Task};
|
||||
use futures_lite::future;
|
||||
use once_cell::sync::Lazy;
|
||||
use smol::future;
|
||||
|
||||
/// Spawns a future on the executor.
|
||||
fn spawn<F, T>(future: F) -> Task<T>
|
||||
|
@ -14,7 +14,7 @@ where
|
|||
F: Future<Output = T> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
// A channel that holds scheduled tasks.
|
||||
// A queue that holds scheduled tasks.
|
||||
static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
|
||||
let (sender, receiver) = flume::unbounded::<Runnable>();
|
||||
|
||||
|
@ -29,11 +29,11 @@ where
|
|||
sender
|
||||
});
|
||||
|
||||
// Create a task that is scheduled by sending itself into the channel.
|
||||
// Create a task that is scheduled by pushing it into the queue.
|
||||
let schedule = |t| QUEUE.send(t).unwrap();
|
||||
let (runnable, task) = async_task::spawn(future, schedule);
|
||||
|
||||
// Schedule the task by sending it into the channel.
|
||||
// Schedule the task by pushing it into the queue.
|
||||
runnable.schedule();
|
||||
|
||||
task
|
||||
|
|
|
@ -9,14 +9,14 @@ use crate::utils::abort_on_panic;
|
|||
|
||||
/// The header of a task.
|
||||
///
|
||||
/// This header is stored right at the beginning of every heap-allocated task.
|
||||
/// This header is stored in memory at the beginning of the heap-allocated task.
|
||||
pub(crate) struct Header {
|
||||
/// Current state of the task.
|
||||
///
|
||||
/// Contains flags representing the current state and the reference count.
|
||||
pub(crate) state: AtomicUsize,
|
||||
|
||||
/// The task that is blocked on the `Task`.
|
||||
/// The task that is blocked on the `Task` handle.
|
||||
///
|
||||
/// This waker needs to be woken up once the task completes or is closed.
|
||||
pub(crate) awaiter: UnsafeCell<Option<Waker>>,
|
||||
|
@ -29,49 +29,24 @@ pub(crate) struct Header {
|
|||
}
|
||||
|
||||
impl Header {
|
||||
/// Cancels the task.
|
||||
///
|
||||
/// This method will mark the task as closed, but it won't reschedule the task or drop its
|
||||
/// future.
|
||||
pub(crate) fn cancel(&self) {
|
||||
let mut state = self.state.load(Ordering::Acquire);
|
||||
|
||||
loop {
|
||||
// If the task has been completed or closed, it can't be canceled.
|
||||
if state & (COMPLETED | CLOSED) != 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
// Mark the task as closed.
|
||||
match self.state.compare_exchange_weak(
|
||||
state,
|
||||
state | CLOSED,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(s) => state = s,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies the awaiter blocked on this task.
|
||||
///
|
||||
/// If the awaiter is the same as the current waker, it will not be notified.
|
||||
#[inline]
|
||||
pub(crate) fn notify(&self, current: Option<&Waker>) {
|
||||
// Mark the awaiter as being notified.
|
||||
// Set the bit indicating that the task is notifying its awaiter.
|
||||
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
|
||||
|
||||
// If the awaiter was not being notified nor registered...
|
||||
// If the task was not notifying or registering an awaiter...
|
||||
if state & (NOTIFYING | REGISTERING) == 0 {
|
||||
// Take the waker out.
|
||||
let waker = unsafe { (*self.awaiter.get()).take() };
|
||||
|
||||
// Mark the state as not being notified anymore nor containing an awaiter.
|
||||
// Unset the bit indicating that the task is notifying its awaiter.
|
||||
self.state
|
||||
.fetch_and(!NOTIFYING & !AWAITER, Ordering::Release);
|
||||
|
||||
// Finally, notify the waker if it's different from the current waker.
|
||||
if let Some(w) = waker {
|
||||
// We need a safeguard against panics because waking can panic.
|
||||
abort_on_panic(|| match current {
|
||||
|
@ -85,7 +60,7 @@ impl Header {
|
|||
|
||||
/// Registers a new awaiter blocked on this task.
|
||||
///
|
||||
/// This method is called when `Task` is polled and the task has not completed.
|
||||
/// This method is called when `Task` is polled and it has not yet completed.
|
||||
#[inline]
|
||||
pub(crate) fn register(&self, waker: &Waker) {
|
||||
// Load the state and synchronize with it.
|
||||
|
@ -169,7 +144,7 @@ impl fmt::Debug for Header {
|
|||
.field("completed", &(state & COMPLETED != 0))
|
||||
.field("closed", &(state & CLOSED != 0))
|
||||
.field("awaiter", &(state & AWAITER != 0))
|
||||
.field("handle", &(state & HANDLE != 0))
|
||||
.field("task", &(state & TASK != 0))
|
||||
.field("ref_count", &(state / REFERENCE))
|
||||
.finish()
|
||||
}
|
||||
|
|
53
src/lib.rs
53
src/lib.rs
|
@ -1,12 +1,10 @@
|
|||
//! Task abstraction for building executors.
|
||||
//!
|
||||
//! # Spawning
|
||||
//!
|
||||
//! To spawn a future onto an executor, we first need to allocate it on the heap and keep some
|
||||
//! state alongside it. The state indicates whether the future is ready for polling, waiting to be
|
||||
//! woken up, or completed. Such a future is called a *task*.
|
||||
//! state attached to it. The state indicates whether the future is ready for polling, waiting to
|
||||
//! be woken up, or completed. Such a stateful future is called a *task*.
|
||||
//!
|
||||
//! All executors have some kind of queue that holds runnable tasks:
|
||||
//! All executors have a queue that holds scheduled tasks:
|
||||
//!
|
||||
//! ```
|
||||
//! let (sender, receiver) = flume::unbounded();
|
||||
|
@ -17,11 +15,12 @@
|
|||
//! # // A function that schedules the task when it gets woken up.
|
||||
//! # let schedule = move |runnable| sender.send(runnable).unwrap();
|
||||
//! #
|
||||
//! # // Construct a task.
|
||||
//! # // Create a task.
|
||||
//! # let (runnable, task) = async_task::spawn(future, schedule);
|
||||
//! ```
|
||||
//!
|
||||
//! A task is constructed using either [`spawn`] or [`spawn_local`]:
|
||||
//! A task is created using either [`spawn()`], [`spawn_local()`], or [`spawn_unchecked()`] which
|
||||
//! return a [`Runnable`] and a [`Task`]:
|
||||
//!
|
||||
//! ```
|
||||
//! # let (sender, receiver) = flume::unbounded();
|
||||
|
@ -39,12 +38,10 @@
|
|||
//! runnable.schedule();
|
||||
//! ```
|
||||
//!
|
||||
//! The function returns a runnable [`Runnable`] and a [`Task`] that can await the result.
|
||||
//! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its
|
||||
//! output.
|
||||
//!
|
||||
//! # Execution
|
||||
//!
|
||||
//! Task executors have some kind of main loop that drives tasks to completion. That means taking
|
||||
//! runnable tasks out of the queue and running each one in order:
|
||||
//! Finally, we need a loop that takes scheduled tasks from the queue and runs them:
|
||||
//!
|
||||
//! ```no_run
|
||||
//! # let (sender, receiver) = flume::unbounded();
|
||||
|
@ -55,7 +52,7 @@
|
|||
//! # // A function that schedules the task when it gets woken up.
|
||||
//! # let schedule = move |runnable| sender.send(runnable).unwrap();
|
||||
//! #
|
||||
//! # // Construct a task.
|
||||
//! # // Create a task.
|
||||
//! # let (runnable, task) = async_task::spawn(future, schedule);
|
||||
//! #
|
||||
//! # // Push the task into the queue by invoking its schedule function.
|
||||
|
@ -66,31 +63,9 @@
|
|||
//! }
|
||||
//! ```
|
||||
//!
|
||||
//! When a task is run, its future gets polled. If polling does not complete the task, that means
|
||||
//! it's waiting for another future and needs to go to sleep. When woken up, its schedule function
|
||||
//! will be invoked, pushing it back into the queue so that it can be run again.
|
||||
//!
|
||||
//! # Cancelation
|
||||
//!
|
||||
//! Both [`Runnable`] and [`Task`] have methods that cancel the task. When canceled, the
|
||||
//! task's future will not be polled again and will get dropped instead.
|
||||
//!
|
||||
//! If canceled by the [`Runnable`] instance, the task is destroyed immediately. If canceled by the
|
||||
//! [`Task`] instance, it will be scheduled one more time and the next attempt to run it will
|
||||
//! simply destroy it.
|
||||
//!
|
||||
//! The `Task` future will then evaluate to `None`, but only after the task's future is
|
||||
//! dropped.
|
||||
//!
|
||||
//! # Performance
|
||||
//!
|
||||
//! Task construction incurs a single allocation that holds its state, the schedule function, and
|
||||
//! the future or the result of the future if completed.
|
||||
//!
|
||||
//! The layout of a task is equivalent to 4 `usize`s followed by the schedule function, and then by
|
||||
//! a union of the future and its output.
|
||||
//!
|
||||
//! [`block_on`]: https://github.com/stjepang/async-task/blob/master/examples/block.rs
|
||||
//! Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
|
||||
//! vanishes and only reappears when its [`Waker`][`core::task::Waker`] wakes the task, thus
|
||||
//! scheduling it to be run again.
|
||||
|
||||
#![cfg_attr(not(feature = "std"), no_std)]
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
|
@ -106,7 +81,7 @@ mod state;
|
|||
mod task;
|
||||
mod utils;
|
||||
|
||||
pub use crate::runnable::{spawn, Runnable};
|
||||
pub use crate::runnable::{spawn, spawn_unchecked, Runnable};
|
||||
pub use crate::task::Task;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
|
|
64
src/raw.rs
64
src/raw.rs
|
@ -23,8 +23,8 @@ pub(crate) struct TaskVTable {
|
|||
/// Returns a pointer to the output stored after completion.
|
||||
pub(crate) get_output: unsafe fn(*const ()) -> *const (),
|
||||
|
||||
/// Drops the task.
|
||||
pub(crate) drop_task: unsafe fn(ptr: *const ()),
|
||||
/// Drops the task reference (`Runnable` or `Waker`).
|
||||
pub(crate) drop_ref: unsafe fn(ptr: *const ()),
|
||||
|
||||
/// Destroys the task.
|
||||
pub(crate) destroy: unsafe fn(*const ()),
|
||||
|
@ -82,8 +82,8 @@ impl<F, T, S> Clone for RawTask<F, T, S> {
|
|||
|
||||
impl<F, T, S> RawTask<F, T, S>
|
||||
where
|
||||
F: Future<Output = T> + 'static,
|
||||
S: Fn(Runnable) + Send + Sync + 'static,
|
||||
F: Future<Output = T>,
|
||||
S: Fn(Runnable),
|
||||
{
|
||||
const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
|
||||
Self::clone_waker,
|
||||
|
@ -94,29 +94,29 @@ where
|
|||
|
||||
/// Allocates a task with the given `future` and `schedule` function.
|
||||
///
|
||||
/// It is assumed that initially only the `Runnable` reference and the `Task` exist.
|
||||
/// It is assumed that initially only the `Runnable` and the `Task` exist.
|
||||
pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> {
|
||||
// Compute the layout of the task for allocation. Abort if the computation fails.
|
||||
let task_layout = abort_on_panic(|| Self::task_layout());
|
||||
|
||||
unsafe {
|
||||
// Allocate enough space for the entire task.
|
||||
let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
|
||||
let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) {
|
||||
None => abort(),
|
||||
Some(p) => p,
|
||||
};
|
||||
|
||||
let raw = Self::from_ptr(raw_task.as_ptr());
|
||||
let raw = Self::from_ptr(ptr.as_ptr());
|
||||
|
||||
// Write the header as the first field of the task.
|
||||
(raw.header as *mut Header).write(Header {
|
||||
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
|
||||
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
|
||||
awaiter: UnsafeCell::new(None),
|
||||
vtable: &TaskVTable {
|
||||
schedule: Self::schedule,
|
||||
drop_future: Self::drop_future,
|
||||
get_output: Self::get_output,
|
||||
drop_task: Self::drop_task,
|
||||
drop_ref: Self::drop_ref,
|
||||
destroy: Self::destroy,
|
||||
run: Self::run,
|
||||
clone_waker: Self::clone_waker,
|
||||
|
@ -129,7 +129,7 @@ where
|
|||
// Write the future as the fourth field of the task.
|
||||
raw.future.write(future);
|
||||
|
||||
raw_task
|
||||
ptr
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -296,7 +296,7 @@ where
|
|||
// because the schedule function cannot be destroyed while the waker is
|
||||
// still alive.
|
||||
let task = Runnable {
|
||||
raw_task: NonNull::new_unchecked(ptr as *mut ()),
|
||||
ptr: NonNull::new_unchecked(ptr as *mut ()),
|
||||
};
|
||||
(*raw.schedule)(task);
|
||||
}
|
||||
|
@ -328,7 +328,7 @@ where
|
|||
/// Drops a waker.
|
||||
///
|
||||
/// This function will decrement the reference count. If it drops down to zero, the associated
|
||||
/// join handle has been dropped too, and the task has not been completed, then it will get
|
||||
/// `Task` has been dropped too, and the task has not been completed, then it will get
|
||||
/// scheduled one more time so that its future gets dropped by the executor.
|
||||
#[inline]
|
||||
unsafe fn drop_waker(ptr: *const ()) {
|
||||
|
@ -339,7 +339,7 @@ where
|
|||
|
||||
// If this was the last reference to the task and the `Task` has been dropped too,
|
||||
// then we need to decide how to destroy the task.
|
||||
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
|
||||
if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
|
||||
if new & (COMPLETED | CLOSED) == 0 {
|
||||
// If the task was not completed nor closed, close it and schedule one more time so
|
||||
// that its future gets dropped by the executor.
|
||||
|
@ -354,12 +354,12 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
/// Drops a task.
|
||||
/// Drops a task reference (`Runnable` or `Waker`).
|
||||
///
|
||||
/// This function will decrement the reference count. If it drops down to zero and the
|
||||
/// associated join handle has been dropped too, then the task gets destroyed.
|
||||
/// associated `Task` handle has been dropped too, then the task gets destroyed.
|
||||
#[inline]
|
||||
unsafe fn drop_task(ptr: *const ()) {
|
||||
unsafe fn drop_ref(ptr: *const ()) {
|
||||
let raw = Self::from_ptr(ptr);
|
||||
|
||||
// Decrement the reference count.
|
||||
|
@ -367,7 +367,7 @@ where
|
|||
|
||||
// If this was the last reference to the task and the `Task` has been dropped too,
|
||||
// then destroy the task.
|
||||
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
|
||||
if new & !(REFERENCE - 1) == 0 && new & TASK == 0 {
|
||||
Self::destroy(ptr);
|
||||
}
|
||||
}
|
||||
|
@ -387,7 +387,7 @@ where
|
|||
}
|
||||
|
||||
let task = Runnable {
|
||||
raw_task: NonNull::new_unchecked(ptr as *mut ()),
|
||||
ptr: NonNull::new_unchecked(ptr as *mut ()),
|
||||
};
|
||||
(*raw.schedule)(task);
|
||||
}
|
||||
|
@ -457,7 +457,7 @@ where
|
|||
}
|
||||
|
||||
// Drop the task reference.
|
||||
Self::drop_task(ptr);
|
||||
Self::drop_ref(ptr);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -494,8 +494,8 @@ where
|
|||
|
||||
// The task is now completed.
|
||||
loop {
|
||||
// If the handle is dropped, we'll need to close it and drop the output.
|
||||
let new = if state & HANDLE == 0 {
|
||||
// If the `Task` is dropped, we'll need to close it and drop the output.
|
||||
let new = if state & TASK == 0 {
|
||||
(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
|
||||
} else {
|
||||
(state & !RUNNING & !SCHEDULED) | COMPLETED
|
||||
|
@ -509,9 +509,9 @@ where
|
|||
Ordering::Acquire,
|
||||
) {
|
||||
Ok(_) => {
|
||||
// If the handle is dropped or if the task was closed while running,
|
||||
// If the `Task` is dropped or if the task was closed while running,
|
||||
// now it's time to drop the output.
|
||||
if state & HANDLE == 0 || state & CLOSED != 0 {
|
||||
if state & TASK == 0 || state & CLOSED != 0 {
|
||||
// Read the output.
|
||||
output = Some(raw.output.read());
|
||||
}
|
||||
|
@ -522,7 +522,7 @@ where
|
|||
}
|
||||
|
||||
// Drop the task reference.
|
||||
Self::drop_task(ptr);
|
||||
Self::drop_ref(ptr);
|
||||
break;
|
||||
}
|
||||
Err(s) => state = s,
|
||||
|
@ -569,7 +569,7 @@ where
|
|||
(*raw.header).notify(None);
|
||||
}
|
||||
// Drop the task reference.
|
||||
Self::drop_task(ptr);
|
||||
Self::drop_ref(ptr);
|
||||
} else if state & SCHEDULED != 0 {
|
||||
// The thread that woke the task up didn't reschedule it because
|
||||
// it was running so now it's our responsibility to do so.
|
||||
|
@ -577,7 +577,7 @@ where
|
|||
return true;
|
||||
} else {
|
||||
// Drop the task reference.
|
||||
Self::drop_task(ptr);
|
||||
Self::drop_ref(ptr);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -592,13 +592,13 @@ where
|
|||
/// A guard that closes the task if polling its future panics.
|
||||
struct Guard<F, T, S>(RawTask<F, T, S>)
|
||||
where
|
||||
F: Future<Output = T> + 'static,
|
||||
S: Fn(Runnable) + Send + Sync + 'static;
|
||||
F: Future<Output = T>,
|
||||
S: Fn(Runnable);
|
||||
|
||||
impl<F, T, S> Drop for Guard<F, T, S>
|
||||
where
|
||||
F: Future<Output = T> + 'static,
|
||||
S: Fn(Runnable) + Send + Sync + 'static,
|
||||
F: Future<Output = T>,
|
||||
S: Fn(Runnable),
|
||||
{
|
||||
fn drop(&mut self) {
|
||||
let raw = self.0;
|
||||
|
@ -626,7 +626,7 @@ where
|
|||
}
|
||||
|
||||
// Drop the task reference.
|
||||
RawTask::<F, T, S>::drop_task(ptr);
|
||||
RawTask::<F, T, S>::drop_ref(ptr);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -647,7 +647,7 @@ where
|
|||
}
|
||||
|
||||
// Drop the task reference.
|
||||
RawTask::<F, T, S>::drop_task(ptr);
|
||||
RawTask::<F, T, S>::drop_ref(ptr);
|
||||
break;
|
||||
}
|
||||
Err(s) => state = s,
|
||||
|
|
154
src/runnable.rs
154
src/runnable.rs
|
@ -13,17 +13,18 @@ use crate::Task;
|
|||
|
||||
/// Creates a new task.
|
||||
///
|
||||
/// This constructor returns a [`Runnable`] reference that runs the future and a [`Task`]
|
||||
/// that awaits its result.
|
||||
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
|
||||
/// output.
|
||||
///
|
||||
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
|
||||
/// `schedule` function.
|
||||
/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and
|
||||
/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again.
|
||||
///
|
||||
/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
|
||||
/// push the task into some kind of queue so that it can be processed later.
|
||||
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
|
||||
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
|
||||
/// should push it into a task queue so that it can be processed later.
|
||||
///
|
||||
/// If you need to spawn a future that does not implement [`Send`], consider using the
|
||||
/// [`spawn_local`] function instead.
|
||||
/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider
|
||||
/// using [`spawn_local()`] or [`spawn_unchecked()`] instead.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
|
@ -40,40 +41,28 @@ use crate::Task;
|
|||
/// // Create a task with the future and the schedule function.
|
||||
/// let (runnable, task) = async_task::spawn(future, schedule);
|
||||
/// ```
|
||||
pub fn spawn<F, T, S>(future: F, schedule: S) -> (Runnable, Task<T>)
|
||||
pub fn spawn<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
|
||||
where
|
||||
F: Future<Output = T> + Send + 'static,
|
||||
T: Send + 'static,
|
||||
F: Future + Send + 'static,
|
||||
F::Output: Send + 'static,
|
||||
S: Fn(Runnable) + Send + Sync + 'static,
|
||||
{
|
||||
// Allocate large futures on the heap.
|
||||
let raw_task = if mem::size_of::<F>() >= 2048 {
|
||||
let future = alloc::boxed::Box::pin(future);
|
||||
RawTask::<_, T, S>::allocate(future, schedule)
|
||||
} else {
|
||||
RawTask::<F, T, S>::allocate(future, schedule)
|
||||
};
|
||||
|
||||
let runnable = Runnable { raw_task };
|
||||
let task = Task {
|
||||
raw_task,
|
||||
_marker: PhantomData,
|
||||
};
|
||||
(runnable, task)
|
||||
unsafe { spawn_unchecked(future, schedule) }
|
||||
}
|
||||
|
||||
/// Creates a new local task.
|
||||
///
|
||||
/// This constructor returns a [`Runnable`] reference that runs the future and a [`Task`]
|
||||
/// that awaits its result.
|
||||
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
|
||||
/// output.
|
||||
///
|
||||
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
|
||||
/// `schedule` function.
|
||||
/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and
|
||||
/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again.
|
||||
///
|
||||
/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
|
||||
/// push the task into some kind of queue so that it can be processed later.
|
||||
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
|
||||
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
|
||||
/// should push it into a task queue so that it can be processed later.
|
||||
///
|
||||
/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
|
||||
/// Unlike [`spawn()`], this function does not require the `future` to implement [`Send`]. If the
|
||||
/// [`Runnable`] reference is run or dropped on a thread it was not created on, a panic will occur.
|
||||
///
|
||||
/// **NOTE:** This function is only available when the `std` feature for this crate is enabled (it
|
||||
|
@ -95,10 +84,10 @@ where
|
|||
/// let (runnable, task) = async_task::spawn_local(future, schedule);
|
||||
/// ```
|
||||
#[cfg(feature = "std")]
|
||||
pub fn spawn_local<F, T, S>(future: F, schedule: S) -> (Runnable, Task<T>)
|
||||
pub fn spawn_local<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
|
||||
where
|
||||
F: Future<Output = T> + 'static,
|
||||
T: 'static,
|
||||
F: Future + 'static,
|
||||
F::Output: 'static,
|
||||
S: Fn(Runnable) + Send + Sync + 'static,
|
||||
{
|
||||
use std::mem::ManuallyDrop;
|
||||
|
@ -144,23 +133,60 @@ where
|
|||
}
|
||||
}
|
||||
|
||||
// Wrap the future into one that which thread it's on.
|
||||
// Wrap the future into one that checks which thread it's on.
|
||||
let future = Checked {
|
||||
id: thread_id(),
|
||||
inner: ManuallyDrop::new(future),
|
||||
};
|
||||
|
||||
unsafe { spawn_unchecked(future, schedule) }
|
||||
}
|
||||
|
||||
/// Creates a new task.
|
||||
///
|
||||
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
|
||||
/// output.
|
||||
///
|
||||
/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and
|
||||
/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again.
|
||||
///
|
||||
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
|
||||
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
|
||||
/// should push it into a task queue so that it can be processed later.
|
||||
///
|
||||
/// Safe but more restrictive variants of this function are [`spawn()`] or [`spawn_local()`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// // The future inside the task.
|
||||
/// let future = async {
|
||||
/// println!("Hello, world!");
|
||||
/// };
|
||||
///
|
||||
/// // If the task gets woken up, it will be sent into this channel.
|
||||
/// let (s, r) = flume::unbounded();
|
||||
/// let schedule = move |runnable| s.send(runnable).unwrap();
|
||||
///
|
||||
/// // Create a task with the future and the schedule function.
|
||||
/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) };
|
||||
/// ```
|
||||
pub unsafe fn spawn_unchecked<F, S>(future: F, schedule: S) -> (Runnable, Task<F::Output>)
|
||||
where
|
||||
F: Future,
|
||||
S: Fn(Runnable),
|
||||
{
|
||||
// Allocate large futures on the heap.
|
||||
let raw_task = if mem::size_of::<F>() >= 2048 {
|
||||
let ptr = if mem::size_of::<F>() >= 2048 {
|
||||
let future = alloc::boxed::Box::pin(future);
|
||||
RawTask::<_, T, S>::allocate(future, schedule)
|
||||
RawTask::<_, F::Output, S>::allocate(future, schedule)
|
||||
} else {
|
||||
RawTask::<_, T, S>::allocate(future, schedule)
|
||||
RawTask::<F, F::Output, S>::allocate(future, schedule)
|
||||
};
|
||||
|
||||
let runnable = Runnable { raw_task };
|
||||
let runnable = Runnable { ptr };
|
||||
let task = Task {
|
||||
raw_task,
|
||||
ptr,
|
||||
_marker: PhantomData,
|
||||
};
|
||||
(runnable, task)
|
||||
|
@ -182,9 +208,17 @@ where
|
|||
/// canceled. When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is
|
||||
/// possible for the [`Task`] to cancel while the [`Runnable`] reference exists, in which
|
||||
/// case an attempt to run the task won't do anything.
|
||||
///
|
||||
/// ----------------
|
||||
///
|
||||
/// A runnable future, ready for execution.
|
||||
///
|
||||
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
|
||||
/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
|
||||
/// queue in an executor.
|
||||
pub struct Runnable {
|
||||
/// A pointer to the heap-allocated task.
|
||||
pub(crate) raw_task: NonNull<()>,
|
||||
pub(crate) ptr: NonNull<()>,
|
||||
}
|
||||
|
||||
unsafe impl Send for Runnable {}
|
||||
|
@ -203,7 +237,7 @@ impl Runnable {
|
|||
///
|
||||
/// If the task is canceled, this method won't do anything.
|
||||
pub fn schedule(self) {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
mem::forget(self);
|
||||
|
||||
|
@ -226,9 +260,10 @@ impl Runnable {
|
|||
///
|
||||
/// It is possible that polling the future panics, in which case the panic will be propagated
|
||||
/// into the caller. It is advised that invocations of this method are wrapped inside
|
||||
/// [`catch_unwind`]. If a panic occurs, the task is automatically canceled.
|
||||
/// [`catch_unwind`][`std::panic::catch_unwind`]. If a panic occurs, the task is automatically
|
||||
/// canceled.
|
||||
pub fn run(self) -> bool {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
mem::forget(self);
|
||||
|
||||
|
@ -237,7 +272,7 @@ impl Runnable {
|
|||
|
||||
/// Returns a waker associated with this task.
|
||||
pub fn waker(&self) -> Waker {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
unsafe {
|
||||
|
@ -249,12 +284,29 @@ impl Runnable {
|
|||
|
||||
impl Drop for Runnable {
|
||||
fn drop(&mut self) {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
unsafe {
|
||||
// Cancel the task.
|
||||
(*header).cancel();
|
||||
let mut state = (*header).state.load(Ordering::Acquire);
|
||||
|
||||
loop {
|
||||
// If the task has been completed or closed, it can't be canceled.
|
||||
if state & (COMPLETED | CLOSED) != 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
// Mark the task as closed.
|
||||
match (*header).state.compare_exchange_weak(
|
||||
state,
|
||||
state | CLOSED,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
) {
|
||||
Ok(_) => break,
|
||||
Err(s) => state = s,
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the future.
|
||||
((*header).vtable.drop_future)(ptr);
|
||||
|
@ -268,14 +320,14 @@ impl Drop for Runnable {
|
|||
}
|
||||
|
||||
// Drop the task reference.
|
||||
((*header).vtable.drop_task)(ptr);
|
||||
((*header).vtable.drop_ref)(ptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Runnable {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
f.debug_struct("Runnable")
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
/// Set if the task is scheduled for running.
|
||||
///
|
||||
/// A task is considered to be scheduled whenever its `Runnable` reference exists. It therefore
|
||||
/// also begins in scheduled state at the moment of creation.
|
||||
/// A task is considered to be scheduled whenever its `Runnable` exists.
|
||||
///
|
||||
/// This flag can't be set when the task is completed. However, it can be set while the task is
|
||||
/// running, in which case it will be rescheduled as soon as polling finishes.
|
||||
|
@ -27,7 +26,7 @@ pub(crate) const COMPLETED: usize = 1 << 2;
|
|||
/// Set if the task is closed.
|
||||
///
|
||||
/// If a task is closed, that means it's either canceled or its output has been consumed by the
|
||||
/// `Task`. A task becomes closed when:
|
||||
/// `Task`. A task becomes closed in the following cases:
|
||||
///
|
||||
/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
|
||||
/// 2. Its output gets awaited by the `Task`.
|
||||
|
@ -39,7 +38,7 @@ pub(crate) const CLOSED: usize = 1 << 3;
|
|||
///
|
||||
/// The `Task` is a special case in that it is only tracked by this flag, while all other
|
||||
/// task references (`Runnable` and `Waker`s) are tracked by the reference count.
|
||||
pub(crate) const HANDLE: usize = 1 << 4;
|
||||
pub(crate) const TASK: usize = 1 << 4;
|
||||
|
||||
/// Set if the `Task` is awaiting the output.
|
||||
///
|
||||
|
@ -66,5 +65,5 @@ pub(crate) const NOTIFYING: usize = 1 << 7;
|
|||
/// total reference count.
|
||||
///
|
||||
/// Note that the reference counter only tracks the `Runnable` and `Waker`s. The `Task` is
|
||||
/// tracked separately by the `HANDLE` flag.
|
||||
/// tracked separately by the `TASK` flag.
|
||||
pub(crate) const REFERENCE: usize = 1 << 8;
|
||||
|
|
127
src/task.rs
127
src/task.rs
|
@ -10,16 +10,43 @@ use core::task::{Context, Poll};
|
|||
use crate::header::Header;
|
||||
use crate::state::*;
|
||||
|
||||
/// A handle that awaits the result of a task.
|
||||
/// A spawned task.
|
||||
///
|
||||
/// This type is a future that resolves to an `Option<T>` where:
|
||||
/// A [`Task`] can be awaited to retrieve the output of its future.
|
||||
///
|
||||
/// * `None` indicates the task has panicked or was canceled.
|
||||
/// * `Some(result)` indicates the task has completed with `result` of type `T`.
|
||||
/// Dropping a [`Task`] cancels it, which means its future won't be polled again.
|
||||
/// To drop the [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead.
|
||||
/// To cancel a task gracefully and wait until it is fully destroyed, use the
|
||||
/// [`cancel()`][Task::cancel()] method.
|
||||
///
|
||||
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
|
||||
/// can destroy the task by simply dropping its [`Runnable`][`crate::Runnable`] or by invoking
|
||||
/// [`run()`][`crate::Runnable::run()`].
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use smol::{future, Executor};
|
||||
/// use std::thread;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a future onto the executor.
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello from a task!");
|
||||
/// 1 + 2
|
||||
/// });
|
||||
///
|
||||
/// // Run an executor thread.
|
||||
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
|
||||
///
|
||||
/// // Wait for the task's output.
|
||||
/// assert_eq!(future::block_on(task), 3);
|
||||
/// ```
|
||||
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
|
||||
pub struct Task<T> {
|
||||
/// A raw task pointer.
|
||||
pub(crate) raw_task: NonNull<()>,
|
||||
pub(crate) ptr: NonNull<()>,
|
||||
|
||||
/// A marker capturing generic type `T`.
|
||||
pub(crate) _marker: PhantomData<T>,
|
||||
|
@ -36,12 +63,64 @@ impl<T> std::panic::UnwindSafe for Task<T> {}
|
|||
impl<T> std::panic::RefUnwindSafe for Task<T> {}
|
||||
|
||||
impl<T> Task<T> {
|
||||
/// Detaches the task to let it keep running in the background.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use smol::{Executor, Timer};
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a deamon future.
|
||||
/// ex.spawn(async {
|
||||
/// loop {
|
||||
/// println!("I'm a daemon task looping forever.");
|
||||
/// Timer::after(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// })
|
||||
/// .detach();
|
||||
/// ```
|
||||
pub fn detach(self) {
|
||||
let mut this = self;
|
||||
let _out = this.set_detached();
|
||||
mem::forget(this);
|
||||
}
|
||||
|
||||
/// Cancels the task and waits for it to stop running.
|
||||
///
|
||||
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
|
||||
/// it didn't complete.
|
||||
///
|
||||
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
|
||||
/// canceling because it also waits for the task to stop running.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use smol::{future, Executor, Timer};
|
||||
/// use std::thread;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a deamon future.
|
||||
/// let task = ex.spawn(async {
|
||||
/// loop {
|
||||
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
|
||||
/// Timer::after(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// });
|
||||
///
|
||||
/// // Run an executor thread.
|
||||
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
|
||||
///
|
||||
/// future::block_on(async {
|
||||
/// Timer::after(Duration::from_secs(3)).await;
|
||||
/// task.cancel().await;
|
||||
/// });
|
||||
/// ```
|
||||
pub async fn cancel(self) -> Option<T> {
|
||||
let mut this = self;
|
||||
this.set_canceled();
|
||||
|
@ -52,15 +131,16 @@ impl<T> Task<T> {
|
|||
type Output = Option<T>;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
self.0.poll_result(cx)
|
||||
self.0.poll_task(cx)
|
||||
}
|
||||
}
|
||||
|
||||
Fut(this).await
|
||||
}
|
||||
|
||||
/// Puts the task in canceled state.
|
||||
fn set_canceled(&mut self) {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
unsafe {
|
||||
|
@ -106,19 +186,20 @@ impl<T> Task<T> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Puts the task in detached state.
|
||||
fn set_detached(&mut self) -> Option<T> {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
unsafe {
|
||||
// A place where the output will be stored in case it needs to be dropped.
|
||||
let mut output = None;
|
||||
|
||||
// Optimistically assume the `Task` is being detached just after creating the
|
||||
// task. This is a common case so if the handle is not used, the overhead of it is only
|
||||
// one compare-exchange operation.
|
||||
// Optimistically assume the `Task` is being detached just after creating the task.
|
||||
// This is a common case so if the `Task` is datached, the overhead of it is only one
|
||||
// compare-exchange operation.
|
||||
if let Err(mut state) = (*header).state.compare_exchange_weak(
|
||||
SCHEDULED | HANDLE | REFERENCE,
|
||||
SCHEDULED | TASK | REFERENCE,
|
||||
SCHEDULED | REFERENCE,
|
||||
Ordering::AcqRel,
|
||||
Ordering::Acquire,
|
||||
|
@ -151,10 +232,10 @@ impl<T> Task<T> {
|
|||
let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
|
||||
SCHEDULED | CLOSED | REFERENCE
|
||||
} else {
|
||||
state & !HANDLE
|
||||
state & !TASK
|
||||
};
|
||||
|
||||
// Unset the handle flag.
|
||||
// Unset the `TASK` flag.
|
||||
match (*header).state.compare_exchange_weak(
|
||||
state,
|
||||
new,
|
||||
|
@ -184,8 +265,18 @@ impl<T> Task<T> {
|
|||
}
|
||||
}
|
||||
|
||||
fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
/// Polls the task to retrieve its output.
|
||||
///
|
||||
/// Returns `Some` if the task has completed or `None` if it was closed.
|
||||
///
|
||||
/// A task becomes closed in the following cases:
|
||||
///
|
||||
/// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`.
|
||||
/// 2. Its output gets awaited by the `Task`.
|
||||
/// 3. It panics while polling the future.
|
||||
/// 4. It is completed and the `Task` gets dropped.
|
||||
fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
unsafe {
|
||||
|
@ -273,7 +364,7 @@ impl<T> Future for Task<T> {
|
|||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match self.poll_result(cx) {
|
||||
match self.poll_task(cx) {
|
||||
Poll::Ready(t) => Poll::Ready(t.expect("task has failed")),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
|
@ -282,7 +373,7 @@ impl<T> Future for Task<T> {
|
|||
|
||||
impl<T> fmt::Debug for Task<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
let ptr = self.raw_task.as_ptr();
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header;
|
||||
|
||||
f.debug_struct("Task")
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use async_task::Runnable;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::time::Duration;
|
|||
|
||||
use async_task::Runnable;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
|
@ -9,7 +9,7 @@ use std::time::Duration;
|
|||
|
||||
use async_task::Runnable;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
|
@ -8,7 +8,7 @@ use std::time::Duration;
|
|||
|
||||
use async_task::Runnable;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::time::Duration;
|
|||
|
||||
use async_task::Runnable;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
|
@ -10,7 +10,7 @@ use std::time::Duration;
|
|||
use async_task::Runnable;
|
||||
use atomic_waker::AtomicWaker;
|
||||
use easy_parallel::Parallel;
|
||||
use futures_lite::future;
|
||||
use smol::future;
|
||||
|
||||
// Creates a future with event counters.
|
||||
//
|
||||
|
|
Loading…
Reference in New Issue