commit 43b1991fc871470b223dde203c20bf8b26fe914d
Author: Stjepan Glavina
Date:   Sat May 9 21:06:27 2020 +0200

    Initial commit

diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
new file mode 100644
index 0000000..e1f75e7
--- /dev/null
+++ b/.github/FUNDING.yml
@@ -0,0 +1 @@
+github: stjepang
diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml
new file mode 100644
index 0000000..a105cf8
--- /dev/null
+++ b/.github/workflows/build-and-test.yaml
@@ -0,0 +1,44 @@
+name: Build and test
+
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+
+jobs:
+  build_and_test:
+    runs-on: ${{ matrix.os }}
+    strategy:
+      fail-fast: false
+      matrix:
+        os: [ubuntu-latest, windows-latest, macOS-latest]
+        rust: [nightly, beta, stable]
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Set current week of the year in environment
+        if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
+        run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
+
+      - name: Set current week of the year in environment
+        if: startsWith(matrix.os, 'windows')
+        run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
+
+      - name: Install latest ${{ matrix.rust }}
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: ${{ matrix.rust }}
+          profile: minimal
+          override: true
+
+      - name: Run cargo check
+        uses: actions-rs/cargo@v1
+        with:
+          command: check
+          args: --all --benches --bins --examples --tests --all-features
+
+      - name: Run cargo test
+        uses: actions-rs/cargo@v1
+        with:
+          command: test
diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml
new file mode 100644
index 0000000..6242b0d
--- /dev/null
+++ b/.github/workflows/lint.yaml
@@ -0,0 +1,59 @@
+name: Lint
+
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+
+jobs:
+  clippy:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Set current week of the year in environment
+        run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
+
+      - name: Cache .rustup
+        uses: actions/cache@v1
+        with:
+          path: ~/.rustup
+          key: rustup-${{ env.CURRENT_WEEK }}-clippy
+
+      - name: Cache cargo binaries
+        uses: actions/cache@v1
+        with:
+          path: ~/.cargo/bin
+          key: cargo-binaries-${{ env.CURRENT_WEEK }}-clippy
+
+      - uses: actions-rs/toolchain@v1
+        with:
+          toolchain: stable
+          profile: minimal
+          components: clippy
+      - uses: actions-rs/clippy-check@v1
+        with:
+          token: ${{ secrets.GITHUB_TOKEN }}
+          args: --all-features
+  loc:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Set current week of the year in environment
+        run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
+
+      - name: Cache cargo binaries
+        uses: actions/cache@v1
+        with:
+          path: ~/.cargo/bin
+          key: cargo-binaries-${{ env.CURRENT_WEEK }}-loc
+
+      - name: Install cargo-count
+        run: cargo install cargo-count || true
+
+      - name: Run cargo count
+        # '-a --exclude=$(cat .gitignore)' necessary to work around:
+        # https://github.com/kbknapp/cargo-count/issues/36
+        run: cargo count -l rs -a --exclude=$(cat .gitignore) src
diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml
new file mode 100644
index 0000000..e94b952
--- /dev/null
+++ b/.github/workflows/security.yaml
@@ -0,0 +1,32 @@
+name: Security audit
+
+on:
+  push:
+    branches:
+      - master
+  pull_request:
+
+jobs:
+  security_audit:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Set current week of the year in environment
+        run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
+
+      - name: Cache .rustup
+        uses: actions/cache@v1
+        with:
+          path: ~/.rustup
+ key: rustup-${{ env.CURRENT_WEEK }}-security_audit + + - name: Cache cargo binaries + uses: actions/cache@v1 + with: + path: ~/.cargo/bin + key: cargo-binaries-${{ env.CURRENT_WEEK }}-security_audit + + - uses: actions-rs/audit-check@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1187b6f --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "blocking" +version = "0.2.0" +authors = ["Stjepan Glavina "] +edition = "2018" +description = "An executor for isolating blocking I/O in async programs" +license = "Apache-2.0 OR MIT" +repository = "https://github.com/stjepang/blocking" +homepage = "https://github.com/stjepang/blocking" +documentation = "https://docs.rs/blocking" +keywords = ["async", "file", "stdio", "stdin", "process"] +categories = ["asynchronous", "concurrency"] +readme = "README.md" + +[dependencies] +async-task = "3.0.0" +futures = { version = "0.3.4", default-features = false, features = ["std"] } +once_cell = "1.3.1" + +[dev-dependencies] +futures = { version = "0.3.4", default-features = false, features = ["executor"] } diff --git a/LICENSE-APACHE b/LICENSE-APACHE new file mode 100644 index 0000000..16fe87b --- /dev/null +++ b/LICENSE-APACHE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. 
For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [yyyy] [name of copyright owner] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. diff --git a/LICENSE-MIT b/LICENSE-MIT new file mode 100644 index 0000000..31aa793 --- /dev/null +++ b/LICENSE-MIT @@ -0,0 +1,23 @@ +Permission is hereby granted, free of charge, to any +person obtaining a copy of this software and associated +documentation files (the "Software"), to deal in the +Software without restriction, including without +limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software +is furnished to do so, subject to the following +conditions: + +The above copyright notice and this permission notice +shall be included in all copies or substantial portions +of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT +SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ef85c92 --- /dev/null +++ b/README.md @@ -0,0 +1,103 @@ +# blocking + +[![Build](https://github.com/stjepang/blocking/workflows/Build%20and%20test/badge.svg)]( +https://github.com/stjepang/blocking/actions) +[![Coverage Status](https://coveralls.io/repos/github/stjepang/blocking/badge.svg?branch=master)]( +https://coveralls.io/github/stjepang/blocking?branch=master) +[![License](https://img.shields.io/badge/license-MIT%2FApache--2.0-blue.svg)]( +https://github.com/stjepang/blocking) +[![Cargo](https://img.shields.io/crates/v/blocking.svg)]( +https://crates.io/crates/blocking) +[![Documentation](https://docs.rs/blocking/badge.svg)]( +https://docs.rs/blocking) +[![Chat](https://img.shields.io/discord/701824908866617385.svg?logo=discord)]( +https://discord.gg/x6m5Vvt) + +An executor for isolating blocking I/O in async programs. + +Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async +support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible +solutions, they're not always available or ideal. + +Since blocking is not allowed inside futures, we must move blocking I/O onto a special +executor provided by this crate. On this executor, futures are allowed to "cheat" and block +without any restrictions. The executor dynamically spawns and stops threads depending on the +current number of running futures. + +Note that there is a limit on the number of active threads. Once that limit is hit, a running +task has to complete or yield before other tasks get a chance to continue running. When a +thread is idle, it waits for the next task or shuts down after a certain timeout. 
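+
+For instance, many short blocking calls can be spawned at once and awaited together. The sketch
+below (the task count and sleep duration are arbitrary) is only meant to illustrate that the
+pool grows while tasks are running and shrinks again once threads sit idle past the timeout:
+
+```rust
+use blocking::Blocking;
+use futures::future::join_all;
+use std::thread;
+use std::time::Duration;
+
+# futures::executor::block_on(async {
+// Each task blocks its thread briefly; the executor spawns more threads as the queue grows.
+let tasks = (0..100).map(|i| {
+    Blocking::spawn(async move {
+        thread::sleep(Duration::from_millis(10));
+        i
+    })
+});
+
+// All results arrive even though far fewer than 100 threads may ever exist at once.
+let results = join_all(tasks).await;
+assert_eq!(results.len(), 100);
+# });
+```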
+ +[IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port +[AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html +[io_uring]: https://lwn.net/Articles/776703/ + +# Examples + +Spawn a blocking future with [`Blocking::spawn()`]: + +```rust +use blocking::Blocking; +use std::fs; + +# futures::executor::block_on(async { +let contents = Blocking::spawn(async { fs::read_to_string("file.txt") }).await?; +# std::io::Result::Ok(()) }); +``` + +Or do the same with the [`blocking!`] macro: + +```rust +use blocking::blocking; +use std::fs; + +# futures::executor::block_on(async { +let contents = blocking!(fs::read_to_string("file.txt"))?; +# std::io::Result::Ok(()) }); +``` + +Read a file and pipe its contents to stdout: + +```rust +use blocking::Blocking; +use std::fs::File; +use std::io::stdout; + +# futures::executor::block_on(async { +let input = Blocking::new(File::open("file.txt")?); +let mut output = Blocking::new(stdout()); + +futures::io::copy(input, &mut output).await?; +# std::io::Result::Ok(()) }); +``` + +Iterate over the contents of a directory: + +```rust +use blocking::Blocking; +use futures::prelude::*; +use std::fs; + +# futures::executor::block_on(async { +let mut dir = Blocking::new(fs::read_dir(".")?); + +while let Some(item) = dir.next().await { + println!("{}", item?.file_name().to_string_lossy()); +} +# std::io::Result::Ok(()) }); +``` + +## License + +Licensed under either of + + * Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0) + * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. + +#### Contribution + +Unless you explicitly state otherwise, any contribution intentionally submitted +for inclusion in the work by you, as defined in the Apache-2.0 license, shall be +dual licensed as above, without any additional terms or conditions. diff --git a/examples/foo.rs b/examples/foo.rs new file mode 100644 index 0000000..9575a9e --- /dev/null +++ b/examples/foo.rs @@ -0,0 +1,28 @@ +use std::fs::File; +use std::io; +use std::io::prelude::*; +use std::time::Instant; + +use blocking::Blocking; +use futures::prelude::*; + +fn main() -> io::Result<()> { + futures::executor::block_on(async { + let start = Instant::now(); + let mut file = File::open("big.txt")?; + let mut contents = Vec::::new(); + + // file.read_to_end(&mut contents)?; + // Blocking::new(file).read_to_end(&mut contents).await?; + // smol::reader(file).read_to_end(&mut contents).await?; + + let mut file2 = File::create("big2.txt")?; + // Blocking::new(file2).write_all(&contents).await?; + futures::io::copy(Blocking::new(file), &mut Blocking::new(file2)).await?; + + dbg!(contents.len()); + dbg!(start.elapsed()); + + Ok(()) + }) +} diff --git a/examples/ls.rs b/examples/ls.rs new file mode 100644 index 0000000..1bb5aa6 --- /dev/null +++ b/examples/ls.rs @@ -0,0 +1,29 @@ +//! Lists directory contents. +//! +//! Run with: +//! +//! ``` +//! cargo run --example ls . +//! 
``` + +use std::fs; +use std::io; +use std::env; + +use blocking::Blocking; +use futures::executor::block_on; +use futures::prelude::*; + +fn main() -> io::Result<()> { + let path = env::args().nth(1).unwrap_or(".".into()); + + block_on(async { + let mut dir = Blocking::new(fs::read_dir(path)?); + + while let Some(item) = dir.next().await { + println!("{}", item?.file_name().to_string_lossy()); + } + + Ok(()) + }) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..5fbb101 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,1066 @@ +//! An executor for isolating blocking I/O in async programs. +//! +//! Sometimes there's no way to avoid blocking I/O. Consider files or stdin, which have weak async +//! support on modern operating systems. While [IOCP], [AIO], and [io_uring] are possible +//! solutions, they're not always available or ideal. +//! +//! Since blocking is not allowed inside futures, we must move blocking I/O onto a special +//! executor provided by this crate. On this executor, futures are allowed to "cheat" and block +//! without any restrictions. The executor dynamically spawns and stops threads depending on the +//! current number of running futures. +//! +//! Note that there is a limit on the number of active threads. Once that limit is hit, a running +//! task has to complete or yield before other tasks get a chance to continue running. When a +//! thread is idle, it waits for the next task or shuts down after a certain timeout. +//! +//! [IOCP]: https://en.wikipedia.org/wiki/Input/output_completion_port +//! [AIO]: http://man7.org/linux/man-pages/man2/io_submit.2.html +//! [io_uring]: https://lwn.net/Articles/776703/ +//! +//! # Examples +//! +//! Spawn a blocking future with [`Blocking::spawn()`]: +//! +//! ```no_run +//! use blocking::Blocking; +//! use std::fs; +//! +//! # futures::executor::block_on(async { +//! let contents = Blocking::spawn(async { fs::read_to_string("file.txt") }).await?; +//! # std::io::Result::Ok(()) }); +//! ``` +//! +//! Or do the same with the [`blocking!`] macro: +//! +//! ```no_run +//! use blocking::blocking; +//! use std::fs; +//! +//! # futures::executor::block_on(async { +//! let contents = blocking!(fs::read_to_string("file.txt"))?; +//! # std::io::Result::Ok(()) }); +//! ``` +//! +//! Read a file and pipe its contents to stdout: +//! +//! ```no_run +//! use blocking::Blocking; +//! use std::fs::File; +//! use std::io::stdout; +//! +//! # futures::executor::block_on(async { +//! let input = Blocking::new(File::open("file.txt")?); +//! let mut output = Blocking::new(stdout()); +//! +//! futures::io::copy(input, &mut output).await?; +//! # std::io::Result::Ok(()) }); +//! ``` +//! +//! Iterate over the contents of a directory: +//! +//! ```no_run +//! use blocking::Blocking; +//! use futures::prelude::*; +//! use std::fs; +//! +//! # futures::executor::block_on(async { +//! let mut dir = Blocking::new(fs::read_dir(".")?); +//! +//! while let Some(item) = dir.next().await { +//! println!("{}", item?.file_name().to_string_lossy()); +//! } +//! # std::io::Result::Ok(()) }); +//! 
``` + +use std::any::Any; +use std::collections::VecDeque; +use std::io::{self, Read, Write}; +use std::mem; +use std::panic; +use std::pin::Pin; +use std::slice; +use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; +use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::task::{Context, Poll}; +use std::thread; +use std::time::Duration; + +use futures::channel::mpsc; +use futures::prelude::*; +use futures::task::AtomicWaker; +use once_cell::sync::Lazy; + +/// A runnable future, ready for execution. +/// +/// When a future is internally spawned using `async_task::spawn()` or `async_task::spawn_local()`, +/// we get back two values: +/// +/// 1. an `async_task::Task<()>`, which we refer to as a `Runnable` +/// 2. an `async_task::JoinHandle`, which is wrapped inside a `Task` +/// +/// Once a `Runnable` is run, it "vanishes" and only reappears when its future is woken. When it's +/// woken up, its schedule function is called, which means the `Runnable` gets pushed into the main +/// task queue in the executor. +type Runnable = async_task::Task<()>; + +struct Task(Option>); + +impl Drop for Task { + fn drop(&mut self) { + if let Some(handle) = &self.0 { + handle.cancel(); + } + } +} + +impl Future for Task { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut self.0.as_mut().unwrap()).poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(output) => Poll::Ready(output.expect("task has failed")), + } + } +} + +/// The blocking executor. +struct Executor { + /// Inner state of the executor. + inner: Mutex, + + /// Used to put idle threads to sleep and wake them up when new work comes in. + cvar: Condvar, +} + +/// Inner state of the blocking executor. +struct Inner { + /// Number of idle threads in the pool. + /// + /// Idle threads are sleeping, waiting to get a task to run. + idle_count: usize, + + /// Total number of threads in the pool. + /// + /// This is the number of idle threads + the number of active threads. + thread_count: usize, + + /// The queue of blocking tasks. + queue: VecDeque, +} + +impl Executor { + /// Spawns a future onto this executor. + /// + /// Returns a [`Task`] handle for the spawned task. + fn spawn(future: impl Future + Send + 'static) -> Task { + static EXECUTOR: Lazy = Lazy::new(|| Executor { + inner: Mutex::new(Inner { + idle_count: 0, + thread_count: 0, + queue: VecDeque::new(), + }), + cvar: Condvar::new(), + }); + + // Create a task, schedule it, and return its `Task` handle. + let (runnable, handle) = async_task::spawn(future, |r| EXECUTOR.schedule(r), ()); + runnable.schedule(); + Task(Some(handle)) + } + + /// Runs the main loop on the current thread. + /// + /// This function runs blocking tasks until it becomes idle and times out. + fn main_loop(&'static self) { + let mut inner = self.inner.lock().unwrap(); + loop { + // This thread is not idle anymore because it's going to run tasks. + inner.idle_count -= 1; + + // Run tasks in the queue. + while let Some(runnable) = inner.queue.pop_front() { + // We have found a task - grow the pool if needed. + self.grow_pool(inner); + + // Run the task. + let _ = panic::catch_unwind(|| runnable.run()); + + // Re-lock the inner state and continue. + inner = self.inner.lock().unwrap(); + } + + // This thread is now becoming idle. + inner.idle_count += 1; + + // Put the thread to sleep until another task is scheduled. 
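+            //
+            // `wait_timeout` releases the `inner` lock while this thread sleeps and reacquires
+            // it before returning, so the `idle_count`/`thread_count` bookkeeping stays
+            // consistent. The 500 ms timeout bounds how long a surplus thread lingers once the
+            // queue stays empty.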
+ let timeout = Duration::from_millis(500); + let (lock, res) = self.cvar.wait_timeout(inner, timeout).unwrap(); + inner = lock; + + // If there are no tasks after a while, stop this thread. + if res.timed_out() && inner.queue.is_empty() { + inner.idle_count -= 1; + inner.thread_count -= 1; + break; + } + } + } + + /// Schedules a runnable task for execution. + fn schedule(&'static self, runnable: Runnable) { + let mut inner = self.inner.lock().unwrap(); + inner.queue.push_back(runnable); + + // Notify a sleeping thread and spawn more threads if needed. + self.cvar.notify_one(); + self.grow_pool(inner); + } + + /// Spawns more blocking threads if the pool is overloaded with work. + fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) { + // If runnable tasks greatly outnumber idle threads and there aren't too many threads + // already, then be aggressive: wake all idle threads and spawn one more thread. + while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < 500 { + // The new thread starts in idle state. + inner.idle_count += 1; + inner.thread_count += 1; + + // Notify all existing idle threads because we need to hurry up. + self.cvar.notify_all(); + + // Spawn the new thread. + thread::spawn(move || self.main_loop()); + } + } +} + +/// Spawns blocking I/O onto a thread. +/// +/// Note that `blocking!(expr)` is just syntax sugar for +/// `Blocking::spawn(async move { expr }).await`. +/// +/// # Examples +/// +/// Read a file into a string: +/// +/// ```no_run +/// use blocking::blocking; +/// use std::fs; +/// +/// # futures::executor::block_on(async { +/// let contents = blocking!(fs::read_to_string("file.txt"))?; +/// # std::io::Result::Ok(()) }); +/// ``` +/// +/// Spawn a process: +/// +/// ```no_run +/// use blocking::blocking; +/// use std::process::Command; +/// +/// # futures::executor::block_on(async { +/// let out = blocking!(Command::new("dir").output())?; +/// # std::io::Result::Ok(()) }); +/// ``` +#[macro_export] +macro_rules! blocking { + ($($expr:tt)*) => { + $crate::Blocking::spawn(async move { $($expr)* }).await + }; +} + +/// Async I/O that runs on a thread. +/// +/// This handle represents a future performing some blocking I/O on the special thread pool. The +/// output of the future can be awaited because [`Blocking`] itself is a future. +/// +/// It's also possible to interact with [`Blocking`] through [`Stream`], [`AsyncRead`] and +/// [`AsyncWrite`] traits if the inner type implements [`Iterator`], [`Read`], or [`Write`]. +/// +/// To spawn a future and start it immediately, use [`Blocking::spawn()`]. To create an I/O handle +/// that will lazily spawn an I/O future on its own, use [`Blocking::new()`]. +/// +/// If the [`Blocking`] handle is dropped, the future performing I/O will be canceled if it hasn't +/// completed yet. However, note that it's not possible to forcibly cancel blocking I/O, so if the +/// future is currently running, it won't be canceled until it yields. +/// +/// If writing some data through the [`AsyncWrite`] trait, make sure to flush before dropping the +/// [`Blocking`] handle or some written data might get lost. Alternatively, await the handle to +/// complete the pending work and extract the inner blocking I/O handle. 
+/// +/// # Examples +/// +/// ``` +/// use blocking::Blocking; +/// use futures::prelude::*; +/// use std::io::stdout; +/// +/// # futures::executor::block_on(async { +/// let mut stdout = Blocking::new(stdout()); +/// stdout.write_all(b"Hello world!").await?; +/// +/// let inner = stdout.await; +/// # std::io::Result::Ok(()) }); +/// ``` +pub struct Blocking(State); + +impl Blocking { + /// Wraps a blocking I/O handle into an async interface. + /// + /// # Examples + /// + /// ```no_run + /// use blocking::Blocking; + /// use std::io::stdin; + /// + /// # futures::executor::block_on(async { + /// // Create an async handle to standard input. + /// let stdin = Blocking::new(stdin()); + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn new(io: T) -> Blocking { + Blocking(State::Idle(Some(Box::new(io)))) + } + + /// Gets a mutable reference to the blocking I/O handle. + /// + /// This is an async method because the I/O handle might be on a different thread and needs to + /// be moved onto the current thread before we can get a reference to it. + /// + /// # Examples + /// + /// ```no_run + /// use blocking::Blocking; + /// use std::fs::File; + /// + /// # futures::executor::block_on(async { + /// let mut file = Blocking::new(File::create("file.txt")?); + /// let metadata = file.get_mut().await.metadata()?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn get_mut(&mut self) -> &mut T { + // Wait for the running task to stop and ignore I/O errors if there are any. + let _ = future::poll_fn(|cx| self.poll_stop(cx)).await; + + // Assume idle state and get a reference to the inner value. + match &mut self.0 { + State::Idle(t) => t.as_mut().expect("inner value was taken out"), + State::Streaming(..) | State::Reading(..) | State::Writing(..) | State::Task(..) => { + unreachable!("when stopped, the state machine must be in idle state"); + } + } + } + + /// Extracts the inner blocking I/O handle. + /// + /// This is an async method because the I/O handle might be on a different thread and needs to + /// be moved onto the current thread before we can extract it. + /// + /// Note that awaiting this method is equivalent to awaiting the [`Blocking`] handle. + /// + /// # Examples + /// + /// ```no_run + /// use blocking::Blocking; + /// use futures::prelude::*; + /// use std::fs::File; + /// + /// # futures::executor::block_on(async { + /// let mut file = Blocking::new(File::create("file.txt")?); + /// file.write_all(b"Hello world!").await?; + /// + /// let file = file.into_inner().await; + /// # std::io::Result::Ok(()) }); + /// ``` + pub async fn into_inner(self) -> T { + // There's a bug in rustdoc causing it to render `mut self` as `__arg0: Self`, so we just + // bind `self` to a local mutable variable. + let mut this = self; + + // Wait for the running task to stop and ignore I/O errors if there are any. + let _ = future::poll_fn(|cx| this.poll_stop(cx)).await; + + // Assume idle state and extract the inner value. + match &mut this.0 { + State::Idle(t) => *t.take().expect("inner value was taken out"), + State::Streaming(..) | State::Reading(..) | State::Writing(..) | State::Task(..) => { + unreachable!("when stopped, the state machine must be in idle state"); + } + } + } + + /// Waits for the running task to stop. + /// + /// On success, the state machine is moved into the idle state. 
+ fn poll_stop(&mut self, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.0 { + State::Idle(_) => return Poll::Ready(Ok(())), + + State::Streaming(any, task) => { + // Drop the receiver to close the channel. This stops the `send()` operation in + // the task, after which the task returns the iterator back. + any.take(); + + // Poll the task to retrieve the iterator. + let iter = futures::ready!(Pin::new(task).poll(cx)); + self.0 = State::Idle(Some(iter)); + } + + State::Reading(reader, task) => { + // Drop the reader to close the pipe. This stops the `futures::io::copy` + // operation in the task, after which the task returns the I/O handle back. + reader.take(); + + // Poll the task to retrieve the I/O handle. + let (res, io) = futures::ready!(Pin::new(task).poll(cx)); + // Make sure to move into the idle state before reporting errors. + self.0 = State::Idle(Some(io)); + res?; + } + + State::Writing(writer, task) => { + // Drop the writer to close the pipe. This stops the `futures::io::copy` + // operation in the task, after which the task flushes the I/O handle and + // returns it back. + writer.take(); + + // Poll the task to retrieve the I/O handle. + let (res, io) = futures::ready!(Pin::new(task).poll(cx)); + // Make sure to move into the idle state before reporting errors. + self.0 = State::Idle(Some(io)); + res?; + } + + State::Task(task) => { + // Poll the task to retrieve the inner value. + let t = futures::ready!(Pin::new(task).poll(cx)); + self.0 = State::Idle(Some(Box::new(t))); + } + } + } + } +} + +impl Blocking { + /// Spawns a future that is allowed to do blocking I/O. + /// + /// If the [`Blocking`] handle is dropped, the future will be canceled if it hasn't completed + /// yet. However, note that it's not possible to forcibly cancel blocking I/O, so if the future + /// is currently running, it won't be canceled until it yields. + /// + /// # Examples + /// + /// ```no_run + /// use blocking::Blocking; + /// use std::fs; + /// + /// # futures::executor::block_on(async { + /// let contents = Blocking::spawn(async { fs::read_to_string("file.txt") }).await?; + /// # std::io::Result::Ok(()) }); + /// ``` + pub fn spawn(future: impl Future + Send + 'static) -> Blocking { + let task = Executor::spawn(future); + Blocking(State::Task(task)) + } +} + +impl Future for Blocking { + type Output = T; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // Wait for the running task to stop and ignore I/O errors if there are any. + let _ = futures::ready!(self.poll_stop(cx)); + + // Assume idle state and extract the inner value. + match &mut self.0 { + State::Idle(t) => Poll::Ready(*t.take().expect("inner value was taken out")), + State::Streaming(..) | State::Reading(..) | State::Writing(..) | State::Task(..) => { + unreachable!("when stopped, the state machine must be in idle state"); + } + } + } +} + +/// Current state of a blocking task. +enum State { + /// There is no blocking task. + /// + /// The inner value is readily available, unless it has already been extracted. The value is + /// extracted out by [`Blocking::into_inner()`], [`AsyncWrite::poll_close()`], or by awaiting + /// [`Blocking`]. + Idle(Option>), + + /// A task was spawned by [`Blocking::spawn()`] and is still running. + Task(Task), + + /// The inner value is an [`Iterator`] currently iterating in a task. + /// + /// The `dyn Any` value here is a `mpsc::Receiver<::Item>`. + Streaming(Option>, Task>), + + /// The inner value is a [`Read`] currently reading in a task. 
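+    ///
+    /// The pipe `Reader` stays with this handle, while the spawned task owns the `Writer` end
+    /// together with the I/O object, handing both the result and the object back when it stops.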
+ Reading(Option, Task<(io::Result<()>, Box)>), + + /// The inner value is a [`Write`] currently writing in a task. + Writing(Option, Task<(io::Result<()>, Box)>), +} + +impl Stream for Blocking +where + T::Item: Send + 'static, +{ + type Item = T::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.0 { + // If not in idle or active streaming state, stop the running task. + State::Task(..) + | State::Streaming(None, _) + | State::Reading(..) + | State::Writing(..) => { + // Wait for the running task to stop. + let _ = futures::ready!(self.poll_stop(cx)); + } + + // If idle, start a streaming task. + State::Idle(iter) => { + // If idle, take the iterator out to run it on a blocking task. + let mut iter = iter.take().unwrap(); + + // This channel capacity seems to work well in practice. If it's too low, there + // will be too much synchronization between tasks. If too high, memory + // consumption increases. + let (mut sender, receiver) = mpsc::channel(8 * 1024); // 8192 items + + // Spawn a blocking task that runs the iterator and returns it when done. + let task = Executor::spawn(async move { + for item in &mut iter { + if sender.send(item).await.is_err() { + break; + } + } + iter + }); + + // Move into the busy state and poll again. + self.0 = State::Streaming(Some(Box::new(receiver)), task); + } + + // If streaming, receive an item. + State::Streaming(Some(any), task) => { + let receiver = any.downcast_mut::>().unwrap(); + + // Poll the channel. + let opt = futures::ready!(Pin::new(receiver).poll_next(cx)); + + // If the channel is closed, retrieve the iterator back from the blocking task. + // This is not really a required step, but it's cleaner to drop the iterator on + // the same thread that created it. + if opt.is_none() { + // Poll the task to retrieve the iterator. + let iter = futures::ready!(Pin::new(task).poll(cx)); + self.0 = State::Idle(Some(iter)); + } + + return Poll::Ready(opt); + } + } + } + } +} + +impl AsyncRead for Blocking { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + loop { + match &mut self.0 { + // If not in idle or active reading state, stop the running task. + State::Task(..) + | State::Reading(None, _) + | State::Streaming(..) + | State::Writing(..) => { + // Wait for the running task to stop. + futures::ready!(self.poll_stop(cx))?; + } + + // If idle, start a reading task. + State::Idle(io) => { + // If idle, take the I/O handle out to read it on a blocking task. + let mut io = io.take().unwrap(); + + // This pipe capacity seems to work well in practice. If it's too low, there + // will be too much synchronization between tasks. If too high, memory + // consumption increases. + let (reader, mut writer) = pipe(8 * 1024 * 1024); // 8 MB + + // Spawn a blocking task that reads and returns the I/O handle when done. + let task = Executor::spawn(async move { + // Copy bytes from the I/O handle into the pipe until the pipe is closed or + // an error occurs. + loop { + match future::poll_fn(|cx| writer.poll_write(cx, &mut io)).await { + Ok(0) => return (Ok(()), io), + Ok(_) => {} + Err(err) => return (Err(err), io), + } + } + }); + + // Move into the busy state and poll again. + self.0 = State::Reading(Some(reader), task); + } + + // If reading, read bytes from the pipe. + State::Reading(Some(reader), task) => { + // Poll the pipe. 
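+                    //
+                    // `reader` is the receiving end of the pipe that the spawned task fills.
+                    // Once the task drops its `Writer` (EOF or error on the I/O handle), the
+                    // remaining bytes are drained and reads start returning `Ok(0)`.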
+ let n = futures::ready!(Pin::new(reader).poll_read(cx, buf))?; + + // If the pipe is closed, retrieve the I/O handle back from the blocking task. + // This is not really a required step, but it's cleaner to drop the handle on + // the same thread that created it. + if n == 0 { + // Poll the task to retrieve the I/O handle. + let (res, io) = futures::ready!(Pin::new(task).poll(cx)); + // Make sure to move into the idle state before reporting errors. + self.0 = State::Idle(Some(io)); + res?; + } + + return Poll::Ready(Ok(n)); + } + } + } + } +} + +impl AsyncWrite for Blocking { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + loop { + match &mut self.0 { + // If not in idle or active writing state, stop the running task. + State::Task(..) + | State::Writing(None, _) + | State::Streaming(..) + | State::Reading(..) => { + // Wait for the running task to stop. + futures::ready!(self.poll_stop(cx))?; + } + + // If idle, start the writing task. + State::Idle(io) => { + // If idle, take the I/O handle out to write on a blocking task. + let mut io = io.take().unwrap(); + + // This pipe capacity seems to work well in practice. If it's too low, there will + // be too much synchronization between tasks. If too high, memory consumption + // increases. + let (mut reader, writer) = pipe(8 * 1024 * 1024); // 8 MB + + // Spawn a blocking task that writes and returns the I/O handle when done. + let task = Executor::spawn(async move { + // Copy bytes from the pipe into the I/O handle until the pipe is closed or an + // error occurs. Flush the I/O handle at the end. + loop { + match future::poll_fn(|cx| reader.poll_read(cx, &mut io)).await { + Ok(0) => return (io.flush(), io), + Ok(_) => {} + Err(err) => { + let _ = io.flush(); + return (Err(err), io); + } + } + } + }); + + // Move into the busy state. + self.0 = State::Writing(Some(writer), task); + } + + // If writing,write more bytes into the pipe. + State::Writing(Some(writer), _) => return Pin::new(writer).poll_write(cx, buf), + } + } + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + loop { + match &mut self.0 { + // If not in idle state, stop the running task. + State::Task(..) + | State::Streaming(..) + | State::Writing(..) + | State::Reading(..) => { + // Wait for the running task to stop. + futures::ready!(self.poll_stop(cx))?; + } + + // Idle implies flushed. + State::Idle(_) => return Poll::Ready(Ok(())), + } + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // First, make sure the I/O handle is flushed. + futures::ready!(Pin::new(&mut *self).poll_flush(cx))?; + + // Then move into the idle state with no I/O handle, thus dropping it. + self.0 = State::Idle(None); + Poll::Ready(Ok(())) + } +} + +/// Creates a bounded single-producer single-consumer pipe. +/// +/// A pipe is a ring buffer of `cap` bytes that implements traits [`AsyncRead`] and [`AsyncWrite`]. +/// +/// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts +/// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes. +/// +/// When the receiver is dropped, the pipe is closed and no more bytes and be written into it. +/// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes. +fn pipe(cap: usize) -> (Reader, Writer) { + assert!(cap > 0, "capacity must be positive"); + assert!(cap.checked_mul(2).is_some(), "capacity is too large"); + + // Allocate the ring buffer. 
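+    //
+    // `Vec::with_capacity` gives us `cap` uninitialized bytes and `mem::forget` keeps the
+    // allocation alive once `v` goes out of scope; the matching `Vec::from_raw_parts` in
+    // `Pipe::drop` reclaims it. `Writer::zeroed_until` ensures no uninitialized byte is ever
+    // handed to user-provided `Read`/`Write` code.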
+ let mut v = Vec::with_capacity(cap); + let buffer = v.as_mut_ptr(); + mem::forget(v); + + let inner = Arc::new(Pipe { + head: AtomicUsize::new(0), + tail: AtomicUsize::new(0), + reader: AtomicWaker::new(), + writer: AtomicWaker::new(), + closed: AtomicBool::new(false), + buffer, + cap, + }); + + let r = Reader { + inner: inner.clone(), + head: 0, + tail: 0, + }; + + let w = Writer { + inner, + head: 0, + tail: 0, + zeroed_until: 0, + }; + + (r, w) +} + +/// The reading side of a pipe. +#[derive(Debug)] +struct Reader { + /// The inner ring buffer. + inner: Arc, + + /// The head index, moved by the reader, in the range `0..2*cap`. + /// + /// This index always matches `inner.head`. + head: usize, + + /// The tail index, moved by the writer, in the range `0..2*cap`. + /// + /// This index is a snapshot of `index.tail` that might become stale at any point. + tail: usize, +} + +/// The writing side of a pipe. +#[derive(Debug)] +struct Writer { + /// The inner ring buffer. + inner: Arc, + + /// The head index, moved by the reader, in the range `0..2*cap`. + /// + /// This index is a snapshot of `index.head` that might become stale at any point. + head: usize, + + /// The tail index, moved by the writer, in the range `0..2*cap`. + /// + /// This index always matches `inner.tail`. + tail: usize, + + /// How many bytes at the beginning of the buffer have been zeroed. + /// + /// The pipe allocates an uninitialized buffer, and we must be careful about passing + /// uninitialized data to user code. Zeroing the buffer right after allocation would be too + /// expensive, so we zero it in smaller chunks as the writer makes progress. + zeroed_until: usize, +} + +unsafe impl Send for Reader {} +unsafe impl Send for Writer {} + +/// The inner ring buffer. +/// +/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the +/// `0..cap` range. The distance between head and tail indices is never more than `cap`. +/// +/// The reason why indices are not in the range `0..cap` is because we need to distinguish between +/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail` +/// could mean the pipe is either empty or full, but we don't know which! +#[derive(Debug)] +struct Pipe { + /// The head index, moved by the reader, in the range `0..2*cap`. + head: AtomicUsize, + + /// The tail index, moved by the writer, in the range `0..2*cap`. + tail: AtomicUsize, + + /// A waker representing the blocked reader. + reader: AtomicWaker, + + /// A waker representing the blocked writer. + writer: AtomicWaker, + + /// Set to `true` if the reader or writer was dropped. + closed: AtomicBool, + + /// The byte buffer. + buffer: *mut u8, + + /// The buffer capacity. + cap: usize, +} + +impl Drop for Pipe { + fn drop(&mut self) { + // Deallocate the byte buffer. + unsafe { + Vec::from_raw_parts(self.buffer, 0, self.cap); + } + } +} + +impl Drop for Reader { + fn drop(&mut self) { + // Dropping closes the pipe and then wakes the writer. + self.inner.closed.store(true, Ordering::SeqCst); + self.inner.writer.wake(); + } +} + +impl Drop for Writer { + fn drop(&mut self) { + // Dropping closes the pipe and then wakes the reader. + self.inner.closed.store(true, Ordering::SeqCst); + self.inner.reader.wake(); + } +} + +impl Reader { + fn poll_read(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll> { + let cap = self.inner.cap; + + // Calculates the distance between two indices. 
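+        //
+        // Indices live in `0..2*cap`, so with `cap = 4`, `head = 6` and `tail = 1` there are
+        // `2 * 4 - (6 - 1) = 3` bytes waiting to be read; `head == tail` always means the
+        // pipe is empty and a distance of `cap` means it is full.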
+ let distance = |a: usize, b: usize| { + if a <= b { + b - a + } else { + 2 * cap - (a - b) + } + }; + + // If the pipe appears to be empty... + if distance(self.head, self.tail) == 0 { + // Reload the tail in case it's become stale. + self.tail = self.inner.tail.load(Ordering::Acquire); + + // If the pipe is now really empty... + if distance(self.head, self.tail) == 0 { + // Register the waker. + self.inner.reader.register(cx.waker()); + atomic::fence(Ordering::SeqCst); + + // Reload the tail after registering the waker. + self.tail = self.inner.tail.load(Ordering::Acquire); + + // If the pipe is still empty... + if distance(self.head, self.tail) == 0 { + // Check whether the pipe is closed or just empty. + if self.inner.closed.load(Ordering::Relaxed) { + return Poll::Ready(Ok(0)); + } else { + return Poll::Pending; + } + } + } + } + + // The pipe is not empty so remove the waker. + self.inner.reader.take(); + + // Given an index in `0..2*cap`, returns the real index in `0..cap`. + let real_index = |i: usize| { + if i < cap { + i + } else { + i - cap + } + }; + + // Number of bytes read so far. + let mut count = 0; + + loop { + // Calculate how many bytes to read in this iteration. + let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon! + .min(distance(self.head, self.tail)) // No more than bytes in the pipe. + .min(cap - real_index(self.head)); // Don't go past the buffer boundary. + + // Create a slice of data in the pipe buffer. + let pipe_slice = + unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) }; + + // Copy bytes from the pipe buffer into `dest`. + let n = dest + .write(pipe_slice) + .expect("shouldn't fail because `dest` is a slice"); + count += n; + + // If pipe is empty or `dest` is full, return. + if n == 0 { + return Poll::Ready(Ok(count)); + } + + // Move the head forward. + if self.head + n < 2 * cap { + self.head += n; + } else { + self.head = 0; + } + + // Store the current head index. + self.inner.head.store(self.head, Ordering::Release); + + // Wake the writer because the pipe is not full. + self.inner.writer.wake(); + } + } +} + +impl Writer { + fn poll_write(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll> { + // Just a quick check if the pipe is closed, which is why a relaxed load is okay. + if self.inner.closed.load(Ordering::Relaxed) { + return Poll::Ready(Ok(0)); + } + + // Calculates the distance between two indices. + let cap = self.inner.cap; + let distance = |a: usize, b: usize| { + if a <= b { + b - a + } else { + 2 * cap - (a - b) + } + }; + + // If the pipe appears to be full... + if distance(self.head, self.tail) == cap { + // Reload the head in case it's become stale. + self.head = self.inner.head.load(Ordering::Acquire); + + // If the pipe is now really empty... + if distance(self.head, self.tail) == cap { + // Register the waker. + self.inner.writer.register(cx.waker()); + atomic::fence(Ordering::SeqCst); + + // Reload the head after registering the waker. + self.head = self.inner.head.load(Ordering::Acquire); + + // If the pipe is still full... + if distance(self.head, self.tail) == cap { + // Check whether the pipe is closed or just full. + if self.inner.closed.load(Ordering::Relaxed) { + return Poll::Ready(Ok(0)); + } else { + return Poll::Pending; + } + } + } + } + + // The pipe is not full so remove the waker. + self.inner.writer.take(); + + // Given an index in `0..2*cap`, returns the real index in `0..cap`. 
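+        //
+        // For example, with `cap = 4` the logical index `5` maps to buffer slot `1`.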
+ let real_index = |i: usize| { + if i < cap { + i + } else { + i - cap + } + }; + + // Number of bytes written so far. + let mut count = 0; + + loop { + // Calculate how many bytes to write in this iteration. + let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon! + .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting. + .min(cap - distance(self.head, self.tail)) // No more than space in the pipe. + .min(cap - real_index(self.tail)); // Don't go past the buffer boundary. + + // Create a slice of available space in the pipe buffer. + let pipe_slice_mut = unsafe { + let from = real_index(self.tail); + let to = from + n; + + // Make sure all bytes in the slice are initialized. + if self.zeroed_until < to { + self.inner + .buffer + .add(self.zeroed_until) + .write_bytes(0u8, to - self.zeroed_until); + self.zeroed_until = to; + } + + slice::from_raw_parts_mut(self.inner.buffer.add(from), n) + }; + + // Copy bytes from `src` into the piper buffer. + let n = src + .read(pipe_slice_mut) + .expect("shouldn't fail because `src` is a slice"); + count += n; + + // If the pipe is full or `src` is empty, return. + if n == 0 { + return Poll::Ready(Ok(count)); + } + + // Move the tail forward. + if self.tail + n < 2 * cap { + self.tail += n; + } else { + self.tail = 0; + } + + // Store the current tail index. + self.inner.tail.store(self.tail, Ordering::Release); + + // Wake the reader because the pipe is not empty. + self.inner.reader.wake(); + } + } +}