mirror of https://github.com/smol-rs/polling
Compare commits
38 Commits
Author | SHA1 | Date |
---|---|---|
Pier Fumagalli | 7719966913 | |
John Nunley | 2af3a5e14f | |
John Nunley | cf2d60efca | |
John Nunley | 0b4afcaf0a | |
John Nunley | eb9d92a2e0 | |
Nikolay Arhipov | 9e46c8455c | |
John Nunley | 1c16a1e4af | |
irvingouj @ Devolutions | e25b3b4e4c | |
John Nunley | 50454d1cea | |
John Nunley | 634a77c264 | |
John Nunley | 4d64fdc572 | |
John Nunley | 77b4ed1156 | |
John Nunley | ac7fbcae31 | |
John Nunley | 24e3691794 | |
John Nunley | 62430fd56e | |
John Nunley | ae484a0a12 | |
irvingouj @ Devolutions | cf25dd85f8 | |
John Nunley | 6125508c93 | |
John Nunley | ea5a38a500 | |
John Nunley | 1f13664bbb | |
Taiki Endo | 0c794fce50 | |
Taiki Endo | 94c5ebf78b | |
Taiki Endo | e956a8ad64 | |
Taiki Endo | 078c478346 | |
John Nunley | b57a7c32a2 | |
dependabot[bot] | 08a316e1fc | |
John Nunley | 8087787ab2 | |
John Nunley | b9ab821df1 | |
Uli Schlachter | 0575cbd4bc | |
Taiki Endo | 37a1d4ecd2 | |
Taiki Endo | a559165acd | |
tison | ceb88a46c4 | |
John Nunley | d9a65fdd73 | |
Al Hoang | 99a32b7607 | |
John Nunley | 9e143a38e1 | |
John Nunley | 45ebe3b904 | |
David Hotham | 254577da8d | |
Taiki Endo | 8c99506375 |
|
@ -14,7 +14,7 @@ env:
|
||||||
freebsd_task:
|
freebsd_task:
|
||||||
name: test ($TARGET)
|
name: test ($TARGET)
|
||||||
freebsd_instance:
|
freebsd_instance:
|
||||||
image_family: freebsd-12-4
|
image_family: freebsd-13-2
|
||||||
matrix:
|
matrix:
|
||||||
- env:
|
- env:
|
||||||
TARGET: x86_64-unknown-freebsd
|
TARGET: x86_64-unknown-freebsd
|
||||||
|
|
|
@ -41,7 +41,7 @@ jobs:
|
||||||
- os: windows-latest
|
- os: windows-latest
|
||||||
rust: nightly-i686-pc-windows-gnu
|
rust: nightly-i686-pc-windows-gnu
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
|
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
|
||||||
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
|
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
|
||||||
|
@ -62,12 +62,18 @@ jobs:
|
||||||
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe
|
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
- run: cargo hack build --feature-powerset --no-dev-deps
|
- run: cargo hack build --feature-powerset --no-dev-deps
|
||||||
- name: Add rust-src
|
# TODO: broken due to https://github.com/rust-lang/rust/pull/119026.
|
||||||
if: startsWith(matrix.rust, 'nightly')
|
# - name: Check selected Tier 3 targets
|
||||||
run: rustup component add rust-src
|
# if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
||||||
- name: Check selected Tier 3 targets
|
# run: cargo check -Z build-std --target=riscv32imc-esp-espidf
|
||||||
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
- name: Clone async-io
|
||||||
run: cargo check -Z build-std --target=riscv32imc-esp-espidf
|
run: git clone https://github.com/smol-rs/async-io.git
|
||||||
|
# The async-io Cargo.toml already has a patch section at the bottom, so we
|
||||||
|
# can just add this.
|
||||||
|
- name: Patch polling
|
||||||
|
run: echo 'polling = { path = ".." }' >> async-io/Cargo.toml
|
||||||
|
- name: Test async-io
|
||||||
|
run: cargo test --manifest-path=async-io/Cargo.toml
|
||||||
|
|
||||||
cross:
|
cross:
|
||||||
runs-on: ${{ matrix.os }}
|
runs-on: ${{ matrix.os }}
|
||||||
|
@ -75,12 +81,16 @@ jobs:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, macos-latest]
|
os: [ubuntu-latest, macos-latest]
|
||||||
|
rust: [nightly, stable]
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
run: rustup update stable
|
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
|
||||||
- name: Install cross
|
- name: Install cross
|
||||||
uses: taiki-e/install-action@cross
|
uses: taiki-e/install-action@cross
|
||||||
|
- name: Add rust-src
|
||||||
|
if: startsWith(matrix.rust, 'nightly')
|
||||||
|
run: rustup component add rust-src
|
||||||
# We don't test BSDs, since we already test them in Cirrus.
|
# We don't test BSDs, since we already test them in Cirrus.
|
||||||
- name: Android
|
- name: Android
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
|
@ -105,17 +115,30 @@ jobs:
|
||||||
run: |
|
run: |
|
||||||
rustup target add x86_64-unknown-illumos
|
rustup target add x86_64-unknown-illumos
|
||||||
cargo build --target x86_64-unknown-illumos
|
cargo build --target x86_64-unknown-illumos
|
||||||
|
- name: Redox
|
||||||
|
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
||||||
|
run: |
|
||||||
|
rustup target add x86_64-unknown-redox
|
||||||
|
cargo check --target x86_64-unknown-redox
|
||||||
|
- name: HermitOS
|
||||||
|
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
||||||
|
run: cargo check -Z build-std --target x86_64-unknown-hermit
|
||||||
|
- name: Check haiku
|
||||||
|
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
||||||
|
run: cargo check -Z build-std --target x86_64-unknown-haiku
|
||||||
|
- name: Check vita
|
||||||
|
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
|
||||||
|
run: cargo check -Z build-std --target armv7-sony-vita-newlibeabihf
|
||||||
|
|
||||||
wine:
|
wine:
|
||||||
runs-on: ubuntu-22.04
|
runs-on: ubuntu-22.04
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
run: rustup update stable
|
run: rustup update stable
|
||||||
- uses: taiki-e/setup-cross-toolchain-action@v1
|
- uses: taiki-e/setup-cross-toolchain-action@v1
|
||||||
with:
|
with:
|
||||||
target: x86_64-pc-windows-gnu
|
target: x86_64-pc-windows-gnu
|
||||||
runner: wine@7.13
|
|
||||||
- run: cargo test --target x86_64-pc-windows-gnu
|
- run: cargo test --target x86_64-pc-windows-gnu
|
||||||
|
|
||||||
msrv:
|
msrv:
|
||||||
|
@ -124,27 +147,20 @@ jobs:
|
||||||
fail-fast: false
|
fail-fast: false
|
||||||
matrix:
|
matrix:
|
||||||
os: [ubuntu-latest, windows-latest]
|
os: [ubuntu-latest, windows-latest]
|
||||||
# When updating this, the reminder to update the minimum supported
|
|
||||||
# Rust version in Cargo.toml.
|
|
||||||
rust: ['1.63']
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install cargo-hack
|
||||||
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
|
uses: taiki-e/install-action@cargo-hack
|
||||||
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
|
- run: cargo hack build --no-dev-deps --rust-version
|
||||||
- run: cargo build
|
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-freebsd
|
||||||
- name: Install Other Targets
|
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
run: rustup target add x86_64-unknown-freebsd x86_64-unknown-netbsd
|
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-netbsd
|
||||||
- run: cargo build --target x86_64-unknown-freebsd
|
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
|
||||||
- run: cargo build --target x86_64-unknown-netbsd
|
|
||||||
if: startsWith(matrix.os, 'ubuntu')
|
if: startsWith(matrix.os, 'ubuntu')
|
||||||
|
|
||||||
clippy:
|
clippy:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
run: rustup update stable
|
run: rustup update stable
|
||||||
- run: cargo clippy --all-features --all-targets
|
- run: cargo clippy --all-features --all-targets
|
||||||
|
@ -152,7 +168,7 @@ jobs:
|
||||||
fmt:
|
fmt:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- name: Install Rust
|
- name: Install Rust
|
||||||
run: rustup update stable
|
run: rustup update stable
|
||||||
- run: cargo fmt --all --check
|
- run: cargo fmt --all --check
|
||||||
|
@ -164,7 +180,7 @@ jobs:
|
||||||
issues: write
|
issues: write
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
# https://github.com/rustsec/audit-check/issues/2
|
# https://github.com/rustsec/audit-check/issues/2
|
||||||
- uses: rustsec/audit-check@master
|
- uses: rustsec/audit-check@master
|
||||||
with:
|
with:
|
||||||
|
|
|
@ -13,7 +13,7 @@ jobs:
|
||||||
if: github.repository_owner == 'smol-rs'
|
if: github.repository_owner == 'smol-rs'
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v3
|
- uses: actions/checkout@v4
|
||||||
- uses: taiki-e/create-gh-release-action@v1
|
- uses: taiki-e/create-gh-release-action@v1
|
||||||
with:
|
with:
|
||||||
changelog: CHANGELOG.md
|
changelog: CHANGELOG.md
|
||||||
|
|
47
CHANGELOG.md
47
CHANGELOG.md
|
@ -1,3 +1,50 @@
|
||||||
|
# Version 3.7.0
|
||||||
|
|
||||||
|
- Add support for the PS Vita as a platform. (#160)
|
||||||
|
|
||||||
|
# Version 3.6.0
|
||||||
|
|
||||||
|
- Add an `is_err` method to `Event` to tell when an error has occurred. (#189)
|
||||||
|
- Deprecate the `is_connect_failed` function. (#189)
|
||||||
|
- Add support for HermitOS to `polling`. (#194)
|
||||||
|
|
||||||
|
# Version 3.5.0
|
||||||
|
|
||||||
|
- Use the `epoll` backend when RedoxOS is enabled. (#190)
|
||||||
|
|
||||||
|
# Version 3.4.0
|
||||||
|
|
||||||
|
- Add the ability to identify whether socket connection has failed. (#185)
|
||||||
|
- On BSD, add the ability to wait on a process by its PID. Previously, it was
|
||||||
|
only possible to wait on a process by a `Child` object. (#180)
|
||||||
|
- On ESP-IDF, annotate `eventfd` initialization failures with a message
|
||||||
|
indicating the source of those failures. (#186)
|
||||||
|
|
||||||
|
# Version 3.3.2
|
||||||
|
|
||||||
|
- When AFD fails to initialize, the resulting error now references
|
||||||
|
the underlying system error. (#174)
|
||||||
|
|
||||||
|
# Version 3.3.1
|
||||||
|
|
||||||
|
- Bump `windows-sys` to v0.52.0. (#169)
|
||||||
|
|
||||||
|
# Version 3.3.0
|
||||||
|
|
||||||
|
- Automatically restarts polling when `ErrorKind::Interrupted` is returned, rather than relying on the user to handle it. (#164)
|
||||||
|
- Fix bad link in documentation for `Poller::wait()`. (#163)
|
||||||
|
|
||||||
|
# Version 3.2.0
|
||||||
|
|
||||||
|
- The `kqueue` backend previously allowed the following operations that other backends forbid. Now these operations result in an error: (#153)
|
||||||
|
- Inserting a source that was already inserted.
|
||||||
|
- Modifying/deleting a source that was not already inserted.
|
||||||
|
- Add support for Haiku OS. (#154)
|
||||||
|
|
||||||
|
# Version 3.1.0
|
||||||
|
|
||||||
|
- Add an `Event::new()` constructor to simplify creating `Event`s. (#149)
|
||||||
|
|
||||||
# Version 3.0.0
|
# Version 3.0.0
|
||||||
|
|
||||||
- Replace `libc` in all backends with the `rustix` crate (#108).
|
- Replace `libc` in all backends with the `rustix` crate (#108).
|
||||||
|
|
27
Cargo.toml
27
Cargo.toml
|
@ -3,9 +3,9 @@ name = "polling"
|
||||||
# When publishing a new version:
|
# When publishing a new version:
|
||||||
# - Update CHANGELOG.md
|
# - Update CHANGELOG.md
|
||||||
# - Create "v3.x.y" git tag
|
# - Create "v3.x.y" git tag
|
||||||
version = "3.0.0"
|
version = "3.7.0"
|
||||||
authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
|
authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
|
||||||
edition = "2018"
|
edition = "2021"
|
||||||
rust-version = "1.63"
|
rust-version = "1.63"
|
||||||
description = "Portable interface to epoll, kqueue, event ports, and IOCP"
|
description = "Portable interface to epoll, kqueue, event ports, and IOCP"
|
||||||
license = "Apache-2.0 OR MIT"
|
license = "Apache-2.0 OR MIT"
|
||||||
|
@ -17,20 +17,27 @@ exclude = ["/.*"]
|
||||||
[package.metadata.docs.rs]
|
[package.metadata.docs.rs]
|
||||||
rustdoc-args = ["--cfg", "docsrs"]
|
rustdoc-args = ["--cfg", "docsrs"]
|
||||||
|
|
||||||
|
[lints.rust]
|
||||||
|
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(polling_test_poll_backend)', 'cfg(polling_test_epoll_pipe)'] }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
cfg-if = "1"
|
cfg-if = "1"
|
||||||
tracing = { version = "0.1.37", default-features = false }
|
tracing = { version = "0.1.37", default-features = false }
|
||||||
|
|
||||||
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies]
|
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies.rustix]
|
||||||
rustix = { version = "0.38.8", features = ["event", "fs", "pipe", "process", "std", "time"], default-features = false }
|
version = "0.38.31"
|
||||||
|
features = ["event", "fs", "pipe", "process", "std", "time"]
|
||||||
|
default-features = false
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies]
|
[target.'cfg(windows)'.dependencies]
|
||||||
concurrent-queue = "2.2.0"
|
concurrent-queue = "2.2.0"
|
||||||
pin-project-lite = "0.2.9"
|
pin-project-lite = "0.2.9"
|
||||||
|
|
||||||
[target.'cfg(windows)'.dependencies.windows-sys]
|
[target.'cfg(windows)'.dependencies.windows-sys]
|
||||||
version = "0.48"
|
version = "0.52"
|
||||||
features = [
|
features = [
|
||||||
|
"Wdk_Foundation",
|
||||||
|
"Wdk_Storage_FileSystem",
|
||||||
"Win32_Foundation",
|
"Win32_Foundation",
|
||||||
"Win32_Networking_WinSock",
|
"Win32_Networking_WinSock",
|
||||||
"Win32_Security",
|
"Win32_Security",
|
||||||
|
@ -41,6 +48,16 @@ features = [
|
||||||
"Win32_System_WindowsProgramming",
|
"Win32_System_WindowsProgramming",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[target.'cfg(target_os = "hermit")'.dependencies.hermit-abi]
|
||||||
|
version = "0.3.9"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
easy-parallel = "3.1.0"
|
easy-parallel = "3.1.0"
|
||||||
fastrand = "2.0.0"
|
fastrand = "2.0.0"
|
||||||
|
socket2 = "0.5.5"
|
||||||
|
|
||||||
|
[target.'cfg(unix)'.dev-dependencies]
|
||||||
|
libc = "0.2"
|
||||||
|
|
||||||
|
[target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies]
|
||||||
|
signal-hook = "0.3.17"
|
||||||
|
|
|
@ -12,11 +12,11 @@ https://docs.rs/polling)
|
||||||
Portable interface to epoll, kqueue, event ports, and IOCP.
|
Portable interface to epoll, kqueue, event ports, and IOCP.
|
||||||
|
|
||||||
Supported platforms:
|
Supported platforms:
|
||||||
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
|
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
|
||||||
- [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
|
- [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
|
||||||
DragonFly BSD
|
DragonFly BSD
|
||||||
- [event ports](https://illumos.org/man/port_create): illumos, Solaris
|
- [event ports](https://illumos.org/man/port_create): illumos, Solaris
|
||||||
- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
|
- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
|
||||||
- [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
|
- [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
|
||||||
|
|
||||||
Polling is done in oneshot mode, which means interest in I/O events needs to be reset after
|
Polling is done in oneshot mode, which means interest in I/O events needs to be reset after
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
use std::{io, net};
|
||||||
|
|
||||||
|
use polling::Event;
|
||||||
|
use socket2::Type;
|
||||||
|
|
||||||
|
fn main() -> io::Result<()> {
|
||||||
|
let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
|
||||||
|
let poller = polling::Poller::new()?;
|
||||||
|
unsafe {
|
||||||
|
poller.add(&socket, Event::new(0, true, true))?;
|
||||||
|
}
|
||||||
|
let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
|
||||||
|
socket.set_nonblocking(true)?;
|
||||||
|
let _ = socket.connect(&addr.into());
|
||||||
|
|
||||||
|
let mut events = polling::Events::new();
|
||||||
|
|
||||||
|
events.clear();
|
||||||
|
poller.wait(&mut events, None)?;
|
||||||
|
|
||||||
|
let event = events.iter().next();
|
||||||
|
let event = match event {
|
||||||
|
Some(event) => event,
|
||||||
|
None => {
|
||||||
|
println!("no event");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
println!("event: {:?}", event);
|
||||||
|
if event.is_err().unwrap_or(false) {
|
||||||
|
println!("connect failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -27,22 +27,16 @@ mod example {
|
||||||
|
|
||||||
println!("Press Ctrl+C to exit...");
|
println!("Press Ctrl+C to exit...");
|
||||||
|
|
||||||
loop {
|
// Wait for events.
|
||||||
// Wait for events.
|
poller.wait(&mut events, None).unwrap();
|
||||||
poller.wait(&mut events, None).unwrap();
|
|
||||||
|
|
||||||
// Process events.
|
// Process events.
|
||||||
for ev in events.iter() {
|
let ev = events.iter().next().unwrap();
|
||||||
match ev.key {
|
match ev.key {
|
||||||
1 => {
|
1 => {
|
||||||
println!("SIGINT received");
|
println!("SIGINT received");
|
||||||
return;
|
|
||||||
}
|
|
||||||
_ => unreachable!(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
_ => unreachable!(),
|
||||||
events.clear();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
79
src/epoll.rs
79
src/epoll.rs
|
@ -1,20 +1,23 @@
|
||||||
//! Bindings to epoll (Linux, Android).
|
//! Bindings to epoll (Linux, Android).
|
||||||
|
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
|
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use rustix::event::{epoll, eventfd, EventfdFlags};
|
#[cfg(not(target_os = "redox"))]
|
||||||
use rustix::fd::OwnedFd;
|
use rustix::event::{eventfd, EventfdFlags};
|
||||||
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
|
#[cfg(not(target_os = "redox"))]
|
||||||
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
|
|
||||||
use rustix::pipe::{pipe, pipe_with, PipeFlags};
|
|
||||||
use rustix::time::{
|
use rustix::time::{
|
||||||
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
|
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
|
||||||
Timespec,
|
Timespec,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use rustix::event::epoll;
|
||||||
|
use rustix::fd::OwnedFd;
|
||||||
|
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
|
||||||
|
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
|
||||||
|
use rustix::pipe::{pipe, pipe_with, PipeFlags};
|
||||||
|
|
||||||
use crate::{Event, PollMode};
|
use crate::{Event, PollMode};
|
||||||
|
|
||||||
/// Interface to epoll.
|
/// Interface to epoll.
|
||||||
|
@ -27,6 +30,9 @@ pub struct Poller {
|
||||||
notifier: Notifier,
|
notifier: Notifier,
|
||||||
|
|
||||||
/// File descriptor for the timerfd that produces timeouts.
|
/// File descriptor for the timerfd that produces timeouts.
|
||||||
|
///
|
||||||
|
/// Redox does not support timerfd.
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
timer_fd: Option<OwnedFd>,
|
timer_fd: Option<OwnedFd>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -40,6 +46,7 @@ impl Poller {
|
||||||
|
|
||||||
// Set up notifier and timerfd.
|
// Set up notifier and timerfd.
|
||||||
let notifier = Notifier::new()?;
|
let notifier = Notifier::new()?;
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
let timer_fd = timerfd_create(
|
let timer_fd = timerfd_create(
|
||||||
TimerfdClockId::Monotonic,
|
TimerfdClockId::Monotonic,
|
||||||
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
|
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
|
||||||
|
@ -49,10 +56,12 @@ impl Poller {
|
||||||
let poller = Poller {
|
let poller = Poller {
|
||||||
epoll_fd,
|
epoll_fd,
|
||||||
notifier,
|
notifier,
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
timer_fd,
|
timer_fd,
|
||||||
};
|
};
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
if let Some(ref timer_fd) = poller.timer_fd {
|
if let Some(ref timer_fd) = poller.timer_fd {
|
||||||
poller.add(
|
poller.add(
|
||||||
timer_fd.as_raw_fd(),
|
timer_fd.as_raw_fd(),
|
||||||
|
@ -71,7 +80,6 @@ impl Poller {
|
||||||
tracing::trace!(
|
tracing::trace!(
|
||||||
epoll_fd = ?poller.epoll_fd.as_raw_fd(),
|
epoll_fd = ?poller.epoll_fd.as_raw_fd(),
|
||||||
notifier = ?poller.notifier,
|
notifier = ?poller.notifier,
|
||||||
timer_fd = ?poller.timer_fd,
|
|
||||||
"new",
|
"new",
|
||||||
);
|
);
|
||||||
Ok(poller)
|
Ok(poller)
|
||||||
|
@ -156,6 +164,7 @@ impl Poller {
|
||||||
);
|
);
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
if let Some(ref timer_fd) = self.timer_fd {
|
if let Some(ref timer_fd) = self.timer_fd {
|
||||||
// Configure the timeout using timerfd.
|
// Configure the timeout using timerfd.
|
||||||
let new_val = Itimerspec {
|
let new_val = Itimerspec {
|
||||||
|
@ -182,12 +191,17 @@ impl Poller {
|
||||||
)?;
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
|
let timer_fd = &self.timer_fd;
|
||||||
|
#[cfg(target_os = "redox")]
|
||||||
|
let timer_fd: Option<core::convert::Infallible> = None;
|
||||||
|
|
||||||
// Timeout in milliseconds for epoll.
|
// Timeout in milliseconds for epoll.
|
||||||
let timeout_ms = match (&self.timer_fd, timeout) {
|
let timeout_ms = match (timer_fd, timeout) {
|
||||||
(_, Some(t)) if t == Duration::from_secs(0) => 0,
|
(_, Some(t)) if t == Duration::from_secs(0) => 0,
|
||||||
(None, Some(t)) => {
|
(None, Some(t)) => {
|
||||||
// Round up to a whole millisecond.
|
// Round up to a whole millisecond.
|
||||||
let mut ms = t.as_millis().try_into().unwrap_or(std::i32::MAX);
|
let mut ms = t.as_millis().try_into().unwrap_or(i32::MAX);
|
||||||
if Duration::from_millis(ms as u64) < t {
|
if Duration::from_millis(ms as u64) < t {
|
||||||
ms = ms.saturating_add(1);
|
ms = ms.saturating_add(1);
|
||||||
}
|
}
|
||||||
|
@ -246,10 +260,10 @@ impl Drop for Poller {
|
||||||
"drop",
|
"drop",
|
||||||
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
epoll_fd = ?self.epoll_fd.as_raw_fd(),
|
||||||
notifier = ?self.notifier,
|
notifier = ?self.notifier,
|
||||||
timer_fd = ?self.timer_fd
|
|
||||||
);
|
);
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
if let Some(timer_fd) = self.timer_fd.take() {
|
if let Some(timer_fd) = self.timer_fd.take() {
|
||||||
let _ = self.delete(timer_fd.as_fd());
|
let _ = self.delete(timer_fd.as_fd());
|
||||||
}
|
}
|
||||||
|
@ -258,6 +272,7 @@ impl Drop for Poller {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `timespec` value that equals zero.
|
/// `timespec` value that equals zero.
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
|
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
|
||||||
|
|
||||||
/// Get the EPOLL flags for the interest.
|
/// Get the EPOLL flags for the interest.
|
||||||
|
@ -366,6 +381,19 @@ impl EventExtra {
|
||||||
pub fn is_pri(&self) -> bool {
|
pub fn is_pri(&self) -> bool {
|
||||||
self.flags.contains(epoll::EventFlags::PRI)
|
self.flags.contains(epoll::EventFlags::PRI)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
Some(
|
||||||
|
self.flags.contains(epoll::EventFlags::ERR)
|
||||||
|
&& self.flags.contains(epoll::EventFlags::HUP),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.contains(epoll::EventFlags::ERR))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The notifier for Linux.
|
/// The notifier for Linux.
|
||||||
|
@ -378,6 +406,7 @@ impl EventExtra {
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
enum Notifier {
|
enum Notifier {
|
||||||
/// The primary notifier, using eventfd.
|
/// The primary notifier, using eventfd.
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
EventFd(OwnedFd),
|
EventFd(OwnedFd),
|
||||||
|
|
||||||
/// The fallback notifier, using a pipe.
|
/// The fallback notifier, using a pipe.
|
||||||
|
@ -394,19 +423,22 @@ impl Notifier {
|
||||||
/// Create a new notifier.
|
/// Create a new notifier.
|
||||||
fn new() -> io::Result<Self> {
|
fn new() -> io::Result<Self> {
|
||||||
// Skip eventfd for testing if necessary.
|
// Skip eventfd for testing if necessary.
|
||||||
if !cfg!(polling_test_epoll_pipe) {
|
#[cfg(not(target_os = "redox"))]
|
||||||
// Try to create an eventfd.
|
{
|
||||||
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
|
if !cfg!(polling_test_epoll_pipe) {
|
||||||
Ok(fd) => {
|
// Try to create an eventfd.
|
||||||
tracing::trace!("created eventfd for notifier");
|
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
|
||||||
return Ok(Notifier::EventFd(fd));
|
Ok(fd) => {
|
||||||
}
|
tracing::trace!("created eventfd for notifier");
|
||||||
|
return Ok(Notifier::EventFd(fd));
|
||||||
|
}
|
||||||
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
"eventfd() failed with error ({}), falling back to pipe",
|
"eventfd() failed with error ({}), falling back to pipe",
|
||||||
err
|
err
|
||||||
);
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -428,6 +460,7 @@ impl Notifier {
|
||||||
/// The file descriptor to register in the poller.
|
/// The file descriptor to register in the poller.
|
||||||
fn as_fd(&self) -> BorrowedFd<'_> {
|
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||||
match self {
|
match self {
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
Notifier::EventFd(fd) => fd.as_fd(),
|
Notifier::EventFd(fd) => fd.as_fd(),
|
||||||
Notifier::Pipe {
|
Notifier::Pipe {
|
||||||
read_pipe: read, ..
|
read_pipe: read, ..
|
||||||
|
@ -438,6 +471,7 @@ impl Notifier {
|
||||||
/// Notify the poller.
|
/// Notify the poller.
|
||||||
fn notify(&self) {
|
fn notify(&self) {
|
||||||
match self {
|
match self {
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
Self::EventFd(fd) => {
|
Self::EventFd(fd) => {
|
||||||
let buf: [u8; 8] = 1u64.to_ne_bytes();
|
let buf: [u8; 8] = 1u64.to_ne_bytes();
|
||||||
let _ = write(fd, &buf);
|
let _ = write(fd, &buf);
|
||||||
|
@ -452,6 +486,7 @@ impl Notifier {
|
||||||
/// Clear the notification.
|
/// Clear the notification.
|
||||||
fn clear(&self) {
|
fn clear(&self) {
|
||||||
match self {
|
match self {
|
||||||
|
#[cfg(not(target_os = "redox"))]
|
||||||
Self::EventFd(fd) => {
|
Self::EventFd(fd) => {
|
||||||
let mut buf = [0u8; 8];
|
let mut buf = [0u8; 8];
|
||||||
let _ = read(fd, &mut buf);
|
let _ = read(fd, &mut buf);
|
||||||
|
|
|
@ -14,6 +14,8 @@ use std::ptr;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Once;
|
use std::sync::Once;
|
||||||
|
|
||||||
|
use windows_sys::Wdk::Foundation::OBJECT_ATTRIBUTES;
|
||||||
|
use windows_sys::Wdk::Storage::FileSystem::FILE_OPEN;
|
||||||
use windows_sys::Win32::Foundation::{
|
use windows_sys::Win32::Foundation::{
|
||||||
CloseHandle, HANDLE, HMODULE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
|
CloseHandle, HANDLE, HMODULE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
|
||||||
UNICODE_STRING,
|
UNICODE_STRING,
|
||||||
|
@ -21,11 +23,9 @@ use windows_sys::Win32::Foundation::{
|
||||||
use windows_sys::Win32::Networking::WinSock::{
|
use windows_sys::Win32::Networking::WinSock::{
|
||||||
WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE_POLL, SOCKET_ERROR,
|
WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE_POLL, SOCKET_ERROR,
|
||||||
};
|
};
|
||||||
use windows_sys::Win32::Storage::FileSystem::{
|
use windows_sys::Win32::Storage::FileSystem::{FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE};
|
||||||
FILE_OPEN, FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE,
|
|
||||||
};
|
|
||||||
use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress};
|
use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress};
|
||||||
use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK, OBJECT_ATTRIBUTES};
|
use windows_sys::Win32::System::IO::IO_STATUS_BLOCK;
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
|
|
|
@ -46,7 +46,6 @@ use pin_project_lite::pin_project;
|
||||||
|
|
||||||
use std::cell::UnsafeCell;
|
use std::cell::UnsafeCell;
|
||||||
use std::collections::hash_map::{Entry, HashMap};
|
use std::collections::hash_map::{Entry, HashMap};
|
||||||
use std::convert::TryFrom;
|
|
||||||
use std::ffi::c_void;
|
use std::ffi::c_void;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
|
@ -122,17 +121,19 @@ impl Poller {
|
||||||
pub(super) fn new() -> io::Result<Self> {
|
pub(super) fn new() -> io::Result<Self> {
|
||||||
// Make sure AFD is able to be used.
|
// Make sure AFD is able to be used.
|
||||||
if let Err(e) = afd::NtdllImports::force_load() {
|
if let Err(e) = afd::NtdllImports::force_load() {
|
||||||
return Err(crate::unsupported_error(format!(
|
return Err(io::Error::new(
|
||||||
"Failed to initialize unstable Windows functions: {}\nThis usually only happens for old Windows or Wine.",
|
io::ErrorKind::Unsupported,
|
||||||
e
|
AfdError::new("failed to initialize unstable Windows functions", e),
|
||||||
)));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create and destroy a single AFD to test if we support it.
|
// Create and destroy a single AFD to test if we support it.
|
||||||
Afd::<Packet>::new().map_err(|e| crate::unsupported_error(format!(
|
Afd::<Packet>::new().map_err(|e| {
|
||||||
"Failed to initialize \\Device\\Afd: {}\nThis usually only happens for old Windows or Wine.",
|
io::Error::new(
|
||||||
e,
|
io::ErrorKind::Unsupported,
|
||||||
)))?;
|
AfdError::new("failed to initialize \\Device\\Afd", e),
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
let port = IoCompletionPort::new(0)?;
|
let port = IoCompletionPort::new(0)?;
|
||||||
tracing::trace!(handle = ?port, "new");
|
tracing::trace!(handle = ?port, "new");
|
||||||
|
@ -680,6 +681,18 @@ impl EventExtra {
|
||||||
pub fn set_pri(&mut self, active: bool) {
|
pub fn set_pri(&mut self, active: bool) {
|
||||||
self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active);
|
self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Check if TCP connect failed. Deprecated.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if TCP connect failed.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A packet used to wake up the poller with an event.
|
/// A packet used to wake up the poller with an event.
|
||||||
|
@ -1153,7 +1166,7 @@ enum WaitableStatus {
|
||||||
Idle,
|
Idle,
|
||||||
|
|
||||||
/// We are waiting on this handle to become signaled.
|
/// We are waiting on this handle to become signaled.
|
||||||
Waiting(WaitHandle),
|
Waiting(#[allow(dead_code)] WaitHandle),
|
||||||
|
|
||||||
/// This handle has been cancelled.
|
/// This handle has been cancelled.
|
||||||
Cancelled,
|
Cancelled,
|
||||||
|
@ -1327,6 +1340,54 @@ fn dur2timeout(dur: Duration) -> u32 {
|
||||||
.unwrap_or(INFINITE)
|
.unwrap_or(INFINITE)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// An error type that wraps around failing to open AFD.
|
||||||
|
struct AfdError {
|
||||||
|
/// String description of what happened.
|
||||||
|
description: &'static str,
|
||||||
|
|
||||||
|
/// The underlying system error.
|
||||||
|
system: io::Error,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AfdError {
|
||||||
|
#[inline]
|
||||||
|
fn new(description: &'static str, system: io::Error) -> Self {
|
||||||
|
Self {
|
||||||
|
description,
|
||||||
|
system,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for AfdError {
|
||||||
|
#[inline]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("AfdError")
|
||||||
|
.field("description", &self.description)
|
||||||
|
.field("system", &self.system)
|
||||||
|
.field("note", &"probably caused by old Windows or Wine")
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for AfdError {
|
||||||
|
#[inline]
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"{}: {}\nThis error is usually caused by running on old Windows or Wine",
|
||||||
|
self.description, &self.system
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for AfdError {
|
||||||
|
#[inline]
|
||||||
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
|
Some(&self.system)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct CallOnDrop<F: FnMut()>(F);
|
struct CallOnDrop<F: FnMut()>(F);
|
||||||
|
|
||||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
||||||
|
|
|
@ -2,7 +2,6 @@
|
||||||
|
|
||||||
use super::dur2timeout;
|
use super::dur2timeout;
|
||||||
|
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
|
@ -297,11 +296,3 @@ impl<T: CompletionHandle> Drop for OverlappedEntry<T> {
|
||||||
drop(unsafe { self.packet() });
|
drop(unsafe { self.packet() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct CallOnDrop<F: FnMut()>(F);
|
|
||||||
|
|
||||||
impl<F: FnMut()> Drop for CallOnDrop<F> {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
(self.0)();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
|
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
|
||||||
|
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
||||||
|
use std::sync::RwLock;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use rustix::event::kqueue;
|
use rustix::event::kqueue;
|
||||||
|
@ -15,6 +17,11 @@ pub struct Poller {
|
||||||
/// File descriptor for the kqueue instance.
|
/// File descriptor for the kqueue instance.
|
||||||
kqueue_fd: OwnedFd,
|
kqueue_fd: OwnedFd,
|
||||||
|
|
||||||
|
/// List of sources currently registered in this poller.
|
||||||
|
///
|
||||||
|
/// This is used to make sure the same source is not registered twice.
|
||||||
|
sources: RwLock<HashSet<SourceId>>,
|
||||||
|
|
||||||
/// Notification pipe for waking up the poller.
|
/// Notification pipe for waking up the poller.
|
||||||
///
|
///
|
||||||
/// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it
|
/// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it
|
||||||
|
@ -22,6 +29,23 @@ pub struct Poller {
|
||||||
notify: notify::Notify,
|
notify: notify::Notify,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Identifier for a source.
|
||||||
|
#[doc(hidden)]
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
|
||||||
|
pub enum SourceId {
|
||||||
|
/// Registered file descriptor.
|
||||||
|
Fd(RawFd),
|
||||||
|
|
||||||
|
/// Signal.
|
||||||
|
Signal(std::os::raw::c_int),
|
||||||
|
|
||||||
|
/// Process ID.
|
||||||
|
Pid(rustix::process::Pid),
|
||||||
|
|
||||||
|
/// Timer ID.
|
||||||
|
Timer(usize),
|
||||||
|
}
|
||||||
|
|
||||||
impl Poller {
|
impl Poller {
|
||||||
/// Creates a new poller.
|
/// Creates a new poller.
|
||||||
pub fn new() -> io::Result<Poller> {
|
pub fn new() -> io::Result<Poller> {
|
||||||
|
@ -31,6 +55,7 @@ impl Poller {
|
||||||
|
|
||||||
let poller = Poller {
|
let poller = Poller {
|
||||||
kqueue_fd,
|
kqueue_fd,
|
||||||
|
sources: RwLock::new(HashSet::new()),
|
||||||
notify: notify::Notify::new()?,
|
notify: notify::Notify::new()?,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -60,6 +85,8 @@ impl Poller {
|
||||||
///
|
///
|
||||||
/// The file descriptor must be valid and it must last until it is deleted.
|
/// The file descriptor must be valid and it must last until it is deleted.
|
||||||
pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
|
pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
|
||||||
|
self.add_source(SourceId::Fd(fd))?;
|
||||||
|
|
||||||
// File descriptors don't need to be added explicitly, so just modify the interest.
|
// File descriptors don't need to be added explicitly, so just modify the interest.
|
||||||
self.modify(BorrowedFd::borrow_raw(fd), ev, mode)
|
self.modify(BorrowedFd::borrow_raw(fd), ev, mode)
|
||||||
}
|
}
|
||||||
|
@ -79,6 +106,8 @@ impl Poller {
|
||||||
};
|
};
|
||||||
let _enter = span.as_ref().map(|s| s.enter());
|
let _enter = span.as_ref().map(|s| s.enter());
|
||||||
|
|
||||||
|
self.has_source(SourceId::Fd(fd.as_raw_fd()))?;
|
||||||
|
|
||||||
let mode_flags = mode_to_flags(mode);
|
let mode_flags = mode_to_flags(mode);
|
||||||
|
|
||||||
let read_flags = if ev.readable {
|
let read_flags = if ev.readable {
|
||||||
|
@ -143,10 +172,57 @@ impl Poller {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add a source to the sources set.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn add_source(&self, source: SourceId) -> io::Result<()> {
|
||||||
|
if self
|
||||||
|
.sources
|
||||||
|
.write()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.insert(source)
|
||||||
|
{
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(io::Error::from(io::ErrorKind::AlreadyExists))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tell if a source is currently inside the set.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn has_source(&self, source: SourceId) -> io::Result<()> {
|
||||||
|
if self
|
||||||
|
.sources
|
||||||
|
.read()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.contains(&source)
|
||||||
|
{
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(io::Error::from(io::ErrorKind::NotFound))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove a source from the sources set.
|
||||||
|
#[inline]
|
||||||
|
pub(crate) fn remove_source(&self, source: SourceId) -> io::Result<()> {
|
||||||
|
if self
|
||||||
|
.sources
|
||||||
|
.write()
|
||||||
|
.unwrap_or_else(|e| e.into_inner())
|
||||||
|
.remove(&source)
|
||||||
|
{
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(io::Error::from(io::ErrorKind::NotFound))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Deletes a file descriptor.
|
/// Deletes a file descriptor.
|
||||||
pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
|
pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
|
||||||
// Simply delete interest in the file descriptor.
|
// Simply delete interest in the file descriptor.
|
||||||
self.modify(fd, Event::none(0), PollMode::Oneshot)
|
self.modify(fd, Event::none(0), PollMode::Oneshot)?;
|
||||||
|
|
||||||
|
self.remove_source(SourceId::Fd(fd.as_raw_fd()))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Waits for I/O events with an optional timeout.
|
/// Waits for I/O events with an optional timeout.
|
||||||
|
@ -295,6 +371,16 @@ impl EventExtra {
|
||||||
pub fn is_pri(&self) -> bool {
|
pub fn is_pri(&self) -> bool {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
None
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {
|
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {
|
||||||
|
|
181
src/lib.rs
181
src/lib.rs
|
@ -1,11 +1,11 @@
|
||||||
//! Portable interface to epoll, kqueue, event ports, and IOCP.
|
//! Portable interface to epoll, kqueue, event ports, and IOCP.
|
||||||
//!
|
//!
|
||||||
//! Supported platforms:
|
//! Supported platforms:
|
||||||
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
|
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
|
||||||
//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
|
//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
|
||||||
//! DragonFly BSD
|
//! DragonFly BSD
|
||||||
//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
|
//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
|
||||||
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
|
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
|
||||||
//! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
|
//! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
|
||||||
//!
|
//!
|
||||||
//! By default, polling is done in oneshot mode, which means interest in I/O events needs to
|
//! By default, polling is done in oneshot mode, which means interest in I/O events needs to
|
||||||
|
@ -70,8 +70,7 @@ use std::marker::PhantomData;
|
||||||
use std::num::NonZeroUsize;
|
use std::num::NonZeroUsize;
|
||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
use std::usize;
|
|
||||||
|
|
||||||
use cfg_if::cfg_if;
|
use cfg_if::cfg_if;
|
||||||
|
|
||||||
|
@ -81,7 +80,11 @@ cfg_if! {
|
||||||
if #[cfg(polling_test_poll_backend)] {
|
if #[cfg(polling_test_poll_backend)] {
|
||||||
mod poll;
|
mod poll;
|
||||||
use poll as sys;
|
use poll as sys;
|
||||||
} else if #[cfg(any(target_os = "linux", target_os = "android"))] {
|
} else if #[cfg(any(
|
||||||
|
target_os = "linux",
|
||||||
|
target_os = "android",
|
||||||
|
target_os = "redox"
|
||||||
|
))] {
|
||||||
mod epoll;
|
mod epoll;
|
||||||
use epoll as sys;
|
use epoll as sys;
|
||||||
} else if #[cfg(any(
|
} else if #[cfg(any(
|
||||||
|
@ -104,6 +107,7 @@ cfg_if! {
|
||||||
use kqueue as sys;
|
use kqueue as sys;
|
||||||
} else if #[cfg(any(
|
} else if #[cfg(any(
|
||||||
target_os = "vxworks",
|
target_os = "vxworks",
|
||||||
|
target_os = "hermit",
|
||||||
target_os = "fuchsia",
|
target_os = "fuchsia",
|
||||||
target_os = "horizon",
|
target_os = "horizon",
|
||||||
unix,
|
unix,
|
||||||
|
@ -121,7 +125,7 @@ cfg_if! {
|
||||||
pub mod os;
|
pub mod os;
|
||||||
|
|
||||||
/// Key associated with notifications.
|
/// Key associated with notifications.
|
||||||
const NOTIFY_KEY: usize = std::usize::MAX;
|
const NOTIFY_KEY: usize = usize::MAX;
|
||||||
|
|
||||||
/// Indicates that a file descriptor or socket can read or write without blocking.
|
/// Indicates that a file descriptor or socket can read or write without blocking.
|
||||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||||
|
@ -181,52 +185,46 @@ pub enum PollMode {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Event {
|
impl Event {
|
||||||
/// All kinds of events (readable and writable).
|
/// Create a new event.
|
||||||
///
|
pub const fn new(key: usize, readable: bool, writable: bool) -> Event {
|
||||||
/// Equivalent to: `Event { key, readable: true, writable: true }`
|
|
||||||
pub const fn all(key: usize) -> Event {
|
|
||||||
Event {
|
Event {
|
||||||
key,
|
key,
|
||||||
readable: true,
|
readable,
|
||||||
writable: true,
|
writable,
|
||||||
extra: sys::EventExtra::empty(),
|
extra: sys::EventExtra::empty(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// All kinds of events (readable and writable).
|
||||||
|
///
|
||||||
|
/// Equivalent to: `Event::new(key, true, true)`
|
||||||
|
#[inline]
|
||||||
|
pub const fn all(key: usize) -> Event {
|
||||||
|
Event::new(key, true, true)
|
||||||
|
}
|
||||||
|
|
||||||
/// Only the readable event.
|
/// Only the readable event.
|
||||||
///
|
///
|
||||||
/// Equivalent to: `Event { key, readable: true, writable: false }`
|
/// Equivalent to: `Event::new(key, true, false)`
|
||||||
|
#[inline]
|
||||||
pub const fn readable(key: usize) -> Event {
|
pub const fn readable(key: usize) -> Event {
|
||||||
Event {
|
Event::new(key, true, false)
|
||||||
key,
|
|
||||||
readable: true,
|
|
||||||
writable: false,
|
|
||||||
extra: sys::EventExtra::empty(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Only the writable event.
|
/// Only the writable event.
|
||||||
///
|
///
|
||||||
/// Equivalent to: `Event { key, readable: false, writable: true }`
|
/// Equivalent to: `Event::new(key, false, true)`
|
||||||
|
#[inline]
|
||||||
pub const fn writable(key: usize) -> Event {
|
pub const fn writable(key: usize) -> Event {
|
||||||
Event {
|
Event::new(key, false, true)
|
||||||
key,
|
|
||||||
readable: false,
|
|
||||||
writable: true,
|
|
||||||
extra: sys::EventExtra::empty(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// No events.
|
/// No events.
|
||||||
///
|
///
|
||||||
/// Equivalent to: `Event { key, readable: false, writable: false }`
|
/// Equivalent to: `Event::new(key, false, false)`
|
||||||
|
#[inline]
|
||||||
pub const fn none(key: usize) -> Event {
|
pub const fn none(key: usize) -> Event {
|
||||||
Event {
|
Event::new(key, false, false)
|
||||||
key,
|
|
||||||
readable: false,
|
|
||||||
writable: false,
|
|
||||||
extra: sys::EventExtra::empty(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add interruption events to this interest.
|
/// Add interruption events to this interest.
|
||||||
|
@ -339,6 +337,86 @@ impl Event {
|
||||||
self.extra.is_pri()
|
self.extra.is_pri()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tells if this event is the result of a connection failure.
|
||||||
|
///
|
||||||
|
/// This function checks if a TCP connection has failed. It corresponds to the `EPOLLERR` or `EPOLLHUP` event in Linux
|
||||||
|
/// and `CONNECT_FAILED` event in Windows IOCP.
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```
|
||||||
|
/// use std::{io, net};
|
||||||
|
/// // Assuming polling and socket2 are included as dependencies in Cargo.toml
|
||||||
|
/// use polling::Event;
|
||||||
|
/// use socket2::Type;
|
||||||
|
///
|
||||||
|
/// fn main() -> io::Result<()> {
|
||||||
|
/// let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
|
||||||
|
/// let poller = polling::Poller::new()?;
|
||||||
|
/// unsafe {
|
||||||
|
/// poller.add(&socket, Event::new(0, true, true))?;
|
||||||
|
/// }
|
||||||
|
/// let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
|
||||||
|
/// socket.set_nonblocking(true)?;
|
||||||
|
/// let _ = socket.connect(&addr.into());
|
||||||
|
///
|
||||||
|
/// let mut events = polling::Events::new();
|
||||||
|
///
|
||||||
|
/// events.clear();
|
||||||
|
/// poller.wait(&mut events, None)?;
|
||||||
|
///
|
||||||
|
/// let event = events.iter().next();
|
||||||
|
///
|
||||||
|
/// let event = match event {
|
||||||
|
/// Some(event) => event,
|
||||||
|
/// None => {
|
||||||
|
/// println!("no event");
|
||||||
|
/// return Ok(());
|
||||||
|
/// },
|
||||||
|
/// };
|
||||||
|
///
|
||||||
|
/// println!("event: {:?}", event);
|
||||||
|
/// if event
|
||||||
|
/// .is_connect_failed()
|
||||||
|
/// .unwrap_or_default()
|
||||||
|
/// {
|
||||||
|
/// println!("connect failed");
|
||||||
|
/// }
|
||||||
|
///
|
||||||
|
/// Ok(())
|
||||||
|
/// }
|
||||||
|
/// ```
|
||||||
|
///
|
||||||
|
/// # Returns
|
||||||
|
///
|
||||||
|
/// Returns `Some(true)` if the connection has failed, `Some(false)` if the connection has not failed,
|
||||||
|
/// or `None` if the platform does not support detecting this condition.
|
||||||
|
#[inline]
|
||||||
|
#[deprecated(
|
||||||
|
since = "3.4.0",
|
||||||
|
note = "use `is_err` in combination of is_hup instead, see documentation for `is_err`"
|
||||||
|
)]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
self.extra.is_connect_failed()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tells if this event is the result of a connection failure.
|
||||||
|
///
|
||||||
|
/// This function checks if an error exist, particularly useful in detecting if TCP connection failed. It corresponds to the `EPOLLERR` event in Linux
|
||||||
|
/// and `CONNECT_FAILED` event in Windows IOCP.
|
||||||
|
///
|
||||||
|
/// ## Caveats
|
||||||
|
///
|
||||||
|
/// In `epoll`, a TCP connection failure is indicated by `EPOLLERR` + `EPOLLHUP`, though just `EPOLLERR` is enough to indicate a connection failure.
|
||||||
|
/// EPOLLHUP may happen when we haven't event called `connect` on the socket, but it is still a valid event to check for.
|
||||||
|
///
|
||||||
|
/// Returns `Some(true)` if the connection has failed, `Some(false)` if there is no error,
|
||||||
|
/// or `None` if the platform does not support detecting this condition.
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
self.extra.is_err()
|
||||||
|
}
|
||||||
|
|
||||||
/// Remove any extra information from this event.
|
/// Remove any extra information from this event.
|
||||||
#[inline]
|
#[inline]
|
||||||
pub fn clear_extra(&mut self) {
|
pub fn clear_extra(&mut self) {
|
||||||
|
@ -616,8 +694,8 @@ impl Poller {
|
||||||
|
|
||||||
/// Waits for at least one I/O event and returns the number of new events.
|
/// Waits for at least one I/O event and returns the number of new events.
|
||||||
///
|
///
|
||||||
/// New events will be appended to `events`. If necessary, make sure to clear the [`Vec`]
|
/// New events will be appended to `events`. If necessary, make sure to clear the
|
||||||
/// before calling [`wait()`][`Poller::wait()`]!
|
/// [`Events`][Events::clear()] before calling [`wait()`][`Poller::wait()`]!
|
||||||
///
|
///
|
||||||
/// This method will return with no new events if a notification is delivered by the
|
/// This method will return with no new events if a notification is delivered by the
|
||||||
/// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events
|
/// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events
|
||||||
|
@ -658,14 +736,30 @@ impl Poller {
|
||||||
let _enter = span.enter();
|
let _enter = span.enter();
|
||||||
|
|
||||||
if let Ok(_lock) = self.lock.try_lock() {
|
if let Ok(_lock) = self.lock.try_lock() {
|
||||||
// Wait for I/O events.
|
let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout));
|
||||||
self.poller.wait(&mut events.events, timeout)?;
|
|
||||||
|
|
||||||
// Clear the notification, if any.
|
loop {
|
||||||
self.notified.swap(false, Ordering::SeqCst);
|
// Figure out how long to wait for.
|
||||||
|
let timeout =
|
||||||
|
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
|
||||||
|
|
||||||
// Indicate number of events.
|
// Wait for I/O events.
|
||||||
Ok(events.len())
|
if let Err(e) = self.poller.wait(&mut events.events, timeout) {
|
||||||
|
// If the wait was interrupted by a signal, clear events and try again.
|
||||||
|
if e.kind() == io::ErrorKind::Interrupted {
|
||||||
|
events.clear();
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the notification, if any.
|
||||||
|
self.notified.swap(false, Ordering::SeqCst);
|
||||||
|
|
||||||
|
// Indicate number of events.
|
||||||
|
return Ok(events.len());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::trace!("wait: skipping because another thread is already waiting on I/O");
|
tracing::trace!("wait: skipping because another thread is already waiting on I/O");
|
||||||
Ok(0)
|
Ok(0)
|
||||||
|
@ -945,8 +1039,11 @@ impl fmt::Debug for Poller {
|
||||||
}
|
}
|
||||||
|
|
||||||
cfg_if! {
|
cfg_if! {
|
||||||
if #[cfg(unix)] {
|
if #[cfg(any(unix, target_os = "hermit"))] {
|
||||||
|
#[cfg(unix)]
|
||||||
use std::os::unix::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
|
use std::os::unix::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
|
||||||
|
#[cfg(target_os = "hermit")]
|
||||||
|
use std::os::hermit::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
|
||||||
|
|
||||||
/// A resource with a raw file descriptor.
|
/// A resource with a raw file descriptor.
|
||||||
pub trait AsRawSource {
|
pub trait AsRawSource {
|
||||||
|
|
|
@ -20,6 +20,7 @@ pub mod iocp;
|
||||||
|
|
||||||
mod __private {
|
mod __private {
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
#[allow(dead_code)]
|
||||||
pub trait PollerSealed {}
|
pub trait PollerSealed {}
|
||||||
|
|
||||||
impl PollerSealed for crate::Poller {}
|
impl PollerSealed for crate::Poller {}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
//! Functionality that is only availale for IOCP-based platforms.
|
//! Functionality that is only available for IOCP-based platforms.
|
||||||
|
|
||||||
pub use crate::sys::CompletionPacket;
|
pub use crate::sys::CompletionPacket;
|
||||||
|
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
//! Functionality that is only available for `kqueue`-based platforms.
|
//! Functionality that is only available for `kqueue`-based platforms.
|
||||||
|
|
||||||
use crate::sys::mode_to_flags;
|
use crate::sys::{mode_to_flags, SourceId};
|
||||||
use crate::{PollMode, Poller};
|
use crate::{PollMode, Poller};
|
||||||
|
|
||||||
use std::io;
|
use std::io;
|
||||||
|
use std::marker::PhantomData;
|
||||||
use std::process::Child;
|
use std::process::Child;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -98,10 +99,13 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
|
||||||
#[inline(always)]
|
#[inline(always)]
|
||||||
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
|
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
|
||||||
// No difference between adding and modifying in kqueue.
|
// No difference between adding and modifying in kqueue.
|
||||||
|
self.poller.add_source(filter.source_id())?;
|
||||||
self.modify_filter(filter, key, mode)
|
self.modify_filter(filter, key, mode)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
|
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
|
||||||
|
self.poller.has_source(filter.source_id())?;
|
||||||
|
|
||||||
// Convert the filter into a kevent.
|
// Convert the filter into a kevent.
|
||||||
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
|
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
|
||||||
|
|
||||||
|
@ -114,7 +118,9 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
|
||||||
let event = filter.filter(kqueue::EventFlags::DELETE, 0);
|
let event = filter.filter(kqueue::EventFlags::DELETE, 0);
|
||||||
|
|
||||||
// Delete the filter.
|
// Delete the filter.
|
||||||
self.poller.submit_changes([event])
|
self.poller.submit_changes([event])?;
|
||||||
|
|
||||||
|
self.poller.remove_source(filter.source_id())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,6 +132,11 @@ unsafe impl<T: FilterSealed + ?Sized> FilterSealed for &T {
|
||||||
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
|
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
|
||||||
(**self).filter(flags, key)
|
(**self).filter(flags, key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn source_id(&self) -> SourceId {
|
||||||
|
(**self).source_id()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Filter + ?Sized> Filter for &T {}
|
impl<T: Filter + ?Sized> Filter for &T {}
|
||||||
|
@ -149,6 +160,11 @@ unsafe impl FilterSealed for Signal {
|
||||||
key as _,
|
key as _,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn source_id(&self) -> SourceId {
|
||||||
|
SourceId::Signal(self.0)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Filter for Signal {}
|
impl Filter for Signal {}
|
||||||
|
@ -156,11 +172,14 @@ impl Filter for Signal {}
|
||||||
/// Monitor a child process.
|
/// Monitor a child process.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Process<'a> {
|
pub struct Process<'a> {
|
||||||
/// The child process to monitor.
|
/// The process ID to monitor.
|
||||||
child: &'a Child,
|
pid: rustix::process::Pid,
|
||||||
|
|
||||||
/// The operation to monitor.
|
/// The operation to monitor.
|
||||||
ops: ProcessOps,
|
ops: ProcessOps,
|
||||||
|
|
||||||
|
/// Lifetime of the underlying process.
|
||||||
|
_lt: PhantomData<&'a Child>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The operations that a monitored process can perform.
|
/// The operations that a monitored process can perform.
|
||||||
|
@ -185,7 +204,24 @@ impl<'a> Process<'a> {
|
||||||
/// Once registered into the `Poller`, the `Child` object must outlive this filter's
|
/// Once registered into the `Poller`, the `Child` object must outlive this filter's
|
||||||
/// registration into the poller.
|
/// registration into the poller.
|
||||||
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
|
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
|
||||||
Self { child, ops }
|
Self {
|
||||||
|
pid: rustix::process::Pid::from_child(child),
|
||||||
|
ops,
|
||||||
|
_lt: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a `Process` from a PID.
|
||||||
|
///
|
||||||
|
/// # Safety
|
||||||
|
///
|
||||||
|
/// The PID must be tied to an actual child process.
|
||||||
|
pub unsafe fn from_pid(pid: std::num::NonZeroI32, ops: ProcessOps) -> Self {
|
||||||
|
Self {
|
||||||
|
pid: unsafe { rustix::process::Pid::from_raw_unchecked(pid.get()) },
|
||||||
|
ops,
|
||||||
|
_lt: PhantomData,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,13 +236,20 @@ unsafe impl FilterSealed for Process<'_> {
|
||||||
|
|
||||||
kqueue::Event::new(
|
kqueue::Event::new(
|
||||||
kqueue::EventFilter::Proc {
|
kqueue::EventFilter::Proc {
|
||||||
pid: rustix::process::Pid::from_child(self.child),
|
// SAFETY: We know that the PID is nonzero.
|
||||||
|
pid: self.pid,
|
||||||
flags: events,
|
flags: events,
|
||||||
},
|
},
|
||||||
flags | kqueue::EventFlags::RECEIPT,
|
flags | kqueue::EventFlags::RECEIPT,
|
||||||
key as _,
|
key as _,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn source_id(&self) -> SourceId {
|
||||||
|
// SAFETY: We know that the PID is nonzero
|
||||||
|
SourceId::Pid(self.pid)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Filter for Process<'_> {}
|
impl Filter for Process<'_> {}
|
||||||
|
@ -234,11 +277,17 @@ unsafe impl FilterSealed for Timer {
|
||||||
key as _,
|
key as _,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline(always)]
|
||||||
|
fn source_id(&self) -> SourceId {
|
||||||
|
SourceId::Timer(self.id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Filter for Timer {}
|
impl Filter for Timer {}
|
||||||
|
|
||||||
mod __private {
|
mod __private {
|
||||||
|
use crate::sys::SourceId;
|
||||||
use rustix::event::kqueue;
|
use rustix::event::kqueue;
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
|
@ -247,5 +296,8 @@ mod __private {
|
||||||
///
|
///
|
||||||
/// This filter's flags must have `EV_RECEIPT`.
|
/// This filter's flags must have `EV_RECEIPT`.
|
||||||
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
|
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
|
||||||
|
|
||||||
|
/// Get the source ID for this source.
|
||||||
|
fn source_id(&self) -> SourceId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
265
src/poll.rs
265
src/poll.rs
|
@ -1,14 +1,17 @@
|
||||||
//! Bindings to poll (VxWorks, Fuchsia, other Unix systems).
|
//! Bindings to poll (VxWorks, Fuchsia, other Unix systems).
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::convert::TryInto;
|
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||||
use std::sync::{Condvar, Mutex};
|
use std::sync::{Condvar, Mutex};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use rustix::event::{poll, PollFd, PollFlags};
|
#[cfg(not(target_os = "hermit"))]
|
||||||
use rustix::fd::{AsFd, AsRawFd, BorrowedFd};
|
use rustix::fd::{AsFd, AsRawFd, BorrowedFd};
|
||||||
|
#[cfg(target_os = "hermit")]
|
||||||
|
use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd};
|
||||||
|
|
||||||
|
use syscall::{poll, PollFd, PollFlags};
|
||||||
|
|
||||||
// std::os::unix doesn't exist on Fuchsia
|
// std::os::unix doesn't exist on Fuchsia
|
||||||
type RawFd = std::os::raw::c_int;
|
type RawFd = std::os::raw::c_int;
|
||||||
|
@ -427,6 +430,16 @@ impl EventExtra {
|
||||||
pub fn is_pri(&self) -> bool {
|
pub fn is_pri(&self) -> bool {
|
||||||
self.flags.contains(PollFlags::PRI)
|
self.flags.contains(PollFlags::PRI)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.contains(PollFlags::ERR) || self.flags.contains(PollFlags::HUP))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.contains(PollFlags::ERR))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
|
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
|
||||||
|
@ -439,7 +452,182 @@ fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(target_os = "espidf"))]
|
#[cfg(unix)]
|
||||||
|
mod syscall {
|
||||||
|
pub(super) use rustix::event::{poll, PollFd, PollFlags};
|
||||||
|
|
||||||
|
#[cfg(target_os = "espidf")]
|
||||||
|
pub(super) use rustix::event::{eventfd, EventfdFlags};
|
||||||
|
#[cfg(target_os = "espidf")]
|
||||||
|
pub(super) use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
||||||
|
#[cfg(target_os = "espidf")]
|
||||||
|
pub(super) use rustix::io::{read, write};
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(target_os = "hermit")]
|
||||||
|
mod syscall {
|
||||||
|
// TODO: Remove this shim once HermitOS is supported in Rustix.
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
use std::io;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
use std::ops::BitOr;
|
||||||
|
|
||||||
|
pub(super) use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
|
||||||
|
|
||||||
|
/// Create an eventfd.
|
||||||
|
pub(super) fn eventfd(count: u64, _flags: EventfdFlags) -> io::Result<OwnedFd> {
|
||||||
|
let fd = unsafe { hermit_abi::eventfd(count, 0) };
|
||||||
|
|
||||||
|
if fd < 0 {
|
||||||
|
Err(io::Error::from_raw_os_error(unsafe {
|
||||||
|
hermit_abi::get_errno()
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
Ok(unsafe { OwnedFd::from_raw_fd(fd) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Read some bytes.
|
||||||
|
pub(super) fn read(fd: BorrowedFd<'_>, bytes: &mut [u8]) -> io::Result<usize> {
|
||||||
|
let count = unsafe { hermit_abi::read(fd.as_raw_fd(), bytes.as_mut_ptr(), bytes.len()) };
|
||||||
|
|
||||||
|
cvt(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Write some bytes.
|
||||||
|
pub(super) fn write(fd: BorrowedFd<'_>, bytes: &[u8]) -> io::Result<usize> {
|
||||||
|
let count = unsafe { hermit_abi::write(fd.as_raw_fd(), bytes.as_ptr(), bytes.len()) };
|
||||||
|
|
||||||
|
cvt(count)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Safe wrapper around the `poll` system call.
|
||||||
|
pub(super) fn poll(fds: &mut [PollFd<'_>], timeout: i32) -> io::Result<usize> {
|
||||||
|
let call = unsafe {
|
||||||
|
hermit_abi::poll(
|
||||||
|
fds.as_mut_ptr() as *mut hermit_abi::pollfd,
|
||||||
|
fds.len(),
|
||||||
|
timeout,
|
||||||
|
)
|
||||||
|
};
|
||||||
|
|
||||||
|
cvt(call as isize)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Safe wrapper around `pollfd`.
|
||||||
|
#[repr(transparent)]
|
||||||
|
pub(super) struct PollFd<'a> {
|
||||||
|
inner: hermit_abi::pollfd,
|
||||||
|
_lt: PhantomData<BorrowedFd<'a>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> PollFd<'a> {
|
||||||
|
pub(super) fn from_borrowed_fd(fd: BorrowedFd<'a>, inflags: PollFlags) -> Self {
|
||||||
|
Self {
|
||||||
|
inner: hermit_abi::pollfd {
|
||||||
|
fd: fd.as_raw_fd(),
|
||||||
|
events: inflags.0,
|
||||||
|
revents: 0,
|
||||||
|
},
|
||||||
|
_lt: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) fn revents(&self) -> PollFlags {
|
||||||
|
PollFlags(self.inner.revents)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsFd for PollFd<'_> {
|
||||||
|
fn as_fd(&self) -> BorrowedFd<'_> {
|
||||||
|
unsafe { BorrowedFd::borrow_raw(self.inner.fd) }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Debug for PollFd<'_> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
f.debug_struct("PollFd")
|
||||||
|
.field("fd", &format_args!("0x{:x}", self.inner.fd))
|
||||||
|
.field("events", &PollFlags(self.inner.events))
|
||||||
|
.field("revents", &PollFlags(self.inner.revents))
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Wrapper around polling flags.
|
||||||
|
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
|
||||||
|
pub(super) struct PollFlags(i16);
|
||||||
|
|
||||||
|
impl PollFlags {
|
||||||
|
/// Empty set of flags.
|
||||||
|
pub(super) const fn empty() -> Self {
|
||||||
|
Self(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(super) const IN: PollFlags = PollFlags(hermit_abi::POLLIN);
|
||||||
|
pub(super) const OUT: PollFlags = PollFlags(hermit_abi::POLLOUT);
|
||||||
|
pub(super) const WRBAND: PollFlags = PollFlags(hermit_abi::POLLWRBAND);
|
||||||
|
pub(super) const ERR: PollFlags = PollFlags(hermit_abi::POLLERR);
|
||||||
|
pub(super) const HUP: PollFlags = PollFlags(hermit_abi::POLLHUP);
|
||||||
|
pub(super) const PRI: PollFlags = PollFlags(hermit_abi::POLLPRI);
|
||||||
|
|
||||||
|
/// Tell if this contains some flags.
|
||||||
|
pub(super) fn contains(self, flags: PollFlags) -> bool {
|
||||||
|
self.0 & flags.0 != 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set a flag.
|
||||||
|
pub(super) fn set(&mut self, flags: PollFlags, set: bool) {
|
||||||
|
if set {
|
||||||
|
self.0 |= flags.0;
|
||||||
|
} else {
|
||||||
|
self.0 &= !(flags.0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tell if this is empty.
|
||||||
|
pub(super) fn is_empty(self) -> bool {
|
||||||
|
self.0 == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tell if this intersects with some flags.
|
||||||
|
pub(super) fn intersects(self, flags: PollFlags) -> bool {
|
||||||
|
self.contains(flags)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BitOr for PollFlags {
|
||||||
|
type Output = PollFlags;
|
||||||
|
|
||||||
|
fn bitor(self, rhs: Self) -> Self::Output {
|
||||||
|
Self(self.0 | rhs.0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Copy, Clone)]
|
||||||
|
pub(super) struct EventfdFlags;
|
||||||
|
|
||||||
|
impl EventfdFlags {
|
||||||
|
pub(super) fn empty() -> Self {
|
||||||
|
Self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Convert a number to an actual result.
|
||||||
|
#[inline]
|
||||||
|
fn cvt(len: isize) -> io::Result<usize> {
|
||||||
|
if len < 0 {
|
||||||
|
Err(io::Error::from_raw_os_error(unsafe {
|
||||||
|
hermit_abi::get_errno()
|
||||||
|
}))
|
||||||
|
} else {
|
||||||
|
Ok(len as usize)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(not(any(target_os = "espidf", target_os = "hermit")))]
|
||||||
mod notify {
|
mod notify {
|
||||||
use std::io;
|
use std::io;
|
||||||
|
|
||||||
|
@ -447,7 +635,9 @@ mod notify {
|
||||||
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
||||||
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
|
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
|
||||||
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
|
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
|
||||||
use rustix::pipe::{pipe, pipe_with, PipeFlags};
|
#[cfg(not(target_os = "haiku"))]
|
||||||
|
use rustix::pipe::pipe_with;
|
||||||
|
use rustix::pipe::{pipe, PipeFlags};
|
||||||
|
|
||||||
/// A notification pipe.
|
/// A notification pipe.
|
||||||
///
|
///
|
||||||
|
@ -468,12 +658,18 @@ mod notify {
|
||||||
impl Notify {
|
impl Notify {
|
||||||
/// Creates a new notification pipe.
|
/// Creates a new notification pipe.
|
||||||
pub(super) fn new() -> io::Result<Self> {
|
pub(super) fn new() -> io::Result<Self> {
|
||||||
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
|
let fallback_pipe = |_| {
|
||||||
let (read_pipe, write_pipe) = pipe()?;
|
let (read_pipe, write_pipe) = pipe()?;
|
||||||
fcntl_setfd(&read_pipe, fcntl_getfd(&read_pipe)? | FdFlags::CLOEXEC)?;
|
fcntl_setfd(&read_pipe, fcntl_getfd(&read_pipe)? | FdFlags::CLOEXEC)?;
|
||||||
fcntl_setfd(&write_pipe, fcntl_getfd(&write_pipe)? | FdFlags::CLOEXEC)?;
|
fcntl_setfd(&write_pipe, fcntl_getfd(&write_pipe)? | FdFlags::CLOEXEC)?;
|
||||||
io::Result::Ok((read_pipe, write_pipe))
|
io::Result::Ok((read_pipe, write_pipe))
|
||||||
})?;
|
};
|
||||||
|
|
||||||
|
#[cfg(not(target_os = "haiku"))]
|
||||||
|
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(fallback_pipe)?;
|
||||||
|
|
||||||
|
#[cfg(target_os = "haiku")]
|
||||||
|
let (read_pipe, write_pipe) = fallback_pipe(PipeFlags::CLOEXEC)?;
|
||||||
|
|
||||||
// Put the reading side into non-blocking mode.
|
// Put the reading side into non-blocking mode.
|
||||||
fcntl_setfl(&read_pipe, fcntl_getfl(&read_pipe)? | OFlags::NONBLOCK)?;
|
fcntl_setfl(&read_pipe, fcntl_getfl(&read_pipe)? | OFlags::NONBLOCK)?;
|
||||||
|
@ -489,7 +685,7 @@ mod notify {
|
||||||
self.read_pipe.as_fd()
|
self.read_pipe.as_fd()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Provides the poll flags to be used when registering the read half of the botify pipe with the `Poller`.
|
/// Provides the poll flags to be used when registering the read half of the notify pipe with the `Poller`.
|
||||||
pub(super) fn poll_flags(&self) -> PollFlags {
|
pub(super) fn poll_flags(&self) -> PollFlags {
|
||||||
PollFlags::RDNORM
|
PollFlags::RDNORM
|
||||||
}
|
}
|
||||||
|
@ -503,7 +699,25 @@ mod notify {
|
||||||
|
|
||||||
/// Pops a notification (if any) from the pipe.
|
/// Pops a notification (if any) from the pipe.
|
||||||
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
|
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
|
||||||
read(&self.read_pipe, &mut [0; 1])?;
|
// Pipes on Vita do not guarantee that after `write` call succeeds, the
|
||||||
|
// data becomes immediately available for reading on the other side of the pipe.
|
||||||
|
// To ensure that the notification is not lost, the read side of the pipe is temporarily
|
||||||
|
// switched to blocking for a single `read` call.
|
||||||
|
#[cfg(target_os = "vita")]
|
||||||
|
rustix::fs::fcntl_setfl(
|
||||||
|
&self.read_pipe,
|
||||||
|
rustix::fs::fcntl_getfl(&self.read_pipe)? & !rustix::fs::OFlags::NONBLOCK,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let result = read(&self.read_pipe, &mut [0; 1]);
|
||||||
|
|
||||||
|
#[cfg(target_os = "vita")]
|
||||||
|
rustix::fs::fcntl_setfl(
|
||||||
|
&self.read_pipe,
|
||||||
|
rustix::fs::fcntl_getfl(&self.read_pipe)? | rustix::fs::OFlags::NONBLOCK,
|
||||||
|
)?;
|
||||||
|
|
||||||
|
result?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -522,20 +736,18 @@ mod notify {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(target_os = "espidf")]
|
#[cfg(any(target_os = "espidf", target_os = "hermit"))]
|
||||||
mod notify {
|
mod notify {
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::mem;
|
use std::mem;
|
||||||
|
|
||||||
use rustix::event::PollFlags;
|
use super::syscall::{
|
||||||
use rustix::event::{eventfd, EventfdFlags};
|
eventfd, read, write, AsFd, AsRawFd, BorrowedFd, EventfdFlags, OwnedFd, PollFlags, RawFd,
|
||||||
|
};
|
||||||
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
|
|
||||||
use rustix::io::{read, write};
|
|
||||||
|
|
||||||
/// A notification pipe.
|
/// A notification pipe.
|
||||||
///
|
///
|
||||||
/// This implementation uses ther `eventfd` syscall to send notifications.
|
/// This implementation uses the `eventfd` syscall to send notifications.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub(super) struct Notify {
|
pub(super) struct Notify {
|
||||||
/// The file descriptor of the eventfd object. This is also stored as the first
|
/// The file descriptor of the eventfd object. This is also stored as the first
|
||||||
|
@ -559,13 +771,20 @@ mod notify {
|
||||||
// (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway
|
// (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway
|
||||||
// (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified
|
// (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified
|
||||||
|
|
||||||
#[cfg(not(target_os = "espidf"))]
|
|
||||||
let flags = EventfdFlags::NONBLOCK;
|
|
||||||
|
|
||||||
#[cfg(target_os = "espidf")]
|
|
||||||
let flags = EventfdFlags::empty();
|
let flags = EventfdFlags::empty();
|
||||||
|
let event_fd = eventfd(0, flags).map_err(|err| {
|
||||||
let event_fd = eventfd(0, flags)?;
|
match io::Error::from(err) {
|
||||||
|
err if err.kind() == io::ErrorKind::PermissionDenied => {
|
||||||
|
// EPERM can happen if the eventfd isn't initialized yet.
|
||||||
|
// Tell the user to call esp_vfs_eventfd_register.
|
||||||
|
io::Error::new(
|
||||||
|
io::ErrorKind::PermissionDenied,
|
||||||
|
"failed to initialize eventfd for polling, try calling `esp_vfs_eventfd_register`"
|
||||||
|
)
|
||||||
|
},
|
||||||
|
err => err,
|
||||||
|
}
|
||||||
|
})?;
|
||||||
|
|
||||||
Ok(Self { event_fd })
|
Ok(Self { event_fd })
|
||||||
}
|
}
|
||||||
|
@ -582,14 +801,14 @@ mod notify {
|
||||||
|
|
||||||
/// Notifies the `Poller` instance via the eventfd file descriptor.
|
/// Notifies the `Poller` instance via the eventfd file descriptor.
|
||||||
pub(super) fn notify(&self) -> Result<(), io::Error> {
|
pub(super) fn notify(&self) -> Result<(), io::Error> {
|
||||||
write(&self.event_fd, &1u64.to_ne_bytes())?;
|
write(self.event_fd.as_fd(), &1u64.to_ne_bytes())?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Pops a notification (if any) from the eventfd file descriptor.
|
/// Pops a notification (if any) from the eventfd file descriptor.
|
||||||
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
|
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
|
||||||
read(&self.event_fd, &mut [0; mem::size_of::<u64>()])?;
|
read(self.event_fd.as_fd(), &mut [0; mem::size_of::<u64>()])?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
10
src/port.rs
10
src/port.rs
|
@ -250,4 +250,14 @@ impl EventExtra {
|
||||||
pub fn is_pri(&self) -> bool {
|
pub fn is_pri(&self) -> bool {
|
||||||
self.flags.contains(PollFlags::PRI)
|
self.flags.contains(PollFlags::PRI)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_connect_failed(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.contains(PollFlags::ERR) && self.flags.contains(PollFlags::HUP))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[inline]
|
||||||
|
pub fn is_err(&self) -> Option<bool> {
|
||||||
|
Some(self.flags.contains(PollFlags::ERR))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,6 +76,53 @@ fn concurrent_modify() -> io::Result<()> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(all(unix, not(target_os = "vita")))]
|
||||||
|
#[test]
|
||||||
|
fn concurrent_interruption() -> io::Result<()> {
|
||||||
|
struct MakeItSend<T>(T);
|
||||||
|
unsafe impl<T> Send for MakeItSend<T> {}
|
||||||
|
|
||||||
|
let (reader, _writer) = tcp_pair()?;
|
||||||
|
let poller = Poller::new()?;
|
||||||
|
unsafe {
|
||||||
|
poller.add(&reader, Event::none(0))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut events = Events::new();
|
||||||
|
let events_borrow = &mut events;
|
||||||
|
let (sender, receiver) = std::sync::mpsc::channel();
|
||||||
|
|
||||||
|
Parallel::new()
|
||||||
|
.add(move || {
|
||||||
|
// Register a signal handler so that the syscall is actually interrupted. A signal that
|
||||||
|
// is ignored by default does not cause an interrupted syscall.
|
||||||
|
signal_hook::flag::register(signal_hook::consts::signal::SIGURG, Default::default())?;
|
||||||
|
|
||||||
|
// Signal to the other thread how to send a signal to us
|
||||||
|
sender
|
||||||
|
.send(MakeItSend(unsafe { libc::pthread_self() }))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
poller.wait(events_borrow, Some(Duration::from_secs(1)))?;
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.add(move || {
|
||||||
|
let MakeItSend(target_thread) = receiver.recv().unwrap();
|
||||||
|
thread::sleep(Duration::from_millis(100));
|
||||||
|
assert_eq!(0, unsafe {
|
||||||
|
libc::pthread_kill(target_thread, libc::SIGURG)
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.run()
|
||||||
|
.into_iter()
|
||||||
|
.collect::<io::Result<()>>()?;
|
||||||
|
|
||||||
|
assert_eq!(events.len(), 0);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
|
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
|
||||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||||
let a = TcpStream::connect(listener.local_addr()?)?;
|
let a = TcpStream::connect(listener.local_addr()?)?;
|
||||||
|
|
43
tests/io.rs
43
tests/io.rs
|
@ -1,6 +1,7 @@
|
||||||
use polling::{Event, Events, Poller};
|
use polling::{Event, Events, Poller};
|
||||||
use std::io::{self, Write};
|
use std::io::{self, Write};
|
||||||
use std::net::{TcpListener, TcpStream};
|
use std::net::{TcpListener, TcpStream};
|
||||||
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
|
@ -38,6 +39,48 @@ fn basic_io() {
|
||||||
poller.delete(&read).unwrap();
|
poller.delete(&read).unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn insert_twice() {
|
||||||
|
#[cfg(unix)]
|
||||||
|
use std::os::unix::io::AsRawFd;
|
||||||
|
#[cfg(windows)]
|
||||||
|
use std::os::windows::io::AsRawSocket;
|
||||||
|
|
||||||
|
let (read, mut write) = tcp_pair().unwrap();
|
||||||
|
let read = Arc::new(read);
|
||||||
|
|
||||||
|
let poller = Poller::new().unwrap();
|
||||||
|
unsafe {
|
||||||
|
#[cfg(unix)]
|
||||||
|
let read = read.as_raw_fd();
|
||||||
|
#[cfg(windows)]
|
||||||
|
let read = read.as_raw_socket();
|
||||||
|
|
||||||
|
poller.add(read, Event::readable(1)).unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
poller.add(read, Event::readable(1)).unwrap_err().kind(),
|
||||||
|
io::ErrorKind::AlreadyExists
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
write.write_all(&[1]).unwrap();
|
||||||
|
let mut events = Events::new();
|
||||||
|
assert_eq!(
|
||||||
|
poller
|
||||||
|
.wait(&mut events, Some(Duration::from_secs(1)))
|
||||||
|
.unwrap(),
|
||||||
|
1
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(events.len(), 1);
|
||||||
|
assert_eq!(
|
||||||
|
events.iter().next().unwrap().with_no_extra(),
|
||||||
|
Event::readable(1)
|
||||||
|
);
|
||||||
|
|
||||||
|
poller.delete(&read).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
|
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
|
||||||
let listener = TcpListener::bind("127.0.0.1:0")?;
|
let listener = TcpListener::bind("127.0.0.1:0")?;
|
||||||
let a = TcpStream::connect(listener.local_addr()?)?;
|
let a = TcpStream::connect(listener.local_addr()?)?;
|
||||||
|
|
Loading…
Reference in New Issue