From 6c6c1b1c2fb0d3768953eefebace581052103cf7 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 29 Aug 2020 18:26:28 +0200 Subject: [PATCH] Add tick() and try_tick() --- examples/priority.rs | 88 ++++++++++++++ src/lib.rs | 280 +++++++++++++++++++++++++++++++++++-------- 2 files changed, 316 insertions(+), 52 deletions(-) create mode 100644 examples/priority.rs diff --git a/examples/priority.rs b/examples/priority.rs new file mode 100644 index 0000000..4e5920a --- /dev/null +++ b/examples/priority.rs @@ -0,0 +1,88 @@ +//! An executor with task priorities. + +use std::future::Future; +use std::thread; + +use async_executor::{Executor, Task}; +use futures_lite::{future, FutureExt}; + +/// Task priority. +#[repr(usize)] +#[derive(Debug, Clone, Copy)] +enum Priority { + High = 0, + Medium = 1, + Low = 2, +} + +/// An executor with task priorities. +/// +/// Tasks with lower priorities only get polled when there are no tasks with higher priorities. +struct PriorityExecutor { + ex: [Executor; 3], +} + +impl PriorityExecutor { + /// Creates a new executor. + const fn new() -> PriorityExecutor { + PriorityExecutor { + ex: [Executor::new(), Executor::new(), Executor::new()], + } + } + + /// Spawns a task with the given priority. + fn spawn( + &self, + priority: Priority, + future: impl Future + Send + 'static, + ) -> Task { + self.ex[priority as usize].spawn(future) + } + + /// Runs the executor until the future completes. + async fn run(&self, future: impl Future) -> T { + future + .or(async { + // Keep ticking inner executors forever. + loop { + let t0 = self.ex[0].tick(); + let t1 = self.ex[1].tick(); + let t2 = self.ex[2].tick(); + + // Wait until one of the ticks completes, trying them in order from highest + // priority to lowest priority. + t0.or(t1).or(t2).await; + } + }) + .await + } +} + +fn main() { + static EX: PriorityExecutor = PriorityExecutor::new(); + + // Spawn a thread running the executor forever. + thread::spawn(|| { + let forever = future::pending::<()>(); + future::block_on(EX.run(forever)); + }); + + let mut tasks = Vec::new(); + + for _ in 0..20 { + // Choose a random priority. + let choice = [Priority::High, Priority::Medium, Priority::Low]; + let priority = choice[fastrand::usize(..choice.len())]; + + // Spawn a task with this priority. + tasks.push(EX.spawn(priority, async move { + println!("{:?}", priority); + future::yield_now().await; + println!("{:?}", priority); + })); + } + + for task in tasks { + future::block_on(task); + } +} diff --git a/src/lib.rs b/src/lib.rs index 59f6ebe..06a885e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex, RwLock}; use std::task::{Context, Poll, Waker}; use concurrent_queue::ConcurrentQueue; -use futures_lite::future; +use futures_lite::{future, ready, FutureExt}; /// A runnable future, ready for execution. /// @@ -84,6 +84,9 @@ type Runnable = async_task::Task<()>; #[derive(Debug)] pub struct Task(Option>); +impl UnwindSafe for Task {} +impl RefUnwindSafe for Task {} + impl Task { /// Detaches the task to let it keep running in the background. /// @@ -260,14 +263,17 @@ impl Sleepers { } /// Removes a previously inserted sleeping ticker. - fn remove(&mut self, id: u64) { + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: u64) -> bool { self.count -= 1; for i in (0..self.wakers.len()).rev() { if self.wakers[i].0 == id { self.wakers.remove(i); - return; + return false; } } + true } /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. @@ -358,6 +364,65 @@ impl Executor { Task(Some(handle)) } + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// + /// let ex = Executor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + match self.state().queue.pop() { + Err(_) => false, + Ok(r) => { + // Notify another ticker now to pick up where this ticker left off, just in case + // running the task takes a long time. + self.state().notify(); + + // Run the task. + r.run(); + true + } + } + } + + /// Run a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::Executor; + /// use futures_lite::future; + /// + /// let ex = Executor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + // Create a ticker that doesn't use sharding. + let ticker = Ticker::new(self.state()); + + // Keep trying until a single `poll_tick()` is successful. + future::poll_fn(|cx| ticker.poll_tick(cx)).await + } + /// Runs the executor until the given future completes. /// /// # Examples @@ -374,30 +439,30 @@ impl Executor { /// assert_eq!(res, 6); /// ``` pub async fn run(&self, future: impl Future) -> T { - let ticker = Ticker::new(self.state()); + // Create a ticker that uses sharding. + let runner = Runner::new(self.state()); - future::race( - future, - future::poll_fn(|cx| { - // Run a batch of tasks. - for _ in 0..200 { - if !ticker.tick(cx.waker()) { - return Poll::Pending; - } - } + // A future that ticks the executor forever. + let tick_forever = future::poll_fn(|cx| { + // Run a batch of tasks. + for _ in 0..200 { + ready!(runner.poll_tick(cx)); + } - // If there are more tasks, yield. - cx.waker().wake_by_ref(); - Poll::Pending - }), - ) - .await + // If there are more tasks, yield. + cx.waker().wake_by_ref(); + Poll::Pending + }); + + // Run `future` and `tick_forever` concurrently until `future` completes. + future.or(tick_forever).await } /// Returns a function that schedules a runnable task when it gets woken up. fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { let state = self.state().clone(); + // TODO(stjepang): If possible, push into the current shard and notify the ticker. move |runnable| { state.queue.push(runnable).unwrap(); state.notify(); @@ -410,6 +475,12 @@ impl Executor { } } +impl Drop for Executor { + fn drop(&mut self) { + // TODO(stjepang): Cancel all remaining tasks. + } +} + impl Default for Executor { fn default() -> Executor { Executor::new() @@ -424,9 +495,6 @@ struct Ticker<'a> { /// The executor state. state: &'a State, - /// A shard of the global queue. - shard: Arc>, - /// Set to `true` when in sleeping state. /// /// States a ticker can be in: @@ -434,25 +502,15 @@ struct Ticker<'a> { /// 2a) Sleeping and unnotified. /// 2b) Sleeping and notified. sleeping: Cell>, - - /// Bumped every time a task is run. - ticks: Cell, } -impl UnwindSafe for Ticker<'_> {} -impl RefUnwindSafe for Ticker<'_> {} - impl Ticker<'_> { /// Creates a ticker and registers it in the executor state. fn new(state: &State) -> Ticker<'_> { - let ticker = Ticker { + Ticker { state, - shard: Arc::new(ConcurrentQueue::bounded(512)), sleeping: Cell::new(None), - ticks: Cell::new(0), - }; - state.shards.write().unwrap().push(ticker.shard.clone()); - ticker + } } /// Moves the ticker into sleeping and unnotified state. @@ -481,8 +539,6 @@ impl Ticker<'_> { } /// Moves the ticker into woken state. - /// - /// Returns `false` if the ticker was already woken. fn wake(&self) { if let Some(id) = self.sleeping.take() { let mut sleepers = self.state.sleepers.lock().unwrap(); @@ -494,23 +550,101 @@ impl Ticker<'_> { } } - /// Executes a single task. + /// Attempts to execute a single task. /// - /// This method takes a scheduled task and polls its future. It returns `true` if a scheduled - /// task was found, or `false` otherwise. - pub fn tick(&self, waker: &Waker) -> bool { + /// This method takes a scheduled task and polls its future. + fn poll_tick(&self, cx: &mut Context<'_>) -> Poll<()> { + loop { + match self.state.queue.pop() { + Err(_) => { + // Move to sleeping and unnotified state. + if !self.sleep(cx.waker()) { + // If already sleeping and unnotified, return. + return Poll::Pending; + } + } + Ok(r) => { + // Wake up. + self.wake(); + + // Notify another ticker now to pick up where this ticker left off, just in + // case running the task takes a long time. + self.state.notify(); + + // Run the task. + r.run(); + return Poll::Ready(()); + } + } + } + } +} + +impl Drop for Ticker<'_> { + fn drop(&mut self) { + // If this ticker is in sleeping state, it must be removed from the sleepers list. + if let Some(id) = self.sleeping.take() { + let mut sleepers = self.state.sleepers.lock().unwrap(); + let notified = sleepers.remove(id); + + self.state + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + // If this ticker was notified, then notify another ticker. + if notified { + drop(sleepers); + self.state.notify(); + } + } + } +} + +/// Executes tasks in a work-stealing executor. +/// +/// A ticker represents a "worker" in a work-stealing executor. +#[derive(Debug)] +struct Runner<'a> { + state: &'a State, + + ticker: Ticker<'a>, + + /// A shard of the global queue. + shard: Arc>, + + /// Bumped every time a task is run. + ticks: Cell, +} + +impl Runner<'_> { + /// Creates a runner and registers it in the executor state. + fn new(state: &State) -> Runner<'_> { + let runner = Runner { + state, + ticker: Ticker::new(state), + shard: Arc::new(ConcurrentQueue::bounded(512)), + ticks: Cell::new(0), + }; + state.shards.write().unwrap().push(runner.shard.clone()); + runner + } + + /// Attempts to execute a single task. + /// + /// This method takes a scheduled task and polls its future. + fn poll_tick(&self, cx: &mut Context<'_>) -> Poll<()> { loop { match self.search() { None => { // Move to sleeping and unnotified state. - if !self.sleep(waker) { + if !self.ticker.sleep(cx.waker()) { // If already sleeping and unnotified, return. - return false; + return Poll::Pending; } } Some(r) => { // Wake up. - self.wake(); + self.ticker.wake(); // Notify another ticker now to pick up where this ticker left off, just in // case running the task takes a long time. @@ -527,8 +661,7 @@ impl Ticker<'_> { // Run the task. r.run(); - - return true; + return Poll::Ready(()); } } } @@ -536,6 +669,7 @@ impl Ticker<'_> { /// Finds the next task to run. fn search(&self) -> Option { + // Try the shard. if let Ok(r) = self.shard.pop() { return Some(r); } @@ -569,10 +703,9 @@ impl Ticker<'_> { } } -impl Drop for Ticker<'_> { +impl Drop for Runner<'_> { fn drop(&mut self) { - // Wake and unregister the ticker. - self.wake(); + // Remove the shard. self.state .shards .write() @@ -583,10 +716,6 @@ impl Drop for Ticker<'_> { while let Ok(r) = self.shard.pop() { r.schedule(); } - // Notify another ticker to start searching for tasks. - self.state.notify(); - - // TODO(stjepang): Cancel all remaining tasks. } } @@ -637,6 +766,9 @@ pub struct LocalExecutor { _marker: PhantomData>, } +impl UnwindSafe for LocalExecutor {} +impl RefUnwindSafe for LocalExecutor {} + impl LocalExecutor { /// Creates a single-threaded executor. /// @@ -674,6 +806,50 @@ impl LocalExecutor { Task(Some(handle)) } + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.inner().try_tick() + } + + /// Run a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.inner().tick().await + } + /// Runs the executor until the given future completes. /// /// # Examples