Initial commit

This commit is contained in:
Stjepan Glavina 2019-08-12 20:18:51 +02:00
commit 1479e86ca6
25 changed files with 4781 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
/target
**/*.rs.bk
Cargo.lock

20
Cargo.toml Normal file
View File

@ -0,0 +1,20 @@
[package]
name = "async-task"
version = "0.1.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
license = "Apache-2.0/MIT"
repository = "https://github.com/stjepang/async-task"
homepage = "https://github.com/stjepang/async-task"
documentation = "https://docs.rs/async-task"
description = "Task abstraction for building executors"
keywords = ["future", "task", "executor", "spawn"]
categories = ["asynchronous", "concurrency"]
[dependencies]
crossbeam-utils = "0.6.5"
[dev-dependencies]
crossbeam = "0.7.1"
futures-preview = "0.3.0-alpha.17"
lazy_static = "1.3.0"

201
LICENSE-APACHE Normal file
View File

@ -0,0 +1,201 @@
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
APPENDIX: How to apply the Apache License to your work.
To apply the Apache License to your work, attach the following
boilerplate notice, with the fields enclosed by brackets "[]"
replaced with your own identifying information. (Don't include
the brackets!) The text should be enclosed in the appropriate
comment syntax for the file format. We also recommend that a
file or class name and description of purpose be included on the
same "printed page" as the copyright notice for easier
identification within third-party archives.
Copyright [yyyy] [name of copyright owner]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

23
LICENSE-MIT Normal file
View File

@ -0,0 +1,23 @@
Permission is hereby granted, free of charge, to any
person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the
Software without restriction, including without
limitation the rights to use, copy, modify, merge,
publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software
is furnished to do so, subject to the following
conditions:
The above copyright notice and this permission notice
shall be included in all copies or substantial portions
of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.

21
README.md Normal file
View File

@ -0,0 +1,21 @@
# async-task
A task abstraction for building executors.
This crate makes it possible to build an efficient and extendable executor in few lines of
code.
## License
Licensed under either of
* Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
* MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)
at your option.
#### Contribution
Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be
dual licensed as above, without any additional terms or conditions.

43
benches/bench.rs Normal file
View File

@ -0,0 +1,43 @@
#![feature(async_await, test)]
extern crate test;
use futures::channel::oneshot;
use futures::executor;
use futures::future::TryFutureExt;
use test::Bencher;
#[bench]
fn task_create(b: &mut Bencher) {
b.iter(|| {
async_task::spawn(async {}, drop, ());
});
}
#[bench]
fn task_run(b: &mut Bencher) {
b.iter(|| {
let (task, handle) = async_task::spawn(async {}, drop, ());
task.run();
executor::block_on(handle).unwrap();
});
}
#[bench]
fn oneshot_create(b: &mut Bencher) {
b.iter(|| {
let (tx, _rx) = oneshot::channel::<()>();
let _task = Box::new(async move { tx.send(()).map_err(|_| ()) });
});
}
#[bench]
fn oneshot_run(b: &mut Bencher) {
b.iter(|| {
let (tx, rx) = oneshot::channel::<()>();
let task = Box::new(async move { tx.send(()).map_err(|_| ()) });
let future = task.and_then(|_| rx.map_err(|_| ()));
executor::block_on(future).unwrap();
});
}

View File

@ -0,0 +1,75 @@
//! A single-threaded executor where join handles propagate panics from tasks.
#![feature(async_await)]
use std::future::Future;
use std::panic::{resume_unwind, AssertUnwindSafe};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use futures::future::FutureExt;
use lazy_static::lazy_static;
/// Spawns a future on the executor.
fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<async_task::Task<()>> = {
let (sender, receiver) = unbounded::<async_task::Task<()>>();
// Start the executor thread.
thread::spawn(|| {
for task in receiver {
// No need for `catch_unwind()` here because panics are already caught.
task.run();
}
});
sender
};
}
// Create a future that catches panics within itself.
let future = AssertUnwindSafe(future).catch_unwind();
// Create a task that is scheduled by sending itself into the channel.
let schedule = |t| QUEUE.send(t).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
// Schedule the task by sending it into the channel.
task.schedule();
// Wrap the handle into one that propagates panics.
JoinHandle(handle)
}
/// A join handle that propagates panics inside the task.
struct JoinHandle<R>(async_task::JoinHandle<thread::Result<R>, ()>);
impl<R> Future for JoinHandle<R> {
type Output = Option<R>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.0).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok(val))) => Poll::Ready(Some(val)),
Poll::Ready(Some(Err(err))) => resume_unwind(err),
}
}
}
fn main() {
// Spawn a future that panics and block on it.
let handle = spawn(async {
panic!("Ooops!");
});
executor::block_on(handle);
}

74
examples/panic-result.rs Normal file
View File

@ -0,0 +1,74 @@
//! A single-threaded executor where join handles catch panics inside tasks.
#![feature(async_await)]
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use futures::future::FutureExt;
use lazy_static::lazy_static;
/// Spawns a future on the executor.
fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<async_task::Task<()>> = {
let (sender, receiver) = unbounded::<async_task::Task<()>>();
// Start the executor thread.
thread::spawn(|| {
for task in receiver {
// No need for `catch_unwind()` here because panics are already caught.
task.run();
}
});
sender
};
}
// Create a future that catches panics within itself.
let future = AssertUnwindSafe(future).catch_unwind();
// Create a task that is scheduled by sending itself into the channel.
let schedule = |t| QUEUE.send(t).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
// Schedule the task by sending it into the channel.
task.schedule();
handle
}
fn main() {
// Spawn a future that completes succesfully.
let handle = spawn(async {
println!("Hello, world!");
});
// Block on the future and report its result.
match executor::block_on(handle) {
None => println!("The task was cancelled."),
Some(Ok(val)) => println!("The task completed with {:?}", val),
Some(Err(_)) => println!("The task has panicked"),
}
// Spawn a future that panics.
let handle = spawn(async {
panic!("Ooops!");
});
// Block on the future and report its result.
match executor::block_on(handle) {
None => println!("The task was cancelled."),
Some(Ok(val)) => println!("The task completed with {:?}", val),
Some(Err(_)) => println!("The task has panicked"),
}
}

View File

@ -0,0 +1,55 @@
//! A function that runs a future to completion on a dedicated thread.
#![feature(async_await)]
use std::future::Future;
use std::sync::Arc;
use std::thread;
use crossbeam::channel;
use futures::executor;
/// Spawns a future on a new dedicated thread.
///
/// The returned handle can be used to await the output of the future.
fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
// Create a channel that holds the task when it is scheduled for running.
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);
// Wrap the future into one that disconnects the channel on completion.
let future = async move {
// When the inner future completes, the sender gets dropped and disconnects the channel.
let _sender = sender;
future.await
};
// Create a task that is scheduled by sending itself into the channel.
let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
// Schedule the task by sending it into the channel.
task.schedule();
// Spawn a thread running the task to completion.
thread::spawn(move || {
// Keep taking the task from the channel and running it until completion.
for task in receiver {
task.run();
}
});
handle
}
fn main() {
// Spawn a future on a dedicated thread.
executor::block_on(spawn_on_thread(async {
println!("Hello, world!");
}));
}

52
examples/spawn.rs Normal file
View File

@ -0,0 +1,52 @@
//! A simple single-threaded executor.
#![feature(async_await)]
use std::future::Future;
use std::panic::catch_unwind;
use std::thread;
use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use lazy_static::lazy_static;
/// Spawns a future on the executor.
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<async_task::Task<()>> = {
let (sender, receiver) = unbounded::<async_task::Task<()>>();
// Start the executor thread.
thread::spawn(|| {
for task in receiver {
// Ignore panics for simplicity.
let _ignore_panic = catch_unwind(|| task.run());
}
});
sender
};
}
// Create a task that is scheduled by sending itself into the channel.
let schedule = |t| QUEUE.send(t).unwrap();
let (task, handle) = async_task::spawn(future, schedule, ());
// Schedule the task by sending it into the channel.
task.schedule();
handle
}
fn main() {
// Spawn a future and await its result.
let handle = spawn(async {
println!("Hello, world!");
});
executor::block_on(handle);
}

88
examples/task-id.rs Normal file
View File

@ -0,0 +1,88 @@
//! An executor that assigns an ID to every spawned task.
#![feature(async_await)]
use std::cell::Cell;
use std::future::Future;
use std::panic::catch_unwind;
use std::thread;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel::{unbounded, Sender};
use futures::executor;
use lazy_static::lazy_static;
#[derive(Clone, Copy, Debug)]
struct TaskId(usize);
thread_local! {
/// The ID of the current task.
static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
}
/// Returns the ID of the currently executing task.
///
/// Returns `None` if called outside the runtime.
fn task_id() -> Option<TaskId> {
TASK_ID.with(|id| id.get())
}
/// Spawns a future on the executor.
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
lazy_static! {
// A channel that holds scheduled tasks.
static ref QUEUE: Sender<async_task::Task<TaskId>> = {
let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
// Start the executor thread.
thread::spawn(|| {
TASK_ID.with(|id| {
for task in receiver {
// Store the task ID into the thread-local before running.
id.set(Some(*task.tag()));
// Ignore panics for simplicity.
let _ignore_panic = catch_unwind(|| task.run());
}
})
});
sender
};
// A counter that assigns IDs to spawned tasks.
static ref COUNTER: AtomicCell<usize> = AtomicCell::new(0);
}
// Reserve an ID for the new task.
let id = TaskId(COUNTER.fetch_add(1));
// Create a task that is scheduled by sending itself into the channel.
let schedule = |task| QUEUE.send(task).unwrap();
let (task, handle) = async_task::spawn(future, schedule, id);
// Schedule the task by sending it into the channel.
task.schedule();
handle
}
fn main() {
let mut handles = vec![];
// Spawn a bunch of tasks.
for _ in 0..10 {
handles.push(spawn(async move {
println!("Hello from task with {:?}", task_id());
}));
}
// Wait for the tasks to finish.
for handle in handles {
executor::block_on(handle);
}
}

158
src/header.rs Normal file
View File

@ -0,0 +1,158 @@
use std::alloc::Layout;
use std::cell::Cell;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use crossbeam_utils::Backoff;
use crate::raw::TaskVTable;
use crate::state::*;
use crate::utils::{abort_on_panic, extend};
/// The header of a task.
///
/// This header is stored right at the beginning of every heap-allocated task.
pub(crate) struct Header {
/// Current state of the task.
///
/// Contains flags representing the current state and the reference count.
pub(crate) state: AtomicUsize,
/// The task that is blocked on the `JoinHandle`.
///
/// This waker needs to be woken once the task completes or is closed.
pub(crate) awaiter: Cell<Option<Waker>>,
/// The virtual table.
///
/// In addition to the actual waker virtual table, it also contains pointers to several other
/// methods necessary for bookkeeping the heap-allocated task.
pub(crate) vtable: &'static TaskVTable,
}
impl Header {
/// Cancels the task.
///
/// This method will only mark the task as closed and will notify the awaiter, but it won't
/// reschedule the task if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);
loop {
// If the task has been completed or closed, it can't be cancelled.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
// Mark the task as closed.
match self.state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
self.notify();
}
break;
}
Err(s) => state = s,
}
}
}
/// Notifies the task blocked on the task.
///
/// If there is a registered waker, it will be removed from the header and woken.
#[inline]
pub(crate) fn notify(&self) {
if let Some(waker) = self.swap_awaiter(None) {
// We need a safeguard against panics because waking can panic.
abort_on_panic(|| {
waker.wake();
});
}
}
/// Notifies the task blocked on the task unless its waker matches `current`.
///
/// If there is a registered waker, it will be removed from the header.
#[inline]
pub(crate) fn notify_unless(&self, current: &Waker) {
if let Some(waker) = self.swap_awaiter(None) {
if !waker.will_wake(current) {
// We need a safeguard against panics because waking can panic.
abort_on_panic(|| {
waker.wake();
});
}
}
}
/// Swaps the awaiter and returns the previous value.
#[inline]
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
let new_is_none = new.is_none();
// We're about to try acquiring the lock in a loop. If it's already being held by another
// thread, we'll have to spin for a while so it's best to employ a backoff strategy.
let backoff = Backoff::new();
loop {
// Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
let state = if new_is_none {
self.state.fetch_or(LOCKED, Ordering::Acquire)
} else {
self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
};
// If the lock was acquired, break from the loop.
if state & LOCKED == 0 {
break;
}
// Snooze for a little while because the lock is held by another thread.
backoff.snooze();
}
// Replace the awaiter.
let old = self.awaiter.replace(new);
// Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
if new_is_none {
self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
} else {
self.state.fetch_and(!LOCKED, Ordering::Release);
}
old
}
/// Returns the offset at which the tag of type `T` is stored.
#[inline]
pub(crate) fn offset_tag<T>() -> usize {
let layout_header = Layout::new::<Header>();
let layout_t = Layout::new::<T>();
let (_, offset_t) = extend(layout_header, layout_t);
offset_t
}
}
impl fmt::Debug for Header {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);
f.debug_struct("Header")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("handle", &(state & HANDLE != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
}

333
src/join_handle.rs Normal file
View File

@ -0,0 +1,333 @@
use std::fmt;
use std::future::Future;
use std::marker::{PhantomData, Unpin};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use crate::header::Header;
use crate::state::*;
use crate::utils::abort_on_panic;
/// A handle that awaits the result of a task.
///
/// If the task has completed with `value`, the handle returns it as `Some(value)`. If the task was
/// cancelled or has panicked, the handle returns `None`. Otherwise, the handle has to wait until
/// the task completes, panics, or gets cancelled.
///
/// # Examples
///
/// ```
/// #![feature(async_await)]
///
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Run the task. In this example, it will complete after a single run.
/// task.run();
/// assert!(r.is_empty());
///
/// // Await the result of the task.
/// let result = executor::block_on(handle);
/// assert_eq!(result, Some(3));
/// ```
pub struct JoinHandle<R, T> {
/// A raw task pointer.
pub(crate) raw_task: NonNull<()>,
/// A marker capturing the generic type `R`.
pub(crate) _marker: PhantomData<(R, T)>,
}
unsafe impl<R, T> Send for JoinHandle<R, T> {}
unsafe impl<R, T> Sync for JoinHandle<R, T> {}
impl<R, T> Unpin for JoinHandle<R, T> {}
impl<R, T> JoinHandle<R, T> {
/// Cancels the task.
///
/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
/// to run it won't do anything. And if it's completed, awaiting its result evaluates to
/// `None`.
///
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Cancel the task.
/// handle.cancel();
///
/// // Running a cancelled task does nothing.
/// task.run();
///
/// // Await the result of the task.
/// let result = executor::block_on(handle);
/// assert_eq!(result, None);
/// ```
pub fn cancel(&self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
loop {
// If the task has been completed or closed, it can't be cancelled.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
// If the task is not scheduled nor running, we'll need to schedule it.
let new = if state & (SCHEDULED | RUNNING) == 0 {
(state | SCHEDULED | CLOSED) + REFERENCE
} else {
state | CLOSED
};
// Mark the task as closed.
match (*header).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the task is not scheduled nor running, schedule it so that its future
// gets dropped by the executor.
if state & (SCHEDULED | RUNNING) == 0 {
((*header).vtable.schedule)(ptr);
}
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
(*header).notify();
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Returns a reference to the tag stored inside the task.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
///
/// // Access the tag.
/// assert_eq!(*handle.tag(), "a simple task");
/// ```
pub fn tag(&self) -> &T {
let offset = Header::offset_tag::<T>();
let ptr = self.raw_task.as_ptr();
unsafe {
let raw = (ptr as *mut u8).add(offset) as *const T;
&*raw
}
}
}
impl<R, T> Drop for JoinHandle<R, T> {
fn drop(&mut self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
// A place where the output will be stored in case it needs to be dropped.
let mut output = None;
unsafe {
// Optimistically assume the `JoinHandle` is being dropped just after creating the
// task. This is a common case so if the handle is not used, the overhead of it is only
// one compare-exchange operation.
if let Err(mut state) = (*header).state.compare_exchange_weak(
SCHEDULED | HANDLE | REFERENCE,
SCHEDULED | REFERENCE,
Ordering::AcqRel,
Ordering::Acquire,
) {
loop {
// If the task has been completed but not yet closed, that means its output
// must be dropped.
if state & COMPLETED != 0 && state & CLOSED == 0 {
// Mark the task as closed in order to grab its output.
match (*header).state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Read the output.
output =
Some((((*header).vtable.get_output)(ptr) as *mut R).read());
// Update the state variable because we're continuing the loop.
state |= CLOSED;
}
Err(s) => state = s,
}
} else {
// If this is the last reference to task and it's not closed, then close
// it and schedule one more time so that its future gets dropped by the
// executor.
let new = if state & (!(REFERENCE - 1) | CLOSED) == 0 {
SCHEDULED | CLOSED | REFERENCE
} else {
state & !HANDLE
};
// Unset the handle flag.
match (*header).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If this is the last reference to the task, we need to either
// schedule dropping its future or destroy it.
if state & !(REFERENCE - 1) == 0 {
if state & CLOSED == 0 {
((*header).vtable.schedule)(ptr);
} else {
((*header).vtable.destroy)(ptr);
}
}
break;
}
Err(s) => state = s,
}
}
}
}
}
// Drop the output if it was taken out of the task.
drop(output);
}
}
impl<R, T> Future for JoinHandle<R, T> {
type Output = Option<R>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
let mut state = (*header).state.load(Ordering::Acquire);
loop {
// If the task has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current task, it could also be
// another task.
(*header).notify_unless(cx.waker());
return Poll::Ready(None);
}
// If the task is not completed, register the current task.
if state & COMPLETED == 0 {
// Replace the waker with one associated with the current task. We need a
// safeguard against panics because dropping the previous waker can panic.
abort_on_panic(|| {
(*header).swap_awaiter(Some(cx.waker().clone()));
});
// Reload the state after registering. It is possible that the task became
// completed or closed just before registration so we need to check for that.
state = (*header).state.load(Ordering::Acquire);
// If the task has been closed, notify the awaiter and return `None`.
if state & CLOSED != 0 {
// Even though the awaiter is most likely the current task, it could also
// be another task.
(*header).notify_unless(cx.waker());
return Poll::Ready(None);
}
// If the task is still not completed, we're blocked on it.
if state & COMPLETED == 0 {
return Poll::Pending;
}
}
// Since the task is now completed, mark it as closed in order to grab its output.
match (*header).state.compare_exchange(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter. Even though the awaiter is most likely the current
// task, it could also be another task.
if state & AWAITER != 0 {
(*header).notify_unless(cx.waker());
}
// Take the output from the task.
let output = ((*header).vtable.get_output)(ptr) as *mut R;
return Poll::Ready(Some(output.read()));
}
Err(s) => state = s,
}
}
}
}
}
impl<R, T> fmt::Debug for JoinHandle<R, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
f.debug_struct("JoinHandle")
.field("header", unsafe { &(*header) })
.finish()
}
}

149
src/lib.rs Normal file
View File

@ -0,0 +1,149 @@
//! Task abstraction for building executors.
//!
//! # What is an executor?
//!
//! An async block creates a future and an async function returns one. But futures don't do
//! anything unless they are awaited inside other async blocks or async functions. So the question
//! arises: who or what awaits the main future that awaits others?
//!
//! One solution is to call [`block_on()`] on the main future, which will block
//! the current thread and keep polling the future until it completes. But sometimes we don't want
//! to block the current thread and would prefer to *spawn* the future to let a background thread
//! block on it instead.
//!
//! This is where executors step in - they create a number of threads (typically equal to the
//! number of CPU cores on the system) that are dedicated to polling spawned futures. Each executor
//! thread keeps polling spawned futures in a loop and only blocks when all spawned futures are
//! either sleeping or running.
//!
//! # What is a task?
//!
//! In order to spawn a future on an executor, one needs to allocate the future on the heap and
//! keep some state alongside it, like whether the future is ready for polling, waiting to be woken
//! up, or completed. This allocation is usually called a *task*.
//!
//! The executor then runs the spawned task by polling its future. If the future is pending on a
//! resource, a [`Waker`] associated with the task will be registered somewhere so that the task
//! can be woken up and run again at a later time.
//!
//! For example, if the future wants to read something from a TCP socket that is not ready yet, the
//! networking system will clone the task's waker and wake it up once the socket becomes ready.
//!
//! # Task construction
//!
//! A task is constructed with [`Task::create()`]:
//!
//! ```
//! # #![feature(async_await)]
//! let future = async { 1 + 2 };
//! let schedule = |task| unimplemented!();
//!
//! let (task, handle) = async_task::spawn(future, schedule, ());
//! ```
//!
//! The first argument to the constructor, `()` in this example, is an arbitrary piece of data
//! called a *tag*. This can be a task identifier, a task name, task-local storage, or something
//! of similar nature.
//!
//! The second argument is the future that gets polled when the task is run.
//!
//! The third argument is the schedule function, which is called every time when the task gets
//! woken up. This function should push the received task into some kind of queue of runnable
//! tasks.
//!
//! The constructor returns a runnable [`Task`] and a [`JoinHandle`] that can await the result of
//! the future.
//!
//! # Task scheduling
//!
//! TODO
//!
//! # Join handles
//!
//! TODO
//!
//! # Cancellation
//!
//! TODO
//!
//! # Performance
//!
//! TODO: explain single allocation, etc.
//!
//! Task [construction] incurs a single allocation only. The [`Task`] can then be run and its
//! result awaited through the [`JoinHandle`]. When woken, the task gets automatically rescheduled.
//! It's also possible to cancel the task so that it stops running and can't be awaited anymore.
//!
//! [construction]: struct.Task.html#method.create
//! [`JoinHandle`]: struct.JoinHandle.html
//! [`Task`]: struct.Task.html
//! [`Future`]: https://doc.rust-lang.org/nightly/std/future/trait.Future.html
//! [`Waker`]: https://doc.rust-lang.org/nightly/std/task/struct.Waker.html
//! [`block_on()`]: https://docs.rs/futures-preview/*/futures/executor/fn.block_on.html
//!
//! # Examples
//!
//! A simple single-threaded executor:
//!
//! ```
//! # #![feature(async_await)]
//! use std::future::Future;
//! use std::panic::catch_unwind;
//! use std::thread;
//!
//! use async_task::{JoinHandle, Task};
//! use crossbeam::channel::{unbounded, Sender};
//! use futures::executor;
//! use lazy_static::lazy_static;
//!
//! /// Spawns a future on the executor.
//! fn spawn<F, R>(future: F) -> JoinHandle<R, ()>
//! where
//! F: Future<Output = R> + Send + 'static,
//! R: Send + 'static,
//! {
//! lazy_static! {
//! // A channel that holds scheduled tasks.
//! static ref QUEUE: Sender<Task<()>> = {
//! let (sender, receiver) = unbounded::<Task<()>>();
//!
//! // Start the executor thread.
//! thread::spawn(|| {
//! for task in receiver {
//! // Ignore panics for simplicity.
//! let _ignore_panic = catch_unwind(|| task.run());
//! }
//! });
//!
//! sender
//! };
//! }
//!
//! // Create a task that is scheduled by sending itself into the channel.
//! let schedule = |t| QUEUE.send(t).unwrap();
//! let (task, handle) = async_task::spawn(future, schedule, ());
//!
//! // Schedule the task by sending it into the channel.
//! task.schedule();
//!
//! handle
//! }
//!
//! // Spawn a future and await its result.
//! let handle = spawn(async {
//! println!("Hello, world!");
//! });
//! executor::block_on(handle);
//! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
mod header;
mod join_handle;
mod raw;
mod state;
mod task;
mod utils;
pub use crate::join_handle::JoinHandle;
pub use crate::task::{spawn, Task};

629
src/raw.rs Normal file
View File

@ -0,0 +1,629 @@
use std::alloc::{self, Layout};
use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;
use std::mem::{self, ManuallyDrop};
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
use crate::header::Header;
use crate::state::*;
use crate::utils::{abort_on_panic, extend};
use crate::Task;
/// The vtable for a task.
pub(crate) struct TaskVTable {
/// The raw waker vtable.
pub(crate) raw_waker: RawWakerVTable,
/// Schedules the task.
pub(crate) schedule: unsafe fn(*const ()),
/// Drops the future inside the task.
pub(crate) drop_future: unsafe fn(*const ()),
/// Returns a pointer to the output stored after completion.
pub(crate) get_output: unsafe fn(*const ()) -> *const (),
/// Drops a waker or a task.
pub(crate) decrement: unsafe fn(ptr: *const ()),
/// Destroys the task.
pub(crate) destroy: unsafe fn(*const ()),
/// Runs the task.
pub(crate) run: unsafe fn(*const ()),
}
/// Memory layout of a task.
///
/// This struct contains the information on:
///
/// 1. How to allocate and deallocate the task.
/// 2. How to access the fields inside the task.
#[derive(Clone, Copy)]
pub(crate) struct TaskLayout {
/// Memory layout of the whole task.
pub(crate) layout: Layout,
/// Offset into the task at which the tag is stored.
pub(crate) offset_t: usize,
/// Offset into the task at which the schedule function is stored.
pub(crate) offset_s: usize,
/// Offset into the task at which the future is stored.
pub(crate) offset_f: usize,
/// Offset into the task at which the output is stored.
pub(crate) offset_r: usize,
}
/// Raw pointers to the fields of a task.
pub(crate) struct RawTask<F, R, S, T> {
/// The task header.
pub(crate) header: *const Header,
/// The schedule function.
pub(crate) schedule: *const S,
/// The tag inside the task.
pub(crate) tag: *mut T,
/// The future.
pub(crate) future: *mut F,
/// The output of the future.
pub(crate) output: *mut R,
}
impl<F, R, S, T> Copy for RawTask<F, R, S, T> {}
impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
fn clone(&self) -> Self {
Self {
header: self.header,
schedule: self.schedule,
tag: self.tag,
future: self.future,
output: self.output,
}
}
}
impl<F, R, S, T> RawTask<F, R, S, T>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
T: Send + 'static,
{
/// Allocates a task with the given `future` and `schedule` function.
///
/// It is assumed there are initially only the `Task` reference and the `JoinHandle`.
pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
// Compute the layout of the task for allocation. Abort if the computation fails.
let task_layout = abort_on_panic(|| Self::task_layout());
unsafe {
// Allocate enough space for the entire task.
let raw_task = match NonNull::new(alloc::alloc(task_layout.layout) as *mut ()) {
None => std::process::abort(),
Some(p) => p,
};
let raw = Self::from_ptr(raw_task.as_ptr());
// Write the header as the first field of the task.
(raw.header as *mut Header).write(Header {
state: AtomicUsize::new(SCHEDULED | HANDLE | REFERENCE),
awaiter: Cell::new(None),
vtable: &TaskVTable {
raw_waker: RawWakerVTable::new(
Self::clone_waker,
Self::wake,
Self::wake_by_ref,
Self::decrement,
),
schedule: Self::schedule,
drop_future: Self::drop_future,
get_output: Self::get_output,
decrement: Self::decrement,
destroy: Self::destroy,
run: Self::run,
},
});
// Write the tag as the second field of the task.
(raw.tag as *mut T).write(tag);
// Write the schedule function as the third field of the task.
(raw.schedule as *mut S).write(schedule);
// Write the future as the fourth field of the task.
raw.future.write(future);
raw_task
}
}
/// Creates a `RawTask` from a raw task pointer.
#[inline]
pub(crate) fn from_ptr(ptr: *const ()) -> Self {
let task_layout = Self::task_layout();
let p = ptr as *const u8;
unsafe {
Self {
header: p as *const Header,
tag: p.add(task_layout.offset_t) as *mut T,
schedule: p.add(task_layout.offset_s) as *const S,
future: p.add(task_layout.offset_f) as *mut F,
output: p.add(task_layout.offset_r) as *mut R,
}
}
}
/// Returns the memory layout for a task.
#[inline]
fn task_layout() -> TaskLayout {
// Compute the layouts for `Header`, `T`, `S`, `F`, and `R`.
let layout_header = Layout::new::<Header>();
let layout_t = Layout::new::<T>();
let layout_s = Layout::new::<S>();
let layout_f = Layout::new::<F>();
let layout_r = Layout::new::<R>();
// Compute the layout for `union { F, R }`.
let size_union = layout_f.size().max(layout_r.size());
let align_union = layout_f.align().max(layout_r.align());
let layout_union = unsafe { Layout::from_size_align_unchecked(size_union, align_union) };
// Compute the layout for `Header` followed by `T`, then `S`, then `union { F, R }`.
let layout = layout_header;
let (layout, offset_t) = extend(layout, layout_t);
let (layout, offset_s) = extend(layout, layout_s);
let (layout, offset_union) = extend(layout, layout_union);
let offset_f = offset_union;
let offset_r = offset_union;
TaskLayout {
layout,
offset_t,
offset_s,
offset_f,
offset_r,
}
}
/// Wakes a waker.
unsafe fn wake(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let mut state = (*raw.header).state.load(Ordering::Acquire);
loop {
// If the task is completed or closed, it can't be woken.
if state & (COMPLETED | CLOSED) != 0 {
// Drop the waker.
Self::decrement(ptr);
break;
}
// If the task is already scheduled, we just need to synchronize with the thread that
// will run the task by "publishing" our current view of the memory.
if state & SCHEDULED != 0 {
// Update the state without actually modifying it.
match (*raw.header).state.compare_exchange_weak(
state,
state,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Drop the waker.
Self::decrement(ptr);
break;
}
Err(s) => state = s,
}
} else {
// Mark the task as scheduled.
match (*raw.header).state.compare_exchange_weak(
state,
state | SCHEDULED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the task is not yet scheduled and isn't currently running, now is the
// time to schedule it.
if state & (SCHEDULED | RUNNING) == 0 {
// Schedule the task.
let task = Task {
raw_task: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
} else {
// Drop the waker.
Self::decrement(ptr);
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Wakes a waker by reference.
unsafe fn wake_by_ref(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let mut state = (*raw.header).state.load(Ordering::Acquire);
loop {
// If the task is completed or closed, it can't be woken.
if state & (COMPLETED | CLOSED) != 0 {
break;
}
// If the task is already scheduled, we just need to synchronize with the thread that
// will run the task by "publishing" our current view of the memory.
if state & SCHEDULED != 0 {
// Update the state without actually modifying it.
match (*raw.header).state.compare_exchange_weak(
state,
state,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => break,
Err(s) => state = s,
}
} else {
// If the task is not scheduled nor running, we'll need to schedule after waking.
let new = if state & (SCHEDULED | RUNNING) == 0 {
(state | SCHEDULED) + REFERENCE
} else {
state | SCHEDULED
};
// Mark the task as scheduled.
match (*raw.header).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the task is not scheduled nor running, now is the time to schedule.
if state & (SCHEDULED | RUNNING) == 0 {
// If the reference count overflowed, abort.
if state > isize::max_value() as usize {
std::process::abort();
}
// Schedule the task.
let task = Task {
raw_task: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
};
(*raw.schedule)(task);
}
break;
}
Err(s) => state = s,
}
}
}
}
/// Clones a waker.
unsafe fn clone_waker(ptr: *const ()) -> RawWaker {
let raw = Self::from_ptr(ptr);
let raw_waker = &(*raw.header).vtable.raw_waker;
// Increment the reference count. With any kind of reference-counted data structure,
// relaxed ordering is fine when the reference is being cloned.
let state = (*raw.header).state.fetch_add(REFERENCE, Ordering::Relaxed);
// If the reference count overflowed, abort.
if state > isize::max_value() as usize {
std::process::abort();
}
RawWaker::new(ptr, raw_waker)
}
/// Drops a waker or a task.
///
/// This function will decrement the reference count. If it drops down to zero and the
/// associated join handle has been dropped too, then the task gets destroyed.
#[inline]
unsafe fn decrement(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// Decrement the reference count.
let new = (*raw.header).state.fetch_sub(REFERENCE, Ordering::AcqRel) - REFERENCE;
// If this was the last reference to the task and the `JoinHandle` has been dropped as
// well, then destroy task.
if new & !(REFERENCE - 1) == 0 && new & HANDLE == 0 {
Self::destroy(ptr);
}
}
/// Schedules a task for running.
///
/// This function doesn't modify the state of the task. It only passes the task reference to
/// its schedule function.
unsafe fn schedule(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
(*raw.schedule)(Task {
raw_task: NonNull::new_unchecked(ptr as *mut ()),
_marker: PhantomData,
});
}
/// Drops the future inside a task.
#[inline]
unsafe fn drop_future(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// We need a safeguard against panics because the destructor can panic.
abort_on_panic(|| {
raw.future.drop_in_place();
})
}
/// Returns a pointer to the output inside a task.
unsafe fn get_output(ptr: *const ()) -> *const () {
let raw = Self::from_ptr(ptr);
raw.output as *const ()
}
/// Cleans up task's resources and deallocates it.
///
/// If the task has not been closed, then its future or the output will be dropped. The
/// schedule function and the tag get dropped too.
#[inline]
unsafe fn destroy(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
let task_layout = Self::task_layout();
// We need a safeguard against panics because destructors can panic.
abort_on_panic(|| {
// Drop the schedule function.
(raw.schedule as *mut S).drop_in_place();
// Drop the tag.
(raw.tag as *mut T).drop_in_place();
});
// Finally, deallocate the memory reserved by the task.
alloc::dealloc(ptr as *mut u8, task_layout.layout);
}
/// Runs a task.
///
/// If polling its future panics, the task will be closed and the panic propagated into the
/// caller.
unsafe fn run(ptr: *const ()) {
let raw = Self::from_ptr(ptr);
// Create a context from the raw task pointer and the vtable inside the its header.
let waker = ManuallyDrop::new(Waker::from_raw(RawWaker::new(
ptr,
&(*raw.header).vtable.raw_waker,
)));
let cx = &mut Context::from_waker(&waker);
let mut state = (*raw.header).state.load(Ordering::Acquire);
// Update the task's state before polling its future.
loop {
// If the task has been closed, drop the task reference and return.
if state & CLOSED != 0 {
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
(*raw.header).notify();
}
// Drop the future.
Self::drop_future(ptr);
// Drop the task reference.
Self::decrement(ptr);
return;
}
// Mark the task as unscheduled and running.
match (*raw.header).state.compare_exchange_weak(
state,
(state & !SCHEDULED) | RUNNING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Update the state because we're continuing with polling the future.
state = (state & !SCHEDULED) | RUNNING;
break;
}
Err(s) => state = s,
}
}
// Poll the inner future, but surround it with a guard that closes the task in case polling
// panics.
let guard = Guard(raw);
let poll = <F as Future>::poll(Pin::new_unchecked(&mut *raw.future), cx);
mem::forget(guard);
match poll {
Poll::Ready(out) => {
// Replace the future with its output.
Self::drop_future(ptr);
raw.output.write(out);
// A place where the output will be stored in case it needs to be dropped.
let mut output = None;
// The task is now completed.
loop {
// If the handle is dropped, we'll need to close it and drop the output.
let new = if state & HANDLE == 0 {
(state & !RUNNING & !SCHEDULED) | COMPLETED | CLOSED
} else {
(state & !RUNNING & !SCHEDULED) | COMPLETED
};
// Mark the task as not running and completed.
match (*raw.header).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// If the handle is dropped or if the task was closed while running,
// now it's time to drop the output.
if state & HANDLE == 0 || state & CLOSED != 0 {
// Read the output.
output = Some(raw.output.read());
}
// Notify the awaiter that the task has been completed.
if state & AWAITER != 0 {
(*raw.header).notify();
}
// Drop the task reference.
Self::decrement(ptr);
break;
}
Err(s) => state = s,
}
}
// Drop the output if it was taken out of the task.
drop(output);
}
Poll::Pending => {
// The task is still not completed.
loop {
// If the task was closed while running, we'll need to unschedule in case it
// was woken and then clean up its resources.
let new = if state & CLOSED != 0 {
state & !RUNNING & !SCHEDULED
} else {
state & !RUNNING
};
// Mark the task as not running.
match (*raw.header).state.compare_exchange_weak(
state,
new,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(state) => {
// If the task was closed while running, we need to drop its future.
// If the task was woken while running, we need to schedule it.
// Otherwise, we just drop the task reference.
if state & CLOSED != 0 {
// The thread that closed the task didn't drop the future because
// it was running so now it's our responsibility to do so.
Self::drop_future(ptr);
// Drop the task reference.
Self::decrement(ptr);
} else if state & SCHEDULED != 0 {
// The thread that has woken the task didn't reschedule it because
// it was running so now it's our responsibility to do so.
Self::schedule(ptr);
} else {
// Drop the task reference.
Self::decrement(ptr);
}
break;
}
Err(s) => state = s,
}
}
}
}
/// A guard that closes the task if polling its future panics.
struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
T: Send + 'static;
impl<F, R, S, T> Drop for Guard<F, R, S, T>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
T: Send + 'static,
{
fn drop(&mut self) {
let raw = self.0;
let ptr = raw.header as *const ();
unsafe {
let mut state = (*raw.header).state.load(Ordering::Acquire);
loop {
// If the task was closed while running, then unschedule it, drop its
// future, and drop the task reference.
if state & CLOSED != 0 {
// We still need to unschedule the task because it is possible it was
// woken while running.
(*raw.header).state.fetch_and(!SCHEDULED, Ordering::AcqRel);
// The thread that closed the task didn't drop the future because it
// was running so now it's our responsibility to do so.
RawTask::<F, R, S, T>::drop_future(ptr);
// Drop the task reference.
RawTask::<F, R, S, T>::decrement(ptr);
break;
}
// Mark the task as not running, not scheduled, and closed.
match (*raw.header).state.compare_exchange_weak(
state,
(state & !RUNNING & !SCHEDULED) | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(state) => {
// Drop the future because the task is now closed.
RawTask::<F, R, S, T>::drop_future(ptr);
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
(*raw.header).notify();
}
// Drop the task reference.
RawTask::<F, R, S, T>::decrement(ptr);
break;
}
Err(s) => state = s,
}
}
}
}
}
}
}

65
src/state.rs Normal file
View File

@ -0,0 +1,65 @@
/// Set if the task is scheduled for running.
///
/// A task is considered to be scheduled whenever its `Task` reference exists. It is in scheduled
/// state at the moment of creation and when it gets unapused either by its `JoinHandle` or woken
/// by a `Waker`.
///
/// This flag can't be set when the task is completed. However, it can be set while the task is
/// running, in which case it will be rescheduled as soon as polling finishes.
pub(crate) const SCHEDULED: usize = 1 << 0;
/// Set if the task is running.
///
/// A task is running state while its future is being polled.
///
/// This flag can't be set when the task is completed. However, it can be in scheduled state while
/// it is running, in which case it will be rescheduled when it stops being polled.
pub(crate) const RUNNING: usize = 1 << 1;
/// Set if the task has been completed.
///
/// This flag is set when polling returns `Poll::Ready`. The output of the future is then stored
/// inside the task until it becomes stopped. In fact, `JoinHandle` picks the output up by marking
/// the task as stopped.
///
/// This flag can't be set when the task is scheduled or completed.
pub(crate) const COMPLETED: usize = 1 << 2;
/// Set if the task is closed.
///
/// If a task is closed, that means its either cancelled or its output has been consumed by the
/// `JoinHandle`. A task becomes closed when:
///
/// 1. It gets cancelled by `Task::cancel()` or `JoinHandle::cancel()`.
/// 2. Its output is awaited by the `JoinHandle`.
/// 3. It panics while polling the future.
/// 4. It is completed and the `JoinHandle` is dropped.
pub(crate) const CLOSED: usize = 1 << 3;
/// Set if the `JoinHandle` still exists.
///
/// The `JoinHandle` is a special case in that it is only tracked by this flag, while all other
/// task references (`Task` and `Waker`s) are tracked by the reference count.
pub(crate) const HANDLE: usize = 1 << 4;
/// Set if the `JoinHandle` is awaiting the output.
///
/// This flag is set while there is a registered awaiter of type `Waker` inside the task. When the
/// task gets closed or completed, we need to wake the awaiter. This flag can be used as a fast
/// check that tells us if we need to wake anyone without acquiring the lock inside the task.
pub(crate) const AWAITER: usize = 1 << 5;
/// Set if the awaiter is locked.
///
/// This lock is acquired before a new awaiter is registered or the existing one is woken.
pub(crate) const LOCKED: usize = 1 << 6;
/// A single reference.
///
/// The lower bits in the state contain various flags representing the task state, while the upper
/// bits contain the reference count. The value of `REFERENCE` represents a single reference in the
/// total reference count.
///
/// Note that the reference counter only tracks the `Task` and `Waker`s. The `JoinHandle` is
/// tracked separately by the `HANDLE` flag.
pub(crate) const REFERENCE: usize = 1 << 7;

