mirror of https://github.com/stjepang/smol
Merge pull request #169 from stjepang/refactor
Refactor the executor and I/O event
This commit is contained in:
commit
b2b282fbce
10
Cargo.toml
10
Cargo.toml
|
@ -26,14 +26,14 @@ tokio02 = ["tokio"]
|
|||
|
||||
[dependencies]
|
||||
async-task = "3.0.0"
|
||||
crossbeam-deque = "0.7.3"
|
||||
crossbeam-queue = "0.2.1"
|
||||
crossbeam-utils = "0.7.2"
|
||||
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
|
||||
blocking = "0.4.4"
|
||||
concurrent-queue = "1.1.1"
|
||||
fastrand = "1.1.0"
|
||||
futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
|
||||
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
|
||||
once_cell = "1.3.1"
|
||||
piper = "0.1.2"
|
||||
scoped-tls-hkt = "0.1.2"
|
||||
scoped-tls = "1.0.0"
|
||||
slab = "0.4.2"
|
||||
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
|
||||
|
||||
|
|
|
@ -7,11 +7,7 @@
|
|||
//! [`futures::executor::block_on()`]: https://docs.rs/futures/0.3/futures/executor/fn.block_on.html
|
||||
//! [blog-post]: https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll, Waker};
|
||||
|
||||
use crossbeam_utils::sync::Parker;
|
||||
|
||||
use crate::context;
|
||||
|
||||
|
@ -43,33 +39,7 @@ use crate::context;
|
|||
/// })
|
||||
/// ```
|
||||
///
|
||||
/// [`run()`]: crate::run()
|
||||
/// [`run()`]: `crate::run()`
|
||||
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
|
||||
thread_local! {
|
||||
// Parker and waker associated with the current thread.
|
||||
static CACHE: RefCell<(Parker, Waker)> = {
|
||||
let parker = Parker::new();
|
||||
let unparker = parker.unparker().clone();
|
||||
let waker = async_task::waker_fn(move || unparker.unpark());
|
||||
RefCell::new((parker, waker))
|
||||
};
|
||||
}
|
||||
|
||||
CACHE.with(|cache| {
|
||||
// Panic if `block_on()` is called recursively.
|
||||
let (parker, waker) = &*cache.borrow();
|
||||
|
||||
// If enabled, set up tokio before execution begins.
|
||||
context::enter(|| {
|
||||
futures_util::pin_mut!(future);
|
||||
let cx = &mut Context::from_waker(&waker);
|
||||
|
||||
loop {
|
||||
match future.as_mut().poll(cx) {
|
||||
Poll::Ready(output) => return output,
|
||||
Poll::Pending => parker.park(),
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
context::enter(|| blocking::block_on(future))
|
||||
}
|
||||
|
|
|
@ -0,0 +1,497 @@
|
|||
use std::cell::Cell;
|
||||
use std::future::Future;
|
||||
use std::panic;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::thread::{self, ThreadId};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use scoped_tls::scoped_thread_local;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::task::{Runnable, Task};
|
||||
|
||||
scoped_thread_local! {
|
||||
static WORKER: Worker
|
||||
}
|
||||
|
||||
/// State shared between [`Queue`] and [`Worker`].
|
||||
struct Global {
|
||||
/// The global queue.
|
||||
queue: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Shards of the global queue created by workers.
|
||||
shards: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
/// Set to `true` when a sleeping worker is notified or no workers are sleeping.
|
||||
notified: AtomicBool,
|
||||
|
||||
/// A list of sleeping workers.
|
||||
sleepers: Mutex<Sleepers>,
|
||||
}
|
||||
|
||||
impl Global {
|
||||
/// Notifies a sleeping worker.
|
||||
fn notify(&self) {
|
||||
if !self
|
||||
.notified
|
||||
.compare_and_swap(false, true, Ordering::SeqCst)
|
||||
{
|
||||
let callback = self.sleepers.lock().unwrap().notify();
|
||||
if let Some(cb) = callback {
|
||||
cb.call();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A list of sleeping workers.
|
||||
struct Sleepers {
|
||||
/// Number of sleeping workers.
|
||||
count: usize,
|
||||
|
||||
/// Callbacks of sleeping unnotified workers.
|
||||
callbacks: Vec<Callback>,
|
||||
}
|
||||
|
||||
impl Sleepers {
|
||||
/// Inserts a new sleeping worker.
|
||||
fn insert(&mut self, callback: &Callback) {
|
||||
self.count += 1;
|
||||
self.callbacks.push(callback.clone());
|
||||
}
|
||||
|
||||
/// Updates the callback of an already inserted worker.
|
||||
fn update(&mut self, callback: &Callback) {
|
||||
if self.callbacks.iter().all(|cb| cb != callback) {
|
||||
self.callbacks.push(callback.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes a previously inserted worker.
|
||||
fn remove(&mut self, callback: &Callback) {
|
||||
self.count -= 1;
|
||||
for i in (0..self.callbacks.len()).rev() {
|
||||
if &self.callbacks[i] == callback {
|
||||
self.callbacks.remove(i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if a sleeping worker is notified or no workers are sleeping.
|
||||
fn is_notified(&self) -> bool {
|
||||
self.count == 0 || self.count > self.callbacks.len()
|
||||
}
|
||||
|
||||
/// Returns notification callback for a sleeping worker.
|
||||
///
|
||||
/// If a worker was notified already or there are no workers, `None` will be returned.
|
||||
fn notify(&mut self) -> Option<Callback> {
|
||||
if self.callbacks.len() == self.count {
|
||||
self.callbacks.pop()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue for spawning tasks.
|
||||
pub(crate) struct Queue {
|
||||
global: Arc<Global>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
/// Creates a new queue for spawning tasks.
|
||||
pub fn new() -> Queue {
|
||||
Queue {
|
||||
global: Arc::new(Global {
|
||||
queue: ConcurrentQueue::unbounded(),
|
||||
shards: RwLock::new(Slab::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
callbacks: Vec::new(),
|
||||
}),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a future onto this queue.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Task<T> {
|
||||
let global = self.global.clone();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if WORKER.is_set() {
|
||||
WORKER.with(|w| {
|
||||
if Arc::ptr_eq(&global, &w.global) {
|
||||
if let Err(err) = w.shard.push(runnable) {
|
||||
global.queue.push(err.into_inner()).unwrap();
|
||||
}
|
||||
} else {
|
||||
global.queue.push(runnable).unwrap();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
global.queue.push(runnable).unwrap();
|
||||
}
|
||||
|
||||
global.notify();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Registers a new worker.
|
||||
///
|
||||
/// The worker will automatically deregister itself when dropped.
|
||||
pub fn worker(&self, notify: impl Fn() + Send + Sync + 'static) -> Worker {
|
||||
let mut shards = self.global.shards.write().unwrap();
|
||||
let vacant = shards.vacant_entry();
|
||||
|
||||
// Create a worker and put its stealer handle into the executor.
|
||||
let worker = Worker {
|
||||
key: vacant.key(),
|
||||
global: Arc::new(self.global.clone()),
|
||||
shard: SlotQueue {
|
||||
slot: Cell::new(None),
|
||||
queue: Arc::new(ConcurrentQueue::bounded(512)),
|
||||
},
|
||||
local: SlotQueue {
|
||||
slot: Cell::new(None),
|
||||
queue: Arc::new(ConcurrentQueue::unbounded()),
|
||||
},
|
||||
callback: Callback::new(notify),
|
||||
sleeping: Cell::new(false),
|
||||
ticker: Cell::new(0),
|
||||
};
|
||||
vacant.insert(worker.shard.queue.clone());
|
||||
|
||||
worker
|
||||
}
|
||||
}
|
||||
|
||||
/// A worker that participates in the work-stealing executor.
|
||||
///
|
||||
/// Each invocation of `run()` creates its own worker.
|
||||
pub(crate) struct Worker {
|
||||
/// The ID of this worker obtained during registration.
|
||||
key: usize,
|
||||
|
||||
/// The global queue.
|
||||
global: Arc<Arc<Global>>,
|
||||
|
||||
/// A shard of the global queue.
|
||||
shard: SlotQueue<Runnable>,
|
||||
|
||||
/// Local queue for `!Send` tasks.
|
||||
local: SlotQueue<Runnable>,
|
||||
|
||||
/// Callback invoked to wake this worker up.
|
||||
callback: Callback,
|
||||
|
||||
/// Set to `true` when in sleeping state.
|
||||
sleeping: Cell<bool>,
|
||||
|
||||
/// Bumped every time a task is run.
|
||||
ticker: Cell<usize>,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
/// Spawns a local future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn_local<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
let queue = self.local.queue.clone();
|
||||
let callback = self.callback.clone();
|
||||
let id = thread_id();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if thread_id() == id && WORKER.is_set() {
|
||||
WORKER.with(|w| {
|
||||
if Arc::ptr_eq(&queue, &w.local.queue) {
|
||||
w.local.push(runnable).unwrap();
|
||||
} else {
|
||||
queue.push(runnable).unwrap();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
queue.push(runnable).unwrap();
|
||||
}
|
||||
|
||||
callback.call();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Enters the context of this executor.
|
||||
fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
// TODO(stjepang): Allow recursive executors.
|
||||
if WORKER.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
WORKER.set(self, f)
|
||||
}
|
||||
|
||||
/// Moves the worker into sleeping state.
|
||||
fn sleep(&self) -> bool {
|
||||
let mut sleepers = self.global.sleepers.lock().unwrap();
|
||||
|
||||
if self.sleeping.get() {
|
||||
sleepers.update(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
false
|
||||
} else {
|
||||
sleepers.insert(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
self.sleeping.set(true);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves the worker into woken state.
|
||||
fn wake(&self) -> bool {
|
||||
if self.sleeping.get() {
|
||||
let mut sleepers = self.global.sleepers.lock().unwrap();
|
||||
sleepers.remove(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
self.sleeping.set(false);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a single task and returns `true` if one was found.
|
||||
pub fn tick(&self) -> bool {
|
||||
loop {
|
||||
match self.search() {
|
||||
None => {
|
||||
// Go to sleep and then:
|
||||
// - If already in sleeping state, return.
|
||||
// - Otherwise, search again.
|
||||
if !self.sleep() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Some(r) => {
|
||||
// Wake up.
|
||||
if !self.wake() {
|
||||
// If already woken, notify another worker.
|
||||
self.global.notify();
|
||||
}
|
||||
|
||||
// Bump the ticker.
|
||||
let ticker = self.ticker.get();
|
||||
self.ticker.set(ticker.wrapping_add(1));
|
||||
|
||||
// Flush slots to ensure fair task scheduling.
|
||||
if ticker % 16 == 0 {
|
||||
if let Err(err) = self.shard.flush() {
|
||||
self.global.queue.push(err.into_inner()).unwrap();
|
||||
self.global.notify();
|
||||
}
|
||||
self.local.flush().unwrap();
|
||||
}
|
||||
|
||||
// Steal tasks from the global queue to ensure fair task scheduling.
|
||||
if ticker % 64 == 0 {
|
||||
self.shard.steal(&self.global.queue);
|
||||
}
|
||||
|
||||
// Run the task.
|
||||
if self.enter(|| r.run()) {
|
||||
// The task was woken while it was running, which means it got
|
||||
// scheduled the moment running completed. Therefore, it is now inside
|
||||
// the slot and would be the next task to run.
|
||||
//
|
||||
// Instead of re-running the task in the next iteration, let's flush
|
||||
// the slot in order to give other tasks a chance to run.
|
||||
//
|
||||
// This is a necessary step to ensure task yielding works as expected.
|
||||
// If a task wakes itself and returns `Poll::Pending`, we don't want it
|
||||
// to run immediately after that because that'd defeat the whole
|
||||
// purpose of yielding.
|
||||
if let Err(err) = self.shard.flush() {
|
||||
self.global.queue.push(err.into_inner()).unwrap();
|
||||
self.global.notify();
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
if self.ticker.get() % 2 == 0 {
|
||||
// On even ticks, look into the local queue and then into the shard.
|
||||
if let Ok(r) = self.local.pop().or_else(|_| self.shard.pop()) {
|
||||
return Some(r);
|
||||
}
|
||||
} else {
|
||||
// On odd ticks, look into the shard and then into the local queue.
|
||||
if let Ok(r) = self.shard.pop().or_else(|_| self.local.pop()) {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
// Try stealing from the global queue.
|
||||
self.shard.steal(&self.global.queue);
|
||||
if let Ok(r) = self.shard.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from other shards.
|
||||
let shards = self.global.shards.read().unwrap();
|
||||
|
||||
// Pick a random starting point in the iterator list and rotate the list.
|
||||
let n = shards.len();
|
||||
let start = fastrand::usize(..n);
|
||||
let iter = shards.iter().chain(shards.iter()).skip(start).take(n);
|
||||
|
||||
// Remove this worker's shard.
|
||||
let iter = iter.filter(|(key, _)| *key != self.key);
|
||||
let iter = iter.map(|(_, shard)| shard);
|
||||
|
||||
// Try stealing from each shard in the list.
|
||||
for shard in iter {
|
||||
self.shard.steal(shard);
|
||||
if let Ok(r) = self.shard.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Worker {
|
||||
fn drop(&mut self) {
|
||||
// Wake and unregister the worker.
|
||||
self.wake();
|
||||
self.global.shards.write().unwrap().remove(self.key);
|
||||
|
||||
// Re-schedule remaining tasks in the shard.
|
||||
while let Ok(r) = self.shard.pop() {
|
||||
r.schedule();
|
||||
}
|
||||
// Notify another worker to start searching for tasks.
|
||||
self.global.notify();
|
||||
|
||||
// TODO(stjepang): Close the local queue and empty it.
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue with a single-item slot in front of it.
|
||||
struct SlotQueue<T> {
|
||||
slot: Cell<Option<T>>,
|
||||
queue: Arc<ConcurrentQueue<T>>,
|
||||
}
|
||||
|
||||
impl<T> SlotQueue<T> {
|
||||
/// Pushes an item into the slot, overflowing the old item into the queue.
|
||||
fn push(&self, t: T) -> Result<(), PushError<T>> {
|
||||
match self.slot.replace(Some(t)) {
|
||||
None => Ok(()),
|
||||
Some(t) => self.queue.push(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pops an item from the slot, or queue if the slot is empty.
|
||||
fn pop(&self) -> Result<T, PopError> {
|
||||
match self.slot.take() {
|
||||
None => self.queue.pop(),
|
||||
Some(t) => Ok(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes the slot into the queue.
|
||||
fn flush(&self) -> Result<(), PushError<T>> {
|
||||
match self.slot.take() {
|
||||
None => Ok(()),
|
||||
Some(t) => self.queue.push(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Steals some items from another queue.
|
||||
fn steal(&self, from: &ConcurrentQueue<T>) {
|
||||
// Flush the slot before stealing.
|
||||
if let Err(err) = self.flush() {
|
||||
self.slot.set(Some(err.into_inner()));
|
||||
return;
|
||||
}
|
||||
|
||||
// Half of `from`'s length rounded up.
|
||||
let mut count = (from.len() + 1) / 2;
|
||||
|
||||
if count > 0 {
|
||||
// Don't steal more than fits into the queue.
|
||||
if let Some(cap) = self.queue.capacity() {
|
||||
count = count.min(cap - self.queue.len());
|
||||
}
|
||||
|
||||
// Steal tasks.
|
||||
for _ in 0..count {
|
||||
if let Ok(t) = from.pop() {
|
||||
assert!(self.queue.push(t).is_ok());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `std::thread::current().id()`, but more efficient.
|
||||
fn thread_id() -> ThreadId {
|
||||
thread_local! {
|
||||
static ID: ThreadId = thread::current().id();
|
||||
}
|
||||
|
||||
ID.try_with(|id| *id)
|
||||
.unwrap_or_else(|_| thread::current().id())
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Callback(Arc<Box<dyn Fn() + Send + Sync>>);
|
||||
|
||||
impl Callback {
|
||||
fn new(f: impl Fn() + Send + Sync + 'static) -> Callback {
|
||||
Callback(Arc::new(Box::new(f)))
|
||||
}
|
||||
|
||||
fn call(&self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialEq for Callback {
|
||||
fn eq(&self, other: &Callback) -> bool {
|
||||
Arc::ptr_eq(&self.0, &other.0)
|
||||
}
|
||||
}
|
||||
|
||||
impl Eq for Callback {}
|
|
@ -8,7 +8,7 @@
|
|||
use std::io::{self, Read, Write};
|
||||
#[cfg(windows)]
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{self, AtomicBool, Ordering};
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
|
@ -24,9 +24,6 @@ type Notifier = linux::EventFd;
|
|||
|
||||
/// A self-pipe.
|
||||
struct Inner {
|
||||
/// Set to `true` if notified.
|
||||
flag: AtomicBool,
|
||||
|
||||
/// The writer side, emptied by `clear()`.
|
||||
writer: Notifier,
|
||||
|
||||
|
@ -44,7 +41,6 @@ impl IoEvent {
|
|||
let (writer, reader) = notifier()?;
|
||||
|
||||
Ok(IoEvent(Arc::new(Inner {
|
||||
flag: AtomicBool::new(false),
|
||||
writer,
|
||||
reader: Async::new(reader)?,
|
||||
})))
|
||||
|
@ -55,46 +51,21 @@ impl IoEvent {
|
|||
// Publish all in-memory changes before setting the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
|
||||
// If the flag is not set...
|
||||
if !self.0.flag.load(Ordering::SeqCst) {
|
||||
// If this thread sets it...
|
||||
if !self.0.flag.swap(true, Ordering::SeqCst) {
|
||||
// Trigger an I/O event by writing a byte into the sending socket.
|
||||
let _ = (&self.0.writer).write(&1u64.to_ne_bytes());
|
||||
let _ = (&self.0.writer).flush();
|
||||
// Trigger an I/O event by writing a byte into the sending socket.
|
||||
let _ = (&self.0.writer).write(&1u64.to_ne_bytes());
|
||||
let _ = (&self.0.writer).flush();
|
||||
|
||||
// Re-register to wake up the poller.
|
||||
let _ = self.0.reader.reregister_io_event();
|
||||
}
|
||||
}
|
||||
// Re-register to wake up the poller.
|
||||
let _ = self.0.reader.reregister_io_event();
|
||||
}
|
||||
|
||||
/// Sets the flag to `false`.
|
||||
pub fn clear(&self) -> bool {
|
||||
pub fn clear(&self) {
|
||||
// Read all available bytes from the receiving socket.
|
||||
while self.0.reader.get_ref().read(&mut [0; 64]).is_ok() {}
|
||||
let value = self.0.flag.swap(false, Ordering::SeqCst);
|
||||
|
||||
// Publish all in-memory changes after clearing the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
value
|
||||
}
|
||||
|
||||
/// Waits until notified.
|
||||
///
|
||||
/// You should assume notifications may spuriously occur.
|
||||
pub async fn notified(&self) {
|
||||
self.0
|
||||
.reader
|
||||
.read_with(|_| {
|
||||
if self.0.flag.load(Ordering::SeqCst) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(io::ErrorKind::WouldBlock.into())
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("failure while waiting on a self-pipe");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -119,15 +119,15 @@ mod async_io;
|
|||
mod block_on;
|
||||
mod blocking;
|
||||
mod context;
|
||||
mod executor;
|
||||
mod io_event;
|
||||
mod parking;
|
||||
mod reactor;
|
||||
mod run;
|
||||
mod sys;
|
||||
mod task;
|
||||
mod thread_local;
|
||||
mod throttle;
|
||||
mod timer;
|
||||
mod work_stealing;
|
||||
|
||||
pub use self::blocking::{iter, reader, writer};
|
||||
pub use async_io::Async;
|
||||
|
|
|
@ -0,0 +1,278 @@
|
|||
use std::cell::Cell;
|
||||
use std::fmt;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::reactor::Reactor;
|
||||
|
||||
static REGISTRY: Lazy<Mutex<Slab<Unparker>>> = Lazy::new(|| Mutex::new(Slab::new()));
|
||||
|
||||
/// Parks a thread.
|
||||
pub(crate) struct Parker {
|
||||
key: Cell<Option<usize>>,
|
||||
unparker: Unparker,
|
||||
}
|
||||
|
||||
unsafe impl Send for Parker {}
|
||||
|
||||
impl Parker {
|
||||
/// Creates a new [`Parker`].
|
||||
pub fn new() -> Parker {
|
||||
Parker {
|
||||
key: Cell::new(None),
|
||||
unparker: Unparker {
|
||||
inner: Arc::new(Inner {
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
lock: Mutex::new(()),
|
||||
cvar: Condvar::new(),
|
||||
}),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current thread until the token is made available.
|
||||
pub fn park(&self) {
|
||||
self.register();
|
||||
self.unparker.inner.park(None);
|
||||
}
|
||||
|
||||
/// Blocks the current thread until the token is made available or the timeout is reached.
|
||||
pub fn park_timeout(&self, timeout: Duration) -> bool {
|
||||
self.register();
|
||||
self.unparker.inner.park(Some(timeout))
|
||||
}
|
||||
|
||||
// /// Blocks the current thread until the token is made available or the deadline is reached.
|
||||
// pub fn park_deadline(&self, deadline: Instant) -> bool {
|
||||
// self.register();
|
||||
// self.unparker
|
||||
// .inner
|
||||
// .park(Some(deadline.saturating_duration_since(Instant::now())))
|
||||
// }
|
||||
//
|
||||
// /// Atomically makes the token available if it is not already.
|
||||
// pub fn unpark(&self) {
|
||||
// self.unparker.unpark()
|
||||
// }
|
||||
|
||||
/// Returns a handle for unparking.
|
||||
pub fn unparker(&self) -> Unparker {
|
||||
self.unparker.clone()
|
||||
}
|
||||
|
||||
fn register(&self) {
|
||||
if self.key.get().is_none() {
|
||||
let mut reg = REGISTRY.lock().unwrap();
|
||||
let key = reg.insert(self.unparker.clone());
|
||||
self.key.set(Some(key));
|
||||
}
|
||||
}
|
||||
|
||||
fn unregister(&self) {
|
||||
if let Some(key) = self.key.take() {
|
||||
let mut reg = REGISTRY.lock().unwrap();
|
||||
reg.remove(key);
|
||||
|
||||
// Notify another parker to make sure the reactor keeps getting polled.
|
||||
if let Some((_, u)) = reg.iter().next() {
|
||||
u.unpark();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Parker {
|
||||
fn drop(&mut self) {
|
||||
self.unregister();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Parker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("Parker { .. }")
|
||||
}
|
||||
}
|
||||
|
||||
/// Unparks a thread.
|
||||
pub(crate) struct Unparker {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
unsafe impl Send for Unparker {}
|
||||
unsafe impl Sync for Unparker {}
|
||||
|
||||
impl Unparker {
|
||||
/// Atomically makes the token available if it is not already.
|
||||
pub fn unpark(&self) {
|
||||
self.inner.unpark()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Unparker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("Unparker { .. }")
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for Unparker {
|
||||
fn clone(&self) -> Unparker {
|
||||
Unparker {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const EMPTY: usize = 0;
|
||||
const PARKED: usize = 1;
|
||||
const POLLING: usize = 2;
|
||||
const NOTIFIED: usize = 3;
|
||||
|
||||
static EVENT: Lazy<IoEvent> = Lazy::new(|| IoEvent::new().unwrap());
|
||||
|
||||
struct Inner {
|
||||
state: AtomicUsize,
|
||||
lock: Mutex<()>,
|
||||
cvar: Condvar,
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn park(&self, timeout: Option<Duration>) -> bool {
|
||||
// If we were previously notified then we consume this notification and return quickly.
|
||||
if self
|
||||
.state
|
||||
.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
|
||||
.is_ok()
|
||||
{
|
||||
// Process available I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
reactor_lock
|
||||
.react(Some(Duration::from_secs(0)))
|
||||
.expect("failure while polling I/O");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// If the timeout is zero, then there is no need to actually block.
|
||||
if let Some(dur) = timeout {
|
||||
if dur == Duration::from_millis(0) {
|
||||
// Process available I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
reactor_lock
|
||||
.react(Some(Duration::from_secs(0)))
|
||||
.expect("failure while polling I/O");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise we need to coordinate going to sleep.
|
||||
let mut reactor_lock = Reactor::get().try_lock();
|
||||
let state = match reactor_lock {
|
||||
None => PARKED,
|
||||
Some(_) => POLLING,
|
||||
};
|
||||
let mut m = self.lock.lock().unwrap();
|
||||
|
||||
match self.state.compare_exchange(EMPTY, state, SeqCst, SeqCst) {
|
||||
Ok(_) => {}
|
||||
// Consume this notification to avoid spurious wakeups in the next park.
|
||||
Err(NOTIFIED) => {
|
||||
// We must read `state` here, even though we know it will be `NOTIFIED`. This is
|
||||
// because `unpark` may have been called again since we read `NOTIFIED` in the
|
||||
// `compare_exchange` above. We must perform an acquire operation that synchronizes
|
||||
// with that `unpark` to observe any writes it made before the call to `unpark`. To
|
||||
// do that we must read from the write it made to `state`.
|
||||
let old = self.state.swap(EMPTY, SeqCst);
|
||||
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
|
||||
return true;
|
||||
}
|
||||
Err(n) => panic!("inconsistent park_timeout state: {}", n),
|
||||
}
|
||||
|
||||
match timeout {
|
||||
None => {
|
||||
loop {
|
||||
// Block the current thread on the conditional variable.
|
||||
match &mut reactor_lock {
|
||||
None => m = self.cvar.wait(m).unwrap(),
|
||||
Some(reactor_lock) => {
|
||||
drop(m);
|
||||
|
||||
reactor_lock.react(None).expect("failure while polling I/O");
|
||||
EVENT.clear();
|
||||
|
||||
m = self.lock.lock().unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
match self.state.compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) {
|
||||
Ok(_) => return true, // got a notification
|
||||
Err(_) => {} // spurious wakeup, go back to sleep
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(timeout) => {
|
||||
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
|
||||
// notification we just want to unconditionally set `state` back to `EMPTY`, either
|
||||
// consuming a notification or un-flagging ourselves as parked.
|
||||
let _m = match reactor_lock.as_mut() {
|
||||
None => self.cvar.wait_timeout(m, timeout).unwrap().0,
|
||||
Some(reactor_lock) => {
|
||||
drop(m);
|
||||
let deadline = Instant::now() + timeout;
|
||||
loop {
|
||||
reactor_lock
|
||||
.react(Some(deadline.saturating_duration_since(Instant::now())))
|
||||
.expect("failure while polling I/O");
|
||||
EVENT.clear();
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.lock.lock().unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
match self.state.swap(EMPTY, SeqCst) {
|
||||
NOTIFIED => true, // got a notification
|
||||
PARKED | POLLING => false, // no notification
|
||||
n => panic!("inconsistent park_timeout state: {}", n),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpark(&self) {
|
||||
// To ensure the unparked thread will observe any writes we made before this call, we must
|
||||
// perform a release operation that `park` can synchronize with. To do that we must write
|
||||
// `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
|
||||
// than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
|
||||
let state = match self.state.swap(NOTIFIED, SeqCst) {
|
||||
EMPTY => return, // no one was waiting
|
||||
NOTIFIED => return, // already unparked
|
||||
state => state, // gotta go wake someone up
|
||||
};
|
||||
|
||||
// There is a period between when the parked thread sets `state` to `PARKED` (or last
|
||||
// checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
|
||||
// If we were to notify during this period it would be ignored and then when the parked
|
||||
// thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
|
||||
// stage so we can acquire `lock` to wait until it is ready to receive the notification.
|
||||
//
|
||||
// Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
|
||||
// it doesn't get woken only to have to wait for us to release `lock`.
|
||||
drop(self.lock.lock().unwrap());
|
||||
|
||||
if state == PARKED {
|
||||
self.cvar.notify_one();
|
||||
} else {
|
||||
EVENT.notify();
|
||||
}
|
||||
}
|
||||
}
|
147
src/reactor.rs
147
src/reactor.rs
|
@ -30,7 +30,7 @@ use std::sync::Arc;
|
|||
use std::task::{Poll, Waker};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use crossbeam_queue::ArrayQueue;
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use futures_util::future;
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
|
@ -56,7 +56,7 @@ pub(crate) struct Reactor {
|
|||
sources: piper::Mutex<Slab<Arc<Source>>>,
|
||||
|
||||
/// Temporary storage for I/O events when polling the reactor.
|
||||
events: piper::Lock<sys::Events>,
|
||||
events: piper::Mutex<sys::Events>,
|
||||
|
||||
/// An ordered map of registered timers.
|
||||
///
|
||||
|
@ -69,7 +69,7 @@ pub(crate) struct Reactor {
|
|||
///
|
||||
/// When inserting or removing a timer, we don't process it immediately - we just push it into
|
||||
/// this queue. Timers actually get processed when the queue fills up or the reactor is polled.
|
||||
timer_ops: ArrayQueue<TimerOp>,
|
||||
timer_ops: ConcurrentQueue<TimerOp>,
|
||||
|
||||
/// An I/O event that is triggered when a new timer is registered.
|
||||
///
|
||||
|
@ -84,9 +84,9 @@ impl Reactor {
|
|||
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
|
||||
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
|
||||
sources: piper::Mutex::new(Slab::new()),
|
||||
events: piper::Lock::new(sys::Events::new()),
|
||||
events: piper::Mutex::new(sys::Events::new()),
|
||||
timers: piper::Mutex::new(BTreeMap::new()),
|
||||
timer_ops: ArrayQueue::new(1000),
|
||||
timer_ops: ConcurrentQueue::bounded(1000),
|
||||
timer_event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
|
||||
});
|
||||
&REACTOR
|
||||
|
@ -177,13 +177,6 @@ impl Reactor {
|
|||
})
|
||||
}
|
||||
|
||||
/// Locks the reactor.
|
||||
pub async fn lock(&self) -> ReactorLock<'_> {
|
||||
let reactor = self;
|
||||
let events = self.events.lock().await;
|
||||
ReactorLock { reactor, events }
|
||||
}
|
||||
|
||||
/// Fires ready timers.
|
||||
///
|
||||
/// Returns the duration until the next timer before this method was called.
|
||||
|
@ -195,7 +188,7 @@ impl Reactor {
|
|||
|
||||
// Process timer operations, but no more than the queue capacity because otherwise we could
|
||||
// keep popping operations forever.
|
||||
for _ in 0..self.timer_ops.capacity() {
|
||||
for _ in 0..self.timer_ops.capacity().unwrap() {
|
||||
match self.timer_ops.pop() {
|
||||
Ok(TimerOp::Insert(when, id, waker)) => {
|
||||
timers.insert((when, id), waker);
|
||||
|
@ -240,92 +233,82 @@ impl Reactor {
|
|||
/// A lock on the reactor.
|
||||
pub(crate) struct ReactorLock<'a> {
|
||||
reactor: &'a Reactor,
|
||||
events: piper::LockGuard<sys::Events>,
|
||||
events: piper::MutexGuard<'a, sys::Events>,
|
||||
}
|
||||
|
||||
impl ReactorLock<'_> {
|
||||
/// Processes ready events without blocking.
|
||||
pub fn poll(&mut self) -> io::Result<()> {
|
||||
self.react(false)
|
||||
}
|
||||
|
||||
/// Blocks until at least one event is processed.
|
||||
pub fn wait(&mut self) -> io::Result<()> {
|
||||
self.react(true)
|
||||
}
|
||||
|
||||
/// Processes new events, optionally blocking until the first event.
|
||||
fn react(&mut self, block: bool) -> io::Result<()> {
|
||||
// Fire timers and compute the timeout for blocking on I/O events.
|
||||
/// Processes new events, blocking until the first event or the timeout.
|
||||
pub fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
// Fire timers.
|
||||
let next_timer = self.reactor.fire_timers();
|
||||
let timeout = if block {
|
||||
next_timer
|
||||
} else {
|
||||
Some(Duration::from_secs(0))
|
||||
|
||||
// compute the timeout for blocking on I/O events.
|
||||
let timeout = match (next_timer, timeout) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.min(b)),
|
||||
};
|
||||
|
||||
loop {
|
||||
// Block on I/O events.
|
||||
match self.reactor.sys.wait(&mut self.events, timeout) {
|
||||
// The timeout was hit so fire ready timers.
|
||||
Ok(0) => {
|
||||
self.reactor.fire_timers();
|
||||
return Ok(());
|
||||
}
|
||||
// Block on I/O events.
|
||||
match self.reactor.sys.wait(&mut self.events, timeout) {
|
||||
// The timeout was hit so fire ready timers.
|
||||
Ok(0) => {
|
||||
self.reactor.fire_timers();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// At least one I/O event occured.
|
||||
Ok(_) => {
|
||||
// Iterate over sources in the event list.
|
||||
let sources = self.reactor.sources.lock();
|
||||
let mut ready = Vec::new();
|
||||
// At least one I/O event occured.
|
||||
Ok(_) => {
|
||||
// Iterate over sources in the event list.
|
||||
let sources = self.reactor.sources.lock();
|
||||
let mut ready = Vec::new();
|
||||
|
||||
for ev in self.events.iter() {
|
||||
// Check if there is a source in the table with this key.
|
||||
if let Some(source) = sources.get(ev.key) {
|
||||
let mut wakers = source.wakers.lock();
|
||||
for ev in self.events.iter() {
|
||||
// Check if there is a source in the table with this key.
|
||||
if let Some(source) = sources.get(ev.key) {
|
||||
let mut wakers = source.wakers.lock();
|
||||
|
||||
// Wake readers if a readability event was emitted.
|
||||
if ev.readable {
|
||||
ready.append(&mut wakers.readers);
|
||||
}
|
||||
// Wake readers if a readability event was emitted.
|
||||
if ev.readable {
|
||||
ready.append(&mut wakers.readers);
|
||||
}
|
||||
|
||||
// Wake writers if a writability event was emitted.
|
||||
if ev.writable {
|
||||
ready.append(&mut wakers.writers);
|
||||
}
|
||||
// Wake writers if a writability event was emitted.
|
||||
if ev.writable {
|
||||
ready.append(&mut wakers.writers);
|
||||
}
|
||||
|
||||
// Re-register if there are still writers or
|
||||
// readers. The can happen if e.g. we were
|
||||
// previously interested in both readability and
|
||||
// writability, but only one of them was emitted.
|
||||
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
|
||||
self.reactor.sys.reregister(
|
||||
source.raw,
|
||||
source.key,
|
||||
!wakers.readers.is_empty(),
|
||||
!wakers.writers.is_empty(),
|
||||
)?;
|
||||
}
|
||||
// Re-register if there are still writers or
|
||||
// readers. The can happen if e.g. we were
|
||||
// previously interested in both readability and
|
||||
// writability, but only one of them was emitted.
|
||||
if !(wakers.writers.is_empty() && wakers.readers.is_empty()) {
|
||||
self.reactor.sys.reregister(
|
||||
source.raw,
|
||||
source.key,
|
||||
!wakers.readers.is_empty(),
|
||||
!wakers.writers.is_empty(),
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the lock before waking.
|
||||
drop(sources);
|
||||
|
||||
// Wake up tasks waiting on I/O.
|
||||
for waker in ready {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// The syscall was interrupted.
|
||||
Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
|
||||
// Drop the lock before waking.
|
||||
drop(sources);
|
||||
|
||||
// An actual error occureed.
|
||||
Err(err) => return Err(err),
|
||||
// Wake up tasks waiting on I/O.
|
||||
for waker in ready {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The syscall was interrupted.
|
||||
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
|
||||
|
||||
// An actual error occureed.
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
164
src/run.rs
164
src/run.rs
|
@ -4,17 +4,23 @@
|
|||
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::future::{self, Either};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::block_on;
|
||||
use crate::context;
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::reactor::{Reactor, ReactorLock};
|
||||
use crate::thread_local::ThreadLocalExecutor;
|
||||
use crate::executor::{Queue, Worker};
|
||||
use crate::parking::Parker;
|
||||
use crate::throttle;
|
||||
use crate::work_stealing::WorkStealingExecutor;
|
||||
use scoped_tls::scoped_thread_local;
|
||||
|
||||
/// The global task queue.
|
||||
pub(crate) static QUEUE: Lazy<Queue> = Lazy::new(|| Queue::new());
|
||||
|
||||
scoped_thread_local! {
|
||||
/// Thread-local worker queue.
|
||||
pub(crate) static WORKER: Worker
|
||||
}
|
||||
|
||||
/// Runs executors and polls the reactor.
|
||||
///
|
||||
|
@ -95,134 +101,36 @@ use crate::work_stealing::WorkStealingExecutor;
|
|||
/// }
|
||||
/// ```
|
||||
pub fn run<T>(future: impl Future<Output = T>) -> T {
|
||||
// Create a thread-local executor and a worker in the work-stealing executor.
|
||||
let local = ThreadLocalExecutor::new();
|
||||
let ws_executor = WorkStealingExecutor::get();
|
||||
let worker = ws_executor.worker();
|
||||
let reactor = Reactor::get();
|
||||
let parker = Parker::new();
|
||||
|
||||
let unparker = parker.unparker();
|
||||
let worker = QUEUE.worker(move || unparker.unpark());
|
||||
|
||||
// Create a waker that triggers an I/O event in the thread-local scheduler.
|
||||
let ev = local.event().clone();
|
||||
let waker = async_task::waker_fn(move || ev.notify());
|
||||
let unparker = parker.unparker();
|
||||
let waker = async_task::waker_fn(move || unparker.unpark());
|
||||
let cx = &mut Context::from_waker(&waker);
|
||||
futures_util::pin_mut!(future);
|
||||
|
||||
// Set up tokio (if enabled) and the thread-locals before execution begins.
|
||||
let enter = context::enter;
|
||||
let enter = |f| local.enter(|| enter(f));
|
||||
let enter = |f| worker.enter(|| enter(f));
|
||||
// Set up tokio if enabled.
|
||||
context::enter(|| {
|
||||
WORKER.set(&worker, || {
|
||||
'start: loop {
|
||||
// Poll the main future.
|
||||
if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
|
||||
return val;
|
||||
}
|
||||
|
||||
enter(|| {
|
||||
// A list of I/O events that indicate there is work to do.
|
||||
let io_events = [local.event(), ws_executor.event()];
|
||||
for _ in 0..200 {
|
||||
if !worker.tick() {
|
||||
parker.park();
|
||||
continue 'start;
|
||||
}
|
||||
}
|
||||
|
||||
// Number of times this thread has yielded because it didn't find any work.
|
||||
let mut yields = 0;
|
||||
|
||||
// We run four components at the same time, treating them all fairly and making sure none
|
||||
// of them get starved:
|
||||
//
|
||||
// 1. `future` - the main future.
|
||||
// 2. `local - the thread-local executor.
|
||||
// 3. `worker` - the work-stealing executor.
|
||||
// 4. `reactor` - the reactor.
|
||||
//
|
||||
// When all four components are out of work, we block the current thread on
|
||||
// epoll/kevent/wepoll. If new work comes in that isn't naturally triggered by an I/O event
|
||||
// registered with `Async` handles, we use `IoEvent`s to simulate an I/O event that will
|
||||
// unblock the thread:
|
||||
//
|
||||
// - When the main future is woken, `local.event()` is triggered.
|
||||
// - When thread-local executor gets new work, `local.event()` is triggered.
|
||||
// - When work-stealing executor gets new work, `ws_executor.event()` is triggered.
|
||||
// - When a new earliest timer is registered, `reactor.event()` is triggered.
|
||||
//
|
||||
// This way we make sure that if any changes happen that might give us new work will
|
||||
// unblock epoll/kevent/wepoll and let us continue the loop.
|
||||
loop {
|
||||
// 1. Poll the main future.
|
||||
if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
|
||||
return val;
|
||||
// Process ready I/O events without blocking.
|
||||
parker.park_timeout(Duration::from_secs(0));
|
||||
}
|
||||
|
||||
// 2. Run a batch of tasks in the thread-local executor.
|
||||
let more_local = local.execute();
|
||||
// 3. Run a batch of tasks in the work-stealing executor.
|
||||
let more_worker = worker.execute();
|
||||
|
||||
// 4. Poll the reactor.
|
||||
if let Some(reactor_lock) = reactor.try_lock() {
|
||||
yields = 0;
|
||||
react(reactor_lock, &io_events, more_local || more_worker);
|
||||
continue;
|
||||
}
|
||||
|
||||
// If there is more work in the thread-local or the work-stealing executor, continue.
|
||||
if more_local || more_worker {
|
||||
yields = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Yield a few times if no work is found.
|
||||
yields += 1;
|
||||
if yields <= 2 {
|
||||
thread::yield_now();
|
||||
continue;
|
||||
}
|
||||
|
||||
// If still no work is found, stop yielding and block the thread.
|
||||
yields = 0;
|
||||
|
||||
// Prepare for blocking until the reactor is locked or `local.event()` is triggered.
|
||||
//
|
||||
// Note that there is no need to wait for `ws_executor.event()`. If we lock the reactor
|
||||
// immediately, we'll check for the I/O event right after that anyway.
|
||||
//
|
||||
// If some other worker is holding the reactor locked, it will unlock it as soon as the
|
||||
// I/O event is triggered. Then, another worker will be allowed to lock the reactor,
|
||||
// and will unlock it if there is more work to do because every worker triggers the I/O
|
||||
// event whenever it finds a runnable task.
|
||||
let lock = reactor.lock();
|
||||
let notified = local.event().notified();
|
||||
futures_util::pin_mut!(lock);
|
||||
futures_util::pin_mut!(notified);
|
||||
|
||||
// Block until either the reactor is locked or `local.event()` is triggered.
|
||||
if let Either::Left((reactor_lock, _)) = block_on(future::select(lock, notified)) {
|
||||
react(reactor_lock, &io_events, false);
|
||||
} else {
|
||||
// Clear `local.event()` because it was triggered.
|
||||
local.event().clear();
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/// Polls or waits on the locked reactor.
|
||||
///
|
||||
/// If any of the I/O events are ready or there are more tasks to run, the reactor is polled.
|
||||
/// Otherwise, the current thread waits on it until a timer fires or an I/O event occurs.
|
||||
///
|
||||
/// I/O events are cleared at the end of this function.
|
||||
fn react(mut reactor_lock: ReactorLock<'_>, io_events: &[&IoEvent], mut more_tasks: bool) {
|
||||
// Clear all I/O events and check if any of them were triggered.
|
||||
for ev in io_events {
|
||||
if ev.clear() {
|
||||
more_tasks = true;
|
||||
}
|
||||
}
|
||||
|
||||
if more_tasks {
|
||||
// If there might be more tasks to run, just poll without blocking.
|
||||
reactor_lock.poll().expect("failure while polling I/O");
|
||||
} else {
|
||||
// Otherwise, block until the first I/O event or a timer.
|
||||
reactor_lock.wait().expect("failure while waiting on I/O");
|
||||
|
||||
// Clear all I/O events before dropping the lock. This is not really necessary, but
|
||||
// clearing flags here might prevent a redundant wakeup in the future.
|
||||
for ev in io_events {
|
||||
ev.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
21
src/task.rs
21
src/task.rs
|
@ -8,8 +8,7 @@ use std::pin::Pin;
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use crate::blocking::BlockingExecutor;
|
||||
use crate::thread_local::ThreadLocalExecutor;
|
||||
use crate::work_stealing::WorkStealingExecutor;
|
||||
use crate::run::{QUEUE, WORKER};
|
||||
|
||||
/// A runnable future, ready for execution.
|
||||
///
|
||||
|
@ -54,7 +53,7 @@ pub(crate) type Runnable = async_task::Task<()>;
|
|||
/// # });
|
||||
/// ```
|
||||
///
|
||||
/// [`run()`]: crate::run()
|
||||
/// [`run()`]: `crate::run()`
|
||||
#[must_use = "tasks get canceled when dropped, use `.detach()` to run them in the background"]
|
||||
#[derive(Debug)]
|
||||
pub struct Task<T>(pub(crate) Option<async_task::JoinHandle<T, ()>>);
|
||||
|
@ -75,9 +74,13 @@ impl<T: 'static> Task<T> {
|
|||
/// # })
|
||||
/// ```
|
||||
///
|
||||
/// [`run()`]: crate::run()
|
||||
/// [`run()`]: `crate::run()`
|
||||
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
ThreadLocalExecutor::spawn(future)
|
||||
if WORKER.is_set() {
|
||||
WORKER.with(|w| w.spawn_local(future))
|
||||
} else {
|
||||
panic!("cannot spawn a thread-local task if not inside an executor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,9 +100,13 @@ impl<T: Send + 'static> Task<T> {
|
|||
/// # });
|
||||
/// ```
|
||||
///
|
||||
/// [`run()`]: crate::run()
|
||||
/// [`run()`]: `crate::run()`
|
||||
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
|
||||
WorkStealingExecutor::get().spawn(future)
|
||||
QUEUE.spawn(future)
|
||||
// WORKER.with(|w| match &*w.borrow() {
|
||||
// None => QUEUE.spawn(future),
|
||||
// Some(w) => w.spawn(future),
|
||||
// })
|
||||
}
|
||||
|
||||
/// Spawns a future onto the blocking executor.
|
||||
|
|
|
@ -1,160 +0,0 @@
|
|||
//! The thread-local executor.
|
||||
//!
|
||||
//! Tasks created by [`Task::local()`] go into this executor. Every thread calling
|
||||
//! [`run()`][`crate::run()`] creates a thread-local executor. Tasks cannot be spawned onto a
|
||||
//! thread-local executor if it is not running.
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::thread::{self, ThreadId};
|
||||
|
||||
use crossbeam_queue::SegQueue;
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::task::{Runnable, Task};
|
||||
use crate::throttle;
|
||||
|
||||
scoped_thread_local! {
|
||||
/// The thread-local executor.
|
||||
///
|
||||
/// This thread-local is only set while inside [`ThreadLocalExecutor::enter()`].
|
||||
static EXECUTOR: ThreadLocalExecutor
|
||||
}
|
||||
|
||||
/// An executor for thread-local tasks.
|
||||
///
|
||||
/// Thread-local tasks are spawned by calling [`Task::local()`] and their futures do not have to
|
||||
/// implement [`Send`]. They can only be run by the same thread that created them.
|
||||
pub(crate) struct ThreadLocalExecutor {
|
||||
/// The main task queue.
|
||||
queue: RefCell<VecDeque<Runnable>>,
|
||||
|
||||
/// When another thread wakes a task belonging to this executor, it goes into this queue.
|
||||
injector: Arc<SegQueue<Runnable>>,
|
||||
|
||||
/// An I/O event that is triggered when another thread wakes a task belonging to this executor.
|
||||
event: IoEvent,
|
||||
}
|
||||
|
||||
impl ThreadLocalExecutor {
|
||||
/// Creates a new thread-local executor.
|
||||
pub fn new() -> ThreadLocalExecutor {
|
||||
ThreadLocalExecutor {
|
||||
queue: RefCell::new(VecDeque::new()),
|
||||
injector: Arc::new(SegQueue::new()),
|
||||
event: IoEvent::new().expect("cannot create an `IoEvent`"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Enters the context of this executor.
|
||||
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
if EXECUTOR.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
EXECUTOR.set(self, f)
|
||||
}
|
||||
|
||||
/// Returns the event indicating there is a scheduled task.
|
||||
pub fn event(&self) -> &IoEvent {
|
||||
&self.event
|
||||
}
|
||||
|
||||
/// Spawns a future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
if !EXECUTOR.is_set() {
|
||||
panic!("cannot spawn a thread-local task if not inside an executor");
|
||||
}
|
||||
|
||||
EXECUTOR.with(|ex| {
|
||||
// Why weak reference here? Injector may hold the task while the task's waker holds a
|
||||
// reference to the injector. So this reference must be weak to break the cycle.
|
||||
let injector = Arc::downgrade(&ex.injector);
|
||||
let event = ex.event.clone();
|
||||
let id = thread_id();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if thread_id() == id {
|
||||
// If scheduling from the original thread, push into the main queue.
|
||||
EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
|
||||
} else if let Some(injector) = injector.upgrade() {
|
||||
// If scheduling from a different thread, push into the injector queue.
|
||||
injector.push(runnable);
|
||||
}
|
||||
|
||||
// Trigger an I/O event to let the original thread know that a task has been
|
||||
// scheduled. If that thread is inside epoll/kqueue/wepoll, an I/O event will wake
|
||||
// it up.
|
||||
event.notify();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
})
|
||||
}
|
||||
|
||||
/// Executes a batch of tasks and returns `true` if there may be more tasks to run.
|
||||
pub fn execute(&self) -> bool {
|
||||
// Execute 4 series of 50 tasks.
|
||||
for _ in 0..4 {
|
||||
for _ in 0..50 {
|
||||
// Find the next task to run.
|
||||
match self.search() {
|
||||
None => {
|
||||
// There are no more tasks to run.
|
||||
return false;
|
||||
}
|
||||
Some(r) => {
|
||||
// Run the task.
|
||||
throttle::setup(|| r.run());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Drain the injector queue occasionally for fair scheduling.
|
||||
self.fetch();
|
||||
}
|
||||
|
||||
// There are likely more tasks to run.
|
||||
true
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
// Check if there is a task in the main queue.
|
||||
if let Some(r) = self.queue.borrow_mut().pop_front() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// If not, fetch tasks from the injector queue.
|
||||
self.fetch();
|
||||
|
||||
// Check the main queue again.
|
||||
self.queue.borrow_mut().pop_front()
|
||||
}
|
||||
|
||||
/// Moves all tasks from the injector queue into the main queue.
|
||||
fn fetch(&self) {
|
||||
let mut queue = self.queue.borrow_mut();
|
||||
while let Ok(r) = self.injector.pop() {
|
||||
queue.push_back(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `std::thread::current().id()`, but more efficient.
|
||||
fn thread_id() -> ThreadId {
|
||||
thread_local! {
|
||||
static ID: ThreadId = thread::current().id();
|
||||
}
|
||||
|
||||
ID.try_with(|id| *id)
|
||||
.unwrap_or_else(|_| thread::current().id())
|
||||
}
|
|
@ -7,7 +7,7 @@
|
|||
use std::cell::Cell;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
use scoped_tls::scoped_thread_local;
|
||||
|
||||
scoped_thread_local! {
|
||||
/// Number of times the current task is allowed to poll I/O operations.
|
||||
|
|
|
@ -1,318 +0,0 @@
|
|||
//! The work-stealing executor.
|
||||
//!
|
||||
//! Tasks created by [`Task::spawn()`] go into this executor. Every thread calling [`run()`]
|
||||
//! initializes a [`Worker`] that participates in work stealing, which is allowed to run any task in
|
||||
//! this executor or in other workers. Since tasks can be stolen by any worker and thus move from
|
||||
//! thread to thread, their futures must implement [`Send`].
|
||||
//!
|
||||
//! There is only one global instance of this type, accessible by [`WorkStealingExecutor::get()`].
|
||||
//!
|
||||
//! [Work stealing] is a strategy that reduces contention in multi-threaded environments. If all
|
||||
//! invocations of [`run()`] used the same global task queue all the time, they would contend on
|
||||
//! the queue all the time, thus slowing the executor down.
|
||||
//!
|
||||
//! The solution is to have a separate queue for each invocation of [`run()`], called a "worker".
|
||||
//! Each thread is primarily using its own worker. Once all tasks in the worker are exhausted, then
|
||||
//! we look for tasks in the global queue, called "injector", or steal tasks from other workers.
|
||||
//!
|
||||
//! [`run()`]: crate::run()
|
||||
//! [Work stealing]: https://en.wikipedia.org/wiki/Work_stealing
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::future::Future;
|
||||
use std::num::Wrapping;
|
||||
use std::panic;
|
||||
|
||||
use crossbeam_deque as deque;
|
||||
use crossbeam_utils::sync::ShardedLock;
|
||||
use once_cell::sync::Lazy;
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::task::{Runnable, Task};
|
||||
use crate::throttle;
|
||||
|
||||
scoped_thread_local! {
|
||||
/// The current thread's worker.
|
||||
///
|
||||
/// Other threads may steal tasks from this worker through its associated stealer that was
|
||||
/// registered in the work-stealing executor.
|
||||
///
|
||||
/// This thread-local is only set while inside [`Worker::enter()`].
|
||||
static WORKER: for<'a> &'a Worker<'a>
|
||||
}
|
||||
|
||||
/// The global work-stealing executor.
|
||||
pub(crate) struct WorkStealingExecutor {
|
||||
/// When a thread that is not inside [`run()`][`crate::run()`] spawns or wakes a task, it goes
|
||||
/// into this queue.
|
||||
injector: deque::Injector<Runnable>,
|
||||
|
||||
/// Registered handles for stealing tasks from workers.
|
||||
stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
|
||||
|
||||
/// An I/O event that is triggered whenever there might be available tasks to run.
|
||||
event: IoEvent,
|
||||
}
|
||||
|
||||
impl WorkStealingExecutor {
|
||||
/// Returns a reference to the global work-stealing executor.
|
||||
pub fn get() -> &'static WorkStealingExecutor {
|
||||
static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
|
||||
injector: deque::Injector::new(),
|
||||
stealers: ShardedLock::new(Slab::new()),
|
||||
event: IoEvent::new().expect("cannot create an `IoEvent`"),
|
||||
});
|
||||
&EXECUTOR
|
||||
}
|
||||
|
||||
/// Returns the event indicating there is a scheduled task.
|
||||
pub fn event(&self) -> &IoEvent {
|
||||
&self.event
|
||||
}
|
||||
|
||||
/// Spawns a future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&'static self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Task<T> {
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if WORKER.is_set() {
|
||||
// If scheduling from a worker thread, push into the worker's queue.
|
||||
WORKER.with(|w| w.push(runnable));
|
||||
} else {
|
||||
// If scheduling from a non-worker thread, push into the injector queue.
|
||||
self.injector.push(runnable);
|
||||
|
||||
// Notify workers that there is a task in the injector queue.
|
||||
self.event.notify();
|
||||
}
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Registers a new worker.
|
||||
///
|
||||
/// The worker will automatically deregister itself when dropped.
|
||||
pub fn worker(&self) -> Worker<'_> {
|
||||
let mut stealers = self.stealers.write().unwrap();
|
||||
let vacant = stealers.vacant_entry();
|
||||
|
||||
// Create a worker and put its stealer handle into the executor.
|
||||
let worker = Worker {
|
||||
key: vacant.key(),
|
||||
slot: Cell::new(None),
|
||||
queue: deque::Worker::new_fifo(),
|
||||
executor: self,
|
||||
};
|
||||
vacant.insert(worker.queue.stealer());
|
||||
|
||||
worker
|
||||
}
|
||||
}
|
||||
|
||||
/// A worker that participates in the work-stealing executor.
|
||||
///
|
||||
/// Each invocation of `run()` creates its own worker.
|
||||
pub(crate) struct Worker<'a> {
|
||||
/// The ID of this worker obtained during registration.
|
||||
key: usize,
|
||||
|
||||
/// A slot into which tasks go before entering the actual queue.
|
||||
///
|
||||
/// Note that other workers cannot steal this task.
|
||||
slot: Cell<Option<Runnable>>,
|
||||
|
||||
/// A queue of tasks.
|
||||
///
|
||||
/// Other workers are able to steal tasks from this queue.
|
||||
queue: deque::Worker<Runnable>,
|
||||
|
||||
/// The parent work-stealing executor.
|
||||
executor: &'a WorkStealingExecutor,
|
||||
}
|
||||
|
||||
impl Worker<'_> {
|
||||
/// Enters the context of this executor.
|
||||
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
if WORKER.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
WORKER.set(self, f)
|
||||
}
|
||||
|
||||
/// Executes a batch of tasks and returns `true` if there may be more tasks to run.
|
||||
pub fn execute(&self) -> bool {
|
||||
// Execute 4 series of 50 tasks.
|
||||
for _ in 0..4 {
|
||||
for _ in 0..50 {
|
||||
// Find the next task to run.
|
||||
match self.search() {
|
||||
None => {
|
||||
// There are no more tasks to run.
|
||||
return false;
|
||||
}
|
||||
Some(r) => {
|
||||
// Notify other workers that there may be stealable tasks.
|
||||
//
|
||||
// Instead of notifying when we find a task, we could notify when we push a
|
||||
// task into the local queue - either strategy works.
|
||||
//
|
||||
// Notifying when we find a task is somewhat simpler because then we don't
|
||||
// need to worry about `search()` re-shuffling tasks between queues, which
|
||||
// races with other workers searching for tasks. Other workers might not
|
||||
// find a task while there is one! Notifying here avoids this problem.
|
||||
self.executor.event.notify();
|
||||
|
||||
// Run the task.
|
||||
if throttle::setup(|| r.run()) {
|
||||
// The task was woken while it was running, which means it got
|
||||
// scheduled the moment running completed. Therefore, it is now inside
|
||||
// the slot and would be the next task to run.
|
||||
//
|
||||
// Instead of re-running the task in the next iteration, let's flush
|
||||
// the slot in order to give other tasks a chance to run.
|
||||
//
|
||||
// This is a necessary step to ensure task yielding works as expected.
|
||||
// If a task wakes itself and returns `Poll::Pending`, we don't want it
|
||||
// to run immediately after that because that'd defeat the whole
|
||||
// purpose of yielding.
|
||||
self.flush_slot();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush the slot occasionally for fair scheduling.
|
||||
//
|
||||
// It is possible for two tasks to be exchanging messages between each other forever so
|
||||
// that every time one of them runs, it wakes the other one and puts it into the slot.
|
||||
// Flushing the slot prevena them from hogging the executor.
|
||||
self.flush_slot();
|
||||
|
||||
// Steal some tasks from the injector queue.
|
||||
//
|
||||
// If the executor always has tasks in the local queue, it might never get to run tasks
|
||||
// in the injector queue. To prevent them from starvation, we must move them into the
|
||||
// local queue every now and then.
|
||||
if let Some(r) = self.steal_global() {
|
||||
self.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
// There are likely more tasks to run.
|
||||
true
|
||||
}
|
||||
|
||||
/// Pushes a task into this worker.
|
||||
fn push(&self, runnable: Runnable) {
|
||||
// Put the task into the slot.
|
||||
if let Some(r) = self.slot.replace(Some(runnable)) {
|
||||
// If the slot had a task, push it into the queue.
|
||||
self.queue.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves a task from the slot into the local queue.
|
||||
fn flush_slot(&self) {
|
||||
if let Some(r) = self.slot.take() {
|
||||
self.queue.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
// Check if there is a task in the slot or in the queue.
|
||||
if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from the injector queue.
|
||||
if let Some(r) = self.steal_global() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from other workers.
|
||||
let stealers = self.executor.stealers.read().unwrap();
|
||||
retry_steal(|| {
|
||||
// Pick a random starting point in the iterator list and rotate the list.
|
||||
let n = stealers.len();
|
||||
let start = fast_random(n);
|
||||
let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
|
||||
|
||||
// Remove this worker's stealer handle.
|
||||
let iter = iter.filter(|(k, _)| *k != self.key);
|
||||
|
||||
// Try stealing from each worker in the list. Collecting stops as soon as we get a
|
||||
// `Steal::Success`. Otherwise, if any steal attempt resulted in a `Steal::Retry`,
|
||||
// that's the collected result and we'll retry from the beginning.
|
||||
iter.map(|(_, s)| s.steal_batch_and_pop(&self.queue))
|
||||
.collect()
|
||||
})
|
||||
}
|
||||
|
||||
/// Steals tasks from the injector queue.
|
||||
fn steal_global(&self) -> Option<Runnable> {
|
||||
retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue))
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Worker<'_> {
|
||||
fn drop(&mut self) {
|
||||
// Unregister the worker.
|
||||
self.executor.stealers.write().unwrap().remove(self.key);
|
||||
|
||||
// Move the task in the slot into the injector queue.
|
||||
if let Some(r) = self.slot.take() {
|
||||
r.schedule();
|
||||
}
|
||||
|
||||
// Move all tasks in this worker's queue into the injector queue.
|
||||
while let Some(r) = self.queue.pop() {
|
||||
r.schedule();
|
||||
}
|
||||
|
||||
// This task will not search for tasks anymore and therefore won't notify other workers if
|
||||
// new tasks are found. Notify another worker to start searching right away.
|
||||
self.executor.event.notify();
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a random number in the interval `0..n`.
|
||||
fn fast_random(n: usize) -> usize {
|
||||
thread_local! {
|
||||
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1));
|
||||
}
|
||||
|
||||
RNG.with(|rng| {
|
||||
// This is the 32-bit variant of Xorshift: https://en.wikipedia.org/wiki/Xorshift
|
||||
let mut x = rng.get();
|
||||
x ^= x << 13;
|
||||
x ^= x >> 17;
|
||||
x ^= x << 5;
|
||||
rng.set(x);
|
||||
|
||||
// This is a fast alternative to `x % n`:
|
||||
// https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction/
|
||||
((x.0 as u64).wrapping_mul(n as u64) >> 32) as usize
|
||||
})
|
||||
}
|
||||
|
||||
/// Retries a steal operation for as long as it returns `Steal::Retry`.
|
||||
fn retry_steal<T>(mut steal_op: impl FnMut() -> deque::Steal<T>) -> Option<T> {
|
||||
loop {
|
||||
match steal_op() {
|
||||
deque::Steal::Success(t) => return Some(t),
|
||||
deque::Steal::Empty => return None,
|
||||
deque::Steal::Retry => {}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue