m: Replace unnecessary atomics with non-atomic operations

This commit is contained in:
James Liu 2024-02-16 17:22:43 -08:00 committed by GitHub
parent 0baba46152
commit 7ffdf5ba92
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 26 deletions

View File

@ -38,7 +38,7 @@ use std::future::Future;
use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::task::{Poll, Waker};
@ -243,7 +243,7 @@ impl<'a> Executor<'a> {
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let runner = Runner::new(self.state());
let mut runner = Runner::new(self.state());
let mut rng = fastrand::Rng::new();
// A future that runs tasks forever.
@ -639,29 +639,26 @@ struct Ticker<'a> {
/// 1) Woken.
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: AtomicUsize,
sleeping: usize,
}
impl Ticker<'_> {
/// Creates a ticker.
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
sleeping: AtomicUsize::new(0),
}
Ticker { state, sleeping: 0 }
}
/// Moves the ticker into sleeping and unnotified state.
///
/// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&self, waker: &Waker) -> bool {
fn sleep(&mut self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap();
match self.sleeping.load(Ordering::SeqCst) {
match self.sleeping {
// Move to sleeping state.
0 => self
.sleeping
.store(sleepers.insert(waker), Ordering::SeqCst),
0 => {
self.sleeping = sleepers.insert(waker);
}
// Already sleeping, check if notified.
id => {
@ -679,25 +676,25 @@ impl Ticker<'_> {
}
/// Moves the ticker into woken state.
fn wake(&self) {
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
fn wake(&mut self) {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id);
sleepers.remove(self.sleeping);
self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
}
self.sleeping = 0;
}
/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
async fn runnable(&mut self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
}
/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
@ -728,10 +725,9 @@ impl Ticker<'_> {
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
let id = self.sleeping.swap(0, Ordering::SeqCst);
if id != 0 {
if self.sleeping != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id);
let notified = sleepers.remove(self.sleeping);
self.state
.notified
@ -760,7 +756,7 @@ struct Runner<'a> {
local: Arc<ConcurrentQueue<Runnable>>,
/// Bumped every time a runnable task is found.
ticks: AtomicUsize,
ticks: usize,
}
impl Runner<'_> {
@ -770,7 +766,7 @@ impl Runner<'_> {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: AtomicUsize::new(0),
ticks: 0,
};
state
.local_queues
@ -781,7 +777,7 @@ impl Runner<'_> {
}
/// Waits for the next runnable task to run.
async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable {
async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
let runnable = self
.ticker
.runnable_with(|| {
@ -824,9 +820,9 @@ impl Runner<'_> {
.await;
// Bump the tick counter.
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);
self.ticks += 1;
if ticks % 64 == 0 {
if self.ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local);
}