mirror of https://github.com/smol-rs/async-task
Add panic propagation (#37)
* Add panic propogation * Make propagation a builder option
This commit is contained in:
parent
e2493422e5
commit
2e0a196eef
|
@ -23,6 +23,7 @@ atomic-waker = "1"
|
|||
easy-parallel = "3"
|
||||
flaky_test = "0.1"
|
||||
flume = { version = "0.10", default-features = false }
|
||||
futures-lite = "1.12.0"
|
||||
once_cell = "1"
|
||||
smol = "1"
|
||||
|
||||
|
|
|
@ -31,6 +31,10 @@ pub(crate) struct Header<M> {
|
|||
///
|
||||
/// This metadata may be provided to the user.
|
||||
pub(crate) metadata: M,
|
||||
|
||||
/// Whether or not a panic that occurs in the task should be propagated.
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) propagate_panic: bool,
|
||||
}
|
||||
|
||||
impl<M> Header<M> {
|
||||
|
|
46
src/raw.rs
46
src/raw.rs
|
@ -13,6 +13,12 @@ use crate::state::*;
|
|||
use crate::utils::{abort, abort_on_panic, max, Layout};
|
||||
use crate::Runnable;
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) type Panic = alloc::boxed::Box<dyn core::any::Any + Send + 'static>;
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
pub(crate) type Panic = core::convert::Infallible;
|
||||
|
||||
/// The vtable for a task.
|
||||
pub(crate) struct TaskVTable {
|
||||
/// Schedules the task.
|
||||
|
@ -76,7 +82,7 @@ pub(crate) struct RawTask<F, T, S, M> {
|
|||
pub(crate) future: *mut F,
|
||||
|
||||
/// The output of the future.
|
||||
pub(crate) output: *mut T,
|
||||
pub(crate) output: *mut Result<T, Panic>,
|
||||
}
|
||||
|
||||
impl<F, T, S, M> Copy for RawTask<F, T, S, M> {}
|
||||
|
@ -97,7 +103,7 @@ impl<F, T, S, M> RawTask<F, T, S, M> {
|
|||
let layout_header = Layout::new::<Header<M>>();
|
||||
let layout_s = Layout::new::<S>();
|
||||
let layout_f = Layout::new::<F>();
|
||||
let layout_r = Layout::new::<T>();
|
||||
let layout_r = Layout::new::<Result<T, Panic>>();
|
||||
|
||||
// Compute the layout for `union { F, T }`.
|
||||
let size_union = max(layout_f.size(), layout_r.size());
|
||||
|
@ -138,7 +144,7 @@ where
|
|||
pub(crate) fn allocate<'a, Gen: FnOnce(&'a M) -> F>(
|
||||
future: Gen,
|
||||
schedule: S,
|
||||
metadata: M,
|
||||
builder: crate::Builder<M>,
|
||||
) -> NonNull<()>
|
||||
where
|
||||
F: 'a,
|
||||
|
@ -158,6 +164,12 @@ where
|
|||
|
||||
let raw = Self::from_ptr(ptr.as_ptr());
|
||||
|
||||
let crate::Builder {
|
||||
metadata,
|
||||
#[cfg(feature = "std")]
|
||||
propagate_panic,
|
||||
} = builder;
|
||||
|
||||
// Write the header as the first field of the task.
|
||||
(raw.header as *mut Header<M>).write(Header {
|
||||
state: AtomicUsize::new(SCHEDULED | TASK | REFERENCE),
|
||||
|
@ -173,6 +185,8 @@ where
|
|||
layout_info: &Self::TASK_LAYOUT,
|
||||
},
|
||||
metadata,
|
||||
#[cfg(feature = "std")]
|
||||
propagate_panic,
|
||||
});
|
||||
|
||||
// Write the schedule function as the third field of the task.
|
||||
|
@ -199,7 +213,7 @@ where
|
|||
header: p as *const Header<M>,
|
||||
schedule: p.add(task_layout.offset_s) as *const S,
|
||||
future: p.add(task_layout.offset_f) as *mut F,
|
||||
output: p.add(task_layout.offset_r) as *mut T,
|
||||
output: p.add(task_layout.offset_r) as *mut Result<T, Panic>,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -525,8 +539,30 @@ where
|
|||
|
||||
// Poll the inner future, but surround it with a guard that closes the task in case polling
|
||||
// panics.
|
||||
// If available, we should also try to catch the panic so that it is propagated correctly.
|
||||
let guard = Guard(raw);
|
||||
let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
|
||||
|
||||
// Panic propagation is not available for no_std.
|
||||
#[cfg(not(feature = "std"))]
|
||||
let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok);
|
||||
|
||||
#[cfg(feature = "std")]
|
||||
let poll = {
|
||||
// Check if we should propagate panics.
|
||||
if (*raw.header).propagate_panic {
|
||||
// Use catch_unwind to catch the panic.
|
||||
match std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
|
||||
<F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx)
|
||||
})) {
|
||||
Ok(Poll::Ready(v)) => Poll::Ready(Ok(v)),
|
||||
Ok(Poll::Pending) => Poll::Pending,
|
||||
Err(e) => Poll::Ready(Err(e)),
|
||||
}
|
||||
} else {
|
||||
<F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx).map(Ok)
|
||||
}
|
||||
};
|
||||
|
||||
mem::forget(guard);
|
||||
|
||||
match poll {
|
||||
|
|
|
@ -17,7 +17,11 @@ use crate::Task;
|
|||
#[derive(Debug)]
|
||||
pub struct Builder<M> {
|
||||
/// The metadata associated with the task.
|
||||
metadata: M,
|
||||
pub(crate) metadata: M,
|
||||
|
||||
/// Whether or not a panic that occurs in the task should be propagated.
|
||||
#[cfg(feature = "std")]
|
||||
pub(crate) propagate_panic: bool,
|
||||
}
|
||||
|
||||
impl<M: Default> Default for Builder<M> {
|
||||
|
@ -40,7 +44,11 @@ impl Builder<()> {
|
|||
/// let (runnable, task) = Builder::new().spawn(|()| async {}, |_| {});
|
||||
/// ```
|
||||
pub fn new() -> Builder<()> {
|
||||
Builder { metadata: () }
|
||||
Builder {
|
||||
metadata: (),
|
||||
#[cfg(feature = "std")]
|
||||
propagate_panic: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Adds metadata to the task.
|
||||
|
@ -123,11 +131,63 @@ impl Builder<()> {
|
|||
/// # });
|
||||
/// ```
|
||||
pub fn metadata<M>(self, metadata: M) -> Builder<M> {
|
||||
Builder { metadata }
|
||||
Builder {
|
||||
metadata,
|
||||
#[cfg(feature = "std")]
|
||||
propagate_panic: self.propagate_panic,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<M> Builder<M> {
|
||||
/// Propagates panics that occur in the task.
|
||||
///
|
||||
/// When this is `true`, panics that occur in the task will be propagated to the caller of
|
||||
/// the [`Task`]. When this is false, no special action is taken when a panic occurs in the
|
||||
/// task, meaning that the caller of [`Runnable::run`] will observe a panic.
|
||||
///
|
||||
/// This is only available when the `std` feature is enabled. By default, this is `false`.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_task::Builder;
|
||||
/// use futures_lite::future::poll_fn;
|
||||
/// use std::future::Future;
|
||||
/// use std::panic;
|
||||
/// use std::pin::Pin;
|
||||
/// use std::task::{Context, Poll};
|
||||
///
|
||||
/// fn did_panic<F: FnOnce()>(f: F) -> bool {
|
||||
/// panic::catch_unwind(panic::AssertUnwindSafe(f)).is_err()
|
||||
/// }
|
||||
///
|
||||
/// # smol::future::block_on(async {
|
||||
/// let (runnable1, mut task1) = Builder::new()
|
||||
/// .propagate_panic(true)
|
||||
/// .spawn(|()| async move { panic!() }, |_| {});
|
||||
///
|
||||
/// let (runnable2, mut task2) = Builder::new()
|
||||
/// .propagate_panic(false)
|
||||
/// .spawn(|()| async move { panic!() }, |_| {});
|
||||
///
|
||||
/// assert!(!did_panic(|| { runnable1.run(); }));
|
||||
/// assert!(did_panic(|| { runnable2.run(); }));
|
||||
///
|
||||
/// let waker = poll_fn(|cx| Poll::Ready(cx.waker().clone())).await;
|
||||
/// let mut cx = Context::from_waker(&waker);
|
||||
/// assert!(did_panic(|| { let _ = Pin::new(&mut task1).poll(&mut cx); }));
|
||||
/// assert!(did_panic(|| { let _ = Pin::new(&mut task2).poll(&mut cx); }));
|
||||
/// # });
|
||||
/// ```
|
||||
#[cfg(feature = "std")]
|
||||
pub fn propagate_panic(self, propagate_panic: bool) -> Builder<M> {
|
||||
Builder {
|
||||
metadata: self.metadata,
|
||||
propagate_panic,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new task.
|
||||
///
|
||||
/// The returned [`Runnable`] is used to poll the `future`, and the [`Task`] is used to await its
|
||||
|
@ -313,8 +373,6 @@ impl<M> Builder<M> {
|
|||
S: Fn(Runnable<M>),
|
||||
M: 'a,
|
||||
{
|
||||
let Self { metadata } = self;
|
||||
|
||||
// Allocate large futures on the heap.
|
||||
let ptr = if mem::size_of::<Fut>() >= 2048 {
|
||||
let future = |meta| {
|
||||
|
@ -322,9 +380,9 @@ impl<M> Builder<M> {
|
|||
Box::pin(future)
|
||||
};
|
||||
|
||||
RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, metadata)
|
||||
RawTask::<_, Fut::Output, S, M>::allocate(future, schedule, self)
|
||||
} else {
|
||||
RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, metadata)
|
||||
RawTask::<Fut, Fut::Output, S, M>::allocate(future, schedule, self)
|
||||
};
|
||||
|
||||
let runnable = Runnable {
|
||||
|
|
27
src/task.rs
27
src/task.rs
|
@ -8,6 +8,7 @@ use core::sync::atomic::Ordering;
|
|||
use core::task::{Context, Poll};
|
||||
|
||||
use crate::header::Header;
|
||||
use crate::raw::Panic;
|
||||
use crate::state::*;
|
||||
|
||||
/// A spawned task.
|
||||
|
@ -226,7 +227,7 @@ impl<T, M> Task<T, M> {
|
|||
}
|
||||
|
||||
/// Puts the task in detached state.
|
||||
fn set_detached(&mut self) -> Option<T> {
|
||||
fn set_detached(&mut self) -> Option<Result<T, Panic>> {
|
||||
let ptr = self.ptr.as_ptr();
|
||||
let header = ptr as *const Header<M>;
|
||||
|
||||
|
@ -256,8 +257,10 @@ impl<T, M> Task<T, M> {
|
|||
) {
|
||||
Ok(_) => {
|
||||
// Read the output.
|
||||
output =
|
||||
Some((((*header).vtable.get_output)(ptr) as *mut T).read());
|
||||
output = Some(
|
||||
(((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>)
|
||||
.read(),
|
||||
);
|
||||
|
||||
// Update the state variable because we're continuing the loop.
|
||||
state |= CLOSED;
|
||||
|
@ -382,8 +385,22 @@ impl<T, M> Task<T, M> {
|
|||
}
|
||||
|
||||
// Take the output from the task.
|
||||
let output = ((*header).vtable.get_output)(ptr) as *mut T;
|
||||
return Poll::Ready(Some(output.read()));
|
||||
let output = ((*header).vtable.get_output)(ptr) as *mut Result<T, Panic>;
|
||||
let output = output.read();
|
||||
|
||||
// Propagate the panic if the task panicked.
|
||||
let output = match output {
|
||||
Ok(output) => output,
|
||||
Err(panic) => {
|
||||
#[cfg(feature = "std")]
|
||||
std::panic::resume_unwind(panic);
|
||||
|
||||
#[cfg(not(feature = "std"))]
|
||||
match panic {}
|
||||
}
|
||||
};
|
||||
|
||||
return Poll::Ready(Some(output));
|
||||
}
|
||||
Err(s) => state = s,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue