// mirror of https://github.com/smol-rs/smol
//! The reactor, async I/O, and timers.
|
|
|
|
// Fail the build early on platforms where none of the supported I/O event
// notification mechanisms (epoll, kqueue, wepoll) is available.
#[cfg(not(any(
    target_os = "linux", // epoll
    target_os = "android", // epoll
    target_os = "macos", // kqueue
    target_os = "ios", // kqueue
    target_os = "freebsd", // kqueue
    target_os = "netbsd", // kqueue
    target_os = "openbsd", // kqueue
    target_os = "dragonfly", // kqueue
    target_os = "windows", // wepoll
)))]
compile_error!("reactor does not support this target OS");
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::fmt::Debug;
|
|
use std::io;
|
|
use std::mem;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
use std::sync::Arc;
|
|
use std::task::{Context, Poll, Waker};
|
|
use std::time::{Duration, Instant};
|
|
|
|
#[cfg(unix)]
|
|
use std::os::unix::io::RawFd;
|
|
|
|
#[cfg(windows)]
|
|
use std::os::windows::io::RawSocket;
|
|
|
|
use once_cell::sync::Lazy;
|
|
use slab::Slab;
|
|
|
|
use crate::io_event::IoEvent;
|
|
use crate::throttle;
|
|
|
|
/// The reactor driving I/O events and timers.
///
/// Every async I/O handle ("source") and every timer is registered here. Invocations of `run()`
/// poll the reactor to check for new events every now and then.
///
/// There is only one global instance of this type, accessible by `Reactor::get()`.
pub(crate) struct Reactor {
    /// Raw bindings to epoll/kqueue/wepoll.
    sys: sys::Reactor,

    /// Registered sources, indexed by the key handed out at registration time.
    sources: piper::Lock<Slab<Arc<Source>>>,

    /// Temporary storage for I/O events when polling the reactor.
    events: piper::Mutex<sys::Events>,

    /// An ordered map of registered timers.
    ///
    /// Timers are in the order in which they fire. The `u64` in this type is a unique timer ID
    /// used to distinguish timers that fire at the same time. The `Waker` represents the task
    /// awaiting the timer.
    timers: piper::Lock<BTreeMap<(Instant, u64), Waker>>,

    /// An I/O event that is triggered when a new earliest timer is registered.
    ///
    /// This is used to wake up the thread waiting on the reactor, which would otherwise wait
    /// until the previously earliest timer fires.
    ///
    /// The reason why this field is lazily created is because `IoEvent`s can be created only
    /// after the reactor is fully initialized.
    event: Lazy<IoEvent>,
}
|
|
|
|
impl Reactor {
    /// Returns a reference to the global reactor, initializing it on first use.
    pub fn get() -> &'static Reactor {
        static REACTOR: Lazy<Reactor> = Lazy::new(|| Reactor {
            sys: sys::Reactor::new().expect("cannot initialize I/O event notification"),
            sources: piper::Lock::new(Slab::new()),
            events: piper::Mutex::new(sys::Events::new()),
            timers: piper::Lock::new(BTreeMap::new()),
            event: Lazy::new(|| IoEvent::new().expect("cannot create an `IoEvent`")),
        });
        &REACTOR
    }

    /// Registers an I/O source in the reactor.
    ///
    /// The raw handle is switched into non-blocking mode and registered with the OS-specific
    /// poller. The returned [`Source`] tracks wakers of tasks interested in its readiness.
    pub fn insert_io(
        &self,
        #[cfg(unix)] raw: RawFd,
        #[cfg(windows)] raw: RawSocket,
    ) -> io::Result<Arc<Source>> {
        // Reserve a slab slot first so the source's key is known before registration.
        let mut sources = self.sources.lock();
        let vacant = sources.vacant_entry();

        #[cfg(unix)]
        {
            use nix::fcntl::{fcntl, FcntlArg, OFlag};

            // Put the I/O handle in non-blocking mode.
            let flags = fcntl(raw, FcntlArg::F_GETFL).map_err(io_err)?;
            let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
            fcntl(raw, FcntlArg::F_SETFL(flags)).map_err(io_err)?;
        }

        #[cfg(windows)]
        {
            // Put the I/O handle in non-blocking mode. `ManuallyDrop` prevents the temporary
            // `Socket` from closing the handle when it goes out of scope.
            let socket = unsafe { Socket::from_raw_socket(raw) };
            mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
        }

        // Create a source and register it.
        let source = Arc::new(Source {
            raw,
            key: vacant.key(),
            wakers: piper::Lock::new(Vec::new()),
            tick: AtomicU64::new(0),
        });
        // On error the vacant entry is simply never filled, so the slab stays consistent.
        self.sys.register(raw, source.key)?;

        Ok(vacant.insert(source).clone())
    }

    /// Deregisters an I/O source from the reactor.
    pub fn remove_io(&self, source: &Source) -> io::Result<()> {
        let mut sources = self.sources.lock();
        sources.remove(source.key);
        self.sys.deregister(source.raw)
    }

    /// Registers a timer in the reactor.
    ///
    /// `waker` will be woken when the time `when` is reached. Returns a unique ID for this
    /// timer, which together with `when` can be passed to `remove_timer()` to cancel it.
    pub fn insert_timer(&self, when: Instant, waker: Waker) -> u64 {
        let mut timers = self.timers.lock();

        // Generate a new timer ID.
        static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
        let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
        assert!(id < u64::max_value() / 2, "exhausted timer IDs");

        // Insert this timer into the timers map.
        timers.insert((when, id), waker);

        // If this timer is now the earliest one, interrupt the reactor so it recomputes its
        // sleep duration — otherwise it could keep waiting for the previously earliest timer.
        if timers.keys().next().map(|(when, _)| *when) == Some(when) {
            self.event.set();
        }

        id
    }

    /// Deregisters a timer from the reactor.
    ///
    /// `when` and `id` must be the values the timer was registered with. Removing a timer that
    /// already fired (or was never registered) is a no-op.
    pub fn remove_timer(&self, when: Instant, id: u64) {
        self.timers.lock().remove(&(when, id));
    }

    /// Processes ready events without blocking.
    ///
    /// This doesn't have strong guarantees. If there are ready events, they may or may not be
    /// processed depending on whether the reactor is locked.
    pub fn poll(&self) -> io::Result<()> {
        if let Some(events) = self.events.try_lock() {
            let reactor = self;
            let mut lock = ReactorLock { reactor, events };
            // React to events without blocking.
            lock.react(false)?;
        }
        Ok(())
    }

    /// Locks the reactor, waiting until the event buffer becomes available.
    pub async fn lock(&self) -> ReactorLock<'_> {
        let reactor = self;
        let events = self.events.lock().await;
        ReactorLock { reactor, events }
    }
}
|
|
|
|
/// Polls the reactor for I/O events and wakes up tasks.
///
/// Holding this lock gives exclusive access to the reactor's event buffer, so only one thread
/// can poll the reactor at a time.
pub(crate) struct ReactorLock<'a> {
    /// The reactor being polled.
    reactor: &'a Reactor,

    /// Exclusive access to the reactor's temporary event storage.
    events: piper::MutexGuard<'a, sys::Events>,
}
|
|
|
|
impl ReactorLock<'_> {
    /// Blocks until at least one event is processed.
    pub fn wait(&mut self) -> io::Result<()> {
        self.react(true)
    }

    /// Fires ready timers and processes pending I/O events.
    ///
    /// If `block` is `true` and no timers are ready, this waits for I/O until the earliest
    /// timer fires (or indefinitely if there are no timers at all). Otherwise it only picks up
    /// events that are already ready and returns immediately.
    fn react(&mut self, block: bool) -> io::Result<()> {
        // Reset the event that `insert_timer()` uses to interrupt the reactor, so that a new
        // earliest timer registered from now on will be noticed.
        self.reactor.event.clear();

        let timeout = {
            // Split timers into ready and pending timers. Everything before the key `(now, 0)`
            // is ready; real timer IDs start at 1, so a timer registered at exactly `now`
            // stays pending.
            let mut timers = self.reactor.timers.lock();
            let now = Instant::now();
            let pending = timers.split_off(&(now, 0));
            let ready = mem::replace(&mut *timers, pending);

            let timeout = if ready.is_empty() && block {
                // Calculate the timeout till the first timer fires; `None` means there are no
                // timers, i.e. wait for I/O forever.
                timers.keys().next().map(|(when, _)| when.saturating_duration_since(now))
            } else {
                // If there are ready timers or this poll doesn't block, the timeout is zero.
                Some(Duration::from_secs(0))
            };

            // Wake up tasks waiting on timers.
            for (_, waker) in ready {
                waker.wake();
            }

            timeout
        };

        // Block on I/O events.
        loop {
            match self.reactor.sys.wait(&mut self.events, timeout) {
                // The timeout elapsed with no I/O events — nothing more to do.
                Ok(0) => return Ok(()),
                // At least one I/O event arrived; go process the event list.
                Ok(_) => break,
                // The wait was interrupted by a signal; retry.
                Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
                Err(err) => return Err(err),
            }
        }

        // Iterate over sources in the event list.
        let sources = self.reactor.sources.lock();
        for source in self.events.iter().filter_map(|i| sources.get(i)) {
            // I/O events may deregister sources, so we need to re-register.
            self.reactor.sys.reregister(source.raw, source.key)?;

            // Bump the tick while the wakers list is locked — `Source::poll_io()` reads the
            // tick to detect notifications racing with its waker registration.
            let mut wakers = source.wakers.lock();
            let tick = source.tick.load(Ordering::Acquire);
            source.tick.store(tick.wrapping_add(1), Ordering::Release);

            // Wake up tasks waiting on I/O.
            for w in wakers.drain(..) {
                w.wake();
            }
        }
        Ok(())
    }
}
|
|
|
|
/// A registered source of I/O events.
#[derive(Debug)]
pub(crate) struct Source {
    /// Raw file descriptor on Unix platforms.
    #[cfg(unix)]
    pub(crate) raw: RawFd,

    /// Raw socket handle on Windows.
    #[cfg(windows)]
    pub(crate) raw: RawSocket,

    /// The ID of this source obtained during registration.
    key: usize,

    /// A list of wakers representing tasks interested in events on this source.
    wakers: piper::Lock<Vec<Waker>>,

    /// Incremented on every I/O notification - this is only used for synchronization.
    tick: AtomicU64,
}
|
|
|
|
impl Source {
    /// Attempts a non-blocking I/O operation and registers a waker if it errors with `WouldBlock`.
    ///
    /// Returns `Poll::Ready` with the operation's result as soon as `op()` completes with
    /// anything other than `WouldBlock`. Otherwise the current task's waker is stored in the
    /// source's waker list — to be woken by the reactor on the next I/O notification — and
    /// `Poll::Pending` is returned.
    pub fn poll_io<R>(
        &self,
        cx: &mut Context<'_>,
        mut op: impl FnMut() -> io::Result<R>,
    ) -> Poll<io::Result<R>> {
        // Throttle if the current task did too many I/O operations without yielding.
        futures::ready!(throttle::poll(cx));

        loop {
            // This number is bumped just before I/O notifications while wakers are locked.
            // Snapshotting it before attempting the operation lets us detect a notification
            // that arrives between a failed attempt and the waker registration below.
            let tick = self.tick.load(Ordering::Acquire);

            // Attempt the non-blocking operation.
            match op() {
                Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
                res => return Poll::Ready(res),
            }

            // Lock the waker list so a concurrent notification cannot slip in unnoticed
            // while we decide whether to register.
            let mut wakers = self.wakers.lock();

            // If the current task is already registered, return.
            if wakers.iter().any(|w| w.will_wake(cx.waker())) {
                return Poll::Pending;
            }

            // If there were no new notifications, register and return. If the tick changed,
            // a notification raced with us — loop around and retry `op()` rather than going
            // to sleep and potentially missing the wakeup.
            if self.tick.load(Ordering::Acquire) == tick {
                wakers.push(cx.waker().clone());
                return Poll::Pending;
            }
        }
    }
}
|
|
|
|
/// Converts a `nix::Error` into `std::io::Error`.
|
|
#[cfg(unix)]
|
|
fn io_err(err: nix::Error) -> io::Error {
|
|
match err {
|
|
nix::Error::Sys(code) => code.into(),
|
|
err => io::Error::new(io::ErrorKind::Other, Box::new(err)),
|
|
}
|
|
}
|
|
|
|
/// Bindings to epoll (Linux, Android).
|
|
#[cfg(any(target_os = "linux", target_os = "android"))]
|
|
mod sys {
|
|
use std::convert::TryInto;
|
|
use std::io;
|
|
use std::os::unix::io::RawFd;
|
|
use std::time::Duration;
|
|
|
|
use nix::sys::epoll::{
|
|
epoll_create1, epoll_ctl, epoll_wait, EpollCreateFlags, EpollEvent, EpollFlags, EpollOp,
|
|
};
|
|
|
|
use super::io_err;
|
|
|
|
pub struct Reactor(RawFd);
|
|
impl Reactor {
|
|
pub fn new() -> io::Result<Reactor> {
|
|
let epoll_fd = epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?;
|
|
Ok(Reactor(epoll_fd))
|
|
}
|
|
pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
|
|
let ev = &mut EpollEvent::new(flags(), key as u64);
|
|
epoll_ctl(self.0, EpollOp::EpollCtlAdd, fd, Some(ev)).map_err(io_err)
|
|
}
|
|
pub fn reregister(&self, _raw: RawFd, _key: usize) -> io::Result<()> {
|
|
Ok(())
|
|
}
|
|
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
|
|
epoll_ctl(self.0, EpollOp::EpollCtlDel, fd, None).map_err(io_err)
|
|
}
|
|
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
|
|
let timeout_ms = timeout
|
|
.and_then(|t| t.as_millis().try_into().ok())
|
|
.unwrap_or(-1);
|
|
events.len = epoll_wait(self.0, &mut events.list, timeout_ms).map_err(io_err)?;
|
|
Ok(events.len)
|
|
}
|
|
}
|
|
fn flags() -> EpollFlags {
|
|
EpollFlags::EPOLLET | EpollFlags::EPOLLIN | EpollFlags::EPOLLOUT | EpollFlags::EPOLLRDHUP
|
|
}
|
|
|
|
pub struct Events {
|
|
list: Box<[EpollEvent]>,
|
|
len: usize,
|
|
}
|
|
impl Events {
|
|
pub fn new() -> Events {
|
|
let list = vec![EpollEvent::empty(); 1000].into_boxed_slice();
|
|
let len = 0;
|
|
Events { list, len }
|
|
}
|
|
pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
|
|
self.list[..self.len].iter().map(|ev| ev.data() as usize)
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
#[cfg(any(
    target_os = "macos",
    target_os = "ios",
    target_os = "freebsd",
    target_os = "netbsd",
    target_os = "openbsd",
    target_os = "dragonfly",
))]
mod sys {
    use std::convert::TryInto;
    use std::io;
    use std::os::unix::io::RawFd;
    use std::time::Duration;

    use nix::errno::Errno;
    use nix::fcntl::{fcntl, FcntlArg, FdFlag};
    use nix::libc;
    use nix::sys::event::{kevent_ts, kqueue, EventFilter, EventFlag, FilterFlag, KEvent};

    use super::io_err;

    /// A kqueue instance, identified by its raw file descriptor.
    pub struct Reactor(RawFd);
    impl Reactor {
        /// Creates a new kqueue instance with the close-on-exec flag set.
        pub fn new() -> io::Result<Reactor> {
            let fd = kqueue().map_err(io_err)?;
            fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)).map_err(io_err)?;
            Ok(Reactor(fd))
        }

        /// Adds read and write filters for a file descriptor, tagged with `key`.
        pub fn register(&self, fd: RawFd, key: usize) -> io::Result<()> {
            // EV_CLEAR gives edge-triggered behavior; EV_RECEIPT reports per-change status
            // in the returned eventlist instead of mixing it with regular events.
            let flags = EventFlag::EV_CLEAR | EventFlag::EV_RECEIPT | EventFlag::EV_ADD;
            let udata = key as _;
            let changelist = [
                KEvent::new(fd as _, EventFilter::EVFILT_WRITE, flags, FFLAGS, 0, udata),
                KEvent::new(fd as _, EventFilter::EVFILT_READ, flags, FFLAGS, 0, udata),
            ];
            let mut eventlist = changelist.clone();
            kevent_ts(self.0, &changelist, &mut eventlist, None).map_err(io_err)?;
            for ev in &eventlist {
                // See https://github.com/tokio-rs/mio/issues/582
                let (flags, data) = (ev.flags(), ev.data());
                if flags.contains(EventFlag::EV_ERROR) && data != 0 && data != Errno::EPIPE as _ {
                    return Err(io::Error::from_raw_os_error(data as _));
                }
            }
            Ok(())
        }

        /// A no-op: EV_CLEAR registrations don't need re-arming.
        pub fn reregister(&self, _fd: RawFd, _key: usize) -> io::Result<()> {
            Ok(())
        }

        /// Removes the read and write filters for a file descriptor.
        pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
            let flags = EventFlag::EV_RECEIPT | EventFlag::EV_DELETE;
            let changelist = [
                KEvent::new(fd as _, EventFilter::EVFILT_WRITE, flags, FFLAGS, 0, 0),
                KEvent::new(fd as _, EventFilter::EVFILT_READ, flags, FFLAGS, 0, 0),
            ];
            let mut eventlist = changelist.clone();
            kevent_ts(self.0, &changelist, &mut eventlist, None).map_err(io_err)?;
            for ev in &eventlist {
                let (flags, data) = (ev.flags(), ev.data());
                if flags.contains(EventFlag::EV_ERROR) && data != 0 {
                    return Err(io::Error::from_raw_os_error(data as _));
                }
            }
            Ok(())
        }

        /// Waits for events, storing them into `events` and returning their number.
        ///
        /// A timeout of `None` means waiting forever.
        pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
            // Round a non-zero timeout *up* to at least one millisecond — truncating
            // e.g. 100µs down to zero would turn a short timer sleep into a non-blocking
            // poll and busy loop. A timeout too large to fit in `usize` milliseconds
            // waits forever, same as `None`.
            let timeout_ms: Option<usize> = timeout
                .map(|t| {
                    if t == Duration::from_secs(0) {
                        t
                    } else {
                        t.max(Duration::from_millis(1))
                    }
                })
                .and_then(|t| t.as_millis().try_into().ok());
            let timeout = timeout_ms.map(|ms| libc::timespec {
                tv_sec: (ms / 1000) as libc::time_t,
                tv_nsec: ((ms % 1000) * 1_000_000) as libc::c_long,
            });
            events.len = kevent_ts(self.0, &[], &mut events.list, timeout).map_err(io_err)?;
            Ok(events.len)
        }
    }

    /// No filter-specific flags are used.
    const FFLAGS: FilterFlag = FilterFlag::empty();

    /// A buffer for events returned by `kevent_ts`.
    pub struct Events {
        list: Box<[KEvent]>,
        len: usize,
    }
    impl Events {
        /// Creates a buffer with room for 1000 events (initialized with placeholder entries).
        pub fn new() -> Events {
            let flags = EventFlag::empty();
            let event = KEvent::new(0, EventFilter::EVFILT_USER, flags, FFLAGS, 0, 0);
            let list = vec![event; 1000].into_boxed_slice();
            let len = 0;
            Events { list, len }
        }

        /// Iterates over the keys of the sources whose events were received.
        pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
            self.list[..self.len].iter().map(|ev| ev.udata() as usize)
        }
    }
}
|
|
|
|
/// Bindings to wepoll (Windows).
#[cfg(target_os = "windows")]
mod sys {
    use std::io;
    use std::os::windows::io::{AsRawSocket, RawSocket};
    use std::time::Duration;

    use wepoll_binding::{Epoll, EventFlag};

    /// A wepoll instance.
    pub struct Reactor(Epoll);
    impl Reactor {
        /// Creates a new wepoll instance.
        pub fn new() -> io::Result<Reactor> {
            Ok(Reactor(Epoll::new()?))
        }

        /// Adds a socket, tagging its events with `key`.
        pub fn register(&self, sock: RawSocket, key: usize) -> io::Result<()> {
            self.0.register(&As(sock), flags(), key as u64)
        }

        /// Re-arms a socket — necessary because registrations are oneshot (see `flags()`).
        pub fn reregister(&self, sock: RawSocket, key: usize) -> io::Result<()> {
            // Ignore errors because a concurrent poll can reregister the handle at any point.
            let _ = self.0.reregister(&As(sock), flags(), key as u64);
            Ok(())
        }

        /// Removes a socket from the wepoll instance.
        pub fn deregister(&self, sock: RawSocket) -> io::Result<()> {
            // Ignore errors because an event can deregister the handle at any point.
            let _ = self.0.deregister(&As(sock));
            Ok(())
        }

        /// Waits for events, storing them into `events` and returning their number.
        ///
        /// A timeout of `None` means waiting forever.
        pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
            events.0.clear();
            self.0.poll(&mut events.0, timeout)
        }
    }

    /// Adapter giving a raw socket handle the `AsRawSocket` trait wepoll expects.
    struct As(RawSocket);
    impl AsRawSocket for As {
        fn as_raw_socket(&self) -> RawSocket {
            self.0
        }
    }

    /// Interest flags: oneshot readability, writability, and peer shutdown.
    fn flags() -> EventFlag {
        EventFlag::ONESHOT | EventFlag::IN | EventFlag::OUT | EventFlag::RDHUP
    }

    /// A buffer for events returned by wepoll.
    pub struct Events(wepoll_binding::Events);
    impl Events {
        /// Creates a buffer with room for 1000 events.
        pub fn new() -> Events {
            Events(wepoll_binding::Events::with_capacity(1000))
        }

        /// Iterates over the keys of the sources whose events were received.
        pub fn iter(&self) -> impl Iterator<Item = usize> + '_ {
            self.0.iter().map(|ev| ev.data() as usize)
        }
    }
}
|