Compare commits

...

28 Commits

Author SHA1 Message Date
James Liu 924b4530a7
feat: Implement static executors
Resolves #111. Creates a `StaticExecutor` type under a feature flag and allows 
constructing it from an `Executor` via `Executor::leak`. Unlike the executor 
it came from, it's a wrapper around a `State` and omits all changes to 
`active`.

Note that, unlike the API proposed in #111, this PR also includes an unsafe 
`StaticExecutor::spawn_scoped` for spawning non-'static tasks, where the 
caller is responsible for ensuring that the task doesn't outlive the borrowed 
state. This would be required for Bevy to migrate to this type, where we're 
currently using lifetime transmutation on `Executor` to enable 
`Thread::scope`-like APIs for working with borrowed state. `StaticExecutor` 
does not have an external lifetime parameter, so this approach is infeasible 
without such an API.
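For orientation, a minimal usage sketch of the new type (assuming the `static` feature is enabled; the unsafe `spawn_scoped` API is left out):

```
use async_executor::{Executor, StaticExecutor};
use futures_lite::future;

// A `StaticExecutor` can be constructed directly in a `static`...
static EX: StaticExecutor = StaticExecutor::new();

fn main() {
    // ...or obtained by leaking a regular `Executor`.
    let leaked: &'static StaticExecutor = Executor::new().leak();

    let task = EX.spawn(async { 1 + 2 });
    assert_eq!(future::block_on(EX.run(task)), 3);

    future::block_on(leaked.run(leaked.spawn(async {})));
}
```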

The performance gains while using the type are substantial:

```
single_thread/executor::spawn_one
                        time:   [1.6157 µs 1.6238 µs 1.6362 µs]
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_batch
                        time:   [28.169 µs 29.650 µs 32.196 µs]
Found 19 outliers among 100 measurements (19.00%)
  10 (10.00%) low severe
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_many_local
                        time:   [6.1952 ms 6.2230 ms 6.2578 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_recursively
                        time:   [50.202 ms 50.479 ms 50.774 ms]
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe
single_thread/executor::yield_now
                        time:   [5.8795 ms 5.8883 ms 5.8977 ms]
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

multi_thread/executor::spawn_one
                        time:   [1.2565 µs 1.2979 µs 1.3470 µs]
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe
multi_thread/executor::spawn_batch
                        time:   [38.009 µs 43.693 µs 52.882 µs]
Found 22 outliers among 100 measurements (22.00%)
  21 (21.00%) high mild
  1 (1.00%) high severe
Benchmarking multi_thread/executor::spawn_many_local: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 386.6s, or reduce sample count to 10.
multi_thread/executor::spawn_many_local
                        time:   [27.492 ms 27.652 ms 27.814 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
Benchmarking multi_thread/executor::spawn_recursively: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 16.6s, or reduce sample count to 30.
multi_thread/executor::spawn_recursively
                        time:   [165.82 ms 166.04 ms 166.26 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
multi_thread/executor::yield_now
                        time:   [22.469 ms 22.649 ms 22.798 ms]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) low severe
  3 (3.00%) low mild

single_thread/leaked_executor::spawn_one
                        time:   [1.4717 µs 1.4778 µs 1.4832 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low severe
  2 (2.00%) low mild
  3 (3.00%) high mild
  1 (1.00%) high severe
single_thread/leaked_executor::spawn_many_local
                        time:   [4.2622 ms 4.3065 ms 4.3489 ms]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) low mild
single_thread/leaked_executor::spawn_recursively
                        time:   [26.566 ms 26.899 ms 27.228 ms]
single_thread/leaked_executor::yield_now
                        time:   [5.7200 ms 5.7270 ms 5.7342 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

multi_thread/leaked_executor::spawn_one
                        time:   [1.3755 µs 1.4321 µs 1.4892 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
multi_thread/leaked_executor::spawn_many_local
                        time:   [4.1838 ms 4.2394 ms 4.2989 ms]
Found 7 outliers among 100 measurements (7.00%)
  7 (7.00%) high mild
multi_thread/leaked_executor::spawn_recursively
                        time:   [43.074 ms 43.159 ms 43.241 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild
multi_thread/leaked_executor::yield_now
                        time:   [23.210 ms 23.257 ms 23.302 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild
```
2024-05-12 16:22:32 -07:00
John Nunley f1c7ae3340
bench: Add some more filled-out benchmarks
This commit aims to add benchmarks that more realistically reflect
workloads that might happen in the real world.

These benchmarks are as follows:

- "channels", which sets up TASKS tasks, where each task uses a channel
  to wake up the next one (a condensed sketch follows this list).
- "server", which tries to simulate a web server-type scenario.
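A condensed sketch of the "channels" workload (constants shrunk here; the full benchmark is in the diff below):

```
use async_executor::Executor;
use futures_lite::future;

const TASKS: usize = 100;
const STEPS: usize = 10;

fn main() {
    let ex = Executor::new();
    future::block_on(ex.run(async {
        // Chain TASKS tasks with bounded channels; each wakes the next.
        let (first_send, first_recv) = async_channel::bounded(1);
        let mut current_recv = first_recv;
        let mut tasks = Vec::new();
        for _ in 0..TASKS {
            let (next_send, next_recv) = async_channel::bounded(1);
            let recv = std::mem::replace(&mut current_recv, next_recv);
            tasks.push(ex.spawn(async move {
                for _ in 0..STEPS {
                    recv.recv().await.unwrap();
                    next_send.send(()).await.unwrap();
                }
            }));
        }
        // Drive STEPS rounds through the whole chain.
        for _ in 0..STEPS {
            first_send.send(()).await.unwrap();
            current_recv.recv().await.unwrap();
        }
        for task in tasks {
            task.await;
        }
    }));
}
```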

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-25 22:52:40 -07:00
John Nunley ef512cb384
v1.11.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-13 22:52:52 -07:00
Jacob Rothstein df57d9bc98
feat: reexport async_task::FallibleTask
Motivation: FallibleTask is part of the public interface of this crate, in that Task::fallible returns FallibleTask. However, in order to name that type, users need to add a direct dependency on async_task and ensure the crates' versions are compatible. Re-exporting allows crate users to name the type directly.
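A hypothetical usage sketch of what the re-export enables (the `spawn_fallible` helper and values are invented for illustration):

```
use async_executor::{Executor, FallibleTask};
use futures_lite::future;

// Naming the return type of `Task::fallible` no longer requires a direct
// dependency on `async_task`.
fn spawn_fallible(ex: &Executor<'static>) -> FallibleTask<i32> {
    ex.spawn(async { 42 }).fallible()
}

fn main() {
    let ex = Executor::new();
    let task = spawn_fallible(&ex);
    // A `FallibleTask` resolves to `None` if the task was cancelled.
    assert_eq!(future::block_on(ex.run(task)), Some(42));
}
```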
2024-04-11 16:33:17 -07:00
James Liu 649bdfda23
Support racy initialization of an Executor's state
Fixes #89. Uses @notgull's suggestion of using an `AtomicPtr` with a racy initialization instead of a `OnceCell`.

For the addition of more `unsafe`, I added the `clippy::undocumented_unsafe_blocks` lint at warn level, and fixed a few of the remaining open clippy issues (e.g. `Waker::clone_from` already handling the case where the wakers are equal).

Removing `async_lock` as a dependency shouldn't be a SemVer breaking change.
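A condensed sketch of the racy-initialization pattern, written against a standalone `AtomicPtr` rather than the executor's actual fields (the real change is in the `src/lib.rs` diff below):

```
use std::sync::atomic::{AtomicPtr, Ordering};
use std::sync::Arc;

struct State { /* executor state */ }

fn state_ptr(slot: &AtomicPtr<State>) -> *const State {
    let mut ptr = slot.load(Ordering::Acquire);
    if ptr.is_null() {
        // Race to install a freshly allocated state; the loser frees its copy.
        let new = Arc::into_raw(Arc::new(State {})) as *mut State;
        ptr = match slot.compare_exchange(
            std::ptr::null_mut(),
            new,
            Ordering::AcqRel,
            Ordering::Acquire,
        ) {
            Ok(_) => new,
            Err(actual) => {
                // SAFETY: `new` was just produced by `Arc::into_raw`.
                drop(unsafe { Arc::from_raw(new) });
                actual
            }
        };
    }
    ptr
}

fn main() {
    static SLOT: AtomicPtr<State> = AtomicPtr::new(std::ptr::null_mut());
    let a = state_ptr(&SLOT);
    let b = state_ptr(&SLOT);
    // Both calls observe the same, single allocation.
    assert_eq!(a, b);
}
```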
2024-04-08 19:41:14 -07:00
John Nunley 4b37c612f6 v1.10.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-07 08:17:52 -07:00
John Nunley 00f0b99fad chore: Silence clippy
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-05 08:25:58 -07:00
John Nunley d3196999f4 feat: Add a way to batch spawn tasks
For some workloads many tasks are spawned at a time. This requires
locking and unlocking the executor's inner lock every time you spawn a
task. If you spawn many tasks this can be expensive.

This commit exposes a new "spawn_batch" method on both types. This
method allows the user to spawn an entire set of tasks at a time.
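(In the diff below the method lands as `spawn_many`.) A short usage sketch:

```
use async_executor::Executor;
use futures_lite::{future, prelude::*, stream};

fn main() {
    let ex = Executor::new();

    // One lock acquisition spawns the whole batch; handles go into any
    // collection implementing `Extend`.
    let mut tasks = Vec::new();
    ex.spawn_many((0..100).map(|i| async move { i * 2 }), &mut tasks);

    let results = future::block_on(ex.run(async move {
        stream::iter(tasks).then(|t| t).collect::<Vec<_>>().await
    }));
    assert_eq!(results.len(), 100);
    assert_eq!(results[99], 198);
}
```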

Closes #91

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-30 08:18:14 -07:00
John Nunley 17720b098a v1.9.1
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-29 21:10:44 -07:00
John Nunley b6d3a60b44 chore: Fix MIRI failure in larger_tasks
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-25 06:51:06 -07:00
John Nunley a2c1267c85 chore: Fix new nightly warnings
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-25 06:51:06 -07:00
John Nunley 00dbbbf85d Revert "feat: Use actual thread local queues instead of using a RwLock"
This reverts commit 7592d4188a.
2024-03-25 06:51:06 -07:00
John Nunley c90fd306cd Revert "bugfix: Account for local queue corner cases"
This reverts commit 22a9e8b305.
2024-03-25 06:51:06 -07:00
John Nunley 22a9e8b305 bugfix: Account for local queue corner cases
It turns out that with the current strategy it is possible for tasks to
be stuck in the local queue without any hope of being picked back up.
In practice this seems to happen when the only entities polling the
system are tickers, as opposed to runners. Since tickets don't steal
tasks, it is possible for tasks to be left over in the local queue that
don't filter out.

One possible solution is to make it so tickers steal tasks, but this
kind of defeats the point of tickers. So I've instead elected to replace
the current strategy with one that accounts for the corner cases with
local queues.

The main difference is that I replace the Sleepers struct with two
event_listener::Event instances: one that handles tickers subscribed to the
global queue and one that handles tickers subscribed to the local queue.
The other main difference is that each local queue now has a reference
counter. If this count reaches zero, no tasks will be pushed to this
queue. Only runners increment or decrement this counter.

This makes the previously instituted tests pass, so hopefully this works
for most use cases.
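A hypothetical sketch of the reference-counted gate described above (names invented; not the crate's actual internals):

```
use concurrent_queue::ConcurrentQueue;
use std::sync::atomic::{AtomicUsize, Ordering};

struct LocalQueue<T> {
    queue: ConcurrentQueue<T>,
    /// Number of runners currently draining this queue.
    runners: AtomicUsize,
}

impl<T> LocalQueue<T> {
    /// Only accept a task if at least one runner will eventually pick it up,
    /// so nothing can be stranded in a queue that nobody polls.
    fn try_push(&self, item: T) -> Result<(), T> {
        if self.runners.load(Ordering::Acquire) == 0 {
            return Err(item);
        }
        self.queue.push(item).map_err(|e| e.into_inner())
    }
}

fn main() {
    let q = LocalQueue {
        queue: ConcurrentQueue::bounded(8),
        runners: AtomicUsize::new(0),
    };
    // With no runners registered, the push falls back to the caller
    // (which would use the global queue instead).
    assert!(q.try_push(1).is_err());
    q.runners.fetch_add(1, Ordering::Release);
    assert!(q.try_push(1).is_ok());
}
```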

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:38:37 -07:00
John Nunley d5dc7a8008 tests: Add tests with more complicated futures
This should catch the errors from earlier.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:38:37 -07:00
John Nunley 2f3189a4b4
v1.9.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-21 20:58:51 -08:00
James Liu c7bbe489ab
Use wrapping add on ticks to avoid tick counter overflow in debug builds (#101) 2024-02-22 13:03:49 +09:00
James Liu 7592d4188a
feat: Use actual thread local queues instead of using a RwLock
Currently, runner local queues rely on a RwLock<Vec<Arc<ConcurrentQueue>>> to store the queues instead of using actual thread-local storage.

This adds thread_local as a dependency, but this should allow the executor to work steal without needing to hold a lock, as well as allow tasks to schedule onto the local queue directly, where possible, instead of always relying on the global injector queue.

Fixes #62

Co-authored-by: John Nunley <jtnunley01@gmail.com>
2024-02-21 19:53:40 -08:00
James Liu 188f976dc3
m: Weaken the atomic orderings for notification
The atomic orderings on State::notified might be too strong, as the flag is primarily
used as a deterrent against waking up too many threads. This PR weakens
its sequentially consistent operations to Acquire/Release.
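The change boils down to substitutions of this shape on the `notified` flag (simplified standalone sketch; the helper name is invented):

```
use std::sync::atomic::{AtomicBool, Ordering};

fn try_claim_notification(notified: &AtomicBool) -> bool {
    // Before: compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
    // After: success is Acquire+Release, failure only needs Acquire.
    notified
        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
        .is_ok()
}

fn main() {
    let notified = AtomicBool::new(false);
    assert!(try_claim_notification(&notified));
    // A second claim fails until the flag is cleared again.
    assert!(!try_claim_notification(&notified));
}
```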
2024-02-17 12:20:57 -08:00
James Liu 568a314ad9
Avoid redundant lookups in the active slab when spawning new tasks (#96) 2024-02-17 17:02:59 +09:00
James Liu 7ffdf5ba92
m: Replace unnecessary atomics with non-atomic operations 2024-02-16 17:22:43 -08:00
Jacob Rothstein 0baba46152
chore: Bump async-task to v4.4.0
this crate depends on async_task::Builder, which was introduced in 4.4.0
2024-02-12 19:40:56 -08:00
dependabot[bot] 4fbe23af69
Update criterion requirement from 0.4 to 0.5 (#43)
Signed-off-by: dependabot[bot] <support@github.com>
2024-01-27 00:34:45 +09:00
John Nunley 6c70369102
ex: Use Semaphore instead of manual event-listener
Whoops, I accidentally reinvented a semaphore and made the example a lot
more complicated than it needed to be.
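The gist of the new approach as a hypothetical standalone helper (the full example rewrite is in the diff below):

```
use async_executor::{Executor, Task};
use async_lock::Semaphore;
use std::{future::Future, sync::Arc};

/// Spawn a future once a semaphore permit is available; the permit is held
/// until the task finishes, capping the number of in-flight tasks.
async fn spawn_limited<F>(
    ex: &Executor<'static>,
    semaphore: &Arc<Semaphore>,
    future: F,
) -> Task<F::Output>
where
    F: Future + Send + 'static,
    F::Output: Send + 'static,
{
    let permit = semaphore.acquire_arc().await;
    ex.spawn(async move {
        let result = future.await;
        drop(permit);
        result
    })
}

fn main() {
    let ex = Executor::new();
    let semaphore = Arc::new(Semaphore::new(2));
    futures_lite::future::block_on(ex.run(async {
        let task = spawn_limited(&ex, &semaphore, async { 7 }).await;
        assert_eq!(task.await, 7);
    }));
}
```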

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 16:01:07 -08:00
Taiki Endo 57fcc2d991 Relax MSRV to 1.60
https://github.com/smol-rs/futures-lite/pull/90
2024-01-07 07:07:37 +09:00
Taiki Endo 24510a7b72 ci: Use cargo-hack's --rust-version flag for msrv check
This respects rust-version field in Cargo.toml, so it removes the need
to manage MSRV in both the CI file and Cargo.toml.
2024-01-07 07:07:37 +09:00
John Nunley d747bcd827
v1.8.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-24 08:21:32 -08:00
John Nunley fa117dee27
Propagate panics in tasks (#78)
After smol-rs/async-task#37 I meant to add this to the executor. This
commit makes it so all panics are surfaced in the tasks that the user
awaits. Hopefully this improves ergonomics.
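A small sketch of the resulting behavior (the panic payload and assertions are illustrative):

```
use async_executor::Executor;
use futures_lite::future;
use std::panic::{catch_unwind, AssertUnwindSafe};

fn main() {
    let ex = Executor::new();
    let task = ex.spawn(async {
        panic!("boom");
    });

    // Driving the executor no longer unwinds at the `run`/`tick` call site;
    // the panic is resumed when the user awaits the spawned task.
    let result = catch_unwind(AssertUnwindSafe(|| future::block_on(ex.run(task))));
    assert!(result.is_err());
}
```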

Signed-off-by: John Nunley <dev@notgull.net>
Signed-off-by: Alain Zscheile <fogti+devel@ytrizja.de>
2023-11-21 11:39:09 +01:00
12 changed files with 1503 additions and 318 deletions

View File

@ -45,21 +45,17 @@ jobs:
if: startsWith(matrix.rust, 'nightly') if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep run: cargo check -Z features=dev_dep
- run: cargo test - run: cargo test
- run: cargo test --all-features
- run: cargo check --all --all-features --target wasm32-unknown-unknown - run: cargo check --all --all-features --target wasm32-unknown-unknown
- run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps - run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps
msrv: msrv:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy:
matrix:
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.61']
steps: steps:
- uses: actions/checkout@v4 - uses: actions/checkout@v4
- name: Install Rust - name: Install cargo-hack
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} uses: taiki-e/install-action@cargo-hack
- run: cargo build - run: cargo hack build --rust-version
clippy: clippy:
runs-on: ubuntu-latest runs-on: ubuntu-latest
@ -87,6 +83,10 @@ jobs:
env: env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
- run: cargo miri test --all-features
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
security_audit: security_audit:
permissions: permissions:

View File

@ -1,3 +1,30 @@
# Version 1.11.0
- Re-export the `async_task::FallibleTask` primitive. (#113)
- Support racy initialization of the executor state. This should allow the executor to be
initialized on web targets without any issues. (#108)
# Version 1.10.0
- Add a function `spawn_batch` that allows users to spawn multiple tasks while only locking the executor once. (#92)
# Version 1.9.1
- Remove the thread-local optimization due to the bugs that it introduces. (#106)
# Version 1.9.0
- Re-introduce the thread-local task push optimization to the executor. (#93)
- Bump `async-task` to v4.4.0. (#90)
- Replace some unnecessary atomic operations with non-atomic operations. (#94)
- Use weaker atomic orderings for notifications. (#95)
- When spawning a future, avoid looking up the ID to assign to that future twice. (#96)
# Version 1.8.0
- When spawned tasks panic, the panic is caught and then surfaced in the spawned
`Task`. Previously, the panic would be surfaced in `tick()` or `run()`. (#78)
# Version 1.7.2
- Fix compilation under WebAssembly targets (#77).

View File

@ -3,10 +3,10 @@ name = "async-executor"
# When publishing a new version: # When publishing a new version:
# - Update CHANGELOG.md # - Update CHANGELOG.md
# - Create "v1.x.y" git tag # - Create "v1.x.y" git tag
version = "1.7.2" version = "1.11.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"] authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
edition = "2021" edition = "2021"
rust-version = "1.61" rust-version = "1.63"
description = "Async executor" description = "Async executor"
license = "Apache-2.0 OR MIT" license = "Apache-2.0 OR MIT"
repository = "https://github.com/smol-rs/async-executor" repository = "https://github.com/smol-rs/async-executor"
@ -14,10 +14,13 @@ keywords = ["asynchronous", "executor", "single", "multi", "spawn"]
categories = ["asynchronous", "concurrency"] categories = ["asynchronous", "concurrency"]
exclude = ["/.*"] exclude = ["/.*"]
[features]
# Adds support for executors optimized for use in static variables.
static = []
[dependencies] [dependencies]
async-lock = "3.0.0" async-task = "4.4.0"
async-task = "4.0.0" concurrent-queue = "2.5.0"
concurrent-queue = "2.0.0"
fastrand = "2.0.0" fastrand = "2.0.0"
futures-lite = { version = "2.0.0", default-features = false } futures-lite = { version = "2.0.0", default-features = false }
slab = "0.4.4" slab = "0.4.4"
@ -28,9 +31,9 @@ futures-lite = { version = "2.0.0", default-features = false, features = ["std"]
[dev-dependencies] [dev-dependencies]
async-channel = "2.0.0" async-channel = "2.0.0"
async-io = "2.1.0" async-io = "2.1.0"
criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] } async-lock = "3.0.0"
criterion = { version = "0.5", default-features = false, features = ["cargo_bench_support"] }
easy-parallel = "3.1.0" easy-parallel = "3.1.0"
event-listener = "3.0.0"
fastrand = "2.0.0" fastrand = "2.0.0"
futures-lite = "2.0.0" futures-lite = "2.0.0"
once_cell = "1.16.0" once_cell = "1.16.0"
@ -38,3 +41,4 @@ once_cell = "1.16.0"
[[bench]] [[bench]]
name = "executor" name = "executor"
harness = false harness = false
required-features = ["static"]

View File

@ -1,7 +1,7 @@
use std::future::Future; use std::mem;
use std::thread::available_parallelism; use std::thread::available_parallelism;
use async_executor::Executor; use async_executor::{Executor, StaticExecutor};
use criterion::{criterion_group, criterion_main, Criterion}; use criterion::{criterion_group, criterion_main, Criterion};
use futures_lite::{future, prelude::*}; use futures_lite::{future, prelude::*};
@ -10,6 +10,7 @@ const STEPS: usize = 300;
const LIGHT_TASKS: usize = 25_000; const LIGHT_TASKS: usize = 25_000;
static EX: Executor<'_> = Executor::new(); static EX: Executor<'_> = Executor::new();
static STATIC_EX: StaticExecutor = StaticExecutor::new();
fn run(f: impl FnOnce(), multithread: bool) { fn run(f: impl FnOnce(), multithread: bool) {
let limit = if multithread { let limit = if multithread {
@ -27,6 +28,22 @@ fn run(f: impl FnOnce(), multithread: bool) {
}); });
} }
fn run_static(f: impl FnOnce(), multithread: bool) {
let limit = if multithread {
available_parallelism().unwrap().get()
} else {
1
};
let (s, r) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new()
.each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv())))
.finish(move || {
let _s = s;
f()
});
}
fn create(c: &mut Criterion) { fn create(c: &mut Criterion) {
c.bench_function("executor::create", |b| { c.bench_function("executor::create", |b| {
b.iter(|| { b.iter(|| {
@ -38,93 +55,442 @@ fn create(c: &mut Criterion) {
} }
fn running_benches(c: &mut Criterion) { fn running_benches(c: &mut Criterion) {
for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() { for (prefix, with_static) in [("executor", false), ("static_executor", true)] {
let mut group = c.benchmark_group(group_name.to_string()); for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() {
let mut group = c.benchmark_group(group_name.to_string());
group.bench_function("executor::spawn_one", |b| { group.bench_function(format!("{}::spawn_one", prefix), |b| {
run( if with_static {
|| { run_static(
b.iter(|| { || {
future::block_on(async { EX.spawn(async {}).await }); b.iter(|| {
}); future::block_on(async { STATIC_EX.spawn(async {}).await });
}, });
*multithread, },
); *multithread,
}); );
} else {
group.bench_function("executor::spawn_many_local", |b| { run(
run( || {
|| { b.iter(|| {
b.iter(move || { future::block_on(async { EX.spawn(async {}).await });
future::block_on(async { });
let mut tasks = Vec::new(); },
for _ in 0..LIGHT_TASKS { *multithread,
tasks.push(EX.spawn(async {})); );
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
});
group.bench_function("executor::spawn_recursively", |b| {
#[allow(clippy::manual_async_fn)]
fn go(i: usize) -> impl Future<Output = ()> + Send + 'static {
async move {
if i != 0 {
EX.spawn(async move {
let fut = go(i - 1).boxed();
fut.await;
})
.await;
}
} }
});
if !with_static {
group.bench_function("executor::spawn_batch", |b| {
run(
|| {
let mut handles = vec![];
b.iter(|| {
EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
});
handles.clear();
},
*multithread,
)
});
} }
run( group.bench_function(format!("{}::spawn_many_local", prefix), |b| {
|| { if with_static {
b.iter(move || { run_static(
future::block_on(async { || {
let mut tasks = Vec::new(); b.iter(move || {
for _ in 0..TASKS { future::block_on(async {
tasks.push(EX.spawn(go(STEPS))); let mut tasks = Vec::new();
} for _ in 0..LIGHT_TASKS {
for task in tasks { tasks.push(STATIC_EX.spawn(async {}));
task.await;
}
});
});
},
*multithread,
);
});
group.bench_function("executor::yield_now", |b| {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(EX.spawn(async move {
for _ in 0..STEPS {
future::yield_now().await;
} }
})); for task in tasks {
} task.await;
for task in tasks { }
task.await; });
} });
}); },
}); *multithread,
}, );
*multithread, } else {
); run(
}); || {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..LIGHT_TASKS {
tasks.push(EX.spawn(async {}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::spawn_recursively", prefix), |b| {
#[allow(clippy::manual_async_fn)]
fn go(i: usize) -> impl Future<Output = ()> + Send + 'static {
async move {
if i != 0 {
EX.spawn(async move {
let fut = go(i - 1).boxed();
fut.await;
})
.await;
}
}
}
#[allow(clippy::manual_async_fn)]
fn go_static(i: usize) -> impl Future<Output = ()> + Send + 'static {
async move {
if i != 0 {
STATIC_EX
.spawn(async move {
let fut = go_static(i - 1).boxed();
fut.await;
})
.await;
}
}
}
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(STATIC_EX.spawn(go_static(STEPS)));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(EX.spawn(go(STEPS)));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::yield_now", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(STATIC_EX.spawn(async move {
for _ in 0..STEPS {
future::yield_now().await;
}
}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(EX.spawn(async move {
for _ in 0..STEPS {
future::yield_now().await;
}
}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::channels", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
// Create channels.
let mut tasks = Vec::new();
let (first_send, first_recv) = async_channel::bounded(1);
let mut current_recv = first_recv;
for _ in 0..TASKS {
let (next_send, next_recv) = async_channel::bounded(1);
let current_recv =
mem::replace(&mut current_recv, next_recv);
tasks.push(STATIC_EX.spawn(async move {
// Send a notification on to the next task.
for _ in 0..STEPS {
current_recv.recv().await.unwrap();
next_send.send(()).await.unwrap();
}
}));
}
for _ in 0..STEPS {
first_send.send(()).await.unwrap();
current_recv.recv().await.unwrap();
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
)
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
// Create channels.
let mut tasks = Vec::new();
let (first_send, first_recv) = async_channel::bounded(1);
let mut current_recv = first_recv;
for _ in 0..TASKS {
let (next_send, next_recv) = async_channel::bounded(1);
let current_recv =
mem::replace(&mut current_recv, next_recv);
tasks.push(EX.spawn(async move {
// Send a notification on to the next task.
for _ in 0..STEPS {
current_recv.recv().await.unwrap();
next_send.send(()).await.unwrap();
}
}));
}
for _ in 0..STEPS {
first_send.send(()).await.unwrap();
current_recv.recv().await.unwrap();
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
)
}
});
group.bench_function(format!("{}::web_server", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let (db_send, db_recv) =
async_channel::bounded::<async_channel::Sender<_>>(
TASKS / 5,
);
let mut db_rng = fastrand::Rng::with_seed(0x12345678);
let mut web_rng = db_rng.fork();
// This task simulates a database.
let db_task = STATIC_EX.spawn(async move {
loop {
// Wait for a new task.
let incoming = match db_recv.recv().await {
Ok(incoming) => incoming,
Err(_) => break,
};
// Process the task. Maybe it takes a while.
for _ in 0..db_rng.usize(..10) {
future::yield_now().await;
}
// Send the data back.
incoming.send(db_rng.usize(..)).await.ok();
}
});
// This task simulates a web server waiting for new tasks.
let server_task = STATIC_EX.spawn(async move {
for i in 0..TASKS {
// Get a new connection.
if web_rng.usize(..=16) == 16 {
future::yield_now().await;
}
let mut web_rng = web_rng.fork();
let db_send = db_send.clone();
let task = STATIC_EX.spawn(async move {
// Check if the data is cached...
if web_rng.bool() {
// ...it's in cache!
future::yield_now().await;
return;
}
// Otherwise we have to make a DB call or two.
for _ in 0..web_rng.usize(STEPS / 2..STEPS) {
let (resp_send, resp_recv) =
async_channel::bounded(1);
db_send.send(resp_send).await.unwrap();
criterion::black_box(
resp_recv.recv().await.unwrap(),
);
}
// Send the data back...
for _ in 0..web_rng.usize(3..16) {
future::yield_now().await;
}
});
task.detach();
if i & 16 == 0 {
future::yield_now().await;
}
}
});
// Spawn and wait for it to stop.
server_task.await;
db_task.await;
});
})
},
*multithread,
)
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let (db_send, db_recv) =
async_channel::bounded::<async_channel::Sender<_>>(
TASKS / 5,
);
let mut db_rng = fastrand::Rng::with_seed(0x12345678);
let mut web_rng = db_rng.fork();
// This task simulates a database.
let db_task = EX.spawn(async move {
loop {
// Wait for a new task.
let incoming = match db_recv.recv().await {
Ok(incoming) => incoming,
Err(_) => break,
};
// Process the task. Maybe it takes a while.
for _ in 0..db_rng.usize(..10) {
future::yield_now().await;
}
// Send the data back.
incoming.send(db_rng.usize(..)).await.ok();
}
});
// This task simulates a web server waiting for new tasks.
let server_task = EX.spawn(async move {
for i in 0..TASKS {
// Get a new connection.
if web_rng.usize(..=16) == 16 {
future::yield_now().await;
}
let mut web_rng = web_rng.fork();
let db_send = db_send.clone();
let task = EX.spawn(async move {
// Check if the data is cached...
if web_rng.bool() {
// ...it's in cache!
future::yield_now().await;
return;
}
// Otherwise we have to make a DB call or two.
for _ in 0..web_rng.usize(STEPS / 2..STEPS) {
let (resp_send, resp_recv) =
async_channel::bounded(1);
db_send.send(resp_send).await.unwrap();
criterion::black_box(
resp_recv.recv().await.unwrap(),
);
}
// Send the data back...
for _ in 0..web_rng.usize(3..16) {
future::yield_now().await;
}
});
task.detach();
if i & 16 == 0 {
future::yield_now().await;
}
}
});
// Spawn and wait for it to stop.
server_task.await;
db_task.await;
});
})
},
*multithread,
)
}
});
}
} }
} }

View File

@ -1,46 +1,23 @@
//! An executor where you can only push a limited number of tasks. //! An executor where you can only push a limited number of tasks.
use async_executor::{Executor, Task}; use async_executor::{Executor, Task};
use event_listener::{Event, EventListener}; use async_lock::Semaphore;
use futures_lite::pin; use std::{future::Future, sync::Arc, time::Duration};
use std::{
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
/// An executor where you can only push a limited number of tasks. /// An executor where you can only push a limited number of tasks.
struct LimitedExecutor { struct LimitedExecutor {
/// Inner running executor. /// Inner running executor.
executor: Executor<'static>, executor: Executor<'static>,
/// Shared state. /// Semaphore limiting the number of tasks.
shared: Arc<SharedState>, semaphore: Arc<Semaphore>,
}
struct SharedState {
/// The maximum number of tasks that can be pushed.
max: usize,
/// The current number of active tasks.
active: AtomicUsize,
/// Event listeners for when a new task is available.
slot_available: Event,
} }
impl LimitedExecutor { impl LimitedExecutor {
fn new(max: usize) -> Self { fn new(max: usize) -> Self {
Self { Self {
executor: Executor::new(), executor: Executor::new(),
shared: Arc::new(SharedState { semaphore: Semaphore::new(max).into(),
max,
active: AtomicUsize::new(0),
slot_available: Event::new(),
}),
} }
} }
@ -49,67 +26,18 @@ impl LimitedExecutor {
where where
F::Output: Send + 'static, F::Output: Send + 'static,
{ {
let listener = EventListener::new(&self.shared.slot_available); // Wait for a semaphore permit.
pin!(listener); let permit = self.semaphore.acquire_arc().await;
// Load the current number of active tasks. // Wrap it into a new future.
let mut active = self.shared.active.load(Ordering::Acquire); let future = async move {
let result = future.await;
drop(permit);
result
};
loop { // Spawn the task.
// Check if there is a slot available. self.executor.spawn(future)
if active < self.shared.max {
// Try to set the slot to what would be the new number of tasks.
let new_active = active + 1;
match self.shared.active.compare_exchange(
active,
new_active,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => {
// Wrap the future in another future that decrements the active count
// when it's done.
let future = {
let shared = self.shared.clone();
async move {
struct DecOnDrop(Arc<SharedState>);
impl Drop for DecOnDrop {
fn drop(&mut self) {
// Decrement the count and notify someone.
self.0.active.fetch_sub(1, Ordering::SeqCst);
self.0.slot_available.notify(usize::MAX);
}
}
let _dec = DecOnDrop(shared);
future.await
}
};
// Wake up another waiter, in case there is one.
self.shared.slot_available.notify(1);
// Spawn the task.
return self.executor.spawn(future);
}
Err(actual) => {
// Try again.
active = actual;
}
}
} else {
// Start waiting for a slot to become available.
if listener.as_ref().is_listening() {
listener.as_mut().await;
} else {
listener.as_mut().listen();
}
active = self.shared.active.load(Ordering::Acquire);
}
}
} }
/// Run a future to completion. /// Run a future to completion.

View File

@ -1,6 +1,5 @@
//! An executor with task priorities. //! An executor with task priorities.
use std::future::Future;
use std::thread; use std::thread;
use async_executor::{Executor, Task}; use async_executor::{Executor, Task};

View File

@ -25,31 +25,40 @@
//! future::block_on(ex.run(task)); //! future::block_on(ex.run(task));
//! ``` //! ```
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![warn(
missing_docs,
missing_debug_implementations,
rust_2018_idioms,
clippy::undocumented_unsafe_blocks
)]
#![doc( #![doc(
html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" html_favicon_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)] )]
#![doc( #![doc(
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
)] )]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
use std::fmt; use std::fmt;
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::panic::{RefUnwindSafe, UnwindSafe}; use std::panic::{RefUnwindSafe, UnwindSafe};
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::sync::{Arc, Mutex, RwLock, TryLockError}; use std::sync::{Arc, Mutex, RwLock, TryLockError};
use std::task::{Poll, Waker}; use std::task::{Poll, Waker};
use async_lock::OnceCell; use async_task::{Builder, Runnable};
use async_task::Runnable;
use concurrent_queue::ConcurrentQueue; use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, prelude::*}; use futures_lite::{future, prelude::*};
use slab::Slab; use slab::Slab;
#[cfg(feature = "static")]
mod static_executors;
#[doc(no_inline)] #[doc(no_inline)]
pub use async_task::Task; pub use async_task::{FallibleTask, Task};
#[cfg(feature = "static")]
pub use static_executors::*;
/// An async executor. /// An async executor.
/// ///
@ -77,13 +86,15 @@ pub use async_task::Task;
/// ``` /// ```
pub struct Executor<'a> { pub struct Executor<'a> {
/// The executor state. /// The executor state.
state: OnceCell<Arc<State>>, state: AtomicPtr<State>,
/// Makes the `'a` lifetime invariant. /// Makes the `'a` lifetime invariant.
_marker: PhantomData<std::cell::UnsafeCell<&'a ()>>, _marker: PhantomData<std::cell::UnsafeCell<&'a ()>>,
} }
// SAFETY: Executor stores no thread-local state that can be accessed from other threads.
unsafe impl Send for Executor<'_> {} unsafe impl Send for Executor<'_> {}
// SAFETY: Executor internally synchronizes all of its operations.
unsafe impl Sync for Executor<'_> {} unsafe impl Sync for Executor<'_> {}
impl UnwindSafe for Executor<'_> {} impl UnwindSafe for Executor<'_> {}
@ -107,7 +118,7 @@ impl<'a> Executor<'a> {
/// ``` /// ```
pub const fn new() -> Executor<'a> { pub const fn new() -> Executor<'a> {
Executor { Executor {
state: OnceCell::new(), state: AtomicPtr::new(std::ptr::null_mut()),
_marker: PhantomData, _marker: PhantomData,
} }
} }
@ -150,17 +161,120 @@ impl<'a> Executor<'a> {
pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> { pub fn spawn<T: Send + 'a>(&self, future: impl Future<Output = T> + Send + 'a) -> Task<T> {
let mut active = self.state().active.lock().unwrap(); let mut active = self.state().active.lock().unwrap();
// SAFETY: `T` and the future are `Send`.
unsafe { self.spawn_inner(future, &mut active) }
}
/// Spawns many tasks onto the executor.
///
/// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// For very large numbers of tasks the lock is occasionally dropped and re-acquired to
/// prevent runner thread starvation. It is assumed that the iterator provided does not
/// block; blocking iterators can lock up the internal mutex and therefore the entire
/// executor.
///
/// ## Example
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = Executor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: Executor::spawn
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = Some(self.state().active.lock().unwrap());
// Convert the futures into tasks.
let tasks = futures.into_iter().enumerate().map(move |(i, future)| {
// SAFETY: `T` and the future are `Send`.
let task = unsafe { self.spawn_inner(future, active.as_mut().unwrap()) };
// Yield the lock every once in a while to ease contention.
if i.wrapping_sub(1) % 500 == 0 {
drop(active.take());
active = Some(self.state().active.lock().unwrap());
}
task
});
// Push the tasks to the user's collection.
handles.extend(tasks);
}
/// Spawn a future while holding the inner lock.
///
/// # Safety
///
/// If this is an `Executor`, `F` and `T` must be `Send`.
unsafe fn spawn_inner<T: 'a>(
&self,
future: impl Future<Output = T> + 'a,
active: &mut Slab<Waker>,
) -> Task<T> {
// Remove the task from the set of active tasks when the future finishes. // Remove the task from the set of active tasks when the future finishes.
let index = active.vacant_entry().key(); let entry = active.vacant_entry();
let state = self.state().clone(); let index = entry.key();
let state = self.state_as_arc();
let future = async move { let future = async move {
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index))); let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await future.await
}; };
// Create the task and register it in the set of active tasks. // Create the task and register it in the set of active tasks.
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; //
active.insert(runnable.waker()); // SAFETY:
//
// If `future` is not `Send`, this must be a `LocalExecutor` as per this
// function's unsafe precondition. Since `LocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `LocalExecutor`. Similarly, `spawn` can only be called
// from the origin thread, ensuring that `future` and the executor share
// the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
//
// `future` is not `'static`, but we make sure that the `Runnable` does
// not outlive `'a`. When the executor is dropped, the `active` field is
// drained and all of the `Waker`s are woken. Then, the queue inside of
// the `Executor` is drained of all of its runnables. This ensures that
// runnables are dropped and this precondition is satisfied.
//
// `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule());
entry.insert(runnable.waker());
runnable.schedule(); runnable.schedule();
task task
@ -184,18 +298,7 @@ impl<'a> Executor<'a> {
/// assert!(ex.try_tick()); // a task was found /// assert!(ex.try_tick()); // a task was found
/// ``` /// ```
pub fn try_tick(&self) -> bool { pub fn try_tick(&self) -> bool {
match self.state().queue.pop() { self.state().try_tick()
Err(_) => false,
Ok(runnable) => {
// Notify another ticker now to pick up where this ticker left off, just in case
// running the task takes a long time.
self.state().notify();
// Run the task.
runnable.run();
true
}
}
} }
/// Runs a single task. /// Runs a single task.
@ -218,9 +321,7 @@ impl<'a> Executor<'a> {
/// future::block_on(ex.tick()); // runs the task /// future::block_on(ex.tick()); // runs the task
/// ``` /// ```
pub async fn tick(&self) { pub async fn tick(&self) {
let state = self.state(); self.state().tick().await;
let runnable = Ticker::new(state).runnable().await;
runnable.run();
} }
/// Runs the executor until the given future completes. /// Runs the executor until the given future completes.
@ -239,27 +340,12 @@ impl<'a> Executor<'a> {
/// assert_eq!(res, 6); /// assert_eq!(res, 6);
/// ``` /// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T { pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let runner = Runner::new(self.state()); self.state().run(future).await
let mut rng = fastrand::Rng::new();
// A future that runs tasks forever.
let run_forever = async {
loop {
for _ in 0..200 {
let runnable = runner.runnable(&mut rng).await;
runnable.run();
}
future::yield_now().await;
}
};
// Run `future` and `run_forever` concurrently until `future` completes.
future.or(run_forever).await
} }
/// Returns a function that schedules a runnable task when it gets woken up. /// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone(); let state = self.state_as_arc();
// TODO: If possible, push into the current local queue and notify the ticker. // TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| { move |runnable| {
@ -268,34 +354,73 @@ impl<'a> Executor<'a> {
} }
} }
/// Returns a reference to the inner state. /// Returns a pointer to the inner state.
fn state(&self) -> &Arc<State> { #[inline]
#[cfg(not(target_family = "wasm"))] fn state_ptr(&self) -> *const State {
{ #[cold]
return self.state.get_or_init_blocking(|| Arc::new(State::new())); fn alloc_state(atomic_ptr: &AtomicPtr<State>) -> *mut State {
let state = Arc::new(State::new());
// TODO: Switch this to use cast_mut once the MSRV can be bumped past 1.65
let ptr = Arc::into_raw(state) as *mut State;
if let Err(actual) = atomic_ptr.compare_exchange(
std::ptr::null_mut(),
ptr,
Ordering::AcqRel,
Ordering::Acquire,
) {
// SAFETY: This was just created from Arc::into_raw.
drop(unsafe { Arc::from_raw(ptr) });
actual
} else {
ptr
}
} }
// Some projects use this on WASM for some reason. In this case get_or_init_blocking let mut ptr = self.state.load(Ordering::Acquire);
// doesn't work. Just poll the future once and panic if there is contention. if ptr.is_null() {
#[cfg(target_family = "wasm")] ptr = alloc_state(&self.state);
future::block_on(future::poll_once( }
self.state.get_or_init(|| async { Arc::new(State::new()) }), ptr
)) }
.expect("encountered contention on WASM")
/// Returns a reference to the inner state.
#[inline]
fn state(&self) -> &State {
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr.
unsafe { &*self.state_ptr() }
}
// Clones the inner state Arc
#[inline]
fn state_as_arc(&self) -> Arc<State> {
// SAFETY: So long as an Executor lives, its state pointer will always be a valid
// Arc when accessed through state_ptr.
let arc = unsafe { Arc::from_raw(self.state_ptr()) };
let clone = arc.clone();
std::mem::forget(arc);
clone
} }
} }
impl Drop for Executor<'_> { impl Drop for Executor<'_> {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(state) = self.state.get() { let ptr = *self.state.get_mut();
let mut active = state.active.lock().unwrap(); if ptr.is_null() {
for w in active.drain() { return;
w.wake();
}
drop(active);
while state.queue.pop().is_ok() {}
} }
// SAFETY: As ptr is not null, it was allocated via Arc::new and converted
// via Arc::into_raw in state_ptr.
let state = unsafe { Arc::from_raw(ptr) };
let mut active = state.active.lock().unwrap_or_else(|e| e.into_inner());
for w in active.drain() {
w.wake();
}
drop(active);
while state.queue.pop().is_ok() {}
} }
} }
@ -393,20 +518,70 @@ impl<'a> LocalExecutor<'a> {
pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> { pub fn spawn<T: 'a>(&self, future: impl Future<Output = T> + 'a) -> Task<T> {
let mut active = self.inner().state().active.lock().unwrap(); let mut active = self.inner().state().active.lock().unwrap();
// Remove the task from the set of active tasks when the future finishes. // SAFETY: This executor is not thread safe, so the future and its result
let index = active.vacant_entry().key(); // cannot be sent to another thread.
let state = self.inner().state().clone(); unsafe { self.inner().spawn_inner(future, &mut active) }
let future = async move { }
let _guard = CallOnDrop(move || drop(state.active.lock().unwrap().try_remove(index)));
future.await
};
// Create the task and register it in the set of active tasks. /// Spawns many tasks onto the executor.
let (runnable, task) = unsafe { async_task::spawn_unchecked(future, self.schedule()) }; ///
active.insert(runnable.waker()); /// As opposed to the [`spawn`] method, this locks the executor's inner task lock once and
/// spawns all of the tasks in one go. With large amounts of tasks this can improve
/// contention.
///
/// It is assumed that the iterator provided does not block; blocking iterators can lock up
/// the internal mutex and therefore the entire executor. Unlike [`Executor::spawn`], the
/// mutex is not released, as there are no other threads that can poll this executor.
///
/// ## Example
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::{stream, prelude::*};
/// use std::future::ready;
///
/// # futures_lite::future::block_on(async {
/// let mut ex = LocalExecutor::new();
///
/// let futures = [
/// ready(1),
/// ready(2),
/// ready(3)
/// ];
///
/// // Spawn all of the futures onto the executor at once.
/// let mut tasks = vec![];
/// ex.spawn_many(futures, &mut tasks);
///
/// // Await all of them.
/// let results = ex.run(async move {
/// stream::iter(tasks).then(|x| x).collect::<Vec<_>>().await
/// }).await;
/// assert_eq!(results, [1, 2, 3]);
/// # });
/// ```
///
/// [`spawn`]: LocalExecutor::spawn
/// [`Executor::spawn_many`]: Executor::spawn_many
pub fn spawn_many<T: Send + 'a, F: Future<Output = T> + Send + 'a>(
&self,
futures: impl IntoIterator<Item = F>,
handles: &mut impl Extend<Task<F::Output>>,
) {
let mut active = self.inner().state().active.lock().unwrap();
runnable.schedule(); // Convert all of the futures to tasks.
task let tasks = futures.into_iter().map(|future| {
// SAFETY: This executor is not thread safe, so the future and its result
// cannot be sent to another thread.
unsafe { self.inner().spawn_inner(future, &mut active) }
// As only one thread can spawn or poll tasks at a time, there is no need
// to release lock contention here.
});
// Push them to the user's collection.
handles.extend(tasks);
} }
/// Attempts to run a task if at least one is scheduled. /// Attempts to run a task if at least one is scheduled.
@ -472,16 +647,6 @@ impl<'a> LocalExecutor<'a> {
self.inner().run(future).await self.inner().run(future).await
} }
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.inner().state().clone();
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
/// Returns a reference to the inner executor. /// Returns a reference to the inner executor.
fn inner(&self) -> &Executor<'a> { fn inner(&self) -> &Executor<'a> {
&self.inner &self.inner
@ -514,7 +679,7 @@ struct State {
impl State { impl State {
/// Creates state for a new executor. /// Creates state for a new executor.
fn new() -> State { const fn new() -> State {
State { State {
queue: ConcurrentQueue::unbounded(), queue: ConcurrentQueue::unbounded(),
local_queues: RwLock::new(Vec::new()), local_queues: RwLock::new(Vec::new()),
@ -533,7 +698,7 @@ impl State {
fn notify(&self) { fn notify(&self) {
if self if self
.notified .notified
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst) .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok() .is_ok()
{ {
let waker = self.sleepers.lock().unwrap().notify(); let waker = self.sleepers.lock().unwrap().notify();
@ -542,6 +707,45 @@ impl State {
} }
} }
} }
pub(crate) fn try_tick(&self) -> bool {
match self.queue.pop() {
Err(_) => false,
Ok(runnable) => {
// Notify another ticker now to pick up where this ticker left off, just in case
// running the task takes a long time.
self.notify();
// Run the task.
runnable.run();
true
}
}
}
pub(crate) async fn tick(&self) {
let runnable = Ticker::new(self).runnable().await;
runnable.run();
}
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
let mut runner = Runner::new(self);
let mut rng = fastrand::Rng::new();
// A future that runs tasks forever.
let run_forever = async {
loop {
for _ in 0..200 {
let runnable = runner.runnable(&mut rng).await;
runnable.run();
}
future::yield_now().await;
}
};
// Run `future` and `run_forever` concurrently until `future` completes.
future.or(run_forever).await
}
} }
/// A list of sleeping tickers. /// A list of sleeping tickers.
@ -576,9 +780,7 @@ impl Sleepers {
fn update(&mut self, id: usize, waker: &Waker) -> bool { fn update(&mut self, id: usize, waker: &Waker) -> bool {
for item in &mut self.wakers { for item in &mut self.wakers {
if item.0 == id { if item.0 == id {
if !item.1.will_wake(waker) { item.1.clone_from(waker);
item.1 = waker.clone();
}
return false; return false;
} }
} }
@ -631,29 +833,26 @@ struct Ticker<'a> {
/// 1) Woken. /// 1) Woken.
/// 2a) Sleeping and unnotified. /// 2a) Sleeping and unnotified.
/// 2b) Sleeping and notified. /// 2b) Sleeping and notified.
sleeping: AtomicUsize, sleeping: usize,
} }
impl Ticker<'_> { impl Ticker<'_> {
/// Creates a ticker. /// Creates a ticker.
fn new(state: &State) -> Ticker<'_> { fn new(state: &State) -> Ticker<'_> {
Ticker { Ticker { state, sleeping: 0 }
state,
sleeping: AtomicUsize::new(0),
}
} }
/// Moves the ticker into sleeping and unnotified state. /// Moves the ticker into sleeping and unnotified state.
/// ///
/// Returns `false` if the ticker was already sleeping and unnotified. /// Returns `false` if the ticker was already sleeping and unnotified.
fn sleep(&self, waker: &Waker) -> bool { fn sleep(&mut self, waker: &Waker) -> bool {
let mut sleepers = self.state.sleepers.lock().unwrap(); let mut sleepers = self.state.sleepers.lock().unwrap();
match self.sleeping.load(Ordering::SeqCst) { match self.sleeping {
// Move to sleeping state. // Move to sleeping state.
0 => self 0 => {
.sleeping self.sleeping = sleepers.insert(waker);
.store(sleepers.insert(waker), Ordering::SeqCst), }
// Already sleeping, check if notified. // Already sleeping, check if notified.
id => { id => {
@ -665,31 +864,31 @@ impl Ticker<'_> {
self.state self.state
.notified .notified
.swap(sleepers.is_notified(), Ordering::SeqCst); .store(sleepers.is_notified(), Ordering::Release);
true true
} }
/// Moves the ticker into woken state. /// Moves the ticker into woken state.
fn wake(&self) { fn wake(&mut self) {
let id = self.sleeping.swap(0, Ordering::SeqCst); if self.sleeping != 0 {
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap(); let mut sleepers = self.state.sleepers.lock().unwrap();
sleepers.remove(id); sleepers.remove(self.sleeping);
self.state self.state
.notified .notified
.swap(sleepers.is_notified(), Ordering::SeqCst); .store(sleepers.is_notified(), Ordering::Release);
} }
self.sleeping = 0;
} }
/// Waits for the next runnable task to run. /// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable { async fn runnable(&mut self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await self.runnable_with(|| self.state.queue.pop().ok()).await
} }
/// Waits for the next runnable task to run, given a function that searches for a task. /// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable { async fn runnable_with(&mut self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
future::poll_fn(|cx| { future::poll_fn(|cx| {
loop { loop {
match search() { match search() {
@ -720,14 +919,13 @@ impl Ticker<'_> {
impl Drop for Ticker<'_> { impl Drop for Ticker<'_> {
fn drop(&mut self) { fn drop(&mut self) {
// If this ticker is in sleeping state, it must be removed from the sleepers list. // If this ticker is in sleeping state, it must be removed from the sleepers list.
let id = self.sleeping.swap(0, Ordering::SeqCst); if self.sleeping != 0 {
if id != 0 {
let mut sleepers = self.state.sleepers.lock().unwrap(); let mut sleepers = self.state.sleepers.lock().unwrap();
let notified = sleepers.remove(id); let notified = sleepers.remove(self.sleeping);
self.state self.state
.notified .notified
.swap(sleepers.is_notified(), Ordering::SeqCst); .store(sleepers.is_notified(), Ordering::Release);
// If this ticker was notified, then notify another ticker. // If this ticker was notified, then notify another ticker.
if notified { if notified {
@ -752,7 +950,7 @@ struct Runner<'a> {
local: Arc<ConcurrentQueue<Runnable>>, local: Arc<ConcurrentQueue<Runnable>>,
/// Bumped every time a runnable task is found. /// Bumped every time a runnable task is found.
ticks: AtomicUsize, ticks: usize,
} }
impl Runner<'_> { impl Runner<'_> {
@ -762,7 +960,7 @@ impl Runner<'_> {
state, state,
ticker: Ticker::new(state), ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)), local: Arc::new(ConcurrentQueue::bounded(512)),
ticks: AtomicUsize::new(0), ticks: 0,
}; };
state state
.local_queues .local_queues
@ -773,7 +971,7 @@ impl Runner<'_> {
} }
/// Waits for the next runnable task to run. /// Waits for the next runnable task to run.
async fn runnable(&self, rng: &mut fastrand::Rng) -> Runnable { async fn runnable(&mut self, rng: &mut fastrand::Rng) -> Runnable {
let runnable = self let runnable = self
.ticker .ticker
.runnable_with(|| { .runnable_with(|| {
@ -816,9 +1014,9 @@ impl Runner<'_> {
.await; .await;
// Bump the tick counter. // Bump the tick counter.
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst); self.ticks = self.ticks.wrapping_add(1);
if ticks % 64 == 0 { if self.ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling. // Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local); steal(&self.state.queue, &self.local);
} }
@ -868,22 +1066,30 @@ fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
/// Debug implementation for `Executor` and `LocalExecutor`. /// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Get a reference to the state. // Get a reference to the state.
let state = match executor.state.get() { let ptr = executor.state.load(Ordering::Acquire);
Some(state) => state, if ptr.is_null() {
None => { // The executor has not been initialized.
// The executor has not been initialized. struct Uninitialized;
struct Uninitialized;
impl fmt::Debug for Uninitialized { impl fmt::Debug for Uninitialized {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("<uninitialized>") f.write_str("<uninitialized>")
}
} }
return f.debug_tuple(name).field(&Uninitialized).finish();
} }
};
return f.debug_tuple(name).field(&Uninitialized).finish();
}
// SAFETY: If the state pointer is not null, it must have been
// allocated properly by Arc::new and converted via Arc::into_raw
// in state_ptr.
let state = unsafe { &*ptr };
debug_state(state, name, f)
}
/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_state(state: &State, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
/// Debug wrapper for the number of active tasks. /// Debug wrapper for the number of active tasks.
struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>); struct ActiveTasks<'a>(&'a Mutex<Slab<Waker>>);
@ -948,6 +1154,7 @@ fn _ensure_send_and_sync() {
fn is_send<T: Send>(_: T) {} fn is_send<T: Send>(_: T) {}
fn is_sync<T: Sync>(_: T) {} fn is_sync<T: Sync>(_: T) {}
fn is_static<T: 'static>(_: T) {}
is_send::<Executor<'_>>(Executor::new()); is_send::<Executor<'_>>(Executor::new());
is_sync::<Executor<'_>>(Executor::new()); is_sync::<Executor<'_>>(Executor::new());
@ -957,6 +1164,9 @@ fn _ensure_send_and_sync() {
is_sync(ex.run(pending::<()>())); is_sync(ex.run(pending::<()>()));
is_send(ex.tick()); is_send(ex.tick());
is_sync(ex.tick()); is_sync(ex.tick());
is_send(ex.schedule());
is_sync(ex.schedule());
is_static(ex.schedule());
/// ```compile_fail /// ```compile_fail
/// use async_executor::LocalExecutor; /// use async_executor::LocalExecutor;

src/static_executors.rs (new file, 479 lines)
View File

@ -0,0 +1,479 @@
use crate::{debug_state, Executor, LocalExecutor, State};
use async_task::{Builder, Runnable, Task};
use slab::Slab;
use std::{
cell::UnsafeCell,
fmt,
future::Future,
marker::PhantomData,
panic::{RefUnwindSafe, UnwindSafe},
};
impl Executor<'static> {
/// Consumes the [`Executor`] and intentionally leaks it.
///
/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
/// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
/// when spawning, running, and finishing tasks.
///
/// `StaticExecutor` cannot be converted back into an `Executor`, so this operation is
/// irreversible without the use of unsafe.
///
/// # Example
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::future;
///
/// let ex = Executor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticExecutor {
let ptr = self.state_ptr();
// SAFETY: So long as an Executor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
std::mem::forget(self);
let mut active = state.active.lock().unwrap();
if !active.is_empty() {
// Reschedule all of the active tasks.
for waker in active.drain() {
waker.wake();
}
// Overwrite to ensure that the slab is deallocated.
*active = Slab::new();
}
// SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
// The lifetime is not altered: 'static -> 'static.
let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
static_executor
}
}
impl LocalExecutor<'static> {
/// Consumes the [`LocalExecutor`] and intentionally leaks it.
///
/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
/// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
/// when spawning, running, and finishing tasks.
///
/// `StaticLocalExecutor` cannot be converted back into a `LocalExecutor`, so this operation is
/// irreversible without the use of unsafe.
///
/// # Example
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticLocalExecutor {
let ptr = self.inner.state_ptr();
// SAFETY: So long as a LocalExecutor lives, its state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
std::mem::forget(self);
let mut active = state.active.lock().unwrap();
if !active.is_empty() {
// Reschedule all of the active tasks.
for waker in active.drain() {
waker.wake();
}
// Overwrite to ensure that the slab is deallocated.
*active = Slab::new();
}
// SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
// The lifetime is not altered: 'static -> 'static.
let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
static_executor
}
}
/// A static-lifetimed async [`Executor`].
///
/// This is primarily intended to be used in [`static`] variables, or can be created in non-static
/// contexts via [`Executor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
/// A static executor may require significantly less overhead in both single-threaded and multithreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`tick`] or [`run`] will cause all spawned
/// tasks to leak permanently. Tasks that are still pending at that point will not be cancelled.
///
/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
#[repr(transparent)]
pub struct StaticExecutor {
state: State,
}
// SAFETY: Executor stores no thread-local state that can be accessed from other threads.
unsafe impl Send for StaticExecutor {}
// SAFETY: Executor synchronizes all of its operations internally.
unsafe impl Sync for StaticExecutor {}
impl UnwindSafe for StaticExecutor {}
impl RefUnwindSafe for StaticExecutor {}
impl fmt::Debug for StaticExecutor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_state(&self.state, "StaticExecutor", f)
}
}
impl StaticExecutor {
/// Creates a new StaticExecutor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
/// ```
pub const fn new() -> Self {
Self {
state: State::new(),
}
}
/// Spawns a task onto the executor.
///
/// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
/// borrow on the executor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
/// ```
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn(|()| future, self.schedule());
runnable.schedule();
task
}
/// Spawns a non-`'static` task onto the executor.
///
/// ## Safety
///
/// The caller must ensure that the returned task terminates
/// or is cancelled before the end of `'a`.
pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
&'static self,
future: impl Future<Output = T> + Send + 'a,
) -> Task<T> {
// SAFETY:
//
// - `future` is `Send`
// - `future` is not `'static`, but the caller guarantees that the
// task, and thus its `Runnable` must not live longer than `'a`.
// - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
runnable.schedule();
task
}
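// A minimal usage sketch (commented, not part of this diff) of how a caller might uphold
// the `spawn_scoped` contract, assuming a `static EXECUTOR: StaticExecutor` and
// `futures_lite::future::block_on`; illustrative only, not taken from the crate's docs.
//
// static EXECUTOR: StaticExecutor = StaticExecutor::new();
//
// fn scoped_len() -> usize {
//     let text = String::from("borrowed");
//     // SAFETY: the task is awaited to completion below, before `text` is dropped,
//     // so it cannot outlive the borrowed data.
//     let task = unsafe { EXECUTOR.spawn_scoped(async { text.len() }) };
//     futures_lite::future::block_on(EXECUTOR.run(task))
// }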
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// assert!(!EXECUTOR.try_tick()); // no tasks to run
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
///
/// assert!(EXECUTOR.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
self.state.try_tick()
}
/// Runs a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
/// use futures_lite::future;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(EXECUTOR.tick()); // runs the task
/// ```
pub async fn tick(&self) {
self.state.tick().await;
}
/// Runs the executor until the given future completes.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
/// use futures_lite::future;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async { 1 + 2 });
/// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
///
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.state.run(future).await
}
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state: &'static State = &self.state;
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
}
impl Default for StaticExecutor {
fn default() -> Self {
Self::new()
}
}
/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
///
/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
/// contexts via [`LocalExecutor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
/// A static executor may require significantly less overhead in both single-threaded and multithreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`tick`] or [`run`] will cause all spawned
/// tasks to leak permanently. Tasks that are still pending at that point will not be cancelled.
///
/// [`thread_local`]: https://doc.rust-lang.org/std/macro.thread_local.html
#[repr(transparent)]
pub struct StaticLocalExecutor {
state: State,
marker_: PhantomData<UnsafeCell<()>>,
}
impl UnwindSafe for StaticLocalExecutor {}
impl RefUnwindSafe for StaticLocalExecutor {}
impl fmt::Debug for StaticLocalExecutor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_state(&self.state, "StaticLocalExecutor", f)
}
}
impl StaticLocalExecutor {
/// Creates a new StaticLocalExecutor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticLocalExecutor;
///
/// thread_local! {
/// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
/// }
/// ```
pub const fn new() -> Self {
Self {
state: State::new(),
marker_: PhantomData,
}
}
/// Spawns a task onto the executor.
///
/// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
/// borrow on the executor.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// ```
pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_local(|()| future, self.schedule());
runnable.schedule();
task
}
/// Spawns a non-`'static` task onto the executor.
///
/// ## Safety
///
/// The caller must ensure that the returned task terminates
/// or is cancelled before the end of `'a`.
pub unsafe fn spawn_scoped<'a, T: 'a>(
&'static self,
future: impl Future<Output = T> + 'a,
) -> Task<T> {
// SAFETY:
//
// - `future` is not `Send`, but `StaticLocalExecutor` is `!Sync`, so
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
// be called from the origin thread, ensuring that `future` and the executor
// share the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
// - `future` is not `'static`, but the caller guarantees that the
// task, and thus its `Runnable` must not live longer than `'a`.
// - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
runnable.schedule();
task
}
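// A minimal usage sketch (commented, not part of this diff) of the same pattern for the
// local variant, assuming a `LocalExecutor` leaked via `leak()` and
// `futures_lite::future::block_on`; illustrative only, not taken from the crate's docs.
//
// let ex: &'static StaticLocalExecutor = LocalExecutor::new().leak();
// let value = std::cell::Cell::new(0u32);
// // SAFETY: the task is driven to completion below, before `value` is dropped,
// // so it cannot outlive the borrowed data.
// let task = unsafe { ex.spawn_scoped(async { value.set(42) }) };
// futures_lite::future::block_on(ex.run(task));
// assert_eq!(value.get(), 42);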
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
///
/// let ex = LocalExecutor::new().leak();
/// assert!(!ex.try_tick()); // no tasks to run
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// assert!(ex.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
self.state.try_tick()
}
/// Runs a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// future::block_on(ex.tick()); // runs the task
/// ```
pub async fn tick(&self) {
self.state.tick().await;
}
/// Runs the executor until the given future completes.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async { 1 + 2 });
/// let res = future::block_on(ex.run(async { task.await * 2 }));
///
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.state.run(future).await
}
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state: &'static State = &self.state;
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
}
impl Default for StaticLocalExecutor {
fn default() -> Self {
Self::new()
}
}


@ -121,6 +121,20 @@ fn drop_finished_task_and_then_drop_executor() {
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}
#[test]
fn iterator_panics_mid_run() {
let ex = Executor::new();
let panic = std::panic::catch_unwind(|| {
let mut handles = vec![];
ex.spawn_many(
(0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
&mut handles,
)
});
assert!(panic.is_err());
}
struct CallOnDrop<F: Fn()>(F);
impl<F: Fn()> Drop for CallOnDrop<F> {

tests/larger_tasks.rs Normal file

@ -0,0 +1,99 @@
//! Test for larger tasks.
use async_executor::Executor;
use futures_lite::future::{self, block_on};
use futures_lite::prelude::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn do_run<Fut: Future<Output = ()>>(mut f: impl FnMut(Arc<Executor<'static>>) -> Fut) {
// This should not run for longer than two minutes.
#[cfg(not(miri))]
let _stop_timeout = {
let (stop_timeout, stopper) = async_channel::bounded::<()>(1);
thread::spawn(move || {
block_on(async move {
let timeout = async {
async_io::Timer::after(Duration::from_secs(2 * 60)).await;
eprintln!("test timed out after 2m");
std::process::exit(1)
};
let _ = stopper.recv().or(timeout).await;
})
});
stop_timeout
};
let ex = Arc::new(Executor::new());
// Test 1: Use the `run` command.
block_on(ex.run(f(ex.clone())));
// Test 2: Loop on `tick`.
block_on(async {
let ticker = async {
loop {
ex.tick().await;
}
};
f(ex.clone()).or(ticker).await
});
// Test 3: Run on many threads.
thread::scope(|scope| {
let (_signal, shutdown) = async_channel::bounded::<()>(1);
for _ in 0..16 {
let shutdown = shutdown.clone();
let ex = &ex;
scope.spawn(move || block_on(ex.run(shutdown.recv())));
}
block_on(f(ex.clone()));
});
// Test 4: Tick loop on many threads.
thread::scope(|scope| {
let (_signal, shutdown) = async_channel::bounded::<()>(1);
for _ in 0..16 {
let shutdown = shutdown.clone();
let ex = &ex;
scope.spawn(move || {
block_on(async move {
let ticker = async {
loop {
ex.tick().await;
}
};
shutdown.recv().or(ticker).await
})
});
}
block_on(f(ex.clone()));
});
}
#[test]
fn smoke() {
do_run(|ex| async move { ex.spawn(async {}).await });
}
#[test]
fn yield_now() {
do_run(|ex| async move { ex.spawn(future::yield_now()).await })
}
#[test]
fn timer() {
do_run(|ex| async move {
ex.spawn(async_io::Timer::after(Duration::from_millis(5)))
.await;
})
}

tests/panic_prop.rs Normal file

@ -0,0 +1,14 @@
use async_executor::Executor;
use futures_lite::{future, prelude::*};
#[test]
fn test_panic_propagation() {
let ex = Executor::new();
let task = ex.spawn(async { panic!("should be caught by the task") });
// Running the executor should not panic.
assert!(ex.try_tick());
// Polling the task should.
assert!(future::block_on(task.catch_unwind()).is_err());
}

tests/spawn_many.rs Normal file

@ -0,0 +1,45 @@
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;
#[cfg(not(miri))]
const READY_COUNT: usize = 50_000;
#[cfg(miri)]
const READY_COUNT: usize = 505;
#[test]
fn spawn_many() {
future::block_on(async {
let ex = Executor::new();
// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}
#[test]
fn spawn_many_local() {
future::block_on(async {
let ex = LocalExecutor::new();
// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}