Add tick() and try_tick()

This commit is contained in:
Stjepan Glavina 2020-08-29 18:26:28 +02:00
parent ec695c2e05
commit 6c6c1b1c2f
2 changed files with 316 additions and 52 deletions

88
examples/priority.rs Normal file
View File

@ -0,0 +1,88 @@
//! An executor with task priorities.
use std::future::Future;
use std::thread;
use async_executor::{Executor, Task};
use futures_lite::{future, FutureExt};
/// Task priority.
#[repr(usize)]
#[derive(Debug, Clone, Copy)]
enum Priority {
High = 0,
Medium = 1,
Low = 2,
}
/// An executor with task priorities.
///
/// Tasks with lower priorities only get polled when there are no tasks with higher priorities.
struct PriorityExecutor {
ex: [Executor; 3],
}
impl PriorityExecutor {
/// Creates a new executor.
const fn new() -> PriorityExecutor {
PriorityExecutor {
ex: [Executor::new(), Executor::new(), Executor::new()],
}
}
/// Spawns a task with the given priority.
fn spawn<T: Send + 'static>(
&self,
priority: Priority,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
self.ex[priority as usize].spawn(future)
}
/// Runs the executor until the future completes.
async fn run<T>(&self, future: impl Future<Output = T>) -> T {
future
.or(async {
// Keep ticking inner executors forever.
loop {
let t0 = self.ex[0].tick();
let t1 = self.ex[1].tick();
let t2 = self.ex[2].tick();
// Wait until one of the ticks completes, trying them in order from highest
// priority to lowest priority.
t0.or(t1).or(t2).await;
}
})
.await
}
}
fn main() {
static EX: PriorityExecutor = PriorityExecutor::new();
// Spawn a thread running the executor forever.
thread::spawn(|| {
let forever = future::pending::<()>();
future::block_on(EX.run(forever));
});
let mut tasks = Vec::new();
for _ in 0..20 {
// Choose a random priority.
let choice = [Priority::High, Priority::Medium, Priority::Low];
let priority = choice[fastrand::usize(..choice.len())];
// Spawn a task with this priority.
tasks.push(EX.spawn(priority, async move {
println!("{:?}", priority);
future::yield_now().await;
println!("{:?}", priority);
}));
}
for task in tasks {
future::block_on(task);
}
}

View File

