Fix lost wakeups in the executor

This commit is contained in:
Stjepan Glavina 2020-06-20 15:54:22 +02:00
parent ab9c9d9e4e
commit f40acf640c
1 changed files with 36 additions and 22 deletions

View File

@ -46,10 +46,12 @@ impl Global {
/// A list of sleeping workers.
struct Sleepers {
/// Number of sleeping workers.
/// Number of sleeping workers (both notified and unnotified).
count: usize,
/// Callbacks of sleeping unnotified workers.
///
/// A sleeping worker is notified when its callback is missing from this list.
callbacks: Vec<Callback>,
}
@ -60,14 +62,19 @@ impl Sleepers {
self.callbacks.push(callback.clone());
}
/// Updates the callback of an already inserted worker.
fn update(&mut self, callback: &Callback) {
/// Re-inserts a sleeping worker's callback if it was notified.
///
/// Returns `true` if the worker was notified.
fn update(&mut self, callback: &Callback) -> bool {
if self.callbacks.iter().all(|cb| cb != callback) {
self.callbacks.push(callback.clone());
true
} else {
false
}
}
/// Removes a previously inserted worker.
/// Removes a previously inserted sleeping worker.
fn remove(&mut self, callback: &Callback) {
self.count -= 1;
for i in (0..self.callbacks.len()).rev() {
@ -205,6 +212,11 @@ pub(crate) struct Worker {
callback: Callback,
/// Set to `true` when in sleeping state.
///
/// States a worker can be in:
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: Cell<bool>,
/// Bumped every time a task is run.
@ -243,24 +255,28 @@ impl Worker {
Task(Some(handle))
}
/// Moves the worker into sleeping state.
/// Moves the worker into sleeping and unnotified state.
///
/// Returns `true` if the worker was already sleeping and unnotified.
fn sleep(&self) -> bool {
let mut sleepers = self.global.sleepers.lock().unwrap();
if self.sleeping.get() {
sleepers.update(&self.callback);
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
false
// Already sleeping, check if notified.
if !sleepers.update(&self.callback) {
return false;
}
} else {
// Move to sleeping state.
sleepers.insert(&self.callback);
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
self.sleeping.set(true);
true
}
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
self.sleeping.set(true);
true
}
/// Moves the worker into woken state.
@ -268,14 +284,13 @@ impl Worker {
if self.sleeping.get() {
let mut sleepers = self.global.sleepers.lock().unwrap();
sleepers.remove(&self.callback);
self.global
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
self.sleeping.set(false);
true
} else {
false
}
self.sleeping.replace(false)
}
/// Runs a single task and returns `true` if one was found.
@ -283,10 +298,9 @@ impl Worker {
loop {
match self.search() {
None => {
// Go to sleep and then:
// - If already in sleeping state, return.
// - Otherwise, search again.
// Move to sleeping and unnotified state.
if !self.sleep() {
// If already sleeping and unnotified, return.
return false;
}
}