diff --git a/src/lib.rs b/src/lib.rs index 2647a89..e963d3a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,132 +36,6 @@ use vec_arena::Arena; #[doc(no_inline)] pub use async_task::Task; -/// The state of a executor. -#[derive(Debug)] -struct State { - /// The global queue. - queue: ConcurrentQueue, - - /// Local queues created by runners. - local_queues: RwLock>>>, - - /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. - notified: AtomicBool, - - /// A list of sleeping tickers. - sleepers: Mutex, - - active: Mutex>, -} - -impl State { - /// Creates state for a new executor. - fn new() -> State { - State { - queue: ConcurrentQueue::unbounded(), - local_queues: RwLock::new(Vec::new()), - notified: AtomicBool::new(true), - sleepers: Mutex::new(Sleepers { - count: 0, - wakers: Vec::new(), - free_ids: Vec::new(), - }), - active: Mutex::new(Arena::new()), - } - } - - /// Notifies a sleeping ticker. - #[inline] - fn notify(&self) { - if !self - .notified - .compare_and_swap(false, true, Ordering::SeqCst) - { - let waker = self.sleepers.lock().unwrap().notify(); - if let Some(w) = waker { - w.wake(); - } - } - } -} - -/// A list of sleeping tickers. -#[derive(Debug)] -struct Sleepers { - /// Number of sleeping tickers (both notified and unnotified). - count: usize, - - /// IDs and wakers of sleeping unnotified tickers. - /// - /// A sleeping ticker is notified when its waker is missing from this list. - wakers: Vec<(usize, Waker)>, - - /// Reclaimed IDs. - free_ids: Vec, -} - -impl Sleepers { - /// Inserts a new sleeping ticker. - fn insert(&mut self, waker: &Waker) -> usize { - let id = match self.free_ids.pop() { - Some(id) => id, - None => self.count + 1, - }; - self.count += 1; - self.wakers.push((id, waker.clone())); - id - } - - /// Re-inserts a sleeping ticker's waker if it was notified. - /// - /// Returns `true` if the ticker was notified. - fn update(&mut self, id: usize, waker: &Waker) -> bool { - for item in &mut self.wakers { - if item.0 == id { - if !item.1.will_wake(waker) { - item.1 = waker.clone(); - } - return false; - } - } - - self.wakers.push((id, waker.clone())); - true - } - - /// Removes a previously inserted sleeping ticker. - /// - /// Returns `true` if the ticker was notified. - fn remove(&mut self, id: usize) -> bool { - self.count -= 1; - self.free_ids.push(id); - - for i in (0..self.wakers.len()).rev() { - if self.wakers[i].0 == id { - self.wakers.remove(i); - return false; - } - } - true - } - - /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. - fn is_notified(&self) -> bool { - self.count == 0 || self.count > self.wakers.len() - } - - /// Returns notification waker for a sleeping ticker. - /// - /// If a ticker was notified already or there are no tickers, `None` will be returned. - fn notify(&mut self) -> Option { - if self.wakers.len() == self.count { - self.wakers.pop().map(|item| item.1) - } else { - None - } - } -} - /// An async executor. /// /// # Examples @@ -357,14 +231,13 @@ impl<'a> Executor<'a> { impl Drop for Executor<'_> { fn drop(&mut self) { if let Some(state) = self.state.get() { - { - let mut state = state.active.lock().unwrap(); - for i in 0..state.capacity() { - if let Some(w) = state.remove(i) { - w.wake(); - } + let mut active = state.active.lock().unwrap(); + for i in 0..active.capacity() { + if let Some(w) = active.remove(i) { + w.wake(); } } + drop(active); while state.queue.pop().is_ok() {} } @@ -377,6 +250,269 @@ impl<'a> Default for Executor<'a> { } } +/// A thread-local executor. +/// +/// The executor can only be run on the thread that created it. +/// +/// # Examples +/// +/// ``` +/// use async_executor::LocalExecutor; +/// use futures_lite::future; +/// +/// let local_ex = LocalExecutor::new(); +/// +/// future::block_on(local_ex.run(async { +/// println!("Hello world!"); +/// })); +/// ``` +#[derive(Debug)] +pub struct LocalExecutor<'a> { + /// The inner executor. + inner: once_cell::unsync::OnceCell>, + + /// Make sure the type is `!Send` and `!Sync`. + _marker: PhantomData>, +} + +impl UnwindSafe for LocalExecutor<'_> {} +impl RefUnwindSafe for LocalExecutor<'_> {} + +impl<'a> LocalExecutor<'a> { + /// Creates a single-threaded executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// ``` + pub const fn new() -> LocalExecutor<'a> { + LocalExecutor { + inner: once_cell::unsync::OnceCell::new(), + _marker: PhantomData, + } + } + + /// Spawns a task onto the executor. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { + /// println!("Hello world"); + /// }); + /// ``` + pub fn spawn(&self, future: impl Future + 'a) -> Task { + unsafe { self.inner().spawn_unchecked(future) } + } + + /// Attempts to run a task if at least one is scheduled. + /// + /// Running a scheduled task means simply polling its future once. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// + /// let ex = LocalExecutor::new(); + /// assert!(!ex.try_tick()); // no tasks to run + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// assert!(ex.try_tick()); // a task was found + /// ``` + pub fn try_tick(&self) -> bool { + self.inner().try_tick() + } + + /// Run a single task. + /// + /// Running a task means simply polling its future once. + /// + /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let ex = LocalExecutor::new(); + /// + /// let task = ex.spawn(async { + /// println!("Hello world"); + /// }); + /// future::block_on(ex.tick()); // runs the task + /// ``` + pub async fn tick(&self) { + self.inner().tick().await + } + + /// Runs the executor until the given future completes. + /// + /// # Examples + /// + /// ``` + /// use async_executor::LocalExecutor; + /// use futures_lite::future; + /// + /// let local_ex = LocalExecutor::new(); + /// + /// let task = local_ex.spawn(async { 1 + 2 }); + /// let res = future::block_on(local_ex.run(async { task.await * 2 })); + /// + /// assert_eq!(res, 6); + /// ``` + pub async fn run(&self, future: impl Future) -> T { + self.inner().run(future).await + } + + /// Returns a reference to the inner executor. + fn inner(&self) -> &Executor<'a> { + self.inner.get_or_init(|| Executor::new()) + } +} + +impl<'a> Default for LocalExecutor<'a> { + fn default() -> LocalExecutor<'a> { + LocalExecutor::new() + } +} + +/// The state of a executor. +#[derive(Debug)] +struct State { + /// The global queue. + queue: ConcurrentQueue, + + /// Local queues created by runners. + local_queues: RwLock>>>, + + /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. + notified: AtomicBool, + + /// A list of sleeping tickers. + sleepers: Mutex, + + active: Mutex>, +} + +impl State { + /// Creates state for a new executor. + fn new() -> State { + State { + queue: ConcurrentQueue::unbounded(), + local_queues: RwLock::new(Vec::new()), + notified: AtomicBool::new(true), + sleepers: Mutex::new(Sleepers { + count: 0, + wakers: Vec::new(), + free_ids: Vec::new(), + }), + active: Mutex::new(Arena::new()), + } + } + + /// Notifies a sleeping ticker. + #[inline] + fn notify(&self) { + if !self + .notified + .compare_and_swap(false, true, Ordering::SeqCst) + { + let waker = self.sleepers.lock().unwrap().notify(); + if let Some(w) = waker { + w.wake(); + } + } + } +} + +/// A list of sleeping tickers. +#[derive(Debug)] +struct Sleepers { + /// Number of sleeping tickers (both notified and unnotified). + count: usize, + + /// IDs and wakers of sleeping unnotified tickers. + /// + /// A sleeping ticker is notified when its waker is missing from this list. + wakers: Vec<(usize, Waker)>, + + /// Reclaimed IDs. + free_ids: Vec, +} + +impl Sleepers { + /// Inserts a new sleeping ticker. + fn insert(&mut self, waker: &Waker) -> usize { + let id = match self.free_ids.pop() { + Some(id) => id, + None => self.count + 1, + }; + self.count += 1; + self.wakers.push((id, waker.clone())); + id + } + + /// Re-inserts a sleeping ticker's waker if it was notified. + /// + /// Returns `true` if the ticker was notified. + fn update(&mut self, id: usize, waker: &Waker) -> bool { + for item in &mut self.wakers { + if item.0 == id { + if !item.1.will_wake(waker) { + item.1 = waker.clone(); + } + return false; + } + } + + self.wakers.push((id, waker.clone())); + true + } + + /// Removes a previously inserted sleeping ticker. + /// + /// Returns `true` if the ticker was notified. + fn remove(&mut self, id: usize) -> bool { + self.count -= 1; + self.free_ids.push(id); + + for i in (0..self.wakers.len()).rev() { + if self.wakers[i].0 == id { + self.wakers.remove(i); + return false; + } + } + true + } + + /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping. + fn is_notified(&self) -> bool { + self.count == 0 || self.count > self.wakers.len() + } + + /// Returns notification waker for a sleeping ticker. + /// + /// If a ticker was notified already or there are no tickers, `None` will be returned. + fn notify(&mut self) -> Option { + if self.wakers.len() == self.count { + self.wakers.pop().map(|item| item.1) + } else { + None + } + } +} + /// Runs task one by one. #[derive(Debug)] struct Ticker<'a> { @@ -624,143 +760,6 @@ fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { } } -/// A thread-local executor. -/// -/// The executor can only be run on the thread that created it. -/// -/// # Examples -/// -/// ``` -/// use async_executor::LocalExecutor; -/// use futures_lite::future; -/// -/// let local_ex = LocalExecutor::new(); -/// -/// future::block_on(local_ex.run(async { -/// println!("Hello world!"); -/// })); -/// ``` -#[derive(Debug)] -pub struct LocalExecutor<'a> { - /// The inner executor. - inner: once_cell::unsync::OnceCell>, - - /// Make sure the type is `!Send` and `!Sync`. - _marker: PhantomData>, -} - -impl UnwindSafe for LocalExecutor<'_> {} -impl RefUnwindSafe for LocalExecutor<'_> {} - -impl<'a> LocalExecutor<'a> { - /// Creates a single-threaded executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// ``` - pub const fn new() -> LocalExecutor<'a> { - LocalExecutor { - inner: once_cell::unsync::OnceCell::new(), - _marker: PhantomData, - } - } - - /// Spawns a task onto the executor. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { - /// println!("Hello world"); - /// }); - /// ``` - pub fn spawn(&self, future: impl Future + 'a) -> Task { - unsafe { self.inner().spawn_unchecked(future) } - } - - /// Attempts to run a task if at least one is scheduled. - /// - /// Running a scheduled task means simply polling its future once. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// - /// let ex = LocalExecutor::new(); - /// assert!(!ex.try_tick()); // no tasks to run - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// assert!(ex.try_tick()); // a task was found - /// ``` - pub fn try_tick(&self) -> bool { - self.inner().try_tick() - } - - /// Run a single task. - /// - /// Running a task means simply polling its future once. - /// - /// If no tasks are scheduled when this method is called, it will wait until one is scheduled. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let ex = LocalExecutor::new(); - /// - /// let task = ex.spawn(async { - /// println!("Hello world"); - /// }); - /// future::block_on(ex.tick()); // runs the task - /// ``` - pub async fn tick(&self) { - self.inner().tick().await - } - - /// Runs the executor until the given future completes. - /// - /// # Examples - /// - /// ``` - /// use async_executor::LocalExecutor; - /// use futures_lite::future; - /// - /// let local_ex = LocalExecutor::new(); - /// - /// let task = local_ex.spawn(async { 1 + 2 }); - /// let res = future::block_on(local_ex.run(async { task.await * 2 })); - /// - /// assert_eq!(res, 6); - /// ``` - pub async fn run(&self, future: impl Future) -> T { - self.inner().run(future).await - } - - /// Returns a reference to the inner executor. - fn inner(&self) -> &Executor<'a> { - self.inner.get_or_init(|| Executor::new()) - } -} - -impl<'a> Default for LocalExecutor<'a> { - fn default() -> LocalExecutor<'a> { - LocalExecutor::new() - } -} - /// Runs a closure when dropped. struct CallOnDrop(F);