Revert "bugfix: Account for local queue corner cases"

This reverts commit 22a9e8b305.
This commit is contained in:
John Nunley 2024-03-23 16:03:09 -07:00 committed by John Nunley
parent 22a9e8b305
commit c90fd306cd
3 changed files with 261 additions and 112 deletions

View File

@ -17,8 +17,8 @@ exclude = ["/.*"]
[dependencies]
async-lock = "3.0.0"
async-task = "4.4.0"
atomic-waker = "1.0"
concurrent-queue = "2.0.0"
event-listener = { version = "5.2.0", default-features = false, features = ["std"] }
fastrand = "2.0.0"
futures-lite = { version = "2.0.0", default-features = false }
slab = "0.4.4"

View File

@ -37,14 +37,14 @@ use std::fmt;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, TryLockError};
use std::task::Waker;
use std::task::{Poll, Waker};
use async_lock::OnceCell;
use async_task::{Builder, Runnable};
use atomic_waker::AtomicWaker;
use concurrent_queue::ConcurrentQueue;
use event_listener::{listener, Event};
use futures_lite::{future, prelude::*};
use slab::Slab;
use thread_local::ThreadLocal;
@ -225,15 +225,7 @@ impl<'a> Executor<'a> {
/// ```
pub async fn tick(&self) {
let state = self.state();
let runnable = state
.tick_with(|local, steal| {
local
.queue
.pop()
.ok()
.or_else(|| if steal { state.queue.pop().ok() } else { None })
})
.await;
let runnable = Ticker::new(state).runnable().await;
runnable.run();
}
@ -278,23 +270,22 @@ impl<'a> Executor<'a> {
move |mut runnable| {
// If possible, push into the current local queue and notify the ticker.
if let Some(local) = state.local_queue.get() {
// Don't push into the local queue if no one is ticking it.
if local.tickers.load(Ordering::Acquire) > 0 {
runnable = if let Err(err) = local.queue.push(runnable) {
err.into_inner()
runnable = if let Err(err) = local.queue.push(runnable) {
err.into_inner()
} else {
// Wake up this thread if it's asleep, otherwise notify another
// thread to try to have the task stolen.
if let Some(waker) = local.waker.take() {
waker.wake();
} else {
// Try to notify threads waiting on this queue. If there are
// none, notify another thread.
if local.waiters.notify_additional(1) == 0 {
state.new_tasks.notify_additional(1);
}
return;
state.notify();
}
return;
}
}
// If the local queue is full, fallback to pushing onto the global injector queue.
state.queue.push(runnable).unwrap();
state.new_tasks.notify_additional(1);
state.notify();
}
}
@ -324,9 +315,6 @@ impl Drop for Executor<'_> {
}
drop(active);
for local_queue in state.local_queue.iter() {
while local_queue.queue.pop().is_ok() {}
}
while state.queue.pop().is_ok() {}
}
}
@ -549,8 +537,11 @@ struct State {
/// thread steals the task.
local_queue: ThreadLocal<LocalQueue>,
/// Tickers waiting on new tasks from the global queue.
new_tasks: Event,
/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
notified: AtomicBool,
/// A list of sleeping tickers.
sleepers: Mutex<Sleepers>,
/// Currently active tasks.
active: Mutex<Slab<Waker>>,
@ -562,7 +553,12 @@ impl State {
State {
queue: ConcurrentQueue::unbounded(),
local_queue: ThreadLocal::new(),
new_tasks: Event::new(),
notified: AtomicBool::new(true),
sleepers: Mutex::new(Sleepers {
count: 0,
wakers: Vec::new(),
free_ids: Vec::new(),
}),
active: Mutex::new(Slab::new()),
}
}
@ -570,40 +566,218 @@ impl State {
/// Notifies a sleeping ticker.
#[inline]
fn notify(&self) {
self.new_tasks.notify(1);
if self
.notified
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
let waker = self.sleepers.lock().unwrap().notify();
if let Some(w) = waker {
w.wake();
}
}
}
}
/// A list of sleeping tickers.
struct Sleepers {
/// Number of sleeping tickers (both notified and unnotified).
count: usize,
/// IDs and wakers of sleeping unnotified tickers.
///
/// A sleeping ticker is notified when its waker is missing from this list.
wakers: Vec<(usize, Waker)>,
/// Reclaimed IDs.
free_ids: Vec<usize>,
}
impl Sleepers {
/// Inserts a new sleeping ticker.
fn insert(&mut self, waker: &Waker) -> usize {
let id = match self.free_ids.pop() {
Some(id) => id,
None => self.count + 1,
};
self.count += 1;
self.wakers.push((id, waker.clone()));
id
}
/// Run a tick using the provided function to get the next task.
async fn tick_with(
&self,
mut local_ticker: impl FnMut(&LocalQueue, bool) -> Option<Runnable>,
) -> Runnable {
let local = self.local_queue.get_or_default();
/// Re-inserts a sleeping ticker's waker if it was notified.
///
/// Returns `true` if the ticker was notified.
fn update(&mut self, id: usize, waker: &Waker) -> bool {
for item in &mut self.wakers {
if item.0 == id {
if !item.1.will_wake(waker) {
item.1 = waker.clone();
}
return false;
}
}
loop {
// Try to get a runnable from the local queue.
if let Some(runnable) = local_ticker(local, false) {
return runnable;
self.wakers.push((id, waker.clone()));
true
}
/// Removes a previously inserted sleeping ticker.
///
/// Returns `true` if the ticker was notified.
fn remove(&mut self, id: usize) -> bool {
self.count -= 1;
self.free_ids.push(id);
for i in (0..self.wakers.len()).rev() {
if self.wakers[i].0 == id {
self.wakers.remove(i);
return false;
}
}
true
}
/// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
fn is_notified(&self) -> bool {
self.count == 0 || self.count > self.wakers.len()
}
/// Returns notification waker for a sleeping ticker.
///
/// If a ticker was notified already or there are no tickers, `None` will be returned.
fn notify(&mut self) -> Option<Waker> {
if self.wakers.len() == self.count {
self.wakers.pop().map(|item| item.1)
} else {
None
}
}
}
/// Runs task one by one.
struct Ticker<'a> {
/// The executor state.
state: &'a State,
/// Set to a non-zero sleeper ID when in sleeping state.
///
/// States a ticker can be in:
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: usize,
}
impl Ticker<'_> {
/// Creates a ticker.
fn new(state: &State) -> Ticker<'_> {
Ticker { state, sleeping: 0 }
}
/// Moves the ticker into sleeping and unnotified state.
///
/// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&mut self, waker: &Waker) -> bool {
self.state
.local_queue
.get_or_default()
.waker
.register(waker);
let mut sleepers = self.state.sleepers.lock().unwrap();
match self.sleeping {
// Move to sleeping state.
0 => {
self.sleeping = sleepers.insert(waker);
}
// Register a local waiter.
listener!(local.waiters => local_listener);
// Try for a global runner.
if let Ok(runnable) = self.queue.pop() {
return runnable;
// Already sleeping, check if notified.
id => {
if !sleepers.update(id, waker) {
return false;
}
}
}
// Register a global waiter.
listener!(self.new_tasks => global_listener);
self.state
.notified
.store(sleepers.is_notified(), Ordering::Release);
// Try for both again.
if let Some(runnable) = local_ticker(local, true) {
return runnable;
true
}
/// Moves the ticker into woken state.
fn wake(&mut self) {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(self.sleeping);
self.state
.notified
.store(sleepers.is_notified(), Ordering::Release);
}
self.sleeping = 0;
}
/// Waits for the next runnable task to run.
async fn runnable(&mut self) -> Runnable {
self.runnable_with(|| {
self.state
.local_queue
.get()
.and_then(|local| local.queue.pop().ok())
.or_else(|| self.state.queue.pop().ok())
})
.await
}
/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
None => {
// Move to sleeping and unnotified state.
if !self.sleep(cx.waker()) {
// If already sleeping and unnotified, return.
return Poll::Pending;
}
}
Some(r) => {
// Wake up.
self.wake();
// Notify another ticker now to pick up where this ticker left off, just in
// case running the task takes a long time.
self.state.notify();
return Poll::Ready(r);
}
}
}
})
.await
}
}
// Wait on both listeners in parallel.
local_listener.or(global_listener).await;
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(self.sleeping);
self.state
.notified
.store(sleepers.is_notified(), Ordering::Release);
// If this ticker was notified, then notify another ticker.
if notified {
drop(sleepers);
self.state.notify();
}
}
}
}
@ -615,6 +789,9 @@ struct Runner<'a> {
/// The executor state.
state: &'a State,
/// Inner ticker.
ticker: Ticker<'a>,
/// Bumped every time a runnable task is found.
ticks: usize,
}
@ -622,9 +799,12 @@ struct Runner<'a> {
impl Runner<'_> {
/// Creates a runner and registers it in the executor state.
fn new(state: &State) -> Runner<'_> {
state.local_queue.get_or_default().start_ticking();
Runner { state, ticks: 0 }
let runner = Runner {
state,
ticker: Ticker::new(state),
ticks: 0,
};
runner
}
/// Waits for the next runnable task to run.
@ -632,18 +812,13 @@ impl Runner<'_> {
let local = self.state.local_queue.get_or_default();
let runnable = self
.state
.tick_with(|_, try_stealing| {
.ticker
.runnable_with(|| {
// Try the local queue.
if let Ok(r) = local.queue.pop() {
return Some(r);
}
// Remaining work involves stealing.
if !try_stealing {
return None;
}
// Try stealing from the global queue.
if let Ok(r) = self.state.queue.pop() {
steal(&self.state.queue, &local.queue);
@ -693,7 +868,12 @@ impl Drop for Runner<'_> {
fn drop(&mut self) {
// Remove the local queue.
if let Some(local) = self.state.local_queue.get() {
local.stop_ticking(self.state);
// Re-schedule remaining tasks in the local queue.
for r in local.queue.try_iter() {
// Explicitly reschedule the runnable back onto the global
// queue to avoid rescheduling onto the local one.
self.state.queue.push(r).unwrap();
}
}
}
}
@ -763,10 +943,24 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
}
}
/// Debug wrapper for the sleepers.
struct SleepCount<'a>(&'a Mutex<Sleepers>);
impl fmt::Debug for SleepCount<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0.try_lock() {
Ok(lock) => fmt::Debug::fmt(&lock.count, f),
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
}
}
}
f.debug_struct(name)
.field("active", &ActiveTasks(&state.active))
.field("global_tasks", &state.queue.len())
.field("local_runners", &LocalRunners(&state.local_queue))
.field("sleepers", &SleepCount(&state.sleepers))
.finish()
}
@ -778,54 +972,15 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
/// The local queue *must* be flushed, and all pending runnables
/// rescheduled onto the global queue when a runner is dropped.
struct LocalQueue {
/// Queue of concurrent tasks.
queue: ConcurrentQueue<Runnable>,
/// Tickers waiting on an event from this queue.
waiters: Event,
/// Number of tickers waiting on this queue.
tickers: AtomicUsize,
waker: AtomicWaker,
}
impl Default for LocalQueue {
fn default() -> Self {
Self {
queue: ConcurrentQueue::bounded(512),
waiters: Event::new(),
tickers: AtomicUsize::new(0),
}
}
}
impl LocalQueue {
/// Indicate that we are now waiting on this queue.
fn start_ticking(&self) {
// Relaxed ordering is fine here.
let old_tickers = self.tickers.fetch_add(1, Ordering::Relaxed);
if old_tickers > isize::MAX as usize {
panic!("too many tickers waiting on one thread");
}
}
/// Indicate that we are no longer waiting on this queue.
#[inline]
fn stop_ticking(&self, state: &State) {
if self.tickers.fetch_sub(1, Ordering::Release) == 1 {
// Make sure everyone knows we're about to release tasks.
std::sync::atomic::fence(Ordering::Acquire);
// Drain any tasks.
self.drain_tasks(state);
}
}
/// Drain all tasks from this queue.
#[cold]
fn drain_tasks(&self, state: &State) {
while let Ok(task) = self.queue.pop() {
state.queue.push(task).ok();
state.notify();
waker: AtomicWaker::new(),
}
}
}

View File

@ -6,15 +6,11 @@ use futures_lite::prelude::*;
use std::sync::Arc;
use std::thread;
#[cfg(not(miri))]
use std::time::Duration;
fn do_run<Fut: Future<Output = ()>>(mut f: impl FnMut(Arc<Executor<'static>>) -> Fut) {
// This should not run for longer than two minutes.
#[cfg(not(miri))]
let (_stop_timeout, stopper) = async_channel::bounded::<()>(1);
#[cfg(not(miri))]
thread::spawn(move || {
block_on(async move {
let timeout = async {
@ -90,8 +86,6 @@ fn yield_now() {
do_run(|ex| async move { ex.spawn(future::yield_now()).await })
}
// Miri does not support timers.
#[cfg(not(miri))]
#[test]
fn timer() {
do_run(|ex| async move {