From f40acf640c16733db0fae8c725f3b19b3afc75fa Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 20 Jun 2020 15:54:22 +0200 Subject: [PATCH] Fix lost wakeups in the executor --- src/multitask.rs | 58 ++++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/src/multitask.rs b/src/multitask.rs index 1d1ec9e..a8b97b8 100644 --- a/src/multitask.rs +++ b/src/multitask.rs @@ -46,10 +46,12 @@ impl Global { /// A list of sleeping workers. struct Sleepers { - /// Number of sleeping workers. + /// Number of sleeping workers (both notified and unnotified). count: usize, /// Callbacks of sleeping unnotified workers. + /// + /// A sleeping worker is notified when its callback is missing from this list. callbacks: Vec, } @@ -60,14 +62,19 @@ impl Sleepers { self.callbacks.push(callback.clone()); } - /// Updates the callback of an already inserted worker. - fn update(&mut self, callback: &Callback) { + /// Re-inserts a sleeping worker's callback if it was notified. + /// + /// Returns `true` if the worker was notified. + fn update(&mut self, callback: &Callback) -> bool { if self.callbacks.iter().all(|cb| cb != callback) { self.callbacks.push(callback.clone()); + true + } else { + false } } - /// Removes a previously inserted worker. + /// Removes a previously inserted sleeping worker. fn remove(&mut self, callback: &Callback) { self.count -= 1; for i in (0..self.callbacks.len()).rev() { @@ -205,6 +212,11 @@ pub(crate) struct Worker { callback: Callback, /// Set to `true` when in sleeping state. + /// + /// States a worker can be in: + /// 1) Woken. + /// 2a) Sleeping and unnotified. + /// 2b) Sleeping and notified. sleeping: Cell, /// Bumped every time a task is run. @@ -243,24 +255,28 @@ impl Worker { Task(Some(handle)) } - /// Moves the worker into sleeping state. + /// Moves the worker into sleeping and unnotified state. + /// + /// Returns `true` if the worker was already sleeping and unnotified. fn sleep(&self) -> bool { let mut sleepers = self.global.sleepers.lock().unwrap(); if self.sleeping.get() { - sleepers.update(&self.callback); - self.global - .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); - false + // Already sleeping, check if notified. + if !sleepers.update(&self.callback) { + return false; + } } else { + // Move to sleeping state. sleepers.insert(&self.callback); - self.global - .notified - .swap(sleepers.is_notified(), Ordering::SeqCst); - self.sleeping.set(true); - true } + + self.global + .notified + .swap(sleepers.is_notified(), Ordering::SeqCst); + + self.sleeping.set(true); + true } /// Moves the worker into woken state. @@ -268,14 +284,13 @@ impl Worker { if self.sleeping.get() { let mut sleepers = self.global.sleepers.lock().unwrap(); sleepers.remove(&self.callback); + self.global .notified .swap(sleepers.is_notified(), Ordering::SeqCst); - self.sleeping.set(false); - true - } else { - false } + + self.sleeping.replace(false) } /// Runs a single task and returns `true` if one was found. @@ -283,10 +298,9 @@ impl Worker { loop { match self.search() { None => { - // Go to sleep and then: - // - If already in sleeping state, return. - // - Otherwise, search again. + // Move to sleeping and unnotified state. if !self.sleep() { + // If already sleeping and unnotified, return. return false; } }