More docs and small fixes

This commit is contained in:
Stjepan Glavina 2020-09-20 02:12:51 +02:00
parent 59a5a260c6
commit c5604c3eff
7 changed files with 164 additions and 67 deletions

View File

@ -18,7 +18,7 @@ where
T: 'static,
{
// Create a task that is scheduled by pushing itself into the queue.
let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap());
let schedule = |runnable| QUEUE.with(|(s, _)| s.send(runnable).unwrap());
let (runnable, task) = async_task::spawn_local(future, schedule);
// Schedule the task by pushing it into the queue.

View File

@ -28,7 +28,7 @@ where
};
// Create a task that is scheduled by sending it into the channel.
let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let schedule = move |runnable| s.upgrade().unwrap().send(runnable).unwrap();
let (runnable, task) = async_task::spawn(future, schedule);
// Schedule the task by sending it into the channel.

View File

@ -21,7 +21,7 @@ where
// Start the executor thread.
thread::spawn(|| {
for runnable in receiver {
// Ignore panics for simplicity.
// Ignore panics inside futures.
let _ignore_panic = catch_unwind(|| runnable.run());
}
});
@ -30,7 +30,7 @@ where
});
// Create a task that is scheduled by pushing it into the queue.
let schedule = |t| QUEUE.send(t).unwrap();
let schedule = |runnable| QUEUE.send(runnable).unwrap();
let (runnable, task) = async_task::spawn(future, schedule);
// Schedule the task by pushing it into the queue.

View File

@ -34,6 +34,16 @@ impl Header {
/// If the awaiter is the same as the current waker, it will not be notified.
#[inline]
pub(crate) fn notify(&self, current: Option<&Waker>) {
if let Some(w) = self.take(current) {
abort_on_panic(|| w.wake());
}
}
/// Takes the awaiter blocked on this task.
///
/// If there is no awaiter or if it is the same as the current waker, returns `None`.
#[inline]
pub(crate) fn take(&self, current: Option<&Waker>) -> Option<Waker> {
// Set the bit indicating that the task is notifying its awaiter.
let state = self.state.fetch_or(NOTIFYING, Ordering::AcqRel);
@ -48,14 +58,15 @@ impl Header {
// Finally, notify the waker if it's different from the current waker.
if let Some(w) = waker {
// We need a safeguard against panics because waking can panic.
abort_on_panic(|| match current {
None => w.wake(),
Some(c) if !w.will_wake(c) => w.wake(),
Some(_) => {}
});
match current {
None => return Some(w),
Some(c) if !w.will_wake(c) => return Some(w),
Some(_) => abort_on_panic(|| drop(w)),
}
}
}
None
}
/// Registers a new awaiter blocked on this task.

View File

@ -451,13 +451,19 @@ where
// Mark the task as unscheduled.
let state = (*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
// Notify the awaiter that the future has been dropped.
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
(*raw.header).notify(None);
awaiter = (*raw.header).take(None);
}
// Drop the task reference.
Self::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
abort_on_panic(|| w.wake());
}
return false;
}
@ -489,9 +495,6 @@ where
Self::drop_future(ptr);
raw.output.write(out);
// A place where the output will be stored in case it needs to be dropped.
let mut output = None;
// The task is now completed.
loop {
// If the `Task` is dropped, we'll need to close it and drop the output.
@ -512,25 +515,28 @@ where
// If the `Task` is dropped or if the task was closed while running,
// now it's time to drop the output.
if state & TASK == 0 || state & CLOSED != 0 {
// Read the output.
output = Some(raw.output.read());
// Drop the output.
abort_on_panic(|| raw.output.drop_in_place());
}
// Notify the awaiter that the task has been completed.
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
(*raw.header).notify(None);
awaiter = (*raw.header).take(None);
}
// Drop the task reference.
Self::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
abort_on_panic(|| w.wake());
}
break;
}
Err(s) => state = s,
}
}
// Drop the output if it was taken out of the task.
drop(output);
}
Poll::Pending => {
let mut future_dropped = false;
@ -564,12 +570,19 @@ where
// If the task was woken up while running, we need to schedule it.
// Otherwise, we just drop the task reference.
if state & CLOSED != 0 {
// Notify the awaiter that the future has been dropped.
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
(*raw.header).notify(None);
awaiter = (*raw.header).take(None);
}
// Drop the task reference.
Self::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
abort_on_panic(|| w.wake());
}
} else if state & SCHEDULED != 0 {
// The thread that woke the task up didn't reschedule it because
// it was running so now it's our responsibility to do so.
@ -620,13 +633,19 @@ where
.state
.fetch_and(!RUNNING & !SCHEDULED, Ordering::AcqRel);
// Notify the awaiter that the future has been dropped.
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
(*raw.header).notify(None);
awaiter = (*raw.header).take(None);
}
// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
abort_on_panic(|| w.wake());
}
break;
}
@ -641,13 +660,19 @@ where
// Drop the future because the task is now closed.
RawTask::<F, T, S>::drop_future(ptr);
// Notify the awaiter that the future has been dropped.
// Take the awaiter out.
let mut awaiter = None;
if state & AWAITER != 0 {
(*raw.header).notify(None);
awaiter = (*raw.header).take(None);
}
// Drop the task reference.
RawTask::<F, T, S>::drop_ref(ptr);
// Notify the awaiter that the future has been dropped.
if let Some(w) = awaiter {
abort_on_panic(|| w.wake());
}
break;
}
Err(s) => state = s,

