Merge pull request #3 from papertigers/illumos

Add event ports for illumos
This commit is contained in:
Stjepan Glavina 2020-08-11 13:15:36 +02:00 committed by GitHub
commit 30ca45f83f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 229 additions and 4 deletions

View File

@ -12,9 +12,10 @@ https://docs.rs/polling)
Portable interface to epoll, kqueue, and wepoll.
Supported platforms:
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, illumos
- [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
- [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, FreeBSD, NetBSD, OpenBSD,
DragonFly BSD
- [event ports](https://illumos.org/man/port_create): illumos, Solaris
- [wepoll](https://github.com/piscisaureus/wepoll): Windows
Polling is done in oneshot mode, which means interest in I/O events needs to be reset after

View File

@ -1,9 +1,10 @@
//! Portable interface to epoll, kqueue, and wepoll.
//! Portable interface to epoll, kqueue, event ports, and wepoll.
//!
//! Supported platforms:
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android, illumos
//! - [epoll](https://en.wikipedia.org/wiki/Epoll): Linux, Android
//! - [kqueue](https://en.wikipedia.org/wiki/Kqueue): macOS, iOS, FreeBSD, NetBSD, OpenBSD,
//! DragonFly BSD
//! - [event ports](https://illumos.org/man/port_create): illumos, Solaris
//! - [wepoll](https://github.com/piscisaureus/wepoll): Windows
//!
//! Polling is done in oneshot mode, which means interest in I/O events needs to be reset after
@ -69,9 +70,15 @@ macro_rules! syscall {
}
cfg_if! {
if #[cfg(any(target_os = "linux", target_os = "android", target_os = "illumos"))] {
if #[cfg(any(target_os = "linux", target_os = "android"))] {
mod epoll;
use epoll as sys;
} else if #[cfg(any(
target_os = "illumos",
target_os = "solaris",
))] {
mod port;
use port as sys;
} else if #[cfg(any(
target_os = "macos",
target_os = "ios",

217
src/port.rs Normal file
View File

@ -0,0 +1,217 @@
//! Bindings to event port (illumos, Solaris).
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use std::ptr;
use std::time::Duration;
use std::usize;
use crate::Event;
/// Interface to event ports.
#[derive(Debug)]
pub struct Poller {
/// File descriptor for the port instance.
port_fd: RawFd,
/// Read side of a pipe for consuming notifications.
read_stream: UnixStream,
/// Write side of a pipe for producing notifications.
write_stream: UnixStream,
}
impl Poller {
/// Creates a new poller.
pub fn new() -> io::Result<Poller> {
let port_fd = unsafe {
let fd = match libc::port_create() {
-1 => return Err(io::Error::last_os_error()),
fd => fd,
};
let flags = libc::fcntl(fd, libc::F_GETFD);
libc::fcntl(fd, libc::F_SETFD, flags | libc::FD_CLOEXEC);
fd
};
// Set up the notification pipe.
let (read_stream, write_stream) = UnixStream::pair()?;
read_stream.set_nonblocking(true)?;
write_stream.set_nonblocking(true)?;
let poller = Poller {
port_fd,
read_stream,
write_stream,
};
poller.interest(
poller.read_stream.as_raw_fd(),
Event {
key: NOTIFY_KEY,
readable: true,
writable: false,
},
)?;
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as _,
0,
0 as _,
))?;
Ok(())
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
let mut flags = 0;
if ev.readable {
flags |= libc::POLLIN;
}
if ev.writable {
flags |= libc::POLLOUT;
}
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as _,
flags as _,
ev.key as _,
))?;
Ok(())
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
syscall!(port_dissociate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as usize,
))?;
Ok(())
}
/// Waits for I/O events with an optional timeout.
///
/// Returns the number of processed I/O events.
///
/// If a notification occurs, the notification event will be included in the `events` list
/// identifiable by key `usize::MAX`.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let mut timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
tv_nsec: t.subsec_nanos() as libc::c_long,
});
let mut nget: u32 = 1;
// Wait for I/O events.
let res = syscall!(port_getn(
self.port_fd,
events.list.as_mut_ptr() as *mut libc::port_event,
events.list.len() as libc::c_uint,
&mut nget as _,
match &mut timeout {
None => ptr::null_mut(),
Some(t) => t,
}
));
// Event ports sets the return value to -1 and returns ETIME on timer expire. The number of
// returned events is stored in nget, but in our case it should always be 0 since we set
// nget to 1 initially.
let nevents = match res {
Err(e) => match e.raw_os_error().unwrap() {
libc::ETIME => 0,
_ => return Err(e),
},
Ok(_) => nget as usize,
};
events.len = nevents;
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.interest(
self.read_stream.as_raw_fd(),
Event {
key: NOTIFY_KEY,
readable: true,
writable: false,
},
)?;
Ok(events.len)
}
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
let _ = (&self.write_stream).write(&[1]);
Ok(())
}
}
impl Drop for Poller {
fn drop(&mut self) {
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.port_fd));
}
}
/// Key associated with the eventfd for producing notifications.
const NOTIFY_KEY: usize = usize::MAX;
/// Poll flags for all possible readability events.
fn read_flags() -> libc::c_short {
libc::POLLIN | libc::POLLHUP | libc::POLLERR | libc::POLLPRI
}
/// Poll flags for all possible writability events.
fn write_flags() -> libc::c_short {
libc::POLLOUT | libc::POLLHUP | libc::POLLERR
}
/// A list of reported I/O events.
pub struct Events {
list: Box<[libc::port_event]>,
len: usize,
}
unsafe impl Send for Events {}
impl Events {
/// Creates an empty list.
pub fn new() -> Events {
let ev = libc::port_event {
portev_events: 0,
portev_source: 0,
portev_pad: 0,
portev_object: 0,
portev_user: 0 as _,
};
let list = vec![ev; 1000].into_boxed_slice();
let len = 0;
Events { list, len }
}
/// Iterates over I/O events.
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
self.list[..self.len].iter().map(|ev| Event {
key: ev.portev_user as _,
readable: (ev.portev_events & read_flags() as libc::c_int) != 0,
writable: (ev.portev_events & write_flags() as libc::c_int) != 0,
})
}
}