From d3196999f466107fd22694662b502bc1e781f8dc Mon Sep 17 00:00:00 2001
From: John Nunley
Date: Tue, 13 Feb 2024 07:18:54 -0800
Subject: [PATCH] feat: Add a way to batch spawn tasks

For some workloads many tasks are spawned at a time. Each spawn locks
and unlocks the executor's inner lock, which becomes expensive when a
large number of tasks is spawned at once.

This commit exposes a new "spawn_many" method on both executor types.
The method spawns an entire batch of tasks while taking the lock only
once.

Closes #91

Signed-off-by: John Nunley
---
 benches/executor.rs |  15 ++++
 src/lib.rs          | 203 +++++++++++++++++++++++++++++++++++++-------
 tests/drop.rs       |  14 +++
 tests/spawn_many.rs |  45 ++++++++++
 4 files changed, 244 insertions(+), 33 deletions(-)
 create mode 100644 tests/spawn_many.rs

diff --git a/benches/executor.rs b/benches/executor.rs
index b6e33c2..791610f 100644
--- a/benches/executor.rs
+++ b/benches/executor.rs
@@ -51,6 +51,21 @@ fn running_benches(c: &mut Criterion) {
         );
     });
 
+    group.bench_function("executor::spawn_batch", |b| {
+        run(
+            || {
+                let mut handles = vec![];
+
+                b.iter(|| {
+                    EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
+                });
+
+                handles.clear();
+            },
+            *multithread,
+        )
+    });
+
     group.bench_function("executor::spawn_many_local", |b| {
         run(
             || {
diff --git a/src/lib.rs b/src/lib.rs
index 31fd40d..a7955bb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
     pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
         let mut active = self.state().active.lock().unwrap();
 
+        // SAFETY: `T` and the future are `Send`.
+        unsafe { self.spawn_inner(future, &mut active) }
+    }
+
+    /// Spawns many tasks onto the executor.
+    ///
+    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
+    /// spawns all of the tasks in one go. With a large number of tasks this can reduce lock
+    /// contention.
+    ///
+    /// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
+    /// prevent runner thread starvation. It is assumed that the iterator provided does not
+    /// block; blocking iterators can lock up the internal mutex and therefore the entire
+    /// executor.
+    ///
+    /// ## Example
+    ///
+    /// ```
+    /// use async_executor::Executor;
+    /// use futures_lite::{stream, prelude::*};
+    /// use std::future::ready;
+    ///
+    /// # futures_lite::future::block_on(async {
+    /// let mut ex = Executor::new();
+    ///
+    /// let futures = [
+    ///     ready(1),
+    ///     ready(2),
+    ///     ready(3)
+    /// ];
+    ///
+    /// // Spawn all of the futures onto the executor at once.
+    /// let mut tasks = vec![];
+    /// ex.spawn_many(futures, &mut tasks);
+    ///
+    /// // Await all of them.
+    /// let results = ex.run(async move {
+    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
+    /// }).await;
+    /// assert_eq!(results, [1, 2, 3]);
+    /// # });
+    /// ```
+    ///
+    /// [`spawn`]: Executor::spawn
+    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
+        &self,
+        futures: impl IntoIterator<Item = F>,
+        handles: &mut impl Extend<Task<F::Output>>,
+    ) {
+        let mut active = Some(self.state().active.lock().unwrap());
+
+        // Convert the futures into tasks.
+        let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
+            // SAFETY: `T` and the future are `Send`.
+            let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
+
+            // Yield the lock every once in a while to ease contention.
+            if i.wrapping_sub(1) % 500 == 0 {
+                drop(active.take());
+                active = Some(self.state().active.lock().unwrap());
+            }
+
+            task
+        });
+
+        // Push the tasks to the user's collection.
+        handles.extend(tasks);
+    }
+
+    /// Spawn a future while holding the inner lock.
+    ///
+    /// # Safety
+    ///
+    /// If this is an `Executor`, `F` and `T` must be `Send`.
+    unsafe fn spawn_inner<T: 'a>(
+        &self,
+        future: impl Future<Output = T> + 'a,
+        active: &mut Slab<Waker>,
+    ) -> Task<T> {
         // Remove the task from the set of active tasks when the future finishes.
         let entry = active.vacant_entry();
         let index = entry.key();
@@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
         };
 
         // Create the task and register it in the set of active tasks.
-        let (runnable, task) = unsafe {
-            Builder::new()
-                .propagate_panic(true)
-                .spawn_unchecked(|()| future, self.schedule())
-        };
+        //
+        // SAFETY:
+        //
+        // If `future` is not `Send`, this must be a `LocalExecutor` as per this
+        // function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
+        // `try_tick`, `tick` and `run` can only be called from the origin
+        // thread of the `LocalExecutor`. Similarly, `spawn` can only be called
+        // from the origin thread, ensuring that `future` and the executor share
+        // the same origin thread. The `Runnable` can be scheduled from other
+        // threads, but because of the above `Runnable` can only be called or
+        // dropped on the origin thread.
+        //
+        // `future` is not `'static`, but we make sure that the `Runnable` does
+        // not outlive `'a`. When the executor is dropped, the `active` field is
+        // drained and all of the `Waker`s are woken. Then, the queue inside of
+        // the `Executor` is drained of all of its runnables. This ensures that
+        // runnables are dropped and this precondition is satisfied.
+        //
+        // `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
+        // Therefore we do not need to worry about what is done with the
+        // `Waker`.
+        let (runnable, task) = Builder::new()
+            .propagate_panic(true)
+            .spawn_unchecked(|()| future, self.schedule());
         entry.insert(runnable.waker());
 
         runnable.schedule();
@@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
 impl Drop for Executor<'_> {
     fn drop(&mut self) {
         if let Some(state) = self.state.get() {
-            let mut active = state.active.lock().unwrap();
+            let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
             for w in active.drain() {
                 w.wake();
             }
@@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
     pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
         let mut active = self.inner().state().active.lock().unwrap();
 
-        // Remove the task from the set of active tasks when the future finishes.
-        let entry = active.vacant_entry();
-        let index = entry.key();
-        let state = self.inner().state().clone();
-        let future = async move {
-            let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
-            future.await
-        };
+        // SAFETY: This executor is not thread safe, so the future and its result
+        // cannot be sent to another thread.
+        unsafe { self.inner().spawn_inner(future, &mut active) }
+    }
 
-        // Create the task and register it in the set of active tasks.
-        let (runnable, task) = unsafe {
-            Builder::new()
-                .propagate_panic(true)
-                .spawn_unchecked(|()| future, self.schedule())
-        };
-        entry.insert(runnable.waker());
+    /// Spawns many tasks onto the executor.
+    ///
+    /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
+    /// spawns all of the tasks in one go. With a large number of tasks this can reduce lock
+    /// contention.
+    ///
+    /// It is assumed that the iterator provided does not block; blocking iterators can lock up
+    /// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn_many`], the
+    /// mutex is not released, as there are no other threads that can poll this executor.
+    ///
+    /// ## Example
+    ///
+    /// ```
+    /// use async_executor::LocalExecutor;
+    /// use futures_lite::{stream, prelude::*};
+    /// use std::future::ready;
+    ///
+    /// # futures_lite::future::block_on(async {
+    /// let mut ex = LocalExecutor::new();
+    ///
+    /// let futures = [
+    ///     ready(1),
+    ///     ready(2),
+    ///     ready(3)
+    /// ];
+    ///
+    /// // Spawn all of the futures onto the executor at once.
+    /// let mut tasks = vec![];
+    /// ex.spawn_many(futures, &mut tasks);
+    ///
+    /// // Await all of them.
+    /// let results = ex.run(async move {
+    ///     stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
+    /// }).await;
+    /// assert_eq!(results, [1, 2, 3]);
+    /// # });
+    /// ```
+    ///
+    /// [`spawn`]: LocalExecutor::spawn
+    /// [`Executor::spawn_many`]: Executor::spawn_many
+    pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
+        &self,
+        futures: impl IntoIterator<Item = F>,
+        handles: &mut impl Extend<Task<F::Output>>,
+    ) {
+        let mut active = self.inner().state().active.lock().unwrap();
 
-        runnable.schedule();
-        task
+        // Convert all of the futures to tasks.
+        let tasks = futures.into_iter().map(|future| {
+            // SAFETY: This executor is not thread safe, so the future and its result
+            // cannot be sent to another thread.
+            unsafe { self.inner().spawn_inner(future, &mut active) }
+
+            // As only one thread can spawn or poll tasks at a time, there is no
+            // need to release the lock here to ease contention.
+        });
+
+        // Push them to the user's collection.
+        handles.extend(tasks);
     }
 
     /// Attempts to run a task if at least one is scheduled.
@@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
         self.inner().run(future).await
     }
 
-    /// Returns a function that schedules a runnable task when it gets woken up.
-    fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
-        let state = self.inner().state().clone();
-
-        move |runnable| {
-            state.queue.push(runnable).unwrap();
-            state.notify();
-        }
-    }
-
     /// Returns a reference to the inner executor.
     fn inner(&self) -> &Executor<'a> {
         &self.inner
@@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {
     fn is_send<T: Send>(_: T) {}
     fn is_sync<T: Sync>(_: T) {}
+    fn is_static<T: 'static>(_: T) {}
 
     is_send::<Executor<'_>>(Executor::new());
     is_sync::<Executor<'_>>(Executor::new());
@@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
     is_sync(ex.run(pending::<()>()));
     is_send(ex.tick());
     is_sync(ex.tick());
+    is_send(ex.schedule());
+    is_sync(ex.schedule());
+    is_static(ex.schedule());
 
     /// ```compile_fail
     /// use async_executor::LocalExecutor;
diff --git a/tests/drop.rs b/tests/drop.rs
index 2b1ce56..54a0741 100644
--- a/tests/drop.rs
+++ b/tests/drop.rs
@@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
     assert_eq!(DROP.load(Ordering::SeqCst), 1);
 }
 
+#[test]
+fn iterator_panics_mid_run() {
+    let ex = Executor::new();
+
+    let panic = std::panic::catch_unwind(|| {
+        let mut handles = vec![];
+        ex.spawn_many(
+            (0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
+            &mut handles,
+        )
+    });
+    assert!(panic.is_err());
+}
+
 struct CallOnDrop<F: FnMut()>(F);
 
 impl<F: FnMut()> Drop for CallOnDrop<F> {
diff --git a/tests/spawn_many.rs b/tests/spawn_many.rs
new file mode 100644
index 0000000..cebe2d3
--- /dev/null
+++ b/tests/spawn_many.rs
@@ -0,0 +1,45 @@
+use async_executor::{Executor, LocalExecutor};
+use futures_lite::future;
+
+#[cfg(not(miri))]
+const READY_COUNT: usize = 50_000;
+#[cfg(miri)]
+const READY_COUNT: usize = 505;
+
+#[test]
+fn spawn_many() {
+    future::block_on(async {
+        let ex = Executor::new();
+
+        // Spawn a lot of tasks.
+        let mut tasks = vec![];
+        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
+
+        // Run all of the tasks to completion.
+        ex.run(async move {
+            for (i, task) in tasks.into_iter().enumerate() {
+                assert_eq!(task.await, i);
+            }
+        })
+        .await;
+    });
+}
+
+#[test]
+fn spawn_many_local() {
+    future::block_on(async {
+        let ex = LocalExecutor::new();
+
+        // Spawn a lot of tasks.
+        let mut tasks = vec![];
+        ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
+
+        // Run all of the tasks to completion.
+        ex.run(async move {
+            for (i, task) in tasks.into_iter().enumerate() {
+                assert_eq!(task.await, i);
+            }
+        })
+        .await;
+    });
+}
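
A caller-side sketch of the new API (illustrative only, not part of the patch: the doubled-integer futures and the `VecDeque` destination are made up here to show that `spawn_many` accepts any `IntoIterator` of futures and fills any `Extend` collection with the returned `Task` handles):

```
use std::collections::VecDeque;

use async_executor::Executor;
use futures_lite::future;

fn main() {
    let ex = Executor::new();

    // Batch-spawn a block of futures with a single call; the handles can be
    // collected into anything that implements `Extend`, not just a `Vec`.
    let mut tasks: VecDeque<_> = VecDeque::new();
    ex.spawn_many((0..1_000).map(|i| async move { i * 2 }), &mut tasks);

    // Drive the executor until every spawned task has resolved.
    let total: i32 = future::block_on(ex.run(async move {
        let mut sum = 0;
        for task in tasks {
            sum += task.await;
        }
        sum
    }));

    assert_eq!(total, (0..1_000).map(|i| i * 2).sum::<i32>());
}
```

When the batch size is known up front, a `Vec` created with `Vec::with_capacity` avoids reallocating while the handles are pushed.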