Refactor
This commit is contained in:
parent
6f2b0b8a49
commit
184185a7fa
537
src/lib.rs
537
src/lib.rs
|
@ -36,132 +36,6 @@ use vec_arena::Arena;
|
|||
#[doc(no_inline)]
|
||||
pub use async_task::Task;
|
||||
|
||||
/// The state of a executor.
|
||||
#[derive(Debug)]
|
||||
struct State {
|
||||
/// The global queue.
|
||||
queue: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Local queues created by runners.
|
||||
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
|
||||
notified: AtomicBool,
|
||||
|
||||
/// A list of sleeping tickers.
|
||||
sleepers: Mutex<Sleepers>,
|
||||
|
||||
active: Mutex<Arena<Waker>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Creates state for a new executor.
|
||||
fn new() -> State {
|
||||
State {
|
||||
queue: ConcurrentQueue::unbounded(),
|
||||
local_queues: RwLock::new(Vec::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
wakers: Vec::new(),
|
||||
free_ids: Vec::new(),
|
||||
}),
|
||||
active: Mutex::new(Arena::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a sleeping ticker.
|
||||
#[inline]
|
||||
fn notify(&self) {
|
||||
if !self
|
||||
.notified
|
||||
.compare_and_swap(false, true, Ordering::SeqCst)
|
||||
{
|
||||
let waker = self.sleepers.lock().unwrap().notify();
|
||||
if let Some(w) = waker {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A list of sleeping tickers.
|
||||
#[derive(Debug)]
|
||||
struct Sleepers {
|
||||
/// Number of sleeping tickers (both notified and unnotified).
|
||||
count: usize,
|
||||
|
||||
/// IDs and wakers of sleeping unnotified tickers.
|
||||
///
|
||||
/// A sleeping ticker is notified when its waker is missing from this list.
|
||||
wakers: Vec<(usize, Waker)>,
|
||||
|
||||
/// Reclaimed IDs.
|
||||
free_ids: Vec<usize>,
|
||||
}
|
||||
|
||||
impl Sleepers {
|
||||
/// Inserts a new sleeping ticker.
|
||||
fn insert(&mut self, waker: &Waker) -> usize {
|
||||
let id = match self.free_ids.pop() {
|
||||
Some(id) => id,
|
||||
None => self.count + 1,
|
||||
};
|
||||
self.count += 1;
|
||||
self.wakers.push((id, waker.clone()));
|
||||
id
|
||||
}
|
||||
|
||||
/// Re-inserts a sleeping ticker's waker if it was notified.
|
||||
///
|
||||
/// Returns `true` if the ticker was notified.
|
||||
fn update(&mut self, id: usize, waker: &Waker) -> bool {
|
||||
for item in &mut self.wakers {
|
||||
if item.0 == id {
|
||||
if !item.1.will_wake(waker) {
|
||||
item.1 = waker.clone();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
self.wakers.push((id, waker.clone()));
|
||||
true
|
||||
}
|
||||
|
||||
/// Removes a previously inserted sleeping ticker.
|
||||
///
|
||||
/// Returns `true` if the ticker was notified.
|
||||
fn remove(&mut self, id: usize) -> bool {
|
||||
self.count -= 1;
|
||||
self.free_ids.push(id);
|
||||
|
||||
for i in (0..self.wakers.len()).rev() {
|
||||
if self.wakers[i].0 == id {
|
||||
self.wakers.remove(i);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
|
||||
fn is_notified(&self) -> bool {
|
||||
self.count == 0 || self.count > self.wakers.len()
|
||||
}
|
||||
|
||||
/// Returns notification waker for a sleeping ticker.
|
||||
///
|
||||
/// If a ticker was notified already or there are no tickers, `None` will be returned.
|
||||
fn notify(&mut self) -> Option<Waker> {
|
||||
if self.wakers.len() == self.count {
|
||||
self.wakers.pop().map(|item| item.1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// An async executor.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -357,14 +231,13 @@ impl<'a> Executor<'a> {
|
|||
impl Drop for Executor<'_> {
|
||||
fn drop(&mut self) {
|
||||
if let Some(state) = self.state.get() {
|
||||
{
|
||||
let mut state = state.active.lock().unwrap();
|
||||
for i in 0..state.capacity() {
|
||||
if let Some(w) = state.remove(i) {
|
||||
w.wake();
|
||||
}
|
||||
let mut active = state.active.lock().unwrap();
|
||||
for i in 0..active.capacity() {
|
||||
if let Some(w) = active.remove(i) {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
drop(active);
|
||||
|
||||
while state.queue.pop().is_ok() {}
|
||||
}
|
||||
|
@ -377,6 +250,269 @@ impl<'a> Default for Executor<'a> {
|
|||
}
|
||||
}
|
||||
|
||||
/// A thread-local executor.
|
||||
///
|
||||
/// The executor can only be run on the thread that created it.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// future::block_on(local_ex.run(async {
|
||||
/// println!("Hello world!");
|
||||
/// }));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct LocalExecutor<'a> {
|
||||
/// The inner executor.
|
||||
inner: once_cell::unsync::OnceCell<Executor<'a>>,
|
||||
|
||||
/// Make sure the type is `!Send` and `!Sync`.
|
||||
_marker: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl UnwindSafe for LocalExecutor<'_> {}
|
||||
impl RefUnwindSafe for LocalExecutor<'_> {}
|
||||
|
||||
impl<'a> LocalExecutor<'a> {
|
||||
/// Creates a single-threaded executor.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
/// ```
|
||||
pub const fn new() -> LocalExecutor<'a> {
|
||||
LocalExecutor {
|
||||
inner: once_cell::unsync::OnceCell::new(),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a task onto the executor.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = local_ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// ```
|
||||
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
|
||||
unsafe { self.inner().spawn_unchecked(future) }
|
||||
}
|
||||
|
||||
/// Attempts to run a task if at least one is scheduled.
|
||||
///
|
||||
/// Running a scheduled task means simply polling its future once.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let ex = LocalExecutor::new();
|
||||
/// assert!(!ex.try_tick()); // no tasks to run
|
||||
///
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// assert!(ex.try_tick()); // a task was found
|
||||
/// ```
|
||||
pub fn try_tick(&self) -> bool {
|
||||
self.inner().try_tick()
|
||||
}
|
||||
|
||||
/// Run a single task.
|
||||
///
|
||||
/// Running a task means simply polling its future once.
|
||||
///
|
||||
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// future::block_on(ex.tick()); // runs the task
|
||||
/// ```
|
||||
pub async fn tick(&self) {
|
||||
self.inner().tick().await
|
||||
}
|
||||
|
||||
/// Runs the executor until the given future completes.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = local_ex.spawn(async { 1 + 2 });
|
||||
/// let res = future::block_on(local_ex.run(async { task.await * 2 }));
|
||||
///
|
||||
/// assert_eq!(res, 6);
|
||||
/// ```
|
||||
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
|
||||
self.inner().run(future).await
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner executor.
|
||||
fn inner(&self) -> &Executor<'a> {
|
||||
self.inner.get_or_init(|| Executor::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Default for LocalExecutor<'a> {
|
||||
fn default() -> LocalExecutor<'a> {
|
||||
LocalExecutor::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// The state of a executor.
|
||||
#[derive(Debug)]
|
||||
struct State {
|
||||
/// The global queue.
|
||||
queue: ConcurrentQueue<Runnable>,
|
||||
|
||||
/// Local queues created by runners.
|
||||
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
|
||||
|
||||
/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
|
||||
notified: AtomicBool,
|
||||
|
||||
/// A list of sleeping tickers.
|
||||
sleepers: Mutex<Sleepers>,
|
||||
|
||||
active: Mutex<Arena<Waker>>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
/// Creates state for a new executor.
|
||||
fn new() -> State {
|
||||
State {
|
||||
queue: ConcurrentQueue::unbounded(),
|
||||
local_queues: RwLock::new(Vec::new()),
|
||||
notified: AtomicBool::new(true),
|
||||
sleepers: Mutex::new(Sleepers {
|
||||
count: 0,
|
||||
wakers: Vec::new(),
|
||||
free_ids: Vec::new(),
|
||||
}),
|
||||
active: Mutex::new(Arena::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Notifies a sleeping ticker.
|
||||
#[inline]
|
||||
fn notify(&self) {
|
||||
if !self
|
||||
.notified
|
||||
.compare_and_swap(false, true, Ordering::SeqCst)
|
||||
{
|
||||
let waker = self.sleepers.lock().unwrap().notify();
|
||||
if let Some(w) = waker {
|
||||
w.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A list of sleeping tickers.
|
||||
#[derive(Debug)]
|
||||
struct Sleepers {
|
||||
/// Number of sleeping tickers (both notified and unnotified).
|
||||
count: usize,
|
||||
|
||||
/// IDs and wakers of sleeping unnotified tickers.
|
||||
///
|
||||
/// A sleeping ticker is notified when its waker is missing from this list.
|
||||
wakers: Vec<(usize, Waker)>,
|
||||
|
||||
/// Reclaimed IDs.
|
||||
free_ids: Vec<usize>,
|
||||
}
|
||||
|
||||
impl Sleepers {
|
||||
/// Inserts a new sleeping ticker.
|
||||
fn insert(&mut self, waker: &Waker) -> usize {
|
||||
let id = match self.free_ids.pop() {
|
||||
Some(id) => id,
|
||||
None => self.count + 1,
|
||||
};
|
||||
self.count += 1;
|
||||
self.wakers.push((id, waker.clone()));
|
||||
id
|
||||
}
|
||||
|
||||
/// Re-inserts a sleeping ticker's waker if it was notified.
|
||||
///
|
||||
/// Returns `true` if the ticker was notified.
|
||||
fn update(&mut self, id: usize, waker: &Waker) -> bool {
|
||||
for item in &mut self.wakers {
|
||||
if item.0 == id {
|
||||
if !item.1.will_wake(waker) {
|
||||
item.1 = waker.clone();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
self.wakers.push((id, waker.clone()));
|
||||
true
|
||||
}
|
||||
|
||||
/// Removes a previously inserted sleeping ticker.
|
||||
///
|
||||
/// Returns `true` if the ticker was notified.
|
||||
fn remove(&mut self, id: usize) -> bool {
|
||||
self.count -= 1;
|
||||
self.free_ids.push(id);
|
||||
|
||||
for i in (0..self.wakers.len()).rev() {
|
||||
if self.wakers[i].0 == id {
|
||||
self.wakers.remove(i);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
/// Returns `true` if a sleeping ticker is notified or no tickers are sleeping.
|
||||
fn is_notified(&self) -> bool {
|
||||
self.count == 0 || self.count > self.wakers.len()
|
||||
}
|
||||
|
||||
/// Returns notification waker for a sleeping ticker.
|
||||
///
|
||||
/// If a ticker was notified already or there are no tickers, `None` will be returned.
|
||||
fn notify(&mut self) -> Option<Waker> {
|
||||
if self.wakers.len() == self.count {
|
||||
self.wakers.pop().map(|item| item.1)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs task one by one.
|
||||
#[derive(Debug)]
|
||||
struct Ticker<'a> {
|
||||
|
@ -624,143 +760,6 @@ fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
|
|||
}
|
||||
}
|
||||
|
||||
/// A thread-local executor.
|
||||
///
|
||||
/// The executor can only be run on the thread that created it.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// future::block_on(local_ex.run(async {
|
||||
/// println!("Hello world!");
|
||||
/// }));
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub struct LocalExecutor<'a> {
|
||||
/// The inner executor.
|
||||
inner: once_cell::unsync::OnceCell<Executor<'a>>,
|
||||
|
||||
/// Make sure the type is `!Send` and `!Sync`.
|
||||
_marker: PhantomData<Rc<()>>,
|
||||
}
|
||||
|
||||
impl UnwindSafe for LocalExecutor<'_> {}
|
||||
impl RefUnwindSafe for LocalExecutor<'_> {}
|
||||
|
||||
impl<'a> LocalExecutor<'a> {
|
||||
/// Creates a single-threaded executor.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
/// ```
|
||||
pub const fn new() -> LocalExecutor<'a> {
|
||||
LocalExecutor {
|
||||
inner: once_cell::unsync::OnceCell::new(),
|
||||
_marker: PhantomData,
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawns a task onto the executor.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = local_ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// ```
|
||||
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
|
||||
unsafe { self.inner().spawn_unchecked(future) }
|
||||
}
|
||||
|
||||
/// Attempts to run a task if at least one is scheduled.
|
||||
///
|
||||
/// Running a scheduled task means simply polling its future once.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
///
|
||||
/// let ex = LocalExecutor::new();
|
||||
/// assert!(!ex.try_tick()); // no tasks to run
|
||||
///
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// assert!(ex.try_tick()); // a task was found
|
||||
/// ```
|
||||
pub fn try_tick(&self) -> bool {
|
||||
self.inner().try_tick()
|
||||
}
|
||||
|
||||
/// Run a single task.
|
||||
///
|
||||
/// Running a task means simply polling its future once.
|
||||
///
|
||||
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = ex.spawn(async {
|
||||
/// println!("Hello world");
|
||||
/// });
|
||||
/// future::block_on(ex.tick()); // runs the task
|
||||
/// ```
|
||||
pub async fn tick(&self) {
|
||||
self.inner().tick().await
|
||||
}
|
||||
|
||||
/// Runs the executor until the given future completes.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_executor::LocalExecutor;
|
||||
/// use futures_lite::future;
|
||||
///
|
||||
/// let local_ex = LocalExecutor::new();
|
||||
///
|
||||
/// let task = local_ex.spawn(async { 1 + 2 });
|
||||
/// let res = future::block_on(local_ex.run(async { task.await * 2 }));
|
||||
///
|
||||
/// assert_eq!(res, 6);
|
||||
/// ```
|
||||
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
|
||||
self.inner().run(future).await
|
||||
}
|
||||
|
||||
/// Returns a reference to the inner executor.
|
||||
fn inner(&self) -> &Executor<'a> {
|
||||
self.inner.get_or_init(|| Executor::new())
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Default for LocalExecutor<'a> {
|
||||
fn default() -> LocalExecutor<'a> {
|
||||
LocalExecutor::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a closure when dropped.
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
|
|
Loading…
Reference in New Issue