Revert "feat: Use actual thread local queues instead of using a RwLock"
This reverts commit 7592d4188a
.
This commit is contained in:
parent
b2dfa7c5ae
commit
91998f411c
|
@ -6,7 +6,7 @@ name = "async-executor"
|
|||
version = "1.9.0"
|
||||
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
|
||||
edition = "2021"
|
||||
rust-version = "1.61"
|
||||
rust-version = "1.60"
|
||||
description = "Async executor"
|
||||
license = "Apache-2.0 OR MIT"
|
||||
repository = "https://github.com/smol-rs/async-executor"
|
||||
|
@ -17,12 +17,10 @@ exclude = ["/.*"]
|
|||
[dependencies]
|
||||
async-lock = "3.0.0"
|
||||
async-task = "4.4.0"
|
||||
atomic-waker = "1.0"
|
||||
concurrent-queue = "2.0.0"
|
||||
fastrand = "2.0.0"
|
||||
futures-lite = { version = "2.0.0", default-features = false }
|
||||
slab = "0.4.4"
|
||||
thread_local = "1.1"
|
||||
|
||||
[target.'cfg(target_family = "wasm")'.dependencies]
|
||||
futures-lite = { version = "2.0.0", default-features = false, features = ["std"] }
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use std::future::Future;
|
||||
use std::thread::available_parallelism;
|
||||
|
||||
use async_executor::Executor;
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
//! An executor with task priorities.
|
||||
|
||||
use std::future::Future;
|
||||
use std::thread;
|
||||
|
||||
use async_executor::{Executor, Task};
|
||||
|
|
133
src/lib.rs
133
src/lib.rs
|
@ -34,20 +34,19 @@
|
|||
)]
|
||||
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
use std::panic::{RefUnwindSafe, UnwindSafe};
|
||||
use std::rc::Rc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, TryLockError};
|
||||
use std::sync::{Arc, Mutex, RwLock, TryLockError};
|
||||
use std::task::{Poll, Waker};
|
||||
|
||||
use async_lock::OnceCell;
|
||||
use async_task::{Builder, Runnable};
|
||||
use atomic_waker::AtomicWaker;
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use futures_lite::{future, prelude::*};
|
||||
use slab::Slab;
|
||||
use thread_local::ThreadLocal;
|
||||
|
||||
#[doc(no_inline)]
|
||||
pub use async_task::Task;
|
||||
|
@ -267,23 +266,8 @@ impl<'a> Executor<'a> {
|
|||
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
|
||||
let state = self.state().clone();
|
||||
|
||||
move |mut runnable| {
|
||||
// If possible, push into the current local queue and notify the ticker.
|
||||
if let Some(local) = state.local_queue.get() {
|
||||
runnable = if let Err(err) = local.queue.push(runnable) {
|
||||
err.into_inner()
|
||||
} else {
|
||||
// Wake up this thread if it's asleep, otherwise notify another
|
||||
// thread to try to have the task stolen.
|
||||
if let Some(waker) = local.waker.take() {
|
||||
waker.wake();
|
||||
} else {
|
||||
state.notify();
|
||||
}
|
||||
return;
|
||||
}
|
||||
}
|
||||
// If the local queue is full, fallback to pushing onto the global injector queue.
|
||||
// TODO: If possible, push into the current local queue and notify the ticker.
|
||||
move |runnable| {
|
||||
state.queue.push(runnable).unwrap();
|
||||
state.notify();
|
||||
}
|
||||
|
@ -526,16 +510,7 @@ struct State {
|
|||
queue: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Local queues created by runners.
|
||||
///
|
||||
/// If possible, tasks are scheduled onto the local queue, and will only defer
|
||||
/// to other global queue when they're full, or the task is being scheduled from
|
||||
/// a thread without a runner.
|
||||
///
|
||||
/// Note: if a runner terminates and drains its local queue, any subsequent
|
||||
/// spawn calls from the same thread will be added to the same queue, but won't
|
||||
/// be executed until `Executor::run` is run on the thread again, or another
|
||||
/// thread steals the task.
|
||||
local_queue: ThreadLocal<LocalQueue>,
|
||||
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
|
||||
notified: AtomicBool,
|
||||
|
@ -552,7 +527,7 @@ impl State {
|
|||
fn new() -> State {
|
||||
State {
|
||||
queue: ConcurrentQueue::unbounded(),
|
||||
local_queue: ThreadLocal::new(),
|
||||
local_queues: RwLock::new(Vec::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
|
@ -679,12 +654,6 @@ impl Ticker<'_> {
|
|||
///
|
||||
/// Returns `false` if the ticker was already sleeping and unnotified.
|
||||
fn sleep(&mut self, waker: &Waker) -> bool {
|
||||
self.state
|
||||
.local_queue
|
||||
.get_or_default()
|
||||
.waker
|
||||
.register(waker);
|
||||
|
||||
let mut sleepers = self.state.sleepers.lock().unwrap();
|
||||
|
||||
match self.sleeping {
|
||||
|
@ -723,14 +692,7 @@ impl Ticker<'_> {
|
|||
|
||||
/// Waits for the next runnable task to run.
|
||||
async fn runnable(&mut self) -> Runnable {
|
||||
self.runnable_with(|| {
|
||||
self.state
|
||||
.local_queue
|
||||
.get()
|
||||
.and_then(|local| local.queue.pop().ok())
|
||||
.or_else(|| self.state.queue.pop().ok())
|
||||
})
|
||||
.await
|
||||
self.runnable_with(|| self.state.queue.pop().ok()).await
|
||||
}
|
||||
|
||||
/// Waits for the next runnable task to run, given a function that searches for a task.
|
||||
|
@ -792,6 +754,9 @@ struct Runner<'a> {
|
|||
/// Inner ticker.
|
||||
ticker: Ticker<'a>,
|
||||
|
||||
/// The local queue.
|
||||
local: Arc<ConcurrentQueue<Runnable>>,
|
||||
|
||||
/// Bumped every time a runnable task is found.
|
||||
ticks: usize,
|
||||
}
|
||||
|
@ -802,34 +767,38 @@ impl Runner<'_> {
|
|||
let runner = Runner {
|
||||
state,
|
||||
ticker: Ticker::new(state),
|
||||
local: Arc::new(ConcurrentQueue::bounded(512)),
|
||||
ticks: 0,
|
||||
};
|
||||
state
|
||||
.local_queues
|
||||
.write()
|
||||
.unwrap()
|
||||
.push(runner.local.clone());
|
||||
runner
|
||||
}
|
||||
|
||||
/// Waits for the next runnable task to run.
|
||||
async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
|
||||
let local = self.state.local_queue.get_or_default();
|
||||
|
||||
let runnable = self
|
||||
.ticker
|
||||
.runnable_with(|| {
|
||||
// Try the local queue.
|
||||
if let Ok(r) = local.queue.pop() {
|
||||
if let Ok(r) = self.local.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from the global queue.
|
||||
if let Ok(r) = self.state.queue.pop() {
|
||||
steal(&self.state.queue, &local.queue);
|
||||
steal(&self.state.queue, &self.local);
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from other runners.
|
||||
let local_queues = &self.state.local_queue;
|
||||
let local_queues = self.state.local_queues.read().unwrap();
|
||||
|
||||
// Pick a random starting point in the iterator list and rotate the list.
|
||||
let n = local_queues.iter().count();
|
||||
let n = local_queues.len();
|
||||
let start = rng.usize(..n);
|
||||
let iter = local_queues
|
||||
.iter()
|
||||
|
@ -838,12 +807,12 @@ impl Runner<'_> {
|
|||
.take(n);
|
||||
|
||||
// Remove this runner's local queue.
|
||||
let iter = iter.filter(|other| !core::ptr::eq(*other, local));
|
||||
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
|
||||
|
||||
// Try stealing from each local queue in the list.
|
||||
for other in iter {
|
||||
steal(&other.queue, &local.queue);
|
||||
if let Ok(r) = local.queue.pop() {
|
||||
for local in iter {
|
||||
steal(local, &self.local);
|
||||
if let Ok(r) = self.local.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
@ -857,7 +826,7 @@ impl Runner<'_> {
|
|||
|
||||
if self.ticks % 64 == 0 {
|
||||
// Steal tasks from the global queue to ensure fair task scheduling.
|
||||
steal(&self.state.queue, &local.queue);
|
||||
steal(&self.state.queue, &self.local);
|
||||
}
|
||||
|
||||
runnable
|
||||
|
@ -867,13 +836,15 @@ impl Runner<'_> {
|
|||
impl Drop for Runner<'_> {
|
||||
fn drop(&mut self) {
|
||||
// Remove the local queue.
|
||||
if let Some(local) = self.state.local_queue.get() {
|
||||
// Re-schedule remaining tasks in the local queue.
|
||||
for r in local.queue.try_iter() {
|
||||
// Explicitly reschedule the runnable back onto the global
|
||||
// queue to avoid rescheduling onto the local one.
|
||||
self.state.queue.push(r).unwrap();
|
||||
}
|
||||
self.state
|
||||
.local_queues
|
||||
.write()
|
||||
.unwrap()
|
||||
.retain(|local| !Arc::ptr_eq(local, &self.local));
|
||||
|
||||
// Re-schedule remaining tasks in the local queue.
|
||||
while let Ok(r) = self.local.pop() {
|
||||
r.schedule();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -933,13 +904,18 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
|
|||
}
|
||||
|
||||
/// Debug wrapper for the local runners.
|
||||
struct LocalRunners<'a>(&'a ThreadLocal<LocalQueue>);
|
||||
struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
|
||||
|
||||
impl fmt::Debug for LocalRunners<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_list()
|
||||
.entries(self.0.iter().map(|local| local.queue.len()))
|
||||
.finish()
|
||||
match self.0.try_read() {
|
||||
Ok(lock) => f
|
||||
.debug_list()
|
||||
.entries(lock.iter().map(|queue| queue.len()))
|
||||
.finish(),
|
||||
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
|
||||
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -959,32 +935,11 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
|
|||
f.debug_struct(name)
|
||||
.field("active", &ActiveTasks(&state.active))
|
||||
.field("global_tasks", &state.queue.len())
|
||||
.field("local_runners", &LocalRunners(&state.local_queue))
|
||||
.field("local_runners", &LocalRunners(&state.local_queues))
|
||||
.field("sleepers", &SleepCount(&state.sleepers))
|
||||
.finish()
|
||||
}
|
||||
|
||||
/// A queue local to each thread.
|
||||
///
|
||||
/// It's Default implementation is used for initializing each
|
||||
/// thread's queue via `ThreadLocal::get_or_default`.
|
||||
///
|
||||
/// The local queue *must* be flushed, and all pending runnables
|
||||
/// rescheduled onto the global queue when a runner is dropped.
|
||||
struct LocalQueue {
|
||||
queue: ConcurrentQueue<Runnable>,
|
||||
waker: AtomicWaker,
|
||||
}
|
||||
|
||||
impl Default for LocalQueue {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
queue: ConcurrentQueue::bounded(512),
|
||||
waker: AtomicWaker::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a closure when dropped.
|
||||
struct CallOnDrop<F: FnMut()>(F);
|
||||
|
||||
|
|
Loading…
Reference in New Issue