diff --git a/Cargo.toml b/Cargo.toml index 89dba96..e6b054a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ categories = ["asynchronous", "concurrency"] readme = "README.md" [dependencies] -async-task = { path = "../async-task" } +async-task = "4.0.0" concurrent-queue = "1.2.2" fastrand = "1.3.4" futures-lite = "1.0.0" diff --git a/examples/priority.rs b/examples/priority.rs index a30ad97..ef62324 100644 --- a/examples/priority.rs +++ b/examples/priority.rs @@ -18,23 +18,23 @@ enum Priority { /// An executor with task priorities. /// /// Tasks with lower priorities only get polled when there are no tasks with higher priorities. -struct PriorityExecutor<'a> { - ex: [Executor<'a>; 3], +struct PriorityExecutor { + ex: [Executor; 3], } -impl<'a> PriorityExecutor<'a> { +impl PriorityExecutor { /// Creates a new executor. - const fn new() -> PriorityExecutor<'a> { + const fn new() -> PriorityExecutor { PriorityExecutor { ex: [Executor::new(), Executor::new(), Executor::new()], } } /// Spawns a task with the given priority. - fn spawn( + fn spawn( &self, priority: Priority, - future: impl Future + Send + 'a, + future: impl Future + Send + 'static, ) -> Task { self.ex[priority as usize].spawn(future) } @@ -59,7 +59,7 @@ impl<'a> PriorityExecutor<'a> { } fn main() { - static EX: PriorityExecutor<'_> = PriorityExecutor::new(); + static EX: PriorityExecutor = PriorityExecutor::new(); // Spawn a thread running the executor forever. thread::spawn(|| future::block_on(EX.run())); diff --git a/src/lib.rs b/src/lib.rs index e963d3a..b3a7ae3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,18 +61,14 @@ pub use async_task::Task; /// })); /// ``` #[derive(Debug)] -pub struct Executor<'a> { +pub struct Executor { state: once_cell::sync::OnceCell>, - _marker: PhantomData>, } -unsafe impl Send for Executor<'_> {} -unsafe impl Sync for Executor<'_> {} +impl UnwindSafe for Executor {} +impl RefUnwindSafe for Executor {} -impl UnwindSafe for Executor<'_> {} -impl RefUnwindSafe for Executor<'_> {} - -impl<'a> Executor<'a> { +impl Executor { /// Creates a new executor. /// /// # Examples @@ -82,10 +78,9 @@ impl<'a> Executor<'a> { /// /// let ex = Executor::new(); /// ``` - pub const fn new() -> Executor<'a> { + pub const fn new() -> Executor { Executor { state: once_cell::sync::OnceCell::new(), - _marker: PhantomData, } } @@ -102,8 +97,22 @@ impl<'a> Executor<'a> { /// println!("Hello world"); /// }); /// ``` - pub fn spawn(&self, future: impl Future + Send + 'a) -> Task { - unsafe { self.spawn_unchecked(future) } + pub fn spawn( + &self, + future: impl Future + Send + 'static, + ) -> Task { + let mut active = self.state().active.lock().unwrap(); + let index = active.next_vacant(); + let state = self.state().clone(); + let future = async move { + let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); + future.await + }; + + let (runnable, task) = async_task::spawn(future, self.schedule()); + active.insert(runnable.waker()); + runnable.schedule(); + task } /// Attempts to run a task if at least one is scheduled. @@ -211,24 +220,9 @@ impl<'a> Executor<'a> { fn state(&self) -> &Arc { self.state.get_or_init(|| Arc::new(State::new())) } - - unsafe fn spawn_unchecked(&self, future: impl Future) -> Task { - let mut active = self.state().active.lock().unwrap(); - let index = active.next_vacant(); - let state = self.state().clone(); - let future = async move { - let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); - future.await - }; - - let (runnable, task) = async_task::spawn_unchecked(future, self.schedule()); - active.insert(runnable.waker()); - runnable.schedule(); - task - } } -impl Drop for Executor<'_> { +impl Drop for Executor { fn drop(&mut self) { if let Some(state) = self.state.get() { let mut active = state.active.lock().unwrap(); @@ -244,8 +238,8 @@ impl Drop for Executor<'_> { } } -impl<'a> Default for Executor<'a> { - fn default() -> Executor<'a> { +impl Default for Executor { + fn default() -> Executor { Executor::new() } } @@ -267,18 +261,18 @@ impl<'a> Default for Executor<'a> { /// })); /// ``` #[derive(Debug)] -pub struct LocalExecutor<'a> { +pub struct LocalExecutor { /// The inner executor. - inner: once_cell::unsync::OnceCell>, + inner: once_cell::unsync::OnceCell, /// Make sure the type is `!Send` and `!Sync`. _marker: PhantomData>, } -impl UnwindSafe for LocalExecutor<'_> {} -impl RefUnwindSafe for LocalExecutor<'_> {} +impl UnwindSafe for LocalExecutor {} +impl RefUnwindSafe for LocalExecutor {} -impl<'a> LocalExecutor<'a> { +impl LocalExecutor { /// Creates a single-threaded executor. /// /// # Examples @@ -288,7 +282,7 @@ impl<'a> LocalExecutor<'a> { /// /// let local_ex = LocalExecutor::new(); /// ``` - pub const fn new() -> LocalExecutor<'a> { + pub const fn new() -> LocalExecutor { LocalExecutor { inner: once_cell::unsync::OnceCell::new(), _marker: PhantomData, @@ -308,8 +302,19 @@ impl<'a> LocalExecutor<'a> { /// println!("Hello world"); /// }); /// ``` - pub fn spawn(&self, future: impl Future + 'a) -> Task { - unsafe { self.inner().spawn_unchecked(future) } + pub fn spawn(&self, future: impl Future + 'static) -> Task { + let mut active = self.inner().state().active.lock().unwrap(); + let index = active.next_vacant(); + let state = self.inner().state().clone(); + let future = async move { + let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().remove(index))); + future.await + }; + + let (runnable, task) = async_task::spawn_local(future, self.schedule()); + active.insert(runnable.waker()); + runnable.schedule(); + task } /// Attempts to run a task if at least one is scheduled. @@ -375,14 +380,24 @@ impl<'a> LocalExecutor<'a> { self.inner().run(future).await } + /// Returns a function that schedules a runnable task when it gets woken up. + fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { + let state = self.inner().state().clone(); + + move |runnable| { + state.queue.push(runnable).unwrap(); + state.notify(); + } + } + /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor<'a> { + fn inner(&self) -> &Executor { self.inner.get_or_init(|| Executor::new()) } } -impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { +impl Default for LocalExecutor { + fn default() -> LocalExecutor { LocalExecutor::new() } } @@ -402,6 +417,7 @@ struct State { /// A list of sleeping tickers. sleepers: Mutex, + /// Currently active tasks. active: Mutex>, }