Make executors scoped
This commit is contained in:
parent
19eb3ccd6e
commit
6f2b0b8a49
|
@ -13,11 +13,12 @@ categories = ["asynchronous", "concurrency"]
|
|||
readme = "README.md"
|
||||
|
||||
[dependencies]
|
||||
async-task = "3.0.0"
|
||||
async-task = { path = "../async-task" }
|
||||
concurrent-queue = "1.2.2"
|
||||
fastrand = "1.3.4"
|
||||
futures-lite = "1.0.0"
|
||||
once_cell = "1.4.1"
|
||||
vec-arena = "1.0.0"
|
||||
|
||||
[dev-dependencies]
|
||||
async-channel = "1.4.1"
|
||||
|
|
|
@ -18,23 +18,23 @@ enum Priority {
|
|||
/// An executor with task priorities.
|
||||
///
|
||||
/// Tasks with lower priorities only get polled when there are no tasks with higher priorities.
|
||||
struct PriorityExecutor {
|
||||
ex: [Executor; 3],
|
||||
struct PriorityExecutor<'a> {
|
||||
ex: [Executor<'a>; 3],
|
||||
}
|
||||
|
||||
impl PriorityExecutor {
|
||||
impl<'a> PriorityExecutor<'a> {
|
||||
/// Creates a new executor.
|
||||
const fn new() -> PriorityExecutor {
|
||||
const fn new() -> PriorityExecutor<'a> {
|
||||
PriorityExecutor {
|
||||
ex: [Executor::new(), Executor::new(), Executor::new()],
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a task with the given priority.
|
||||
fn spawn<T: Send + 'static>(
|
||||
fn spawn<T: Send + 'a>(
|
||||
&self,
|
||||
priority: Priority,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
future: impl Future<Output = T> + Send + 'a,
|
||||
) -> Task<T> {
|
||||
self.ex[priority as usize].spawn(future)
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ impl PriorityExecutor {
|
|||
}
|
||||
|
||||
fn main() {
|
||||
static EX: PriorityExecutor = PriorityExecutor::new();
|
||||
static EX: PriorityExecutor<'_> = PriorityExecutor::new();
|
||||
|
||||
// Spawn a thread running the executor forever.
|
||||
thread::spawn(|| future::block_on(EX.run()));
|
||||
|
|
257
src/lib.rs
257
src/lib.rs
|
@ -18,160 +18,23 @@
|
|||
//! future::block_on(ex.run(task));
|
||||
//! ```
|
||||
|
||||
#![forbid(unsafe_code)]
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::task::{Poll, Waker};
|
||||
|
||||
use async_task::Runnable;
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use futures_lite::{future, FutureExt};
|
||||
use vec_arena::Arena;
|
||||
|
||||
/// A runnable future, ready for execution.
|
||||
///
|
||||
/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`,
|
||||
/// we get back two values:
|
||||
///
|
||||
/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable`
|
||||
/// 2. an `async_task::JoinHandle<T, ()>`, which is wrapped inside a `Task<T>`
|
||||
///
|
||||
/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's
|
||||
/// woken up, its schedule function is called, which means the `Runnable` gets pushed into a task
|
||||
/// queue in an executor.
|
||||
type Runnable = async_task::Task<()>;
|
||||
|
||||
/// A spawned future.
|
||||
///
|
||||
/// Tasks are also futures themselves and yield the output of the spawned future.
|
||||
///
|
||||
/// When a task is dropped, its gets canceled and won't be polled again. To cancel a task a bit
|
||||
/// more gracefully and wait until it stops running, use the [`cancel()`][Task::cancel()] method.
|
||||
///
|
||||
/// Tasks that panic get immediately canceled. Awaiting a canceled task also causes a panic.
|
||||
///
|
||||
/// If a task panics, the panic will be thrown into the [`Executor::run()`] or
|
||||
/// [`LocalExecutor::run()`] invocation that polled it.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::Executor;
|
||||
/// use futures_lite::future;
|
||||
/// use std::thread;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a future onto the executor.
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello from a task!");
|
||||
/// 1 + 2
|
||||
/// });
|
||||
///
|
||||
/// // Run an executor thread.
|
||||
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
|
||||
///
|
||||
/// // Wait for the result.
|
||||
/// assert_eq!(future::block_on(task), 3);
|
||||
/// ```
|
||||
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
|
||||
#[derive(Debug)]
|
||||
pub struct Task<T>(Option<async_task::JoinHandle<T, ()>>);
|
||||
|
||||
impl<T> UnwindSafe for Task<T> {}
|
||||
impl<T> RefUnwindSafe for Task<T> {}
|
||||
|
||||
impl<T> Task<T> {
|
||||
/// Detaches the task to let it keep running in the background.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::Executor;
|
||||
/// use async_io::Timer;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a deamon future.
|
||||
/// ex.spawn(async {
|
||||
/// loop {
|
||||
/// println!("I'm a daemon task looping forever.");
|
||||
/// Timer::after(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// })
|
||||
/// .detach();
|
||||
/// ```
|
||||
pub fn detach(mut self) {
|
||||
self.0.take().unwrap();
|
||||
}
|
||||
|
||||
/// Cancels the task and waits for it to stop running.
|
||||
///
|
||||
/// Returns the task's output if it was completed just before it got canceled, or [`None`] if
|
||||
/// it didn't complete.
|
||||
///
|
||||
/// While it's possible to simply drop the [`Task`] to cancel it, this is a cleaner way of
|
||||
/// canceling because it also waits for the task to stop running.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::Executor;
|
||||
/// use async_io::Timer;
|
||||
/// use futures_lite::future;
|
||||
/// use std::thread;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let ex = Executor::new();
|
||||
///
|
||||
/// // Spawn a deamon future.
|
||||
/// let task = ex.spawn(async {
|
||||
/// loop {
|
||||
/// println!("Even though I'm in an infinite loop, you can still cancel me!");
|
||||
/// Timer::after(Duration::from_secs(1)).await;
|
||||
/// }
|
||||
/// });
|
||||
///
|
||||
/// // Run an executor thread.
|
||||
/// thread::spawn(move || future::block_on(ex.run(future::pending::<()>())));
|
||||
///
|
||||
/// future::block_on(async {
|
||||
/// Timer::after(Duration::from_secs(3)).await;
|
||||
/// task.cancel().await;
|
||||
/// });
|
||||
/// ```
|
||||
pub async fn cancel(self) -> Option<T> {
|
||||
let mut task = self;
|
||||
let handle = task.0.take().unwrap();
|
||||
handle.cancel();
|
||||
handle.await
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Drop for Task<T> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(handle) = &self.0 {
|
||||
handle.cancel();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Future for Task<T> {
|
||||
type Output = T;
|
||||
|
||||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) {
|
||||
Poll::Pending => Poll::Pending,
|
||||
Poll::Ready(output) => Poll::Ready(output.expect("task has failed")),
|
||||
}
|
||||
}
|
||||
}
|
||||
#[doc(no_inline)]
|
||||
pub use async_task::Task;
|
||||
|
||||
/// The state of a executor.
|
||||
#[derive(Debug)]
|
||||
|
@ -187,6 +50,8 @@ struct State {
|
|||
|
||||
/// A list of sleeping tickers.
|
||||
sleepers: Mutex<Sleepers>,
|
||||
|
||||
active: Mutex<Arena<Waker>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
|
@ -201,6 +66,7 @@ impl State {
|
|||
wakers: Vec::new(),
|
||||
free_ids: Vec::new(),
|
||||
}),
|
||||
active: Mutex::new(Arena::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -321,14 +187,18 @@ impl Sleepers {
|
|||
/// }));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct Executor {
|
||||
pub struct Executor<'a> {
|
||||
state: once_cell::sync::OnceCell<Arc<State>>,
|
||||
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
|
||||
}
|
||||
|
||||
impl UnwindSafe for Executor {}
|
||||
impl RefUnwindSafe for Executor {}
|
||||
unsafe impl Send for Executor<'_> {}
|
||||
unsafe impl Sync for Executor<'_> {}
|
||||
|
||||
impl Executor {
|
||||
impl UnwindSafe for Executor<'_> {}
|
||||
impl RefUnwindSafe for Executor<'_> {}
|
||||
|
||||
impl<'a> Executor<'a> {
|
||||
/// Creates a new executor.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -338,9 +208,10 @@ impl Executor {
|
|||
///
|
||||
/// let ex = Executor::new();
|
||||
/// ```
|
||||
pub const fn new() -> Executor {
|
||||
pub const fn new() -> Executor<'a> {
|
||||
Executor {
|
||||
state: once_cell::sync::OnceCell::new(),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -357,14 +228,8 @@ impl Executor {
|
|||
/// println!("Hello world");
|
||||
/// });
|
||||
/// ```
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Task<T> {
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn(future, self.schedule(), ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
|
||||
unsafe { self.spawn_unchecked(future) }
|
||||
}
|
||||
|
||||
/// Attempts to run a task if at least one is scheduled.
|
||||
|
@ -472,16 +337,42 @@ impl Executor {
|
|||
fn state(&self) -> &Arc<State> {
|
||||
self.state.get_or_init(|| Arc::new(State::new()))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Executor {
|
||||
fn drop(&mut self) {
|
||||
// TODO(stjepang): Cancel all remaining tasks.
|
||||
unsafe fn spawn_unchecked<T>(&self, future: impl Future<Output = T>) -> Task<T> {
|
||||
let mut active = self.state().active.lock().unwrap();
|
||||
let index = active.next_vacant();
|
||||
let state = self.state().clone();
|
||||
let future = async move {
|
||||
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index)));
|
||||
future.await
|
||||
};
|
||||
|
||||
let (runnable, task) = async_task::spawn_unchecked(future, self.schedule());
|
||||
active.insert(runnable.waker());
|
||||
runnable.schedule();
|
||||
task
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for Executor {
|
||||
fn default() -> Executor {
|
||||
impl Drop for Executor<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(state) = self.state.get() {
|
||||
{
|
||||
let mut state = state.active.lock().unwrap();
|
||||
for i in 0..state.capacity() {
|
||||
if let Some(w) = state.remove(i) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
while state.queue.pop().is_ok() {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Default for Executor<'a> {
|
||||
fn default() -> Executor<'a> {
|
||||
Executor::new()
|
||||
}
|
||||
}
|
||||
|
@ -750,18 +641,18 @@ fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
|
|||
/// }));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct LocalExecutor {
|
||||
pub struct LocalExecutor<'a> {
|
||||
/// The inner executor.
|
||||
inner: once_cell::unsync::OnceCell<Executor>,
|
||||
inner: once_cell::unsync::OnceCell<Executor<'a>>,
|
||||
|
||||
/// Make sure the type is `!Send` and `!Sync`.
|
||||
_marker: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl UnwindSafe for LocalExecutor {}
|
||||
impl RefUnwindSafe for LocalExecutor {}
|
||||
impl UnwindSafe for LocalExecutor<'_> {}
|
||||
impl RefUnwindSafe for LocalExecutor<'_> {}
|
||||
|
||||
impl LocalExecutor {
|
||||
impl<'a> LocalExecutor<'a> {
|
||||
/// Creates a single-threaded executor.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -771,7 +662,7 @@ impl LocalExecutor {
|
|||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
/// ```
|
||||
pub const fn new() -> LocalExecutor {
|
||||
pub const fn new() -> LocalExecutor<'a> {
|
||||
LocalExecutor {
|
||||
inner: once_cell::unsync::OnceCell::new(),
|
||||
_marker: PhantomData,
|
||||
|
@ -791,11 +682,8 @@ impl LocalExecutor {
|
|||
/// println!("Hello world");
|
||||
/// });
|
||||
/// ```
|
||||
pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn_local(future, self.schedule(), ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
|
||||
unsafe { self.inner().spawn_unchecked(future) }
|
||||
}
|
||||
|
||||
/// Attempts to run a task if at least one is scheduled.
|
||||
|
@ -861,24 +749,23 @@ impl LocalExecutor {
|
|||
self.inner().run(future).await
|
||||
}
|
||||
|
||||
/// Returns a function that schedules a runnable task when it gets woken up.
|
||||
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
|
||||
let state = self.inner().state().clone();
|
||||
|
||||
move |runnable| {
|
||||
state.queue.push(runnable).unwrap();
|
||||
state.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner executor.
|
||||
fn inner(&self) -> &Executor {
|
||||
fn inner(&self) -> &Executor<'a> {
|
||||
self.inner.get_or_init(|| Executor::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for LocalExecutor {
|
||||
fn default() -> LocalExecutor {
|
||||
impl<'a> Default for LocalExecutor<'a> {
|
||||
fn default() -> LocalExecutor<'a> {
|
||||
LocalExecutor::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a closure when dropped.
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
impl<F: Fn()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,46 @@
|
|||
use std::panic::catch_unwind;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::Mutex;
|
||||
use std::task::{Poll, Waker};
|
||||
|
||||
use async_executor::Executor;
|
||||
use futures_lite::future;
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
#[test]
|
||||
fn smoke() {
|
||||
static DROP: AtomicUsize = AtomicUsize::new(0);
|
||||
static WAKER: Lazy<Mutex<Option<Waker>>> = Lazy::new(|| Default::default());
|
||||
|
||||
let ex = Executor::new();
|
||||
|
||||
let task = ex.spawn(async {
|
||||
let _guard = CallOnDrop(|| {
|
||||
DROP.fetch_add(1, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
future::poll_fn(|cx| {
|
||||
*WAKER.lock().unwrap() = Some(cx.waker().clone());
|
||||
Poll::Pending::<()>
|
||||
})
|
||||
.await;
|
||||
});
|
||||
|
||||
future::block_on(ex.tick());
|
||||
assert!(WAKER.lock().unwrap().is_some());
|
||||
assert_eq!(DROP.load(Ordering::SeqCst), 0);
|
||||
|
||||
drop(ex);
|
||||
assert_eq!(DROP.load(Ordering::SeqCst), 1);
|
||||
|
||||
assert!(catch_unwind(|| future::block_on(task)).is_err());
|
||||
assert_eq!(DROP.load(Ordering::SeqCst), 1);
|
||||
}
|
||||
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
impl<F: Fn()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue