diff --git a/Cargo.toml b/Cargo.toml index 6a6bee5..cf40a60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,7 @@ std = [] [dev-dependencies] atomic-waker = "1.0.0" -concurrent-queue = "1.2.2" easy-parallel = "3.1.0" flume = { version = "0.9.0", default-features = false } -futures-lite = "1.7.0" once_cell = "1.4.1" +smol = "1.0.1" diff --git a/benches/spawn.rs b/benches/spawn.rs index b6d5f26..75d059e 100644 --- a/benches/spawn.rs +++ b/benches/spawn.rs @@ -2,7 +2,7 @@ extern crate test; -use futures_lite::future; +use smol::future; use test::Bencher; #[bench] diff --git a/examples/spawn-local.rs b/examples/spawn-local.rs index 170cd71..6713f9c 100644 --- a/examples/spawn-local.rs +++ b/examples/spawn-local.rs @@ -7,7 +7,7 @@ use std::rc::Rc; use async_task::{Runnable, Task}; thread_local! { - // A channel that holds scheduled tasks. + // A queue that holds scheduled tasks. static QUEUE: (flume::Sender, flume::Receiver) = flume::unbounded(); } @@ -17,11 +17,11 @@ where F: Future + 'static, T: 'static, { - // Create a task that is scheduled by sending itself into the channel. + // Create a task that is scheduled by pushing itself into the queue. let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap()); let (runnable, task) = async_task::spawn_local(future, schedule); - // Schedule the task by sending it into the queue. + // Schedule the task by pushing it into the queue. runnable.schedule(); task diff --git a/examples/spawn-on-thread.rs b/examples/spawn-on-thread.rs index 8d23240..4e28d3e 100644 --- a/examples/spawn-on-thread.rs +++ b/examples/spawn-on-thread.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use std::thread; use async_task::Task; -use futures_lite::future; +use smol::future; /// Spawns a future on a new dedicated thread. /// @@ -27,7 +27,7 @@ where future.await }; - // Create a task that is scheduled by sending itself into the channel. + // Create a task that is scheduled by sending it into the channel. let schedule = move |t| s.upgrade().unwrap().send(t).unwrap(); let (runnable, task) = async_task::spawn(future, schedule); diff --git a/examples/spawn.rs b/examples/spawn.rs index 74de7f9..71e2198 100644 --- a/examples/spawn.rs +++ b/examples/spawn.rs @@ -5,8 +5,8 @@ use std::panic::catch_unwind; use std::thread; use async_task::{Runnable, Task}; -use futures_lite::future; use once_cell::sync::Lazy; +use smol::future; /// Spawns a future on the executor. fn spawn(future: F) -> Task @@ -14,7 +14,7 @@ where F: Future + Send + 'static, T: Send + 'static, { - // A channel that holds scheduled tasks. + // A queue that holds scheduled tasks. static QUEUE: Lazy> = Lazy::new(|| { let (sender, receiver) = flume::unbounded::(); @@ -29,11 +29,11 @@ where sender }); - // Create a task that is scheduled by sending itself into the channel. + // Create a task that is scheduled by pushing it into the queue. let schedule = |t| QUEUE.send(t).unwrap(); let (runnable, task) = async_task::spawn(future, schedule); - // Schedule the task by sending it into the channel. + // Schedule the task by pushing it into the queue. runnable.schedule(); task diff --git a/src/header.rs b/src/header.rs index 33bee6e..9c4960d 100644 --- a/src/header.rs +++ b/src/header.rs @@ -9,14 +9,14 @@ use crate::utils::abort_on_panic; /// The header of a task. /// -/// This header is stored right at the beginning of every heap-allocated task. +/// This header is stored in memory at the beginning of the heap-allocated task. pub(crate) struct Header { /// Current state of the task. /// /// Contains flags representing the current state and the reference count. pub(crate) state: AtomicUsize, - /// The task that is blocked on the `Task`. + /// The task that is blocked on the `Task` handle. /// /// This waker needs to be woken up once the task completes or is closed. pub(crate) awaiter: UnsafeCell>, @@ -29,49 +29,24 @@ pub(crate) struct Header { } impl Header { - /// Cancels the task. - /// - /// This method will mark the task as closed, but it won't reschedule the task or drop its - /// future. - pub(crate) fn cancel(&self) { - let mut state = self.state.load(Ordering::Acquire); - - loop { - // If the task has been completed or closed, it can't be canceled. - if state & (COMPLETED | CLOSED) != 0 { - break; - } - - // Mark the task as closed. - match self.state.compare_exchange_weak( - state, - state | CLOSED, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => break, - Err(s) => state = s, - } - } - } - /// Notifies the awaiter blocked on this task. /// /// If the awaiter is the same as the current waker, it will not be notified. #[inline] pub(crate) fn notify(&self, current: Option<&Waker>) { - // Mark the awaiter as being notified. + // Set the bit indicating that the task is notifying its awaiter. let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel); - // If the awaiter was not being notified nor registered... + // If the task was not notifying or registering an awaiter... if state & (NOTIFYING | REGISTERING) == 0 { // Take the waker out. let waker = unsafe { (*self.awaiter.get()).take() }; - // Mark the state as not being notified anymore nor containing an awaiter. + // Unset the bit indicating that the task is notifying its awaiter. self.state .fetch_and(!NOTIFYING & !AWAITER, Ordering::Release); + // Finally, notify the waker if it's different from the current waker. if let Some(w) = waker { // We need a safeguard against panics because waking can panic. abort_on_panic(|| match current { @@ -85,7 +60,7 @@ impl Header { /// Registers a new awaiter blocked on this task. /// - /// This method is called when `Task` is polled and the task has not completed. + /// This method is called when `Task` is polled and it has not yet completed. #[inline] pub(crate) fn register(&self, waker: &Waker) { // Load the state and synchronize with it. @@ -169,7 +144,7 @@ impl fmt::Debug for Header { .field("completed", &(state & COMPLETED != 0)) .field("closed", &(state & CLOSED != 0)) .field("awaiter", &(state & AWAITER != 0)) - .field("handle", &(state & HANDLE != 0)) + .field("task", &(state & TASK != 0)) .field("ref_count", &(state / REFERENCE)) .finish() } diff --git a/src/lib.rs b/src/lib.rs index 369f43f..852696e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,10 @@ //! Task abstraction for building executors. //! -//! # Spawning -//! //! To spawn a future onto an executor, we first need to allocate it on the heap and keep some -//! state alongside it. The state indicates whether the future is ready for polling, waiting to be -//! woken up, or completed. Such a future is called a *task*. +//! state attached to it. The state indicates whether the future is ready for polling, waiting to +//! be woken up, or completed. Such a stateful future is called a *task*. //! -//! All executors have some kind of queue that holds runnable tasks: +//! All executors have a queue that holds scheduled tasks: //! //! ``` //! let (sender, receiver) = flume::unbounded(); @@ -17,11 +15,12 @@ //! # // A function that schedules the task when it gets woken up. //! # let schedule = move |runnable| sender.send(runnable).unwrap(); //! # -//! # // Construct a task. +//! # // Create a task. //! # let (runnable, task) = async_task::spawn(future, schedule); //! ``` //! -//! A task is constructed using either [`spawn`] or [`spawn_local`]: +//! A task is created using either [`spawn()`], [`spawn_local()`], or [`spawn_unchecked()`] which +//! return a [`Runnable`] and a [`Task`]: //! //! ``` //! # let (sender, receiver) = flume::unbounded(); @@ -39,12 +38,10 @@ //! runnable.schedule(); //! ``` //! -//! The function returns a runnable [`Runnable`] and a [`Task`] that can await the result. +//! The [`Runnable`] is used to poll the task's future, and the [`Task`] is used to await its +//! output. //! -//! # Execution -//! -//! Task executors have some kind of main loop that drives tasks to completion. That means taking -//! runnable tasks out of the queue and running each one in order: +//! Finally, we need a loop that takes scheduled tasks from the queue and runs them: //! //! ```no_run //! # let (sender, receiver) = flume::unbounded(); @@ -55,7 +52,7 @@ //! # // A function that schedules the task when it gets woken up. //! # let schedule = move |runnable| sender.send(runnable).unwrap(); //! # -//! # // Construct a task. +//! # // Create a task. //! # let (runnable, task) = async_task::spawn(future, schedule); //! # //! # // Push the task into the queue by invoking its schedule function. @@ -66,31 +63,9 @@ //! } //! ``` //! -//! When a task is run, its future gets polled. If polling does not complete the task, that means -//! it's waiting for another future and needs to go to sleep. When woken up, its schedule function -//! will be invoked, pushing it back into the queue so that it can be run again. -//! -//! # Cancelation -//! -//! Both [`Runnable`] and [`Task`] have methods that cancel the task. When canceled, the -//! task's future will not be polled again and will get dropped instead. -//! -//! If canceled by the [`Runnable`] instance, the task is destroyed immediately. If canceled by the -//! [`Task`] instance, it will be scheduled one more time and the next attempt to run it will -//! simply destroy it. -//! -//! The `Task` future will then evaluate to `None`, but only after the task's future is -//! dropped. -//! -//! # Performance -//! -//! Task construction incurs a single allocation that holds its state, the schedule function, and -//! the future or the result of the future if completed. -//! -//! The layout of a task is equivalent to 4 `usize`s followed by the schedule function, and then by -//! a union of the future and its output. -//! -//! [`block_on`]: https://github.com/stjepang/async-task/blob/master/examples/block.rs +//! Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`] +//! vanishes and only reappears when its [`Waker`][`core::task::Waker`] wakes the task, thus +//! scheduling it to be run again. #![cfg_attr(not(feature = "std"), no_std)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] @@ -106,7 +81,7 @@ mod state; mod task; mod utils; -pub use crate::runnable::{spawn, Runnable}; +pub use crate::runnable::{spawn, spawn_unchecked, Runnable}; pub use crate::task::Task; #[cfg(feature = "std")] diff --git a/src/raw.rs b/src/raw.rs index 48a153c..c397bc1 100644 --- a/src/raw.rs +++ b/src/raw.rs @@ -23,8 +23,8 @@ pub(crate) struct TaskVTable { /// Returns a pointer to the output stored after completion. pub(crate) get_output: unsafe fn(*const ()) -> *const (), - /// Drops the task. - pub(crate) drop_task: unsafe fn(ptr: *const ()), + /// Drops the task reference (`Runnable` or `Waker`). + pub(crate) drop_ref: unsafe fn(ptr: *const ()), /// Destroys the task. pub(crate) destroy: unsafe fn(*const ()), @@ -82,8 +82,8 @@ impl Clone for RawTask { impl RawTask where - F: Future + 'static, - S: Fn(Runnable) + Send + Sync + 'static, + F: Future, + S: Fn(Runnable), { const RAW_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( Self::clone_waker, @@ -94,29 +94,29 @@ where /// Allocates a task with the given `future` and `schedule` function. /// - /// It is assumed that initially only the `Runnable` reference and the `Task` exist. + /// It is assumed that initially only the `Runnable` and the `Task` exist. pub(crate) fn allocate(future: F, schedule: S) -> NonNull<()> { // Compute the layout of the task for allocation. Abort if the computation fails. let task_layout = abort_on_panic(|| Self::task_layout()); unsafe { // Allocate enough space for the entire task. - let raw_task = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { + let ptr = match NonNull::new(alloc::alloc::alloc(task_layout.layout) as *mut ()) { None => abort(), Some(p) => p, }; - let raw = Self::from_ptr(raw_task.as_ptr()); + let raw = Self::from_ptr(ptr.as_ptr()); // Write the header as the first field of the task. (raw.header as *mut Header).write(Header { - state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE), + state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE), awaiter: UnsafeCell::new(None), vtable: &TaskVTable { schedule: Self::schedule, drop_future: Self::drop_future, get_output: Self::get_output, - drop_task: Self::drop_task, + drop_ref: Self::drop_ref, destroy: Self::destroy, run: Self::run, clone_waker: Self::clone_waker, @@ -129,7 +129,7 @@ where // Write the future as the fourth field of the task. raw.future.write(future); - raw_task + ptr } } @@ -296,7 +296,7 @@ where // because the schedule function cannot be destroyed while the waker is // still alive. let task = Runnable { - raw_task: NonNull::new_unchecked(ptr as *mut ()), + ptr: NonNull::new_unchecked(ptr as *mut ()), }; (*raw.schedule)(task); } @@ -328,7 +328,7 @@ where /// Drops a waker. /// /// This function will decrement the reference count. If it drops down to zero, the associated - /// join handle has been dropped too, and the task has not been completed, then it will get + /// `Task` has been dropped too, and the task has not been completed, then it will get /// scheduled one more time so that its future gets dropped by the executor. #[inline] unsafe fn drop_waker(ptr: *const ()) { @@ -339,7 +339,7 @@ where // If this was the last reference to the task and the `Task` has been dropped too, // then we need to decide how to destroy the task. - if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { if new & (COMPLETED | CLOSED) == 0 { // If the task was not completed nor closed, close it and schedule one more time so // that its future gets dropped by the executor. @@ -354,12 +354,12 @@ where } } - /// Drops a task. + /// Drops a task reference (`Runnable` or `Waker`). /// /// This function will decrement the reference count. If it drops down to zero and the - /// associated join handle has been dropped too, then the task gets destroyed. + /// associated `Task` handle has been dropped too, then the task gets destroyed. #[inline] - unsafe fn drop_task(ptr: *const ()) { + unsafe fn drop_ref(ptr: *const ()) { let raw = Self::from_ptr(ptr); // Decrement the reference count. @@ -367,7 +367,7 @@ where // If this was the last reference to the task and the `Task` has been dropped too, // then destroy the task. - if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 { + if new & !(REFERENCE - 1) == 0 && new & TASK == 0 { Self::destroy(ptr); } } @@ -387,7 +387,7 @@ where } let task = Runnable { - raw_task: NonNull::new_unchecked(ptr as *mut ()), + ptr: NonNull::new_unchecked(ptr as *mut ()), }; (*raw.schedule)(task); } @@ -457,7 +457,7 @@ where } // Drop the task reference. - Self::drop_task(ptr); + Self::drop_ref(ptr); return false; } @@ -494,8 +494,8 @@ where // The task is now completed. loop { - // If the handle is dropped, we'll need to close it and drop the output. - let new = if state & HANDLE == 0 { + // If the `Task` is dropped, we'll need to close it and drop the output. + let new = if state & TASK == 0 { (state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED } else { (state & !RUNNING & !SCHEDULED) | COMPLETED @@ -509,9 +509,9 @@ where Ordering::Acquire, ) { Ok(_) => { - // If the handle is dropped or if the task was closed while running, + // If the `Task` is dropped or if the task was closed while running, // now it's time to drop the output. - if state & HANDLE == 0 || state & CLOSED != 0 { + if state & TASK == 0 || state & CLOSED != 0 { // Read the output. output = Some(raw.output.read()); } @@ -522,7 +522,7 @@ where } // Drop the task reference. - Self::drop_task(ptr); + Self::drop_ref(ptr); break; } Err(s) => state = s, @@ -569,7 +569,7 @@ where (*raw.header).notify(None); } // Drop the task reference. - Self::drop_task(ptr); + Self::drop_ref(ptr); } else if state & SCHEDULED != 0 { // The thread that woke the task up didn't reschedule it because // it was running so now it's our responsibility to do so. @@ -577,7 +577,7 @@ where return true; } else { // Drop the task reference. - Self::drop_task(ptr); + Self::drop_ref(ptr); } break; } @@ -592,13 +592,13 @@ where /// A guard that closes the task if polling its future panics. struct Guard(RawTask) where - F: Future + 'static, - S: Fn(Runnable) + Send + Sync + 'static; + F: Future, + S: Fn(Runnable); impl Drop for Guard where - F: Future + 'static, - S: Fn(Runnable) + Send + Sync + 'static, + F: Future, + S: Fn(Runnable), { fn drop(&mut self) { let raw = self.0; @@ -626,7 +626,7 @@ where } // Drop the task reference. - RawTask::::drop_task(ptr); + RawTask::::drop_ref(ptr); break; } @@ -647,7 +647,7 @@ where } // Drop the task reference. - RawTask::::drop_task(ptr); + RawTask::::drop_ref(ptr); break; } Err(s) => state = s, diff --git a/src/runnable.rs b/src/runnable.rs index d8107f3..0e2f140 100644 --- a/src/runnable.rs +++ b/src/runnable.rs @@ -13,17 +13,18 @@ use crate::Task; /// Creates a new task. /// -/// This constructor returns a [`Runnable`] reference that runs the future and a [`Task`] -/// that awaits its result. +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. /// -/// When run, the task polls `future`. When woken up, it gets scheduled for running by the -/// `schedule` function. +/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and +/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again. /// -/// The schedule function should not attempt to run the task nor to drop it. Instead, it should -/// push the task into some kind of queue so that it can be processed later. +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. /// -/// If you need to spawn a future that does not implement [`Send`], consider using the -/// [`spawn_local`] function instead. +/// If you need to spawn a future that does not implement [`Send`] or isn't `'static`, consider +/// using [`spawn_local()`] or [`spawn_unchecked()`] instead. /// /// # Examples /// @@ -40,40 +41,28 @@ use crate::Task; /// // Create a task with the future and the schedule function. /// let (runnable, task) = async_task::spawn(future, schedule); /// ``` -pub fn spawn(future: F, schedule: S) -> (Runnable, Task) +pub fn spawn(future: F, schedule: S) -> (Runnable, Task) where - F: Future + Send + 'static, - T: Send + 'static, + F: Future + Send + 'static, + F::Output: Send + 'static, S: Fn(Runnable) + Send + Sync + 'static, { - // Allocate large futures on the heap. - let raw_task = if mem::size_of::() >= 2048 { - let future = alloc::boxed::Box::pin(future); - RawTask::<_, T, S>::allocate(future, schedule) - } else { - RawTask::::allocate(future, schedule) - }; - - let runnable = Runnable { raw_task }; - let task = Task { - raw_task, - _marker: PhantomData, - }; - (runnable, task) + unsafe { spawn_unchecked(future, schedule) } } /// Creates a new local task. /// -/// This constructor returns a [`Runnable`] reference that runs the future and a [`Task`] -/// that awaits its result. +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. /// -/// When run, the task polls `future`. When woken up, it gets scheduled for running by the -/// `schedule` function. +/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and +/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again. /// -/// The schedule function should not attempt to run the task nor to drop it. Instead, it should -/// push the task into some kind of queue so that it can be processed later. +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. /// -/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the +/// Unlike [`spawn()`], this function does not require the `future` to implement [`Send`]. If the /// [`Runnable`] reference is run or dropped on a thread it was not created on, a panic will occur. /// /// **NOTE:** This function is only available when the `std` feature for this crate is enabled (it @@ -95,10 +84,10 @@ where /// let (runnable, task) = async_task::spawn_local(future, schedule); /// ``` #[cfg(feature = "std")] -pub fn spawn_local(future: F, schedule: S) -> (Runnable, Task) +pub fn spawn_local(future: F, schedule: S) -> (Runnable, Task) where - F: Future + 'static, - T: 'static, + F: Future + 'static, + F::Output: 'static, S: Fn(Runnable) + Send + Sync + 'static, { use std::mem::ManuallyDrop; @@ -144,23 +133,60 @@ where } } - // Wrap the future into one that which thread it's on. + // Wrap the future into one that checks which thread it's on. let future = Checked { id: thread_id(), inner: ManuallyDrop::new(future), }; + unsafe { spawn_unchecked(future, schedule) } +} + +/// Creates a new task. +/// +/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its +/// output. +/// +/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and +/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again. +/// +/// When the task is woken, its [`Runnable`] is passed to the `schedule` function. +/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it +/// should push it into a task queue so that it can be processed later. +/// +/// Safe but more restrictive variants of this function are [`spawn()`] or [`spawn_local()`]. +/// +/// # Examples +/// +/// ``` +/// // The future inside the task. +/// let future = async { +/// println!("Hello, world!"); +/// }; +/// +/// // If the task gets woken up, it will be sent into this channel. +/// let (s, r) = flume::unbounded(); +/// let schedule = move |runnable| s.send(runnable).unwrap(); +/// +/// // Create a task with the future and the schedule function. +/// let (runnable, task) = unsafe { async_task::spawn_unchecked(future, schedule) }; +/// ``` +pub unsafe fn spawn_unchecked(future: F, schedule: S) -> (Runnable, Task) +where + F: Future, + S: Fn(Runnable), +{ // Allocate large futures on the heap. - let raw_task = if mem::size_of::() >= 2048 { + let ptr = if mem::size_of::() >= 2048 { let future = alloc::boxed::Box::pin(future); - RawTask::<_, T, S>::allocate(future, schedule) + RawTask::<_, F::Output, S>::allocate(future, schedule) } else { - RawTask::<_, T, S>::allocate(future, schedule) + RawTask::::allocate(future, schedule) }; - let runnable = Runnable { raw_task }; + let runnable = Runnable { ptr }; let task = Task { - raw_task, + ptr, _marker: PhantomData, }; (runnable, task) @@ -182,9 +208,17 @@ where /// canceled. When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is /// possible for the [`Task`] to cancel while the [`Runnable`] reference exists, in which /// case an attempt to run the task won't do anything. +/// +/// ---------------- +/// +/// A runnable future, ready for execution. +/// +/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's +/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task +/// queue in an executor. pub struct Runnable { /// A pointer to the heap-allocated task. - pub(crate) raw_task: NonNull<()>, + pub(crate) ptr: NonNull<()>, } unsafe impl Send for Runnable {} @@ -203,7 +237,7 @@ impl Runnable { /// /// If the task is canceled, this method won't do anything. pub fn schedule(self) { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; mem::forget(self); @@ -226,9 +260,10 @@ impl Runnable { /// /// It is possible that polling the future panics, in which case the panic will be propagated /// into the caller. It is advised that invocations of this method are wrapped inside - /// [`catch_unwind`]. If a panic occurs, the task is automatically canceled. + /// [`catch_unwind`][`std::panic::catch_unwind`]. If a panic occurs, the task is automatically + /// canceled. pub fn run(self) -> bool { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; mem::forget(self); @@ -237,7 +272,7 @@ impl Runnable { /// Returns a waker associated with this task. pub fn waker(&self) -> Waker { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { @@ -249,12 +284,29 @@ impl Runnable { impl Drop for Runnable { fn drop(&mut self) { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { - // Cancel the task. - (*header).cancel(); + let mut state = (*header).state.load(Ordering::Acquire); + + loop { + // If the task has been completed or closed, it can't be canceled. + if state & (COMPLETED | CLOSED) != 0 { + break; + } + + // Mark the task as closed. + match (*header).state.compare_exchange_weak( + state, + state | CLOSED, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => break, + Err(s) => state = s, + } + } // Drop the future. ((*header).vtable.drop_future)(ptr); @@ -268,14 +320,14 @@ impl Drop for Runnable { } // Drop the task reference. - ((*header).vtable.drop_task)(ptr); + ((*header).vtable.drop_ref)(ptr); } } } impl fmt::Debug for Runnable { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; f.debug_struct("Runnable") diff --git a/src/state.rs b/src/state.rs index e71cff5..2fc6cf3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,6 @@ /// Set if the task is scheduled for running. /// -/// A task is considered to be scheduled whenever its `Runnable` reference exists. It therefore -/// also begins in scheduled state at the moment of creation. +/// A task is considered to be scheduled whenever its `Runnable` exists. /// /// This flag can't be set when the task is completed. However, it can be set while the task is /// running, in which case it will be rescheduled as soon as polling finishes. @@ -27,7 +26,7 @@ pub(crate) const COMPLETED: usize = 1 << 2; /// Set if the task is closed. /// /// If a task is closed, that means it's either canceled or its output has been consumed by the -/// `Task`. A task becomes closed when: +/// `Task`. A task becomes closed in the following cases: /// /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. /// 2. Its output gets awaited by the `Task`. @@ -39,7 +38,7 @@ pub(crate) const CLOSED: usize = 1 << 3; /// /// The `Task` is a special case in that it is only tracked by this flag, while all other /// task references (`Runnable` and `Waker`s) are tracked by the reference count. -pub(crate) const HANDLE: usize = 1 << 4; +pub(crate) const TASK: usize = 1 << 4; /// Set if the `Task` is awaiting the output. /// @@ -66,5 +65,5 @@ pub(crate) const NOTIFYING: usize = 1 << 7; /// total reference count. /// /// Note that the reference counter only tracks the `Runnable` and `Waker`s. The `Task` is -/// tracked separately by the `HANDLE` flag. +/// tracked separately by the `TASK` flag. pub(crate) const REFERENCE: usize = 1 << 8; diff --git a/src/task.rs b/src/task.rs index e1e2c20..0f14e82 100644 --- a/src/task.rs +++ b/src/task.rs @@ -10,16 +10,43 @@ use core::task::{Context, Poll}; use crate::header::Header; use crate::state::*; -/// A handle that awaits the result of a task. +/// A spawned task. /// -/// This type is a future that resolves to an `Option` where: +/// A [`Task`] can be awaited to retrieve the output of its future. /// -/// * `None` indicates the task has panicked or was canceled. -/// * `Some(result)` indicates the task has completed with `result` of type `T`. +/// Dropping a [`Task`] cancels it, which means its future won't be polled again. +/// To drop the [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. +/// To cancel a task gracefully and wait until it is fully destroyed, use the +/// [`cancel()`][Task::cancel()] method. +/// +/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor +/// can destroy the task by simply dropping its [`Runnable`][`crate::Runnable`] or by invoking +/// [`run()`][`crate::Runnable::run()`]. +/// +/// # Examples +/// +/// ``` +/// use smol::{future, Executor}; +/// use std::thread; +/// +/// let ex = Executor::new(); +/// +/// // Spawn a future onto the executor. +/// let task = ex.spawn(async { +/// println!("Hello from a task!"); +/// 1 + 2 +/// }); +/// +/// // Run an executor thread. +/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); +/// +/// // Wait for the task's output. +/// assert_eq!(future::block_on(task), 3); +/// ``` #[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] pub struct Task { /// A raw task pointer. - pub(crate) raw_task: NonNull<()>, + pub(crate) ptr: NonNull<()>, /// A marker capturing generic type `T`. pub(crate) _marker: PhantomData, @@ -36,12 +63,64 @@ impl std::panic::UnwindSafe for Task {} impl std::panic::RefUnwindSafe for Task {} impl Task { + /// Detaches the task to let it keep running in the background. + /// + /// # Examples + /// + /// ``` + /// use smol::{Executor, Timer}; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// ex.spawn(async { + /// loop { + /// println!("I'm a daemon task looping forever."); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }) + /// .detach(); + /// ``` pub fn detach(self) { let mut this = self; let _out = this.set_detached(); mem::forget(this); } + /// Cancels the task and waits for it to stop running. + /// + /// Returns the task's output if it was completed just before it got canceled, or [`None`] if + /// it didn't complete. + /// + /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of + /// canceling because it also waits for the task to stop running. + /// + /// # Examples + /// + /// ``` + /// use smol::{future, Executor, Timer}; + /// use std::thread; + /// use std::time::Duration; + /// + /// let ex = Executor::new(); + /// + /// // Spawn a deamon future. + /// let task = ex.spawn(async { + /// loop { + /// println!("Even though I'm in an infinite loop, you can still cancel me!"); + /// Timer::after(Duration::from_secs(1)).await; + /// } + /// }); + /// + /// // Run an executor thread. + /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); + /// + /// future::block_on(async { + /// Timer::after(Duration::from_secs(3)).await; + /// task.cancel().await; + /// }); + /// ``` pub async fn cancel(self) -> Option { let mut this = self; this.set_canceled(); @@ -52,15 +131,16 @@ impl Task { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.0.poll_result(cx) + self.0.poll_task(cx) } } Fut(this).await } + /// Puts the task in canceled state. fn set_canceled(&mut self) { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { @@ -106,19 +186,20 @@ impl Task { } } + /// Puts the task in detached state. fn set_detached(&mut self) -> Option { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { // A place where the output will be stored in case it needs to be dropped. let mut output = None; - // Optimistically assume the `Task` is being detached just after creating the - // task. This is a common case so if the handle is not used, the overhead of it is only - // one compare-exchange operation. + // Optimistically assume the `Task` is being detached just after creating the task. + // This is a common case so if the `Task` is datached, the overhead of it is only one + // compare-exchange operation. if let Err(mut state) = (*header).state.compare_exchange_weak( - SCHEDULED | HANDLE | REFERENCE, + SCHEDULED | TASK | REFERENCE, SCHEDULED | REFERENCE, Ordering::AcqRel, Ordering::Acquire, @@ -151,10 +232,10 @@ impl Task { let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 { SCHEDULED | CLOSED | REFERENCE } else { - state & !HANDLE + state & !TASK }; - // Unset the handle flag. + // Unset the `TASK` flag. match (*header).state.compare_exchange_weak( state, new, @@ -184,8 +265,18 @@ impl Task { } } - fn poll_result(&mut self, cx: &mut Context<'_>) -> Poll> { - let ptr = self.raw_task.as_ptr(); + /// Polls the task to retrieve its output. + /// + /// Returns `Some` if the task has completed or `None` if it was closed. + /// + /// A task becomes closed in the following cases: + /// + /// 1. It gets canceled by `Runnable::drop()`, `Task::drop()`, or `Task::cancel()`. + /// 2. Its output gets awaited by the `Task`. + /// 3. It panics while polling the future. + /// 4. It is completed and the `Task` gets dropped. + fn poll_task(&mut self, cx: &mut Context<'_>) -> Poll> { + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; unsafe { @@ -273,7 +364,7 @@ impl Future for Task { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.poll_result(cx) { + match self.poll_task(cx) { Poll::Ready(t) => Poll::Ready(t.expect("task has failed")), Poll::Pending => Poll::Pending, } @@ -282,7 +373,7 @@ impl Future for Task { impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let ptr = self.raw_task.as_ptr(); + let ptr = self.ptr.as_ptr(); let header = ptr as *const Header; f.debug_struct("Task") diff --git a/tests/basic.rs b/tests/basic.rs index 7f948bf..1439e16 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; use std::task::{Context, Poll}; use async_task::Runnable; -use futures_lite::future; +use smol::future; // Creates a future with event counters. // diff --git a/tests/cancel.rs b/tests/cancel.rs index 7461856..a0b097c 100644 --- a/tests/cancel.rs +++ b/tests/cancel.rs @@ -7,7 +7,7 @@ use std::time::Duration; use async_task::Runnable; use easy_parallel::Parallel; -use futures_lite::future; +use smol::future; // Creates a future with event counters. // diff --git a/tests/join.rs b/tests/join.rs index 1d131af..17312a4 100644 --- a/tests/join.rs +++ b/tests/join.rs @@ -9,7 +9,7 @@ use std::time::Duration; use async_task::Runnable; use easy_parallel::Parallel; -use futures_lite::future; +use smol::future; // Creates a future with event counters. // diff --git a/tests/panic.rs b/tests/panic.rs index 8473343..f38fb3e 100644 --- a/tests/panic.rs +++ b/tests/panic.rs @@ -8,7 +8,7 @@ use std::time::Duration; use async_task::Runnable; use easy_parallel::Parallel; -use futures_lite::future; +use smol::future; // Creates a future with event counters. // diff --git a/tests/ready.rs b/tests/ready.rs index 5a85d6f..ebfdc63 100644 --- a/tests/ready.rs +++ b/tests/ready.rs @@ -7,7 +7,7 @@ use std::time::Duration; use async_task::Runnable; use easy_parallel::Parallel; -use futures_lite::future; +use smol::future; // Creates a future with event counters. // diff --git a/tests/waker_panic.rs b/tests/waker_panic.rs index 76a044d..68b0a81 100644 --- a/tests/waker_panic.rs +++ b/tests/waker_panic.rs @@ -10,7 +10,7 @@ use std::time::Duration; use async_task::Runnable; use atomic_waker::AtomicWaker; use easy_parallel::Parallel; -use futures_lite::future; +use smol::future; // Creates a future with event counters. //