390
src/task.rs Normal file
View File

@ -0,0 +1,390 @@
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::ptr::NonNull;
use crate::header::Header;
use crate::raw::RawTask;
use crate::JoinHandle;
/// Creates a new task.
///
/// This constructor returns a `Task` reference that runs the future and a [`JoinHandle`] that
/// awaits its result.
///
/// The `tag` is stored inside the allocated task.
///
/// When run, the task polls `future`. When woken, it gets scheduled for running by the
/// `schedule` function.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async {
/// println!("Hello, world!");
/// };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
/// ```
///
/// [`JoinHandle`]: struct.JoinHandle.html
pub fn spawn<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
S: Fn(Task<T>) + Send + Sync + 'static,
T: Send + Sync + 'static,
{
let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
let task = Task {
raw_task,
_marker: PhantomData,
};
let handle = JoinHandle {
raw_task,
_marker: PhantomData,
};
(task, handle)
}
/// A task that runs a future.
///
/// # Construction
///
/// A task is a heap-allocated structure containing:
///
/// * A reference counter.
/// * The state of the task.
/// * Arbitrary piece of data called a *tag*.
/// * A function that schedules the task when woken.
/// * A future or its result if polling has completed.
///
/// Constructor [`Task::create()`] returns a [`Task`] and a [`JoinHandle`]. Those two references
/// are like two sides of the task: one runs the future and the other awaits its result.
///
/// # Behavior
///
/// The [`Task`] reference "owns" the task itself and is used to [run] it. Running consumes the
/// [`Task`] reference and polls its internal future. If the future is still pending after being
/// polled, the [`Task`] reference will be recreated when woken by a [`Waker`]. If the future
/// completes, its result becomes available to the [`JoinHandle`].
///
/// The [`JoinHandle`] is a [`Future`] that awaits the result of the task.
///
/// When the task is woken, its [`Task`] reference is recreated and passed to the schedule function
/// provided during construction. In most executors, scheduling simply pushes the [`Task`] into a
/// queue of runnable tasks.
///
/// If the [`Task`] reference is dropped without being run, the task is cancelled.
///
/// Both [`Task`] and [`JoinHandle`] have methods that cancel the task. When cancelled, the task
/// won't be scheduled again even if a [`Waker`] wakes it or the [`JoinHandle`] is polled. An
/// attempt to run a cancelled task won't do anything. And if the cancelled task has already
/// completed, awaiting its result through [`JoinHandle`] will return `None`.
///
/// If polling the task's future panics, it gets cancelled automatically.
///
/// # Task states
///
/// A task can be in the following states:
///
/// * Sleeping: The [`Task`] reference doesn't exist and is waiting to be scheduled by a [`Waker`].
/// * Scheduled: The [`Task`] reference exists and is waiting to be [run].
/// * Completed: The [`Task`] reference doesn't exist anymore and can't be rescheduled, but its
/// result is available to the [`JoinHandle`].
/// * Cancelled: The [`Task`] reference may or may not exist, but running it does nothing and
/// awaiting the [`JoinHandle`] returns `None`.
///
/// When constructed, the task is initially in the scheduled state.
///
/// # Destruction
///
/// The future inside the task gets dropped in the following cases:
///
/// * When [`Task`] is dropped.
/// * When [`Task`] is run to completion.
///
/// If the future hasn't been dropped and the last [`Waker`] or [`JoinHandle`] is dropped, or if
/// a [`JoinHandle`] cancels the task, then the task will be scheduled one last time so that its
/// future gets dropped by the executor. In other words, the task's future can be dropped only by
/// [`Task`].
///
/// When the task completes, the result of its future is stored inside the allocation. This result
/// is taken out when the [`JoinHandle`] awaits it. When the task is cancelled or the
/// [`JoinHandle`] is dropped without being awaited, the result gets dropped too.
///
/// The task gets deallocated when all references to it are dropped, which includes the [`Task`],
/// the [`JoinHandle`], and any associated [`Waker`]s.
///
/// The tag inside the task and the schedule function get dropped at the time of deallocation.
///
/// # Panics
///
/// If polling the inner future inside [`run()`] panics, the panic will be propagated into
/// the caller. Likewise, a panic inside the task result's destructor will be propagated. All other
/// panics result in the process being aborted.
///
/// More precisely, the process is aborted if a panic occurs:
///
/// * Inside the schedule function.
/// * While dropping the tag.
/// * While dropping the future.
/// * While dropping the schedule function.
/// * While waking the task awaiting the [`JoinHandle`].
///
/// [`run()`]: struct.Task.html#method.run
/// [run]: struct.Task.html#method.run
/// [`JoinHandle`]: struct.JoinHandle.html
/// [`Task`]: struct.Task.html
/// [`Task::create()`]: struct.Task.html#method.create
/// [`Future`]: https://doc.rust-lang.org/std/future/trait.Future.html
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use async_task::Task;
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async {
/// println!("Hello, world!");
/// };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Run the task. In this example, it will complete after a single run.
/// task.run();
/// assert!(r.is_empty());
///
/// // Await its result.
/// executor::block_on(handle);
/// ```
pub struct Task<T> {
/// A pointer to the heap-allocated task.
pub(crate) raw_task: NonNull<()>,
/// A marker capturing the generic type `T`.
pub(crate) _marker: PhantomData<T>,
}
unsafe impl<T> Send for Task<T> {}
unsafe impl<T> Sync for Task<T> {}
impl<T> Task<T> {
/// Schedules the task.
///
/// This is a convenience method that simply reschedules the task by passing it to its schedule
/// function.
///
/// If the task is cancelled, this method won't do anything.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async {
/// println!("Hello, world!");
/// };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Send the task into the channel.
/// task.schedule();
///
/// // Retrieve the task back from the channel.
/// let task = r.recv().unwrap();
/// ```
pub fn schedule(self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
mem::forget(self);
unsafe {
((*header).vtable.schedule)(ptr);
}
}
/// Runs the task.
///
/// This method polls the task's future. If the future completes, its result will become
/// available to the [`JoinHandle`]. And if the future is still pending, the task will have to
/// be woken in order to be rescheduled and then run again.
///
/// If the task is cancelled, running it won't do anything.
///
/// # Panics
///
/// It is possible that polling the future panics, in which case the panic will be propagated
/// into the caller. It is advised that invocations of this method are wrapped inside
/// [`catch_unwind`].
///
/// If a panic occurs, the task is automatically cancelled.
///
/// [`catch_unwind`]: https://doc.rust-lang.org/std/panic/fn.catch_unwind.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Run the task. In this example, it will complete after a single run.
/// task.run();
/// assert!(r.is_empty());
///
/// // Await the result of the task.
/// let result = executor::block_on(handle);
/// assert_eq!(result, Some(3));
/// ```
pub fn run(self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
mem::forget(self);
unsafe {
((*header).vtable.run)(ptr);
}
}
/// Cancels the task.
///
/// When cancelled, the task won't be scheduled again even if a [`Waker`] wakes it. An attempt
/// to run it won't do anything. And if it's completed, awaiting its result evaluates to
/// `None`.
///
/// [`Waker`]: https://doc.rust-lang.org/std/task/struct.Waker.html
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
/// use futures::executor;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, ());
///
/// // Cancel the task.
/// task.cancel();
///
/// // Running a cancelled task does nothing.
/// task.run();
///
/// // Await the result of the task.
/// let result = executor::block_on(handle);
/// assert_eq!(result, None);
/// ```
pub fn cancel(&self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
(*header).cancel();
}
}
/// Returns a reference to the tag stored inside the task.
///
/// # Examples
///
/// ```
/// # #![feature(async_await)]
/// use crossbeam::channel;
///
/// // The future inside the task.
/// let future = async { 1 + 2 };
///
/// // If the task gets woken, it will be sent into this channel.
/// let (s, r) = channel::unbounded();
/// let schedule = move |task| s.send(task).unwrap();
///
/// // Create a task with the future and the schedule function.
/// let (task, handle) = async_task::spawn(future, schedule, "a simple task");
///
/// // Access the tag.
/// assert_eq!(*task.tag(), "a simple task");
/// ```
pub fn tag(&self) -> &T {
let offset = Header::offset_tag::<T>();
let ptr = self.raw_task.as_ptr();
unsafe {
let raw = (ptr as *mut u8).add(offset) as *const T;
&*raw
}
}
}
impl<T> Drop for Task<T> {
fn drop(&mut self) {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
unsafe {
// Cancel the task.
(*header).cancel();
// Drop the future.
((*header).vtable.drop_future)(ptr);
// Drop the task reference.
((*header).vtable.decrement)(ptr);
}
}
}
impl<T: fmt::Debug> fmt::Debug for Task<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let ptr = self.raw_task.as_ptr();
let header = ptr as *const Header;
f.debug_struct("Task")
.field("header", unsafe { &(*header) })
.field("tag", self.tag())
.finish()
}
}

48
src/utils.rs Normal file
View File

@ -0,0 +1,48 @@
use std::alloc::Layout;
use std::mem;
/// Calls a function and aborts if it panics.
///
/// This is useful in unsafe code where we can't recover from panics.
#[inline]
pub(crate) fn abort_on_panic<T>(f: impl FnOnce() -> T) -> T {
struct Bomb;
impl Drop for Bomb {
fn drop(&mut self) {
std::process::abort();
}
}
let bomb = Bomb;
let t = f();
mem::forget(bomb);
t
}
/// Returns the layout for `a` followed by `b` and the offset of `b`.
///
/// This function was adapted from the currently unstable `Layout::extend()`:
/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.extend
#[inline]
pub(crate) fn extend(a: Layout, b: Layout) -> (Layout, usize) {
let new_align = a.align().max(b.align());
let pad = padding_needed_for(a, b.align());
let offset = a.size().checked_add(pad).unwrap();
let new_size = offset.checked_add(b.size()).unwrap();
let layout = Layout::from_size_align(new_size, new_align).unwrap();
(layout, offset)
}
/// Returns the padding after `layout` that aligns the following address to `align`.
///
/// This function was adapted from the currently unstable `Layout::padding_needed_for()`:
/// https://doc.rust-lang.org/nightly/std/alloc/struct.Layout.html#method.padding_needed_for
#[inline]
pub(crate) fn padding_needed_for(layout: Layout, align: usize) -> usize {
let len = layout.size();
let len_rounded_up = len.wrapping_add(align).wrapping_sub(1) & !align.wrapping_sub(1);
len_rounded_up.wrapping_sub(len)
}

314
tests/basic.rs Normal file
View File

@ -0,0 +1,314 @@
#![feature(async_await)]
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
use futures::future;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, POLL, DROP)`
//
// The future `f` always returns `Poll::Ready`.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! future {
($name:pat, $poll:ident, $drop:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Fut(Box<i32>);
impl Future for Fut {
type Output = Box<i32>;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
Poll::Ready(Box::new(0))
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
Fut(Box::new(0))
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// The schedule function `s` does nothing.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! schedule {
($name:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
move |_task| {
&guard;
$sched.fetch_add(1);
}
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
#[test]
fn cancel_and_drop_handle() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
task.cancel();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(task);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn run_and_drop_handle() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn drop_handle_and_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn cancel_and_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
handle.cancel();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
task.run();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn run_and_cancel() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
handle.cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn schedule() {
let (s, r) = channel::unbounded();
let schedule = move |t| s.send(t).unwrap();
let (task, _handle) = async_task::spawn(
future::poll_fn(|_| Poll::<()>::Pending),
schedule,
Box::new(0),
);
assert!(r.is_empty());
task.schedule();
let task = r.recv().unwrap();
assert!(r.is_empty());
task.schedule();
let task = r.recv().unwrap();
assert!(r.is_empty());
task.schedule();
r.recv().unwrap();
}
#[test]
fn tag() {
let (s, r) = channel::unbounded();
let schedule = move |t| s.send(t).unwrap();
let (task, handle) = async_task::spawn(
future::poll_fn(|_| Poll::<()>::Pending),
schedule,
AtomicUsize::new(7),
);
assert!(r.is_empty());
task.schedule();
let task = r.recv().unwrap();
assert!(r.is_empty());
handle.tag().fetch_add(1, Ordering::SeqCst);
task.schedule();
let task = r.recv().unwrap();
assert_eq!(task.tag().load(Ordering::SeqCst), 8);
assert!(r.is_empty());
task.schedule();
r.recv().unwrap();
}
#[test]
fn schedule_counter() {
let (s, r) = channel::unbounded();
let schedule = move |t: Task<AtomicUsize>| {
t.tag().fetch_add(1, Ordering::SeqCst);
s.send(t).unwrap();
};
let (task, handle) = async_task::spawn(
future::poll_fn(|_| Poll::<()>::Pending),
schedule,
AtomicUsize::new(0),
);
task.schedule();
assert_eq!(handle.tag().load(Ordering::SeqCst), 1);
r.recv().unwrap().schedule();
assert_eq!(handle.tag().load(Ordering::SeqCst), 2);
r.recv().unwrap().schedule();
assert_eq!(handle.tag().load(Ordering::SeqCst), 3);
r.recv().unwrap();
}

454
tests/join.rs Normal file
View File

@ -0,0 +1,454 @@
#![feature(async_await)]
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use futures::executor::block_on;
use futures::future;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, POLL, DROP_F, DROP_O)`
//
// The future `f` outputs `Poll::Ready`.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP_F` is incremented.
// When the output gets dropped, `DROP_O` is incremented.
macro_rules! future {
($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Fut(Box<i32>);
impl Future for Fut {
type Output = Out;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
Poll::Ready(Out(Box::new(0)))
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop_f.fetch_add(1);
}
}
struct Out(Box<i32>);
impl Drop for Out {
fn drop(&mut self) {
$drop_o.fetch_add(1);
}
}
Fut(Box::new(0))
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// The schedule function `s` does nothing.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! schedule {
($name:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
move |task: Task<_>| {
&guard;
task.schedule();
$sched.fetch_add(1);
}
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn cancel_and_join() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
assert_eq!(DROP_O.load(), 0);
task.cancel();
drop(task);
assert_eq!(DROP_O.load(), 0);
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 0);
}
#[test]
fn run_and_join() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
assert_eq!(DROP_O.load(), 0);
task.run();
assert_eq!(DROP_O.load(), 0);
assert!(block_on(handle).is_some());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
}
#[test]
fn drop_handle_and_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
assert_eq!(DROP_O.load(), 0);
drop(handle);
assert_eq!(DROP_O.load(), 0);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
}
#[test]
fn join_twice() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
assert_eq!(DROP_O.load(), 0);
task.run();
assert_eq!(DROP_O.load(), 0);
assert!(block_on(&mut handle).is_some());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 1);
assert!(block_on(&mut handle).is_none());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 1);
drop(handle);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn join_and_cancel() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
thread::sleep(ms(100));
task.cancel();
drop(task);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 0);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
thread::sleep(ms(100));
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 0);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
})
.unwrap();
}
#[test]
fn join_and_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
thread::sleep(ms(200));
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
assert!(block_on(handle).is_some());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
})
.unwrap();
}
#[test]
fn try_join_and_run_and_join() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
thread::sleep(ms(200));
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
assert!(block_on(handle).is_some());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
})
.unwrap();
}
#[test]
fn try_join_and_cancel_and_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
thread::sleep(ms(200));
task.run();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
handle.cancel();
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
})
.unwrap();
}
#[test]
fn try_join_and_run_and_cancel() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
thread::sleep(ms(200));
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
});
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
thread::sleep(ms(400));
handle.cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
})
.unwrap();
}
#[test]
fn await_output() {
struct Fut<T>(Cell<Option<T>>);
impl<T> Fut<T> {
fn new(t: T) -> Fut<T> {
Fut(Cell::new(Some(t)))
}
}
impl<T> Future for Fut<T> {
type Output = T;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(self.0.take().unwrap())
}
}
for i in 0..10 {
let (task, handle) = async_task::spawn(Fut::new(i), drop, Box::new(0));
task.run();
assert_eq!(block_on(handle), Some(i));
}
for i in 0..10 {
let (task, handle) = async_task::spawn(Fut::new(vec![7; i]), drop, Box::new(0));
task.run();
assert_eq!(block_on(handle), Some(vec![7; i]));
}
let (task, handle) = async_task::spawn(Fut::new("foo".to_string()), drop, Box::new(0));
task.run();
assert_eq!(block_on(handle), Some("foo".to_string()));
}

