Compare commits

...

122 Commits

Author SHA1 Message Date
James Liu 924b4530a7
feat: Implement static executors
Resolves #111. Creates a `StaticExecutor` type under a feature flag and allows 
constructing it from an `Executor` via `Executor::leak`. Unlike the executor 
it came from, it's a wrapper around a `State` and omits all changes to 
`active`.

Note, unlike the API proposed in #111, this PR also includes a unsafe 
`StaticExecutor::spawn_scoped` for spawning non-'static tasks, where the 
caller is responsible for ensuring that the task doesn't outlive the borrowed 
state. This would be required for Bevy to migrate to this type, where we're 
currently using lifetime transmutation on `Executor` to enable 
`Thread::scope`-like APIs for working with borrowed state. `StaticExecutor` 
does not have an external lifetime parameter so this approach is infeasible 
without such an API.

The performance gains while using the type are substantial:

```
single_thread/executor::spawn_one
                        time:   [1.6157 µs 1.6238 µs 1.6362 µs]
Found 6 outliers among 100 measurements (6.00%)
  3 (3.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_batch
                        time:   [28.169 µs 29.650 µs 32.196 µs]
Found 19 outliers among 100 measurements (19.00%)
  10 (10.00%) low severe
  3 (3.00%) low mild
  3 (3.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_many_local
                        time:   [6.1952 ms 6.2230 ms 6.2578 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe
single_thread/executor::spawn_recursively
                        time:   [50.202 ms 50.479 ms 50.774 ms]
Found 6 outliers among 100 measurements (6.00%)
  5 (5.00%) high mild
  1 (1.00%) high severe
single_thread/executor::yield_now
                        time:   [5.8795 ms 5.8883 ms 5.8977 ms]
Found 3 outliers among 100 measurements (3.00%)
  3 (3.00%) high mild

multi_thread/executor::spawn_one
                        time:   [1.2565 µs 1.2979 µs 1.3470 µs]
Found 8 outliers among 100 measurements (8.00%)
  7 (7.00%) high mild
  1 (1.00%) high severe
multi_thread/executor::spawn_batch
                        time:   [38.009 µs 43.693 µs 52.882 µs]
Found 22 outliers among 100 measurements (22.00%)
  21 (21.00%) high mild
  1 (1.00%) high severe
Benchmarking multi_thread/executor::spawn_many_local: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 386.6s, or reduce sample count to 10.
multi_thread/executor::spawn_many_local
                        time:   [27.492 ms 27.652 ms 27.814 ms]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) low mild
  3 (3.00%) high mild
Benchmarking multi_thread/executor::spawn_recursively: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 16.6s, or reduce sample count to 30.
multi_thread/executor::spawn_recursively
                        time:   [165.82 ms 166.04 ms 166.26 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
multi_thread/executor::yield_now
                        time:   [22.469 ms 22.649 ms 22.798 ms]
Found 8 outliers among 100 measurements (8.00%)
  5 (5.00%) low severe
  3 (3.00%) low mild

single_thread/leaked_executor::spawn_one
                        time:   [1.4717 µs 1.4778 µs 1.4832 µs]
Found 9 outliers among 100 measurements (9.00%)
  3 (3.00%) low severe
  2 (2.00%) low mild
  3 (3.00%) high mild
  1 (1.00%) high severe
single_thread/leaked_executor::spawn_many_local
                        time:   [4.2622 ms 4.3065 ms 4.3489 ms]
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) low mild
single_thread/leaked_executor::spawn_recursively
                        time:   [26.566 ms 26.899 ms 27.228 ms]
single_thread/leaked_executor::yield_now
                        time:   [5.7200 ms 5.7270 ms 5.7342 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

multi_thread/leaked_executor::spawn_one
                        time:   [1.3755 µs 1.4321 µs 1.4892 µs]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
multi_thread/leaked_executor::spawn_many_local
                        time:   [4.1838 ms 4.2394 ms 4.2989 ms]
Found 7 outliers among 100 measurements (7.00%)
  7 (7.00%) high mild
multi_thread/leaked_executor::spawn_recursively
                        time:   [43.074 ms 43.159 ms 43.241 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild
multi_thread/leaked_executor::yield_now
                        time:   [23.210 ms 23.257 ms 23.302 ms]
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) low mild
```
2024-05-12 16:22:32 -07:00
John Nunley f1c7ae3340
bench: Add some more filled-out benchmarks
This commit aims to add benchmarks that more realistically reflect
workloads that might happen in the real world.

These benchmarks are as follows:

- "channels", which sets up TASKS tasks, where each task uses a channel
  to wake up the next one.
- "server", which tries to simulate a web server-type scenario.

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-25 22:52:40 -07:00
John Nunley ef512cb384
v1.11.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-13 22:52:52 -07:00
Jacob Rothstein df57d9bc98
feat: reexport async_task::FallibleTask
Motivation: FallibleTask is part of the public interface of this crate, in that Task::fallible returns FallibleTask. However, in order to name that type, users need to add a direct dependency on async_task and ensure the crates versions are compatible. Reexporting allows crate users to name the type directly.
2024-04-11 16:33:17 -07:00
James Liu 649bdfda23
Support racy initialization of an Executor's state
Fixes #89. Uses @notgull's suggestion of using a `AtomicPtr` with a racy initialization instead of a `OnceCell`.

For the addition of more `unsafe`, I added the `clippy::undocumented_unsafe_blocks` lint at a warn, and fixed a few of the remaining open clippy issues (i.e. `Waker::clone_from` already handling the case where they're equal).

Removing `async_lock` as a dependency shouldn't be a SemVer breaking change.
2024-04-08 19:41:14 -07:00
John Nunley 4b37c612f6 v1.10.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-07 08:17:52 -07:00
John Nunley 00f0b99fad chore: Silence clippy
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-05 08:25:58 -07:00
John Nunley d3196999f4 feat: Add a way to batch spawn tasks
For some workloads many tasks are spawned at a time. This requires
locking and unlocking the executor's inner lock every time you spawn a
task. If you spawn many tasks this can be expensive.

This commit exposes a new "spawn_batch" method on both types. This
method allows the user to spawn an entire set of tasks at a time.

Closes #91

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-30 08:18:14 -07:00
John Nunley 17720b098a v1.9.1
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-29 21:10:44 -07:00
John Nunley b6d3a60b44 chore: Fix MIRI failure in larger_tasks
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-25 06:51:06 -07:00
John Nunley a2c1267c85 chore: Fix new nightly warnings
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-25 06:51:06 -07:00
John Nunley 00dbbbf85d Revert "feat: Use actual thread local queues instead of using a RwLock"
This reverts commit 7592d4188a.
2024-03-25 06:51:06 -07:00
John Nunley c90fd306cd Revert "bugfix: Account for local queue corner cases"
This reverts commit 22a9e8b305.
2024-03-25 06:51:06 -07:00
John Nunley 22a9e8b305 bugfix: Account for local queue corner cases
It turns out that with the current strategy it is possible for tasks to
be stuck in the local queue without any hope of being picked back up.
In practice this seems to happen when the only entities polling the
system are tickers, as opposed to runners. Since tickets don't steal
tasks, it is possible for tasks to be left over in the local queue that
don't filter out.

One possible solution is to make it so tickers steal tasks, but this
kind of defeats the point of tickers. So I've instead elected to replace
the current strategy with one that accounts for the corner cases with
local queues.

The main difference is that I replace the Sleepers struct with two
event_listener::Event's. One that handles tickers subscribed to the
global queue and one that handles tickers subscribed to the local queue.
The other main difference is that each local queue now has a reference
counter. If this count reaches zero, no tasks will be pushed to this
queue. Only runners increment or decrement this counter.

This makes the previously instituted tests pass, so hopefully this works
for most use cases.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:38:37 -07:00
John Nunley d5dc7a8008 tests: Add tests with more complicated futures
This should catch the errors from earlier.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:38:37 -07:00
John Nunley 2f3189a4b4
v1.9.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-21 20:58:51 -08:00
James Liu c7bbe489ab
Use wrapping add on ticks to avoid tick counter overflow in debug builds (#101) 2024-02-22 13:03:49 +09:00
James Liu 7592d4188a
feat: Use actual thread local queues instead of using a RwLock
Currently, runner local queues rely on a RwLock<Vec<Arc<ConcurrentQueue>>>> to store the queues instead of using actual thread-local storage.

This adds thread_local as a dependency, but this should allow the executor to work steal without needing to hold a lock, as well as allow tasks to schedule onto the local queue directly, where possible, instead of always relying on the global injector queue.

Fixes #62

Co-authored-by: John Nunley <jtnunley01@gmail.com>
2024-02-21 19:53:40 -08:00
James Liu 188f976dc3
m: Weaken the atomic orderings for notification
The atomic orderings on State::notified might be too strong, as it's primarily
being used as a deterrent against waking up too many threads. This PR weakens
their sequentially consistent operations to Acquire/Release.
2024-02-17 12:20:57 -08:00
James Liu 568a314ad9
Avoid redundant lookups in the active slab when spawning new tasks (#96) 2024-02-17 17:02:59 +09:00
James Liu 7ffdf5ba92
m: Replace unnecessary atomics with non-atomic operations 2024-02-16 17:22:43 -08:00
Jacob Rothstein 0baba46152
chore: Bump async-task to v4.4.0
this crate depends on async_task::Builder, which was introduced in 4.4.0
2024-02-12 19:40:56 -08:00
dependabot[bot] 4fbe23af69
Update criterion requirement from 0.4 to 0.5 (#43)
Signed-off-by: dependabot[bot] <support@github.com>
2024-01-27 00:34:45 +09:00
John Nunley 6c70369102
ex: Use Semaphore instead of manual event-listener
Whoops, I accidentally reinvented a semaphore and made the example a lot
more complicated than it needed to be.

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 16:01:07 -08:00
Taiki Endo 57fcc2d991 Relax MSRV to 1.60
https://github.com/smol-rs/futures-lite/pull/90
2024-01-07 07:07:37 +09:00
Taiki Endo 24510a7b72 ci: Use cargo-hack's --rust-version flag for msrv check
This respects rust-version field in Cargo.toml, so it removes the need
to manage MSRV in both the CI file and Cargo.toml.
2024-01-07 07:07:37 +09:00
John Nunley d747bcd827
v1.8.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-24 08:21:32 -08:00
John Nunley fa117dee27
Propagate panics in tasks (#78)
After smol-rs/async-task#37 I meant to add this to the executor. This
commit makes it so all panics are surfaced in the tasks that the user
calls. Hopefully this improves ergonomics.

Signed-off-by: John Nunley <dev@notgull.net>
Signed-off-by: Alain Zscheile <fogti+devel@ytrizja.de>
2023-11-21 11:39:09 +01:00
John Nunley 4b1cf40142
v1.7.2
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-18 09:26:16 -08:00
John Nunley 144b0576d1 Update to 2021 edition
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-13 08:30:50 -08:00
John Nunley b140c46123 Fix dev-dependency WASM compilation issue
`futures-lite` in the dev dependencies added a `block_on` call that was
not present in the WASM build, causing a compile error. This PR makes
sure that the `std` feature of `futures-lite` is enabled in Cargo.toml.

This also adds a CI check to ensure that this doesn't happen again

Signed-off-by: John Nunley <dev@notgull.net>
2023-11-13 08:30:50 -08:00
John Nunley 1d4769a7b5
v1.7.1
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-12 16:21:46 -08:00
John Nunley 6c3d45b23c
bugfix: Fix wasm32 compile errors
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-11 10:15:04 -08:00
John Nunley f076528d27
Add a disclaimer saying this is a basic executor (#74)
In many issues I've mentioned that the executors in this crate are just
reference executors. However, this is not documented in the crate
itself.

This commit adds a disclaimer to the crate documentation and to
README.md that these are reference executors that shouldn't be relied on
for performance.

Signed-off-by: John Nunley <dev@notgull.net>
2023-11-11 08:34:46 -08:00
John Nunley c7fd967c9e
v1.7.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-05 17:24:42 -08:00
John Nunley 361c5fd359 Fix missing import on Miri
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-02 21:50:48 -07:00
John Nunley 457cf7b888 Disable leaky test for MIRI
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-02 21:50:48 -07:00
John Nunley e1e2ab11df Bump async-io, async-lock and futures-lite
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-02 21:50:48 -07:00
dependabot[bot] b91875e73b
deps: Update async-channel requirement from 1.4.1 to 2.0.0
Updates the requirements on [async-channel](https://github.com/smol-rs/async-channel) to permit the latest version.
- [Release notes](https://github.com/smol-rs/async-channel/releases)
- [Changelog](https://github.com/smol-rs/async-channel/blob/master/CHANGELOG.md)
- [Commits](https://github.com/smol-rs/async-channel/compare/v1.4.1...v2.0.0)

---
updated-dependencies:
- dependency-name: async-channel
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2023-10-30 20:54:02 -07:00
John Nunley 599c71a3f9
v1.6.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-16 19:17:27 -07:00
John Nunley 8a0832c090
m: Remove the thread-local executor optimization
This was added in #37 as an optimization, but has since lead to many bugs. See
the issues #53, #57 and #60 for more information. I do not have the bandwidth
to address all of these bugs, so I'm taking the path of least resistance by
just removing the problematic code.

CLoses #53, #57 and #60

Signed-off-by: John Nunley <dev@notgull.net>
2023-10-16 18:50:00 -07:00
John Nunley 917caad8b9
ex: Add an example of an executor with limited tasks
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-15 19:26:22 -07:00
John Nunley 2cfb6e4ed0
v1.5.4
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-27 21:17:03 -07:00
John Nunley 4154ad2190
Fix a bug where TLS would become None (#55)
* Fix a bug where TLS would become None

The bug is invoked as follows:

- Runner 1 is created and stores the current version of the TLS
  LOCAL_QUEUE variable, which is None.
- Runner 2 is also created. It stores the current version of the TLS
  variable as well, which is Runner 1's queue.
- Runner 1 is dropped. It stores None into the LOCAL_QUEUE variable.
- Runner 2 tries to run. It reads from the LOCAL_QUEUE variable, sees
  that it is None, and panics.

This could be solved by just not using the local queue if the variable
is None. However, we can do one better; if the slot is open, we can
optimize the runner by replacing it with our own queue. This should
allow for the local queue to be used more often.

Closes #54

Signed-off-by: John Nunley <dev@notgull.net>
2023-09-27 20:01:15 -07:00
John Nunley 77b5b169c5
v1.5.3
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-25 09:52:25 -07:00
John Nunley ecddfde87a
m: Remove unused memchr dependency
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-23 11:01:56 -07:00
Taiki Endo ff67cb9a5f Update actions/checkout action to v4 2023-09-10 18:18:02 +09:00
Taiki Endo 609aafb330 Bump MSRV to 1.61
```
error: package `memchr v2.6.3` cannot be built because it requires rustc 1.61 or newer, while the currently active rustc version is 1.60.0
```
2023-09-05 00:49:20 +09:00
John Nunley a5ff8df7d9
bugfix: Ensure that ex.run() produces a Send future
This commit makes sure that the run() and tick() functions produce
futures that are Send and Sync, to prevent a regression introduced in
PR #37. Tests are also added to prevent this regression in the future.

Signed-off-by: John Nunley <dev@notgull.net>
2023-08-20 17:08:35 -07:00
John Nunley e19573367b
v1.5.2
Signed-off-by: John Nunley <dev@notgull.net>
2023-08-19 19:17:39 -07:00
John Nunley aed7279805
Add smol-rs logo (#46) 2023-07-17 14:35:07 +09:00
John Nunley 9df3dd4974
alg: Push tasks directly to the local runner
This commit adds an optimization where a thread-local variable contains the queue of the
current runner. Rather than pushing to the global queue and hoping that a local queue
eventually picks it up, tasks are pushed directly to this local queue if available.

This has led to speedups of up to 70% in some cases and up to 10% in other workloads.
2023-07-02 11:29:19 -07:00
John Nunley 1a9e08ce73
Use fastrand v2.0.0 (#45) 2023-06-09 17:53:03 -07:00
Yosh 85c20eb98b
Replace `num_cpus` crate with `std::thread` (#42) 2023-05-05 21:38:00 +09:00
Taiki Endo 8562c41062 Update permissions for security_audit 2023-04-10 02:35:44 +09:00
John Nunley a438e9da8c
v1.5.1 (#40) 2023-04-07 11:01:23 -07:00
John Nunley 6aba704efc
bench: Add benchmarks for lower thread counts (#38) 2023-03-10 19:18:48 -08:00
Taiki Endo b8885f9578 Bump MSRV to 1.48
async-lock 2.7.0 requires Rust 1.48.

```
error[E0658]: use of unstable library feature 'future_readiness_fns'
   --> /home/runner/.cargo/registry/src/github.com-1ecc6299db9ec823/async-lock-2.7.0/src/once_cell.rs:430:45
    |
430 |             self.initialize_or_wait(move || std::future::ready(closure()), &mut Blocking),
    |                                             ^^^^^^^^^^^^^^^^^^
    |
```
2023-03-06 01:43:03 +09:00
Taiki Endo ddfb54d1c4 Minimize GITHUB_TOKEN permissions
Refs: https://github.blog/changelog/2021-04-20-github-actions-control-permissions-for-github_token
2023-03-04 11:41:08 +09:00
Taiki Endo 4d8e7bad23 Set CARGO_NET_GIT_FETCH_WITH_CLI=true in CI 2023-03-04 11:41:08 +09:00
John Nunley a988ee3e46
m: Migrate benchmarks to criterion (#35)
* m: Migrate to criterion

* Update CI
2023-01-23 11:30:43 -08:00
Taiki Endo f196463b09 Enable dependabot update for Rust 2022-12-28 12:27:06 +09:00
Taiki Endo b48a503109 Clean up CI config 2022-12-28 12:27:06 +09:00
John Nunley 8287e520b9
Implement debug output to be better (#33) 2022-12-25 07:12:59 -08:00
Taiki Endo 00ea6cf6a1 Release 1.5.0 2022-11-09 11:45:41 +09:00
Taiki Endo c09ecba5bb Update concurrent-queue to 2 2022-11-09 11:45:41 +09:00
Taiki Endo 92423cfaa1 Remove msrv field from .clippy.toml
Since Rust 1.64, Clippy respects `rust-version` field in Cargo.toml.
rust-lang/rust@b776fb8
2022-11-09 11:45:41 +09:00
John Nunley 263ea89390
Replace once_cell with async-lock (#29) 2022-10-29 21:41:55 -07:00
Taiki Endo d2daab599b Enable -Zmiri-strict-provenance 2022-08-16 22:54:23 +09:00
Taiki Endo 660747cd8d Apply clippy to tests and examples 2022-07-17 21:46:43 +09:00
Taiki Endo d1e4817bdc
Run Miri on CI (#27) 2022-07-08 01:21:16 +09:00
Taiki Endo 16f0b9ca70 Bump MSRV to Rust 1.47
https://github.com/smol-rs/async-task/releases/tag/v4.3.0
2022-07-08 01:14:13 +09:00
Taiki Endo 21f4982a3d Update CI config 2022-07-08 01:12:38 +09:00
Taiki Endo 19919c4694 Update actions/checkout action to v3 2022-05-01 13:35:15 +09:00
Taiki Endo f190408a6f Remove rustfmt.toml 2022-01-09 01:25:32 +09:00
Taiki Endo 367095cdc5 Create GitHub release automatically 2022-01-08 21:33:51 +09:00
Taiki Endo ee7bd4d2af Clean up CI config 2022-01-08 21:33:35 +09:00
Taiki Endo 2341801cd0 Fix clippy::redundant_closure warning 2021-12-30 09:38:02 +09:00
Taiki Endo b9ac443e56 Update slab to 0.4.4 2021-12-30 09:36:58 +09:00
Taiki Endo dacd4db652
Merge pull request #24 from smol-rs/next
Bump to v1.4.1
2021-04-24 18:10:06 +09:00
Taiki Endo edf0296f59 Bump to v1.4.1 2021-04-24 17:56:59 +09:00
Taiki Endo 50f867002c
Merge pull request #23 from smol-rs/slab
Replace vec-arena with slab
2021-04-20 11:29:13 +09:00
Taiki Endo f25cd267ac Replace vec-arena with slab 2021-04-18 22:39:34 +09:00
Taiki Endo 0ca774230e
Merge pull request #21 from smol-rs/readme
Remove readme field from Cargo.toml
2021-02-14 19:50:33 +09:00
Taiki Endo 4decd55ccb Remove readme field from Cargo.toml 2021-02-14 19:40:19 +09:00
Taiki Endo ab77214b6e
Merge pull request #20 from smol-rs/badge
Update license badge to match Cargo.toml
2021-02-14 13:48:11 +09:00
Taiki Endo d1ae069de4 Update license badge to match Cargo.toml 2021-02-14 13:39:05 +09:00
Taiki Endo af56c2a590
Merge pull request #19 from taiki-e/url
Update URLs
2020-12-26 23:54:58 +09:00
Taiki Endo 64b80cf591 Update URLs 2020-12-26 23:47:13 +09:00
Taiki Endo 9bbf0d8403
Merge pull request #18 from taiki-e/compare_and_swap
Replace deprecated compare_and_swap with compare_exchange
2020-12-24 21:34:50 +09:00
Taiki Endo c4d019827f Replace deprecated compare_and_swap with compare_exchange 2020-12-24 21:22:43 +09:00
Taiki Endo 337af8182a
Merge pull request #17 from taiki-e/ci
Fix CI
2020-12-24 19:11:54 +09:00
Taiki Endo 6860810a15 Fix CI 2020-12-24 19:03:29 +09:00
Stjepan Glavina 60e316dd7a
Merge pull request #16 from mbrobbel/patch-1
Fix a typo in README.md
2020-12-08 20:18:23 +01:00
Stjepan Glavina 8dd3422176
Typo 2020-12-08 20:18:06 +01:00
Matthijs Brobbel 6e559e8790
Fix a typo in README.md 2020-12-08 19:48:57 +01:00
Stjepan Glavina 36b9333f06
. 2020-11-30 12:30:15 +01:00
Stjepan Glavina 62a61401d1 Bump to v1.4.0 2020-11-10 15:56:55 +01:00
Stjepan Glavina 38141bb5b4 Cleanup 2020-11-10 15:54:01 +01:00
Stjepan Glavina b55198557b
Merge pull request #14 from Keruspe/local
add Executor::is_empty and LocalExecutor::is_empty
2020-11-10 15:50:44 +01:00
Marc-Antoine Perennou 5a5ecd2763 add Executor::is_empty and LocalExecutor::is_empty
Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
2020-11-06 18:29:30 +01:00
Stjepan Glavina 29ba8a72ad Update futures-lite 2020-10-09 14:49:25 +02:00
Stjepan Glavina 2fcbbdebb8 Add benchmarks 2020-10-08 14:28:45 +02:00
Stjepan Glavina 98aac61707 More tests 2020-09-20 19:06:09 +02:00
Stjepan Glavina 4079184178 Bump to v1.3.0 2020-09-20 16:32:51 +02:00
Stjepan Glavina f9e28cd6d8 Make all executors scoped 2020-09-20 16:30:35 +02:00
Stjepan Glavina e714ec4221 Bump to v1.2.0 2020-09-20 02:40:01 +02:00
Stjepan Glavina 525ac9fe7e Comments 2020-09-20 02:38:56 +02:00
Stjepan Glavina 8cea09da36 Update async-task 2020-09-20 02:36:54 +02:00
Stjepan Glavina 184185a7fa Refactor 2020-09-19 22:40:06 +02:00
Stjepan Glavina 6f2b0b8a49 Make executors scoped 2020-09-19 22:38:11 +02:00
Stjepan Glavina 19eb3ccd6e Bump to v1.1.1 2020-09-14 15:51:41 +02:00
Stjepan Glavina 31519f0cfc Replace AtomicU64 with AtomicUsize 2020-09-14 15:51:17 +02:00
Stjepan Glavina 65ee297322 Bump to v1.1.0 2020-09-10 23:29:05 +02:00
Stjepan Glavina 5e08a9a351 Use atomics to make run() and tick() futures Send + Sync 2020-09-10 23:27:56 +02:00
Stjepan Glavina 05456efbee Bump to v1.0.0 2020-09-07 16:03:27 +02:00
Stjepan Glavina 7b21df5732 Simplify 2020-08-29 20:29:54 +02:00
Stjepan Glavina d6505ef575 Bump to v0.2.1 2020-08-29 20:07:00 +02:00
Stjepan Glavina 924d3a9f26
Merge pull request #9 from stjepang/tick
Add tick() and try_tick()
2020-08-29 20:00:10 +02:00
Stjepan Glavina 2da645e6e0 Refactor 2020-08-29 19:57:21 +02:00
Stjepan Glavina d69638b2d3 simplify 2020-08-29 19:15:42 +02:00
Stjepan Glavina 6c6c1b1c2f Add tick() and try_tick() 2020-08-29 18:31:33 +02:00
22 changed files with 2763 additions and 615 deletions

1
.github/FUNDING.yml vendored
View File

@ -1 +0,0 @@
github: stjepang

9
.github/dependabot.yml vendored Normal file
View File

@ -0,0 +1,9 @@
version: 2
updates:
- package-ecosystem: cargo
directory: /
schedule:
interval: weekly
commit-message:
prefix: ''
labels: []

View File

@ -1,51 +0,0 @@
name: Build and test
on:
push:
branches:
- master
pull_request:
jobs:
build_and_test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'windows')
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
- name: Install latest ${{ matrix.rust }}
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
profile: minimal
override: true
- name: Run cargo check
uses: actions-rs/cargo@v1
with:
command: check
args: --all --bins --examples --tests --all-features
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
uses: actions-rs/cargo@v1
with:
command: check
args: -Z features=dev_dep
- name: Run cargo test
uses: actions-rs/cargo@v1
with:
command: test

102
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,102 @@
name: CI
permissions:
contents: read
on:
pull_request:
push:
branches:
- master
schedule:
- cron: '0 2 * * 0'
env:
CARGO_INCREMENTAL: 0
CARGO_NET_GIT_FETCH_WITH_CLI: true
CARGO_NET_RETRY: 10
CARGO_TERM_COLOR: always
RUST_BACKTRACE: 1
RUSTFLAGS: -D warnings
RUSTDOCFLAGS: -D warnings
RUSTUP_MAX_RETRIES: 10
defaults:
run:
shell: bash
jobs:
test:
runs-on: ${{ matrix.os }}
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest]
rust: [nightly, beta, stable]
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- run: rustup target add wasm32-unknown-unknown
- uses: taiki-e/install-action@cargo-hack
- run: cargo build --all --all-features --all-targets
if: startsWith(matrix.rust, 'nightly')
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
- run: cargo test --all-features
- run: cargo check --all --all-features --target wasm32-unknown-unknown
- run: cargo hack build --all --all-features --target wasm32-unknown-unknown --no-dev-deps
msrv:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --rust-version
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo fmt --all --check
miri:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
run: rustup toolchain install nightly --component miri && rustup default nightly
- run: cargo miri test
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
- run: cargo miri test --all-features
env:
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks
RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout
security_audit:
permissions:
checks: write
contents: read
issues: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
# https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master
with:
token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,26 +0,0 @@
name: Lint
on:
push:
branches:
- master
pull_request:
jobs:
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
components: clippy
- uses: actions-rs/clippy-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
args: --all-features -- -W clippy::all

22
.github/workflows/release.yml vendored Normal file
View File

@ -0,0 +1,22 @@
name: Release
permissions:
contents: write
on:
push:
tags:
- v[0-9]+.*
jobs:
create-release:
if: github.repository_owner == 'smol-rs'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: taiki-e/create-gh-release-action@v1
with:
changelog: CHANGELOG.md
branch: master
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,20 +0,0 @@
name: Security audit
on:
push:
branches:
- master
pull_request:
jobs:
security_audit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

View File

@ -1,3 +1,102 @@
# Version 1.11.0
- Re-export the `async_task::FallibleTask` primitive. (#113)
- Support racy initialization of the executor state. This should allow the executor to be
initialized on web targets without any issues. (#108)
# Version 1.10.0
- Add a function `spawn_batch` that allows users to spawn multiple tasks while only locking the executor once. (#92)
# Version 1.9.1
- Remove the thread-local optimization due to the bugs that it introduces. (#106)
# Version 1.9.0
- Re-introduce the thread-local task push optimization to the executor. (#93)
- Bump `async-task` to v4.4.0. (#90)
- Replace some unnecessary atomic operations with non-atomic operations. (#94)
- Use weaker atomic orderings for notifications. (#95)
- When spawning a future, avoid looking up the ID to assign to that future twice. (#96)
# Version 1.8.0
- When spawned tasks panic, the panic is caught and then surfaced in the spawned
`Task`. Previously, the panic would be surfaced in `tick()` or `run()`. (#78)
# Version 1.7.2
- Fix compilation under WebAssembly targets (#77).
# Version 1.7.1
- Fix compilation under WebAssembly targets (#75).
- Add a disclaimer indicating that this is a reference executor (#74).
# Version 1.7.0
- Bump `async-lock` and `futures-lite` to their latest versions. (#70)
# Version 1.6.0
- Remove the thread-local queue optimization, as it caused a number of bugs in production use cases. (#61)
# Version 1.5.4
- Fix a panic that could happen when two concurrent `run()` calls are made and the thread local task slot is left as `None`. (#55)
# Version 1.5.3
- Fix an accidental breaking change in v1.5.2, where `ex.run()` was no longer `Send`. (#50)
- Remove the unused `memchr` dependency. (#51)
# Version 1.5.2
- Add thread-local task queue optimizations, allowing new tasks to avoid using the global queue. (#37)
- Update `fastrand` to v2. (#45)
# Version 1.5.1
- Implement a better form of debug output for Executor and LocalExecutor. (#33)
# Version 1.5.0
- Remove the dependency on the `once_cell` crate to restore the MSRV. (#29)
- Update `concurrent-queue` to v2.
# Version 1.4.1
- Remove dependency on deprecated `vec-arena`. (#23)
# Version 1.4.0
- Add `Executor::is_empty()` and `LocalExecutor::is_empty()`.
# Version 1.3.0
- Parametrize executors over a lifetime to allow spawning non-`static` futures.
# Version 1.2.0
- Update `async-task` to v4.
# Version 1.1.1
- Replace `AtomicU64` with `AtomicUsize`.
# Version 1.1.0
- Use atomics to make `Executor::run()` and `Executor::tick()` futures `Send + Sync`.
# Version 1.0.0
- Stabilize.
# Version 0.2.1
- Add `try_tick()` and `tick()` methods.
# Version 0.2.0
- Redesign the whole API.

View File

@ -1,25 +1,44 @@
[package]
name = "async-executor"
version = "0.2.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>"]
edition = "2018"
# When publishing a new version:
# - Update CHANGELOG.md
# - Create "v1.x.y" git tag
version = "1.11.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
edition = "2021"
rust-version = "1.63"
description = "Async executor"
license = "Apache-2.0 OR MIT"
repository = "https://github.com/stjepang/async-executor"
homepage = "https://github.com/stjepang/async-executor"
documentation = "https://docs.rs/async-executor"
repository = "https://github.com/smol-rs/async-executor"
keywords = ["asynchronous", "executor", "single", "multi", "spawn"]
categories = ["asynchronous", "concurrency"]
readme = "README.md"
exclude = ["/.*"]
[features]
# Adds support for executors optimized for use in static variables.
static = []
[dependencies]
async-task = "3.0.0"
concurrent-queue = "1.2.2"
fastrand = "1.3.4"
futures-lite = "1.0.0"
once_cell = "1.4.1"
async-task = "4.4.0"
concurrent-queue = "2.5.0"
fastrand = "2.0.0"
futures-lite = { version = "2.0.0", default-features = false }
slab = "0.4.4"
[target.'cfg(target_family = "wasm")'.dependencies]
futures-lite = { version = "2.0.0", default-features = false, features = ["std"] }
[dev-dependencies]
async-channel = "1.4.1"
async-io = "0.2.0"
async-channel = "2.0.0"
async-io = "2.1.0"
async-lock = "3.0.0"
criterion = { version = "0.5", default-features = false, features = ["cargo_bench_support"] }
easy-parallel = "3.1.0"
fastrand = "2.0.0"
futures-lite = "2.0.0"
once_cell = "1.16.0"
[[bench]]
name = "executor"
harness = false
required-features = ["static"]

View File

@ -1,9 +1,9 @@
# async-executor
[![Build](https://github.com/stjepang/async-executor/workflows/Build%20and%20test/badge.svg)](
https://github.com/stjepang/async-executor/actions)
[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)](
https://github.com/stjepang/async-executor)
[![Build](https://github.com/smol-rs/async-executor/workflows/Build%20and%20test/badge.svg)](
https://github.com/smol-rs/async-executor/actions)
[![License](https://img.shields.io/badge/license-Apache--2.0_OR_MIT-blue.svg)](
https://github.com/smol-rs/async-executor)
[![Cargo](https://img.shields.io/crates/v/async-executor.svg)](
https://crates.io/crates/async-executor)
[![Documentation](https://docs.rs/async-executor/badge.svg)](
@ -11,6 +11,13 @@ https://docs.rs/async-executor)
Async executors.
This crate provides two reference executors that trade performance for
functionality. They should be considered reference executors that are "good
enough" for most use cases. For more specialized use cases, consider writing
your own executor on top of [`async-task`].
[`async-task`]: https://crates.io/crates/async-task
## Examples
```rust
@ -25,7 +32,7 @@ let task = ex.spawn(async {
println!("Hello world");
});
// Run the executor until the task complets.
// Run the executor until the task completes.
future::block_on(ex.run(task));
```

499
benches/executor.rs Normal file
View File

@ -0,0 +1,499 @@
use std::mem;
use std::thread::available_parallelism;
use async_executor::{Executor, StaticExecutor};
use criterion::{criterion_group, criterion_main, Criterion};
use futures_lite::{future, prelude::*};
const TASKS: usize = 300;
const STEPS: usize = 300;
const LIGHT_TASKS: usize = 25_000;
static EX: Executor<'_> = Executor::new();
static STATIC_EX: StaticExecutor = StaticExecutor::new();
fn run(f: impl FnOnce(), multithread: bool) {
let limit = if multithread {
available_parallelism().unwrap().get()
} else {
1
};
let (s, r) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new()
.each(0..limit, |_| future::block_on(EX.run(r.recv())))
.finish(move || {
let _s = s;
f()
});
}
fn run_static(f: impl FnOnce(), multithread: bool) {
let limit = if multithread {
available_parallelism().unwrap().get()
} else {
1
};
let (s, r) = async_channel::bounded::<()>(1);
easy_parallel::Parallel::new()
.each(0..limit, |_| future::block_on(STATIC_EX.run(r.recv())))
.finish(move || {
let _s = s;
f()
});
}
fn create(c: &mut Criterion) {
c.bench_function("executor::create", |b| {
b.iter(|| {
let ex = Executor::new();
let task = ex.spawn(async {});
future::block_on(ex.run(task));
})
});
}
fn running_benches(c: &mut Criterion) {
for (prefix, with_static) in [("executor", false), ("static_executor", true)] {
for (group_name, multithread) in [("single_thread", false), ("multi_thread", true)].iter() {
let mut group = c.benchmark_group(group_name.to_string());
group.bench_function(format!("{}::spawn_one", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(|| {
future::block_on(async { STATIC_EX.spawn(async {}).await });
});
},
*multithread,
);
} else {
run(
|| {
b.iter(|| {
future::block_on(async { EX.spawn(async {}).await });
});
},
*multithread,
);
}
});
if !with_static {
group.bench_function("executor::spawn_batch", |b| {
run(
|| {
let mut handles = vec![];
b.iter(|| {
EX.spawn_many((0..250).map(|_| future::yield_now()), &mut handles);
});
handles.clear();
},
*multithread,
)
});
}
group.bench_function(format!("{}::spawn_many_local", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..LIGHT_TASKS {
tasks.push(STATIC_EX.spawn(async {}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..LIGHT_TASKS {
tasks.push(EX.spawn(async {}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::spawn_recursively", prefix), |b| {
#[allow(clippy::manual_async_fn)]
fn go(i: usize) -> impl Future<Output = ()> + Send + 'static {
async move {
if i != 0 {
EX.spawn(async move {
let fut = go(i - 1).boxed();
fut.await;
})
.await;
}
}
}
#[allow(clippy::manual_async_fn)]
fn go_static(i: usize) -> impl Future<Output = ()> + Send + 'static {
async move {
if i != 0 {
STATIC_EX
.spawn(async move {
let fut = go_static(i - 1).boxed();
fut.await;
})
.await;
}
}
}
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(STATIC_EX.spawn(go_static(STEPS)));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(EX.spawn(go(STEPS)));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::yield_now", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(STATIC_EX.spawn(async move {
for _ in 0..STEPS {
future::yield_now().await;
}
}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let mut tasks = Vec::new();
for _ in 0..TASKS {
tasks.push(EX.spawn(async move {
for _ in 0..STEPS {
future::yield_now().await;
}
}));
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
);
}
});
group.bench_function(format!("{}::channels", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
// Create channels.
let mut tasks = Vec::new();
let (first_send, first_recv) = async_channel::bounded(1);
let mut current_recv = first_recv;
for _ in 0..TASKS {
let (next_send, next_recv) = async_channel::bounded(1);
let current_recv =
mem::replace(&mut current_recv, next_recv);
tasks.push(STATIC_EX.spawn(async move {
// Send a notification on to the next task.
for _ in 0..STEPS {
current_recv.recv().await.unwrap();
next_send.send(()).await.unwrap();
}
}));
}
for _ in 0..STEPS {
first_send.send(()).await.unwrap();
current_recv.recv().await.unwrap();
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
)
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
// Create channels.
let mut tasks = Vec::new();
let (first_send, first_recv) = async_channel::bounded(1);
let mut current_recv = first_recv;
for _ in 0..TASKS {
let (next_send, next_recv) = async_channel::bounded(1);
let current_recv =
mem::replace(&mut current_recv, next_recv);
tasks.push(EX.spawn(async move {
// Send a notification on to the next task.
for _ in 0..STEPS {
current_recv.recv().await.unwrap();
next_send.send(()).await.unwrap();
}
}));
}
for _ in 0..STEPS {
first_send.send(()).await.unwrap();
current_recv.recv().await.unwrap();
}
for task in tasks {
task.await;
}
});
});
},
*multithread,
)
}
});
group.bench_function(format!("{}::web_server", prefix), |b| {
if with_static {
run_static(
|| {
b.iter(move || {
future::block_on(async {
let (db_send, db_recv) =
async_channel::bounded::<async_channel::Sender<_>>(
TASKS / 5,
);
let mut db_rng = fastrand::Rng::with_seed(0x12345678);
let mut web_rng = db_rng.fork();
// This task simulates a database.
let db_task = STATIC_EX.spawn(async move {
loop {
// Wait for a new task.
let incoming = match db_recv.recv().await {
Ok(incoming) => incoming,
Err(_) => break,
};
// Process the task. Maybe it takes a while.
for _ in 0..db_rng.usize(..10) {
future::yield_now().await;
}
// Send the data back.
incoming.send(db_rng.usize(..)).await.ok();
}
});
// This task simulates a web server waiting for new tasks.
let server_task = STATIC_EX.spawn(async move {
for i in 0..TASKS {
// Get a new connection.
if web_rng.usize(..=16) == 16 {
future::yield_now().await;
}
let mut web_rng = web_rng.fork();
let db_send = db_send.clone();
let task = STATIC_EX.spawn(async move {
// Check if the data is cached...
if web_rng.bool() {
// ...it's in cache!
future::yield_now().await;
return;
}
// Otherwise we have to make a DB call or two.
for _ in 0..web_rng.usize(STEPS / 2..STEPS) {
let (resp_send, resp_recv) =
async_channel::bounded(1);
db_send.send(resp_send).await.unwrap();
criterion::black_box(
resp_recv.recv().await.unwrap(),
);
}
// Send the data back...
for _ in 0..web_rng.usize(3..16) {
future::yield_now().await;
}
});
task.detach();
if i & 16 == 0 {
future::yield_now().await;
}
}
});
// Spawn and wait for it to stop.
server_task.await;
db_task.await;
});
})
},
*multithread,
)
} else {
run(
|| {
b.iter(move || {
future::block_on(async {
let (db_send, db_recv) =
async_channel::bounded::<async_channel::Sender<_>>(
TASKS / 5,
);
let mut db_rng = fastrand::Rng::with_seed(0x12345678);
let mut web_rng = db_rng.fork();
// This task simulates a database.
let db_task = EX.spawn(async move {
loop {
// Wait for a new task.
let incoming = match db_recv.recv().await {
Ok(incoming) => incoming,
Err(_) => break,
};
// Process the task. Maybe it takes a while.
for _ in 0..db_rng.usize(..10) {
future::yield_now().await;
}
// Send the data back.
incoming.send(db_rng.usize(..)).await.ok();
}
});
// This task simulates a web server waiting for new tasks.
let server_task = EX.spawn(async move {
for i in 0..TASKS {
// Get a new connection.
if web_rng.usize(..=16) == 16 {
future::yield_now().await;
}
let mut web_rng = web_rng.fork();
let db_send = db_send.clone();
let task = EX.spawn(async move {
// Check if the data is cached...
if web_rng.bool() {
// ...it's in cache!
future::yield_now().await;
return;
}
// Otherwise we have to make a DB call or two.
for _ in 0..web_rng.usize(STEPS / 2..STEPS) {
let (resp_send, resp_recv) =
async_channel::bounded(1);
db_send.send(resp_send).await.unwrap();
criterion::black_box(
resp_recv.recv().await.unwrap(),
);
}
// Send the data back...
for _ in 0..web_rng.usize(3..16) {
future::yield_now().await;
}
});
task.detach();
if i & 16 == 0 {
future::yield_now().await;
}
}
});
// Spawn and wait for it to stop.
server_task.await;
db_task.await;
});
})
},
*multithread,
)
}
});
}
}
}
criterion_group!(benches, create, running_benches);
criterion_main!(benches);

95
examples/limit.rs Normal file
View File

@ -0,0 +1,95 @@
//! An executor where you can only push a limited number of tasks.
use async_executor::{Executor, Task};
use async_lock::Semaphore;
use std::{future::Future, sync::Arc, time::Duration};
/// An executor where you can only push a limited number of tasks.
struct LimitedExecutor {
/// Inner running executor.
executor: Executor<'static>,
/// Semaphore limiting the number of tasks.
semaphore: Arc<Semaphore>,
}
impl LimitedExecutor {
fn new(max: usize) -> Self {
Self {
executor: Executor::new(),
semaphore: Semaphore::new(max).into(),
}
}
/// Spawn a task, waiting until there is a slot available.
async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
where
F::Output: Send + 'static,
{
// Wait for a semaphore permit.
let permit = self.semaphore.acquire_arc().await;
// Wrap it into a new future.
let future = async move {
let result = future.await;
drop(permit);
result
};
// Spawn the task.
self.executor.spawn(future)
}
/// Run a future to completion.
async fn run<F: Future>(&self, future: F) -> F::Output {
self.executor.run(future).await
}
}
fn main() {
futures_lite::future::block_on(async {
let ex = Arc::new(LimitedExecutor::new(10));
ex.run({
let ex = ex.clone();
async move {
// Spawn a bunch of tasks that wait for a while.
for i in 0..15 {
ex.spawn(async move {
async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
println!("Waiting task #{i} finished!");
})
.await
.detach();
}
let (start_tx, start_rx) = async_channel::bounded::<()>(1);
let mut current_rx = start_rx;
// Send the first message.
start_tx.send(()).await.unwrap();
// Spawn a bunch of channel tasks that wake eachother up.
for i in 0..25 {
let (next_tx, next_rx) = async_channel::bounded::<()>(1);
ex.spawn(async move {
current_rx.recv().await.unwrap();
println!("Channel task {i} woken up!");
next_tx.send(()).await.unwrap();
println!("Channel task {i} finished!");
})
.await
.detach();
current_rx = next_rx;
}
// Wait for the last task to finish.
current_rx.recv().await.unwrap();
println!("All tasks finished!");
}
})
.await;
});
}

84
examples/priority.rs Normal file
View File

@ -0,0 +1,84 @@
//! An executor with task priorities.
use std::thread;
use async_executor::{Executor, Task};
use futures_lite::{future, prelude::*};
/// Task priority.
#[repr(usize)]
#[derive(Debug, Clone, Copy)]
enum Priority {
High = 0,
Medium = 1,
Low = 2,
}
/// An executor with task priorities.
///
/// Tasks with lower priorities only get polled when there are no tasks with higher priorities.
struct PriorityExecutor<'a> {
ex: [Executor<'a>; 3],
}
impl<'a> PriorityExecutor<'a> {
/// Creates a new executor.
const fn new() -> PriorityExecutor<'a> {
PriorityExecutor {
ex: [Executor::new(), Executor::new(), Executor::new()],
}
}
/// Spawns a task with the given priority.
fn spawn<T: Send + 'a>(
&self,
priority: Priority,
future: impl Future<Output = T> + Send + 'a,
) -> Task<T> {
self.ex[priority as usize].spawn(future)
}
/// Runs the executor forever.
async fn run(&self) {
loop {
for _ in 0..200 {
let t0 = self.ex[0].tick();
let t1 = self.ex[1].tick();
let t2 = self.ex[2].tick();
// Wait until one of the ticks completes, trying them in order from highest
// priority to lowest priority.
t0.or(t1).or(t2).await;
}
// Yield every now and then.
future::yield_now().await;
}
}
}
fn main() {
static EX: PriorityExecutor<'_> = PriorityExecutor::new();
// Spawn a thread running the executor forever.
thread::spawn(|| future::block_on(EX.run()));
let mut tasks = Vec::new();
for _ in 0..20 {
// Choose a random priority.
let choice = [Priority::High, Priority::Medium, Priority::Low];
let priority = choice[fastrand::usize(..choice.len())];
// Spawn a task with this priority.
tasks.push(EX.spawn(priority, async move {
println!("{:?}", priority);
future::yield_now().await;
println!("{:?}", priority);
}));
}
for task in tasks {
future::block_on(task);
}
}

View File

@ -1 +0,0 @@
version = "Two"

1466
src/lib.rs

File diff suppressed because it is too large Load Diff

479
src/static_executors.rs Normal file
View File

@ -0,0 +1,479 @@
use crate::{debug_state, Executor, LocalExecutor, State};
use async_task::{Builder, Runnable, Task};
use slab::Slab;
use std::{
cell::UnsafeCell,
fmt,
future::Future,
marker::PhantomData,
panic::{RefUnwindSafe, UnwindSafe},
};
impl Executor<'static> {
/// Consumes the [`Executor`] and intentionally leaks it.
///
/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
/// [`StaticExecutor`]'s functions are optimized to require fewer synchronizing operations
/// when spawning, running, and finishing tasks.
///
/// `StaticExecutor` cannot be converted back into a `Executor`, so this operation is
/// irreversible without the use of unsafe.
///
/// # Example
///
/// ```
/// use async_executor::Executor;
/// use futures_lite::future;
///
/// let ex = Executor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticExecutor {
let ptr = self.state_ptr();
// SAFETY: So long as an Executor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
std::mem::forget(self);
let mut active = state.active.lock().unwrap();
if !active.is_empty() {
// Reschedule all of the active tasks.
for waker in active.drain() {
waker.wake();
}
// Overwrite to ensure that the slab is deallocated.
*active = Slab::new();
}
// SAFETY: StaticExecutor has the same memory layout as State as it's repr(transparent).
// The lifetime is not altered: 'static -> 'static.
let static_executor: &'static StaticExecutor = unsafe { std::mem::transmute(state) };
static_executor
}
}
impl LocalExecutor<'static> {
/// Consumes the [`LocalExecutor`] and intentionally leaks it.
///
/// Largely equivalent to calling `Box::leak(Box::new(executor))`, but the produced
/// [`StaticLocalExecutor`]'s functions are optimized to require fewer synchronizing operations
/// when spawning, running, and finishing tasks.
///
/// `StaticLocalExecutor` cannot be converted back into a `Executor`, so this operation is
/// irreversible without the use of unsafe.
///
/// # Example
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(ex.run(task));
/// ```
pub fn leak(self) -> &'static StaticLocalExecutor {
let ptr = self.inner.state_ptr();
// SAFETY: So long as a LocalExecutor lives, it's state pointer will always be valid
// when accessed through state_ptr. This executor will live for the full 'static
// lifetime so this isn't an arbitrary lifetime extension.
let state: &'static State = unsafe { &*ptr };
std::mem::forget(self);
let mut active = state.active.lock().unwrap();
if !active.is_empty() {
// Reschedule all of the active tasks.
for waker in active.drain() {
waker.wake();
}
// Overwrite to ensure that the slab is deallocated.
*active = Slab::new();
}
// SAFETY: StaticLocalExecutor has the same memory layout as State as it's repr(transparent).
// The lifetime is not altered: 'static -> 'static.
let static_executor: &'static StaticLocalExecutor = unsafe { std::mem::transmute(state) };
static_executor
}
}
/// A static-lifetimed async [`Executor`].
///
/// This is primarily intended to be used in [`static`] variables, or types intended to be used, or can be created in non-static
/// contexts via [`Executor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned
/// tasks to permanently leak. Any tasks at the time will not be cancelled.
///
/// [`static`]: https://doc.rust-lang.org/std/keyword.static.html
#[repr(transparent)]
pub struct StaticExecutor {
state: State,
}
// SAFETY: Executor stores no thread local state that can be accessed via other thread.
unsafe impl Send for StaticExecutor {}
// SAFETY: Executor internally synchronizes all of it's operations internally.
unsafe impl Sync for StaticExecutor {}
impl UnwindSafe for StaticExecutor {}
impl RefUnwindSafe for StaticExecutor {}
impl fmt::Debug for StaticExecutor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_state(&self.state, "StaticExecutor", f)
}
}
impl StaticExecutor {
/// Creates a new StaticExecutor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
/// ```
pub const fn new() -> Self {
Self {
state: State::new(),
}
}
/// Spawns a task onto the executor.
///
/// Note: unlike [`Executor::spawn`], this function requires being called with a `'static`
/// borrow on the executor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
/// ```
pub fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn(|()| future, self.schedule());
runnable.schedule();
task
}
/// Spawns a non-`'static` task onto the executor.
///
/// ## Safety
///
/// The caller must ensure that the returned task terminates
/// or is cancelled before the end of 'a.
pub unsafe fn spawn_scoped<'a, T: Send + 'a>(
&'static self,
future: impl Future<Output = T> + Send + 'a,
) -> Task<T> {
// SAFETY:
//
// - `future` is `Send`
// - `future` is not `'static`, but the caller guarantees that the
// task, and thus its `Runnable` must not live longer than `'a`.
// - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
runnable.schedule();
task
}
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// assert!(!EXECUTOR.try_tick()); // no tasks to run
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
///
/// assert!(EXECUTOR.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
self.state.try_tick()
}
/// Runs a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
/// use futures_lite::future;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async {
/// println!("Hello world");
/// });
///
/// future::block_on(EXECUTOR.tick()); // runs the task
/// ```
pub async fn tick(&self) {
self.state.tick().await;
}
/// Runs the executor until the given future completes.
///
/// # Examples
///
/// ```
/// use async_executor::StaticExecutor;
/// use futures_lite::future;
///
/// static EXECUTOR: StaticExecutor = StaticExecutor::new();
///
/// let task = EXECUTOR.spawn(async { 1 + 2 });
/// let res = future::block_on(EXECUTOR.run(async { task.await * 2 }));
///
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.state.run(future).await
}
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state: &'static State = &self.state;
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
}
impl Default for StaticExecutor {
fn default() -> Self {
Self::new()
}
}
/// A static async [`LocalExecutor`] created from [`LocalExecutor::leak`].
///
/// This is primarily intended to be used in [`thread_local`] variables, or can be created in non-static
/// contexts via [`LocalExecutor::leak`].
///
/// Spawning, running, and finishing tasks are optimized with the assumption that the executor will never be `Drop`'ed.
/// A static executor may require signficantly less overhead in both single-threaded and mulitthreaded use cases.
///
/// As this type does not implement `Drop`, losing the handle to the executor or failing
/// to consistently drive the executor with [`tick`] or [`run`] will cause the all spawned
/// tasks to permanently leak. Any tasks at the time will not be cancelled.
///
/// [`thread_local]: https://doc.rust-lang.org/std/macro.thread_local.html
#[repr(transparent)]
pub struct StaticLocalExecutor {
state: State,
marker_: PhantomData<UnsafeCell<()>>,
}
impl UnwindSafe for StaticLocalExecutor {}
impl RefUnwindSafe for StaticLocalExecutor {}
impl fmt::Debug for StaticLocalExecutor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
debug_state(&self.state, "StaticLocalExecutor", f)
}
}
impl StaticLocalExecutor {
/// Creates a new StaticLocalExecutor.
///
/// # Examples
///
/// ```
/// use async_executor::StaticLocalExecutor;
///
/// thread_local! {
/// static EXECUTOR: StaticLocalExecutor = StaticLocalExecutor::new();
/// }
/// ```
pub const fn new() -> Self {
Self {
state: State::new(),
marker_: PhantomData,
}
}
/// Spawns a task onto the executor.
///
/// Note: unlike [`LocalExecutor::spawn`], this function requires being called with a `'static`
/// borrow on the executor.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// ```
pub fn spawn<T: 'static>(&'static self, future: impl Future<Output = T> + 'static) -> Task<T> {
let (runnable, task) = Builder::new()
.propagate_panic(true)
.spawn_local(|()| future, self.schedule());
runnable.schedule();
task
}
/// Spawns a non-`'static` task onto the executor.
///
/// ## Safety
///
/// The caller must ensure that the returned task terminates
/// or is cancelled before the end of 'a.
pub unsafe fn spawn_scoped<'a, T: 'a>(
&'static self,
future: impl Future<Output = T> + 'a,
) -> Task<T> {
// SAFETY:
//
// - `future` is not `Send` but `StaticLocalExecutor` is `!Sync`,
// `try_tick`, `tick` and `run` can only be called from the origin
// thread of the `StaticLocalExecutor`. Similarly, `spawn_scoped` can only
// be called from the origin thread, ensuring that `future` and the executor
// share the same origin thread. The `Runnable` can be scheduled from other
// threads, but because of the above `Runnable` can only be called or
// dropped on the origin thread.
// - `future` is not `'static`, but the caller guarantees that the
// task, and thus its `Runnable` must not live longer than `'a`.
// - `self.schedule()` is `Send`, `Sync` and `'static`, as checked below.
// Therefore we do not need to worry about what is done with the
// `Waker`.
let (runnable, task) = unsafe {
Builder::new()
.propagate_panic(true)
.spawn_unchecked(|()| future, self.schedule())
};
runnable.schedule();
task
}
/// Attempts to run a task if at least one is scheduled.
///
/// Running a scheduled task means simply polling its future once.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
///
/// let ex = LocalExecutor::new().leak();
/// assert!(!ex.try_tick()); // no tasks to run
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// assert!(ex.try_tick()); // a task was found
/// ```
pub fn try_tick(&self) -> bool {
self.state.try_tick()
}
/// Runs a single task.
///
/// Running a task means simply polling its future once.
///
/// If no tasks are scheduled when this method is called, it will wait until one is scheduled.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async {
/// println!("Hello world");
/// });
/// future::block_on(ex.tick()); // runs the task
/// ```
pub async fn tick(&self) {
self.state.tick().await;
}
/// Runs the executor until the given future completes.
///
/// # Examples
///
/// ```
/// use async_executor::LocalExecutor;
/// use futures_lite::future;
///
/// let ex = LocalExecutor::new().leak();
///
/// let task = ex.spawn(async { 1 + 2 });
/// let res = future::block_on(ex.run(async { task.await * 2 }));
///
/// assert_eq!(res, 6);
/// ```
pub async fn run<T>(&self, future: impl Future<Output = T>) -> T {
self.state.run(future).await
}
/// Returns a function that schedules a runnable task when it gets woken up.
fn schedule(&'static self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state: &'static State = &self.state;
// TODO: If possible, push into the current local queue and notify the ticker.
move |runnable| {
state.queue.push(runnable).unwrap();
state.notify();
}
}
}
impl Default for StaticLocalExecutor {
fn default() -> Self {
Self::new()
}
}

View File

@ -0,0 +1,34 @@
use async_executor::LocalExecutor;
use futures_lite::future::{block_on, pending, poll_once};
use futures_lite::pin;
use std::cell::Cell;
#[test]
fn shared_queue_slot() {
block_on(async {
let was_polled = Cell::new(false);
let future = async {
was_polled.set(true);
pending::<()>().await;
};
let ex1 = LocalExecutor::new();
let ex2 = LocalExecutor::new();
// Start the futures for running forever.
let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));
pin!(run1);
pin!(run2);
assert!(poll_once(run1.as_mut()).await.is_none());
assert!(poll_once(run2.as_mut()).await.is_none());
// Spawn the future on executor one and then poll executor two.
ex1.spawn(future).detach();
assert!(poll_once(run2).await.is_none());
assert!(!was_polled.get());
// Poll the first one.
assert!(poll_once(run1).await.is_none());
assert!(was_polled.get());
});
}

144
tests/drop.rs Normal file
View File

@ -0,0 +1,144 @@
#[cfg(not(miri))]
use std::mem;
use std::panic::catch_unwind;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::task::{Poll, Waker};
use async_executor::{Executor, Task};
use futures_lite::future;
use once_cell::sync::Lazy;
#[test]
fn executor_cancels_everything() {
static DROP: AtomicUsize = AtomicUsize::new(0);
static WAKER: Lazy<Mutex<Option<Waker>>> = Lazy::new(Default::default);
let ex = Executor::new();
let task = ex.spawn(async {
let _guard = CallOnDrop(|| {
DROP.fetch_add(1, Ordering::SeqCst);
});
future::poll_fn(|cx| {
*WAKER.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending::<()>
})
.await;
});
future::block_on(ex.tick());
assert!(WAKER.lock().unwrap().is_some());
assert_eq!(DROP.load(Ordering::SeqCst), 0);
drop(ex);
assert_eq!(DROP.load(Ordering::SeqCst), 1);
assert!(catch_unwind(|| future::block_on(task)).is_err());
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}
#[cfg(not(miri))]
#[test]
fn leaked_executor_leaks_everything() {
static DROP: AtomicUsize = AtomicUsize::new(0);
static WAKER: Lazy<Mutex<Option<Waker>>> = Lazy::new(Default::default);
let ex = Executor::new();
let task = ex.spawn(async {
let _guard = CallOnDrop(|| {
DROP.fetch_add(1, Ordering::SeqCst);
});
future::poll_fn(|cx| {
*WAKER.lock().unwrap() = Some(cx.waker().clone());
Poll::Pending::<()>
})
.await;
});
future::block_on(ex.tick());
assert!(WAKER.lock().unwrap().is_some());
assert_eq!(DROP.load(Ordering::SeqCst), 0);
mem::forget(ex);
assert_eq!(DROP.load(Ordering::SeqCst), 0);
assert!(future::block_on(future::poll_once(task)).is_none());
assert_eq!(DROP.load(Ordering::SeqCst), 0);
}
#[test]
fn await_task_after_dropping_executor() {
let s: String = "hello".into();
let ex = Executor::new();
let task: Task<&str> = ex.spawn(async { &*s });
assert!(ex.try_tick());
drop(ex);
assert_eq!(future::block_on(task), "hello");
drop(s);
}
#[test]
fn drop_executor_and_then_drop_finished_task() {
static DROP: AtomicUsize = AtomicUsize::new(0);
let ex = Executor::new();
let task = ex.spawn(async {
CallOnDrop(|| {
DROP.fetch_add(1, Ordering::SeqCst);
})
});
assert!(ex.try_tick());
assert_eq!(DROP.load(Ordering::SeqCst), 0);
drop(ex);
assert_eq!(DROP.load(Ordering::SeqCst), 0);
drop(task);
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}
#[test]
fn drop_finished_task_and_then_drop_executor() {
static DROP: AtomicUsize = AtomicUsize::new(0);
let ex = Executor::new();
let task = ex.spawn(async {
CallOnDrop(|| {
DROP.fetch_add(1, Ordering::SeqCst);
})
});
assert!(ex.try_tick());
assert_eq!(DROP.load(Ordering::SeqCst), 0);
drop(task);
assert_eq!(DROP.load(Ordering::SeqCst), 1);
drop(ex);
assert_eq!(DROP.load(Ordering::SeqCst), 1);
}
#[test]
fn iterator_panics_mid_run() {
let ex = Executor::new();
let panic = std::panic::catch_unwind(|| {
let mut handles = vec![];
ex.spawn_many(
(0..50).map(|i| if i == 25 { panic!() } else { future::ready(i) }),
&mut handles,
)
});
assert!(panic.is_err());
}
struct CallOnDrop<F: Fn()>(F);
impl<F: Fn()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}

99
tests/larger_tasks.rs Normal file
View File

@ -0,0 +1,99 @@
//! Test for larger tasks.
use async_executor::Executor;
use futures_lite::future::{self, block_on};
use futures_lite::prelude::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
fn do_run<Fut: Future<Output = ()>>(mut f: impl FnMut(Arc<Executor<'static>>) -> Fut) {
// This should not run for longer than two minutes.
#[cfg(not(miri))]
let _stop_timeout = {
let (stop_timeout, stopper) = async_channel::bounded::<()>(1);
thread::spawn(move || {
block_on(async move {
let timeout = async {
async_io::Timer::after(Duration::from_secs(2 * 60)).await;
eprintln!("test timed out after 2m");
std::process::exit(1)
};
let _ = stopper.recv().or(timeout).await;
})
});
stop_timeout
};
let ex = Arc::new(Executor::new());
// Test 1: Use the `run` command.
block_on(ex.run(f(ex.clone())));
// Test 2: Loop on `tick`.
block_on(async {
let ticker = async {
loop {
ex.tick().await;
}
};
f(ex.clone()).or(ticker).await
});
// Test 3: Run on many threads.
thread::scope(|scope| {
let (_signal, shutdown) = async_channel::bounded::<()>(1);
for _ in 0..16 {
let shutdown = shutdown.clone();
let ex = &ex;
scope.spawn(move || block_on(ex.run(shutdown.recv())));
}
block_on(f(ex.clone()));
});
// Test 4: Tick loop on many threads.
thread::scope(|scope| {
let (_signal, shutdown) = async_channel::bounded::<()>(1);
for _ in 0..16 {
let shutdown = shutdown.clone();
let ex = &ex;
scope.spawn(move || {
block_on(async move {
let ticker = async {
loop {
ex.tick().await;
}
};
shutdown.recv().or(ticker).await
})
});
}
block_on(f(ex.clone()));
});
}
#[test]
fn smoke() {
do_run(|ex| async move { ex.spawn(async {}).await });
}
#[test]
fn yield_now() {
do_run(|ex| async move { ex.spawn(future::yield_now()).await })
}
#[test]
fn timer() {
do_run(|ex| async move {
ex.spawn(async_io::Timer::after(Duration::from_millis(5)))
.await;
})
}

24
tests/local_queue.rs Normal file
View File

@ -0,0 +1,24 @@
use async_executor::Executor;
use futures_lite::{future, pin};
#[test]
fn two_queues() {
future::block_on(async {
// Create an executor with two runners.
let ex = Executor::new();
let (run1, run2) = (
ex.run(future::pending::<()>()),
ex.run(future::pending::<()>()),
);
let mut run1 = Box::pin(run1);
pin!(run2);
// Poll them both.
assert!(future::poll_once(run1.as_mut()).await.is_none());
assert!(future::poll_once(run2.as_mut()).await.is_none());
// Drop the first one, which should leave the local queue in the `None` state.
drop(run1);
assert!(future::poll_once(run2.as_mut()).await.is_none());
});
}

14
tests/panic_prop.rs Normal file
View File

@ -0,0 +1,14 @@
use async_executor::Executor;
use futures_lite::{future, prelude::*};
#[test]
fn test_panic_propagation() {
let ex = Executor::new();
let task = ex.spawn(async { panic!("should be caught by the task") });
// Running the executor should not panic.
assert!(ex.try_tick());
// Polling the task should.
assert!(future::block_on(task.catch_unwind()).is_err());
}

45
tests/spawn_many.rs Normal file
View File

@ -0,0 +1,45 @@
use async_executor::{Executor, LocalExecutor};
use futures_lite::future;
#[cfg(not(miri))]
const READY_COUNT: usize = 50_000;
#[cfg(miri)]
const READY_COUNT: usize = 505;
#[test]
fn spawn_many() {
future::block_on(async {
let ex = Executor::new();
// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}
#[test]
fn spawn_many_local() {
future::block_on(async {
let ex = LocalExecutor::new();
// Spawn a lot of tasks.
let mut tasks = vec![];
ex.spawn_many((0..READY_COUNT).map(future::ready), &mut tasks);
// Run all of the tasks in parallel.
ex.run(async move {
for (i, task) in tasks.into_iter().enumerate() {
assert_eq!(task.await, i);
}
})
.await;
});
}