This commit is contained in:
Stjepan Glavina 2020-08-29 19:57:21 +02:00
parent d69638b2d3
commit 2da645e6e0
1 changed files with 54 additions and 41 deletions

View File

@ -180,8 +180,8 @@ struct State {
/// The global queue.
queue: ConcurrentQueue<Runnable>,
/// Shards of the global queue created by tickers.
shards: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
/// Local queues created by runners.
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
notified: AtomicBool,
@ -195,7 +195,7 @@ impl State {
fn new() -> State {
State {
queue: ConcurrentQueue::unbounded(),
shards: RwLock::new(Vec::new()),
local_queues: RwLock::new(Vec::new()),
notified: AtomicBool::new(true),
sleepers: Mutex::new(Sleepers {
count: 0,
@ -417,7 +417,7 @@ impl Executor {
/// ```
pub async fn tick(&self) {
let state = self.state();
let runnable = Ticker::new(state).runnable(|| state.queue.pop().ok()).await;
let runnable = Ticker::new(state).runnable().await;
runnable.run();
}
@ -458,7 +458,7 @@ impl Executor {
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone();
// TODO(stjepang): If possible, push into the current shard and notify the ticker.
// TODO(stjepang): If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
@ -483,9 +483,7 @@ impl Default for Executor {
}
}
/// Executes tasks in a work-stealing executor.
///
/// A ticker represents a "worker" in a work-stealing executor.
/// Runs task one by one.
#[derive(Debug)]
struct Ticker<'a> {
/// The executor state.
@ -501,7 +499,7 @@ struct Ticker<'a> {
}
impl Ticker<'_> {
/// Creates a ticker and registers it in the executor state.
/// Creates a ticker.
fn new(state: &State) -> Ticker<'_> {
Ticker {
state,
@ -546,8 +544,13 @@ impl Ticker<'_> {
}
}
/// Finds a task to run.
async fn runnable(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
}
/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
@ -595,19 +598,21 @@ impl Drop for Ticker<'_> {
}
}
/// Executes tasks in a work-stealing executor.
/// A worker in a work-stealing executor.
///
/// A ticker represents a "worker" in a work-stealing executor.
/// This is just a ticker that also has an associated local queue for improved cache locality.
#[derive(Debug)]
struct Runner<'a> {
/// The executor state.
state: &'a State,
/// Inner ticker.
ticker: Ticker<'a>,
/// A shard of the global queue.
shard: Arc<ConcurrentQueue<Runnable>>,
/// The local queue.
local: Arc<ConcurrentQueue<Runnable>>,
/// Bumped every time a task is run.
/// Bumped every time a runnable task is found.
ticks: Cell<usize>,
}
@ -617,44 +622,52 @@ impl Runner<'_> {
let runner = Runner {
state,
ticker: Ticker::new(state),
shard: Arc::new(ConcurrentQueue::bounded(512)),
local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: Cell::new(0),
};
state.shards.write().unwrap().push(runner.shard.clone());
state
.local_queues
.write()
.unwrap()
.push(runner.local.clone());
runner
}
/// Finds a task to run.
/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
let runnable = self
.ticker
.runnable(|| {
// Try the shard.
if let Ok(r) = self.shard.pop() {
.runnable_with(|| {
// Try the local queue.
if let Ok(r) = self.local.pop() {
return Some(r);
}
// Try stealing from the global queue.
if let Ok(r) = self.state.queue.pop() {
steal(&self.state.queue, &self.shard);
steal(&self.state.queue, &self.local);
return Some(r);
}
// Try stealing from other shards.
let shards = self.state.shards.read().unwrap();
// Try stealing from other runners.
let local_queues = self.state.local_queues.read().unwrap();
// Pick a random starting point in the iterator list and rotate the list.
let n = shards.len();
let n = local_queues.len();
let start = fastrand::usize(..n);
let iter = shards.iter().chain(shards.iter()).skip(start).take(n);
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);
// Remove this ticker's shard.
let iter = iter.filter(|shard| !Arc::ptr_eq(shard, &self.shard));
// Remove this runner's local queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));
// Try stealing from each shard in the list.
for shard in iter {
steal(shard, &self.shard);
if let Ok(r) = self.shard.pop() {
// Try stealing from each local queue in the list.
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
return Some(r);
}
}
@ -663,13 +676,13 @@ impl Runner<'_> {
})
.await;
// Bump the ticker.
// Bump the tick counter.
let ticks = self.ticks.get();
self.ticks.set(ticks.wrapping_add(1));
// Steal tasks from the global queue to ensure fair task scheduling.
if ticks % 64 == 0 {
steal(&self.state.queue, &self.shard);
// Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local);
}
runnable
@ -678,15 +691,15 @@ impl Runner<'_> {
impl Drop for Runner<'_> {
fn drop(&mut self) {
// Remove the shard.
// Remove the local queue.
self.state
.shards
.local_queues
.write()
.unwrap()
.retain(|shard| !Arc::ptr_eq(shard, &self.shard));
.retain(|local| !Arc::ptr_eq(local, &self.local));
// Re-schedule remaining tasks in the shard.
while let Ok(r) = self.shard.pop() {
// Re-schedule remaining tasks in the local queue.
while let Ok(r) = self.local.pop() {
r.schedule();
}
}