288
tests/panic.rs Normal file
View File

@ -0,0 +1,288 @@
#![feature(async_await)]
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use futures::executor::block_on;
use futures::future;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, POLL, DROP)`
//
// The future `f` sleeps for 200 ms and then panics.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! future {
($name:pat, $poll:ident, $drop:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Fut(Box<i32>);
impl Future for Fut {
type Output = ();
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
thread::sleep(ms(200));
panic!()
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
Fut(Box::new(0))
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// The schedule function `s` does nothing.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! schedule {
($name:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
move |_task: Task<_>| {
&guard;
$sched.fetch_add(1);
}
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn cancel_during_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
})
.unwrap();
}
#[test]
fn run_and_join() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn try_join_and_run_and_join() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 0);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn join_during_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
thread::sleep(ms(100));
assert!(block_on(handle).is_none());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
})
.unwrap();
}
#[test]
fn try_join_during_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
thread::sleep(ms(100));
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
drop(handle);
})
.unwrap();
}
#[test]
fn drop_handle_during_run() {
future!(f, POLL, DROP_F);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
thread::sleep(ms(100));
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
})
.unwrap();
}

265
tests/ready.rs Normal file
View File

@ -0,0 +1,265 @@
#![feature(async_await)]
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use futures::executor::block_on;
use futures::future;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, POLL, DROP_F, DROP_O)`
//
// The future `f` sleeps for 200 ms and outputs `Poll::Ready`.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP_F` is incremented.
// When the output gets dropped, `DROP_O` is incremented.
macro_rules! future {
($name:pat, $poll:ident, $drop_f:ident, $drop_o:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop_f: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop_o: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Fut(Box<i32>);
impl Future for Fut {
type Output = Out;
fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
$poll.fetch_add(1);
thread::sleep(ms(200));
Poll::Ready(Out(Box::new(0)))
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop_f.fetch_add(1);
}
}
struct Out(Box<i32>);
impl Drop for Out {
fn drop(&mut self) {
$drop_o.fetch_add(1);
}
}
Fut(Box::new(0))
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, SCHED, DROP)`
//
// The schedule function `s` does nothing.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
macro_rules! schedule {
($name:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let $name = {
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
move |_task: Task<_>| {
&guard;
$sched.fetch_add(1);
}
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn cancel_during_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 1);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 1);
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
})
.unwrap();
}
#[test]
fn join_during_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
});
thread::sleep(ms(100));
assert!(block_on(handle).is_some());
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_O.load(), 1);
thread::sleep(ms(100));
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
})
.unwrap();
}
#[test]
fn try_join_during_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, mut handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
});
thread::sleep(ms(100));
block_on(future::select(&mut handle, future::ready(())));
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
drop(handle);
})
.unwrap();
}
#[test]
fn drop_handle_during_run() {
future!(f, POLL, DROP_F, DROP_O);
schedule!(s, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(DROP_O.load(), 1);
});
thread::sleep(ms(100));
drop(handle);
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(DROP_O.load(), 0);
})
.unwrap();
}

357
tests/waker_panic.rs Normal file
View File

@ -0,0 +1,357 @@
#![feature(async_await)]
use std::cell::Cell;
use std::future::Future;
use std::panic::catch_unwind;
use std::pin::Pin;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, waker, POLL, DROP)`
//
// The future `f` always sleeps for 200 ms, and panics the second time it is polled.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Every time the future is run, it stores the waker into a global variable.
// This waker can be extracted using the `waker` function.
macro_rules! future {
($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
}
let ($name, $waker) = {
struct Fut(Cell<bool>, Box<i32>);
impl Future for Fut {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
WAKER.store(Some(cx.waker().clone()));
$poll.fetch_add(1);
thread::sleep(ms(200));
if self.0.get() {
panic!()
} else {
self.0.set(true);
Poll::Pending
}
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
(Fut(Cell::new(false), Box::new(0)), || {
WAKER.swap(None).unwrap()
})
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// The schedule function `s` pushes the task into `chan`.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($name, $chan) = {
let (s, r) = channel::unbounded();
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
let sched = move |task: Task<_>| {
&guard;
$sched.fetch_add(1);
s.send(task).unwrap();
};
(sched, r)
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn wake_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
w.wake();
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}
#[test]
fn cancel_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}
#[test]
fn wake_and_cancel_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
w.wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}
#[test]
fn cancel_and_wake_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
assert!(catch_unwind(|| task.run()).is_err());
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
w.wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}

348
tests/waker_pending.rs Normal file
View File

@ -0,0 +1,348 @@
#![feature(async_await)]
use std::future::Future;
use std::pin::Pin;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, waker, POLL, DROP)`
//
// The future `f` always sleeps for 200 ms and returns `Poll::Pending`.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Every time the future is run, it stores the waker into a global variable.
// This waker can be extracted using the `waker` function.
macro_rules! future {
($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
}
let ($name, $waker) = {
struct Fut(Box<i32>);
impl Future for Fut {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
WAKER.store(Some(cx.waker().clone()));
$poll.fetch_add(1);
thread::sleep(ms(200));
Poll::Pending
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
(Fut(Box::new(0)), || WAKER.swap(None).unwrap())
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// The schedule function `s` pushes the task into `chan`.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($name, $chan) = {
let (s, r) = channel::unbounded();
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
let sched = move |task: Task<_>| {
&guard;
$sched.fetch_add(1);
s.send(task).unwrap();
};
(sched, r)
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn wake_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, _handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 2);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 1);
});
thread::sleep(ms(100));
w.wake_by_ref();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 2);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 1);
})
.unwrap();
chan.recv().unwrap();
drop(waker());
}
#[test]
fn cancel_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}
#[test]
fn wake_and_cancel_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
w.wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}
#[test]
fn cancel_and_wake_during_run() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, handle, f, s, DROP_D);
task.run();
let w = waker();
w.wake_by_ref();
let task = chan.recv().unwrap();
crossbeam::scope(|scope| {
scope.spawn(|_| {
task.run();
drop(waker());
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
});
thread::sleep(ms(100));
handle.cancel();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(handle);
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
w.wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
thread::sleep(ms(200));
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
})
.unwrap();
}

328
tests/waker_ready.rs Normal file
View File

@ -0,0 +1,328 @@
#![feature(async_await)]
use std::cell::Cell;
use std::future::Future;
use std::pin::Pin;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::Duration;
use async_task::Task;
use crossbeam::atomic::AtomicCell;
use crossbeam::channel;
use lazy_static::lazy_static;
// Creates a future with event counters.
//
// Usage: `future!(f, waker, POLL, DROP)`
//
// The future `f` always sleeps for 200 ms, and returns `Poll::Ready` the second time it is polled.
// When it gets polled, `POLL` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Every time the future is run, it stores the waker into a global variable.
// This waker can be extracted using the `waker` function.
macro_rules! future {
($name:pat, $waker:pat, $poll:ident, $drop:ident) => {
lazy_static! {
static ref $poll: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
static ref WAKER: AtomicCell<Option<Waker>> = AtomicCell::new(None);
}
let ($name, $waker) = {
struct Fut(Cell<bool>, Box<i32>);
impl Future for Fut {
type Output = Box<i32>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
WAKER.store(Some(cx.waker().clone()));
$poll.fetch_add(1);
thread::sleep(ms(200));
if self.0.get() {
Poll::Ready(Box::new(0))
} else {
self.0.set(true);
Poll::Pending
}
}
}
impl Drop for Fut {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
(Fut(Cell::new(false), Box::new(0)), || {
WAKER.swap(None).unwrap()
})
};
};
}
// Creates a schedule function with event counters.
//
// Usage: `schedule!(s, chan, SCHED, DROP)`
//
// The schedule function `s` pushes the task into `chan`.
// When it gets invoked, `SCHED` is incremented.
// When it gets dropped, `DROP` is incremented.
//
// Receiver `chan` extracts the task when it is scheduled.
macro_rules! schedule {
($name:pat, $chan:pat, $sched:ident, $drop:ident) => {
lazy_static! {
static ref $sched: AtomicCell<usize> = AtomicCell::new(0);
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($name, $chan) = {
let (s, r) = channel::unbounded();
struct Guard(Box<i32>);
impl Drop for Guard {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
let guard = Guard(Box::new(0));
let sched = move |task: Task<_>| {
&guard;
$sched.fetch_add(1);
s.send(task).unwrap();
};
(sched, r)
};
};
}
// Creates a task with event counters.
//
// Usage: `task!(task, handle f, s, DROP)`
//
// A task with future `f` and schedule function `s` is created.
// The `Task` and `JoinHandle` are bound to `task` and `handle`, respectively.
// When the tag inside the task gets dropped, `DROP` is incremented.
macro_rules! task {
($task:pat, $handle: pat, $future:expr, $schedule:expr, $drop:ident) => {
lazy_static! {
static ref $drop: AtomicCell<usize> = AtomicCell::new(0);
}
let ($task, $handle) = {
struct Tag(Box<i32>);
impl Drop for Tag {
fn drop(&mut self) {
$drop.fetch_add(1);
}
}
async_task::spawn($future, $schedule, Tag(Box::new(0)))
};
};
}
fn ms(ms: u64) -> Duration {
Duration::from_millis(ms)
}
#[test]
fn wake() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(mut task, _, f, s, DROP_D);
assert!(chan.is_empty());
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
waker().wake();
task = chan.recv().unwrap();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
task.run();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
waker().wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
}
#[test]
fn wake_by_ref() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(mut task, _, f, s, DROP_D);
assert!(chan.is_empty());
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
waker().wake_by_ref();
task = chan.recv().unwrap();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
task.run();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
waker().wake_by_ref();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
}
#[test]
fn clone() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(mut task, _, f, s, DROP_D);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
let w2 = waker().clone();
let w3 = w2.clone();
let w4 = w3.clone();
w4.wake();
task = chan.recv().unwrap();
task.run();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
w3.wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
drop(w2);
drop(waker());
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
}
#[test]
fn wake_cancelled() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, _, f, s, DROP_D);
task.run();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
let w = waker();
w.wake_by_ref();
chan.recv().unwrap().cancel();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
w.wake();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
}
#[test]
fn wake_completed() {
future!(f, waker, POLL, DROP_F);
schedule!(s, chan, SCHEDULE, DROP_S);
task!(task, _, f, s, DROP_D);
task.run();
let w = waker();
assert_eq!(POLL.load(), 1);
assert_eq!(SCHEDULE.load(), 0);
assert_eq!(DROP_F.load(), 0);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
w.wake();
chan.recv().unwrap().run();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 0);
assert_eq!(DROP_D.load(), 0);
assert_eq!(chan.len(), 0);
waker().wake();
assert_eq!(POLL.load(), 2);
assert_eq!(SCHEDULE.load(), 1);
assert_eq!(DROP_F.load(), 1);
assert_eq!(DROP_S.load(), 1);
assert_eq!(DROP_D.load(), 1);
assert_eq!(chan.len(), 0);
}