Separate adding and modifying of file descriptors

This replaces Poller.insert() and Poller.interest() with Poller.add()
and Poller.modify(), and renames Poller.remove() to Poller.delete().

The method Poller.add() is used for adding a new file descriptor, while
Poller.modify() is used for updating an existing one. Poller.remove() is
renamed to Poller.delete() so the naming scheme of these methods follows
that of epoll, wepoll, etc.

This new setup means that adding a new socket only requires a single
call of Poller.add(), instead of a combination of Poller.insert() and
Poller.interest(). This reduces the amount of system calls necessary,
and leads to a more pleasant API.

On systems that use kqueue or ports, the behaviour of Poller.add() and
Poller.modify() is the same. This is because on these systems adding an
already existing file descriptor will just update its configuration.
This however is an implementation detail and should not be relied upon
by users.

Migrating to this new API is pretty simple, simply replace this:

    poller.insert(&socket);
    poller.interest(&socket, event);

With this:

    poller.add(&socket, event);

And for cases where Poller.interest() was used for updating an existing
file descriptor, simply replace it will a call to Poller.modify().

See https://github.com/stjepang/polling/issues/16 and
https://github.com/stjepang/polling/pull/17 for more information.
This commit is contained in:
Yorick Peterse 2020-10-01 21:26:44 +02:00
parent 4e5c3ce836
commit ba05307af1
No known key found for this signature in database
GPG Key ID: EDD30D2BEB691AC9
7 changed files with 164 additions and 188 deletions

View File

@ -38,8 +38,7 @@ socket.set_nonblocking(true)?;
// Create a poller and register interest in readability on the socket.
let poller = Poller::new()?;
poller.insert(&socket)?;
poller.interest(&socket, Event::readable(key))?;
poller.add(&socket, Event::readable(key))?;
// The event loop.
let mut events = Vec::new();
@ -53,7 +52,7 @@ loop {
// Perform a non-blocking accept operation.
socket.accept()?;
// Set interest in the next readability event.
poller.interest(&socket, Event::readable(key))?;
poller.modify(&socket, Event::readable(key))?;
}
}
}

View File

@ -11,11 +11,9 @@ fn main() -> io::Result<()> {
l2.set_nonblocking(true)?;
let poller = Poller::new()?;
poller.insert(&l1)?;
poller.insert(&l2)?;
poller.interest(&l1, Event::readable(1))?;
poller.interest(&l2, Event::readable(2))?;
poller.add(&l1, Event::readable(1))?;
poller.add(&l2, Event::readable(2))?;
let mut events = Vec::new();
loop {
@ -27,12 +25,12 @@ fn main() -> io::Result<()> {
1 => {
println!("Accept on l1");
l1.accept()?;
poller.interest(&l1, Event::readable(1))?;
poller.modify(&l1, Event::readable(1))?;
}
2 => {
println!("Accept on l2");
l2.accept()?;
poller.interest(&l2, Event::readable(2))?;
poller.modify(&l2, Event::readable(2))?;
}
_ => unreachable!(),
}

View File

@ -64,10 +64,10 @@ impl Poller {
};
if let Some(timer_fd) = timer_fd {
poller.insert(timer_fd)?;
poller.add(timer_fd, Event::none(crate::NOTIFY_KEY))?;
}
poller.insert(event_fd)?;
poller.interest(
poller.add(
event_fd,
Event {
key: crate::NOTIFY_KEY,
@ -85,48 +85,20 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
log::trace!("insert: epoll_fd={}, fd={}", self.epoll_fd, fd);
// Register the file descriptor in epoll.
let mut ev = libc::epoll_event {
events: libc::EPOLLONESHOT as _,
u64: crate::NOTIFY_KEY as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev))?;
Ok(())
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!("add: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.register(fd, libc::EPOLL_CTL_ADD, ev)
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!(
"interest: epoll_fd={}, fd={}, ev={:?}",
self.epoll_fd,
fd,
ev
);
let mut flags = libc::EPOLLONESHOT;
if ev.readable {
flags |= read_flags();
}
if ev.writable {
flags |= write_flags();
}
let mut ev = libc::epoll_event {
events: flags as _,
u64: ev.key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev))?;
Ok(())
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!("modify: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.register(fd, libc::EPOLL_CTL_MOD, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
log::trace!("remove: epoll_fd={}, fd={}", self.epoll_fd, fd);
syscall!(epoll_ctl(
@ -164,7 +136,7 @@ impl Poller {
))?;
// Set interest in timerfd.
self.interest(
self.modify(
timer_fd,
Event {
key: crate::NOTIFY_KEY,
@ -205,7 +177,7 @@ impl Poller {
&mut buf[0] as *mut u8 as *mut libc::c_void,
buf.len()
));
self.interest(
self.modify(
self.event_fd,
Event {
key: crate::NOTIFY_KEY,
@ -233,6 +205,27 @@ impl Poller {
));
Ok(())
}
fn register(&self, fd: RawFd, action: i32, ev: Event) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
if ev.readable {
flags |= read_flags();
}
if ev.writable {
flags |= write_flags();
}
let mut epoll_ev = libc::epoll_event {
events: flags as _,
u64: ev.key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, action, fd, &mut epoll_ev))?;
Ok(())
}
}
impl Drop for Poller {
@ -245,10 +238,10 @@ impl Drop for Poller {
);
if let Some(timer_fd) = self.timer_fd {
let _ = self.remove(timer_fd);
let _ = self.delete(timer_fd);
let _ = syscall!(close(timer_fd));
}
let _ = self.remove(self.event_fd);
let _ = self.delete(self.event_fd);
let _ = syscall!(close(self.event_fd));
let _ = syscall!(close(self.epoll_fd));
}

View File

@ -35,7 +35,7 @@ impl Poller {
read_stream,
write_stream,
};
poller.interest(
poller.add(
poller.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -52,24 +52,10 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!("insert: fd={}", fd);
}
Ok(())
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!(
"interest: kqueue_fd={}, fd={}, ev={:?}",
self.kqueue_fd,
fd,
ev
);
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
}
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
@ -131,8 +117,15 @@ impl Poller {
Ok(())
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
// Adding a file description that is already registered will just update the existing
// registration.
self.add(fd, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!("remove: kqueue_fd={}, fd={}", self.kqueue_fd, fd);
}
@ -207,7 +200,7 @@ impl Poller {
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.interest(
self.modify(
self.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -230,7 +223,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
log::trace!("drop: kqueue_fd={}", self.kqueue_fd);
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = self.delete(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.kqueue_fd));
}
}

View File

@ -26,8 +26,7 @@
//!
//! // Create a poller and register interest in readability on the socket.
//! let poller = Poller::new()?;
//! poller.insert(&socket)?;
//! poller.interest(&socket, Event::readable(key))?;
//! poller.add(&socket, Event::readable(key))?;
//!
//! // The event loop.
//! let mut events = Vec::new();
@ -41,7 +40,7 @@
//! // Perform a non-blocking accept operation.
//! socket.accept()?;
//! // Set interest in the next readability event.
//! poller.interest(&socket, Event::readable(key))?;
//! poller.modify(&socket, Event::readable(key))?;
//! }
//! }
//! }
@ -102,6 +101,17 @@ cfg_if! {
}
}
macro_rules! verify_event_key {
($event:expr) => {{
if $event.key == usize::MAX {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
));
}
}};
}
/// Key associated with notifications.
const NOTIFY_KEY: usize = std::usize::MAX;
@ -188,32 +198,7 @@ impl Poller {
})
}
/// Inserts a file descriptor or socket into the poller.
///
/// Before setting interest in readability or writability, the file descriptor or socket must
/// be inserted into the poller.
///
/// Don't forget to [remove][`Poller::remove()`] it when it is no longer used!
///
/// # Examples
///
/// ```
/// use polling::Poller;
/// use std::net::TcpListener;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
///
/// socket.set_nonblocking(true)?;
///
/// poller.insert(&socket)?;
/// # std::io::Result::Ok(())
/// ```
pub fn insert(&self, source: impl Source) -> io::Result<()> {
self.poller.insert(source.raw())
}
/// Enables or disables interest in a file descriptor or socket.
/// Enables interest in a file descriptor or socket.
///
/// A file descriptor or socket is considered readable or writable when a read or write
/// operation on it would not block. This doesn't mean the read or write operation will
@ -227,11 +212,12 @@ impl Poller {
/// - `Event { key: 7, readable: true, writable: false }`
/// - `Event { key: 7, readable: false, writable: true }`
///
/// Don't forget to [delete][`Poller::delete()`] it when it is no longer used!
///
/// # Errors
///
/// This method returns an error in the following situations:
///
/// * If `source` was not [inserted][`Poller::interest()`] into the poller.
/// * If `key` equals `usize::MAX` because that key is reserved for internal use.
/// * If an error is returned by the syscall.
///
@ -244,7 +230,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::all(key))?;
/// poller.add(&source, Event::all(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -255,7 +241,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::readable(key))?;
/// poller.add(&source, Event::readable(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -266,7 +252,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::writable(key))?;
/// poller.add(&source, Event::writable(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -277,42 +263,64 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::none(key))?;
/// poller.add(&source, Event::none(key))?;
/// # std::io::Result::Ok(())
/// ```
pub fn interest(&self, source: impl Source, event: Event) -> io::Result<()> {
if event.key == usize::MAX {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
))
} else {
self.poller.interest(source.raw(), event)
}
pub fn add(&self, source: impl Source, event: Event) -> io::Result<()> {
verify_event_key!(event);
self.poller.add(source.raw(), event)
}
/// Modifies the interest of a file descriptor or socket.
///
/// This method has the same behaviour as [`add()`][`Poller::add()`] except it modifies the
/// interest of an already registered file descriptor or socket.
///
/// To use this method with a file descriptor, you must first add it using
/// [`add()`][`Poller::add()`].
///
/// # Examples
///
/// This will first register a socket for only writes, then modify the interest to both reads
/// and writes:
///
/// ```no_run
/// # use polling::{Event, Poller};
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.add(&source, Event::writable(key))?;
/// poller.modify(&source, Event::all(key))?;
/// # std::io::Result::Ok(())
/// ```
pub fn modify(&self, source: impl Source, event: Event) -> io::Result<()> {
verify_event_key!(event);
self.poller.modify(source.raw(), event)
}
/// Removes a file descriptor or socket from the poller.
///
/// Unlike [`insert()`][`Poller::insert()`], this method only removes the file descriptor or
/// Unlike [`add()`][`Poller::add()`], this method only removes the file descriptor or
/// socket from the poller without putting it back into blocking mode.
///
/// # Examples
///
/// ```
/// use polling::Poller;
/// use polling::{Event, Poller};
/// use std::net::TcpListener;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
/// let key = 7;
///
/// socket.set_nonblocking(true)?;
///
/// poller.insert(&socket)?;
/// poller.remove(&socket)?;
/// poller.add(&socket, Event::all(key))?;
/// poller.delete(&socket)?;
/// # std::io::Result::Ok(())
/// ```
pub fn remove(&self, source: impl Source) -> io::Result<()> {
self.poller.remove(source.raw())
pub fn delete(&self, source: impl Source) -> io::Result<()> {
self.poller.delete(source.raw())
}
/// Waits for at least one I/O event and returns the number of new events.
@ -336,15 +344,16 @@ impl Poller {
/// # Examples
///
/// ```
/// use polling::Poller;
/// use polling::{Event, Poller};
/// use std::net::TcpListener;
/// use std::time::Duration;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
/// let key = 7;
///
/// socket.set_nonblocking(true)?;
/// poller.insert(&socket)?;
/// poller.add(&socket, Event::all(key))?;
///
/// let mut events = Vec::new();
/// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?;

View File

@ -36,7 +36,7 @@ impl Poller {
read_stream,
write_stream,
};
poller.interest(
poller.add(
poller.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -48,21 +48,8 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as _,
0,
0 as _,
))?;
Ok(())
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
/// Adds a file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
let mut flags = 0;
if ev.readable {
flags |= libc::POLLIN;
@ -82,8 +69,14 @@ impl Poller {
Ok(())
}
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
// Adding a file description that is already registered will just update the existing
// registration.
self.add(fd, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
syscall!(port_dissociate(
self.port_fd,
libc::PORT_SOURCE_FD,
@ -127,7 +120,7 @@ impl Poller {
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.interest(
self.modify(
self.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -148,7 +141,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = self.delete(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.port_fd));
}
}

View File

@ -47,60 +47,25 @@ impl Poller {
Ok(Poller { handle, notified })
}
/// Inserts a socket.
pub fn insert(&self, sock: RawSocket) -> io::Result<()> {
log::trace!("insert: handle={:?}, sock={}", self.handle, sock);
// Register the socket in wepoll.
let mut ev = we::epoll_event {
events: we::EPOLLONESHOT,
data: we::epoll_data {
u64: crate::NOTIFY_KEY as u64,
},
};
wepoll!(epoll_ctl(
self.handle,
we::EPOLL_CTL_ADD as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
/// Adds a socket.
pub fn add(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
log::trace!("add: handle={:?}, sock={}, ev={:?}", self.handle, sock, ev);
self.register(sock, we::EPOLL_CTL_ADD, ev)
}
/// Sets interest in a read/write event on a socket and associates a key with it.
pub fn interest(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
/// Modifies a socket.
pub fn modify(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
log::trace!(
"interest: handle={:?}, sock={}, ev={:?}",
"modify: handle={:?}, sock={}, ev={:?}",
self.handle,
sock,
ev
);
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: ev.key as u64 },
};
wepoll!(epoll_ctl(
self.handle,
we::EPOLL_CTL_MOD as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
self.register(sock, we::EPOLL_CTL_MOD, ev)
}
/// Removes a socket.
pub fn remove(&self, sock: RawSocket) -> io::Result<()> {
pub fn delete(&self, sock: RawSocket) -> io::Result<()> {
log::trace!("remove: handle={:?}, sock={}", self.handle, sock);
wepoll!(epoll_ctl(
self.handle,
@ -177,6 +142,32 @@ impl Poller {
}
Ok(())
}
fn register(&self, sock: RawSocket, action: u32, ev: Event) -> io::Result<()> {
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: ev.key as u64 },
};
wepoll!(epoll_ctl(
self.handle,
action as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
}
impl Drop for Poller {