Merge pull request #17 from YorickPeterse/separate-poller-methods

Refactor Poller.add and Poller.interest
This commit is contained in:
Stjepan Glavina 2020-10-02 16:32:20 +02:00 committed by GitHub
commit 35add590ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 187 additions and 221 deletions

View File

@ -16,11 +16,11 @@ jobs:
rust: [nightly, beta, stable, 1.40.0]
steps:
- uses: actions/checkout@v2
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS')
run: echo "::set-env name=CURRENT_WEEK::$(date +%V)"
- name: Set current week of the year in environnement
if: startsWith(matrix.os, 'windows')
run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)"
@ -62,11 +62,11 @@ jobs:
with:
command: test
- name: Clone async-io
run: git clone https://github.com/stjepang/async-io.git
- name: Add patch section
run: echo '[patch.crates-io]' >> async-io/Cargo.toml
- name: Patch polling
run: echo 'polling = { path = ".." }' >> async-io/Cargo.toml
- name: Test async-io
run: cargo test --manifest-path=async-io/Cargo.toml
# - name: Clone async-io
# run: git clone https://github.com/stjepang/async-io.git
# - name: Add patch section
# run: echo '[patch.crates-io]' >> async-io/Cargo.toml
# - name: Patch polling
# run: echo 'polling = { path = ".." }' >> async-io/Cargo.toml
# - name: Test async-io
# run: cargo test --manifest-path=async-io/Cargo.toml

View File

@ -33,10 +33,12 @@ use std::net::TcpListener;
let socket = TcpListener::bind("127.0.0.1:8000")?;
let key = 7; // arbitrary key identifying the socket
// File descriptors must be explicitly marked as non-blocking.
socket.set_nonblocking(true)?;
// Create a poller and register interest in readability on the socket.
let poller = Poller::new()?;
poller.insert(&socket)?;
poller.interest(&socket, Event::readable(key))?;
poller.add(&socket, Event::readable(key))?;
// The event loop.
let mut events = Vec::new();
@ -50,7 +52,7 @@ loop {
// Perform a non-blocking accept operation.
socket.accept()?;
// Set interest in the next readability event.
poller.interest(&socket, Event::readable(key))?;
poller.modify(&socket, Event::readable(key))?;
}
}
}

View File

@ -7,12 +7,13 @@ fn main() -> io::Result<()> {
let l1 = TcpListener::bind("127.0.0.1:8001")?;
let l2 = TcpListener::bind("127.0.0.1:8002")?;
let poller = Poller::new()?;
poller.insert(&l1)?;
poller.insert(&l2)?;
l1.set_nonblocking(true)?;
l2.set_nonblocking(true)?;
poller.interest(&l1, Event::readable(1))?;
poller.interest(&l2, Event::readable(2))?;
let poller = Poller::new()?;
poller.add(&l1, Event::readable(1))?;
poller.add(&l2, Event::readable(2))?;
let mut events = Vec::new();
loop {
@ -24,12 +25,12 @@ fn main() -> io::Result<()> {
1 => {
println!("Accept on l1");
l1.accept()?;
poller.interest(&l1, Event::readable(1))?;
poller.modify(&l1, Event::readable(1))?;
}
2 => {
println!("Accept on l2");
l2.accept()?;
poller.interest(&l2, Event::readable(2))?;
poller.modify(&l2, Event::readable(2))?;
}
_ => unreachable!(),
}

View File

@ -64,10 +64,10 @@ impl Poller {
};
if let Some(timer_fd) = timer_fd {
poller.insert(timer_fd)?;
poller.add(timer_fd, Event::none(crate::NOTIFY_KEY))?;
}
poller.insert(event_fd)?;
poller.interest(
poller.add(
event_fd,
Event {
key: crate::NOTIFY_KEY,
@ -85,52 +85,20 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
log::trace!("insert: epoll_fd={}, fd={}", self.epoll_fd, fd);
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
// Register the file descriptor in epoll.
let mut ev = libc::epoll_event {
events: libc::EPOLLONESHOT as _,
u64: crate::NOTIFY_KEY as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_ADD, fd, &mut ev))?;
Ok(())
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!("add: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.register(fd, libc::EPOLL_CTL_ADD, ev)
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!(
"interest: epoll_fd={}, fd={}, ev={:?}",
self.epoll_fd,
fd,
ev
);
let mut flags = libc::EPOLLONESHOT;
if ev.readable {
flags |= read_flags();
}
if ev.writable {
flags |= write_flags();
}
let mut ev = libc::epoll_event {
events: flags as _,
u64: ev.key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, libc::EPOLL_CTL_MOD, fd, &mut ev))?;
Ok(())
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::trace!("modify: epoll_fd={}, fd={}, ev={:?}", self.epoll_fd, fd, ev);
self.register(fd, libc::EPOLL_CTL_MOD, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
log::trace!("remove: epoll_fd={}, fd={}", self.epoll_fd, fd);
syscall!(epoll_ctl(
@ -168,7 +136,7 @@ impl Poller {
))?;
// Set interest in timerfd.
self.interest(
self.modify(
timer_fd,
Event {
key: crate::NOTIFY_KEY,
@ -209,7 +177,7 @@ impl Poller {
&mut buf[0] as *mut u8 as *mut libc::c_void,
buf.len()
));
self.interest(
self.modify(
self.event_fd,
Event {
key: crate::NOTIFY_KEY,
@ -237,6 +205,27 @@ impl Poller {
));
Ok(())
}
fn register(&self, fd: RawFd, action: i32, ev: Event) -> io::Result<()> {
let mut flags = libc::EPOLLONESHOT;
if ev.readable {
flags |= read_flags();
}
if ev.writable {
flags |= write_flags();
}
let mut epoll_ev = libc::epoll_event {
events: flags as _,
u64: ev.key as u64,
};
syscall!(epoll_ctl(self.epoll_fd, action, fd, &mut epoll_ev))?;
Ok(())
}
}
impl Drop for Poller {
@ -249,10 +238,10 @@ impl Drop for Poller {
);
if let Some(timer_fd) = self.timer_fd {
let _ = self.remove(timer_fd);
let _ = self.delete(timer_fd);
let _ = syscall!(close(timer_fd));
}
let _ = self.remove(self.event_fd);
let _ = self.delete(self.event_fd);
let _ = syscall!(close(self.event_fd));
let _ = syscall!(close(self.epoll_fd));
}

View File

@ -35,7 +35,7 @@ impl Poller {
read_stream,
write_stream,
};
poller.interest(
poller.add(
poller.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -52,27 +52,10 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
/// Adds a new file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!("insert: fd={}", fd);
}
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
Ok(())
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!(
"interest: kqueue_fd={}, fd={}, ev={:?}",
self.kqueue_fd,
fd,
ev
);
log::trace!("add: kqueue_fd={}, fd={}, ev={:?}", self.kqueue_fd, fd, ev);
}
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
@ -134,8 +117,15 @@ impl Poller {
Ok(())
}
/// Modifies an existing file descriptor.
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
// Adding a file description that is already registered will just update the existing
// registration.
self.add(fd, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
if fd != self.read_stream.as_raw_fd() {
log::trace!("remove: kqueue_fd={}, fd={}", self.kqueue_fd, fd);
}
@ -210,7 +200,7 @@ impl Poller {
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.interest(
self.modify(
self.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -233,7 +223,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
log::trace!("drop: kqueue_fd={}", self.kqueue_fd);
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = self.delete(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.kqueue_fd));
}
}

View File

@ -22,10 +22,11 @@
//! let socket = TcpListener::bind("127.0.0.1:8000")?;
//! let key = 7; // arbitrary key identifying the socket
//!
//! socket.set_nonblocking(true)?;
//!
//! // Create a poller and register interest in readability on the socket.
//! let poller = Poller::new()?;
//! poller.insert(&socket)?;
//! poller.interest(&socket, Event::readable(key))?;
//! poller.add(&socket, Event::readable(key))?;
//!
//! // The event loop.
//! let mut events = Vec::new();
@ -39,7 +40,7 @@
//! // Perform a non-blocking accept operation.
//! socket.accept()?;
//! // Set interest in the next readability event.
//! poller.interest(&socket, Event::readable(key))?;
//! poller.modify(&socket, Event::readable(key))?;
//! }
//! }
//! }
@ -100,6 +101,17 @@ cfg_if! {
}
}
macro_rules! verify_event_key {
($event:expr) => {{
if $event.key == usize::MAX {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
));
}
}};
}
/// Key associated with notifications.
const NOTIFY_KEY: usize = std::usize::MAX;
@ -186,30 +198,7 @@ impl Poller {
})
}
/// Inserts a file descriptor or socket into the poller and puts it in non-blocking mode.
///
/// Before setting interest in readability or writability, the file descriptor or socket must
/// be inserted into the poller. This method also puts it in non-blocking mode.
///
/// Don't forget to [remove][`Poller::remove()`] it when it is no longer used!
///
/// # Examples
///
/// ```
/// use polling::Poller;
/// use std::net::TcpListener;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
///
/// poller.insert(&socket)?;
/// # std::io::Result::Ok(())
/// ```
pub fn insert(&self, source: impl Source) -> io::Result<()> {
self.poller.insert(source.raw())
}
/// Enables or disables interest in a file descriptor or socket.
/// Enables interest in a file descriptor or socket.
///
/// A file descriptor or socket is considered readable or writable when a read or write
/// operation on it would not block. This doesn't mean the read or write operation will
@ -223,11 +212,12 @@ impl Poller {
/// - `Event { key: 7, readable: true, writable: false }`
/// - `Event { key: 7, readable: false, writable: true }`
///
/// Don't forget to [delete][`Poller::delete()`] it when it is no longer used!
///
/// # Errors
///
/// This method returns an error in the following situations:
///
/// * If `source` was not [inserted][`Poller::interest()`] into the poller.
/// * If `key` equals `usize::MAX` because that key is reserved for internal use.
/// * If an error is returned by the syscall.
///
@ -240,7 +230,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::all(key))?;
/// poller.add(&source, Event::all(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -251,7 +241,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::readable(key))?;
/// poller.add(&source, Event::readable(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -262,7 +252,7 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::writable(key))?;
/// poller.add(&source, Event::writable(key))?;
/// # std::io::Result::Ok(())
/// ```
///
@ -273,40 +263,64 @@ impl Poller {
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.interest(&source, Event::none(key))?;
/// poller.add(&source, Event::none(key))?;
/// # std::io::Result::Ok(())
/// ```
pub fn interest(&self, source: impl Source, event: Event) -> io::Result<()> {
if event.key == usize::MAX {
Err(io::Error::new(
io::ErrorKind::InvalidInput,
"the key is not allowed to be `usize::MAX`",
))
} else {
self.poller.interest(source.raw(), event)
}
pub fn add(&self, source: impl Source, event: Event) -> io::Result<()> {
verify_event_key!(event);
self.poller.add(source.raw(), event)
}
/// Modifies the interest of a file descriptor or socket.
///
/// This method has the same behaviour as [`add()`][`Poller::add()`] except it modifies the
/// interest of an already registered file descriptor or socket.
///
/// To use this method with a file descriptor, you must first add it using
/// [`add()`][`Poller::add()`].
///
/// # Examples
///
/// This will first register a socket for only writes, then modify the interest to both reads
/// and writes:
///
/// ```no_run
/// # use polling::{Event, Poller};
/// # let poller = Poller::new()?;
/// # let key = 7;
/// # let source = std::net::TcpListener::bind("127.0.0.1:0")?;
/// poller.add(&source, Event::writable(key))?;
/// poller.modify(&source, Event::all(key))?;
/// # std::io::Result::Ok(())
/// ```
pub fn modify(&self, source: impl Source, event: Event) -> io::Result<()> {
verify_event_key!(event);
self.poller.modify(source.raw(), event)
}
/// Removes a file descriptor or socket from the poller.
///
/// Unlike [`insert()`][`Poller::insert()`], this method only removes the file descriptor or
/// Unlike [`add()`][`Poller::add()`], this method only removes the file descriptor or
/// socket from the poller without putting it back into blocking mode.
///
/// # Examples
///
/// ```
/// use polling::Poller;
/// use polling::{Event, Poller};
/// use std::net::TcpListener;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
/// let key = 7;
///
/// poller.insert(&socket)?;
/// poller.remove(&socket)?;
/// socket.set_nonblocking(true)?;
///
/// poller.add(&socket, Event::all(key))?;
/// poller.delete(&socket)?;
/// # std::io::Result::Ok(())
/// ```
pub fn remove(&self, source: impl Source) -> io::Result<()> {
self.poller.remove(source.raw())
pub fn delete(&self, source: impl Source) -> io::Result<()> {
self.poller.delete(source.raw())
}
/// Waits for at least one I/O event and returns the number of new events.
@ -330,13 +344,16 @@ impl Poller {
/// # Examples
///
/// ```
/// use polling::Poller;
/// use polling::{Event, Poller};
/// use std::net::TcpListener;
/// use std::time::Duration;
///
/// let poller = Poller::new()?;
/// let socket = TcpListener::bind("127.0.0.1:0")?;
/// poller.insert(&socket)?;
/// let key = 7;
///
/// socket.set_nonblocking(true)?;
/// poller.add(&socket, Event::all(key))?;
///
/// let mut events = Vec::new();
/// let n = poller.wait(&mut events, Some(Duration::from_secs(1)))?;

View File

@ -36,7 +36,7 @@ impl Poller {
read_stream,
write_stream,
};
poller.interest(
poller.add(
poller.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -48,25 +48,8 @@ impl Poller {
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
syscall!(port_associate(
self.port_fd,
libc::PORT_SOURCE_FD,
fd as _,
0,
0 as _,
))?;
Ok(())
}
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
/// Adds a file descriptor.
pub fn add(&self, fd: RawFd, ev: Event) -> io::Result<()> {
let mut flags = 0;
if ev.readable {
flags |= libc::POLLIN;
@ -86,8 +69,14 @@ impl Poller {
Ok(())
}
pub fn modify(&self, fd: RawFd, ev: Event) -> io::Result<()> {
// Adding a file description that is already registered will just update the existing
// registration.
self.add(fd, ev)
}
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
pub fn delete(&self, fd: RawFd) -> io::Result<()> {
syscall!(port_dissociate(
self.port_fd,
libc::PORT_SOURCE_FD,
@ -131,7 +120,7 @@ impl Poller {
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
self.interest(
self.modify(
self.read_stream.as_raw_fd(),
Event {
key: crate::NOTIFY_KEY,
@ -152,7 +141,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = self.delete(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.port_fd));
}
}

View File

@ -47,73 +47,25 @@ impl Poller {
Ok(Poller { handle, notified })
}
/// Inserts a socket.
pub fn insert(&self, sock: RawSocket) -> io::Result<()> {
log::trace!("insert: handle={:?}, sock={}", self.handle, sock);
// Put the socket in non-blocking mode.
unsafe {
let mut nonblocking = true as ctypes::c_ulong;
let res = winsock2::ioctlsocket(
sock as winsock2::SOCKET,
winsock2::FIONBIO,
&mut nonblocking,
);
if res != 0 {
return Err(io::Error::last_os_error());
}
}
// Register the socket in wepoll.
let mut ev = we::epoll_event {
events: we::EPOLLONESHOT,
data: we::epoll_data {
u64: crate::NOTIFY_KEY as u64,
},
};
wepoll!(epoll_ctl(
self.handle,
we::EPOLL_CTL_ADD as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
/// Adds a socket.
pub fn add(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
log::trace!("add: handle={:?}, sock={}, ev={:?}", self.handle, sock, ev);
self.register(sock, we::EPOLL_CTL_ADD, ev)
}
/// Sets interest in a read/write event on a socket and associates a key with it.
pub fn interest(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
/// Modifies a socket.
pub fn modify(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
log::trace!(
"interest: handle={:?}, sock={}, ev={:?}",
"modify: handle={:?}, sock={}, ev={:?}",
self.handle,
sock,
ev
);
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: ev.key as u64 },
};
wepoll!(epoll_ctl(
self.handle,
we::EPOLL_CTL_MOD as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
self.register(sock, we::EPOLL_CTL_MOD, ev)
}
/// Removes a socket.
pub fn remove(&self, sock: RawSocket) -> io::Result<()> {
pub fn delete(&self, sock: RawSocket) -> io::Result<()> {
log::trace!("remove: handle={:?}, sock={}", self.handle, sock);
wepoll!(epoll_ctl(
self.handle,
@ -190,6 +142,32 @@ impl Poller {
}
Ok(())
}
fn register(&self, sock: RawSocket, action: u32, ev: Event) -> io::Result<()> {
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
}
if ev.writable {
flags |= WRITE_FLAGS;
}
let mut ev = we::epoll_event {
events: flags as u32,
data: we::epoll_data { u64: ev.key as u64 },
};
wepoll!(epoll_ctl(
self.handle,
action as ctypes::c_int,
sock as we::SOCKET,
&mut ev,
))?;
Ok(())
}
}
impl Drop for Poller {