View File

@ -16,8 +16,9 @@ use crate::Task;
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
/// output.
///
/// Method [`Runnable::run()`] polls the `future` once. Then, the [`Runnable`] vanishes and
/// only reappears when its [`Waker`] wakes the task, thus scheduling it to be run again.
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
/// again.
///
/// When the task is woken, its [`Runnable`] is passed to the `schedule` function.
/// The `schedule` function should not attempt to run the [`Runnable`] nor to drop it. Instead, it
@ -191,30 +192,44 @@ where
(runnable, task)
}
/// A task reference that runs its future.
/// A handle to a runnable task.
///
/// At any moment in time, there is at most one [`Runnable`] reference associated with a particular
/// task. Running consumes the [`Runnable`] reference and polls its internal future. If the future
/// is still pending after getting polled, the [`Runnable`] reference simply won't exist until a
/// [`Waker`] notifies the task. If the future completes, its result becomes available to the
/// [`Task`].
/// Every spawned task has a single [`Runnable`] handle, which only exists when the task is
/// scheduled for running.
///
/// When a task is woken up, its [`Runnable`] reference is recreated and passed to the schedule
/// function. In most executors, scheduling simply pushes the [`Runnable`] reference into a queue
/// of runnable tasks.
/// Method [`run()`][`Runnable::run()`] polls the task's future once. Then, the [`Runnable`]
/// vanishes and only reappears when its [`Waker`] wakes the task, thus scheduling it to be run
/// again.
///
/// If the [`Runnable`] reference is dropped without getting run, the task is automatically
/// canceled. When canceled, the task won't be scheduled again even if a [`Waker`] wakes it. It is
/// possible for the [`Task`] to cancel while the [`Runnable`] reference exists, in which
/// case an attempt to run the task won't do anything.
/// Dropping a [`Runnable`] cancels the task, which means its future won't be polled again, and
/// awaiting the [`Task`] after that will result in a panic.
///
/// ----------------
/// # Examples
///
/// A runnable future, ready for execution.
/// ```
/// use async_task::Runnable;
/// use once_cell::sync::Lazy;
/// use std::{panic, thread};
///
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
/// queue in an executor.
/// // A simple executor.
/// static QUEUE: Lazy<flume::Sender<Runnable>> = Lazy::new(|| {
/// let (sender, receiver) = flume::unbounded::<Runnable>();
/// thread::spawn(|| {
/// for runnable in receiver {
/// let _ignore_panic = panic::catch_unwind(|| runnable.run());
/// }
/// });
/// sender
/// });
///
/// // Create a task with a simple future.
/// let schedule = |runnable| QUEUE.send(runnable).unwrap();
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
/// // Schedule the task and await its output.
/// runnable.schedule();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub struct Runnable {
/// A pointer to the heap-allocated task.
pub(crate) ptr: NonNull<()>,
@ -231,10 +246,23 @@ impl std::panic::RefUnwindSafe for Runnable {}
impl Runnable {
/// Schedules the task.
///
/// This is a convenience method that simply reschedules the task by passing it to its schedule
/// function.
/// This is a convenience method that passes the [`Runnable`] to the schedule function.
///
/// If the task is canceled, this method won't do anything.
/// # Examples
///
/// ```
/// // A function that schedules the task when it gets woken up.
/// let (s, r) = flume::unbounded();
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with a simple future and the schedule function.
/// let (runnable, task) = async_task::spawn(async {}, schedule);
///
/// // Schedule the task.
/// assert_eq!(r.len(), 0);
/// runnable.schedule();
/// assert_eq!(r.len(), 1);
/// ```
pub fn schedule(self) {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
@ -245,22 +273,33 @@ impl Runnable {
}
}
/// Runs the task.
/// Runs the task by polling its future.
///
/// Returns `true` if the task was woken while running, in which case it gets rescheduled at
/// the end of this method invocation.
/// Returns `true` if the task was woken while running, in which case the [`Runnable`] gets
/// rescheduled at the end of this method invocation.
///
/// This method polls the task's future. If the future completes, its result will become
/// available to the [`Task`]. And if the future is still pending, the task will have to
/// be woken up in order to be rescheduled and run again.
/// Otherwise, returns `true` and the [`Runnable`] vanishes until the task is woken.
///
/// If the task was canceled by a [`Task`] before it gets run, then this method won't do
/// anything.
/// If the [`Task`] handle was dropped or if [`cancel()`][`Task::cancel()`] was called, then
/// this method simply destroys the task.
///
/// It is possible that polling the future panics, in which case the panic will be propagated
/// into the caller. It is advised that invocations of this method are wrapped inside
/// [`catch_unwind`][`std::panic::catch_unwind`]. If a panic occurs, the task is automatically
/// canceled.
/// If the polled future panics, this method propagates the panic, and awaiting the [`Task`]
/// after that will also result in a panic.
///
/// # Examples
///
/// ```
/// // A function that schedules the task when it gets woken up.
/// let (s, r) = flume::unbounded();
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with a simple future and the schedule function.
/// let (runnable, task) = async_task::spawn(async { 1 + 2 }, schedule);
///
/// // Run the task and check its output.
/// runnable.run();
/// assert_eq!(smol::future::block_on(task), 3);
/// ```
pub fn run(self) -> bool {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;
@ -270,6 +309,28 @@ impl Runnable {
}
/// Returns a waker associated with this task.
///
/// # Examples
///
/// ```
/// use smol::future;
///
/// // A function that schedules the task when it gets woken up.
/// let (s, r) = flume::unbounded();
/// let schedule = move |runnable| s.send(runnable).unwrap();
///
/// // Create a task with a simple future and the schedule function.
/// let (runnable, task) = async_task::spawn(future::pending::<()>(), schedule);
///
/// // Take a waker and run the task.
/// let waker = runnable.waker();
/// runnable.run();
///
/// // Reschedule the task by waking it.
/// assert_eq!(r.len(), 0);
/// waker.wake();
/// assert_eq!(r.len(), 1);
/// ```
pub fn waker(&self) -> Waker {
let ptr = self.ptr.as_ptr();
let header = ptr as *const Header;

View File

@ -14,10 +14,10 @@ use crate::state::*;
///
/// A [`Task`] can be awaited to retrieve the output of its future.
///
/// Dropping a [`Task`] cancels it, which means its future won't be polled again.
/// To drop the [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead.
/// To cancel a task gracefully and wait until it is fully destroyed, use the
/// [`cancel()`][Task::cancel()] method.
/// Dropping a [`Task`] cancels it, which means its future won't be polled again. To drop the
/// [`Task`] handle without canceling it, use [`detach()`][`Task::detach()`] instead. To cancel a
/// task gracefully and wait until it is fully destroyed, use the [`cancel()`][Task::cancel()]
/// method.
///
/// Note that canceling a task actually wakes it and reschedules one last time. Then, the executor
/// can destroy the task by simply dropping its [`Runnable`][`crate::Runnable`] or by invoking