From 6f2b0b8a49aa348a091a21b65201973f425c8d71 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 19 Sep 2020 22:38:11 +0200 Subject: [PATCH] Make executors scoped --- Cargo.toml | 3 +- examples/priority.rs | 14 +-- src/lib.rs | 257 ++++++++++++------------------------------- tests/drop.rs | 46 ++++++++ 4 files changed, 127 insertions(+), 193 deletions(-) create mode 100644 tests/drop.rs diff --git a/Cargo.toml b/Cargo.toml index 4a23147..89dba96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,11 +13,12 @@ categories = ["asynchronous", "concurrency"] readme = "README.md" [dependencies] -async-task = "3.0.0" +async-task = { path = "../async-task" } concurrent-queue = "1.2.2" fastrand = "1.3.4" futures-lite = "1.0.0" once_cell = "1.4.1" +vec-arena = "1.0.0" [dev-dependencies] async-channel = "1.4.1" diff --git a/examples/priority.rs b/examples/priority.rs index ef62324..a30ad97 100644 --- a/examples/priority.rs +++ b/examples/priority.rs @@ -18,23 +18,23 @@ enum Priority { /// An executor with task priorities. /// /// Tasks with lower priorities only get polled when there are no tasks with higher priorities. -struct PriorityExecutor { - ex: [Executor; 3], +struct PriorityExecutor<'a> { + ex: [Executor<'a>; 3], } -impl PriorityExecutor { +impl<'a> PriorityExecutor<'a> { /// Creates a new executor. - const fn new() -> PriorityExecutor { + const fn new() -> PriorityExecutor<'a> { PriorityExecutor { ex: [Executor::new(), Executor::new(), Executor::new()], } } /// Spawns a task with the given priority. - fn spawn( + fn spawn( &self, priority: Priority, - future: impl Future + Send + 'static, + future: impl Future + Send + 'a, ) -> Task { self.ex[priority as usize].spawn(future) } @@ -59,7 +59,7 @@ impl PriorityExecutor { } fn main() { - static EX: PriorityExecutor = PriorityExecutor::new(); + static EX: PriorityExecutor<'_> = PriorityExecutor::new(); // Spawn a thread running the executor forever. thread::spawn(|| future::block_on(EX.run())); diff --git a/src/lib.rs b/src/lib.rs index 83195af..2647a89 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -18,160 +18,23 @@ //! future::block_on(ex.run(task)); //! ``` -#![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] use std::future::Future; use std::marker::PhantomData; use std::panic::{RefUnwindSafe, UnwindSafe}; -use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; -use std::task::{Context, Poll, Waker}; +use std::task::{Poll, Waker}; +use async_task::Runnable; use concurrent_queue::ConcurrentQueue; use futures_lite::{future, FutureExt}; +use vec_arena::Arena; -/// A runnable future, ready for execution. -/// -/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`, -/// we get back two values: -/// -/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable` -/// 2. an `async_task::JoinHandle`, which is wrapped inside a `Task` -/// -/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's -/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task -/// queue in an executor. -type Runnable = async_task::Task<()>; - -/// A spawned future. -/// -/// Tasks are also futures themselves and yield the output of the spawned future. -/// -/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit -/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method. -/// -/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic. -/// -/// If a task panics, the panic will be thrown into the [`Executor::run()`] or -/// [`LocalExecutor::run()`] invocation that polled it. -/// -/// # Examples -/// -/// ``` -/// use async_executor::Executor; -/// use futures_lite::future; -/// use std::thread; -/// -/// let ex = Executor::new(); -/// -/// // Spawn a future onto the executor. -/// let task = ex.spawn(async { -/// println!("Hello from a task!"); -/// 1 + 2 -/// }); -/// -/// // Run an executor thread. -/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); -/// -/// // Wait for the result. -/// assert_eq!(future::block_on(task), 3); -/// ``` -#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"] -#[derive(Debug)] -pub struct Task(Option>); - -impl UnwindSafe for Task {} -impl RefUnwindSafe for Task {} - -impl Task { - /// Detaches the task to let it keep running in the background. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// use async_io::Timer; - /// use std::time::Duration; - /// - /// let ex = Executor::new(); - /// - /// // Spawn a deamon future. - /// ex.spawn(async { - /// loop { - /// println!("I'm a daemon task looping forever."); - /// Timer::after(Duration::from_secs(1)).await; - /// } - /// }) - /// .detach(); - /// ``` - pub fn detach(mut self) { - self.0.take().unwrap(); - } - - /// Cancels the task and waits for it to stop running. - /// - /// Returns the task's output if it was completed just before it got canceled, or [`None`] if - /// it didn't complete. - /// - /// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of - /// canceling because it also waits for the task to stop running. - /// - /// # Examples - /// - /// ``` - /// use async_executor::Executor; - /// use async_io::Timer; - /// use futures_lite::future; - /// use std::thread; - /// use std::time::Duration; - /// - /// let ex = Executor::new(); - /// - /// // Spawn a deamon future. - /// let task = ex.spawn(async { - /// loop { - /// println!("Even though I'm in an infinite loop, you can still cancel me!"); - /// Timer::after(Duration::from_secs(1)).await; - /// } - /// }); - /// - /// // Run an executor thread. - /// thread::spawn(move || future::block_on(ex.run(future::pending::<()>()))); - /// - /// future::block_on(async { - /// Timer::after(Duration::from_secs(3)).await; - /// task.cancel().await; - /// }); - /// ``` - pub async fn cancel(self) -> Option { - let mut task = self; - let handle = task.0.take().unwrap(); - handle.cancel(); - handle.await - } -} - -impl Drop for Task { - fn drop(&mut self) { - if let Some(handle) = &self.0 { - handle.cancel(); - } - } -} - -impl Future for Task { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(output) => Poll::Ready(output.expect("task has failed")), - } - } -} +#[doc(no_inline)] +pub use async_task::Task; /// The state of a executor. #[derive(Debug)] @@ -187,6 +50,8 @@ struct State { /// A list of sleeping tickers. sleepers: Mutex, + + active: Mutex>, } impl State { @@ -201,6 +66,7 @@ impl State { wakers: Vec::new(), free_ids: Vec::new(), }), + active: Mutex::new(Arena::new()), } } @@ -321,14 +187,18 @@ impl Sleepers { /// })); /// ``` #[derive(Debug)] -pub struct Executor { +pub struct Executor<'a> { state: once_cell::sync::OnceCell>, + _marker: PhantomData>, } -impl UnwindSafe for Executor {} -impl RefUnwindSafe for Executor {} +unsafe impl Send for Executor<'_> {} +unsafe impl Sync for Executor<'_> {} -impl Executor { +impl UnwindSafe for Executor<'_> {} +impl RefUnwindSafe for Executor<'_> {} + +impl<'a> Executor<'a> { /// Creates a new executor. /// /// # Examples @@ -338,9 +208,10 @@ impl Executor { /// /// let ex = Executor::new(); /// ``` - pub const fn new() -> Executor { + pub const fn new() -> Executor<'a> { Executor { state: once_cell::sync::OnceCell::new(), + _marker: PhantomData, } } @@ -357,14 +228,8 @@ impl Executor { /// println!("Hello world"); /// }); /// ``` - pub fn spawn( - &self, - future: impl Future + Send + 'static, - ) -> Task { - // Create a task, push it into the queue by scheduling it, and return its `Task` handle. - let (runnable, handle) = async_task::spawn(future, self.schedule(), ()); - runnable.schedule(); - Task(Some(handle)) + pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { + unsafe { self.spawn_unchecked(future) } } /// Attempts to run a task if at least one is scheduled. @@ -472,16 +337,42 @@ impl Executor { fn state(&self) -> &Arc { self.state.get_or_init(|| Arc::new(State::new())) } -} -impl Drop for Executor { - fn drop(&mut self) { - // TODO(stjepang): Cancel all remaining tasks. + unsafe fn spawn_unchecked(&self, future: impl Future) -> Task { + let mut active = self.state().active.lock().unwrap(); + let index = active.next_vacant(); + let state = self.state().clone(); + let future = async move { + let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); + future.await + }; + + let (runnable, task) = async_task::spawn_unchecked(future, self.schedule()); + active.insert(runnable.waker()); + runnable.schedule(); + task } } -impl Default for Executor { - fn default() -> Executor { +impl Drop for Executor<'_> { + fn drop(&mut self) { + if let Some(state) = self.state.get() { + { + let mut state = state.active.lock().unwrap(); + for i in 0..state.capacity() { + if let Some(w) = state.remove(i) { + w.wake(); + } + } + } + + while state.queue.pop().is_ok() {} + } + } +} + +impl<'a> Default for Executor<'a> { + fn default() -> Executor<'a> { Executor::new() } } @@ -750,18 +641,18 @@ fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { /// })); /// ``` #[derive(Debug)] -pub struct LocalExecutor { +pub struct LocalExecutor<'a> { /// The inner executor. - inner: once_cell::unsync::OnceCell, + inner: once_cell::unsync::OnceCell>, /// Make sure the type is `!Send` and `!Sync`. _marker: PhantomData>, } -impl UnwindSafe for LocalExecutor {} -impl RefUnwindSafe for LocalExecutor {} +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} -impl LocalExecutor { +impl<'a> LocalExecutor<'a> { /// Creates a single-threaded executor. /// /// # Examples @@ -771,7 +662,7 @@ impl LocalExecutor { /// /// let local_ex = LocalExecutor::new(); /// ``` - pub const fn new() -> LocalExecutor { + pub const fn new() -> LocalExecutor<'a> { LocalExecutor { inner: once_cell::unsync::OnceCell::new(), _marker: PhantomData, @@ -791,11 +682,8 @@ impl LocalExecutor { /// println!("Hello world"); /// }); /// ``` - pub fn spawn(&self, future: impl Future + 'static) -> Task { - // Create a task, push it into the queue by scheduling it, and return its `Task` handle. - let (runnable, handle) = async_task::spawn_local(future, self.schedule(), ()); - runnable.schedule(); - Task(Some(handle)) + pub fn spawn(&self, future: impl Future + 'a) -> Task { + unsafe { self.inner().spawn_unchecked(future) } } /// Attempts to run a task if at least one is scheduled. @@ -861,24 +749,23 @@ impl LocalExecutor { self.inner().run(future).await } - /// Returns a function that schedules a runnable task when it gets woken up. - fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { - let state = self.inner().state().clone(); - - move |runnable| { - state.queue.push(runnable).unwrap(); - state.notify(); - } - } - /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor { + fn inner(&self) -> &Executor<'a> { self.inner.get_or_init(|| Executor::new()) } } -impl Default for LocalExecutor { - fn default() -> LocalExecutor { +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { LocalExecutor::new() } } + +/// Runs a closure when dropped. +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +} diff --git a/tests/drop.rs b/tests/drop.rs new file mode 100644 index 0000000..6dc2b49 --- /dev/null +++ b/tests/drop.rs @@ -0,0 +1,46 @@ +use std::panic::catch_unwind; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Mutex; +use std::task::{Poll, Waker}; + +use async_executor::Executor; +use futures_lite::future; +use once_cell::sync::Lazy; + +#[test] +fn smoke() { + static DROP: AtomicUsize = AtomicUsize::new(0); + static WAKER: Lazy>> = Lazy::new(|| Default::default()); + + let ex = Executor::new(); + + let task = ex.spawn(async { + let _guard = CallOnDrop(|| { + DROP.fetch_add(1, Ordering::SeqCst); + }); + + future::poll_fn(|cx| { + *WAKER.lock().unwrap() = Some(cx.waker().clone()); + Poll::Pending::<()> + }) + .await; + }); + + future::block_on(ex.tick()); + assert!(WAKER.lock().unwrap().is_some()); + assert_eq!(DROP.load(Ordering::SeqCst), 0); + + drop(ex); + assert_eq!(DROP.load(Ordering::SeqCst), 1); + + assert!(catch_unwind(|| future::block_on(task)).is_err()); + assert_eq!(DROP.load(Ordering::SeqCst), 1); +} + +struct CallOnDrop(F); + +impl Drop for CallOnDrop { + fn drop(&mut self) { + (self.0)(); + } +}