Add logging

This commit is contained in:
Stjepan Glavina 2020-08-29 16:00:59 +02:00
parent 54ac671d8e
commit 654ddca8a6
5 changed files with 70 additions and 0 deletions

View File

@ -15,6 +15,7 @@ readme = "README.md"
[dependencies]
cfg-if = "0.1.10"
libc = "0.2.74"
log = "0.4.11"
[dev-dependencies]
doc-comment = "0.3"

View File

@ -81,11 +81,19 @@ impl Poller {
},
)?;
log::debug!(
"new: epoll_fd={}, event_fd={}, timer_fd={}",
epoll_fd,
event_fd,
timer_fd
);
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
log::debug!("insert: epoll_fd={}, fd={}", self.epoll_fd, fd);
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
@ -102,6 +110,13 @@ impl Poller {
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::debug!(
"interest: epoll_fd={}, fd={}, ev={:?}",
self.epoll_fd,
fd,
ev
);
let mut flags = libc::EPOLLONESHOT;
if ev.readable {
flags |= read_flags();
@ -121,6 +136,8 @@ impl Poller {
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
log::debug!("remove: epoll_fd={}, fd={}", self.epoll_fd, fd);
syscall!(epoll_ctl(
self.epoll_fd,
libc::EPOLL_CTL_DEL,
@ -132,6 +149,8 @@ impl Poller {
/// Waits for I/O events with an optional timeout.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::debug!("wait: epoll_fd={}, timeout={:?}", self.epoll_fd, timeout);
// Configure the timeout using timerfd.
let new_val = libc::itimerspec {
it_interval: TS_ZERO,
@ -172,6 +191,7 @@ impl Poller {
timeout_ms,
))?;
events.len = res as usize;
log::trace!("new events: epoll_fd={}, res={}", self.epoll_fd, res);
// Clear the notification (if received) and re-register interest in it.
let mut buf = [0u8; 8];
@ -194,6 +214,12 @@ impl Poller {
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::debug!(
"notify: epoll_fd={}, event_fd={}",
self.epoll_fd,
self.event_fd
);
let buf: [u8; 8] = 1u64.to_ne_bytes();
let _ = syscall!(write(
self.event_fd,
@ -206,6 +232,12 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
log::debug!(
"drop: epoll_fd={}, event_fd={}, timer_fd={}",
self.epoll_fd,
self.event_fd,
self.timer_fd
);
let _ = self.remove(self.event_fd);
let _ = self.remove(self.timer_fd);
let _ = syscall!(close(self.event_fd));

View File

@ -45,11 +45,14 @@ impl Poller {
},
)?;
log::debug!("new: kqueue_fd={}", kqueue_fd);
Ok(poller)
}
/// Inserts a file descriptor.
pub fn insert(&self, fd: RawFd) -> io::Result<()> {
log::debug!("insert: fd={}", fd);
// Put the file descriptor in non-blocking mode.
let flags = syscall!(fcntl(fd, libc::F_GETFL))?;
syscall!(fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK))?;
@ -58,6 +61,13 @@ impl Poller {
/// Sets interest in a read/write event on a file descriptor and associates a key with it.
pub fn interest(&self, fd: RawFd, ev: Event) -> io::Result<()> {
log::debug!(
"interest: kqueue_fd={}, fd={}, ev={:?}",
self.kqueue_fd,
fd,
ev
);
let mut read_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
let mut write_flags = libc::EV_ONESHOT | libc::EV_RECEIPT;
if ev.readable {
@ -119,6 +129,8 @@ impl Poller {
/// Removes a file descriptor.
pub fn remove(&self, fd: RawFd) -> io::Result<()> {
log::debug!("remove: kqueue_fd={}, fd={}", self.kqueue_fd, fd);
// A list of changes for kqueue.
let changelist = [
libc::kevent {
@ -162,6 +174,8 @@ impl Poller {
/// Waits for I/O events with an optional timeout.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::debug!("wait: kqueue_fd={}, timeout={:?}", self.kqueue_fd, timeout);
// Convert the `Duration` to `libc::timespec`.
let timeout = timeout.map(|t| libc::timespec {
tv_sec: t.as_secs() as libc::time_t,
@ -183,6 +197,7 @@ impl Poller {
}
))?;
events.len = res as usize;
log::trace!("new events: kqueue_fd={}, res={}", self.kqueue_fd, res);
// Clear the notification (if received) and re-register interest in it.
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
@ -200,6 +215,7 @@ impl Poller {
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::debug!("notify: kqueue_fd={}", self.kqueue_fd);
let _ = (&self.write_stream).write(&[1]);
Ok(())
}
@ -207,6 +223,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
log::debug!("drop: kqueue_fd={}", self.kqueue_fd);
let _ = self.remove(self.read_stream.as_raw_fd());
let _ = syscall!(close(self.kqueue_fd));
}

View File

@ -340,6 +340,8 @@ impl Poller {
/// # std::io::Result::Ok(())
/// ```
pub fn wait(&self, events: &mut Vec<Event>, timeout: Option<Duration>) -> io::Result<usize> {
log::debug!("Poller::wait(_, {:?})", timeout);
if let Ok(mut lock) = self.events.try_lock() {
// Wait for I/O events.
self.poller.wait(&mut lock, timeout)?;
@ -352,6 +354,7 @@ impl Poller {
events.extend(lock.iter().filter(|ev| ev.key != usize::MAX));
Ok(events.len() - len)
} else {
log::trace!("wait: skipping because another thread is already waiting on I/O");
Ok(0)
}
}
@ -379,6 +382,7 @@ impl Poller {
/// # std::io::Result::Ok(())
/// ```
pub fn notify(&self) -> io::Result<()> {
log::debug!("Poller::notify()");
if !self
.notified
.compare_and_swap(false, true, Ordering::SeqCst)

View File

@ -42,11 +42,14 @@ impl Poller {
return Err(io::Error::last_os_error());
}
let notified = AtomicBool::new(false);
log::debug!("new: handle={:?}", handle);
Ok(Poller { handle, notified })
}
/// Inserts a socket.
pub fn insert(&self, sock: RawSocket) -> io::Result<()> {
log::debug!("insert: handle={:?}, sock={}", self.handle, sock);
// Put the socket in non-blocking mode.
unsafe {
let mut nonblocking = true as libc::c_ulong;
@ -77,6 +80,13 @@ impl Poller {
/// Sets interest in a read/write event on a socket and associates a key with it.
pub fn interest(&self, sock: RawSocket, ev: Event) -> io::Result<()> {
log::debug!(
"interest: handle={:?}, sock={}, ev={:?}",
self.handle,
sock,
ev
);
let mut flags = we::EPOLLONESHOT;
if ev.readable {
flags |= READ_FLAGS;
@ -101,6 +111,7 @@ impl Poller {
/// Removes a socket.
pub fn remove(&self, sock: RawSocket) -> io::Result<()> {
log::debug!("remove: handle={:?}, sock={}", self.handle, sock);
wepoll!(epoll_ctl(
self.handle,
we::EPOLL_CTL_DEL as libc::c_int,
@ -117,6 +128,7 @@ impl Poller {
/// If a notification occurs, this method will return but the notification event will not be
/// included in the `events` list nor contribute to the returned count.
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<()> {
log::debug!("wait: handle={:?}, timeout={:?}", self.handle, timeout);
let deadline = timeout.map(|t| Instant::now() + t);
loop {
@ -140,6 +152,7 @@ impl Poller {
events.list.len() as libc::c_int,
timeout_ms,
))? as usize;
log::trace!("new events: handle={:?}, len={}", self.handle, events.len);
// Break if there was a notification or at least one event, or if deadline is reached.
if self.notified.swap(false, Ordering::SeqCst) || events.len > 0 || timeout_ms == 0 {
@ -152,6 +165,8 @@ impl Poller {
/// Sends a notification to wake up the current or next `wait()` call.
pub fn notify(&self) -> io::Result<()> {
log::debug!("notify: handle={:?}", self.handle);
if !self
.notified
.compare_and_swap(false, true, Ordering::SeqCst)
@ -176,6 +191,7 @@ impl Poller {
impl Drop for Poller {
fn drop(&mut self) {
log::debug!("drop: handle={:?}", self.handle);
unsafe {
we::epoll_close(self.handle);
}