mirror of https://github.com/stjepang/smol
Refactor the executor
This commit is contained in:
parent
3e78e59f52
commit
01dd5dccc9
|
@ -33,7 +33,7 @@ futures-io = { version = "0.3.5", default-features = false, features = ["std"] }
|
|||
futures-util = { version = "0.3.5", default-features = false, features = ["std", "io"] }
|
||||
once_cell = "1.3.1"
|
||||
piper = "0.1.2"
|
||||
scoped-tls-hkt = "0.1.2"
|
||||
scoped-tls = "1.0.0"
|
||||
slab = "0.4.2"
|
||||
socket2 = { version = "0.3.12", features = ["pair", "unix"] }
|
||||
|
||||
|
|
|
@ -0,0 +1,476 @@
|
|||
use std::cell::Cell;
|
||||
use std::future::Future;
|
||||
use std::panic;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
use std::thread::{self, ThreadId};
|
||||
|
||||
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
|
||||
use scoped_tls::scoped_thread_local;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::task::{Runnable, Task};
|
||||
|
||||
scoped_thread_local! {
|
||||
static WORKER: Worker
|
||||
}
|
||||
|
||||
/// State shared between [`Queue`] and [`Worker`].
|
||||
struct Global {
|
||||
/// The global queue.
|
||||
queue: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Shards of the global queue created by workers.
|
||||
shards: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
/// Set to `true` when a sleeping worker is notified or no workers are sleeping.
|
||||
notified: AtomicBool,
|
||||
|
||||
/// A list of sleeping workers.
|
||||
sleepers: Mutex<Sleepers>,
|
||||
}
|
||||
|
||||
impl Global {
|
||||
/// Notifies a sleeping worker.
|
||||
fn notify(&self) {
|
||||
if !self
|
||||
.notified
|
||||
.compare_and_swap(false, true, Ordering::SeqCst)
|
||||
{
|
||||
let callback = self.sleepers.lock().unwrap().notify();
|
||||
if let Some(cb) = callback {
|
||||
(cb)();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A list of sleeping workers.
|
||||
struct Sleepers {
|
||||
/// Number of sleeping workers.
|
||||
count: usize,
|
||||
|
||||
/// Callbacks of sleeping unnotified workers.
|
||||
callbacks: Vec<Arc<dyn Fn() + Send + Sync>>,
|
||||
}
|
||||
|
||||
impl Sleepers {
|
||||
/// Inserts a new sleeping worker.
|
||||
fn insert(&mut self, callback: &Arc<dyn Fn() + Send + Sync>) {
|
||||
self.count += 1;
|
||||
self.callbacks.push(callback.clone());
|
||||
}
|
||||
|
||||
/// Updates the callback of an already inserted worker.
|
||||
fn update(&mut self, callback: &Arc<dyn Fn() + Send + Sync>) {
|
||||
if self.callbacks.iter().all(|cb| !Arc::ptr_eq(cb, callback)) {
|
||||
self.callbacks.push(callback.clone());
|
||||
}
|
||||
}
|
||||
|
||||
/// Removes a previously inserted worker.
|
||||
fn remove(&mut self, callback: &Arc<dyn Fn() + Send + Sync>) {
|
||||
self.count -= 1;
|
||||
for i in (0..self.callbacks.len()).rev() {
|
||||
if Arc::ptr_eq(&self.callbacks[i], callback) {
|
||||
self.callbacks.remove(i);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if a sleeping worker is notified or no workers are sleeping.
|
||||
fn is_notified(&self) -> bool {
|
||||
self.count == 0 || self.count > self.callbacks.len()
|
||||
}
|
||||
|
||||
/// Returns notification callback for a sleeping worker.
|
||||
///
|
||||
/// If a worker was notified already or there are no workers, `None` will be returned.
|
||||
fn notify(&mut self) -> Option<Arc<dyn Fn() + Send + Sync>> {
|
||||
if self.callbacks.len() == self.count {
|
||||
self.callbacks.pop().clone()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue for spawning tasks.
|
||||
pub(crate) struct Queue {
|
||||
global: Arc<Global>,
|
||||
}
|
||||
|
||||
impl Queue {
|
||||
/// Creates a new queue for spawning tasks.
|
||||
pub fn new() -> Queue {
|
||||
Queue {
|
||||
global: Arc::new(Global {
|
||||
queue: ConcurrentQueue::unbounded(),
|
||||
shards: RwLock::new(Slab::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
callbacks: Vec::new(),
|
||||
}),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a future onto this queue.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Task<T> {
|
||||
let global = self.global.clone();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if WORKER.is_set() {
|
||||
WORKER.with(|w| {
|
||||
if Arc::ptr_eq(&global, &w.global) {
|
||||
if let Err(err) = w.shard.push(runnable) {
|
||||
global.queue.push(err.into_inner()).unwrap();
|
||||
}
|
||||
} else {
|
||||
global.queue.push(runnable).unwrap();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
global.queue.push(runnable).unwrap();
|
||||
}
|
||||
|
||||
global.notify();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Registers a new worker.
|
||||
///
|
||||
/// The worker will automatically deregister itself when dropped.
|
||||
pub fn worker(&self, notify: impl Fn() + Send + Sync + 'static) -> Worker {
|
||||
let mut shards = self.global.shards.write().unwrap();
|
||||
let vacant = shards.vacant_entry();
|
||||
|
||||
// Create a worker and put its stealer handle into the executor.
|
||||
let worker = Worker {
|
||||
key: vacant.key(),
|
||||
global: Arc::new(self.global.clone()),
|
||||
shard: SlotQueue {
|
||||
slot: Cell::new(None),
|
||||
queue: Arc::new(ConcurrentQueue::bounded(512)),
|
||||
},
|
||||
local: SlotQueue {
|
||||
slot: Cell::new(None),
|
||||
queue: Arc::new(ConcurrentQueue::unbounded()),
|
||||
},
|
||||
callback: Arc::new(notify),
|
||||
sleeping: Cell::new(false),
|
||||
ticker: Cell::new(0),
|
||||
};
|
||||
vacant.insert(worker.shard.queue.clone());
|
||||
|
||||
worker
|
||||
}
|
||||
}
|
||||
|
||||
/// A worker that participates in the work-stealing executor.
|
||||
///
|
||||
/// Each invocation of `run()` creates its own worker.
|
||||
pub(crate) struct Worker {
|
||||
/// The ID of this worker obtained during registration.
|
||||
key: usize,
|
||||
|
||||
/// The global queue.
|
||||
global: Arc<Arc<Global>>,
|
||||
|
||||
/// A shard of the global queue.
|
||||
shard: SlotQueue<Runnable>,
|
||||
|
||||
/// Local queue for `!Send` tasks.
|
||||
local: SlotQueue<Runnable>,
|
||||
|
||||
/// Callback invoked to wake this worker up.
|
||||
callback: Arc<dyn Fn() + Send + Sync>,
|
||||
|
||||
/// Set to `true` when in sleeping state.
|
||||
sleeping: Cell<bool>,
|
||||
|
||||
/// Bumped every time a task is run.
|
||||
ticker: Cell<usize>,
|
||||
}
|
||||
|
||||
impl Worker {
|
||||
/// Spawns a local future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn_local<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
let queue = self.local.queue.clone();
|
||||
let callback = self.callback.clone();
|
||||
let id = thread_id();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if thread_id() == id && WORKER.is_set() {
|
||||
WORKER.with(|w| {
|
||||
if Arc::ptr_eq(&queue, &w.local.queue) {
|
||||
w.local.push(runnable).unwrap();
|
||||
} else {
|
||||
queue.push(runnable).unwrap();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
queue.push(runnable).unwrap();
|
||||
}
|
||||
|
||||
callback();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Enters the context of this executor.
|
||||
fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
// TODO(stjepang): Allow recursive executors.
|
||||
if WORKER.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
WORKER.set(self, f)
|
||||
}
|
||||
|
||||
/// Moves the worker into sleeping state.
|
||||
fn sleep(&self) -> bool {
|
||||
let mut sleepers = self.global.sleepers.lock().unwrap();
|
||||
|
||||
if self.sleeping.get() {
|
||||
sleepers.update(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
false
|
||||
} else {
|
||||
sleepers.insert(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
self.sleeping.set(true);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves the worker into woken state.
|
||||
fn wake(&self) -> bool {
|
||||
if self.sleeping.get() {
|
||||
let mut sleepers = self.global.sleepers.lock().unwrap();
|
||||
sleepers.remove(&self.callback);
|
||||
self.global
|
||||
.notified
|
||||
.swap(sleepers.is_notified(), Ordering::SeqCst);
|
||||
self.sleeping.set(false);
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a single task and returns `true` if one was found.
|
||||
pub fn tick(&self) -> bool {
|
||||
loop {
|
||||
match self.search() {
|
||||
None => {
|
||||
// Go to sleep and then:
|
||||
// - If already in sleeping state, return.
|
||||
// - Otherwise, search again.
|
||||
if !self.sleep() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Some(r) => {
|
||||
// Wake up.
|
||||
if !self.wake() {
|
||||
// If already woken, notify another worker.
|
||||
self.global.notify();
|
||||
}
|
||||
|
||||
// Bump the ticker.
|
||||
let ticker = self.ticker.get();
|
||||
self.ticker.set(ticker.wrapping_add(1));
|
||||
|
||||
// Flush slots to ensure fair task scheduling.
|
||||
if ticker % 16 == 0 {
|
||||
if let Err(err) = self.shard.flush() {
|
||||
self.global.queue.push(err.into_inner()).unwrap();
|
||||
self.global.notify();
|
||||
}
|
||||
self.local.flush().unwrap();
|
||||
}
|
||||
|
||||
// Steal tasks from the global queue to ensure fair task scheduling.
|
||||
if ticker % 64 == 0 {
|
||||
self.shard.steal(&self.global.queue);
|
||||
}
|
||||
|
||||
// Run the task.
|
||||
if self.enter(|| r.run()) {
|
||||
// The task was woken while it was running, which means it got
|
||||
// scheduled the moment running completed. Therefore, it is now inside
|
||||
// the slot and would be the next task to run.
|
||||
//
|
||||
// Instead of re-running the task in the next iteration, let's flush
|
||||
// the slot in order to give other tasks a chance to run.
|
||||
//
|
||||
// This is a necessary step to ensure task yielding works as expected.
|
||||
// If a task wakes itself and returns `Poll::Pending`, we don't want it
|
||||
// to run immediately after that because that'd defeat the whole
|
||||
// purpose of yielding.
|
||||
if let Err(err) = self.shard.flush() {
|
||||
self.global.queue.push(err.into_inner()).unwrap();
|
||||
self.global.notify();
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
if self.ticker.get() % 2 == 0 {
|
||||
// On even ticks, look into the local queue and then into the shard.
|
||||
if let Ok(r) = self.local.pop().or_else(|_| self.shard.pop()) {
|
||||
return Some(r);
|
||||
}
|
||||
} else {
|
||||
// On odd ticks, look into the shard and then into the local queue.
|
||||
if let Ok(r) = self.shard.pop().or_else(|_| self.local.pop()) {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
// Try stealing from the global queue.
|
||||
self.shard.steal(&self.global.queue);
|
||||
if let Ok(r) = self.shard.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from other shards.
|
||||
let shards = self.global.shards.read().unwrap();
|
||||
|
||||
// Pick a random starting point in the iterator list and rotate the list.
|
||||
let n = shards.len();
|
||||
let start = fastrand::usize(..n);
|
||||
let iter = shards.iter().chain(shards.iter()).skip(start).take(n);
|
||||
|
||||
// Remove this worker's shard.
|
||||
let iter = iter.filter(|(key, _)| *key != self.key);
|
||||
let iter = iter.map(|(_, shard)| shard);
|
||||
|
||||
// Try stealing from each shard in the list.
|
||||
for shard in iter {
|
||||
self.shard.steal(shard);
|
||||
if let Ok(r) = self.shard.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Worker {
|
||||
fn drop(&mut self) {
|
||||
// Wake and unregister the worker.
|
||||
self.wake();
|
||||
self.global.shards.write().unwrap().remove(self.key);
|
||||
|
||||
// Re-schedule remaining tasks in the shard.
|
||||
while let Ok(r) = self.shard.pop() {
|
||||
r.schedule();
|
||||
}
|
||||
// Notify another worker to start searching for tasks.
|
||||
self.global.notify();
|
||||
|
||||
// TODO(stjepang): Close the local queue and empty it.
|
||||
}
|
||||
}
|
||||
|
||||
/// A queue with a single-item slot in front of it.
|
||||
struct SlotQueue<T> {
|
||||
slot: Cell<Option<T>>,
|
||||
queue: Arc<ConcurrentQueue<T>>,
|
||||
}
|
||||
|
||||
impl<T> SlotQueue<T> {
|
||||
/// Pushes an item into the slot, overflowing the old item into the queue.
|
||||
fn push(&self, t: T) -> Result<(), PushError<T>> {
|
||||
match self.slot.replace(Some(t)) {
|
||||
None => Ok(()),
|
||||
Some(t) => self.queue.push(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Pops an item from the slot, or queue if the slot is empty.
|
||||
fn pop(&self) -> Result<T, PopError> {
|
||||
match self.slot.take() {
|
||||
None => self.queue.pop(),
|
||||
Some(t) => Ok(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Flushes the slot into the queue.
|
||||
fn flush(&self) -> Result<(), PushError<T>> {
|
||||
match self.slot.take() {
|
||||
None => Ok(()),
|
||||
Some(t) => self.queue.push(t),
|
||||
}
|
||||
}
|
||||
|
||||
/// Steals some items from another queue.
|
||||
fn steal(&self, from: &ConcurrentQueue<T>) {
|
||||
// Flush the slot before stealing.
|
||||
if let Err(err) = self.flush() {
|
||||
self.slot.set(Some(err.into_inner()));
|
||||
return;
|
||||
}
|
||||
|
||||
// Half of `from`'s length rounded up.
|
||||
let mut count = (from.len() + 1) / 2;
|
||||
|
||||
if count > 0 {
|
||||
// Don't steal more than fits into the queue.
|
||||
if let Some(cap) = self.queue.capacity() {
|
||||
count = count.min(cap - self.queue.len());
|
||||
}
|
||||
|
||||
// Steal tasks.
|
||||
for _ in 0..count {
|
||||
if let Ok(t) = from.pop() {
|
||||
assert!(self.queue.push(t).is_ok());
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `std::thread::current().id()`, but more efficient.
|
||||
fn thread_id() -> ThreadId {
|
||||
thread_local! {
|
||||
static ID: ThreadId = thread::current().id();
|
||||
}
|
||||
|
||||
ID.try_with(|id| *id)
|
||||
.unwrap_or_else(|_| thread::current().id())
|
||||
}
|
|
@ -8,7 +8,7 @@
|
|||
use std::io::{self, Read, Write};
|
||||
#[cfg(windows)]
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{self, AtomicBool, Ordering};
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
|
@ -24,9 +24,6 @@ type Notifier = linux::EventFd;
|
|||
|
||||
/// A self-pipe.
|
||||
struct Inner {
|
||||
/// Set to `true` if notified.
|
||||
flag: AtomicBool,
|
||||
|
||||
/// The writer side, emptied by `clear()`.
|
||||
writer: Notifier,
|
||||
|
||||
|
@ -44,7 +41,6 @@ impl IoEvent {
|
|||
let (writer, reader) = notifier()?;
|
||||
|
||||
Ok(IoEvent(Arc::new(Inner {
|
||||
flag: AtomicBool::new(false),
|
||||
writer,
|
||||
reader: Async::new(reader)?,
|
||||
})))
|
||||
|
@ -55,46 +51,21 @@ impl IoEvent {
|
|||
// Publish all in-memory changes before setting the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
|
||||
// If the flag is not set...
|
||||
if !self.0.flag.load(Ordering::SeqCst) {
|
||||
// If this thread sets it...
|
||||
if !self.0.flag.swap(true, Ordering::SeqCst) {
|
||||
// Trigger an I/O event by writing a byte into the sending socket.
|
||||
let _ = (&self.0.writer).write(&1u64.to_ne_bytes());
|
||||
let _ = (&self.0.writer).flush();
|
||||
// Trigger an I/O event by writing a byte into the sending socket.
|
||||
let _ = (&self.0.writer).write(&1u64.to_ne_bytes());
|
||||
let _ = (&self.0.writer).flush();
|
||||
|
||||
// Re-register to wake up the poller.
|
||||
let _ = self.0.reader.reregister_io_event();
|
||||
}
|
||||
}
|
||||
// Re-register to wake up the poller.
|
||||
let _ = self.0.reader.reregister_io_event();
|
||||
}
|
||||
|
||||
/// Sets the flag to `false`.
|
||||
pub fn clear(&self) -> bool {
|
||||
pub fn clear(&self) {
|
||||
// Read all available bytes from the receiving socket.
|
||||
while self.0.reader.get_ref().read(&mut [0; 64]).is_ok() {}
|
||||
let value = self.0.flag.swap(false, Ordering::SeqCst);
|
||||
|
||||
// Publish all in-memory changes after clearing the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
value
|
||||
}
|
||||
|
||||
/// Waits until notified.
|
||||
///
|
||||
/// You should assume notifications may spuriously occur.
|
||||
pub async fn notified(&self) {
|
||||
self.0
|
||||
.reader
|
||||
.read_with(|_| {
|
||||
if self.0.flag.load(Ordering::SeqCst) {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(io::ErrorKind::WouldBlock.into())
|
||||
}
|
||||
})
|
||||
.await
|
||||
.expect("failure while waiting on a self-pipe");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -119,16 +119,15 @@ mod async_io;
|
|||
mod block_on;
|
||||
mod blocking;
|
||||
mod context;
|
||||
mod executor;
|
||||
mod io_event;
|
||||
mod io_parking;
|
||||
mod parking;
|
||||
mod reactor;
|
||||
mod run;
|
||||
mod sys;
|
||||
mod task;
|
||||
mod thread_local;
|
||||
mod throttle;
|
||||
mod timer;
|
||||
mod work_stealing;
|
||||
|
||||
pub use self::blocking::{iter, reader, writer};
|
||||
pub use async_io::Async;
|
||||
|
|
|
@ -1,93 +1,127 @@
|
|||
use std::cell::Cell;
|
||||
use std::fmt;
|
||||
use std::marker::PhantomData;
|
||||
use std::sync::atomic::AtomicUsize;
|
||||
use std::sync::atomic::Ordering::SeqCst;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
|
||||
use std::sync::{Arc, Condvar, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::reactor::{Reactor, ReactorLock};
|
||||
use crate::reactor::Reactor;
|
||||
|
||||
pub(crate) struct IoParker {
|
||||
unparker: IoUnparker,
|
||||
_marker: PhantomData<*const ()>,
|
||||
static REGISTRY: Lazy<Mutex<Slab<Unparker>>> = Lazy::new(|| Mutex::new(Slab::new()));
|
||||
|
||||
/// Parks a thread.
|
||||
pub(crate) struct Parker {
|
||||
key: Cell<Option<usize>>,
|
||||
unparker: Unparker,
|
||||
}
|
||||
|
||||
unsafe impl Send for IoParker {}
|
||||
unsafe impl Send for Parker {}
|
||||
|
||||
impl IoParker {
|
||||
pub fn new() -> IoParker {
|
||||
IoParker {
|
||||
unparker: IoUnparker {
|
||||
impl Parker {
|
||||
/// Creates a new [`Parker`].
|
||||
pub fn new() -> Parker {
|
||||
Parker {
|
||||
key: Cell::new(None),
|
||||
unparker: Unparker {
|
||||
inner: Arc::new(Inner {
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
lock: Mutex::new(()),
|
||||
cvar: Condvar::new(),
|
||||
}),
|
||||
},
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current thread until the token is made available.
|
||||
pub fn park(&self) {
|
||||
self.register();
|
||||
self.unparker.inner.park(None);
|
||||
}
|
||||
|
||||
/// Blocks the current thread until the token is made available or the timeout is reached.
|
||||
pub fn park_timeout(&self, timeout: Duration) -> bool {
|
||||
self.register();
|
||||
self.unparker.inner.park(Some(timeout))
|
||||
}
|
||||
|
||||
pub fn park_deadline(&self, deadline: Instant) -> bool {
|
||||
self.unparker
|
||||
.inner
|
||||
.park(Some(deadline.saturating_duration_since(Instant::now())))
|
||||
}
|
||||
// /// Blocks the current thread until the token is made available or the deadline is reached.
|
||||
// pub fn park_deadline(&self, deadline: Instant) -> bool {
|
||||
// self.register();
|
||||
// self.unparker
|
||||
// .inner
|
||||
// .park(Some(deadline.saturating_duration_since(Instant::now())))
|
||||
// }
|
||||
//
|
||||
// /// Atomically makes the token available if it is not already.
|
||||
// pub fn unpark(&self) {
|
||||
// self.unparker.unpark()
|
||||
// }
|
||||
|
||||
pub fn unpark(&self) {
|
||||
self.unparker.unpark()
|
||||
}
|
||||
|
||||
pub fn unparker(&self) -> IoUnparker {
|
||||
/// Returns a handle for unparking.
|
||||
pub fn unparker(&self) -> Unparker {
|
||||
self.unparker.clone()
|
||||
}
|
||||
|
||||
fn register(&self) {
|
||||
if self.key.get().is_none() {
|
||||
let mut reg = REGISTRY.lock().unwrap();
|
||||
let key = reg.insert(self.unparker.clone());
|
||||
self.key.set(Some(key));
|
||||
}
|
||||
}
|
||||
|
||||
fn unregister(&self) {
|
||||
if let Some(key) = self.key.take() {
|
||||
let mut reg = REGISTRY.lock().unwrap();
|
||||
reg.remove(key);
|
||||
|
||||
// Notify another parker to make sure the reactor keeps getting polled.
|
||||
if let Some((_, u)) = reg.iter().next() {
|
||||
u.unpark();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for IoParker {
|
||||
impl Drop for Parker {
|
||||
fn drop(&mut self) {
|
||||
// TODO: wake up another active IoParker
|
||||
self.unregister();
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for IoParker {
|
||||
impl fmt::Debug for Parker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("IoParker { .. }")
|
||||
f.pad("Parker { .. }")
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct IoUnparker {
|
||||
/// Unparks a thread.
|
||||
pub(crate) struct Unparker {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
unsafe impl Send for IoUnparker {}
|
||||
unsafe impl Sync for IoUnparker {}
|
||||
unsafe impl Send for Unparker {}
|
||||
unsafe impl Sync for Unparker {}
|
||||
|
||||
impl IoUnparker {
|
||||
impl Unparker {
|
||||
/// Atomically makes the token available if it is not already.
|
||||
pub fn unpark(&self) {
|
||||
self.inner.unpark()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for IoUnparker {
|
||||
impl fmt::Debug for Unparker {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.pad("IoUnparker { .. }")
|
||||
f.pad("Unparker { .. }")
|
||||
}
|
||||
}
|
||||
|
||||
impl Clone for IoUnparker {
|
||||
fn clone(&self) -> IoUnparker {
|
||||
IoUnparker {
|
||||
impl Clone for Unparker {
|
||||
fn clone(&self) -> Unparker {
|
||||
Unparker {
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
|
@ -108,8 +142,6 @@ struct Inner {
|
|||
|
||||
impl Inner {
|
||||
fn park(&self, timeout: Option<Duration>) -> bool {
|
||||
let mut reactor_lock = Reactor::get().try_lock();
|
||||
|
||||
// If we were previously notified then we consume this notification and return quickly.
|
||||
if self
|
||||
.state
|
||||
|
@ -118,7 +150,9 @@ impl Inner {
|
|||
{
|
||||
// Process available I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
reactor_lock.poll().expect("failure while polling I/O");
|
||||
reactor_lock
|
||||
.react(Some(Duration::from_secs(0)))
|
||||
.expect("failure while polling I/O");
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -128,19 +162,21 @@ impl Inner {
|
|||
if dur == Duration::from_millis(0) {
|
||||
// Process available I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
reactor_lock.poll().expect("failure while polling I/O");
|
||||
reactor_lock
|
||||
.react(Some(Duration::from_secs(0)))
|
||||
.expect("failure while polling I/O");
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Otherwise we need to coordinate going to sleep.
|
||||
let mut m = self.lock.lock().unwrap();
|
||||
|
||||
let mut reactor_lock = Reactor::get().try_lock();
|
||||
let state = match reactor_lock {
|
||||
None => PARKED,
|
||||
Some(_) => POLLING,
|
||||
};
|
||||
let mut m = self.lock.lock().unwrap();
|
||||
|
||||
match self.state.compare_exchange(EMPTY, state, SeqCst, SeqCst) {
|
||||
Ok(_) => {}
|
||||
|
@ -166,11 +202,10 @@ impl Inner {
|
|||
None => m = self.cvar.wait(m).unwrap(),
|
||||
Some(reactor_lock) => {
|
||||
drop(m);
|
||||
if EVENT.clear() {
|
||||
reactor_lock.poll().expect("TODO");
|
||||
} else {
|
||||
reactor_lock.wait().expect("TODO");
|
||||
}
|
||||
|
||||
reactor_lock.react(None).expect("failure while polling I/O");
|
||||
EVENT.clear();
|
||||
|
||||
m = self.lock.lock().unwrap();
|
||||
}
|
||||
}
|
||||
|
@ -185,14 +220,20 @@ impl Inner {
|
|||
// Wait with a timeout, and if we spuriously wake up or otherwise wake up from a
|
||||
// notification we just want to unconditionally set `state` back to `EMPTY`, either
|
||||
// consuming a notification or un-flagging ourselves as parked.
|
||||
let _m = match &mut reactor_lock {
|
||||
let _m = match reactor_lock.as_mut() {
|
||||
None => self.cvar.wait_timeout(m, timeout).unwrap().0,
|
||||
Some(reactor_lock) => {
|
||||
drop(m);
|
||||
if EVENT.clear() {
|
||||
reactor_lock.poll().expect("TODO");
|
||||
} else {
|
||||
reactor_lock.wait().expect("TODO"); // TODO: use actual timeout
|
||||
let deadline = Instant::now() + timeout;
|
||||
loop {
|
||||
reactor_lock
|
||||
.react(Some(deadline.saturating_duration_since(Instant::now())))
|
||||
.expect("failure while polling I/O");
|
||||
EVENT.clear();
|
||||
|
||||
if Instant::now() >= deadline {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.lock.lock().unwrap()
|
||||
}
|
115
src/reactor.rs
115
src/reactor.rs
|
@ -56,7 +56,7 @@ pub(crate) struct Reactor {
|
|||
sources: piper::Mutex<Slab<Arc<Source>>>,
|
||||
|
||||
/// Temporary storage for I/O events when polling the reactor.
|
||||
events: piper::Lock<sys::Events>,
|
||||
events: piper::Mutex<sys::Events>,
|
||||
|
||||
/// An ordered map of registered timers.
|
||||
///
|
||||
|
@ -84,7 +84,7 @@ impl Reactor {
|
|||
static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
|
||||
sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
|
||||
sources: piper::Mutex::new(Slab::new()),
|
||||
events: piper::Lock::new(sys::Events::new()),
|
||||
events: piper::Mutex::new(sys::Events::new()),
|
||||
timers: piper::Mutex::new(BTreeMap::new()),
|
||||
timer_ops: ConcurrentQueue::bounded(1000),
|
||||
timer_event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
|
||||
|
@ -177,13 +177,6 @@ impl Reactor {
|
|||
})
|
||||
}
|
||||
|
||||
/// Locks the reactor.
|
||||
pub async fn lock(&self) -> ReactorLock<'_> {
|
||||
let reactor = self;
|
||||
let events = self.events.lock().await;
|
||||
ReactorLock { reactor, events }
|
||||
}
|
||||
|
||||
/// Fires ready timers.
|
||||
///
|
||||
/// Returns the duration until the next timer before this method was called.
|
||||
|
@ -240,79 +233,69 @@ impl Reactor {
|
|||
/// A lock on the reactor.
|
||||
pub(crate) struct ReactorLock<'a> {
|
||||
reactor: &'a Reactor,
|
||||
events: piper::LockGuard<sys::Events>,
|
||||
events: piper::MutexGuard<'a, sys::Events>,
|
||||
}
|
||||
|
||||
impl ReactorLock<'_> {
|
||||
/// Processes ready events without blocking.
|
||||
pub fn poll(&mut self) -> io::Result<()> {
|
||||
self.react(false)
|
||||
}
|
||||
|
||||
/// Blocks until at least one event is processed.
|
||||
pub fn wait(&mut self) -> io::Result<()> {
|
||||
self.react(true)
|
||||
}
|
||||
|
||||
/// Processes new events, optionally blocking until the first event.
|
||||
fn react(&mut self, block: bool) -> io::Result<()> {
|
||||
// Fire timers and compute the timeout for blocking on I/O events.
|
||||
/// Processes new events, blocking until the first event or the timeout.
|
||||
pub fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
// Fire timers.
|
||||
let next_timer = self.reactor.fire_timers();
|
||||
let timeout = if block {
|
||||
next_timer
|
||||
} else {
|
||||
Some(Duration::from_secs(0))
|
||||
|
||||
// compute the timeout for blocking on I/O events.
|
||||
let timeout = match (next_timer, timeout) {
|
||||
(None, None) => None,
|
||||
(Some(t), None) | (None, Some(t)) => Some(t),
|
||||
(Some(a), Some(b)) => Some(a.min(b)),
|
||||
};
|
||||
|
||||
loop {
|
||||
// Block on I/O events.
|
||||
match self.reactor.sys.wait(&mut self.events, timeout) {
|
||||
// The timeout was hit so fire ready timers.
|
||||
Ok(0) => {
|
||||
self.reactor.fire_timers();
|
||||
return Ok(());
|
||||
}
|
||||
// Block on I/O events.
|
||||
match self.reactor.sys.wait(&mut self.events, timeout) {
|
||||
// The timeout was hit so fire ready timers.
|
||||
Ok(0) => {
|
||||
self.reactor.fire_timers();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// At least one I/O event occured.
|
||||
Ok(_) => {
|
||||
// Iterate over sources in the event list.
|
||||
let sources = self.reactor.sources.lock();
|
||||
let mut ready = Vec::new();
|
||||
// At least one I/O event occured.
|
||||
Ok(_) => {
|
||||
// Iterate over sources in the event list.
|
||||
let sources = self.reactor.sources.lock();
|
||||
let mut ready = Vec::new();
|
||||
|
||||
for ev in self.events.iter() {
|
||||
// Check if there is a source in the table with this key.
|
||||
if let Some(source) = sources.get(ev.key) {
|
||||
let mut wakers = source.wakers.lock();
|
||||
for ev in self.events.iter() {
|
||||
// Check if there is a source in the table with this key.
|
||||
if let Some(source) = sources.get(ev.key) {
|
||||
let mut wakers = source.wakers.lock();
|
||||
|
||||
// Wake readers if a readability event was emitted.
|
||||
if ev.readable {
|
||||
ready.append(&mut wakers.readers);
|
||||
}
|
||||
// Wake readers if a readability event was emitted.
|
||||
if ev.readable {
|
||||
ready.append(&mut wakers.readers);
|
||||
}
|
||||
|
||||
// Wake writers if a writability event was emitted.
|
||||
if ev.writable {
|
||||
ready.append(&mut wakers.writers);
|
||||
}
|
||||
// Wake writers if a writability event was emitted.
|
||||
if ev.writable {
|
||||
ready.append(&mut wakers.writers);
|
||||
}
|
||||
}
|
||||
|
||||
// Drop the lock before waking.
|
||||
drop(sources);
|
||||
|
||||
// Wake up tasks waiting on I/O.
|
||||
for waker in ready {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// The syscall was interrupted.
|
||||
Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
|
||||
// Drop the lock before waking.
|
||||
drop(sources);
|
||||
|
||||
// An actual error occureed.
|
||||
Err(err) => return Err(err),
|
||||
// Wake up tasks waiting on I/O.
|
||||
for waker in ready {
|
||||
waker.wake();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// The syscall was interrupted.
|
||||
Err(err) if err.kind() == io::ErrorKind::Interrupted => Ok(()),
|
||||
|
||||
// An actual error occureed.
|
||||
Err(err) => Err(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
73
src/run.rs
73
src/run.rs
|
@ -4,19 +4,23 @@
|
|||
|
||||
use std::future::Future;
|
||||
use std::task::{Context, Poll};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::future::{self, Either};
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::block_on;
|
||||
use crate::context;
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::io_parking::{IoParker, IoUnparker};
|
||||
use crate::reactor::{Reactor, ReactorLock};
|
||||
use crate::thread_local::LocalExecutor;
|
||||
use crate::executor::{Queue, Worker};
|
||||
use crate::parking::Parker;
|
||||
use crate::throttle;
|
||||
use crate::work_stealing::{Executor, Worker};
|
||||
use scoped_tls::scoped_thread_local;
|
||||
|
||||
/// The global task queue.
|
||||
pub(crate) static QUEUE: Lazy<Queue> = Lazy::new(|| Queue::new());
|
||||
|
||||
scoped_thread_local! {
|
||||
/// Thread-local worker queue.
|
||||
pub(crate) static WORKER: Worker
|
||||
}
|
||||
|
||||
/// Runs executors and polls the reactor.
|
||||
///
|
||||
|
@ -97,13 +101,10 @@ use crate::work_stealing::{Executor, Worker};
|
|||
/// }
|
||||
/// ```
|
||||
pub fn run<T>(future: impl Future<Output = T>) -> T {
|
||||
let parker = IoParker::new();
|
||||
let parker = Parker::new();
|
||||
|
||||
let unparker = parker.unparker();
|
||||
let worker = Executor::get().worker(move || unparker.unpark());
|
||||
|
||||
let unparker = parker.unparker();
|
||||
let local = LocalExecutor::new(move || unparker.unpark());
|
||||
let worker = QUEUE.worker(move || unparker.unpark());
|
||||
|
||||
// Create a waker that triggers an I/O event in the thread-local scheduler.
|
||||
let unparker = parker.unparker();
|
||||
|
@ -112,38 +113,24 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
|
|||
futures_util::pin_mut!(future);
|
||||
|
||||
// Set up tokio if enabled.
|
||||
// let mut enter = context::enter;
|
||||
|
||||
loop {
|
||||
// Poll the main future.
|
||||
if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
|
||||
return val;
|
||||
}
|
||||
|
||||
let mut more_worker = true;
|
||||
let mut more_local = true;
|
||||
// enter(|| {
|
||||
for _ in 0..200 {
|
||||
if !worker.tick() {
|
||||
more_worker = false;
|
||||
break;
|
||||
context::enter(|| {
|
||||
WORKER.set(&worker, || {
|
||||
'start: loop {
|
||||
// Poll the main future.
|
||||
if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) {
|
||||
return val;
|
||||
}
|
||||
}
|
||||
|
||||
for _ in 0..200 {
|
||||
if !local.tick() {
|
||||
more_local = false;
|
||||
break;
|
||||
for _ in 0..200 {
|
||||
if !worker.tick() {
|
||||
parker.park();
|
||||
continue 'start;
|
||||
}
|
||||
}
|
||||
}
|
||||
// });
|
||||
|
||||
if more_local || more_worker {
|
||||
// Process ready I/O events without blocking.
|
||||
parker.park_timeout(Duration::from_secs(0));
|
||||
} else {
|
||||
// Wait until unparked.
|
||||
parker.park();
|
||||
}
|
||||
}
|
||||
// Process ready I/O events without blocking.
|
||||
parker.park_timeout(Duration::from_secs(0));
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
15
src/task.rs
15
src/task.rs
|
@ -8,8 +8,7 @@ use std::pin::Pin;
|
|||
use std::task::{Context, Poll};
|
||||
|
||||
use crate::blocking::BlockingExecutor;
|
||||
use crate::thread_local::LocalExecutor;
|
||||
use crate::work_stealing::Executor;
|
||||
use crate::run::{QUEUE, WORKER};
|
||||
|
||||
/// A runnable future, ready for execution.
|
||||
///
|
||||
|
@ -77,7 +76,11 @@ impl<T: 'static> Task<T> {
|
|||
///
|
||||
/// [`run()`]: `crate::run()`
|
||||
pub fn local(future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
LocalExecutor::spawn(future)
|
||||
if WORKER.is_set() {
|
||||
WORKER.with(|w| w.spawn_local(future))
|
||||
} else {
|
||||
panic!("cannot spawn a thread-local task if not inside an executor")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -99,7 +102,11 @@ impl<T: Send + 'static> Task<T> {
|
|||
///
|
||||
/// [`run()`]: `crate::run()`
|
||||
pub fn spawn(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
|
||||
Executor::get().spawn(future)
|
||||
QUEUE.spawn(future)
|
||||
// WORKER.with(|w| match &*w.borrow() {
|
||||
// None => QUEUE.spawn(future),
|
||||
// Some(w) => w.spawn(future),
|
||||
// })
|
||||
}
|
||||
|
||||
/// Spawns a future onto the blocking executor.
|
||||
|
|
|
@ -1,152 +0,0 @@
|
|||
//! The thread-local executor.
|
||||
//!
|
||||
//! Tasks created by [`Task::local()`] go into this executor. Every thread calling
|
||||
//! [`run()`][`crate::run()`] creates a thread-local executor. Tasks cannot be spawned onto a
|
||||
//! thread-local executor if it is not running.
|
||||
|
||||
use std::cell::{Cell, RefCell};
|
||||
use std::collections::VecDeque;
|
||||
use std::future::Future;
|
||||
use std::sync::Arc;
|
||||
use std::thread::{self, ThreadId};
|
||||
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
|
||||
use crate::task::{Runnable, Task};
|
||||
|
||||
scoped_thread_local! {
|
||||
/// The thread-local executor.
|
||||
///
|
||||
/// This thread-local is only set while inside [`LocalExecutor::enter()`].
|
||||
static EXECUTOR: LocalExecutor
|
||||
}
|
||||
|
||||
/// An executor for thread-local tasks.
|
||||
///
|
||||
/// Thread-local tasks are spawned by calling [`Task::local()`] and their futures do not have to
|
||||
/// implement [`Send`]. They can only be run by the same thread that created them.
|
||||
pub(crate) struct LocalExecutor {
|
||||
/// The main task queue.
|
||||
queue: RefCell<VecDeque<Runnable>>,
|
||||
|
||||
/// When another thread wakes a task belonging to this executor, it goes into this queue.
|
||||
injector: Arc<ConcurrentQueue<Runnable>>,
|
||||
|
||||
callback: Arc<dyn Fn() + Send + Sync>,
|
||||
|
||||
sleeping: Cell<bool>,
|
||||
|
||||
ticks: Cell<usize>,
|
||||
}
|
||||
|
||||
impl LocalExecutor {
|
||||
/// Creates a new thread-local executor.
|
||||
pub fn new(notify: impl Fn() + Send + Sync + 'static) -> LocalExecutor {
|
||||
LocalExecutor {
|
||||
queue: RefCell::new(VecDeque::new()),
|
||||
injector: Arc::new(ConcurrentQueue::unbounded()),
|
||||
callback: Arc::new(notify),
|
||||
sleeping: Cell::new(false),
|
||||
ticks: Cell::new(0),
|
||||
}
|
||||
}
|
||||
|
||||
/// Enters the context of this executor.
|
||||
pub fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
// TODO(stjepang): Allow recursive executors.
|
||||
if EXECUTOR.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
EXECUTOR.set(self, f)
|
||||
}
|
||||
|
||||
/// Spawns a future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: 'static>(future: impl Future<Output = T> + 'static) -> Task<T> {
|
||||
if !EXECUTOR.is_set() {
|
||||
panic!("cannot spawn a thread-local task if not inside an executor");
|
||||
}
|
||||
|
||||
EXECUTOR.with(|ex| {
|
||||
// Why weak reference here? Injector may hold the task while the task's waker holds a
|
||||
// reference to the injector. So this reference must be weak to break the cycle.
|
||||
let injector = Arc::downgrade(&ex.injector);
|
||||
let callback = ex.callback.clone();
|
||||
let id = thread_id();
|
||||
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if thread_id() == id {
|
||||
// If scheduling from the original thread, push into the main queue.
|
||||
EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
|
||||
} else if let Some(injector) = injector.upgrade() {
|
||||
// If scheduling from a different thread, push into the injector queue.
|
||||
injector.push(runnable).unwrap();
|
||||
}
|
||||
|
||||
// Trigger an I/O event to let the original thread know that a task has been
|
||||
// scheduled. If that thread is inside epoll/kqueue/wepoll, an I/O event will wake
|
||||
// it up.
|
||||
callback();
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
})
|
||||
}
|
||||
|
||||
pub fn tick(&self) -> bool {
|
||||
match self.search() {
|
||||
None => {
|
||||
self.ticks.set(0);
|
||||
false
|
||||
}
|
||||
Some(r) => {
|
||||
self.ticks.set(self.ticks.get() + 1);
|
||||
if self.ticks.get() == 50 {
|
||||
self.ticks.set(0);
|
||||
self.fetch();
|
||||
}
|
||||
|
||||
self.enter(|| r.run());
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
// Check if there is a task in the main queue.
|
||||
if let Some(r) = self.queue.borrow_mut().pop_front() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// If not, fetch tasks from the injector queue.
|
||||
self.fetch();
|
||||
|
||||
// Check the main queue again.
|
||||
self.queue.borrow_mut().pop_front()
|
||||
}
|
||||
|
||||
/// Moves all tasks from the injector queue into the main queue.
|
||||
fn fetch(&self) {
|
||||
let mut queue = self.queue.borrow_mut();
|
||||
while let Ok(r) = self.injector.pop() {
|
||||
queue.push_back(r);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Same as `std::thread::current().id()`, but more efficient.
|
||||
fn thread_id() -> ThreadId {
|
||||
thread_local! {
|
||||
static ID: ThreadId = thread::current().id();
|
||||
}
|
||||
|
||||
ID.try_with(|id| *id)
|
||||
.unwrap_or_else(|_| thread::current().id())
|
||||
}
|
|
@ -7,7 +7,7 @@
|
|||
use std::cell::Cell;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
use scoped_tls::scoped_thread_local;
|
||||
|
||||
scoped_thread_local! {
|
||||
/// Number of times the current task is allowed to poll I/O operations.
|
||||
|
|
|
@ -1,394 +0,0 @@
|
|||
//! The work-stealing executor.
|
||||
//!
|
||||
//! Tasks created by [`Task::spawn()`] go into this executor. Every thread calling [`run()`]
|
||||
//! initializes a [`Worker`] that participates in work stealing, which is allowed to run any task
|
||||
//! in this executor or in other workers. Since tasks can be stolen by any worker and thus move
|
||||
//! from thread to thread, their futures must implement [`Send`].
|
||||
//!
|
||||
//! There is only one global instance of this type, accessible by [`Executor::get()`].
|
||||
//!
|
||||
//! [Work stealing] is a strategy that reduces contention in multi-threaded environments. If all
|
||||
//! invocations of [`run()`] used the same global task queue all the time, they would contend on
|
||||
//! the queue all the time, thus slowing the executor down.
|
||||
//!
|
||||
//! The solution is to have a separate queue for each invocation of [`run()`], called a "worker".
|
||||
//! Each thread is primarily using its own worker. Once all tasks in the worker are exhausted, then
|
||||
//! we look for tasks in the global queue, called "injector", or steal tasks from other workers.
|
||||
//!
|
||||
//! [`run()`]: crate::run()
|
||||
//! [Work stealing]: https://en.wikipedia.org/wiki/Work_stealing
|
||||
|
||||
use std::cell::Cell;
|
||||
use std::future::Future;
|
||||
use std::panic;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::{Arc, Mutex, RwLock};
|
||||
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
use once_cell::sync::Lazy;
|
||||
use scoped_tls_hkt::scoped_thread_local;
|
||||
use slab::Slab;
|
||||
|
||||
use crate::task::{Runnable, Task};
|
||||
|
||||
scoped_thread_local! {
|
||||
/// The current thread's worker.
|
||||
///
|
||||
/// Other threads may steal tasks from this worker through its associated stealer that was
|
||||
/// registered in the work-stealing executor.
|
||||
///
|
||||
/// This thread-local is only set while inside [`Worker::enter()`].
|
||||
static WORKER: for<'a> &'a Worker<'a>
|
||||
}
|
||||
|
||||
/// The global work-stealing executor.
|
||||
pub(crate) struct Executor {
|
||||
/// When a thread that is not inside [`run()`][`crate::run()`] spawns or wakes a task, it goes
|
||||
/// into this queue.
|
||||
injector: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Registered handles for stealing tasks from workers.
|
||||
stealers: RwLock<Slab<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
notified: AtomicBool,
|
||||
sleepers: Mutex<Sleepers>,
|
||||
}
|
||||
|
||||
struct Sleepers {
|
||||
count: usize,
|
||||
items: Vec<Arc<dyn Fn() + Send + Sync>>,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
/// Returns a reference to the global work-stealing executor.
|
||||
pub fn get() -> &'static Executor {
|
||||
static EXECUTOR: Lazy<Executor> = Lazy::new(|| Executor {
|
||||
injector: ConcurrentQueue::unbounded(),
|
||||
stealers: RwLock::new(Slab::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
items: Vec::new(),
|
||||
}),
|
||||
});
|
||||
&EXECUTOR
|
||||
}
|
||||
|
||||
fn notify(&self) {
|
||||
if !self
|
||||
.notified
|
||||
.compare_and_swap(false, true, Ordering::SeqCst)
|
||||
{
|
||||
let mut sleepers = self.sleepers.lock().unwrap();
|
||||
if sleepers.items.len() == sleepers.count {
|
||||
if let Some(callback) = sleepers.items.pop() {
|
||||
let callback = callback.clone();
|
||||
drop(sleepers);
|
||||
callback();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a future onto this executor.
|
||||
///
|
||||
/// Returns a [`Task`] handle for the spawned task.
|
||||
pub fn spawn<T: Send + 'static>(
|
||||
&'static self,
|
||||
future: impl Future<Output = T> + Send + 'static,
|
||||
) -> Task<T> {
|
||||
// The function that schedules a runnable task when it gets woken up.
|
||||
let schedule = move |runnable| {
|
||||
if WORKER.is_set() {
|
||||
// If scheduling from a worker thread, push into the worker's queue.
|
||||
WORKER.with(|w| w.push(runnable));
|
||||
} else {
|
||||
// If scheduling from a non-worker thread, push into the injector queue.
|
||||
self.injector.push(runnable).unwrap();
|
||||
|
||||
self.notify();
|
||||
}
|
||||
};
|
||||
|
||||
// Create a task, push it into the queue by scheduling it, and return its `Task` handle.
|
||||
let (runnable, handle) = async_task::spawn(future, schedule, ());
|
||||
runnable.schedule();
|
||||
Task(Some(handle))
|
||||
}
|
||||
|
||||
/// Registers a new worker.
|
||||
///
|
||||
/// The worker will automatically deregister itself when dropped.
|
||||
pub fn worker(&self, notify: impl Fn() + Send + Sync + 'static) -> Worker<'_> {
|
||||
let mut stealers = self.stealers.write().unwrap();
|
||||
let vacant = stealers.vacant_entry();
|
||||
|
||||
// Create a worker and put its stealer handle into the executor.
|
||||
let worker = Worker {
|
||||
key: vacant.key(),
|
||||
slot: Cell::new(None),
|
||||
queue: Arc::new(ConcurrentQueue::bounded(512)),
|
||||
executor: self,
|
||||
callback: Arc::new(notify),
|
||||
sleeping: Cell::new(false),
|
||||
ticks: Cell::new(0),
|
||||
};
|
||||
vacant.insert(worker.queue.clone());
|
||||
drop(stealers);
|
||||
|
||||
worker
|
||||
}
|
||||
}
|
||||
|
||||
/// A worker that participates in the work-stealing executor.
|
||||
///
|
||||
/// Each invocation of `run()` creates its own worker.
|
||||
pub(crate) struct Worker<'a> {
|
||||
/// The ID of this worker obtained during registration.
|
||||
key: usize,
|
||||
|
||||
/// A slot into which tasks go before entering the actual queue.
|
||||
///
|
||||
/// Note that other workers cannot steal this task.
|
||||
slot: Cell<Option<Runnable>>,
|
||||
|
||||
/// A queue of tasks.
|
||||
///
|
||||
/// Other workers are able to steal tasks from this queue.
|
||||
queue: Arc<ConcurrentQueue<Runnable>>,
|
||||
|
||||
/// The parent work-stealing executor.
|
||||
executor: &'a Executor,
|
||||
|
||||
callback: Arc<dyn Fn() + Send + Sync>,
|
||||
|
||||
sleeping: Cell<bool>,
|
||||
|
||||
ticks: Cell<usize>,
|
||||
}
|
||||
|
||||
impl Worker<'_> {
|
||||
/// Enters the context of this executor.
|
||||
fn enter<T>(&self, f: impl FnOnce() -> T) -> T {
|
||||
// TODO(stjepang): Allow recursive executors.
|
||||
if WORKER.is_set() {
|
||||
panic!("cannot run an executor inside another executor");
|
||||
}
|
||||
WORKER.set(self, f)
|
||||
}
|
||||
|
||||
fn sleep(&self) -> bool {
|
||||
let sleeping = self.sleeping.get();
|
||||
self.sleeping.set(true);
|
||||
|
||||
let mut sleepers = self.executor.sleepers.lock().unwrap();
|
||||
|
||||
if sleeping {
|
||||
if sleepers
|
||||
.items
|
||||
.iter()
|
||||
.all(|i| !Arc::ptr_eq(i, &self.callback))
|
||||
{
|
||||
sleepers.items.push(self.callback.clone());
|
||||
}
|
||||
} else {
|
||||
sleepers.count += 1;
|
||||
sleepers.items.push(self.callback.clone());
|
||||
}
|
||||
|
||||
if sleepers.count == 0 || sleepers.count > sleepers.items.len() {
|
||||
self.executor.notified.swap(true, Ordering::SeqCst);
|
||||
} else {
|
||||
self.executor.notified.swap(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
!sleeping
|
||||
}
|
||||
|
||||
fn wake(&self) -> bool {
|
||||
if !self.sleeping.get() {
|
||||
return false;
|
||||
}
|
||||
|
||||
self.sleeping.set(false);
|
||||
|
||||
let mut sleepers = self.executor.sleepers.lock().unwrap();
|
||||
sleepers.count -= 1;
|
||||
sleepers.items.retain(|i| !Arc::ptr_eq(i, &self.callback)); // TODO: optimize
|
||||
|
||||
if sleepers.count == 0 || sleepers.count > sleepers.items.len() {
|
||||
self.executor.notified.swap(true, Ordering::SeqCst);
|
||||
} else {
|
||||
self.executor.notified.swap(false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
/// Executes a batch of tasks and returns `true` if there may be more tasks to run.
|
||||
pub fn tick(&self) -> bool {
|
||||
loop {
|
||||
match self.search() {
|
||||
None => {
|
||||
if !self.sleep() {
|
||||
// self.ticks.set(self.ticks.get().wrapping_add(1));
|
||||
return false;
|
||||
}
|
||||
}
|
||||
Some(r) => {
|
||||
if !self.wake() {
|
||||
self.executor.notify();
|
||||
}
|
||||
|
||||
// Run the task.
|
||||
if self.enter(|| r.run()) {
|
||||
// The task was woken while it was running, which means it got
|
||||
// scheduled the moment running completed. Therefore, it is now inside
|
||||
// the slot and would be the next task to run.
|
||||
//
|
||||
// Instead of re-running the task in the next iteration, let's flush
|
||||
// the slot in order to give other tasks a chance to run.
|
||||
//
|
||||
// This is a necessary step to ensure task yielding works as expected.
|
||||
// If a task wakes itself and returns `Poll::Pending`, we don't want it
|
||||
// to run immediately after that because that'd defeat the whole
|
||||
// purpose of yielding.
|
||||
self.flush_slot();
|
||||
}
|
||||
|
||||
let ticks = self.ticks.get();
|
||||
self.ticks.set(ticks.wrapping_add(1));
|
||||
if ticks % 16 == 0 {
|
||||
self.flush_slot();
|
||||
}
|
||||
if ticks % 64 == 0 {
|
||||
if let Some(r) = self.steal_global() {
|
||||
self.push(r);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Pushes a task into this worker.
|
||||
fn push(&self, runnable: Runnable) {
|
||||
// Put the task into the slot.
|
||||
if let Some(r) = self.slot.replace(Some(runnable)) {
|
||||
// If the slot had a task, push it into the queue.
|
||||
if let Err(err) = self.queue.push(r) {
|
||||
self.executor.injector.push(err.into_inner()).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Moves a task from the slot into the local queue.
|
||||
fn flush_slot(&self) {
|
||||
if let Some(r) = self.slot.take() {
|
||||
if let Err(err) = self.queue.push(r) {
|
||||
self.executor.injector.push(err.into_inner()).unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Finds the next task to run.
|
||||
fn search(&self) -> Option<Runnable> {
|
||||
// Check if there is a task in the slot.
|
||||
if let Some(r) = self.slot.take() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Check if there is a task in the queue.
|
||||
if let Ok(r) = self.queue.pop() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from the injector queue.
|
||||
if let Some(r) = self.steal_global() {
|
||||
return Some(r);
|
||||
}
|
||||
|
||||
// Try stealing from other workers.
|
||||
let stealers = self.executor.stealers.read().unwrap();
|
||||
|
||||
// Pick a random starting point in the iterator list and rotate the list.
|
||||
let n = stealers.len();
|
||||
let start = fastrand::usize(..n);
|
||||
let iter = stealers.iter().chain(stealers.iter()).skip(start).take(n);
|
||||
|
||||
// Remove this worker's stealer handle.
|
||||
let iter = iter.filter(|(k, _)| *k != self.key);
|
||||
let iter = iter.map(|(_, q)| q);
|
||||
|
||||
// Try stealing from each worker in the list.
|
||||
for q in iter {
|
||||
let count = self
|
||||
.queue
|
||||
.capacity()
|
||||
.unwrap_or(usize::MAX)
|
||||
.min((q.len() + 1) / 2);
|
||||
|
||||
// Steal half of the tasks from this worker.
|
||||
for _ in 0..count {
|
||||
if let Ok(r) = q.pop() {
|
||||
self.push(r);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if there is a task in the slot.
|
||||
if let Some(r) = self.slot.take() {
|
||||
return Some(r);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Steals tasks from the injector queue.
|
||||
fn steal_global(&self) -> Option<Runnable> {
|
||||
let count = self
|
||||
.queue
|
||||
.capacity()
|
||||
.unwrap_or(usize::MAX)
|
||||
.min((self.executor.injector.len() + 1) / 2);
|
||||
|
||||
// Steal half of the tasks from the injector queue.
|
||||
for _ in 0..count {
|
||||
if let Ok(r) = self.executor.injector.pop() {
|
||||
self.push(r);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If anything was stolen, a task must be in the slot.
|
||||
self.slot.take()
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for Worker<'_> {
|
||||
fn drop(&mut self) {
|
||||
// Unregister the worker.
|
||||
self.executor.stealers.write().unwrap().remove(self.key);
|
||||
|
||||
// Move the task in the slot into the injector queue.
|
||||
if let Some(r) = self.slot.take() {
|
||||
r.schedule();
|
||||
}
|
||||
|
||||
// Move all tasks in this worker's queue into the injector queue.
|
||||
while let Ok(r) = self.queue.pop() {
|
||||
r.schedule();
|
||||
}
|
||||
|
||||
self.wake();
|
||||
|
||||
// This task will not search for tasks anymore and therefore won't notify other workers if
|
||||
// new tasks are found. Notify another worker to start searching right away.
|
||||
self.executor.notify();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue