mirror of https://github.com/stjepang/smol
Remove IoEvent
This commit is contained in:
parent
993e90d35d
commit
35e3bbfe4c
169
src/io_event.rs
169
src/io_event.rs
|
@ -1,169 +0,0 @@
|
|||
//! An I/O object for waking up threads blocked on the reactor.
|
||||
//!
|
||||
//! We use the self-pipe trick explained [here](https://cr.yp.to/docs/selfpipe.html).
|
||||
//!
|
||||
//! On Unix systems, the self-pipe is a pair of unnamed connected sockets. On Windows, the
|
||||
//! self-pipe is a pair of TCP sockets connected over localhost.
|
||||
|
||||
use std::io::{self, Read, Write};
|
||||
#[cfg(windows)]
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::atomic::{self, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
use socket2::{Domain, Socket, Type};
|
||||
|
||||
use crate::async_io::Async;
|
||||
|
||||
#[cfg(not(target_os = "linux"))]
|
||||
type Notifier = Socket;
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
type Notifier = linux::EventFd;
|
||||
|
||||
/// A self-pipe.
|
||||
struct Inner {
|
||||
/// The writer side, emptied by `clear()`.
|
||||
writer: Notifier,
|
||||
|
||||
/// The reader side, filled by `notify()`.
|
||||
reader: Async<Notifier>,
|
||||
}
|
||||
|
||||
/// A flag that that triggers an I/O event whenever it is set.
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct IoEvent(Arc<Inner>);
|
||||
|
||||
impl IoEvent {
|
||||
/// Creates a new `IoEvent`.
|
||||
pub fn new() -> io::Result<IoEvent> {
|
||||
let (writer, reader) = notifier()?;
|
||||
|
||||
Ok(IoEvent(Arc::new(Inner {
|
||||
writer,
|
||||
reader: Async::new(reader)?,
|
||||
})))
|
||||
}
|
||||
|
||||
/// Sets the flag to `true`.
|
||||
pub fn notify(&self) {
|
||||
// Publish all in-memory changes before setting the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
|
||||
// Trigger an I/O event by writing a byte into the sending socket.
|
||||
let _ = (&self.0.writer).write(&1u64.to_ne_bytes());
|
||||
let _ = (&self.0.writer).flush();
|
||||
|
||||
// Re-register to wake up the poller.
|
||||
let _ = self.0.reader.reregister_io_event();
|
||||
}
|
||||
|
||||
/// Sets the flag to `false`.
|
||||
pub fn clear(&self) {
|
||||
// Read all available bytes from the receiving socket.
|
||||
while self.0.reader.get_ref().read(&mut [0; 64]).is_ok() {}
|
||||
|
||||
// Publish all in-memory changes after clearing the flag.
|
||||
atomic::fence(Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a pair of connected sockets.
|
||||
#[cfg(all(unix, not(target_os = "linux")))]
|
||||
fn notifier() -> io::Result<(Socket, Socket)> {
|
||||
let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
|
||||
sock1.set_nonblocking(true)?;
|
||||
sock2.set_nonblocking(true)?;
|
||||
|
||||
sock1.set_send_buffer_size(1)?;
|
||||
sock2.set_recv_buffer_size(1)?;
|
||||
|
||||
Ok((sock1, sock2))
|
||||
}
|
||||
|
||||
#[cfg(target_os = "linux")]
|
||||
mod linux {
|
||||
use super::*;
|
||||
use crate::sys::eventfd::eventfd;
|
||||
use crate::sys::unistd;
|
||||
use std::os::unix::io::AsRawFd;
|
||||
|
||||
pub(crate) struct EventFd(std::os::unix::io::RawFd);
|
||||
|
||||
impl EventFd {
|
||||
pub fn new() -> Result<Self, std::io::Error> {
|
||||
let fd = eventfd(0, libc::EFD_CLOEXEC | libc::EFD_NONBLOCK)?;
|
||||
Ok(EventFd(fd))
|
||||
}
|
||||
|
||||
pub fn try_clone(&self) -> Result<EventFd, io::Error> {
|
||||
unistd::dup(self.0).map(EventFd)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for EventFd {
|
||||
fn as_raw_fd(&self) -> i32 {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for EventFd {
|
||||
fn drop(&mut self) {
|
||||
let _ = unistd::close(self.0);
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for &EventFd {
|
||||
#[inline]
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::result::Result<usize, std::io::Error> {
|
||||
unistd::read(self.0, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for &EventFd {
|
||||
#[inline]
|
||||
fn write(&mut self, buf: &[u8]) -> std::result::Result<usize, std::io::Error> {
|
||||
unistd::write(self.0, buf)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn flush(&mut self) -> std::result::Result<(), std::io::Error> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates eventfd on linux.
|
||||
#[cfg(target_os = "linux")]
|
||||
fn notifier() -> io::Result<(Notifier, Notifier)> {
|
||||
use linux::EventFd;
|
||||
let sock1 = EventFd::new()?;
|
||||
let sock2 = sock1.try_clone()?;
|
||||
Ok((sock1, sock2))
|
||||
}
|
||||
|
||||
/// Creates a pair of connected sockets.
|
||||
#[cfg(windows)]
|
||||
fn notifier() -> io::Result<(Notifier, Notifier)> {
|
||||
// Create a temporary listener.
|
||||
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
|
||||
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
|
||||
listener.listen(1)?;
|
||||
|
||||
// First socket: start connecting to the listener.
|
||||
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
|
||||
sock1.set_nonblocking(true)?;
|
||||
let _ = sock1.set_nodelay(true)?;
|
||||
let _ = sock1.connect(&listener.local_addr()?);
|
||||
|
||||
// Second socket: accept a connection from the listener.
|
||||
let (sock2, _) = listener.accept()?;
|
||||
sock2.set_nonblocking(true)?;
|
||||
let _ = sock2.set_nodelay(true)?;
|
||||
|
||||
sock1.set_send_buffer_size(1)?;
|
||||
sock2.set_recv_buffer_size(1)?;
|
||||
|
||||
Ok((sock1, sock2))
|
||||
}
|
|
@ -119,7 +119,6 @@ mod async_io;
|
|||
mod block_on;
|
||||
mod blocking;
|
||||
mod context;
|
||||
mod io_event;
|
||||
mod multitask;
|
||||
mod parking;
|
||||
mod reactor;
|
||||
|
|
|
@ -474,7 +474,6 @@ mod sys {
|
|||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::sys::epoll::{
|
||||
epoll_create1, epoll_ctl, epoll_wait, EpollEvent, EpollFlags, EpollOp,
|
||||
};
|
||||
|
@ -596,29 +595,33 @@ mod sys {
|
|||
target_os = "dragonfly",
|
||||
))]
|
||||
mod sys {
|
||||
use std::io;
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::io::{self, Read, Write};
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::net::UnixStream;
|
||||
use std::time::Duration;
|
||||
|
||||
use once_cell::sync::Lazy;
|
||||
|
||||
use crate::io_event::IoEvent;
|
||||
use crate::sys::event::{kevent_ts, kqueue, KEvent};
|
||||
use crate::sys::fcntl::{fcntl, FcntlArg};
|
||||
|
||||
pub struct Reactor {
|
||||
kqueue_fd: RawFd,
|
||||
io_event: Lazy<IoEvent>,
|
||||
read_stream: UnixStream,
|
||||
write_stream: UnixStream,
|
||||
}
|
||||
impl Reactor {
|
||||
pub fn new() -> io::Result<Reactor> {
|
||||
let kqueue_fd = kqueue()?;
|
||||
fcntl(kqueue_fd, FcntlArg::F_SETFD(libc::FD_CLOEXEC))?;
|
||||
let io_event = Lazy::<IoEvent>::new(|| IoEvent::new().unwrap());
|
||||
Ok(Reactor {
|
||||
let (read_stream, write_stream) = UnixStream::pair()?;
|
||||
read_stream.set_nonblocking(true)?;
|
||||
write_stream.set_nonblocking(true)?;
|
||||
let reactor = Reactor {
|
||||
kqueue_fd,
|
||||
io_event,
|
||||
})
|
||||
read_stream,
|
||||
write_stream,
|
||||
};
|
||||
reactor.reregister(reactor.read_stream.as_raw_fd(), !0, true, false)?;
|
||||
Ok(reactor)
|
||||
}
|
||||
pub fn register(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
|
||||
Ok(())
|
||||
|
@ -678,11 +681,14 @@ mod sys {
|
|||
tv_nsec: t.subsec_nanos() as libc::c_long,
|
||||
});
|
||||
events.len = kevent_ts(self.kqueue_fd, &[], &mut events.list, timeout)?;
|
||||
self.io_event.clear();
|
||||
|
||||
while (&self.read_stream).read(&mut [0; 64]).is_ok() {}
|
||||
self.reregister(self.read_stream.as_raw_fd(), !0, true, false)?;
|
||||
|
||||
Ok(events.len)
|
||||
}
|
||||
pub fn notify(&self) -> io::Result<()> {
|
||||
self.io_event.notify();
|
||||
let _ = (&self.write_stream).write(&[1]);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
@ -764,10 +770,10 @@ mod sys {
|
|||
) -> io::Result<()> {
|
||||
let mut flags = EPOLLONESHOT;
|
||||
if read {
|
||||
flags |= read_flags();
|
||||
flags |= READ_FLAGS;
|
||||
}
|
||||
if write {
|
||||
flags |= write_flags();
|
||||
flags |= WRITE_FLAGS;
|
||||
}
|
||||
let mut ev = epoll_event {
|
||||
events: flags as u32,
|
||||
|
@ -843,12 +849,8 @@ mod sys {
|
|||
self.0
|
||||
}
|
||||
}
|
||||
fn read_flags() -> u32 {
|
||||
EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI
|
||||
}
|
||||
fn write_flags() -> u32 {
|
||||
EPOLLOUT | EPOLLHUP | EPOLLERR
|
||||
}
|
||||
const READ_FLAGS: u32 = EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR | EPOLLPRI;
|
||||
const WRITE_FLAGS: u32 = EPOLLOUT | EPOLLHUP | EPOLLERR;
|
||||
|
||||
pub struct Events {
|
||||
list: Box<[epoll_event]>,
|
||||
|
@ -869,8 +871,8 @@ mod sys {
|
|||
}
|
||||
pub fn iter(&self) -> impl Iterator<Item = Event> + '_ {
|
||||
self.list[..self.len].iter().map(|ev| Event {
|
||||
readable: (ev.events & read_flags()) != 0,
|
||||
writable: (ev.events & write_flags()) != 0,
|
||||
readable: (ev.events & READ_FLAGS) != 0,
|
||||
writable: (ev.events & WRITE_FLAGS) != 0,
|
||||
key: unsafe { ev.data.u64 } as usize,
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue