Update async-task

This commit is contained in:
Stjepan Glavina 2020-09-20 02:36:54 +02:00
parent 184185a7fa
commit 8cea09da36
3 changed files with 65 additions and 49 deletions

View File

@ -13,7 +13,7 @@ categories = ["asynchronous", "concurrency"]
readme = "README.md" readme = "README.md"
[dependencies] [dependencies]
async-task = { path = "../async-task" } async-task = "4.0.0"
concurrent-queue = "1.2.2" concurrent-queue = "1.2.2"
fastrand = "1.3.4" fastrand = "1.3.4"
futures-lite = "1.0.0" futures-lite = "1.0.0"

View File

@ -18,23 +18,23 @@ enum Priority {
/// An executor with task priorities. /// An executor with task priorities.
/// ///
/// Tasks with lower priorities only get polled when there are no tasks with higher priorities. /// Tasks with lower priorities only get polled when there are no tasks with higher priorities.
struct PriorityExecutor<'a> { struct PriorityExecutor {
ex: [Executor<'a>; 3], ex: [Executor; 3],
} }
impl<'a> PriorityExecutor<'a> { impl PriorityExecutor {
/// Creates a new executor. /// Creates a new executor.
const fn new() -> PriorityExecutor<'a> { const fn new() -> PriorityExecutor {
PriorityExecutor { PriorityExecutor {
ex: [Executor::new(), Executor::new(), Executor::new()], ex: [Executor::new(), Executor::new(), Executor::new()],
} }
} }
/// Spawns a task with the given priority. /// Spawns a task with the given priority.
fn spawn<T: Send + 'a>( fn spawn<T: Send + 'static>(
&self, &self,
priority: Priority, priority: Priority,
future: impl Future<Output = T> + Send + 'a, future: impl Future<Output = T> + Send + 'static,
) -> Task<T> { ) -> Task<T> {
self.ex[priority as usize].spawn(future) self.ex[priority as usize].spawn(future)
} }
@ -59,7 +59,7 @@ impl<'a> PriorityExecutor<'a> {
} }
fn main() { fn main() {
static EX: PriorityExecutor<'_> = PriorityExecutor::new(); static EX: PriorityExecutor = PriorityExecutor::new();
// Spawn a thread running the executor forever. // Spawn a thread running the executor forever.
thread::spawn(|| future::block_on(EX.run())); thread::spawn(|| future::block_on(EX.run()));

View File

@ -61,18 +61,14 @@ pub use async_task::Task;
/// })); /// }));
/// ``` /// ```
#[derive(Debug)] #[derive(Debug)]
pub struct Executor<'a> { pub struct Executor {
state: once_cell::sync::OnceCell<Arc<State>>, state: once_cell::sync::OnceCell<Arc<State>>,
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
} }
unsafe impl Send for Executor<'_> {} impl UnwindSafe for Executor {}
unsafe impl Sync for Executor<'_> {} impl RefUnwindSafe for Executor {}
impl UnwindSafe for Executor<'_> {} impl Executor {
impl RefUnwindSafe for Executor<'_> {}
impl<'a> Executor<'a> {
/// Creates a new executor. /// Creates a new executor.
/// ///
/// # Examples /// # Examples
@ -82,10 +78,9 @@ impl<'a> Executor<'a> {
/// ///
/// let ex = Executor::new(); /// let ex = Executor::new();
/// ``` /// ```
pub const fn new() -> Executor<'a> { pub const fn new() -> Executor {
Executor { Executor {
state: once_cell::sync::OnceCell::new(), state: once_cell::sync::OnceCell::new(),
_marker: PhantomData,
} }
} }
@ -102,8 +97,22 @@ impl<'a> Executor<'a> {
/// println!("Hello world"); /// println!("Hello world");
/// }); /// });
/// ``` /// ```
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { pub fn spawn<T: Send + 'static>(
unsafe { self.spawn_unchecked(future) } &self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
let mut active = self.state().active.lock().unwrap();
let index = active.next_vacant();
let state = self.state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index)));
future.await
};
let (runnable, task) = async_task::spawn(future, self.schedule());
active.insert(runnable.waker());
runnable.schedule();
task
} }
/// Attempts to run a task if at least one is scheduled. /// Attempts to run a task if at least one is scheduled.
@ -211,24 +220,9 @@ impl<'a> Executor<'a> {
fn state(&self) -> &Arc<State> { fn state(&self) -> &Arc<State> {
self.state.get_or_init(|| Arc::new(State::new())) self.state.get_or_init(|| Arc::new(State::new()))
} }
unsafe fn spawn_unchecked<T>(&self, future: impl Future<Output = T>) -> Task<T> {
let mut active = self.state().active.lock().unwrap();
let index = active.next_vacant();
let state = self.state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index)));
future.await
};
let (runnable, task) = async_task::spawn_unchecked(future, self.schedule());
active.insert(runnable.waker());
runnable.schedule();
task
}
} }
impl Drop for Executor<'_> { impl Drop for Executor {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(state) = self.state.get() { if let Some(state) = self.state.get() {
let mut active = state.active.lock().unwrap(); let mut active = state.active.lock().unwrap();
@ -244,8 +238,8 @@ impl Drop for Executor<'_> {
} }
} }
impl<'a> Default for Executor<'a> { impl Default for Executor {
fn default() -> Executor<'a> { fn default() -> Executor {
Executor::new() Executor::new()
} }
} }
@ -267,18 +261,18 @@ impl<'a> Default for Executor<'a> {
/// })); /// }));
/// ``` /// ```
#[derive(Debug)] #[derive(Debug)]
pub struct LocalExecutor<'a> { pub struct LocalExecutor {
/// The inner executor. /// The inner executor.
inner: once_cell::unsync::OnceCell<Executor<'a>>, inner: once_cell::unsync::OnceCell<Executor>,
/// Make sure the type is `!Send` and `!Sync`. /// Make sure the type is `!Send` and `!Sync`.
_marker: PhantomData<Rc<()>>, _marker: PhantomData<Rc<()>>,
} }
impl UnwindSafe for LocalExecutor<'_> {} impl UnwindSafe for LocalExecutor {}
impl RefUnwindSafe for LocalExecutor<'_> {} impl RefUnwindSafe for LocalExecutor {}
impl<'a> LocalExecutor<'a> { impl LocalExecutor {
/// Creates a single-threaded executor. /// Creates a single-threaded executor.
/// ///
/// # Examples /// # Examples
@ -288,7 +282,7 @@ impl<'a> LocalExecutor<'a> {
/// ///
/// let local_ex = LocalExecutor::new(); /// let local_ex = LocalExecutor::new();
/// ``` /// ```
pub const fn new() -> LocalExecutor<'a> { pub const fn new() -> LocalExecutor {
LocalExecutor { LocalExecutor {
inner: once_cell::unsync::OnceCell::new(), inner: once_cell::unsync::OnceCell::new(),
_marker: PhantomData, _marker: PhantomData,
@ -308,8 +302,19 @@ impl<'a> LocalExecutor<'a> {
/// println!("Hello world"); /// println!("Hello world");
/// }); /// });
/// ``` /// ```
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { pub fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
unsafe { self.inner().spawn_unchecked(future) } let mut active = self.inner().state().active.lock().unwrap();
let index = active.next_vacant();
let state = self.inner().state().clone();
let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index)));
future.await
};
let (runnable, task) = async_task::spawn_local(future, self.schedule());
active.insert(runnable.waker());
runnable.schedule();
task
} }
/// Attempts to run a task if at least one is scheduled. /// Attempts to run a task if at least one is scheduled.
@ -375,14 +380,24 @@ impl<'a> LocalExecutor<'a> {
self.inner().run(future).await self.inner().run(future).await
} }
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.inner().state().clone();
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
/// Returns a reference to the inner executor. /// Returns a reference to the inner executor.
fn inner(&self) -> &Executor<'a> { fn inner(&self) -> &Executor {
self.inner.get_or_init(|| Executor::new()) self.inner.get_or_init(|| Executor::new())
} }
} }
impl<'a> Default for LocalExecutor<'a> { impl Default for LocalExecutor {
fn default() -> LocalExecutor<'a> { fn default() -> LocalExecutor {
LocalExecutor::new() LocalExecutor::new()
} }
} }
@ -402,6 +417,7 @@ struct State {
/// A list of sleeping tickers. /// A list of sleeping tickers.
sleepers: Mutex<Sleepers>, sleepers: Mutex<Sleepers>,
/// Currently active tasks.
active: Mutex<Arena<Waker>>, active: Mutex<Arena<Waker>>,
} }