From 35e3bbfe4cb88c0d6b4e424663dea05704266886 Mon Sep 17 00:00:00 2001 From: Stjepan Glavina Date: Sun, 21 Jun 2020 18:29:52 +0200 Subject: [PATCH] Remove IoEvent --- src/io_event.rs | 169 ------------------------------------------------ src/lib.rs | 1 - src/reactor.rs | 48 +++++++------- 3 files changed, 25 insertions(+), 193 deletions(-) delete mode 100644 src/io_event.rs diff --git a/src/io_event.rs b/src/io_event.rs deleted file mode 100644 index 40b4652..0000000 --- a/src/io_event.rs +++ /dev/null @@ -1,169 +0,0 @@ -//! An I/O object for waking up threads blocked on the reactor. -//! -//! We use the self-pipe trick explained [here](https://cr.yp.to/docs/selfpipe.html). -//! -//! On Unix systems, the self-pipe is a pair of unnamed connected sockets. On Windows, the -//! self-pipe is a pair of TCP sockets connected over localhost. - -use std::io::{self, Read, Write}; -#[cfg(windows)] -use std::net::SocketAddr; -use std::sync::atomic::{self, Ordering}; -use std::sync::Arc; - -#[cfg(not(target_os = "linux"))] -use socket2::{Domain, Socket, Type}; - -use crate::async_io::Async; - -#[cfg(not(target_os = "linux"))] -type Notifier = Socket; - -#[cfg(target_os = "linux")] -type Notifier = linux::EventFd; - -/// A self-pipe. -struct Inner { - /// The writer side, emptied by `clear()`. - writer: Notifier, - - /// The reader side, filled by `notify()`. - reader: Async, -} - -/// A flag that that triggers an I/O event whenever it is set. -#[derive(Clone)] -pub(crate) struct IoEvent(Arc); - -impl IoEvent { - /// Creates a new `IoEvent`. - pub fn new() -> io::Result { - let (writer, reader) = notifier()?; - - Ok(IoEvent(Arc::new(Inner { - writer, - reader: Async::new(reader)?, - }))) - } - - /// Sets the flag to `true`. - pub fn notify(&self) { - // Publish all in-memory changes before setting the flag. - atomic::fence(Ordering::SeqCst); - - // Trigger an I/O event by writing a byte into the sending socket. - let _ = (&self.0.writer).write(&1u64.to_ne_bytes()); - let _ = (&self.0.writer).flush(); - - // Re-register to wake up the poller. - let _ = self.0.reader.reregister_io_event(); - } - - /// Sets the flag to `false`. - pub fn clear(&self) { - // Read all available bytes from the receiving socket. - while self.0.reader.get_ref().read(&mut [0; 64]).is_ok() {} - - // Publish all in-memory changes after clearing the flag. - atomic::fence(Ordering::SeqCst); - } -} - -/// Creates a pair of connected sockets. -#[cfg(all(unix, not(target_os = "linux")))] -fn notifier() -> io::Result<(Socket, Socket)> { - let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?; - sock1.set_nonblocking(true)?; - sock2.set_nonblocking(true)?; - - sock1.set_send_buffer_size(1)?; - sock2.set_recv_buffer_size(1)?; - - Ok((sock1, sock2)) -} - -#[cfg(target_os = "linux")] -mod linux { - use super::*; - use crate::sys::eventfd::eventfd; - use crate::sys::unistd; - use std::os::unix::io::AsRawFd; - - pub(crate) struct EventFd(std::os::unix::io::RawFd); - - impl EventFd { - pub fn new() -> Result { - let fd = eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)?; - Ok(EventFd(fd)) - } - - pub fn try_clone(&self) -> Result { - unistd::dup(self.0).map(EventFd) - } - } - - impl AsRawFd for EventFd { - fn as_raw_fd(&self) -> i32 { - self.0 - } - } - - impl Drop for EventFd { - fn drop(&mut self) { - let _ = unistd::close(self.0); - } - } - - impl Read for &EventFd { - #[inline] - fn read(&mut self, buf: &mut [u8]) -> std::result::Result { - unistd::read(self.0, buf) - } - } - - impl Write for &EventFd { - #[inline] - fn write(&mut self, buf: &[u8]) -> std::result::Result { - unistd::write(self.0, buf) - } - - #[inline] - fn flush(&mut self) -> std::result::Result<(), std::io::Error> { - Ok(()) - } - } -} - -/// Creates eventfd on linux. -#[cfg(target_os = "linux")] -fn notifier() -> io::Result<(Notifier, Notifier)> { - use linux::EventFd; - let sock1 = EventFd::new()?; - let sock2 = sock1.try_clone()?; - Ok((sock1, sock2)) -} - -/// Creates a pair of connected sockets. -#[cfg(windows)] -fn notifier() -> io::Result<(Notifier, Notifier)> { - // Create a temporary listener. - let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?; - listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?; - listener.listen(1)?; - - // First socket: start connecting to the listener. - let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?; - sock1.set_nonblocking(true)?; - let _ = sock1.set_nodelay(true)?; - let _ = sock1.connect(&listener.local_addr()?); - - // Second socket: accept a connection from the listener. - let (sock2, _) = listener.accept()?; - sock2.set_nonblocking(true)?; - let _ = sock2.set_nodelay(true)?; - - sock1.set_send_buffer_size(1)?; - sock2.set_recv_buffer_size(1)?; - - Ok((sock1, sock2)) -} diff --git a/src/lib.rs b/src/lib.rs index cb542bc..400bafe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -119,7 +119,6 @@ mod async_io; mod block_on; mod blocking; mod context; -mod io_event; mod multitask; mod parking; mod reactor; diff --git a/src/reactor.rs b/src/reactor.rs index 5942692..6c9fb88 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -474,7 +474,6 @@ mod sys { use once_cell::sync::Lazy; - use crate::io_event::IoEvent; use crate::sys::epoll::{ epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp, }; @@ -596,29 +595,33 @@ mod sys { target_os = "dragonfly", ))] mod sys { - use std::io; - use std::os::unix::io::RawFd; + use std::io::{self, Read, Write}; + use std::os::unix::io::{AsRawFd, RawFd}; + use std::os::unix::net::UnixStream; use std::time::Duration; - use once_cell::sync::Lazy; - - use crate::io_event::IoEvent; use crate::sys::event::{kevent_ts, kqueue, KEvent}; use crate::sys::fcntl::{fcntl, FcntlArg}; pub struct Reactor { kqueue_fd: RawFd, - io_event: Lazy, + read_stream: UnixStream, + write_stream: UnixStream, } impl Reactor { pub fn new() -> io::Result { let kqueue_fd = kqueue()?; fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?; - let io_event = Lazy::::new(|| IoEvent::new().unwrap()); - Ok(Reactor { + let (read_stream, write_stream) = UnixStream::pair()?; + read_stream.set_nonblocking(true)?; + write_stream.set_nonblocking(true)?; + let reactor = Reactor { kqueue_fd, - io_event, - }) + read_stream, + write_stream, + }; + reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?; + Ok(reactor) } pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> { Ok(()) @@ -678,11 +681,14 @@ mod sys { tv_nsec: t.subsec_nanos() as libc::c_long, }); events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?; - self.io_event.clear(); + + while (&self.read_stream).read(&mut [0; 64]).is_ok() {} + self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?; + Ok(events.len) } pub fn notify(&self) -> io::Result<()> { - self.io_event.notify(); + let _ = (&self.write_stream).write(&[1]); Ok(()) } } @@ -764,10 +770,10 @@ mod sys { ) -> io::Result<()> { let mut flags = EPOLLONESHOT; if read { - flags |= read_flags(); + flags |= READ_FLAGS; } if write { - flags |= write_flags(); + flags |= WRITE_FLAGS; } let mut ev = epoll_event { events: flags as u32, @@ -843,12 +849,8 @@ mod sys { self.0 } } - fn read_flags() -> u32 { - EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI - } - fn write_flags() -> u32 { - EPOLLOUT | EPOLLHUP | EPOLLERR - } + const READ_FLAGS: u32 = EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI; + const WRITE_FLAGS: u32 = EPOLLOUT | EPOLLHUP | EPOLLERR; pub struct Events { list: Box<[epoll_event]>, @@ -869,8 +871,8 @@ mod sys { } pub fn iter(&self) -> impl Iterator + '_ { self.list[..self.len].iter().map(|ev| Event { - readable: (ev.events & read_flags()) != 0, - writable: (ev.events & write_flags()) != 0, + readable: (ev.events & READ_FLAGS) != 0, + writable: (ev.events & WRITE_FLAGS) != 0, key: unsafe { ev.data.u64 } as usize, }) }