bugfix: Account for local queue corner cases

It turns out that with the current strategy it is possible for tasks to
be stuck in the local queue without any hope of being picked back up.
In practice this seems to happen when the only entities polling the
system are tickers, as opposed to runners. Since tickers don't steal
tasks, tasks can be left over in a local queue that never gets drained.
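
Roughly, the failure mode can be reproduced with a sketch like the
following (hypothetical, not part of this commit; with the old strategy
the final tick() can block forever):

    use async_executor::Executor;
    use futures_lite::future;
    use std::sync::Arc;

    fn main() {
        let ex = Arc::new(Executor::new());

        // This thread hosts a runner once, which sets up its
        // thread-local queue. A task spawned from the same thread
        // afterwards lands in that local queue, even though no runner
        // is left to poll it.
        let ex2 = Arc::clone(&ex);
        std::thread::spawn(move || {
            future::block_on(ex2.run(future::yield_now()));
            ex2.spawn(async {}).detach();
        })
        .join()
        .unwrap();

        // From here on, only tickers poll the executor. Tickers don't
        // steal from other threads' local queues, so the task above is
        // stranded.
        future::block_on(ex.tick());
    }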

One possible solution is to make it so tickers steal tasks, but this
kind of defeats the point of tickers. So I've instead elected to replace
the current strategy with one that accounts for the corner cases with
local queues.

The main difference is that I replace the Sleepers struct with two
kinds of event_listener::Event: one that handles tickers subscribed to
the global queue, and one on each local queue that handles the tickers
subscribed to it. The other major difference is that each local queue
now has a reference count of the runners ticking it. If this count
reaches zero, no tasks will be pushed to that queue. Only runners
increment or decrement this counter.
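
For illustration, the waiting side of the new strategy looks roughly
like this standalone sketch of the event_listener pattern (names here
are stand-ins; the real logic is State::tick_with in the diff below):

    use event_listener::{listener, Event};
    use futures_lite::{future, prelude::*};

    fn main() {
        let new_tasks = Event::new(); // global queue event
        let waiters = Event::new();   // per-local-queue event

        future::block_on(async {
            // Register interest in both events before the final queue
            // re-check, so a notification cannot slip in between the
            // check and the wait.
            listener!(waiters => local_listener);
            listener!(new_tasks => global_listener);

            // (A real ticker would re-check both queues here.)

            // A scheduling thread fires one of the events...
            new_tasks.notify_additional(1);

            // ...and waiting on both listeners at once wakes us up.
            local_listener.or(global_listener).await;
        });
    }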

This makes the previously added tests pass, so hopefully this works
for most use cases.

Signed-off-by: John Nunley <dev@notgull.net>
John Nunley, 2024-03-01 21:11:14 -08:00 (committed by John Nunley)
commit 22a9e8b305, parent d5dc7a8008
3 changed files with 112 additions and 261 deletions

@@ -17,8 +17,8 @@ exclude = ["/.*"]
 [dependencies]
 async-lock = "3.0.0"
 async-task = "4.4.0"
-atomic-waker = "1.0"
 concurrent-queue = "2.0.0"
+event-listener = { version = "5.2.0", default-features = false, features = ["std"] }
 fastrand = "2.0.0"
 futures-lite = { version = "2.0.0", default-features = false }
 slab = "0.4.4"

@@ -37,14 +37,14 @@ use std::fmt;
 use std::marker::PhantomData;
 use std::panic::{RefUnwindSafe, UnwindSafe};
 use std::rc::Rc;
-use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex, TryLockError};
-use std::task::{Poll, Waker};
+use std::task::Waker;
 
 use async_lock::OnceCell;
 use async_task::{Builder, Runnable};
-use atomic_waker::AtomicWaker;
 use concurrent_queue::ConcurrentQueue;
+use event_listener::{listener, Event};
 use futures_lite::{future, prelude::*};
 use slab::Slab;
 use thread_local::ThreadLocal;
@@ -225,7 +225,15 @@ impl<'a> Executor<'a> {
     /// ```
     pub async fn tick(&self) {
         let state = self.state();
 
-        let runnable = Ticker::new(state).runnable().await;
+        let runnable = state
+            .tick_with(|local, steal| {
+                local
+                    .queue
+                    .pop()
+                    .ok()
+                    .or_else(|| if steal { state.queue.pop().ok() } else { None })
+            })
+            .await;
         runnable.run();
     }
@@ -270,22 +278,23 @@ impl<'a> Executor<'a> {
         move |mut runnable| {
             // If possible, push into the current local queue and notify the ticker.
             if let Some(local) = state.local_queue.get() {
-                runnable = if let Err(err) = local.queue.push(runnable) {
-                    err.into_inner()
-                } else {
-                    // Wake up this thread if it's asleep, otherwise notify another
-                    // thread to try to have the task stolen.
-                    if let Some(waker) = local.waker.take() {
-                        waker.wake();
-                    } else {
-                        state.notify();
-                    }
-                    return;
-                };
+                // Don't push into the local queue if no one is ticking it.
+                if local.tickers.load(Ordering::Acquire) > 0 {
+                    runnable = if let Err(err) = local.queue.push(runnable) {
+                        err.into_inner()
+                    } else {
+                        // Try to notify threads waiting on this queue. If there are
+                        // none, notify another thread.
+                        if local.waiters.notify_additional(1) == 0 {
+                            state.new_tasks.notify_additional(1);
+                        }
+                        return;
+                    };
+                }
             }
 
             // If the local queue is full, fallback to pushing onto the global injector queue.
             state.queue.push(runnable).unwrap();
-            state.notify();
+            state.new_tasks.notify_additional(1);
         }
@@ -315,6 +324,9 @@ impl Drop for Executor<'_> {
             }
             drop(active);
 
+            for local_queue in state.local_queue.iter() {
+                while local_queue.queue.pop().is_ok() {}
+            }
             while state.queue.pop().is_ok() {}
         }
     }
@@ -537,11 +549,8 @@ struct State {
     /// thread steals the task.
     local_queue: ThreadLocal<LocalQueue>,
 
-    /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
-    notified: AtomicBool,
-
-    /// A list of sleeping tickers.
-    sleepers: Mutex<Sleepers>,
+    /// Tickers waiting on new tasks from the global queue.
+    new_tasks: Event,
 
     /// Currently active tasks.
     active: Mutex<Slab<Waker>>,
@@ -553,12 +562,7 @@ impl State {
         State {
             queue: ConcurrentQueue::unbounded(),
             local_queue: ThreadLocal::new(),
-            notified: AtomicBool::new(true),
-            sleepers: Mutex::new(Sleepers {
-                count: 0,
-                wakers: Vec::new(),
-                free_ids: Vec::new(),
-            }),
+            new_tasks: Event::new(),
             active: Mutex::new(Slab::new()),
         }
     }
@@ -566,218 +570,40 @@ impl State {
     /// Notifies a sleeping ticker.
     #[inline]
     fn notify(&self) {
-        if self
-            .notified
-            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
-            .is_ok()
-        {
-            let waker = self.sleepers.lock().unwrap().notify();
-
-            if let Some(w) = waker {
-                w.wake();
-            }
-        }
-    }
-}
-
-/// A list of sleeping tickers.
-struct Sleepers {
-    /// Number of sleeping tickers (both notified and unnotified).
-    count: usize,
-
-    /// IDs and wakers of sleeping unnotified tickers.
-    ///
-    /// A sleeping ticker is notified when its waker is missing from this list.
-    wakers: Vec<(usize, Waker)>,
-
-    /// Reclaimed IDs.
-    free_ids: Vec<usize>,
-}
-
-impl Sleepers {
-    /// Inserts a new sleeping ticker.
-    fn insert(&mut self, waker: &Waker) -> usize {
-        let id = match self.free_ids.pop() {
-            Some(id) => id,
-            None => self.count + 1,
-        };
-        self.count += 1;
-        self.wakers.push((id, waker.clone()));
-        id
-    }
+        self.new_tasks.notify(1);
+    }
 
-    /// Re-inserts a sleeping ticker's waker if it was notified.
-    ///
-    /// Returns `true` if the ticker was notified.
-    fn update(&mut self, id: usize, waker: &Waker) -> bool {
-        for item in &mut self.wakers {
-            if item.0 == id {
-                if !item.1.will_wake(waker) {
-                    item.1 = waker.clone();
-                }
-                return false;
-            }
-        }
-
-        self.wakers.push((id, waker.clone()));
-        true
-    }
+    /// Run a tick using the provided function to get the next task.
+    async fn tick_with(
+        &self,
+        mut local_ticker: impl FnMut(&LocalQueue, bool) -> Option<Runnable>,
+    ) -> Runnable {
+        let local = self.local_queue.get_or_default();
 
-    /// Removes a previously inserted sleeping ticker.
-    ///
-    /// Returns `true` if the ticker was notified.
-    fn remove(&mut self, id: usize) -> bool {
-        self.count -= 1;
-        self.free_ids.push(id);
-
-        for i in (0..self.wakers.len()).rev() {
-            if self.wakers[i].0 == id {
-                self.wakers.remove(i);
-                return false;
-            }
-        }
-
-        true
-    }
-
-    /// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
-    fn is_notified(&self) -> bool {
-        self.count == 0 || self.count > self.wakers.len()
-    }
-
-    /// Returns notification waker for a sleeping ticker.
-    ///
-    /// If a ticker was notified already or there are no tickers, `None` will be returned.
-    fn notify(&mut self) -> Option<Waker> {
-        if self.wakers.len() == self.count {
-            self.wakers.pop().map(|item| item.1)
-        } else {
-            None
-        }
-    }
-}
-
-/// Runs task one by one.
-struct Ticker<'a> {
-    /// The executor state.
-    state: &'a State,
-
-    /// Set to a non-zero sleeper ID when in sleeping state.
-    ///
-    /// States a ticker can be in:
-    /// 1) Woken.
-    /// 2a) Sleeping and unnotified.
-    /// 2b) Sleeping and notified.
-    sleeping: usize,
-}
-
-impl Ticker<'_> {
-    /// Creates a ticker.
-    fn new(state: &State) -> Ticker<'_> {
-        Ticker { state, sleeping: 0 }
-    }
-
-    /// Moves the ticker into sleeping and unnotified state.
-    ///
-    /// Returns `false` if the ticker was already sleeping and unnotified.
-    fn sleep(&mut self, waker: &Waker) -> bool {
-        self.state
-            .local_queue
-            .get_or_default()
-            .waker
-            .register(waker);
-
-        let mut sleepers = self.state.sleepers.lock().unwrap();
-
-        match self.sleeping {
-            // Move to sleeping state.
-            0 => {
-                self.sleeping = sleepers.insert(waker);
-            }
-
-            // Already sleeping, check if notified.
-            id => {
-                if !sleepers.update(id, waker) {
-                    return false;
-                }
-            }
-        }
-
-        self.state
-            .notified
-            .store(sleepers.is_notified(), Ordering::Release);
-
-        true
-    }
-
-    /// Moves the ticker into woken state.
-    fn wake(&mut self) {
-        if self.sleeping != 0 {
-            let mut sleepers = self.state.sleepers.lock().unwrap();
-            sleepers.remove(self.sleeping);
-
-            self.state
-                .notified
-                .store(sleepers.is_notified(), Ordering::Release);
-        }
-
-        self.sleeping = 0;
-    }
-
-    /// Waits for the next runnable task to run.
-    async fn runnable(&mut self) -> Runnable {
-        self.runnable_with(|| {
-            self.state
-                .local_queue
-                .get()
-                .and_then(|local| local.queue.pop().ok())
-                .or_else(|| self.state.queue.pop().ok())
-        })
-        .await
-    }
-
-    /// Waits for the next runnable task to run, given a function that searches for a task.
-    async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
-        future::poll_fn(|cx| {
-            loop {
-                match search() {
-                    None => {
-                        // Move to sleeping and unnotified state.
-                        if !self.sleep(cx.waker()) {
-                            // If already sleeping and unnotified, return.
-                            return Poll::Pending;
-                        }
-                    }
-                    Some(r) => {
-                        // Wake up.
-                        self.wake();
-
-                        // Notify another ticker now to pick up where this ticker left off, just in
-                        // case running the task takes a long time.
-                        self.state.notify();
-
-                        return Poll::Ready(r);
-                    }
-                }
-            }
-        })
-        .await
-    }
-}
+        loop {
+            // Try to get a runnable from the local queue.
+            if let Some(runnable) = local_ticker(local, false) {
+                return runnable;
+            }
 
-impl Drop for Ticker<'_> {
-    fn drop(&mut self) {
-        // If this ticker is in sleeping state, it must be removed from the sleepers list.
-        if self.sleeping != 0 {
-            let mut sleepers = self.state.sleepers.lock().unwrap();
-            let notified = sleepers.remove(self.sleeping);
-
-            self.state
-                .notified
-                .store(sleepers.is_notified(), Ordering::Release);
-
-            // If this ticker was notified, then notify another ticker.
-            if notified {
-                drop(sleepers);
-                self.state.notify();
-            }
+            // Register a local waiter.
+            listener!(local.waiters => local_listener);
+
+            // Try for a global runner.
+            if let Ok(runnable) = self.queue.pop() {
+                return runnable;
+            }
+
+            // Register a global waiter.
+            listener!(self.new_tasks => global_listener);
+
+            // Try for both again.
+            if let Some(runnable) = local_ticker(local, true) {
+                return runnable;
+            }
+
+            // Wait on both listeners in parallel.
+            local_listener.or(global_listener).await;
         }
     }
 }
@@ -789,9 +615,6 @@ struct Runner<'a> {
     /// The executor state.
     state: &'a State,
 
-    /// Inner ticker.
-    ticker: Ticker<'a>,
-
     /// Bumped every time a runnable task is found.
     ticks: usize,
 }
@@ -799,12 +622,9 @@ struct Runner<'a> {
 impl Runner<'_> {
     /// Creates a runner and registers it in the executor state.
     fn new(state: &State) -> Runner<'_> {
-        let runner = Runner {
-            state,
-            ticker: Ticker::new(state),
-            ticks: 0,
-        };
-        runner
+        state.local_queue.get_or_default().start_ticking();
+
+        Runner { state, ticks: 0 }
     }
 
     /// Waits for the next runnable task to run.
@@ -812,13 +632,18 @@ impl Runner<'_> {
         let local = self.state.local_queue.get_or_default();
 
         let runnable = self
-            .ticker
-            .runnable_with(|| {
+            .state
+            .tick_with(|_, try_stealing| {
                 // Try the local queue.
                 if let Ok(r) = local.queue.pop() {
                     return Some(r);
                 }
 
+                // Remaining work involves stealing.
+                if !try_stealing {
+                    return None;
+                }
+
                 // Try stealing from the global queue.
                 if let Ok(r) = self.state.queue.pop() {
                     steal(&self.state.queue, &local.queue);
@@ -868,12 +693,7 @@ impl Drop for Runner<'_> {
     fn drop(&mut self) {
         // Remove the local queue.
         if let Some(local) = self.state.local_queue.get() {
-            // Re-schedule remaining tasks in the local queue.
-            for r in local.queue.try_iter() {
-                // Explicitly reschedule the runnable back onto the global
-                // queue to avoid rescheduling onto the local one.
-                self.state.queue.push(r).unwrap();
-            }
+            local.stop_ticking(self.state);
         }
     }
 }
@@ -943,24 +763,10 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
         }
     }
 
-    /// Debug wrapper for the sleepers.
-    struct SleepCount<'a>(&'a Mutex<Sleepers>);
-
-    impl fmt::Debug for SleepCount<'_> {
-        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-            match self.0.try_lock() {
-                Ok(lock) => fmt::Debug::fmt(&lock.count, f),
-                Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
-                Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
-            }
-        }
-    }
-
     f.debug_struct(name)
         .field("active", &ActiveTasks(&state.active))
         .field("global_tasks", &state.queue.len())
         .field("local_runners", &LocalRunners(&state.local_queue))
-        .field("sleepers", &SleepCount(&state.sleepers))
         .finish()
 }
@@ -972,15 +778,54 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
 /// The local queue *must* be flushed, and all pending runnables
 /// rescheduled onto the global queue when a runner is dropped.
 struct LocalQueue {
+    /// Queue of concurrent tasks.
     queue: ConcurrentQueue<Runnable>,
-    waker: AtomicWaker,
+
+    /// Tickers waiting on an event from this queue.
+    waiters: Event,
+
+    /// Number of tickers waiting on this queue.
+    tickers: AtomicUsize,
 }
 
 impl Default for LocalQueue {
     fn default() -> Self {
         Self {
             queue: ConcurrentQueue::bounded(512),
-            waker: AtomicWaker::new(),
+            waiters: Event::new(),
+            tickers: AtomicUsize::new(0),
         }
     }
 }
+
+impl LocalQueue {
+    /// Indicate that we are now waiting on this queue.
+    fn start_ticking(&self) {
+        // Relaxed ordering is fine here.
+        let old_tickers = self.tickers.fetch_add(1, Ordering::Relaxed);
+        if old_tickers > isize::MAX as usize {
+            panic!("too many tickers waiting on one thread");
+        }
+    }
+
+    /// Indicate that we are no longer waiting on this queue.
+    #[inline]
+    fn stop_ticking(&self, state: &State) {
+        if self.tickers.fetch_sub(1, Ordering::Release) == 1 {
+            // Make sure everyone knows we're about to release tasks.
+            std::sync::atomic::fence(Ordering::Acquire);
+
+            // Drain any tasks.
+            self.drain_tasks(state);
+        }
+    }
+
+    /// Drain all tasks from this queue.
+    #[cold]
+    fn drain_tasks(&self, state: &State) {
+        while let Ok(task) = self.queue.pop() {
+            state.queue.push(task).ok();
+            state.notify();
+        }
+    }
+}

@@ -6,11 +6,15 @@ use futures_lite::prelude::*;
 use std::sync::Arc;
 use std::thread;
+#[cfg(not(miri))]
 use std::time::Duration;
 
 fn do_run<Fut: Future<Output = ()>>(mut f: impl FnMut(Arc<Executor<'static>>) -> Fut) {
     // This should not run for longer than two minutes.
+    #[cfg(not(miri))]
+    let (_stop_timeout, stopper) = async_channel::bounded::<()>(1);
+    #[cfg(not(miri))]
     thread::spawn(move || {
         block_on(async move {
             let timeout = async {
@@ -86,6 +90,8 @@ fn yield_now() {
     do_run(|ex| async move { ex.spawn(future::yield_now()).await })
 }
 
+// Miri does not support timers.
+#[cfg(not(miri))]
 #[test]
 fn timer() {
     do_run(|ex| async move {