Use atomics to make run() and tick() futures Send + Sync

This commit is contained in:
Stjepan Glavina 2020-09-10 23:27:56 +02:00
parent 05456efbee
commit 5e08a9a351
1 changed files with 15 additions and 15 deletions

View File

@ -21,13 +21,12 @@
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::atomic::{AtomicBool, Ordering, AtomicU64, AtomicUsize};
use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll, Waker};
@ -200,7 +199,7 @@ impl State {
sleepers: Mutex::new(Sleepers {
count: 0,
wakers: Vec::new(),
id_gen: 0,
id_gen: 1,
}),
}
}
@ -489,13 +488,13 @@ struct Ticker<'a> {
/// The executor state.
state: &'a State,
/// Set to `true` when in sleeping state.
/// Set to a non-zero sleeper ID when in sleeping state.
///
/// States a ticker can be in:
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: Cell<Option<u64>>,
sleeping: AtomicU64,
}
impl Ticker<'_> {
@ -503,7 +502,7 @@ impl Ticker<'_> {
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
sleeping: Cell::new(None),
sleeping: AtomicU64::new(0),
}
}
@ -513,12 +512,12 @@ impl Ticker<'_> {
fn sleep(&self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap();
match self.sleeping.get() {
match self.sleeping.load(Ordering::SeqCst) {
// Move to sleeping state.
None => self.sleeping.set(Some(sleepers.insert(waker))),
0 => self.sleeping.store(sleepers.insert(waker), Ordering::SeqCst),
// Already sleeping, check if notified.
Some(id) => {
id => {
if !sleepers.update(id, waker) {
return false;
}
@ -534,7 +533,8 @@ impl Ticker<'_> {
/// Moves the ticker into woken state.
fn wake(&self) {
if let Some(id) = self.sleeping.take() {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id);
@ -581,7 +581,8 @@ impl Ticker<'_> {
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
if let Some(id) = self.sleeping.take() {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id);
@ -613,7 +614,7 @@ struct Runner<'a> {
local: Arc<ConcurrentQueue<Runnable>>,
/// Bumped every time a runnable task is found.
ticks: Cell<usize>,
ticks: AtomicUsize,
}
impl Runner<'_> {
@ -623,7 +624,7 @@ impl Runner<'_> {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: Cell::new(0),
ticks: AtomicUsize::new(0),
};
state
.local_queues
@ -677,8 +678,7 @@ impl Runner<'_> {
.await;
// Bump the tick counter.
let ticks = self.ticks.get();
self.ticks.set(ticks.wrapping_add(1));
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
if ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling.