Compare commits

...

38 Commits

Author SHA1 Message Date
Pier Fumagalli 7719966913
docs: Fix typo in is_err()
Closes #203

Co-authored-by: John Nunley <dev@notgull.net>
2024-05-29 19:00:00 -07:00
John Nunley 2af3a5e14f
bugfix: Fix new nightly errors
- Use i32::MAX instead of std::i32::MAX
- Add "expected cfg" lints to Cargo.toml

Signed-off-by: John Nunley <dev@notgull.net>
2024-05-28 19:33:31 -07:00
John Nunley cf2d60efca
ci: Use latest stable wine in testing
This works around bugs in Rust v1.78 that introduce incompatibilities
into Wine.

https://github.com/smol-rs/polling/pull/201#issuecomment-2092385046

Signed-off-by: John Nunley <dev@notgull.net>
2024-05-03 17:55:23 -07:00
John Nunley 0b4afcaf0a
ci: Remove +nightly and use default
Previously in the "cross" CI job, checks would use "+nightly" to ensure
that the nightly compiler is used. However this adds a lot of noise. In
order to clean up CI this commit replaces "+nightly" with "rustup
default nightly" at the start of the job.

cc https://github.com/smol-rs/polling/pull/197#discussion_r1573375732

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-22 18:36:07 -07:00
John Nunley eb9d92a2e0
v3.7.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-22 16:33:52 -07:00
Nikolay Arhipov 9e46c8455c
feat: ported to Vita target
Fixes #160
2024-04-20 11:22:46 -07:00
John Nunley 1c16a1e4af
v3.6.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-23 20:42:01 -07:00
irvingouj @ Devolutions e25b3b4e4c
feat: Replace is_connect_failed with is_err
In linux, epoll, EPOLLHUP may happen even if no connection call is made. It
would confuse callers for what is actually happening.

Replaced is_connect_failed, and we detect if connection failed by using the
combination of is_err and is_interrupt, please see the example, tcp_client
2024-03-20 22:04:46 -07:00
John Nunley 50454d1cea
feat: Add support for HermitOS
HermitOS is a microkernel target aiming to provide a simple OS for
virtualized applications. It recently added support for the poll() and
eventfd() system calls, which means we can target it with our poll()
based backend.

Hermit does not have a traditional libc; instead it uses hermit-abi.
However rustix does not support using hermit-abi as its underlying
layer yet. So we have to build a shim layer until it does.

Closes #177
cc bytecodealliance/rustix#1012

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 21:33:06 -07:00
John Nunley 634a77c264 bugfix: Remove CallOnDrop from port.rs
CallOnDrop is no longer used in the Windows IOCP backend, and it wasn't
being flagged as unused until the latest nightly. In order to fix
Windows builds, this commit removes CallOnDrop.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:46:02 -07:00
John Nunley 4d64fdc572
v3.5.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-17 22:02:48 -08:00
John Nunley 77b4ed1156
feat: On RedoxOS, use epoll instead of the poll backend
Technically RedoxOS supports the poll syscall, so we already support
RedoxOS. However, this is very slow. This commit ports this code to
epoll, which should be more efficient.

Closes #176
2024-02-11 08:31:13 -08:00
John Nunley ac7fbcae31
v3.4.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-05 19:18:30 -08:00
John Nunley 24e3691794
feat: Add a way to wait on process by PID
This is needed to make processes work in `async-process`.

Signed-off-by: John Nunley <dev@notgull.net>
2024-02-01 06:34:02 -08:00
John Nunley 62430fd56e Annotate ESP-IDF EPERM error with eventfd info
If eventfd isn't initialized, `Polling::new` will fail with an EPERM
error. You need to call the "esp_vfs_eventfd_register" function to
initialize the eventfd subsystem in ESP-IDF. This commit indicates to
the user that this needs to happen.

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-27 21:17:58 -08:00
John Nunley ae484a0a12 tests: Fix clippy error in wait-signal
Signed-off-by: John Nunley <dev@notgull.net>
2024-01-27 21:17:58 -08:00
irvingouj @ Devolutions cf25dd85f8
feat: Add the ability to identify if the connection has failed 2024-01-26 12:58:39 -08:00
John Nunley 6125508c93
v3.3.2
Signed-off-by: John Nunley <dev@notgull.net>
2024-01-14 09:00:57 -08:00
John Nunley ea5a38a500
feat(windows): AFD failure now sources underlying I/O error
Previously, if AFD failed to initialize `polling` would return a custom
I/O error with a string error, containing the formatted version of the
underlying system error. However, this means that information about the
underlying system error is lost to the user.

This commit makes it so the returned `io::Error` wraps a user
inaccessible type: `AfdError`. This `AfdError`, when stringified,
returns a similar error message as what was previously returned. In
addition when `.source()` is used it returns the underlying system
error.

Closes #174

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 16:34:13 -08:00
John Nunley 1f13664bbb
ci: Add async-io tests back to CI
Closes #145

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 08:23:02 -08:00
Taiki Endo 0c794fce50 Ignore dead_code warning for tuple struct
This lint does not take into account destructors.

```
error: field `0` is never read
    --> src\iocp\mod.rs:1155:13
     |
1155 |     Waiting(WaitHandle),
     |     ------- ^^^^^^^^^^
     |     |
     |     field in this variant
     |
     = note: `-D dead-code` implied by `-D warnings`
     = help: to override `-D warnings` add `#[allow(dead_code)]`
help: consider changing the field to be of unit type to suppress this warning while preserving the field numbering, or remove the field
     |
1155 |     Waiting(()),
     |             ~~
```
2024-01-07 16:11:02 +09:00
Taiki Endo 94c5ebf78b ci: Temporarily disable riscv32imc-esp-espidf build 2024-01-07 16:11:02 +09:00
Taiki Endo e956a8ad64 ci: Update FreeBSD image to 13.2
```
pkg install -y git
Updating FreeBSD repository catalogue...
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/meta.txz: Not Found
repository FreeBSD has no meta file, using default settings
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/packagesite.pkg: Not Found
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/packagesite.txz: Not Found
Unable to update repository FreeBSD
Error updating repositories!
```
2024-01-07 16:11:02 +09:00
Taiki Endo 078c478346 ci: Use cargo-hack's --rust-version flag for msrv check
This respects rust-version field in Cargo.toml, so it removes the need
to manage MSRV in both the CI file and Cargo.toml.
2024-01-07 16:11:02 +09:00
John Nunley b57a7c32a2
v3.3.1
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-24 08:22:27 -08:00
dependabot[bot] 08a316e1fc
m: Update windows-sys requirement from 0.48 to 0.52
* Update windows-sys requirement from 0.48 to 0.52

Updates the requirements on [windows-sys](https://github.com/microsoft/windows-rs) to permit the latest version.
- [Release notes](https://github.com/microsoft/windows-rs/releases)
- [Commits](https://github.com/microsoft/windows-rs/compare/0.48.0...0.52.0)

---
updated-dependencies:
- dependency-name: windows-sys
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Correct windows-sys imports

Signed-off-by: John Nunley <dev@notgull.net>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: John Nunley <dev@notgull.net>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: John Nunley <dev@notgull.net>
2023-11-24 07:53:04 -08:00
John Nunley 8087787ab2
v3.3.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-28 18:54:47 -07:00
John Nunley b9ab821df1
bugfix: Handle interrupts while polling
Previous, `Poller::wait` would bubble signal interruption error to the user.
However, this may be unexpected for simple use cases. Thus, this commit makes
it so, if `ErrorKind::Interrupted` is received by the underlying `wait()` call,
it clears the events and tries to wait again.

This also adds a test for this interruption written by @psychon.

Co-Authored-By: Uli Schlachter <psychon@users.noreply.github.com>
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-27 07:02:08 -07:00
Uli Schlachter 0575cbd4bc
docs: Fix wrong link in docs of Poller::wait()
Once upon a time, this got a Vec as an argument, but that was replaced
with the Events struct. Thus, this should link to Events and not Vec.

Signed-off-by: Uli Schlachter <psychon@znc.in>
2023-10-20 09:49:44 -07:00
Taiki Endo 37a1d4ecd2
Remove needless imports (#159) 2023-10-08 17:23:34 +09:00
Taiki Endo a559165acd
Migrate to Rust 2021 (#158) 2023-10-08 14:46:23 +09:00
tison ceb88a46c4
chore: prefer usize::MAX to std::usize::MAX
Signed-off-by: tison <wander4096@gmail.com>
2023-10-05 20:10:18 -07:00
John Nunley d9a65fdd73
v3.2.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-02 07:37:32 -07:00
Al Hoang 99a32b7607
feat: Add support for Haiku OS
Haiku does not support pipe_with at all, so just fall back to pipe().
2023-10-01 20:57:41 -07:00
John Nunley 9e143a38e1
bugfix: Manage sources being inserted into kqueue
Thus far, our kqueue implementation has been a relatively thin layer on
top of the OS kqueue. However, kqueue doesn't keep track of when the
same source is inserted twice, or when a source that doesn't exist is
removed. In the interest of keeping consistent behavior between backends
this commit adds a system for tracking when sources are inserted.

Closes #151

Signed-off-by: John Nunley <dev@notgull.net>
2023-09-27 21:30:46 -07:00
John Nunley 45ebe3b904
v3.1.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-25 09:42:41 -07:00
David Hotham 254577da8d
feat: introduce Event::new()
This makes it easier to construct Events.
2023-09-12 06:02:02 -07:00
Taiki Endo 8c99506375 Update actions/checkout action to v4 2023-09-10 18:28:12 +09:00
21 changed files with 920 additions and 168 deletions

View File

@ -14,7 +14,7 @@ env:
freebsd_task: freebsd_task:
name: test ($TARGET) name: test ($TARGET)
freebsd_instance: freebsd_instance:
image_family: freebsd-12-4 image_family: freebsd-13-2
matrix: matrix:
- env: - env:
TARGET: x86_64-unknown-freebsd TARGET: x86_64-unknown-freebsd

View File

@ -41,7 +41,7 @@ jobs:
- os: windows-latest - os: windows-latest
rust: nightly-i686-pc-windows-gnu rust: nightly-i686-pc-windows-gnu
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe. # --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }} run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
@ -62,12 +62,18 @@ jobs:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe
if: startsWith(matrix.os, 'ubuntu') if: startsWith(matrix.os, 'ubuntu')
- run: cargo hack build --feature-powerset --no-dev-deps - run: cargo hack build --feature-powerset --no-dev-deps
- name: Add rust-src # TODO: broken due to https://github.com/rust-lang/rust/pull/119026.
if: startsWith(matrix.rust, 'nightly') # - name: Check selected Tier 3 targets
run: rustup component add rust-src # if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
- name: Check selected Tier 3 targets # run: cargo check -Z build-std --target=riscv32imc-esp-espidf
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest' - name: Clone async-io
run: cargo check -Z build-std --target=riscv32imc-esp-espidf run: git clone https://github.com/smol-rs/async-io.git
# The async-io Cargo.toml already has a patch section at the bottom, so we
# can just add this.
- name: Patch polling
run: echo 'polling = { path = ".." }' >> async-io/Cargo.toml
- name: Test async-io
run: cargo test --manifest-path=async-io/Cargo.toml
cross: cross:
runs-on: ${{ matrix.os }} runs-on: ${{ matrix.os }}
@ -75,12 +81,16 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, macos-latest] os: [ubuntu-latest, macos-latest]
rust: [nightly, stable]
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install Rust
run: rustup update stable run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cross - name: Install cross
uses: taiki-e/install-action@cross uses: taiki-e/install-action@cross
- name: Add rust-src
if: startsWith(matrix.rust, 'nightly')
run: rustup component add rust-src
# We don't test BSDs, since we already test them in Cirrus. # We don't test BSDs, since we already test them in Cirrus.
- name: Android - name: Android
if: startsWith(matrix.os, 'ubuntu') if: startsWith(matrix.os, 'ubuntu')
@ -105,17 +115,30 @@ jobs:
run: | run: |
rustup target add x86_64-unknown-illumos rustup target add x86_64-unknown-illumos
cargo build --target x86_64-unknown-illumos cargo build --target x86_64-unknown-illumos
- name: Redox
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: |
rustup target add x86_64-unknown-redox
cargo check --target x86_64-unknown-redox
- name: HermitOS
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target x86_64-unknown-hermit
- name: Check haiku
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target x86_64-unknown-haiku
- name: Check vita
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target armv7-sony-vita-newlibeabihf
wine: wine:
runs-on: ubuntu-22.04 runs-on: ubuntu-22.04
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install Rust
run: rustup update stable run: rustup update stable
- uses: taiki-e/setup-cross-toolchain-action@v1 - uses: taiki-e/setup-cross-toolchain-action@v1
with: with:
target: x86_64-pc-windows-gnu target: x86_64-pc-windows-gnu
runner: wine@7.13
- run: cargo test --target x86_64-pc-windows-gnu - run: cargo test --target x86_64-pc-windows-gnu
msrv: msrv:
@ -124,27 +147,20 @@ jobs:
fail-fast: false fail-fast: false
matrix: matrix:
os: [ubuntu-latest, windows-latest] os: [ubuntu-latest, windows-latest]
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.63']
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install cargo-hack
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe. uses: taiki-e/install-action@cargo-hack
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }} - run: cargo hack build --no-dev-deps --rust-version
- run: cargo build - run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-freebsd
- name: Install Other Targets
if: startsWith(matrix.os, 'ubuntu') if: startsWith(matrix.os, 'ubuntu')
run: rustup target add x86_64-unknown-freebsd x86_64-unknown-netbsd - run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-netbsd
- run: cargo build --target x86_64-unknown-freebsd
if: startsWith(matrix.os, 'ubuntu')
- run: cargo build --target x86_64-unknown-netbsd
if: startsWith(matrix.os, 'ubuntu') if: startsWith(matrix.os, 'ubuntu')
clippy: clippy:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install Rust
run: rustup update stable run: rustup update stable
- run: cargo clippy --all-features --all-targets - run: cargo clippy --all-features --all-targets
@ -152,7 +168,7 @@ jobs:
fmt: fmt:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- name: Install Rust - name: Install Rust
run: rustup update stable run: rustup update stable
- run: cargo fmt --all --check - run: cargo fmt --all --check
@ -164,7 +180,7 @@ jobs:
issues: write issues: write
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
# https://github.com/rustsec/audit-check/issues/2 # https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master - uses: rustsec/audit-check@master
with: with:

View File

@ -13,7 +13,7 @@ jobs:
if: github.repository_owner == 'smol-rs' if: github.repository_owner == 'smol-rs'
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v3 - uses: actions/checkout@v4
- uses: taiki-e/create-gh-release-action@v1 - uses: taiki-e/create-gh-release-action@v1
with: with:
changelog: CHANGELOG.md changelog: CHANGELOG.md

View File

@ -1,3 +1,50 @@
# Version 3.7.0
- Add support for the PS Vita as a platform. (#160)
# Version 3.6.0
- Add an `is_err` method to `Event` to tell when an error has occurred. (#189)
- Deprecate the `is_connect_failed` function. (#189)
- Add support for HermitOS to `polling`. (#194)
# Version 3.5.0
- Use the `epoll` backend when RedoxOS is enabled. (#190)
# Version 3.4.0
- Add the ability to identify whether socket connection has failed. (#185)
- On BSD, add the ability to wait on a process by its PID. Previously, it was
only possible to wait on a process by a `Child` object. (#180)
- On ESP-IDF, annotate `eventfd` initialization failures with a message
indicating the source of those failures. (#186)
# Version 3.3.2
- When AFD fails to initialize, the resulting error now references
the underlying system error. (#174)
# Version 3.3.1
- Bump `windows-sys` to v0.52.0. (#169)
# Version 3.3.0
- Automatically restarts polling when `ErrorKind::Interrupted` is returned, rather than relying on the user to handle it. (#164)
- Fix bad link in documentation for `Poller::wait()`. (#163)
# Version 3.2.0
- The `kqueue` backend previously allowed the following operations that other backends forbid. Now these operations result in an error: (#153)
- Inserting a source that was already inserted.
- Modifying/deleting a source that was not already inserted.
- Add support for Haiku OS. (#154)
# Version 3.1.0
- Add an `Event::new()` constructor to simplify creating `Event`s. (#149)
# Version 3.0.0 # Version 3.0.0
- Replace `libc` in all backends with the `rustix` crate (#108). - Replace `libc` in all backends with the `rustix` crate (#108).

View File

@ -3,9 +3,9 @@ name = "polling"
# When publishing a new version: # When publishing a new version:
# - Update CHANGELOG.md # - Update CHANGELOG.md
# - Create "v3.x.y" git tag # - Create "v3.x.y" git tag
version = "3.0.0" version = "3.7.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"] authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
edition = "2018" edition = "2021"
rust-version = "1.63" rust-version = "1.63"
description = "Portable interface to epoll, kqueue, event ports, and IOCP" description = "Portable interface to epoll, kqueue, event ports, and IOCP"
license = "Apache-2.0 OR MIT" license = "Apache-2.0 OR MIT"
@ -17,20 +17,27 @@ exclude = ["/.*"]
[package.metadata.docs.rs] [package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docsrs"] rustdoc-args = ["--cfg", "docsrs"]
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(polling_test_poll_backend)', 'cfg(polling_test_epoll_pipe)'] }
[dependencies] [dependencies]
cfg-if = "1" cfg-if = "1"
tracing = { version = "0.1.37", default-features = false } tracing = { version = "0.1.37", default-features = false }
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies] [target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies.rustix]
rustix = { version = "0.38.8", features = ["event", "fs", "pipe", "process", "std", "time"], default-features = false } version = "0.38.31"
features = ["event", "fs", "pipe", "process", "std", "time"]
default-features = false
[target.'cfg(windows)'.dependencies] [target.'cfg(windows)'.dependencies]
concurrent-queue = "2.2.0" concurrent-queue = "2.2.0"
pin-project-lite = "0.2.9" pin-project-lite = "0.2.9"
[target.'cfg(windows)'.dependencies.windows-sys] [target.'cfg(windows)'.dependencies.windows-sys]
version = "0.48" version = "0.52"
features = [ features = [
"Wdk_Foundation",
"Wdk_Storage_FileSystem",
"Win32_Foundation", "Win32_Foundation",
"Win32_Networking_WinSock", "Win32_Networking_WinSock",
"Win32_Security", "Win32_Security",
@ -41,6 +48,16 @@ features = [
"Win32_System_WindowsProgramming", "Win32_System_WindowsProgramming",
] ]
[target.'cfg(target_os = "hermit")'.dependencies.hermit-abi]
version = "0.3.9"
[dev-dependencies] [dev-dependencies]
easy-parallel = "3.1.0" easy-parallel = "3.1.0"
fastrand = "2.0.0" fastrand = "2.0.0"
socket2 = "0.5.5"
[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"
[target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies]
signal-hook = "0.3.17"

View File

@ -12,11 +12,11 @@ https://docs.rs/polling)
Portable interface to epoll, kqueue, event ports, and IOCP. Portable interface to epoll, kqueue, event ports, and IOCP.
Supported platforms: Supported platforms:
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
- [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
DragonFly BSD DragonFly BSD
- [event ports](https://illumos.org/man/port_create): illumos, Solaris - [event ports](https://illumos.org/man/port_create): illumos, Solaris
- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
- [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+) - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
Polling is done in oneshot mode, which means interest in I/O events needs to be reset after Polling is done in oneshot mode, which means interest in I/O events needs to be reset after

36
examples/tcp_client.rs Normal file
View File

@ -0,0 +1,36 @@
use std::{io, net};
use polling::Event;
use socket2::Type;
fn main() -> io::Result<()> {
let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
let poller = polling::Poller::new()?;
unsafe {
poller.add(&socket, Event::new(0, true, true))?;
}
let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
socket.set_nonblocking(true)?;
let _ = socket.connect(&addr.into());
let mut events = polling::Events::new();
events.clear();
poller.wait(&mut events, None)?;
let event = events.iter().next();
let event = match event {
Some(event) => event,
None => {
println!("no event");
return Ok(());
}
};
println!("event: {:?}", event);
if event.is_err().unwrap_or(false) {
println!("connect failed");
}
Ok(())
}

View File

@ -27,22 +27,16 @@ mod example {
println!("Press Ctrl+C to exit..."); println!("Press Ctrl+C to exit...");
loop { // Wait for events.
// Wait for events. poller.wait(&mut events, None).unwrap();
poller.wait(&mut events, None).unwrap();
// Process events. // Process events.
for ev in events.iter() { let ev = events.iter().next().unwrap();
match ev.key { match ev.key {
1 => { 1 => {
println!("SIGINT received"); println!("SIGINT received");
return;
}
_ => unreachable!(),
}
} }
_ => unreachable!(),
events.clear();
} }
} }
} }

View File

@ -1,20 +1,23 @@
//! Bindings to epoll (Linux, Android). //! Bindings to epoll (Linux, Android).
use std::convert::TryInto;
use std::io; use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::time::Duration; use std::time::Duration;
use rustix::event::{epoll, eventfd, EventfdFlags}; #[cfg(not(target_os = "redox"))]
use rustix::fd::OwnedFd; use rustix::event::{eventfd, EventfdFlags};
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; #[cfg(not(target_os = "redox"))]
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};
use rustix::time::{ use rustix::time::{
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags, timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
Timespec, Timespec,
}; };
use rustix::event::epoll;
use rustix::fd::OwnedFd;
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};
use crate::{Event, PollMode}; use crate::{Event, PollMode};
/// Interface to epoll. /// Interface to epoll.
@ -27,6 +30,9 @@ pub struct Poller {
notifier: Notifier, notifier: Notifier,
/// File descriptor for the timerfd that produces timeouts. /// File descriptor for the timerfd that produces timeouts.
///
/// Redox does not support timerfd.
#[cfg(not(target_os = "redox"))]
timer_fd: Option<OwnedFd>, timer_fd: Option<OwnedFd>,
} }
@ -40,6 +46,7 @@ impl Poller {
// Set up notifier and timerfd. // Set up notifier and timerfd.
let notifier = Notifier::new()?; let notifier = Notifier::new()?;
#[cfg(not(target_os = "redox"))]
let timer_fd = timerfd_create( let timer_fd = timerfd_create(
TimerfdClockId::Monotonic, TimerfdClockId::Monotonic,
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK, TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
@ -49,10 +56,12 @@ impl Poller {
let poller = Poller { let poller = Poller {
epoll_fd, epoll_fd,
notifier, notifier,
#[cfg(not(target_os = "redox"))]
timer_fd, timer_fd,
}; };
unsafe { unsafe {
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = poller.timer_fd { if let Some(ref timer_fd) = poller.timer_fd {
poller.add( poller.add(
timer_fd.as_raw_fd(), timer_fd.as_raw_fd(),
@ -71,7 +80,6 @@ impl Poller {
tracing::trace!( tracing::trace!(
epoll_fd = ?poller.epoll_fd.as_raw_fd(), epoll_fd = ?poller.epoll_fd.as_raw_fd(),
notifier = ?poller.notifier, notifier = ?poller.notifier,
timer_fd = ?poller.timer_fd,
"new", "new",
); );
Ok(poller) Ok(poller)
@ -156,6 +164,7 @@ impl Poller {
); );
let _enter = span.enter(); let _enter = span.enter();
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = self.timer_fd { if let Some(ref timer_fd) = self.timer_fd {
// Configure the timeout using timerfd. // Configure the timeout using timerfd.
let new_val = Itimerspec { let new_val = Itimerspec {
@ -182,12 +191,17 @@ impl Poller {
)?; )?;
} }
#[cfg(not(target_os = "redox"))]
let timer_fd = &self.timer_fd;
#[cfg(target_os = "redox")]
let timer_fd: Option<core::convert::Infallible> = None;
// Timeout in milliseconds for epoll. // Timeout in milliseconds for epoll.
let timeout_ms = match (&self.timer_fd, timeout) { let timeout_ms = match (timer_fd, timeout) {
(_, Some(t)) if t == Duration::from_secs(0) => 0, (_, Some(t)) if t == Duration::from_secs(0) => 0,
(None, Some(t)) => { (None, Some(t)) => {
// Round up to a whole millisecond. // Round up to a whole millisecond.
let mut ms = t.as_millis().try_into().unwrap_or(std::i32::MAX); let mut ms = t.as_millis().try_into().unwrap_or(i32::MAX);
if Duration::from_millis(ms as u64) < t { if Duration::from_millis(ms as u64) < t {
ms = ms.saturating_add(1); ms = ms.saturating_add(1);
} }
@ -246,10 +260,10 @@ impl Drop for Poller {
"drop", "drop",
epoll_fd = ?self.epoll_fd.as_raw_fd(), epoll_fd = ?self.epoll_fd.as_raw_fd(),
notifier = ?self.notifier, notifier = ?self.notifier,
timer_fd = ?self.timer_fd
); );
let _enter = span.enter(); let _enter = span.enter();
#[cfg(not(target_os = "redox"))]
if let Some(timer_fd) = self.timer_fd.take() { if let Some(timer_fd) = self.timer_fd.take() {
let _ = self.delete(timer_fd.as_fd()); let _ = self.delete(timer_fd.as_fd());
} }
@ -258,6 +272,7 @@ impl Drop for Poller {
} }
/// `timespec` value that equals zero. /// `timespec` value that equals zero.
#[cfg(not(target_os = "redox"))]
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) }; const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
/// Get the EPOLL flags for the interest. /// Get the EPOLL flags for the interest.
@ -366,6 +381,19 @@ impl EventExtra {
pub fn is_pri(&self) -> bool { pub fn is_pri(&self) -> bool {
self.flags.contains(epoll::EventFlags::PRI) self.flags.contains(epoll::EventFlags::PRI)
} }
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(
self.flags.contains(epoll::EventFlags::ERR)
&& self.flags.contains(epoll::EventFlags::HUP),
)
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(epoll::EventFlags::ERR))
}
} }
/// The notifier for Linux. /// The notifier for Linux.
@ -378,6 +406,7 @@ impl EventExtra {
#[derive(Debug)] #[derive(Debug)]
enum Notifier { enum Notifier {
/// The primary notifier, using eventfd. /// The primary notifier, using eventfd.
#[cfg(not(target_os = "redox"))]
EventFd(OwnedFd), EventFd(OwnedFd),
/// The fallback notifier, using a pipe. /// The fallback notifier, using a pipe.
@ -394,19 +423,22 @@ impl Notifier {
/// Create a new notifier. /// Create a new notifier.
fn new() -> io::Result<Self> { fn new() -> io::Result<Self> {
// Skip eventfd for testing if necessary. // Skip eventfd for testing if necessary.
if !cfg!(polling_test_epoll_pipe) { #[cfg(not(target_os = "redox"))]
// Try to create an eventfd. {
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) { if !cfg!(polling_test_epoll_pipe) {
Ok(fd) => { // Try to create an eventfd.
tracing::trace!("created eventfd for notifier"); match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
return Ok(Notifier::EventFd(fd)); Ok(fd) => {
} tracing::trace!("created eventfd for notifier");
return Ok(Notifier::EventFd(fd));
}
Err(err) => { Err(err) => {
tracing::warn!( tracing::warn!(
"eventfd() failed with error ({}), falling back to pipe", "eventfd() failed with error ({}), falling back to pipe",
err err
); );
}
} }
} }
} }
@ -428,6 +460,7 @@ impl Notifier {
/// The file descriptor to register in the poller. /// The file descriptor to register in the poller.
fn as_fd(&self) -> BorrowedFd<'_> { fn as_fd(&self) -> BorrowedFd<'_> {
match self { match self {
#[cfg(not(target_os = "redox"))]
Notifier::EventFd(fd) => fd.as_fd(), Notifier::EventFd(fd) => fd.as_fd(),
Notifier::Pipe { Notifier::Pipe {
read_pipe: read, .. read_pipe: read, ..
@ -438,6 +471,7 @@ impl Notifier {
/// Notify the poller. /// Notify the poller.
fn notify(&self) { fn notify(&self) {
match self { match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => { Self::EventFd(fd) => {
let buf: [u8; 8] = 1u64.to_ne_bytes(); let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = write(fd, &buf); let _ = write(fd, &buf);
@ -452,6 +486,7 @@ impl Notifier {
/// Clear the notification. /// Clear the notification.
fn clear(&self) { fn clear(&self) {
match self { match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => { Self::EventFd(fd) => {
let mut buf = [0u8; 8]; let mut buf = [0u8; 8];
let _ = read(fd, &mut buf); let _ = read(fd, &mut buf);

View File

@ -14,6 +14,8 @@ use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once; use std::sync::Once;
use windows_sys::Wdk::Foundation::OBJECT_ATTRIBUTES;
use windows_sys::Wdk::Storage::FileSystem::FILE_OPEN;
use windows_sys::Win32::Foundation::{ use windows_sys::Win32::Foundation::{
CloseHandle, HANDLE, HMODULE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS, CloseHandle, HANDLE, HMODULE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
UNICODE_STRING, UNICODE_STRING,
@ -21,11 +23,9 @@ use windows_sys::Win32::Foundation::{
use windows_sys::Win32::Networking::WinSock::{ use windows_sys::Win32::Networking::WinSock::{
WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE_POLL, SOCKET_ERROR, WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE_POLL, SOCKET_ERROR,
}; };
use windows_sys::Win32::Storage::FileSystem::{ use windows_sys::Win32::Storage::FileSystem::{FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE};
FILE_OPEN, FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE,
};
use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress}; use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress};
use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK, OBJECT_ATTRIBUTES}; use windows_sys::Win32::System::IO::IO_STATUS_BLOCK;
#[derive(Default)] #[derive(Default)]
#[repr(C)] #[repr(C)]

View File

@ -46,7 +46,6 @@ use pin_project_lite::pin_project;
use std::cell::UnsafeCell; use std::cell::UnsafeCell;
use std::collections::hash_map::{Entry, HashMap}; use std::collections::hash_map::{Entry, HashMap};
use std::convert::TryFrom;
use std::ffi::c_void; use std::ffi::c_void;
use std::fmt; use std::fmt;
use std::io; use std::io;
@ -122,17 +121,19 @@ impl Poller {
pub(super) fn new() -> io::Result<Self> { pub(super) fn new() -> io::Result<Self> {
// Make sure AFD is able to be used. // Make sure AFD is able to be used.
if let Err(e) = afd::NtdllImports::force_load() { if let Err(e) = afd::NtdllImports::force_load() {
return Err(crate::unsupported_error(format!( return Err(io::Error::new(
"Failed to initialize unstable Windows functions: {}\nThis usually only happens for old Windows or Wine.", io::ErrorKind::Unsupported,
e AfdError::new("failed to initialize unstable Windows functions", e),
))); ));
} }
// Create and destroy a single AFD to test if we support it. // Create and destroy a single AFD to test if we support it.
Afd::<Packet>::new().map_err(|e| crate::unsupported_error(format!( Afd::<Packet>::new().map_err(|e| {
"Failed to initialize \\Device\\Afd: {}\nThis usually only happens for old Windows or Wine.", io::Error::new(
e, io::ErrorKind::Unsupported,
)))?; AfdError::new("failed to initialize \\Device\\Afd", e),
)
})?;
let port = IoCompletionPort::new(0)?; let port = IoCompletionPort::new(0)?;
tracing::trace!(handle = ?port, "new"); tracing::trace!(handle = ?port, "new");
@ -680,6 +681,18 @@ impl EventExtra {
pub fn set_pri(&mut self, active: bool) { pub fn set_pri(&mut self, active: bool) {
self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active); self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active);
} }
/// Check if TCP connect failed. Deprecated.
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
}
/// Check if TCP connect failed.
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
}
} }
/// A packet used to wake up the poller with an event. /// A packet used to wake up the poller with an event.
@ -1153,7 +1166,7 @@ enum WaitableStatus {
Idle, Idle,
/// We are waiting on this handle to become signaled. /// We are waiting on this handle to become signaled.
Waiting(WaitHandle), Waiting(#[allow(dead_code)] WaitHandle),
/// This handle has been cancelled. /// This handle has been cancelled.
Cancelled, Cancelled,
@ -1327,6 +1340,54 @@ fn dur2timeout(dur: Duration) -> u32 {
.unwrap_or(INFINITE) .unwrap_or(INFINITE)
} }
/// An error type that wraps around failing to open AFD.
struct AfdError {
/// String description of what happened.
description: &'static str,
/// The underlying system error.
system: io::Error,
}
impl AfdError {
#[inline]
fn new(description: &'static str, system: io::Error) -> Self {
Self {
description,
system,
}
}
}
impl fmt::Debug for AfdError {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AfdError")
.field("description", &self.description)
.field("system", &self.system)
.field("note", &"probably caused by old Windows or Wine")
.finish()
}
}
impl fmt::Display for AfdError {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}: {}\nThis error is usually caused by running on old Windows or Wine",
self.description, &self.system
)
}
}
impl std::error::Error for AfdError {
#[inline]
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.system)
}
}
struct CallOnDrop<F: FnMut()>(F); struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> { impl<F: FnMut()> Drop for CallOnDrop<F> {

View File

@ -2,7 +2,6 @@
use super::dur2timeout; use super::dur2timeout;
use std::convert::TryInto;
use std::fmt; use std::fmt;
use std::io; use std::io;
use std::marker::PhantomData; use std::marker::PhantomData;
@ -297,11 +296,3 @@ impl<T: CompletionHandle> Drop for OverlappedEntry<T> {
drop(unsafe { self.packet() }); drop(unsafe { self.packet() });
} }
} }
struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}

View File

@ -1,7 +1,9 @@
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD). //! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
use std::collections::HashSet;
use std::io; use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::sync::RwLock;
use std::time::Duration; use std::time::Duration;
use rustix::event::kqueue; use rustix::event::kqueue;
@ -15,6 +17,11 @@ pub struct Poller {
/// File descriptor for the kqueue instance. /// File descriptor for the kqueue instance.
kqueue_fd: OwnedFd, kqueue_fd: OwnedFd,
/// List of sources currently registered in this poller.
///
/// This is used to make sure the same source is not registered twice.
sources: RwLock<HashSet<SourceId>>,
/// Notification pipe for waking up the poller. /// Notification pipe for waking up the poller.
/// ///
/// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it /// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it
@ -22,6 +29,23 @@ pub struct Poller {
notify: notify::Notify, notify: notify::Notify,
} }
/// Identifier for a source.
#[doc(hidden)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum SourceId {
/// Registered file descriptor.
Fd(RawFd),
/// Signal.
Signal(std::os::raw::c_int),
/// Process ID.
Pid(rustix::process::Pid),
/// Timer ID.
Timer(usize),
}
impl Poller { impl Poller {
/// Creates a new poller. /// Creates a new poller.
pub fn new() -> io::Result<Poller> { pub fn new() -> io::Result<Poller> {
@ -31,6 +55,7 @@ impl Poller {
let poller = Poller { let poller = Poller {
kqueue_fd, kqueue_fd,
sources: RwLock::new(HashSet::new()),
notify: notify::Notify::new()?, notify: notify::Notify::new()?,
}; };
@ -60,6 +85,8 @@ impl Poller {
/// ///
/// The file descriptor must be valid and it must last until it is deleted. /// The file descriptor must be valid and it must last until it is deleted.
pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> { pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
self.add_source(SourceId::Fd(fd))?;
// File descriptors don't need to be added explicitly, so just modify the interest. // File descriptors don't need to be added explicitly, so just modify the interest.
self.modify(BorrowedFd::borrow_raw(fd), ev, mode) self.modify(BorrowedFd::borrow_raw(fd), ev, mode)
} }
@ -79,6 +106,8 @@ impl Poller {
}; };
let _enter = span.as_ref().map(|s| s.enter()); let _enter = span.as_ref().map(|s| s.enter());
self.has_source(SourceId::Fd(fd.as_raw_fd()))?;
let mode_flags = mode_to_flags(mode); let mode_flags = mode_to_flags(mode);
let read_flags = if ev.readable { let read_flags = if ev.readable {
@ -143,10 +172,57 @@ impl Poller {
Ok(()) Ok(())
} }
/// Add a source to the sources set.
#[inline]
pub(crate) fn add_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::AlreadyExists))
}
}
/// Tell if a source is currently inside the set.
#[inline]
pub(crate) fn has_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.read()
.unwrap_or_else(|e| e.into_inner())
.contains(&source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::NotFound))
}
}
/// Remove a source from the sources set.
#[inline]
pub(crate) fn remove_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::NotFound))
}
}
/// Deletes a file descriptor. /// Deletes a file descriptor.
pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> { pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
// Simply delete interest in the file descriptor. // Simply delete interest in the file descriptor.
self.modify(fd, Event::none(0), PollMode::Oneshot) self.modify(fd, Event::none(0), PollMode::Oneshot)?;
self.remove_source(SourceId::Fd(fd.as_raw_fd()))
} }
/// Waits for I/O events with an optional timeout. /// Waits for I/O events with an optional timeout.
@ -295,6 +371,16 @@ impl EventExtra {
pub fn is_pri(&self) -> bool { pub fn is_pri(&self) -> bool {
false false
} }
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
None
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
None
}
} }
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags { pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {

View File

@ -1,11 +1,11 @@
//! Portable interface to epoll, kqueue, event ports, and IOCP. //! Portable interface to epoll, kqueue, event ports, and IOCP.
//! //!
//! Supported platforms: //! Supported platforms:
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android //! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, //! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
//! DragonFly BSD //! DragonFly BSD
//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris //! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems //! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
//! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+) //! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
//! //!
//! By default, polling is done in oneshot mode, which means interest in I/O events needs to //! By default, polling is done in oneshot mode, which means interest in I/O events needs to
@ -70,8 +70,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize; use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex; use std::sync::Mutex;
use std::time::Duration; use std::time::{Duration, Instant};
use std::usize;
use cfg_if::cfg_if; use cfg_if::cfg_if;
@ -81,7 +80,11 @@ cfg_if! {
if #[cfg(polling_test_poll_backend)] { if #[cfg(polling_test_poll_backend)] {
mod poll; mod poll;
use poll as sys; use poll as sys;
} else if #[cfg(any(target_os = "linux", target_os = "android"))] { } else if #[cfg(any(
target_os = "linux",
target_os = "android",
target_os = "redox"
))] {
mod epoll; mod epoll;
use epoll as sys; use epoll as sys;
} else if #[cfg(any( } else if #[cfg(any(
@ -104,6 +107,7 @@ cfg_if! {
use kqueue as sys; use kqueue as sys;
} else if #[cfg(any( } else if #[cfg(any(
target_os = "vxworks", target_os = "vxworks",
target_os = "hermit",
target_os = "fuchsia", target_os = "fuchsia",
target_os = "horizon", target_os = "horizon",
unix, unix,
@ -121,7 +125,7 @@ cfg_if! {
pub mod os; pub mod os;
/// Key associated with notifications. /// Key associated with notifications.
const NOTIFY_KEY: usize = std::usize::MAX; const NOTIFY_KEY: usize = usize::MAX;
/// Indicates that a file descriptor or socket can read or write without blocking. /// Indicates that a file descriptor or socket can read or write without blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -181,52 +185,46 @@ pub enum PollMode {
} }
impl Event { impl Event {
/// All kinds of events (readable and writable). /// Create a new event.
/// pub const fn new(key: usize, readable: bool, writable: bool) -> Event {
/// Equivalent to: `Event { key, readable: true, writable: true }`
pub const fn all(key: usize) -> Event {
Event { Event {
key, key,
readable: true, readable,
writable: true, writable,
extra: sys::EventExtra::empty(), extra: sys::EventExtra::empty(),
} }
} }
/// All kinds of events (readable and writable).
///
/// Equivalent to: `Event::new(key, true, true)`
#[inline]
pub const fn all(key: usize) -> Event {
Event::new(key, true, true)
}
/// Only the readable event. /// Only the readable event.
/// ///
/// Equivalent to: `Event { key, readable: true, writable: false }` /// Equivalent to: `Event::new(key, true, false)`
#[inline]
pub const fn readable(key: usize) -> Event { pub const fn readable(key: usize) -> Event {
Event { Event::new(key, true, false)
key,
readable: true,
writable: false,
extra: sys::EventExtra::empty(),
}
} }
/// Only the writable event. /// Only the writable event.
/// ///
/// Equivalent to: `Event { key, readable: false, writable: true }` /// Equivalent to: `Event::new(key, false, true)`
#[inline]
pub const fn writable(key: usize) -> Event { pub const fn writable(key: usize) -> Event {
Event { Event::new(key, false, true)
key,
readable: false,
writable: true,
extra: sys::EventExtra::empty(),
}
} }
/// No events. /// No events.
/// ///
/// Equivalent to: `Event { key, readable: false, writable: false }` /// Equivalent to: `Event::new(key, false, false)`
#[inline]
pub const fn none(key: usize) -> Event { pub const fn none(key: usize) -> Event {
Event { Event::new(key, false, false)
key,
readable: false,
writable: false,
extra: sys::EventExtra::empty(),
}
} }
/// Add interruption events to this interest. /// Add interruption events to this interest.
@ -339,6 +337,86 @@ impl Event {
self.extra.is_pri() self.extra.is_pri()
} }
/// Tells if this event is the result of a connection failure.
///
/// This function checks if a TCP connection has failed. It corresponds to the `EPOLLERR` or `EPOLLHUP` event in Linux
/// and `CONNECT_FAILED` event in Windows IOCP.
///
/// # Examples
///
/// ```
/// use std::{io, net};
/// // Assuming polling and socket2 are included as dependencies in Cargo.toml
/// use polling::Event;
/// use socket2::Type;
///
/// fn main() -> io::Result<()> {
/// let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
/// let poller = polling::Poller::new()?;
/// unsafe {
/// poller.add(&socket, Event::new(0, true, true))?;
/// }
/// let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
/// socket.set_nonblocking(true)?;
/// let _ = socket.connect(&addr.into());
///
/// let mut events = polling::Events::new();
///
/// events.clear();
/// poller.wait(&mut events, None)?;
///
/// let event = events.iter().next();
///
/// let event = match event {
/// Some(event) => event,
/// None => {
/// println!("no event");
/// return Ok(());
/// },
/// };
///
/// println!("event: {:?}", event);
/// if event
/// .is_connect_failed()
/// .unwrap_or_default()
/// {
/// println!("connect failed");
/// }
///
/// Ok(())
/// }
/// ```
///
/// # Returns
///
/// Returns `Some(true)` if the connection has failed, `Some(false)` if the connection has not failed,
/// or `None` if the platform does not support detecting this condition.
#[inline]
#[deprecated(
since = "3.4.0",
note = "use `is_err` in combination of is_hup instead, see documentation for `is_err`"
)]
pub fn is_connect_failed(&self) -> Option<bool> {
self.extra.is_connect_failed()
}
/// Tells if this event is the result of a connection failure.
///
/// This function checks if an error exist, particularly useful in detecting if TCP connection failed. It corresponds to the `EPOLLERR` event in Linux
/// and `CONNECT_FAILED` event in Windows IOCP.
///
/// ## Caveats
///
/// In `epoll`, a TCP connection failure is indicated by `EPOLLERR` + `EPOLLHUP`, though just `EPOLLERR` is enough to indicate a connection failure.
/// EPOLLHUP may happen when we haven't event called `connect` on the socket, but it is still a valid event to check for.
///
/// Returns `Some(true)` if the connection has failed, `Some(false)` if there is no error,
/// or `None` if the platform does not support detecting this condition.
#[inline]
pub fn is_err(&self) -> Option<bool> {
self.extra.is_err()
}
/// Remove any extra information from this event. /// Remove any extra information from this event.
#[inline] #[inline]
pub fn clear_extra(&mut self) { pub fn clear_extra(&mut self) {
@ -616,8 +694,8 @@ impl Poller {
/// Waits for at least one I/O event and returns the number of new events. /// Waits for at least one I/O event and returns the number of new events.
/// ///
/// New events will be appended to `events`. If necessary, make sure to clear the [`Vec`] /// New events will be appended to `events`. If necessary, make sure to clear the
/// before calling [`wait()`][`Poller::wait()`]! /// [`Events`][Events::clear()] before calling [`wait()`][`Poller::wait()`]!
/// ///
/// This method will return with no new events if a notification is delivered by the /// This method will return with no new events if a notification is delivered by the
/// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events /// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events
@ -658,14 +736,30 @@ impl Poller {
let _enter = span.enter(); let _enter = span.enter();
if let Ok(_lock) = self.lock.try_lock() { if let Ok(_lock) = self.lock.try_lock() {
// Wait for I/O events. let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout));
self.poller.wait(&mut events.events, timeout)?;
// Clear the notification, if any. loop {
self.notified.swap(false, Ordering::SeqCst); // Figure out how long to wait for.
let timeout =
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
// Indicate number of events. // Wait for I/O events.
Ok(events.len()) if let Err(e) = self.poller.wait(&mut events.events, timeout) {
// If the wait was interrupted by a signal, clear events and try again.
if e.kind() == io::ErrorKind::Interrupted {
events.clear();
continue;
} else {
return Err(e);
}
}
// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);
// Indicate number of events.
return Ok(events.len());
}
} else { } else {
tracing::trace!("wait: skipping because another thread is already waiting on I/O"); tracing::trace!("wait: skipping because another thread is already waiting on I/O");
Ok(0) Ok(0)
@ -945,8 +1039,11 @@ impl fmt::Debug for Poller {
} }
cfg_if! { cfg_if! {
if #[cfg(unix)] { if #[cfg(any(unix, target_os = "hermit"))] {
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd, AsFd, BorrowedFd}; use std::os::unix::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
#[cfg(target_os = "hermit")]
use std::os::hermit::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
/// A resource with a raw file descriptor. /// A resource with a raw file descriptor.
pub trait AsRawSource { pub trait AsRawSource {

View File

@ -20,6 +20,7 @@ pub mod iocp;
mod __private { mod __private {
#[doc(hidden)] #[doc(hidden)]
#[allow(dead_code)]
pub trait PollerSealed {} pub trait PollerSealed {}
impl PollerSealed for crate::Poller {} impl PollerSealed for crate::Poller {}

View File

@ -1,4 +1,4 @@
//! Functionality that is only availale for IOCP-based platforms. //! Functionality that is only available for IOCP-based platforms.
pub use crate::sys::CompletionPacket; pub use crate::sys::CompletionPacket;

View File

@ -1,9 +1,10 @@
//! Functionality that is only available for `kqueue`-based platforms. //! Functionality that is only available for `kqueue`-based platforms.
use crate::sys::mode_to_flags; use crate::sys::{mode_to_flags, SourceId};
use crate::{PollMode, Poller}; use crate::{PollMode, Poller};
use std::io; use std::io;
use std::marker::PhantomData;
use std::process::Child; use std::process::Child;
use std::time::Duration; use std::time::Duration;
@ -98,10 +99,13 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
#[inline(always)] #[inline(always)]
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> { fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
// No difference between adding and modifying in kqueue. // No difference between adding and modifying in kqueue.
self.poller.add_source(filter.source_id())?;
self.modify_filter(filter, key, mode) self.modify_filter(filter, key, mode)
} }
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> { fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
self.poller.has_source(filter.source_id())?;
// Convert the filter into a kevent. // Convert the filter into a kevent.
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key); let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
@ -114,7 +118,9 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
let event = filter.filter(kqueue::EventFlags::DELETE, 0); let event = filter.filter(kqueue::EventFlags::DELETE, 0);
// Delete the filter. // Delete the filter.
self.poller.submit_changes([event]) self.poller.submit_changes([event])?;
self.poller.remove_source(filter.source_id())
} }
} }
@ -126,6 +132,11 @@ unsafe impl<T: FilterSealed + ?Sized> FilterSealed for &T {
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event { fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
(**self).filter(flags, key) (**self).filter(flags, key)
} }
#[inline(always)]
fn source_id(&self) -> SourceId {
(**self).source_id()
}
} }
impl<T: Filter + ?Sized> Filter for &T {} impl<T: Filter + ?Sized> Filter for &T {}
@ -149,6 +160,11 @@ unsafe impl FilterSealed for Signal {
key as _, key as _,
) )
} }
#[inline(always)]
fn source_id(&self) -> SourceId {
SourceId::Signal(self.0)
}
} }
impl Filter for Signal {} impl Filter for Signal {}
@ -156,11 +172,14 @@ impl Filter for Signal {}
/// Monitor a child process. /// Monitor a child process.
#[derive(Debug)] #[derive(Debug)]
pub struct Process<'a> { pub struct Process<'a> {
/// The child process to monitor. /// The process ID to monitor.
child: &'a Child, pid: rustix::process::Pid,
/// The operation to monitor. /// The operation to monitor.
ops: ProcessOps, ops: ProcessOps,
/// Lifetime of the underlying process.
_lt: PhantomData<&'a Child>,
} }
/// The operations that a monitored process can perform. /// The operations that a monitored process can perform.
@ -185,7 +204,24 @@ impl<'a> Process<'a> {
/// Once registered into the `Poller`, the `Child` object must outlive this filter's /// Once registered into the `Poller`, the `Child` object must outlive this filter's
/// registration into the poller. /// registration into the poller.
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self { pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
Self { child, ops } Self {
pid: rustix::process::Pid::from_child(child),
ops,
_lt: PhantomData,
}
}
/// Create a `Process` from a PID.
///
/// # Safety
///
/// The PID must be tied to an actual child process.
pub unsafe fn from_pid(pid: std::num::NonZeroI32, ops: ProcessOps) -> Self {
Self {
pid: unsafe { rustix::process::Pid::from_raw_unchecked(pid.get()) },
ops,
_lt: PhantomData,
}
} }
} }
@ -200,13 +236,20 @@ unsafe impl FilterSealed for Process<'_> {
kqueue::Event::new( kqueue::Event::new(
kqueue::EventFilter::Proc { kqueue::EventFilter::Proc {
pid: rustix::process::Pid::from_child(self.child), // SAFETY: We know that the PID is nonzero.
pid: self.pid,
flags: events, flags: events,
}, },
flags | kqueue::EventFlags::RECEIPT, flags | kqueue::EventFlags::RECEIPT,
key as _, key as _,
) )
} }
#[inline(always)]
fn source_id(&self) -> SourceId {
// SAFETY: We know that the PID is nonzero
SourceId::Pid(self.pid)
}
} }
impl Filter for Process<'_> {} impl Filter for Process<'_> {}
@ -234,11 +277,17 @@ unsafe impl FilterSealed for Timer {
key as _, key as _,
) )
} }
#[inline(always)]
fn source_id(&self) -> SourceId {
SourceId::Timer(self.id)
}
} }
impl Filter for Timer {} impl Filter for Timer {}
mod __private { mod __private {
use crate::sys::SourceId;
use rustix::event::kqueue; use rustix::event::kqueue;
#[doc(hidden)] #[doc(hidden)]
@ -247,5 +296,8 @@ mod __private {
/// ///
/// This filter's flags must have `EV_RECEIPT`. /// This filter's flags must have `EV_RECEIPT`.
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event; fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
/// Get the source ID for this source.
fn source_id(&self) -> SourceId;
} }
} }

View File

@ -1,14 +1,17 @@
//! Bindings to poll (VxWorks, Fuchsia, other Unix systems). //! Bindings to poll (VxWorks, Fuchsia, other Unix systems).
use std::collections::HashMap; use std::collections::HashMap;
use std::convert::TryInto;
use std::io; use std::io;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex}; use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use rustix::event::{poll, PollFd, PollFlags}; #[cfg(not(target_os = "hermit"))]
use rustix::fd::{AsFd, AsRawFd, BorrowedFd}; use rustix::fd::{AsFd, AsRawFd, BorrowedFd};
#[cfg(target_os = "hermit")]
use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd};
use syscall::{poll, PollFd, PollFlags};
// std::os::unix doesn't exist on Fuchsia // std::os::unix doesn't exist on Fuchsia
type RawFd = std::os::raw::c_int; type RawFd = std::os::raw::c_int;
@ -427,6 +430,16 @@ impl EventExtra {
pub fn is_pri(&self) -> bool { pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI) self.flags.contains(PollFlags::PRI)
} }
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR) || self.flags.contains(PollFlags::HUP))
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR))
}
} }
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> { fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
@ -439,7 +452,182 @@ fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
} }
} }
#[cfg(not(target_os = "espidf"))] #[cfg(unix)]
mod syscall {
pub(super) use rustix::event::{poll, PollFd, PollFlags};
#[cfg(target_os = "espidf")]
pub(super) use rustix::event::{eventfd, EventfdFlags};
#[cfg(target_os = "espidf")]
pub(super) use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(target_os = "espidf")]
pub(super) use rustix::io::{read, write};
}
#[cfg(target_os = "hermit")]
mod syscall {
// TODO: Remove this shim once HermitOS is supported in Rustix.
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::ops::BitOr;
pub(super) use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
/// Create an eventfd.
pub(super) fn eventfd(count: u64, _flags: EventfdFlags) -> io::Result<OwnedFd> {
let fd = unsafe { hermit_abi::eventfd(count, 0) };
if fd < 0 {
Err(io::Error::from_raw_os_error(unsafe {
hermit_abi::get_errno()
}))
} else {
Ok(unsafe { OwnedFd::from_raw_fd(fd) })
}
}
/// Read some bytes.
pub(super) fn read(fd: BorrowedFd<'_>, bytes: &mut [u8]) -> io::Result<usize> {
let count = unsafe { hermit_abi::read(fd.as_raw_fd(), bytes.as_mut_ptr(), bytes.len()) };
cvt(count)
}
/// Write some bytes.
pub(super) fn write(fd: BorrowedFd<'_>, bytes: &[u8]) -> io::Result<usize> {
let count = unsafe { hermit_abi::write(fd.as_raw_fd(), bytes.as_ptr(), bytes.len()) };
cvt(count)
}
/// Safe wrapper around the `poll` system call.
pub(super) fn poll(fds: &mut [PollFd<'_>], timeout: i32) -> io::Result<usize> {
let call = unsafe {
hermit_abi::poll(
fds.as_mut_ptr() as *mut hermit_abi::pollfd,
fds.len(),
timeout,
)
};
cvt(call as isize)
}
/// Safe wrapper around `pollfd`.
#[repr(transparent)]
pub(super) struct PollFd<'a> {
inner: hermit_abi::pollfd,
_lt: PhantomData<BorrowedFd<'a>>,
}
impl<'a> PollFd<'a> {
pub(super) fn from_borrowed_fd(fd: BorrowedFd<'a>, inflags: PollFlags) -> Self {
Self {
inner: hermit_abi::pollfd {
fd: fd.as_raw_fd(),
events: inflags.0,
revents: 0,
},
_lt: PhantomData,
}
}
pub(super) fn revents(&self) -> PollFlags {
PollFlags(self.inner.revents)
}
}
impl AsFd for PollFd<'_> {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.inner.fd) }
}
}
impl fmt::Debug for PollFd<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFd")
.field("fd", &format_args!("0x{:x}", self.inner.fd))
.field("events", &PollFlags(self.inner.events))
.field("revents", &PollFlags(self.inner.revents))
.finish()
}
}
/// Wrapper around polling flags.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) struct PollFlags(i16);
impl PollFlags {
/// Empty set of flags.
pub(super) const fn empty() -> Self {
Self(0)
}
pub(super) const IN: PollFlags = PollFlags(hermit_abi::POLLIN);
pub(super) const OUT: PollFlags = PollFlags(hermit_abi::POLLOUT);
pub(super) const WRBAND: PollFlags = PollFlags(hermit_abi::POLLWRBAND);
pub(super) const ERR: PollFlags = PollFlags(hermit_abi::POLLERR);
pub(super) const HUP: PollFlags = PollFlags(hermit_abi::POLLHUP);
pub(super) const PRI: PollFlags = PollFlags(hermit_abi::POLLPRI);
/// Tell if this contains some flags.
pub(super) fn contains(self, flags: PollFlags) -> bool {
self.0 & flags.0 != 0
}
/// Set a flag.
pub(super) fn set(&mut self, flags: PollFlags, set: bool) {
if set {
self.0 |= flags.0;
} else {
self.0 &= !(flags.0);
}
}
/// Tell if this is empty.
pub(super) fn is_empty(self) -> bool {
self.0 == 0
}
/// Tell if this intersects with some flags.
pub(super) fn intersects(self, flags: PollFlags) -> bool {
self.contains(flags)
}
}
impl BitOr for PollFlags {
type Output = PollFlags;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
#[derive(Debug, Copy, Clone)]
pub(super) struct EventfdFlags;
impl EventfdFlags {
pub(super) fn empty() -> Self {
Self
}
}
/// Convert a number to an actual result.
#[inline]
fn cvt(len: isize) -> io::Result<usize> {
if len < 0 {
Err(io::Error::from_raw_os_error(unsafe {
hermit_abi::get_errno()
}))
} else {
Ok(len as usize)
}
}
}
#[cfg(not(any(target_os = "espidf", target_os = "hermit")))]
mod notify { mod notify {
use std::io; use std::io;
@ -447,7 +635,9 @@ mod notify {
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd}; use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags}; use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags}; #[cfg(not(target_os = "haiku"))]
use rustix::pipe::pipe_with;
use rustix::pipe::{pipe, PipeFlags};
/// A notification pipe. /// A notification pipe.
/// ///
@ -468,12 +658,18 @@ mod notify {
impl Notify { impl Notify {
/// Creates a new notification pipe. /// Creates a new notification pipe.
pub(super) fn new() -> io::Result<Self> { pub(super) fn new() -> io::Result<Self> {
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| { let fallback_pipe = |_| {
let (read_pipe, write_pipe) = pipe()?; let (read_pipe, write_pipe) = pipe()?;
fcntl_setfd(&read_pipe, fcntl_getfd(&read_pipe)? | FdFlags::CLOEXEC)?; fcntl_setfd(&read_pipe, fcntl_getfd(&read_pipe)? | FdFlags::CLOEXEC)?;
fcntl_setfd(&write_pipe, fcntl_getfd(&write_pipe)? | FdFlags::CLOEXEC)?; fcntl_setfd(&write_pipe, fcntl_getfd(&write_pipe)? | FdFlags::CLOEXEC)?;
io::Result::Ok((read_pipe, write_pipe)) io::Result::Ok((read_pipe, write_pipe))
})?; };
#[cfg(not(target_os = "haiku"))]
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(fallback_pipe)?;
#[cfg(target_os = "haiku")]
let (read_pipe, write_pipe) = fallback_pipe(PipeFlags::CLOEXEC)?;
// Put the reading side into non-blocking mode. // Put the reading side into non-blocking mode.
fcntl_setfl(&read_pipe, fcntl_getfl(&read_pipe)? | OFlags::NONBLOCK)?; fcntl_setfl(&read_pipe, fcntl_getfl(&read_pipe)? | OFlags::NONBLOCK)?;
@ -489,7 +685,7 @@ mod notify {
self.read_pipe.as_fd() self.read_pipe.as_fd()
} }
/// Provides the poll flags to be used when registering the read half of the botify pipe with the `Poller`. /// Provides the poll flags to be used when registering the read half of the notify pipe with the `Poller`.
pub(super) fn poll_flags(&self) -> PollFlags { pub(super) fn poll_flags(&self) -> PollFlags {
PollFlags::RDNORM PollFlags::RDNORM
} }
@ -503,7 +699,25 @@ mod notify {
/// Pops a notification (if any) from the pipe. /// Pops a notification (if any) from the pipe.
pub(super) fn pop_notification(&self) -> Result<(), io::Error> { pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
read(&self.read_pipe, &mut [0; 1])?; // Pipes on Vita do not guarantee that after `write` call succeeds, the
// data becomes immediately available for reading on the other side of the pipe.
// To ensure that the notification is not lost, the read side of the pipe is temporarily
// switched to blocking for a single `read` call.
#[cfg(target_os = "vita")]
rustix::fs::fcntl_setfl(
&self.read_pipe,
rustix::fs::fcntl_getfl(&self.read_pipe)? & !rustix::fs::OFlags::NONBLOCK,
)?;
let result = read(&self.read_pipe, &mut [0; 1]);
#[cfg(target_os = "vita")]
rustix::fs::fcntl_setfl(
&self.read_pipe,
rustix::fs::fcntl_getfl(&self.read_pipe)? | rustix::fs::OFlags::NONBLOCK,
)?;
result?;
Ok(()) Ok(())
} }
@ -522,20 +736,18 @@ mod notify {
} }
} }
#[cfg(target_os = "espidf")] #[cfg(any(target_os = "espidf", target_os = "hermit"))]
mod notify { mod notify {
use std::io; use std::io;
use std::mem; use std::mem;
use rustix::event::PollFlags; use super::syscall::{
use rustix::event::{eventfd, EventfdFlags}; eventfd, read, write, AsFd, AsRawFd, BorrowedFd, EventfdFlags, OwnedFd, PollFlags, RawFd,
};
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use rustix::io::{read, write};
/// A notification pipe. /// A notification pipe.
/// ///
/// This implementation uses ther `eventfd` syscall to send notifications. /// This implementation uses the `eventfd` syscall to send notifications.
#[derive(Debug)] #[derive(Debug)]
pub(super) struct Notify { pub(super) struct Notify {
/// The file descriptor of the eventfd object. This is also stored as the first /// The file descriptor of the eventfd object. This is also stored as the first
@ -559,13 +771,20 @@ mod notify {
// (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway // (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway
// (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified // (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified
#[cfg(not(target_os = "espidf"))]
let flags = EventfdFlags::NONBLOCK;
#[cfg(target_os = "espidf")]
let flags = EventfdFlags::empty(); let flags = EventfdFlags::empty();
let event_fd = eventfd(0, flags).map_err(|err| {
let event_fd = eventfd(0, flags)?; match io::Error::from(err) {
err if err.kind() == io::ErrorKind::PermissionDenied => {
// EPERM can happen if the eventfd isn't initialized yet.
// Tell the user to call esp_vfs_eventfd_register.
io::Error::new(
io::ErrorKind::PermissionDenied,
"failed to initialize eventfd for polling, try calling `esp_vfs_eventfd_register`"
)
},
err => err,
}
})?;
Ok(Self { event_fd }) Ok(Self { event_fd })
} }
@ -582,14 +801,14 @@ mod notify {
/// Notifies the `Poller` instance via the eventfd file descriptor. /// Notifies the `Poller` instance via the eventfd file descriptor.
pub(super) fn notify(&self) -> Result<(), io::Error> { pub(super) fn notify(&self) -> Result<(), io::Error> {
write(&self.event_fd, &1u64.to_ne_bytes())?; write(self.event_fd.as_fd(), &1u64.to_ne_bytes())?;
Ok(()) Ok(())
} }
/// Pops a notification (if any) from the eventfd file descriptor. /// Pops a notification (if any) from the eventfd file descriptor.
pub(super) fn pop_notification(&self) -> Result<(), io::Error> { pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
read(&self.event_fd, &mut [0; mem::size_of::<u64>()])?; read(self.event_fd.as_fd(), &mut [0; mem::size_of::<u64>()])?;
Ok(()) Ok(())
} }

View File

@ -250,4 +250,14 @@ impl EventExtra {
pub fn is_pri(&self) -> bool { pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI) self.flags.contains(PollFlags::PRI)
} }
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR) && self.flags.contains(PollFlags::HUP))
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR))
}
} }

View File

@ -76,6 +76,53 @@ fn concurrent_modify() -> io::Result<()> {
Ok(()) Ok(())
} }
#[cfg(all(unix, not(target_os = "vita")))]
#[test]
fn concurrent_interruption() -> io::Result<()> {
struct MakeItSend<T>(T);
unsafe impl<T> Send for MakeItSend<T> {}
let (reader, _writer) = tcp_pair()?;
let poller = Poller::new()?;
unsafe {
poller.add(&reader, Event::none(0))?;
}
let mut events = Events::new();
let events_borrow = &mut events;
let (sender, receiver) = std::sync::mpsc::channel();
Parallel::new()
.add(move || {
// Register a signal handler so that the syscall is actually interrupted. A signal that
// is ignored by default does not cause an interrupted syscall.
signal_hook::flag::register(signal_hook::consts::signal::SIGURG, Default::default())?;
// Signal to the other thread how to send a signal to us
sender
.send(MakeItSend(unsafe { libc::pthread_self() }))
.unwrap();
poller.wait(events_borrow, Some(Duration::from_secs(1)))?;
Ok(())
})
.add(move || {
let MakeItSend(target_thread) = receiver.recv().unwrap();
thread::sleep(Duration::from_millis(100));
assert_eq!(0, unsafe {
libc::pthread_kill(target_thread, libc::SIGURG)
});
Ok(())
})
.run()
.into_iter()
.collect::<io::Result<()>>()?;
assert_eq!(events.len(), 0);
Ok(())
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?; let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?; let a = TcpStream::connect(listener.local_addr()?)?;

View File

@ -1,6 +1,7 @@
use polling::{Event, Events, Poller}; use polling::{Event, Events, Poller};
use std::io::{self, Write}; use std::io::{self, Write};
use std::net::{TcpListener, TcpStream}; use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
#[test] #[test]
@ -38,6 +39,48 @@ fn basic_io() {
poller.delete(&read).unwrap(); poller.delete(&read).unwrap();
} }
#[test]
fn insert_twice() {
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
let (read, mut write) = tcp_pair().unwrap();
let read = Arc::new(read);
let poller = Poller::new().unwrap();
unsafe {
#[cfg(unix)]
let read = read.as_raw_fd();
#[cfg(windows)]
let read = read.as_raw_socket();
poller.add(read, Event::readable(1)).unwrap();
assert_eq!(
poller.add(read, Event::readable(1)).unwrap_err().kind(),
io::ErrorKind::AlreadyExists
);
}
write.write_all(&[1]).unwrap();
let mut events = Events::new();
assert_eq!(
poller
.wait(&mut events, Some(Duration::from_secs(1)))
.unwrap(),
1
);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(1)
);
poller.delete(&read).unwrap();
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> { fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?; let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?; let a = TcpStream::connect(listener.local_addr()?)?;