@ -32,7 +32,7 @@ use std::sync::{Arc, Mutex, RwLock};
use std::task::{Context, Poll, Waker};
use concurrent_queue::ConcurrentQueue;
use futures_lite::future;
use futures_lite::{future, ready, FutureExt};
/// A runnable future, ready for execution.
///
@ -84,6 +84,9 @@ type Runnable = async_task::Task<()>;
#[derive(Debug)]
pub struct Task<T>(Option<async_task::JoinHandle<T, ()>>);
impl<T> UnwindSafe for Task<T> {}
impl<T> RefUnwindSafe for Task<T> {}
impl<T> Task<T> {
/// Detaches the task to let it keep running in the background.
///
@ -260,14 +263,17 @@ impl Sleepers {
}
/// Removes a previously inserted sleeping ticker.
fn remove(&mut self, id: u64) {
///
/// Returns `true` if the ticker was notified.
fn remove(&mut self, id: u64) -> bool {
self.count -= 1;
for i in (0..self.wakers.len()).rev() {
if self.wakers[i].0 == id {
self.wakers.remove(i);
return;
return false;
}
}
true
}
/// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
@ -358,6 +364,65 @@ impl Executor {
Task(Some(handle))
}
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::Executor;
///
/// let ex = Executor::new();
/// assert!(!ex.try_tick()); // no tasks to run
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// assert!(ex.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
match self.state().queue.pop() {
Err(_) => false,
Ok(r) => {
// Notify another ticker now to pick up where this ticker left off, just in case
// running the task takes a long time.
self.state().notify();
// Run the task.
r.run();
true
}
}
}
/// Run a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::future;
///
/// let ex = Executor::new();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// future::block_on(ex.tick()); // runs the task
/// ```
pub async fn tick(&self) {
// Create a ticker that doesn't use sharding.
let ticker = Ticker::new(self.state());
// Keep trying until a single `poll_tick()` is successful.
future::poll_fn(|cx| ticker.poll_tick(cx)).await
}
/// Runs the executor until the given future completes.
///
/// # Examples
@ -374,30 +439,30 @@ impl Executor {
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let ticker = Ticker::new(self.state());
// Create a ticker that uses sharding.
let runner = Runner::new(self.state());
future::race(
future,
future::poll_fn(|cx| {
// Run a batch of tasks.
for _ in 0..200 {
if !ticker.tick(cx.waker()) {
return Poll::Pending;
}
}
// A future that ticks the executor forever.
let tick_forever = future::poll_fn(|cx| {
// Run a batch of tasks.
for _ in 0..200 {
ready!(runner.poll_tick(cx));
}
// If there are more tasks, yield.
cx.waker().wake_by_ref();
Poll::Pending
}),
)
.await
// If there are more tasks, yield.
cx.waker().wake_by_ref();
Poll::Pending
});
// Run `future` and `tick_forever` concurrently until `future` completes.
future.or(tick_forever).await
}
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone();
// TODO(stjepang): If possible, push into the current shard and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
@ -410,6 +475,12 @@ impl Executor {
}
}
impl Drop for Executor {
fn drop(&mut self) {
// TODO(stjepang): Cancel all remaining tasks.
}
}
impl Default for Executor {
fn default() -> Executor {
Executor::new()
@ -424,9 +495,6 @@ struct Ticker<'a> {
/// The executor state.
state: &'a State,
/// A shard of the global queue.
shard: Arc<ConcurrentQueue<Runnable>>,
/// Set to `true` when in sleeping state.
///
/// States a ticker can be in:
@ -434,25 +502,15 @@ struct Ticker<'a> {
/// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified.
sleeping: Cell<Option<u64>>,
/// Bumped every time a task is run.
ticks: Cell<usize>,
}
impl UnwindSafe for Ticker<'_> {}
impl RefUnwindSafe for Ticker<'_> {}
impl Ticker<'_> {
/// Creates a ticker and registers it in the executor state.
fn new(state: &State) -> Ticker<'_> {
let ticker = Ticker {
Ticker {
state,
shard: Arc::new(ConcurrentQueue::bounded(512)),
sleeping: Cell::new(None),
ticks: Cell::new(0),
};
state.shards.write().unwrap().push(ticker.shard.clone());
ticker
}
}
/// Moves the ticker into sleeping and unnotified state.
@ -481,8 +539,6 @@ impl Ticker<'_> {
}
/// Moves the ticker into woken state.
///
/// Returns `false` if the ticker was already woken.
fn wake(&self) {
if let Some(id) = self.sleeping.take() {
let mut sleepers = self.state.sleepers.lock().unwrap();
@ -494,23 +550,101 @@ impl Ticker<'_> {
}
}
/// Executes a single task.
/// Attempts to execute a single task.
///
/// This method takes a scheduled task and polls its future. It returns `true` if a scheduled
/// task was found, or `false` otherwise.
pub fn tick(&self, waker: &Waker) -> bool {
/// This method takes a scheduled task and polls its future.
fn poll_tick(&self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state.queue.pop() {
Err(_) => {
// Move to sleeping and unnotified state.
if !self.sleep(cx.waker()) {
// If already sleeping and unnotified, return.
return Poll::Pending;
}
}
Ok(r) => {
// Wake up.
self.wake();
// Notify another ticker now to pick up where this ticker left off, just in
// case running the task takes a long time.
self.state.notify();
// Run the task.
r.run();
return Poll::Ready(());
}
}
}
}
}
impl Drop for Ticker<'_> {
fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list.
if let Some(id) = self.sleeping.take() {
let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id);
self.state
.notified
.swap(sleepers.is_notified(), Ordering::SeqCst);
// If this ticker was notified, then notify another ticker.
if notified {
drop(sleepers);
self.state.notify();
}
}
}
}
/// Executes tasks in a work-stealing executor.
///
/// A ticker represents a "worker" in a work-stealing executor.
#[derive(Debug)]
struct Runner<'a> {
state: &'a State,
ticker: Ticker<'a>,
/// A shard of the global queue.
shard: Arc<ConcurrentQueue<Runnable>>,
/// Bumped every time a task is run.
ticks: Cell<usize>,
}
impl Runner<'_> {
/// Creates a runner and registers it in the executor state.
fn new(state: &State) -> Runner<'_> {
let runner = Runner {
state,
ticker: Ticker::new(state),
shard: Arc::new(ConcurrentQueue::bounded(512)),
ticks: Cell::new(0),
};
state.shards.write().unwrap().push(runner.shard.clone());
runner
}
/// Attempts to execute a single task.
///
/// This method takes a scheduled task and polls its future.
fn poll_tick(&self, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.search() {
None => {
// Move to sleeping and unnotified state.
if !self.sleep(waker) {
if !self.ticker.sleep(cx.waker()) {
// If already sleeping and unnotified, return.
return false;
return Poll::Pending;
}
}
Some(r) => {
// Wake up.
self.wake();
self.ticker.wake();
// Notify another ticker now to pick up where this ticker left off, just in
// case running the task takes a long time.
@ -527,8 +661,7 @@ impl Ticker<'_> {
// Run the task.
r.run();
return true;
return Poll::Ready(());
}
}
}
@ -536,6 +669,7 @@ impl Ticker<'_> {
/// Finds the next task to run.
fn search(&self) -> Option<Runnable> {
// Try the shard.
if let Ok(r) = self.shard.pop() {
return Some(r);
}
@ -569,10 +703,9 @@ impl Ticker<'_> {
}
}
impl Drop for Ticker<'_> {
impl Drop for Runner<'_> {
fn drop(&mut self) {
// Wake and unregister the ticker.
self.wake();
// Remove the shard.
self.state
.shards
.write()
@ -583,10 +716,6 @@ impl Drop for Ticker<'_> {
while let Ok(r) = self.shard.pop() {
r.schedule();
}
// Notify another ticker to start searching for tasks.
self.state.notify();
// TODO(stjepang): Cancel all remaining tasks.
}
}
@ -637,6 +766,9 @@ pub struct LocalExecutor {
_marker: PhantomData<Rc<()>>,
}
impl UnwindSafe for LocalExecutor {}
impl RefUnwindSafe for LocalExecutor {}
impl LocalExecutor {
/// Creates a single-threaded executor.
///
@ -674,6 +806,50 @@ impl LocalExecutor {
Task(Some(handle))
}
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
///
/// let ex = LocalExecutor::new();
/// assert!(!ex.try_tick()); // no tasks to run
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// assert!(ex.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
self.inner().try_tick()
}
/// Run a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// future::block_on(ex.tick()); // runs the task
/// ```
pub async fn tick(&self) {
self.inner().tick().await
}
/// Runs the executor until the given future completes.
///
/// # Examples