Compare commits

...

36 Commits

Author SHA1 Message Date
John Nunley cf2d60efca
ci: Use latest stable wine in testing
This works around bugs in Rust v1.78 that introduce incompatibilities
into Wine.

https://github.com/smol-rs/polling/pull/201#issuecomment-2092385046

Signed-off-by: John Nunley <dev@notgull.net>
2024-05-03 17:55:23 -07:00
John Nunley 0b4afcaf0a
ci: Remove +nightly and use default
Previously in the "cross" CI job, checks would use "+nightly" to ensure
that the nightly compiler is used. However this adds a lot of noise. In
order to clean up CI this commit replaces "+nightly" with "rustup
default nightly" at the start of the job.

cc https://github.com/smol-rs/polling/pull/197#discussion_r1573375732

Signed-off-by: John Nunley <dev@notgull.net>
2024-04-22 18:36:07 -07:00
John Nunley eb9d92a2e0
v3.7.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-04-22 16:33:52 -07:00
Nikolay Arhipov 9e46c8455c
feat: ported to Vita target
Fixes #160
2024-04-20 11:22:46 -07:00
John Nunley 1c16a1e4af
v3.6.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-03-23 20:42:01 -07:00
irvingouj @ Devolutions e25b3b4e4c
feat: Replace is_connect_failed with is_err
In linux, epoll, EPOLLHUP may happen even if no connection call is made. It
would confuse callers for what is actually happening.

Replaced is_connect_failed, and we detect if connection failed by using the
combination of is_err and is_interrupt, please see the example, tcp_client
2024-03-20 22:04:46 -07:00
John Nunley 50454d1cea
feat: Add support for HermitOS
HermitOS is a microkernel target aiming to provide a simple OS for
virtualized applications. It recently added support for the poll() and
eventfd() system calls, which means we can target it with our poll()
based backend.

Hermit does not have a traditional libc; instead it uses hermit-abi.
However rustix does not support using hermit-abi as its underlying
layer yet. So we have to build a shim layer until it does.

Closes #177
cc bytecodealliance/rustix#1012

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 21:33:06 -07:00
John Nunley 634a77c264 bugfix: Remove CallOnDrop from port.rs
CallOnDrop is no longer used in the Windows IOCP backend, and it wasn't
being flagged as unused until the latest nightly. In order to fix
Windows builds, this commit removes CallOnDrop.

Signed-off-by: John Nunley <dev@notgull.net>
2024-03-12 20:46:02 -07:00
John Nunley 4d64fdc572
v3.5.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-17 22:02:48 -08:00
John Nunley 77b4ed1156
feat: On RedoxOS, use epoll instead of the poll backend
Technically RedoxOS supports the poll syscall, so we already support
RedoxOS. However, this is very slow. This commit ports this code to
epoll, which should be more efficient.

Closes #176
2024-02-11 08:31:13 -08:00
John Nunley ac7fbcae31
v3.4.0
Signed-off-by: John Nunley <dev@notgull.net>
2024-02-05 19:18:30 -08:00
John Nunley 24e3691794
feat: Add a way to wait on process by PID
This is needed to make processes work in `async-process`.

Signed-off-by: John Nunley <dev@notgull.net>
2024-02-01 06:34:02 -08:00
John Nunley 62430fd56e Annotate ESP-IDF EPERM error with eventfd info
If eventfd isn't initialized, `Polling::new` will fail with an EPERM
error. You need to call the "esp_vfs_eventfd_register" function to
initialize the eventfd subsystem in ESP-IDF. This commit indicates to
the user that this needs to happen.

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-27 21:17:58 -08:00
John Nunley ae484a0a12 tests: Fix clippy error in wait-signal
Signed-off-by: John Nunley <dev@notgull.net>
2024-01-27 21:17:58 -08:00
irvingouj @ Devolutions cf25dd85f8
feat: Add the ability to identify if the connection has failed 2024-01-26 12:58:39 -08:00
John Nunley 6125508c93
v3.3.2
Signed-off-by: John Nunley <dev@notgull.net>
2024-01-14 09:00:57 -08:00
John Nunley ea5a38a500
feat(windows): AFD failure now sources underlying I/O error
Previously, if AFD failed to initialize `polling` would return a custom
I/O error with a string error, containing the formatted version of the
underlying system error. However, this means that information about the
underlying system error is lost to the user.

This commit makes it so the returned `io::Error` wraps a user
inaccessible type: `AfdError`. This `AfdError`, when stringified,
returns a similar error message as what was previously returned. In
addition when `.source()` is used it returns the underlying system
error.

Closes #174

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 16:34:13 -08:00
John Nunley 1f13664bbb
ci: Add async-io tests back to CI
Closes #145

Signed-off-by: John Nunley <dev@notgull.net>
2024-01-08 08:23:02 -08:00
Taiki Endo 0c794fce50 Ignore dead_code warning for tuple struct
This lint does not take into account destructors.

```
error: field `0` is never read
    --> src\iocp\mod.rs:1155:13
     |
1155 |     Waiting(WaitHandle),
     |     ------- ^^^^^^^^^^
     |     |
     |     field in this variant
     |
     = note: `-D dead-code` implied by `-D warnings`
     = help: to override `-D warnings` add `#[allow(dead_code)]`
help: consider changing the field to be of unit type to suppress this warning while preserving the field numbering, or remove the field
     |
1155 |     Waiting(()),
     |             ~~
```
2024-01-07 16:11:02 +09:00
Taiki Endo 94c5ebf78b ci: Temporarily disable riscv32imc-esp-espidf build 2024-01-07 16:11:02 +09:00
Taiki Endo e956a8ad64 ci: Update FreeBSD image to 13.2
```
pkg install -y git
Updating FreeBSD repository catalogue...
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/meta.txz: Not Found
repository FreeBSD has no meta file, using default settings
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/packagesite.pkg: Not Found
pkg: http://pkgmir.geo.freebsd.org/FreeBSD:12:amd64/quarterly/packagesite.txz: Not Found
Unable to update repository FreeBSD
Error updating repositories!
```
2024-01-07 16:11:02 +09:00
Taiki Endo 078c478346 ci: Use cargo-hack's --rust-version flag for msrv check
This respects rust-version field in Cargo.toml, so it removes the need
to manage MSRV in both the CI file and Cargo.toml.
2024-01-07 16:11:02 +09:00
John Nunley b57a7c32a2
v3.3.1
Signed-off-by: John Nunley <dev@notgull.net>
2023-11-24 08:22:27 -08:00
dependabot[bot] 08a316e1fc
m: Update windows-sys requirement from 0.48 to 0.52
* Update windows-sys requirement from 0.48 to 0.52

Updates the requirements on [windows-sys](https://github.com/microsoft/windows-rs) to permit the latest version.
- [Release notes](https://github.com/microsoft/windows-rs/releases)
- [Commits](https://github.com/microsoft/windows-rs/compare/0.48.0...0.52.0)

---
updated-dependencies:
- dependency-name: windows-sys
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Correct windows-sys imports

Signed-off-by: John Nunley <dev@notgull.net>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: John Nunley <dev@notgull.net>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: John Nunley <dev@notgull.net>
2023-11-24 07:53:04 -08:00
John Nunley 8087787ab2
v3.3.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-28 18:54:47 -07:00
John Nunley b9ab821df1
bugfix: Handle interrupts while polling
Previous, `Poller::wait` would bubble signal interruption error to the user.
However, this may be unexpected for simple use cases. Thus, this commit makes
it so, if `ErrorKind::Interrupted` is received by the underlying `wait()` call,
it clears the events and tries to wait again.

This also adds a test for this interruption written by @psychon.

Co-Authored-By: Uli Schlachter <psychon@users.noreply.github.com>
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-27 07:02:08 -07:00
Uli Schlachter 0575cbd4bc
docs: Fix wrong link in docs of Poller::wait()
Once upon a time, this got a Vec as an argument, but that was replaced
with the Events struct. Thus, this should link to Events and not Vec.

Signed-off-by: Uli Schlachter <psychon@znc.in>
2023-10-20 09:49:44 -07:00
Taiki Endo 37a1d4ecd2
Remove needless imports (#159) 2023-10-08 17:23:34 +09:00
Taiki Endo a559165acd
Migrate to Rust 2021 (#158) 2023-10-08 14:46:23 +09:00
tison ceb88a46c4
chore: prefer usize::MAX to std::usize::MAX
Signed-off-by: tison <wander4096@gmail.com>
2023-10-05 20:10:18 -07:00
John Nunley d9a65fdd73
v3.2.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-10-02 07:37:32 -07:00
Al Hoang 99a32b7607
feat: Add support for Haiku OS
Haiku does not support pipe_with at all, so just fall back to pipe().
2023-10-01 20:57:41 -07:00
John Nunley 9e143a38e1
bugfix: Manage sources being inserted into kqueue
Thus far, our kqueue implementation has been a relatively thin layer on
top of the OS kqueue. However, kqueue doesn't keep track of when the
same source is inserted twice, or when a source that doesn't exist is
removed. In the interest of keeping consistent behavior between backends
this commit adds a system for tracking when sources are inserted.

Closes #151

Signed-off-by: John Nunley <dev@notgull.net>
2023-09-27 21:30:46 -07:00
John Nunley 45ebe3b904
v3.1.0
Signed-off-by: John Nunley <dev@notgull.net>
2023-09-25 09:42:41 -07:00
David Hotham 254577da8d
feat: introduce Event::new()
This makes it easier to construct Events.
2023-09-12 06:02:02 -07:00
Taiki Endo 8c99506375 Update actions/checkout action to v4 2023-09-10 18:28:12 +09:00
21 changed files with 916 additions and 167 deletions

View File

@ -14,7 +14,7 @@ env:
freebsd_task:
name: test ($TARGET)
freebsd_instance:
image_family: freebsd-12-4
image_family: freebsd-13-2
matrix:
- env:
TARGET: x86_64-unknown-freebsd

View File

@ -41,7 +41,7 @@ jobs:
- os: windows-latest
rust: nightly-i686-pc-windows-gnu
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
@ -62,12 +62,18 @@ jobs:
RUSTFLAGS: ${{ env.RUSTFLAGS }} --cfg polling_test_epoll_pipe
if: startsWith(matrix.os, 'ubuntu')
- run: cargo hack build --feature-powerset --no-dev-deps
- name: Add rust-src
if: startsWith(matrix.rust, 'nightly')
run: rustup component add rust-src
- name: Check selected Tier 3 targets
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target=riscv32imc-esp-espidf
# TODO: broken due to https://github.com/rust-lang/rust/pull/119026.
# - name: Check selected Tier 3 targets
# if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
# run: cargo check -Z build-std --target=riscv32imc-esp-espidf
- name: Clone async-io
run: git clone https://github.com/smol-rs/async-io.git
# The async-io Cargo.toml already has a patch section at the bottom, so we
# can just add this.
- name: Patch polling
run: echo 'polling = { path = ".." }' >> async-io/Cargo.toml
- name: Test async-io
run: cargo test --manifest-path=async-io/Cargo.toml
cross:
runs-on: ${{ matrix.os }}
@ -75,12 +81,16 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
rust: [nightly, stable]
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }}
- name: Install cross
uses: taiki-e/install-action@cross
- name: Add rust-src
if: startsWith(matrix.rust, 'nightly')
run: rustup component add rust-src
# We don't test BSDs, since we already test them in Cirrus.
- name: Android
if: startsWith(matrix.os, 'ubuntu')
@ -105,17 +115,30 @@ jobs:
run: |
rustup target add x86_64-unknown-illumos
cargo build --target x86_64-unknown-illumos
- name: Redox
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: |
rustup target add x86_64-unknown-redox
cargo check --target x86_64-unknown-redox
- name: HermitOS
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target x86_64-unknown-hermit
- name: Check haiku
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target x86_64-unknown-haiku
- name: Check vita
if: startsWith(matrix.rust, 'nightly') && matrix.os == 'ubuntu-latest'
run: cargo check -Z build-std --target armv7-sony-vita-newlibeabihf
wine:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- uses: taiki-e/setup-cross-toolchain-action@v1
with:
target: x86_64-pc-windows-gnu
runner: wine@7.13
- run: cargo test --target x86_64-pc-windows-gnu
msrv:
@ -124,27 +147,20 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-latest, windows-latest]
# When updating this, the reminder to update the minimum supported
# Rust version in Cargo.toml.
rust: ['1.63']
steps:
- uses: actions/checkout@v3
- name: Install Rust
# --no-self-update is necessary because the windows environment cannot self-update rustup.exe.
run: rustup update ${{ matrix.rust }} --no-self-update && rustup default ${{ matrix.rust }}
- run: cargo build
- name: Install Other Targets
- uses: actions/checkout@v4
- name: Install cargo-hack
uses: taiki-e/install-action@cargo-hack
- run: cargo hack build --no-dev-deps --rust-version
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-freebsd
if: startsWith(matrix.os, 'ubuntu')
run: rustup target add x86_64-unknown-freebsd x86_64-unknown-netbsd
- run: cargo build --target x86_64-unknown-freebsd
if: startsWith(matrix.os, 'ubuntu')
- run: cargo build --target x86_64-unknown-netbsd
- run: cargo hack build --no-dev-deps --rust-version --target x86_64-unknown-netbsd
if: startsWith(matrix.os, 'ubuntu')
clippy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --all-targets
@ -152,7 +168,7 @@ jobs:
fmt:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install Rust
run: rustup update stable
- run: cargo fmt --all --check
@ -164,7 +180,7 @@ jobs:
issues: write
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
# https://github.com/rustsec/audit-check/issues/2
- uses: rustsec/audit-check@master
with:

View File

@ -13,7 +13,7 @@ jobs:
if: github.repository_owner == 'smol-rs'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: taiki-e/create-gh-release-action@v1
with:
changelog: CHANGELOG.md

View File

@ -1,3 +1,50 @@
# Version 3.7.0
- Add support for the PS Vita as a platform. (#160)
# Version 3.6.0
- Add an `is_err` method to `Event` to tell when an error has occurred. (#189)
- Deprecate the `is_connect_failed` function. (#189)
- Add support for HermitOS to `polling`. (#194)
# Version 3.5.0
- Use the `epoll` backend when RedoxOS is enabled. (#190)
# Version 3.4.0
- Add the ability to identify whether socket connection has failed. (#185)
- On BSD, add the ability to wait on a process by its PID. Previously, it was
only possible to wait on a process by a `Child` object. (#180)
- On ESP-IDF, annotate `eventfd` initialization failures with a message
indicating the source of those failures. (#186)
# Version 3.3.2
- When AFD fails to initialize, the resulting error now references
the underlying system error. (#174)
# Version 3.3.1
- Bump `windows-sys` to v0.52.0. (#169)
# Version 3.3.0
- Automatically restarts polling when `ErrorKind::Interrupted` is returned, rather than relying on the user to handle it. (#164)
- Fix bad link in documentation for `Poller::wait()`. (#163)
# Version 3.2.0
- The `kqueue` backend previously allowed the following operations that other backends forbid. Now these operations result in an error: (#153)
- Inserting a source that was already inserted.
- Modifying/deleting a source that was not already inserted.
- Add support for Haiku OS. (#154)
# Version 3.1.0
- Add an `Event::new()` constructor to simplify creating `Event`s. (#149)
# Version 3.0.0
- Replace `libc` in all backends with the `rustix` crate (#108).

View File

@ -3,9 +3,9 @@ name = "polling"
# When publishing a new version:
# - Update CHANGELOG.md
# - Create "v3.x.y" git tag
version = "3.0.0"
version = "3.7.0"
authors = ["Stjepan Glavina <stjepang@gmail.com>", "John Nunley <dev@notgull.net>"]
edition = "2018"
edition = "2021"
rust-version = "1.63"
description = "Portable interface to epoll, kqueue, event ports, and IOCP"
license = "Apache-2.0 OR MIT"
@ -21,16 +21,20 @@ rustdoc-args = ["--cfg", "docsrs"]
cfg-if = "1"
tracing = { version = "0.1.37", default-features = false }
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies]
rustix = { version = "0.38.8", features = ["event", "fs", "pipe", "process", "std", "time"], default-features = false }
[target.'cfg(any(unix, target_os = "fuchsia", target_os = "vxworks"))'.dependencies.rustix]
version = "0.38.31"
features = ["event", "fs", "pipe", "process", "std", "time"]
default-features = false
[target.'cfg(windows)'.dependencies]
concurrent-queue = "2.2.0"
pin-project-lite = "0.2.9"
[target.'cfg(windows)'.dependencies.windows-sys]
version = "0.48"
version = "0.52"
features = [
"Wdk_Foundation",
"Wdk_Storage_FileSystem",
"Win32_Foundation",
"Win32_Networking_WinSock",
"Win32_Security",
@ -41,6 +45,16 @@ features = [
"Win32_System_WindowsProgramming",
]
[target.'cfg(target_os = "hermit")'.dependencies.hermit-abi]
version = "0.3.9"
[dev-dependencies]
easy-parallel = "3.1.0"
fastrand = "2.0.0"
socket2 = "0.5.5"
[target.'cfg(unix)'.dev-dependencies]
libc = "0.2"
[target.'cfg(all(unix, not(target_os="vita")))'.dev-dependencies]
signal-hook = "0.3.17"

View File

@ -12,11 +12,11 @@ https://docs.rs/polling)
Portable interface to epoll, kqueue, event ports, and IOCP.
Supported platforms:
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
- [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
DragonFly BSD
- [event ports](https://illumos.org/man/port_create): illumos, Solaris
- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
- [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
- [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
Polling is done in oneshot mode, which means interest in I/O events needs to be reset after

36
examples/tcp_client.rs Normal file
View File

@ -0,0 +1,36 @@
use std::{io, net};
use polling::Event;
use socket2::Type;
fn main() -> io::Result<()> {
let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
let poller = polling::Poller::new()?;
unsafe {
poller.add(&socket, Event::new(0, true, true))?;
}
let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
socket.set_nonblocking(true)?;
let _ = socket.connect(&addr.into());
let mut events = polling::Events::new();
events.clear();
poller.wait(&mut events, None)?;
let event = events.iter().next();
let event = match event {
Some(event) => event,
None => {
println!("no event");
return Ok(());
}
};
println!("event: {:?}", event);
if event.is_err().unwrap_or(false) {
println!("connect failed");
}
Ok(())
}

View File

@ -27,22 +27,16 @@ mod example {
println!("Press Ctrl+C to exit...");
loop {
// Wait for events.
poller.wait(&mut events, None).unwrap();
// Wait for events.
poller.wait(&mut events, None).unwrap();
// Process events.
for ev in events.iter() {
match ev.key {
1 => {
println!("SIGINT received");
return;
}
_ => unreachable!(),
}
// Process events.
let ev = events.iter().next().unwrap();
match ev.key {
1 => {
println!("SIGINT received");
}
events.clear();
_ => unreachable!(),
}
}
}

View File

@ -1,20 +1,23 @@
//! Bindings to epoll (Linux, Android).
use std::convert::TryInto;
use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::time::Duration;
use rustix::event::{epoll, eventfd, EventfdFlags};
use rustix::fd::OwnedFd;
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};
#[cfg(not(target_os = "redox"))]
use rustix::event::{eventfd, EventfdFlags};
#[cfg(not(target_os = "redox"))]
use rustix::time::{
timerfd_create, timerfd_settime, Itimerspec, TimerfdClockId, TimerfdFlags, TimerfdTimerFlags,
Timespec,
};
use rustix::event::epoll;
use rustix::fd::OwnedFd;
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};
use crate::{Event, PollMode};
/// Interface to epoll.
@ -27,6 +30,9 @@ pub struct Poller {
notifier: Notifier,
/// File descriptor for the timerfd that produces timeouts.
///
/// Redox does not support timerfd.
#[cfg(not(target_os = "redox"))]
timer_fd: Option<OwnedFd>,
}
@ -40,6 +46,7 @@ impl Poller {
// Set up notifier and timerfd.
let notifier = Notifier::new()?;
#[cfg(not(target_os = "redox"))]
let timer_fd = timerfd_create(
TimerfdClockId::Monotonic,
TimerfdFlags::CLOEXEC | TimerfdFlags::NONBLOCK,
@ -49,10 +56,12 @@ impl Poller {
let poller = Poller {
epoll_fd,
notifier,
#[cfg(not(target_os = "redox"))]
timer_fd,
};
unsafe {
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = poller.timer_fd {
poller.add(
timer_fd.as_raw_fd(),
@ -71,7 +80,6 @@ impl Poller {
tracing::trace!(
epoll_fd = ?poller.epoll_fd.as_raw_fd(),
notifier = ?poller.notifier,
timer_fd = ?poller.timer_fd,
"new",
);
Ok(poller)
@ -156,6 +164,7 @@ impl Poller {
);
let _enter = span.enter();
#[cfg(not(target_os = "redox"))]
if let Some(ref timer_fd) = self.timer_fd {
// Configure the timeout using timerfd.
let new_val = Itimerspec {
@ -182,8 +191,13 @@ impl Poller {
)?;
}
#[cfg(not(target_os = "redox"))]
let timer_fd = &self.timer_fd;
#[cfg(target_os = "redox")]
let timer_fd: Option<core::convert::Infallible> = None;
// Timeout in milliseconds for epoll.
let timeout_ms = match (&self.timer_fd, timeout) {
let timeout_ms = match (timer_fd, timeout) {
(_, Some(t)) if t == Duration::from_secs(0) => 0,
(None, Some(t)) => {
// Round up to a whole millisecond.
@ -246,10 +260,10 @@ impl Drop for Poller {
"drop",
epoll_fd = ?self.epoll_fd.as_raw_fd(),
notifier = ?self.notifier,
timer_fd = ?self.timer_fd
);
let _enter = span.enter();
#[cfg(not(target_os = "redox"))]
if let Some(timer_fd) = self.timer_fd.take() {
let _ = self.delete(timer_fd.as_fd());
}
@ -258,6 +272,7 @@ impl Drop for Poller {
}
/// `timespec` value that equals zero.
#[cfg(not(target_os = "redox"))]
const TS_ZERO: Timespec = unsafe { std::mem::transmute([0u8; std::mem::size_of::<Timespec>()]) };
/// Get the EPOLL flags for the interest.
@ -366,6 +381,19 @@ impl EventExtra {
pub fn is_pri(&self) -> bool {
self.flags.contains(epoll::EventFlags::PRI)
}
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(
self.flags.contains(epoll::EventFlags::ERR)
&& self.flags.contains(epoll::EventFlags::HUP),
)
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(epoll::EventFlags::ERR))
}
}
/// The notifier for Linux.
@ -378,6 +406,7 @@ impl EventExtra {
#[derive(Debug)]
enum Notifier {
/// The primary notifier, using eventfd.
#[cfg(not(target_os = "redox"))]
EventFd(OwnedFd),
/// The fallback notifier, using a pipe.
@ -394,19 +423,22 @@ impl Notifier {
/// Create a new notifier.
fn new() -> io::Result<Self> {
// Skip eventfd for testing if necessary.
if !cfg!(polling_test_epoll_pipe) {
// Try to create an eventfd.
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
Ok(fd) => {
tracing::trace!("created eventfd for notifier");
return Ok(Notifier::EventFd(fd));
}
#[cfg(not(target_os = "redox"))]
{
if !cfg!(polling_test_epoll_pipe) {
// Try to create an eventfd.
match eventfd(0, EventfdFlags::CLOEXEC | EventfdFlags::NONBLOCK) {
Ok(fd) => {
tracing::trace!("created eventfd for notifier");
return Ok(Notifier::EventFd(fd));
}
Err(err) => {
tracing::warn!(
"eventfd() failed with error ({}), falling back to pipe",
err
);
Err(err) => {
tracing::warn!(
"eventfd() failed with error ({}), falling back to pipe",
err
);
}
}
}
}
@ -428,6 +460,7 @@ impl Notifier {
/// The file descriptor to register in the poller.
fn as_fd(&self) -> BorrowedFd<'_> {
match self {
#[cfg(not(target_os = "redox"))]
Notifier::EventFd(fd) => fd.as_fd(),
Notifier::Pipe {
read_pipe: read, ..
@ -438,6 +471,7 @@ impl Notifier {
/// Notify the poller.
fn notify(&self) {
match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => {
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = write(fd, &buf);
@ -452,6 +486,7 @@ impl Notifier {
/// Clear the notification.
fn clear(&self) {
match self {
#[cfg(not(target_os = "redox"))]
Self::EventFd(fd) => {
let mut buf = [0u8; 8];
let _ = read(fd, &mut buf);

View File

@ -14,6 +14,8 @@ use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Once;
use windows_sys::Wdk::Foundation::OBJECT_ATTRIBUTES;
use windows_sys::Wdk::Storage::FileSystem::FILE_OPEN;
use windows_sys::Win32::Foundation::{
CloseHandle, HANDLE, HMODULE, NTSTATUS, STATUS_NOT_FOUND, STATUS_PENDING, STATUS_SUCCESS,
UNICODE_STRING,
@ -21,11 +23,9 @@ use windows_sys::Win32::Foundation::{
use windows_sys::Win32::Networking::WinSock::{
WSAIoctl, SIO_BASE_HANDLE, SIO_BSP_HANDLE_POLL, SOCKET_ERROR,
};
use windows_sys::Win32::Storage::FileSystem::{
FILE_OPEN, FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE,
};
use windows_sys::Win32::Storage::FileSystem::{FILE_SHARE_READ, FILE_SHARE_WRITE, SYNCHRONIZE};
use windows_sys::Win32::System::LibraryLoader::{GetModuleHandleW, GetProcAddress};
use windows_sys::Win32::System::WindowsProgramming::{IO_STATUS_BLOCK, OBJECT_ATTRIBUTES};
use windows_sys::Win32::System::IO::IO_STATUS_BLOCK;
#[derive(Default)]
#[repr(C)]

View File

@ -46,7 +46,6 @@ use pin_project_lite::pin_project;
use std::cell::UnsafeCell;
use std::collections::hash_map::{Entry, HashMap};
use std::convert::TryFrom;
use std::ffi::c_void;
use std::fmt;
use std::io;
@ -122,17 +121,19 @@ impl Poller {
pub(super) fn new() -> io::Result<Self> {
// Make sure AFD is able to be used.
if let Err(e) = afd::NtdllImports::force_load() {
return Err(crate::unsupported_error(format!(
"Failed to initialize unstable Windows functions: {}\nThis usually only happens for old Windows or Wine.",
e
)));
return Err(io::Error::new(
io::ErrorKind::Unsupported,
AfdError::new("failed to initialize unstable Windows functions", e),
));
}
// Create and destroy a single AFD to test if we support it.
Afd::<Packet>::new().map_err(|e| crate::unsupported_error(format!(
"Failed to initialize \\Device\\Afd: {}\nThis usually only happens for old Windows or Wine.",
e,
)))?;
Afd::<Packet>::new().map_err(|e| {
io::Error::new(
io::ErrorKind::Unsupported,
AfdError::new("failed to initialize \\Device\\Afd", e),
)
})?;
let port = IoCompletionPort::new(0)?;
tracing::trace!(handle = ?port, "new");
@ -680,6 +681,18 @@ impl EventExtra {
pub fn set_pri(&mut self, active: bool) {
self.flags.set(AfdPollMask::RECEIVE_EXPEDITED, active);
}
/// Check if TCP connect failed. Deprecated.
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
}
/// Check if TCP connect failed.
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.intersects(AfdPollMask::CONNECT_FAIL))
}
}
/// A packet used to wake up the poller with an event.
@ -1153,7 +1166,7 @@ enum WaitableStatus {
Idle,
/// We are waiting on this handle to become signaled.
Waiting(WaitHandle),
Waiting(#[allow(dead_code)] WaitHandle),
/// This handle has been cancelled.
Cancelled,
@ -1327,6 +1340,54 @@ fn dur2timeout(dur: Duration) -> u32 {
.unwrap_or(INFINITE)
}
/// An error type that wraps around failing to open AFD.
struct AfdError {
/// String description of what happened.
description: &'static str,
/// The underlying system error.
system: io::Error,
}
impl AfdError {
#[inline]
fn new(description: &'static str, system: io::Error) -> Self {
Self {
description,
system,
}
}
}
impl fmt::Debug for AfdError {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AfdError")
.field("description", &self.description)
.field("system", &self.system)
.field("note", &"probably caused by old Windows or Wine")
.finish()
}
}
impl fmt::Display for AfdError {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}: {}\nThis error is usually caused by running on old Windows or Wine",
self.description, &self.system
)
}
}
impl std::error::Error for AfdError {
#[inline]
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
Some(&self.system)
}
}
struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> {

View File

@ -2,7 +2,6 @@
use super::dur2timeout;
use std::convert::TryInto;
use std::fmt;
use std::io;
use std::marker::PhantomData;
@ -297,11 +296,3 @@ impl<T: CompletionHandle> Drop for OverlappedEntry<T> {
drop(unsafe { self.packet() });
}
}
struct CallOnDrop<F: FnMut()>(F);
impl<F: FnMut()> Drop for CallOnDrop<F> {
fn drop(&mut self) {
(self.0)();
}
}

View File

@ -1,7 +1,9 @@
//! Bindings to kqueue (macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
use std::collections::HashSet;
use std::io;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use std::sync::RwLock;
use std::time::Duration;
use rustix::event::kqueue;
@ -15,6 +17,11 @@ pub struct Poller {
/// File descriptor for the kqueue instance.
kqueue_fd: OwnedFd,
/// List of sources currently registered in this poller.
///
/// This is used to make sure the same source is not registered twice.
sources: RwLock<HashSet<SourceId>>,
/// Notification pipe for waking up the poller.
///
/// On platforms that support `EVFILT_USER`, this uses that to wake up the poller. Otherwise, it
@ -22,6 +29,23 @@ pub struct Poller {
notify: notify::Notify,
}
/// Identifier for a source.
#[doc(hidden)]
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum SourceId {
/// Registered file descriptor.
Fd(RawFd),
/// Signal.
Signal(std::os::raw::c_int),
/// Process ID.
Pid(rustix::process::Pid),
/// Timer ID.
Timer(usize),
}
impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
@ -31,6 +55,7 @@ impl Poller {
let poller = Poller {
kqueue_fd,
sources: RwLock::new(HashSet::new()),
notify: notify::Notify::new()?,
};
@ -60,6 +85,8 @@ impl Poller {
///
/// The file descriptor must be valid and it must last until it is deleted.
pub unsafe fn add(&self, fd: RawFd, ev: Event, mode: PollMode) -> io::Result<()> {
self.add_source(SourceId::Fd(fd))?;
// File descriptors don't need to be added explicitly, so just modify the interest.
self.modify(BorrowedFd::borrow_raw(fd), ev, mode)
}
@ -79,6 +106,8 @@ impl Poller {
};
let _enter = span.as_ref().map(|s| s.enter());
self.has_source(SourceId::Fd(fd.as_raw_fd()))?;
let mode_flags = mode_to_flags(mode);
let read_flags = if ev.readable {
@ -143,10 +172,57 @@ impl Poller {
Ok(())
}
/// Add a source to the sources set.
#[inline]
pub(crate) fn add_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.write()
.unwrap_or_else(|e| e.into_inner())
.insert(source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::AlreadyExists))
}
}
/// Tell if a source is currently inside the set.
#[inline]
pub(crate) fn has_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.read()
.unwrap_or_else(|e| e.into_inner())
.contains(&source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::NotFound))
}
}
/// Remove a source from the sources set.
#[inline]
pub(crate) fn remove_source(&self, source: SourceId) -> io::Result<()> {
if self
.sources
.write()
.unwrap_or_else(|e| e.into_inner())
.remove(&source)
{
Ok(())
} else {
Err(io::Error::from(io::ErrorKind::NotFound))
}
}
/// Deletes a file descriptor.
pub fn delete(&self, fd: BorrowedFd<'_>) -> io::Result<()> {
// Simply delete interest in the file descriptor.
self.modify(fd, Event::none(0), PollMode::Oneshot)
self.modify(fd, Event::none(0), PollMode::Oneshot)?;
self.remove_source(SourceId::Fd(fd.as_raw_fd()))
}
/// Waits for I/O events with an optional timeout.
@ -295,6 +371,16 @@ impl EventExtra {
pub fn is_pri(&self) -> bool {
false
}
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
None
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
None
}
}
pub(crate) fn mode_to_flags(mode: PollMode) -> kqueue::EventFlags {

View File

@ -1,11 +1,11 @@
//! Portable interface to epoll, kqueue, event ports, and IOCP.
//!
//! Supported platforms:
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, RedoxOS
//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, tvOS, watchOS, FreeBSD, NetBSD, OpenBSD,
//! DragonFly BSD
//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, other Unix systems
//! - [poll](https://en.wikipedia.org/wiki/Poll_(Unix)): VxWorks, Fuchsia, HermitOS, other Unix systems
//! - [IOCP](https://learn.microsoft.com/en-us/windows/win32/fileio/i-o-completion-ports): Windows, Wine (version 7.13+)
//!
//! By default, polling is done in oneshot mode, which means interest in I/O events needs to
@ -70,8 +70,7 @@ use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::time::Duration;
use std::usize;
use std::time::{Duration, Instant};
use cfg_if::cfg_if;
@ -81,7 +80,11 @@ cfg_if! {
if #[cfg(polling_test_poll_backend)] {
mod poll;
use poll as sys;
} else if #[cfg(any(target_os = "linux", target_os = "android"))] {
} else if #[cfg(any(
target_os = "linux",
target_os = "android",
target_os = "redox"
))] {
mod epoll;
use epoll as sys;
} else if #[cfg(any(
@ -104,6 +107,7 @@ cfg_if! {
use kqueue as sys;
} else if #[cfg(any(
target_os = "vxworks",
target_os = "hermit",
target_os = "fuchsia",
target_os = "horizon",
unix,
@ -121,7 +125,7 @@ cfg_if! {
pub mod os;
/// Key associated with notifications.
const NOTIFY_KEY: usize = std::usize::MAX;
const NOTIFY_KEY: usize = usize::MAX;
/// Indicates that a file descriptor or socket can read or write without blocking.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -181,52 +185,46 @@ pub enum PollMode {
}
impl Event {
/// All kinds of events (readable and writable).
///
/// Equivalent to: `Event { key, readable: true, writable: true }`
pub const fn all(key: usize) -> Event {
/// Create a new event.
pub const fn new(key: usize, readable: bool, writable: bool) -> Event {
Event {
key,
readable: true,
writable: true,
readable,
writable,
extra: sys::EventExtra::empty(),
}
}
/// All kinds of events (readable and writable).
///
/// Equivalent to: `Event::new(key, true, true)`
#[inline]
pub const fn all(key: usize) -> Event {
Event::new(key, true, true)
}
/// Only the readable event.
///
/// Equivalent to: `Event { key, readable: true, writable: false }`
/// Equivalent to: `Event::new(key, true, false)`
#[inline]
pub const fn readable(key: usize) -> Event {
Event {
key,
readable: true,
writable: false,
extra: sys::EventExtra::empty(),
}
Event::new(key, true, false)
}
/// Only the writable event.
///
/// Equivalent to: `Event { key, readable: false, writable: true }`
/// Equivalent to: `Event::new(key, false, true)`
#[inline]
pub const fn writable(key: usize) -> Event {
Event {
key,
readable: false,
writable: true,
extra: sys::EventExtra::empty(),
}
Event::new(key, false, true)
}
/// No events.
///
/// Equivalent to: `Event { key, readable: false, writable: false }`
/// Equivalent to: `Event::new(key, false, false)`
#[inline]
pub const fn none(key: usize) -> Event {
Event {
key,
readable: false,
writable: false,
extra: sys::EventExtra::empty(),
}
Event::new(key, false, false)
}
/// Add interruption events to this interest.
@ -339,6 +337,86 @@ impl Event {
self.extra.is_pri()
}
/// Tells if this event is the result of a connection failure.
///
/// This function checks if a TCP connection has failed. It corresponds to the `EPOLLERR` or `EPOLLHUP` event in Linux
/// and `CONNECT_FAILED` event in Windows IOCP.
///
/// # Examples
///
/// ```
/// use std::{io, net};
/// // Assuming polling and socket2 are included as dependencies in Cargo.toml
/// use polling::Event;
/// use socket2::Type;
///
/// fn main() -> io::Result<()> {
/// let socket = socket2::Socket::new(socket2::Domain::IPV4, Type::STREAM, None)?;
/// let poller = polling::Poller::new()?;
/// unsafe {
/// poller.add(&socket, Event::new(0, true, true))?;
/// }
/// let addr = net::SocketAddr::new(net::Ipv4Addr::LOCALHOST.into(), 8080);
/// socket.set_nonblocking(true)?;
/// let _ = socket.connect(&addr.into());
///
/// let mut events = polling::Events::new();
///
/// events.clear();
/// poller.wait(&mut events, None)?;
///
/// let event = events.iter().next();
///
/// let event = match event {
/// Some(event) => event,
/// None => {
/// println!("no event");
/// return Ok(());
/// },
/// };
///
/// println!("event: {:?}", event);
/// if event
/// .is_connect_failed()
/// .unwrap_or_default()
/// {
/// println!("connect failed");
/// }
///
/// Ok(())
/// }
/// ```
///
/// # Returns
///
/// Returns `Some(true)` if the connection has failed, `Some(false)` if the connection has not failed,
/// or `None` if the platform does not support detecting this condition.
#[inline]
#[deprecated(
since = "3.4.0",
note = "use `is_err` in combination of is_hup instead, see documentation for `is_err`"
)]
pub fn is_connect_failed(&self) -> Option<bool> {
self.extra.is_connect_failed()
}
/// Tells if this event is the result of a connection failure.
///
/// This function checks if an error exist, particularly useful in detecting if TCP connection failed. It corresponds to the `EPOLLERR` event in Linux
/// and `CONNECT_FAILED` event in Windows IOCP.
///
/// ## Caveats
///
/// In `epoll`, a TCP connection failure is indicated by `EPOLLERR` + `EPOLLHUP`, though just `EPOLLERR` is enough to indicate a connection failure.
/// EPOLLHUP may happen when we haven't event called `connect` on the socket, but it is still a valid event to check for.
///
/// Returns `Some(true)` if the connection has failed, `Some(false)` if there is an error,
/// or `None` if the platform does not support detecting this condition.
#[inline]
pub fn is_err(&self) -> Option<bool> {
self.extra.is_err()
}
/// Remove any extra information from this event.
#[inline]
pub fn clear_extra(&mut self) {
@ -616,8 +694,8 @@ impl Poller {
/// Waits for at least one I/O event and returns the number of new events.
///
/// New events will be appended to `events`. If necessary, make sure to clear the [`Vec`]
/// before calling [`wait()`][`Poller::wait()`]!
/// New events will be appended to `events`. If necessary, make sure to clear the
/// [`Events`][Events::clear()] before calling [`wait()`][`Poller::wait()`]!
///
/// This method will return with no new events if a notification is delivered by the
/// [`notify()`] method, or the timeout is reached. Sometimes it may even return with no events
@ -658,14 +736,30 @@ impl Poller {
let _enter = span.enter();
if let Ok(_lock) = self.lock.try_lock() {
// Wait for I/O events.
self.poller.wait(&mut events.events, timeout)?;
let deadline = timeout.and_then(|timeout| Instant::now().checked_add(timeout));
// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);
loop {
// Figure out how long to wait for.
let timeout =
deadline.map(|deadline| deadline.saturating_duration_since(Instant::now()));
// Indicate number of events.
Ok(events.len())
// Wait for I/O events.
if let Err(e) = self.poller.wait(&mut events.events, timeout) {
// If the wait was interrupted by a signal, clear events and try again.
if e.kind() == io::ErrorKind::Interrupted {
events.clear();
continue;
} else {
return Err(e);
}
}
// Clear the notification, if any.
self.notified.swap(false, Ordering::SeqCst);
// Indicate number of events.
return Ok(events.len());
}
} else {
tracing::trace!("wait: skipping because another thread is already waiting on I/O");
Ok(0)
@ -945,8 +1039,11 @@ impl fmt::Debug for Poller {
}
cfg_if! {
if #[cfg(unix)] {
if #[cfg(any(unix, target_os = "hermit"))] {
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
#[cfg(target_os = "hermit")]
use std::os::hermit::io::{AsRawFd, RawFd, AsFd, BorrowedFd};
/// A resource with a raw file descriptor.
pub trait AsRawSource {

View File

@ -20,6 +20,7 @@ pub mod iocp;
mod __private {
#[doc(hidden)]
#[allow(dead_code)]
pub trait PollerSealed {}
impl PollerSealed for crate::Poller {}

View File

@ -1,4 +1,4 @@
//! Functionality that is only availale for IOCP-based platforms.
//! Functionality that is only available for IOCP-based platforms.
pub use crate::sys::CompletionPacket;

View File

@ -1,9 +1,10 @@
//! Functionality that is only available for `kqueue`-based platforms.
use crate::sys::mode_to_flags;
use crate::sys::{mode_to_flags, SourceId};
use crate::{PollMode, Poller};
use std::io;
use std::marker::PhantomData;
use std::process::Child;
use std::time::Duration;
@ -98,10 +99,13 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
#[inline(always)]
fn add_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
// No difference between adding and modifying in kqueue.
self.poller.add_source(filter.source_id())?;
self.modify_filter(filter, key, mode)
}
fn modify_filter(&self, filter: F, key: usize, mode: PollMode) -> io::Result<()> {
self.poller.has_source(filter.source_id())?;
// Convert the filter into a kevent.
let event = filter.filter(kqueue::EventFlags::ADD | mode_to_flags(mode), key);
@ -114,7 +118,9 @@ impl<F: Filter> PollerKqueueExt<F> for Poller {
let event = filter.filter(kqueue::EventFlags::DELETE, 0);
// Delete the filter.
self.poller.submit_changes([event])
self.poller.submit_changes([event])?;
self.poller.remove_source(filter.source_id())
}
}
@ -126,6 +132,11 @@ unsafe impl<T: FilterSealed + ?Sized> FilterSealed for &T {
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event {
(**self).filter(flags, key)
}
#[inline(always)]
fn source_id(&self) -> SourceId {
(**self).source_id()
}
}
impl<T: Filter + ?Sized> Filter for &T {}
@ -149,6 +160,11 @@ unsafe impl FilterSealed for Signal {
key as _,
)
}
#[inline(always)]
fn source_id(&self) -> SourceId {
SourceId::Signal(self.0)
}
}
impl Filter for Signal {}
@ -156,11 +172,14 @@ impl Filter for Signal {}
/// Monitor a child process.
#[derive(Debug)]
pub struct Process<'a> {
/// The child process to monitor.
child: &'a Child,
/// The process ID to monitor.
pid: rustix::process::Pid,
/// The operation to monitor.
ops: ProcessOps,
/// Lifetime of the underlying process.
_lt: PhantomData<&'a Child>,
}
/// The operations that a monitored process can perform.
@ -185,7 +204,24 @@ impl<'a> Process<'a> {
/// Once registered into the `Poller`, the `Child` object must outlive this filter's
/// registration into the poller.
pub unsafe fn new(child: &'a Child, ops: ProcessOps) -> Self {
Self { child, ops }
Self {
pid: rustix::process::Pid::from_child(child),
ops,
_lt: PhantomData,
}
}
/// Create a `Process` from a PID.
///
/// # Safety
///
/// The PID must be tied to an actual child process.
pub unsafe fn from_pid(pid: std::num::NonZeroI32, ops: ProcessOps) -> Self {
Self {
pid: unsafe { rustix::process::Pid::from_raw_unchecked(pid.get()) },
ops,
_lt: PhantomData,
}
}
}
@ -200,13 +236,20 @@ unsafe impl FilterSealed for Process<'_> {
kqueue::Event::new(
kqueue::EventFilter::Proc {
pid: rustix::process::Pid::from_child(self.child),
// SAFETY: We know that the PID is nonzero.
pid: self.pid,
flags: events,
},
flags | kqueue::EventFlags::RECEIPT,
key as _,
)
}
#[inline(always)]
fn source_id(&self) -> SourceId {
// SAFETY: We know that the PID is nonzero
SourceId::Pid(self.pid)
}
}
impl Filter for Process<'_> {}
@ -234,11 +277,17 @@ unsafe impl FilterSealed for Timer {
key as _,
)
}
#[inline(always)]
fn source_id(&self) -> SourceId {
SourceId::Timer(self.id)
}
}
impl Filter for Timer {}
mod __private {
use crate::sys::SourceId;
use rustix::event::kqueue;
#[doc(hidden)]
@ -247,5 +296,8 @@ mod __private {
///
/// This filter's flags must have `EV_RECEIPT`.
fn filter(&self, flags: kqueue::EventFlags, key: usize) -> kqueue::Event;
/// Get the source ID for this source.
fn source_id(&self) -> SourceId;
}
}

View File

@ -1,14 +1,17 @@
//! Bindings to poll (VxWorks, Fuchsia, other Unix systems).
use std::collections::HashMap;
use std::convert::TryInto;
use std::io;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};
use rustix::event::{poll, PollFd, PollFlags};
#[cfg(not(target_os = "hermit"))]
use rustix::fd::{AsFd, AsRawFd, BorrowedFd};
#[cfg(target_os = "hermit")]
use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd};
use syscall::{poll, PollFd, PollFlags};
// std::os::unix doesn't exist on Fuchsia
type RawFd = std::os::raw::c_int;
@ -427,6 +430,16 @@ impl EventExtra {
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR) || self.flags.contains(PollFlags::HUP))
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR))
}
}
fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
@ -439,7 +452,182 @@ fn cvt_mode_as_remove(mode: PollMode) -> io::Result<bool> {
}
}
#[cfg(not(target_os = "espidf"))]
#[cfg(unix)]
mod syscall {
pub(super) use rustix::event::{poll, PollFd, PollFlags};
#[cfg(target_os = "espidf")]
pub(super) use rustix::event::{eventfd, EventfdFlags};
#[cfg(target_os = "espidf")]
pub(super) use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
#[cfg(target_os = "espidf")]
pub(super) use rustix::io::{read, write};
}
#[cfg(target_os = "hermit")]
mod syscall {
// TODO: Remove this shim once HermitOS is supported in Rustix.
use std::fmt;
use std::io;
use std::marker::PhantomData;
use std::ops::BitOr;
pub(super) use std::os::hermit::io::{AsFd, AsRawFd, BorrowedFd, FromRawFd, OwnedFd, RawFd};
/// Create an eventfd.
pub(super) fn eventfd(count: u64, _flags: EventfdFlags) -> io::Result<OwnedFd> {
let fd = unsafe { hermit_abi::eventfd(count, 0) };
if fd < 0 {
Err(io::Error::from_raw_os_error(unsafe {
hermit_abi::get_errno()
}))
} else {
Ok(unsafe { OwnedFd::from_raw_fd(fd) })
}
}
/// Read some bytes.
pub(super) fn read(fd: BorrowedFd<'_>, bytes: &mut [u8]) -> io::Result<usize> {
let count = unsafe { hermit_abi::read(fd.as_raw_fd(), bytes.as_mut_ptr(), bytes.len()) };
cvt(count)
}
/// Write some bytes.
pub(super) fn write(fd: BorrowedFd<'_>, bytes: &[u8]) -> io::Result<usize> {
let count = unsafe { hermit_abi::write(fd.as_raw_fd(), bytes.as_ptr(), bytes.len()) };
cvt(count)
}
/// Safe wrapper around the `poll` system call.
pub(super) fn poll(fds: &mut [PollFd<'_>], timeout: i32) -> io::Result<usize> {
let call = unsafe {
hermit_abi::poll(
fds.as_mut_ptr() as *mut hermit_abi::pollfd,
fds.len(),
timeout,
)
};
cvt(call as isize)
}
/// Safe wrapper around `pollfd`.
#[repr(transparent)]
pub(super) struct PollFd<'a> {
inner: hermit_abi::pollfd,
_lt: PhantomData<BorrowedFd<'a>>,
}
impl<'a> PollFd<'a> {
pub(super) fn from_borrowed_fd(fd: BorrowedFd<'a>, inflags: PollFlags) -> Self {
Self {
inner: hermit_abi::pollfd {
fd: fd.as_raw_fd(),
events: inflags.0,
revents: 0,
},
_lt: PhantomData,
}
}
pub(super) fn revents(&self) -> PollFlags {
PollFlags(self.inner.revents)
}
}
impl AsFd for PollFd<'_> {
fn as_fd(&self) -> BorrowedFd<'_> {
unsafe { BorrowedFd::borrow_raw(self.inner.fd) }
}
}
impl fmt::Debug for PollFd<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFd")
.field("fd", &format_args!("0x{:x}", self.inner.fd))
.field("events", &PollFlags(self.inner.events))
.field("revents", &PollFlags(self.inner.revents))
.finish()
}
}
/// Wrapper around polling flags.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub(super) struct PollFlags(i16);
impl PollFlags {
/// Empty set of flags.
pub(super) const fn empty() -> Self {
Self(0)
}
pub(super) const IN: PollFlags = PollFlags(hermit_abi::POLLIN);
pub(super) const OUT: PollFlags = PollFlags(hermit_abi::POLLOUT);
pub(super) const WRBAND: PollFlags = PollFlags(hermit_abi::POLLWRBAND);
pub(super) const ERR: PollFlags = PollFlags(hermit_abi::POLLERR);
pub(super) const HUP: PollFlags = PollFlags(hermit_abi::POLLHUP);
pub(super) const PRI: PollFlags = PollFlags(hermit_abi::POLLPRI);
/// Tell if this contains some flags.
pub(super) fn contains(self, flags: PollFlags) -> bool {
self.0 & flags.0 != 0
}
/// Set a flag.
pub(super) fn set(&mut self, flags: PollFlags, set: bool) {
if set {
self.0 |= flags.0;
} else {
self.0 &= !(flags.0);
}
}
/// Tell if this is empty.
pub(super) fn is_empty(self) -> bool {
self.0 == 0
}
/// Tell if this intersects with some flags.
pub(super) fn intersects(self, flags: PollFlags) -> bool {
self.contains(flags)
}
}
impl BitOr for PollFlags {
type Output = PollFlags;
fn bitor(self, rhs: Self) -> Self::Output {
Self(self.0 | rhs.0)
}
}
#[derive(Debug, Copy, Clone)]
pub(super) struct EventfdFlags;
impl EventfdFlags {
pub(super) fn empty() -> Self {
Self
}
}
/// Convert a number to an actual result.
#[inline]
fn cvt(len: isize) -> io::Result<usize> {
if len < 0 {
Err(io::Error::from_raw_os_error(unsafe {
hermit_abi::get_errno()
}))
} else {
Ok(len as usize)
}
}
}
#[cfg(not(any(target_os = "espidf", target_os = "hermit")))]
mod notify {
use std::io;
@ -447,7 +635,9 @@ mod notify {
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
use rustix::io::{fcntl_getfd, fcntl_setfd, read, write, FdFlags};
use rustix::pipe::{pipe, pipe_with, PipeFlags};
#[cfg(not(target_os = "haiku"))]
use rustix::pipe::pipe_with;
use rustix::pipe::{pipe, PipeFlags};
/// A notification pipe.
///
@ -468,12 +658,18 @@ mod notify {
impl Notify {
/// Creates a new notification pipe.
pub(super) fn new() -> io::Result<Self> {
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(|_| {
let fallback_pipe = |_| {
let (read_pipe, write_pipe) = pipe()?;
fcntl_setfd(&read_pipe, fcntl_getfd(&read_pipe)? | FdFlags::CLOEXEC)?;
fcntl_setfd(&write_pipe, fcntl_getfd(&write_pipe)? | FdFlags::CLOEXEC)?;
io::Result::Ok((read_pipe, write_pipe))
})?;
};
#[cfg(not(target_os = "haiku"))]
let (read_pipe, write_pipe) = pipe_with(PipeFlags::CLOEXEC).or_else(fallback_pipe)?;
#[cfg(target_os = "haiku")]
let (read_pipe, write_pipe) = fallback_pipe(PipeFlags::CLOEXEC)?;
// Put the reading side into non-blocking mode.
fcntl_setfl(&read_pipe, fcntl_getfl(&read_pipe)? | OFlags::NONBLOCK)?;
@ -489,7 +685,7 @@ mod notify {
self.read_pipe.as_fd()
}
/// Provides the poll flags to be used when registering the read half of the botify pipe with the `Poller`.
/// Provides the poll flags to be used when registering the read half of the notify pipe with the `Poller`.
pub(super) fn poll_flags(&self) -> PollFlags {
PollFlags::RDNORM
}
@ -503,7 +699,25 @@ mod notify {
/// Pops a notification (if any) from the pipe.
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
read(&self.read_pipe, &mut [0; 1])?;
// Pipes on Vita do not guarantee that after `write` call succeeds, the
// data becomes immediately available for reading on the other side of the pipe.
// To ensure that the notification is not lost, the read side of the pipe is temporarily
// switched to blocking for a single `read` call.
#[cfg(target_os = "vita")]
rustix::fs::fcntl_setfl(
&self.read_pipe,
rustix::fs::fcntl_getfl(&self.read_pipe)? & !rustix::fs::OFlags::NONBLOCK,
)?;
let result = read(&self.read_pipe, &mut [0; 1]);
#[cfg(target_os = "vita")]
rustix::fs::fcntl_setfl(
&self.read_pipe,
rustix::fs::fcntl_getfl(&self.read_pipe)? | rustix::fs::OFlags::NONBLOCK,
)?;
result?;
Ok(())
}
@ -522,20 +736,18 @@ mod notify {
}
}
#[cfg(target_os = "espidf")]
#[cfg(any(target_os = "espidf", target_os = "hermit"))]
mod notify {
use std::io;
use std::mem;
use rustix::event::PollFlags;
use rustix::event::{eventfd, EventfdFlags};
use rustix::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
use rustix::io::{read, write};
use super::syscall::{
eventfd, read, write, AsFd, AsRawFd, BorrowedFd, EventfdFlags, OwnedFd, PollFlags, RawFd,
};
/// A notification pipe.
///
/// This implementation uses ther `eventfd` syscall to send notifications.
/// This implementation uses the `eventfd` syscall to send notifications.
#[derive(Debug)]
pub(super) struct Notify {
/// The file descriptor of the eventfd object. This is also stored as the first
@ -559,13 +771,20 @@ mod notify {
// (1) is not a problem for us, as we want the eventfd() file descriptor to be in a non-blocking mode anyway
// (2) is also not a problem, as long as we don't try to read the counter value in an endless loop when we detect being notified
#[cfg(not(target_os = "espidf"))]
let flags = EventfdFlags::NONBLOCK;
#[cfg(target_os = "espidf")]
let flags = EventfdFlags::empty();
let event_fd = eventfd(0, flags)?;
let event_fd = eventfd(0, flags).map_err(|err| {
match io::Error::from(err) {
err if err.kind() == io::ErrorKind::PermissionDenied => {
// EPERM can happen if the eventfd isn't initialized yet.
// Tell the user to call esp_vfs_eventfd_register.
io::Error::new(
io::ErrorKind::PermissionDenied,
"failed to initialize eventfd for polling, try calling `esp_vfs_eventfd_register`"
)
},
err => err,
}
})?;
Ok(Self { event_fd })
}
@ -582,14 +801,14 @@ mod notify {
/// Notifies the `Poller` instance via the eventfd file descriptor.
pub(super) fn notify(&self) -> Result<(), io::Error> {
write(&self.event_fd, &1u64.to_ne_bytes())?;
write(self.event_fd.as_fd(), &1u64.to_ne_bytes())?;
Ok(())
}
/// Pops a notification (if any) from the eventfd file descriptor.
pub(super) fn pop_notification(&self) -> Result<(), io::Error> {
read(&self.event_fd, &mut [0; mem::size_of::<u64>()])?;
read(self.event_fd.as_fd(), &mut [0; mem::size_of::<u64>()])?;
Ok(())
}

View File

@ -250,4 +250,14 @@ impl EventExtra {
pub fn is_pri(&self) -> bool {
self.flags.contains(PollFlags::PRI)
}
#[inline]
pub fn is_connect_failed(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR) && self.flags.contains(PollFlags::HUP))
}
#[inline]
pub fn is_err(&self) -> Option<bool> {
Some(self.flags.contains(PollFlags::ERR))
}
}

View File

@ -76,6 +76,53 @@ fn concurrent_modify() -> io::Result<()> {
Ok(())
}
#[cfg(all(unix, not(target_os = "vita")))]
#[test]
fn concurrent_interruption() -> io::Result<()> {
struct MakeItSend<T>(T);
unsafe impl<T> Send for MakeItSend<T> {}
let (reader, _writer) = tcp_pair()?;
let poller = Poller::new()?;
unsafe {
poller.add(&reader, Event::none(0))?;
}
let mut events = Events::new();
let events_borrow = &mut events;
let (sender, receiver) = std::sync::mpsc::channel();
Parallel::new()
.add(move || {
// Register a signal handler so that the syscall is actually interrupted. A signal that
// is ignored by default does not cause an interrupted syscall.
signal_hook::flag::register(signal_hook::consts::signal::SIGURG, Default::default())?;
// Signal to the other thread how to send a signal to us
sender
.send(MakeItSend(unsafe { libc::pthread_self() }))
.unwrap();
poller.wait(events_borrow, Some(Duration::from_secs(1)))?;
Ok(())
})
.add(move || {
let MakeItSend(target_thread) = receiver.recv().unwrap();
thread::sleep(Duration::from_millis(100));
assert_eq!(0, unsafe {
libc::pthread_kill(target_thread, libc::SIGURG)
});
Ok(())
})
.run()
.into_iter()
.collect::<io::Result<()>>()?;
assert_eq!(events.len(), 0);
Ok(())
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?;

View File

@ -1,6 +1,7 @@
use polling::{Event, Events, Poller};
use std::io::{self, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::Arc;
use std::time::Duration;
#[test]
@ -38,6 +39,48 @@ fn basic_io() {
poller.delete(&read).unwrap();
}
#[test]
fn insert_twice() {
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
let (read, mut write) = tcp_pair().unwrap();
let read = Arc::new(read);
let poller = Poller::new().unwrap();
unsafe {
#[cfg(unix)]
let read = read.as_raw_fd();
#[cfg(windows)]
let read = read.as_raw_socket();
poller.add(read, Event::readable(1)).unwrap();
assert_eq!(
poller.add(read, Event::readable(1)).unwrap_err().kind(),
io::ErrorKind::AlreadyExists
);
}
write.write_all(&[1]).unwrap();
let mut events = Events::new();
assert_eq!(
poller
.wait(&mut events, Some(Duration::from_secs(1)))
.unwrap(),
1
);
assert_eq!(events.len(), 1);
assert_eq!(
events.iter().next().unwrap().with_no_extra(),
Event::readable(1)
);
poller.delete(&read).unwrap();
}
fn tcp_pair() -> io::Result<(TcpStream, TcpStream)> {
let listener = TcpListener::bind("127.0.0.1:0")?;
let a = TcpStream::connect(listener.local_addr()?)?;