From 2da645e6e0fe7ed643f0652a42ec1d4678e562e7 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sat, 29 Aug 2020 19:57:21 +0200 Subject: [PATCH] Refactor --- src/lib.rs | 95 +++++++++++++++++++++++++++++++----------------------- 1 file changed, 54 insertions(+), 41 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ff2212f..7d801a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,8 +180,8 @@ struct State { /// The global queue. queue: ConcurrentQueue, - /// Shards of the global queue created by tickers. - shards: RwLock>>>, + /// Local queues created by runners. + local_queues: RwLock>>>, /// Set to `true` when a sleeping ticker is notified or no tickers are sleeping. notified: AtomicBool, @@ -195,7 +195,7 @@ impl State { fn new() -> State { State { queue: ConcurrentQueue::unbounded(), - shards: RwLock::new(Vec::new()), + local_queues: RwLock::new(Vec::new()), notified: AtomicBool::new(true), sleepers: Mutex::new(Sleepers { count: 0, @@ -417,7 +417,7 @@ impl Executor { /// ``` pub async fn tick(&self) { let state = self.state(); - let runnable = Ticker::new(state).runnable(|| state.queue.pop().ok()).await; + let runnable = Ticker::new(state).runnable().await; runnable.run(); } @@ -458,7 +458,7 @@ impl Executor { fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { let state = self.state().clone(); - // TODO(stjepang): If possible, push into the current shard and notify the ticker. + // TODO(stjepang): If possible, push into the current local queue and notify the ticker. move |runnable| { state.queue.push(runnable).unwrap(); state.notify(); @@ -483,9 +483,7 @@ impl Default for Executor { } } -/// Executes tasks in a work-stealing executor. -/// -/// A ticker represents a "worker" in a work-stealing executor. +/// Runs task one by one. #[derive(Debug)] struct Ticker<'a> { /// The executor state. @@ -501,7 +499,7 @@ struct Ticker<'a> { } impl Ticker<'_> { - /// Creates a ticker and registers it in the executor state. + /// Creates a ticker. fn new(state: &State) -> Ticker<'_> { Ticker { state, @@ -546,8 +544,13 @@ impl Ticker<'_> { } } - /// Finds a task to run. - async fn runnable(&self, mut search: impl FnMut() -> Option) -> Runnable { + /// Waits for the next runnable task to run. + async fn runnable(&self) -> Runnable { + self.runnable_with(|| self.state.queue.pop().ok()).await + } + + /// Waits for the next runnable task to run, given a function that searches for a task. + async fn runnable_with(&self, mut search: impl FnMut() -> Option) -> Runnable { future::poll_fn(|cx| { loop { match search() { @@ -595,19 +598,21 @@ impl Drop for Ticker<'_> { } } -/// Executes tasks in a work-stealing executor. +/// A worker in a work-stealing executor. /// -/// A ticker represents a "worker" in a work-stealing executor. +/// This is just a ticker that also has an associated local queue for improved cache locality. #[derive(Debug)] struct Runner<'a> { + /// The executor state. state: &'a State, + /// Inner ticker. ticker: Ticker<'a>, - /// A shard of the global queue. - shard: Arc>, + /// The local queue. + local: Arc>, - /// Bumped every time a task is run. + /// Bumped every time a runnable task is found. ticks: Cell, } @@ -617,44 +622,52 @@ impl Runner<'_> { let runner = Runner { state, ticker: Ticker::new(state), - shard: Arc::new(ConcurrentQueue::bounded(512)), + local: Arc::new(ConcurrentQueue::bounded(512)), ticks: Cell::new(0), }; - state.shards.write().unwrap().push(runner.shard.clone()); + state + .local_queues + .write() + .unwrap() + .push(runner.local.clone()); runner } - /// Finds a task to run. + /// Waits for the next runnable task to run. async fn runnable(&self) -> Runnable { let runnable = self .ticker - .runnable(|| { - // Try the shard. - if let Ok(r) = self.shard.pop() { + .runnable_with(|| { + // Try the local queue. + if let Ok(r) = self.local.pop() { return Some(r); } // Try stealing from the global queue. if let Ok(r) = self.state.queue.pop() { - steal(&self.state.queue, &self.shard); + steal(&self.state.queue, &self.local); return Some(r); } - // Try stealing from other shards. - let shards = self.state.shards.read().unwrap(); + // Try stealing from other runners. + let local_queues = self.state.local_queues.read().unwrap(); // Pick a random starting point in the iterator list and rotate the list. - let n = shards.len(); + let n = local_queues.len(); let start = fastrand::usize(..n); - let iter = shards.iter().chain(shards.iter()).skip(start).take(n); + let iter = local_queues + .iter() + .chain(local_queues.iter()) + .skip(start) + .take(n); - // Remove this ticker's shard. - let iter = iter.filter(|shard| !Arc::ptr_eq(shard, &self.shard)); + // Remove this runner's local queue. + let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local)); - // Try stealing from each shard in the list. - for shard in iter { - steal(shard, &self.shard); - if let Ok(r) = self.shard.pop() { + // Try stealing from each local queue in the list. + for local in iter { + steal(local, &self.local); + if let Ok(r) = self.local.pop() { return Some(r); } } @@ -663,13 +676,13 @@ impl Runner<'_> { }) .await; - // Bump the ticker. + // Bump the tick counter. let ticks = self.ticks.get(); self.ticks.set(ticks.wrapping_add(1)); - // Steal tasks from the global queue to ensure fair task scheduling. if ticks % 64 == 0 { - steal(&self.state.queue, &self.shard); + // Steal tasks from the global queue to ensure fair task scheduling. + steal(&self.state.queue, &self.local); } runnable @@ -678,15 +691,15 @@ impl Runner<'_> { impl Drop for Runner<'_> { fn drop(&mut self) { - // Remove the shard. + // Remove the local queue. self.state - .shards + .local_queues .write() .unwrap() - .retain(|shard| !Arc::ptr_eq(shard, &self.shard)); + .retain(|local| !Arc::ptr_eq(local, &self.local)); - // Re-schedule remaining tasks in the shard. - while let Ok(r) = self.shard.pop() { + // Re-schedule remaining tasks in the local queue. + while let Ok(r) = self.local.pop() { r.schedule(); } }