feat: Add a way to batch spawn tasks
For some workloads many tasks are spawned at a time. This requires locking and unlocking the executor's inner lock every time you spawn a task. If you spawn many tasks this can be expensive. This commit exposes a new "spawn_batch" method on both types. This method allows the user to spawn an entire set of tasks at a time. Closes #91 Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
17720b098a
commit
d3196999f4
|
@ -51,6 +51,21 @@ fn running_benches(c: &mut Criterion) {
|
|||
);
|
||||
});
|
||||
|
||||
group.bench_function("executor::spawn_batch", |b| {
|
||||
run(
|
||||
|| {
|
||||
let mut handles = vec![];
|
||||
|
||||
b.iter(|| {
|
||||
EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
|
||||
});
|
||||
|
||||
handles.clear();
|
||||
},
|
||||
*multithread,
|
||||
)
|
||||
});
|
||||
|
||||
group.bench_function("executor::spawn_many_local", |b| {
|
||||
run(
|
||||
|| {
|
||||
|
|
203
src/lib.rs
203
src/lib.rs
|
@ -149,6 +149,85 @@ impl<'a> Executor<'a> {
|
|||
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
|
||||
let mut active = self.state().active.lock().unwrap();
|
||||
|
||||
// SAFETY: `T` and the future are `Send`.
|
||||
unsafe { self.spawn_inner(future, &mut active) }
|
||||
}
|
||||
|
||||
/// Spawns many tasks onto the executor.
|
||||
///
|
||||
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
|
||||
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
|
||||
/// contention.
|
||||
///
|
||||
/// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
|
||||
/// prevent runner thread starvation. It is assumed that the iterator provided does not
|
||||
/// block; blocking iterators can lock up the internal mutex and therefore the entire
|
||||
/// executor.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::Executor;
|
||||
/// use futures_lite::{stream, prelude::*};
|
||||
/// use std::future::ready;
|
||||
///
|
||||
/// # futures_lite::future::block_on(async {
|
||||
/// let mut ex = Executor::new();
|
||||
///
|
||||
/// let futures = [
|
||||
/// ready(1),
|
||||
/// ready(2),
|
||||
/// ready(3)
|
||||
/// ];
|
||||
///
|
||||
/// // Spawn all of the futures onto the executor at once.
|
||||
/// let mut tasks = vec![];
|
||||
/// ex.spawn_many(futures, &mut tasks);
|
||||
///
|
||||
/// // Await all of them.
|
||||
/// let results = ex.run(async move {
|
||||
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
|
||||
/// }).await;
|
||||
/// assert_eq!(results, [1, 2, 3]);
|
||||
/// # });
|
||||
/// ```
|
||||
///
|
||||
/// [`spawn`]: Executor::spawn
|
||||
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
|
||||
&self,
|
||||
futures: impl IntoIterator<Item = F>,
|
||||
handles: &mut impl Extend<Task<F::Output>>,
|
||||
) {
|
||||
let mut active = Some(self.state().active.lock().unwrap());
|
||||
|
||||
// Convert the futures into tasks.
|
||||
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
|
||||
// SAFETY: `T` and the future are `Send`.
|
||||
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
|
||||
|
||||
// Yield the lock every once in a while to ease contention.
|
||||
if i.wrapping_sub(1) % 500 == 0 {
|
||||
drop(active.take());
|
||||
active = Some(self.state().active.lock().unwrap());
|
||||
}
|
||||
|
||||
task
|
||||
});
|
||||
|
||||
// Push the tasks to the user's collection.
|
||||
handles.extend(tasks);
|
||||
}
|
||||
|
||||
/// Spawn a future while holding the inner lock.
|
||||
///
|
||||
/// # Safety
|
||||
///
|
||||
/// If this is an `Executor`, `F` and `T` must be `Send`.
|
||||
unsafe fn spawn_inner<T: 'a>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + 'a,
|
||||
active: &mut Slab<Waker>,
|
||||
) -> Task<T> {
|
||||
// Remove the task from the set of active tasks when the future finishes.
|
||||
let entry = active.vacant_entry();
|
||||
let index = entry.key();
|
||||
|
@ -159,11 +238,30 @@ impl<'a> Executor<'a> {
|
|||
};
|
||||
|
||||
// Create the task and register it in the set of active tasks.
|
||||
let (runnable, task) = unsafe {
|
||||
Builder::new()
|
||||
.propagate_panic(true)
|
||||
.spawn_unchecked(|()| future, self.schedule())
|
||||
};
|
||||
//
|
||||
// SAFETY:
|
||||
//
|
||||
// If `future` is not `Send`, this must be a `LocalExecutor` as per this
|
||||
// function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
|
||||
// `try_tick`, `tick` and `run` can only be called from the origin
|
||||
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
|
||||
// from the origin thread, ensuring that `future` and the executor share
|
||||
// the same origin thread. The `Runnable` can be scheduled from other
|
||||
// threads, but because of the above `Runnable` can only be called or
|
||||
// dropped on the origin thread.
|
||||
//
|
||||
// `future` is not `'static`, but we make sure that the `Runnable` does
|
||||
// not outlive `'a`. When the executor is dropped, the `active` field is
|
||||
// drained and all of the `Waker`s are woken. Then, the queue inside of
|
||||
// the `Executor` is drained of all of its runnables. This ensures that
|
||||
// runnables are dropped and this precondition is satisfied.
|
||||
//
|
||||
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
|
||||
// Therefore we do not need to worry about what is done with the
|
||||
// `Waker`.
|
||||
let (runnable, task) = Builder::new()
|
||||
.propagate_panic(true)
|
||||
.spawn_unchecked(|()| future, self.schedule());
|
||||
entry.insert(runnable.waker());
|
||||
|
||||
runnable.schedule();
|
||||
|
@ -292,7 +390,7 @@ impl<'a> Executor<'a> {
|
|||
impl Drop for Executor<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(state) = self.state.get() {
|
||||
let mut active = state.active.lock().unwrap();
|
||||
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
|
||||
for w in active.drain() {
|
||||
w.wake();
|
||||
}
|
||||
|
@ -397,25 +495,70 @@ impl<'a> LocalExecutor<'a> {
|
|||
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
|
||||
let mut active = self.inner().state().active.lock().unwrap();
|
||||
|
||||
// Remove the task from the set of active tasks when the future finishes.
|
||||
let entry = active.vacant_entry();
|
||||
let index = entry.key();
|
||||
let state = self.inner().state().clone();
|
||||
let future = async move {
|
||||
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
|
||||
future.await
|
||||
};
|
||||
// SAFETY: This executor is not thread safe, so the future and its result
|
||||
// cannot be sent to another thread.
|
||||
unsafe { self.inner().spawn_inner(future, &mut active) }
|
||||
}
|
||||
|
||||
// Create the task and register it in the set of active tasks.
|
||||
let (runnable, task) = unsafe {
|
||||
Builder::new()
|
||||
.propagate_panic(true)
|
||||
.spawn_unchecked(|()| future, self.schedule())
|
||||
};
|
||||
entry.insert(runnable.waker());
|
||||
/// Spawns many tasks onto the executor.
|
||||
///
|
||||
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
|
||||
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
|
||||
/// contention.
|
||||
///
|
||||
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
|
||||
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
|
||||
/// mutex is not released, as there are no other threads that can poll this executor.
|
||||
///
|
||||
/// ## Example
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::{stream, prelude::*};
|
||||
/// use std::future::ready;
|
||||
///
|
||||
/// # futures_lite::future::block_on(async {
|
||||
/// let mut ex = LocalExecutor::new();
|
||||
///
|
||||
/// let futures = [
|
||||
/// ready(1),
|
||||
/// ready(2),
|
||||
/// ready(3)
|
||||
/// ];
|
||||
///
|
||||
/// // Spawn all of the futures onto the executor at once.
|
||||
/// let mut tasks = vec![];
|
||||
/// ex.spawn_many(futures, &mut tasks);
|
||||
///
|
||||
/// // Await all of them.
|
||||
/// let results = ex.run(async move {
|
||||
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
|
||||
/// }).await;
|
||||
/// assert_eq!(results, [1, 2, 3]);
|
||||
/// # });
|
||||
/// ```
|
||||
///
|
||||
/// [`spawn`]: LocalExecutor::spawn
|
||||
/// [`Executor::spawn_many`]: Executor::spawn_many
|
||||
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
|
||||
&self,
|
||||
futures: impl IntoIterator<Item = F>,
|
||||
handles: &mut impl Extend<Task<F::Output>>,
|
||||
) {
|
||||
let mut active = self.inner().state().active.lock().unwrap();
|
||||
|
||||
runnable.schedule();
|
||||
task
|
||||
// Convert all of the futures to tasks.
|
||||
let tasks = futures.into_iter().map(|future| {
|
||||
// SAFETY: This executor is not thread safe, so the future and its result
|
||||
// cannot be sent to another thread.
|
||||
unsafe { self.inner().spawn_inner(future, &mut active) }
|
||||
|
||||
// As only one thread can spawn or poll tasks at a time, there is no need
|
||||
// to release lock contention here.
|
||||
});
|
||||
|
||||
// Push them to the user's collection.
|
||||
handles.extend(tasks);
|
||||
}
|
||||
|
||||
/// Attempts to run a task if at least one is scheduled.
|
||||
|
@ -481,16 +624,6 @@ impl<'a> LocalExecutor<'a> {
|
|||
self.inner().run(future).await
|
||||
}
|
||||
|
||||
/// Returns a function that schedules a runnable task when it gets woken up.
|
||||
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
|
||||
let state = self.inner().state().clone();
|
||||
|
||||
move |runnable| {
|
||||
state.queue.push(runnable).unwrap();
|
||||
state.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner executor.
|
||||
fn inner(&self) -> &Executor<'a> {
|
||||
&self.inner
|
||||
|
@ -953,6 +1086,7 @@ fn _ensure_send_and_sync() {
|
|||
|
||||
fn is_send<T: Send>(_: T) {}
|
||||
fn is_sync<T: Sync>(_: T) {}
|
||||
fn is_static<T: 'static>(_: T) {}
|
||||
|
||||
is_send::<Executor<'_>>(Executor::new());
|
||||
is_sync::<Executor<'_>>(Executor::new());
|
||||
|
@ -962,6 +1096,9 @@ fn _ensure_send_and_sync() {
|
|||
is_sync(ex.run(pending::<()>()));
|
||||
is_send(ex.tick());
|
||||
is_sync(ex.tick());
|
||||
is_send(ex.schedule());
|
||||
is_sync(ex.schedule());
|
||||
is_static(ex.schedule());
|
||||
|
||||
/// ```compile_fail
|
||||
/// use async_executor::LocalExecutor;
|
||||
|
|
|
@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
|
|||
assert_eq!(DROP.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn iterator_panics_mid_run() {
|
||||
let ex = Executor::new();
|
||||
|
||||
let panic = std::panic::catch_unwind(|| {
|
||||
let mut handles = vec![];
|
||||
ex.spawn_many(
|
||||
(0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
|
||||
&mut handles,
|
||||
)
|
||||
});
|
||||
assert!(panic.is_err());
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
impl<F: Fn()> Drop for CallOnDrop<F> {
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
use async_executor::{Executor, LocalExecutor};
|
||||
use futures_lite::future;
|
||||
|
||||
#[cfg(not(miri))]
|
||||
const READY_COUNT: usize = 50_000;
|
||||
#[cfg(miri)]
|
||||
const READY_COUNT: usize = 505;
|
||||
|
||||
#[test]
|
||||
fn spawn_many() {
|
||||
future::block_on(async {
|
||||
let ex = Executor::new();
|
||||
|
||||
// Spawn a lot of tasks.
|
||||
let mut tasks = vec![];
|
||||
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
|
||||
|
||||
// Run all of the tasks in parallel.
|
||||
ex.run(async move {
|
||||
for (i, task) in tasks.into_iter().enumerate() {
|
||||
assert_eq!(task.await, i);
|
||||
}
|
||||
})
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn spawn_many_local() {
|
||||
future::block_on(async {
|
||||
let ex = LocalExecutor::new();
|
||||
|
||||
// Spawn a lot of tasks.
|
||||
let mut tasks = vec![];
|
||||
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
|
||||
|
||||
// Run all of the tasks in parallel.
|
||||
ex.run(async move {
|
||||
for (i, task) in tasks.into_iter().enumerate() {
|
||||
assert_eq!(task.await, i);
|
||||
}
|
||||
})
|
||||
.await;
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue