Fix bugs in timer

Stjepan Glavina 2020-04-20 22:41:52 +02:00
parent 0a71b39aec
commit 53dee1c23e
6 changed files with 88 additions and 72 deletions

View File

@@ -55,7 +55,7 @@ struct SelfPipe {
impl SelfPipe {
/// Creates a self-pipe.
fn new() -> io::Result<SelfPipe> {
- let (writer, reader) = pipe()?;
+ let (writer, reader) = socket_pair()?;
writer.set_send_buffer_size(1)?;
reader.set_recv_buffer_size(1)?;
Ok(SelfPipe {
@@ -109,7 +109,7 @@ impl SelfPipe {
/// TODO
#[cfg(unix)]
- fn pipe() -> io::Result<(Socket, Socket)> {
+ fn socket_pair() -> io::Result<(Socket, Socket)> {
let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
sock2.set_nonblocking(true)?;
@@ -117,16 +117,15 @@ fn pipe() -> io::Result<(Socket, Socket)> {
}
/// TODO
- /// TODO The only portable way of manually triggering I/O events is to create a socket and
- /// send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
- /// See the links below for more information.
- ///
- /// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
- /// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
- /// https://gist.github.com/geertj/4325783
#[cfg(windows)]
- fn pipe() -> io::Result<(Socket, Socket)> {
+ // TODO The only portable way of manually triggering I/O events is to create a socket and
+ // send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
+ // See the links below for more information.
+ //
+ // https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
+ // https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
+ // https://gist.github.com/geertj/4325783
+ fn socket_pair() -> io::Result<(Socket, Socket)> {
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
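On Unix the pair comes straight from socketpair(2); on Windows the code emulates it with a loopback TCP connection, as the hunk above begins to show. A runnable sketch of that loopback variant using only the standard library (the real code uses socket2's `Socket` and makes both ends non-blocking):

use std::io::{self, Read, Write};
use std::net::{TcpListener, TcpStream};

fn socket_pair() -> io::Result<(TcpStream, TcpStream)> {
    // Bind a temporary listener on an ephemeral loopback port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    // Connect to it; the accepted socket becomes the other end of the "pipe".
    let writer = TcpStream::connect(listener.local_addr()?)?;
    let (reader, _) = listener.accept()?;
    Ok((writer, reader))
}

fn main() -> io::Result<()> {
    let (mut writer, mut reader) = socket_pair()?;
    // Set the event: send one dummy byte.
    writer.write_all(&[1])?;
    // Clear the event: receive the dummy byte.
    let mut byte = [0u8; 1];
    reader.read_exact(&mut byte)?;
    Ok(())
}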

View File

@@ -80,6 +80,9 @@
//! Finally, you can mix this runtime with [async-std][compat-async-std] and [tokio][compat-tokio]
//! to use runtime-dependent libraries like [surf][compat-surf] and [reqwest][compat-reqwest].
//!
+ //! TODO: See [here][compat] for an example of using this runtime with libraries like async-std, tokio,
+ //! reqwest, and surf.
+ //!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [wepoll]: https://github.com/piscisaureus/wepoll

View File

@@ -129,20 +129,19 @@ impl Reactor {
pub fn insert_timer(&self, when: Instant, waker: Waker) -> u64 {
let mut timers = self.timers.lock();
- // If this timer is going to be the earliest one, interrupt the reactor.
- if let Some((first, _)) = timers.keys().next() {
- if when < *first {
- self.event.set();
- }
- }
- // Generate a new ID.
+ // Generate a new timer ID.
static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
assert!(id < u64::max_value() / 2, "exhausted timer IDs");
// Insert this timer into the timers map.
timers.insert((when, id), waker);
+ // If this timer is now the earliest one, interrupt the reactor.
+ if timers.keys().next().map(|(when, _)| *when) == Some(when) {
+ self.event.set();
+ }
id
}
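The bug fix here is the order of operations: the removed code decided whether to interrupt the reactor before inserting the new timer, so it could never fire when the timer map was still empty. A runnable sketch of the corrected logic, with a plain `BTreeMap` and a `println!` standing in for `self.event.set()` (illustrative names, not smol's API):

use std::collections::BTreeMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

// Each timer gets a unique ID so that two timers with the same deadline
// still have distinct `(Instant, u64)` keys in the map.
static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);

fn insert_timer(timers: &mut BTreeMap<(Instant, u64), &'static str>, when: Instant) -> u64 {
    let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
    timers.insert((when, id), "waker");

    // Checking *after* insertion: the new timer is the earliest exactly when
    // it shows up as the map's first key. This also covers the previously
    // missed case where the map was empty before the insert.
    if timers.keys().next().map(|(when, _)| *when) == Some(when) {
        println!("interrupt the reactor"); // stand-in for self.event.set()
    }
    id
}

fn main() {
    let mut timers = BTreeMap::new();
    let now = Instant::now();
    insert_timer(&mut timers, now + Duration::from_secs(10)); // first timer: interrupts
    insert_timer(&mut timers, now + Duration::from_secs(1)); // new earliest: interrupts
    insert_timer(&mut timers, now + Duration::from_secs(5)); // not earliest: no interrupt
}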
@@ -189,26 +188,27 @@ impl ReactorLock<'_> {
// TODO: document this function.
self.reactor.event.clear();
- let next_timer = {
+ let timeout = {
// Split timers into ready and pending timers.
let mut timers = self.reactor.timers.lock();
- let pending = timers.split_off(&(Instant::now(), 0));
+ let now = Instant::now();
+ let pending = timers.split_off(&(now, 0));
let ready = mem::replace(&mut *timers, pending);
+ let timeout = if ready.is_empty() && block {
+ // Calculate the timeout till the first timer fires.
+ timers.keys().next().map(|(when, _)| when.saturating_duration_since(now))
+ } else {
+ // If there are ready timers or this poll doesn't block, the timeout is zero.
+ Some(Duration::from_secs(0))
+ };
// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}
- // Find when the next timer fires.
- timers.keys().next().map(|(when, _)| *when)
- };
- let timeout = if block {
- // Calculate the timeout till the first timer fires.
- next_timer.map(|when| when.saturating_duration_since(Instant::now()))
- } else {
- // If this poll doesn't block, the timeout is zero.
- Some(Duration::from_secs(0))
+ timeout
};
// Block on I/O events.
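The rewritten block takes a single `now` snapshot and computes the timeout before waking anything, returning a zero timeout whenever some timer is already ready. The splitting itself relies on `BTreeMap::split_off`, which keeps keys less than the given key and returns the rest. A runnable sketch of that part (illustrative values, not smol's code):

use std::collections::BTreeMap;
use std::mem;
use std::time::{Duration, Instant};

fn main() {
    let now = Instant::now();
    let mut timers = BTreeMap::new();
    timers.insert((now - Duration::from_secs(1), 1), "ready");
    timers.insert((now + Duration::from_secs(1), 2), "pending");

    // `split_off` keeps keys < (now, 0) in `timers` and returns keys >= (now, 0).
    let pending = timers.split_off(&(now, 0));
    // Swap, so `timers` holds the pending timers and `ready` the fired ones.
    let ready = mem::replace(&mut timers, pending);

    assert_eq!(ready.values().next(), Some(&"ready"));
    assert_eq!(timers.values().next(), Some(&"pending"));

    // The next wakeup deadline comes from the first pending key, as above.
    let timeout = timers
        .keys()
        .next()
        .map(|(when, _)| when.saturating_duration_since(now));
    println!("sleep for {:?}", timeout);
}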

View File

@@ -33,6 +33,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
let local = ThreadLocalExecutor::new();
let ws_executor = WorkStealingExecutor::get();
let worker = ws_executor.worker();
+ let reactor = Reactor::get();
// Create a waker that triggers an I/O event in the thread-local scheduler.
let ev = local.event().clone();
@@ -52,17 +53,17 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
// 1. `future` - the main future.
// 2. `local` - the thread-local executor.
// 3. `worker` - the work-stealing executor.
- // 4. `Reactor::get()` - the reactor.
+ // 4. `reactor` - the reactor.
//
// When all four components are out of work, we block the current thread on
// epoll/kevent/wepoll. If new work comes in that isn't naturally triggered by an I/O event
// registered with `Async` handles, we use `IoEvent`s to simulate an I/O event that will
// unblock the thread:
//
- // - When the main future is woken, `local.event` is triggered.
- // - When thread-local executor gets new work, `local.event` is triggered.
- // - When work-stealing executor gets new work, `ws_executor.event` is triggered.
- // - When a new earliest timer is registered, `Reactor::get().event` is triggered.
+ // - When the main future is woken, `local.event()` is triggered.
+ // - When the thread-local executor gets new work, `local.event()` is triggered.
+ // - When the work-stealing executor gets new work, `ws_executor.event()` is triggered.
+ // - When a new earliest timer is registered, `reactor.event()` is triggered.
//
// This way we make sure that any change that might give us new work will unblock
// epoll/kevent/wepoll and let us continue the loop.
@@ -76,7 +77,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
// 3. Run a batch of tasks in the work-stealing executor.
let more_worker = worker.execute();
// 4. Poll the reactor.
- Reactor::get().poll().expect("failure while polling I/O");
+ reactor.poll().expect("failure while polling I/O");
// If there is more work in the thread-local or the work-stealing executor, continue
// the loop.
@@ -84,22 +85,22 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
continue;
}
- // Prepare for blocking until the reactor is locked or `local.event` is triggered.
+ // Prepare for blocking until the reactor is locked or `local.event()` is triggered.
//
- // Note that there is no need to wait for `executor.event`. If the reactor is locked
- // immediately, we'll check for the I/O event right after that anyway.
+ // Note that there is no need to wait for `ws_executor.event()`. If the reactor is
+ // locked immediately, we'll check for the I/O event right after that anyway.
//
// If some other worker is holding the reactor locked, it will be unblocked as soon as
// the I/O event is triggered. Then, another worker will be allowed to lock the
// reactor, and will be unblocked if there is more work to do. Every worker triggers
- // `worker.executor.event` each time it finds a runnable task.
- let lock = Reactor::get().lock();
+ // `ws_executor.event()` each time it finds a runnable task.
+ let lock = reactor.lock();
let ready = local.event().ready();
futures::pin_mut!(lock);
futures::pin_mut!(ready);
- // Block until either the reactor is locked or `local.event` is triggered.
- if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
+ // Block until either the reactor is locked or `local.event()` is triggered.
+ if let Either::Left((mut reactor_lock, _)) = block_on(future::select(lock, ready)) {
// Clear the two I/O events.
let local_ev = local.event().clear();
let ws_ev = ws_executor.event().clear();
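The `select` between the reactor lock and the local event is the core of the blocking strategy: whichever becomes ready first decides whether this worker polls I/O or goes back to running tasks. A minimal runnable sketch of that pattern with the `futures` crate, using `ready`/`pending` futures as stand-ins for the reactor lock and the event:

use futures::executor::block_on;
use futures::future::{self, Either};

fn main() {
    // Stand-ins: the "lock" is immediately available, the "event" never fires.
    let lock = future::ready("reactor lock");
    let ready = future::pending::<()>();
    futures::pin_mut!(lock);
    futures::pin_mut!(ready);

    // Whichever future completes first wins; the loser is returned unfinished.
    match block_on(future::select(lock, ready)) {
        Either::Left((guard, _)) => println!("locked the reactor: {}", guard),
        Either::Right(((), _)) => println!("local event fired first"),
    }
}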
@@ -110,7 +111,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
}
// Block until an I/O event occurs.
- reactor.wait().expect("failure while waiting on I/O");
+ reactor_lock.wait().expect("failure while waiting on I/O");
}
}
})

View File

@@ -39,7 +39,7 @@ impl Timer {
impl Drop for Timer {
fn drop(&mut self) {
- if let Some(id) = self.id {
+ if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
@@ -52,7 +52,7 @@ impl Future for Timer {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
- if let Some(id) = self.id {
+ if let Some(id) = self.id.take() {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
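Both hunks apply the same fix: `take()` moves the ID out of the `Option`, so whichever of `poll()` and `drop()` runs first deregisters the timer, and the other sees `None` and does nothing. Without it, the cleanup ran twice and relied on `remove_timer` being idempotent. A minimal sketch of the pattern (a toy `Timer`, not smol's):

struct Timer {
    id: Option<u64>,
}

fn remove_timer(id: u64) {
    // Stand-in for Reactor::get().remove_timer(when, id).
    println!("deregistering timer {}", id);
}

impl Timer {
    fn fire(&mut self) {
        // `take()` leaves `None` behind, so cleanup happens exactly once.
        if let Some(id) = self.id.take() {
            remove_timer(id);
        }
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        // With the old `if let Some(id) = self.id`, this would deregister a
        // second time after `fire()` already did.
        if let Some(id) = self.id.take() {
            remove_timer(id);
        }
    }
}

fn main() {
    let mut t = Timer { id: Some(7) };
    t.fire(); // prints once
} // drop: `id` is `None`, nothing to do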

View File

@@ -1,4 +1,25 @@
//! The work-stealing executor.
+ //!
+ //! Tasks created by [`Task::spawn()`] go into this executor. Any thread calling [`run()`]
+ //! initializes a `Worker` that participates in work stealing, which is allowed to run any task in
+ //! this executor or in other workers.
+ //!
+ //! Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
+ //! implement [`Send`].
+ //!
+ //! There is only one global instance of this type, accessible by [`WorkStealingExecutor::get()`].
+ //!
+ //! Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
+ //! invocations of [`run()`] used the same global task queue all the time, they would constantly
+ //! "step on each other's toes", causing a lot of CPU cache traffic, and would often waste time
+ //! retrying queue operations in compare-and-swap loops.
+ //!
+ //! The solution is to have a separate queue in each invocation of [`run()`], called a "worker".
+ //! Each thread primarily uses its own worker. Once there are no more tasks in the worker, we
+ //! either grab a batch of tasks from the main global queue, or steal tasks from other workers.
+ //! Of course, work stealing still causes contention in some cases, but much less often.
+ //!
+ //! More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
use std::cell::Cell;
use std::future::Future;
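The executor's queues are built on crossbeam's `deque` module (visible in the `deque::Injector<Runnable>` field below). A self-contained sketch of the layout the module doc describes, adapted from crossbeam-deque's documented find-task pattern, with `i32` standing in for `Runnable`:

use crossbeam_deque::{Injector, Steal, Stealer, Worker};

fn find_task(local: &Worker<i32>, global: &Injector<i32>, stealers: &[Stealer<i32>]) -> Option<i32> {
    // Fast path: pop from our own queue, with no contention.
    local.pop().or_else(|| {
        std::iter::repeat_with(|| {
            // Otherwise grab a whole batch from the global (injector) queue...
            global
                .steal_batch_and_pop(local)
                // ...or steal from a sibling worker.
                .or_else(|| stealers.iter().map(|s| s.steal()).collect())
        })
        // `Steal::Retry` means we lost a race; try again.
        .find(|s| !s.is_retry())
        .and_then(Steal::success)
    })
}

fn main() {
    let global = Injector::new();
    let local = Worker::new_fifo();
    global.push(1);
    assert_eq!(find_task(&local, &global, &[]), Some(1));
}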
@@ -25,27 +46,6 @@ use crate::throttle;
scoped_thread_local!(static WORKER: for<'a> &'a Worker<'a>);
/// The global work-stealing executor.
- ///
- /// Tasks created by `Task::spawn()` go into this executor. Any calling `run()` initializes a
- /// `Worker` that participates in work stealing, which is allowed to run any task in this executor
- /// or in other workers.
- ///
- /// Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
- /// implement `Send`.
- ///
- /// There is only one global instance of this type, accessible by `WorkStealingExecutor::get()`.
- ///
- /// Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
- /// invocations of `run()` used the same global task queue all the time, they would constantly
- /// "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
- /// retrying queue operations in compare-and-swap loops.
- ///
- /// The solution is to have a separate queue in each invocation of `run()`, called a "worker".
- /// Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
- /// either grab a batch of tasks from the main global queue, or steal tasks from other workers.
- /// Of course, work-stealing still causes contention in some cases, but much less often.
- ///
- /// More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
pub(crate) struct WorkStealingExecutor {
/// When a thread that is not inside `run()` spawns or wakes a task, it goes into this queue.
injector: deque::Injector<Runnable>,
@@ -88,7 +88,8 @@ impl WorkStealingExecutor {
} else {
// If scheduling from a non-worker thread, push into the injector queue.
self.injector.push(runnable);
- // Trigger an I/O event to let workers know that a task has been scheduled.
+ // Notify workers that there is a task in the injector queue.
self.event.set();
}
};
@@ -159,7 +160,11 @@ impl Worker<'_> {
return false;
}
Some(r) => {
- // Notify other workers that there may be more tasks.
+ // Notify other workers that there may be stealable tasks.
+ //
+ // This is necessary because `pop()` sometimes re-shuffles tasks between
+ // queues, which races with other workers looking for tasks. They might
+ // believe there are no tasks while there really are, so we notify here.
self.executor.event.set();
// Run the task.
@@ -194,6 +199,9 @@ impl Worker<'_> {
if let Some(r) = self.slot.replace(runnable.into()) {
// If the slot had a task, push it into the queue.
self.queue.push(r);
+ // Notify other workers that there are stealable tasks.
+ self.executor.event.set();
}
}
@@ -203,8 +211,10 @@ impl Worker<'_> {
if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
return Some(r);
}
+ // If not, fetch more tasks from the injector queue, the reactor, or other workers.
self.fetch();
+ // Check the slot and the queue again.
self.slot.take().or_else(|| self.queue.pop())
}
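The slot is a one-task fast path in front of the queue: `search()` drains it first because the task parked there was scheduled most recently (see `self.slot.replace(...)` in the push hunk above). A toy sketch of that lookup order, with `Cell<Option<_>>` for the slot as in the surrounding code and `i32` for tasks:

use std::cell::Cell;
use std::collections::VecDeque;

struct MiniWorker {
    slot: Cell<Option<i32>>, // the most recently scheduled task
    queue: VecDeque<i32>,    // the worker's local queue
}

impl MiniWorker {
    fn search(&mut self) -> Option<i32> {
        // Slot first, then the queue, mirroring `search()` above.
        self.slot.take().or_else(|| self.queue.pop_front())
    }
}

fn main() {
    let mut w = MiniWorker {
        slot: Cell::new(Some(1)),
        queue: VecDeque::from([2, 3]),
    };
    assert_eq!(w.search(), Some(1)); // the slot wins
    assert_eq!(w.search(), Some(2)); // then the queue, in order
}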
@@ -225,6 +235,7 @@ impl Worker<'_> {
self.slot.set(Some(r));
return;
}
+ // If there is at least one task in the queue, return.
if !self.queue.is_empty() {
return;
@@ -259,12 +270,14 @@ impl Drop for Worker<'_> {
// Unregister the worker.
self.executor.stealers.write().unwrap().remove(self.key);
- // Flush the slot.
- self.push(None);
+ // Move the task in the slot into the injector queue.
+ if let Some(r) = self.slot.take() {
+ r.schedule();
+ }
// Move all tasks in this worker's queue into the injector queue.
while let Some(r) = self.queue.pop() {
- self.executor.injector.push(r);
+ r.schedule();
}
}
}
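Re-scheduling through `r.schedule()` instead of pushing straight into the injector matters because, per the schedule hunk earlier, that path also sets the executor event, so sleeping workers learn about the orphaned tasks. A toy sketch of the difference (hypothetical minimal types, with an `AtomicBool` standing in for the `IoEvent`):

use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

static EVENT: AtomicBool = AtomicBool::new(false); // stand-in for the IoEvent

struct MiniInjector {
    queue: VecDeque<i32>,
}

impl MiniInjector {
    // A bare push: blocked workers are never told about the new task.
    fn push(&mut self, task: i32) {
        self.queue.push_back(task);
    }

    // What scheduling does in spirit: push, then raise the wakeup event.
    fn schedule(&mut self, task: i32) {
        self.push(task);
        EVENT.store(true, Ordering::SeqCst);
    }
}

fn main() {
    let mut inj = MiniInjector { queue: VecDeque::new() };
    inj.schedule(42);
    assert!(EVENT.load(Ordering::SeqCst)); // sleeping workers get woken up
}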