Split up some more

This commit is contained in:
Stjepan Glavina 2020-04-20 14:08:41 +02:00
parent a8f9fe2375
commit c8f2572e35
10 changed files with 465 additions and 436 deletions

View File

@ -1,13 +1,15 @@
//! Async methods for networking types.
//! Async I/O.
use std::future::Future;
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
#[cfg(unix)]
use std::{
os::unix::io::AsRawFd,
os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
path::Path,
};
@ -15,13 +17,139 @@ use std::{
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use futures::future;
use futures::io::{AsyncRead, AsyncWrite};
use futures::stream::{self, Stream};
use socket2::{Domain, Protocol, Socket, Type};
use crate::reactor::Async;
use crate::reactor::{Reactor, Source};
use crate::task::Task;
/// Async I/O.
///
/// TODO
/// TODO: suggest using Shared to split
/// TODO: suggest using Lock if Async<T> is not splittable
#[derive(Debug)]
pub struct Async<T> {
/// A source registered in the reactor.
source: Arc<Source>,
/// The inner I/O handle.
io: Option<Box<T>>,
}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
///
/// TODO: explain AsRawFd and AsRawSocket
/// TODO: **warning** for unix users: the I/O handle must be compatible with epoll/kqueue!
/// Most notably, `File`, `Stdin`, `Stdout`, `Stderr` will **not** work.
pub fn new(io: T) -> io::Result<Async<T>> {
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_fd())?,
io: Some(Box::new(io)),
})
}
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
///
/// TODO
pub fn new(io: T) -> io::Result<Async<T>> {
Ok(Async {
source: Reactor::get().insert_io(io.as_raw_socket())?,
io: Some(Box::new(io)),
})
}
}
impl<T> Async<T> {
/// Gets a reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.get_ref();
/// # std::io::Result::Ok(()) });
/// ```
pub fn get_ref(&self) -> &T {
self.io.as_ref().unwrap()
}
/// Gets a mutable reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let mut listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.get_mut();
/// # std::io::Result::Ok(()) });
/// ```
pub fn get_mut(&mut self) -> &mut T {
self.io.as_mut().unwrap()
}
/// Unwraps the inner non-blocking I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.into_inner()?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn into_inner(mut self) -> io::Result<T> {
let io = *self.io.take().unwrap();
Reactor::get().remove_io(&self.source)?;
Ok(io)
}
/// TODO
pub async fn with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_ref().unwrap();
let source = &self.source;
future::poll_fn(|cx| source.poll_io(cx, || op(&mut io))).await
}
/// TODO
pub async fn with_mut<R>(&mut self, op: impl FnMut(&mut T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_mut().unwrap();
let source = &self.source;
future::poll_fn(|cx| source.poll_io(cx, || op(&mut io))).await
}
}
impl<T> Drop for Async<T> {
fn drop(&mut self) {
if self.io.is_some() {
// Deregister and ignore errors because destructors should not panic.
let _ = Reactor::get().remove_io(&self.source);
// Drop the I/O handle to close it.
self.io.take();
}
}
}
/// Pins a future and then polls it.
fn poll_future<T>(cx: &mut Context<'_>, fut: impl Future<Output = T>) -> Poll<T> {
futures::pin_mut!(fut);

View File

@ -1,3 +1,5 @@
//! Implementation of [`block_on()`].
use std::cell::RefCell;
use std::future::Future;
use std::task::{Context, Poll, Waker};

View File

@ -1,4 +1,11 @@
//! The blocking executor.
//!
//! This module also implements convenient adapters:
//!
//! - [`blocking!`] as syntax sugar around [`Task::blocking()`]
//! - [`iter()`] converts an [`Iterator`] into a [`Stream`]
//! - [`reader()`] converts a [`Read`] into an [`AsyncRead`]
//! - [`writer()`] converts a [`Write`] into an [`AsyncWrite`]
use std::collections::VecDeque;
use std::future::Future;
@ -110,7 +117,7 @@ impl BlockingExecutor {
thread::spawn(move || {
// If enabled, set up tokio before the main loop begins.
context::enter(|| self.main_loop()) // TODO
context::enter(|| self.main_loop())
});
}
}

View File

@ -1,4 +1,6 @@
//! Task context common to all executors.
//!
//! Before executor, we "enter" it by setting up some necessary thread-locals.
/// Enters the tokio context if the `tokio` feature is enabled.
pub(crate) fn enter<T>(f: impl FnOnce() -> T) -> T {

147
src/io_event.rs Normal file
View File

@ -0,0 +1,147 @@
use std::io::{self, Read, Write};
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::Arc;
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use socket2::{Domain, Socket, Type};
use crate::async_io::Async;
/// A boolean flag that is set whenever a thread-local task is woken by another thread.
///
/// Every time this flag's value is changed, an I/O event is triggered.
///
/// TODO
#[derive(Clone)]
pub(crate) struct IoEvent {
pipe: Arc<SelfPipe>,
}
/// TODO
impl IoEvent {
pub fn new() -> io::Result<IoEvent> {
Ok(IoEvent {
pipe: Arc::new(SelfPipe::new()?),
})
}
pub fn set(&self) {
self.pipe.set();
}
pub fn clear(&self) -> bool {
self.pipe.clear()
}
pub async fn ready(&self) {
self.pipe.ready().await;
}
}
/// A boolean flag that triggers I/O events whenever changed.
///
/// https://cr.yp.to/docs/selfpipe.html
///
/// TODO
struct SelfPipe {
flag: AtomicBool,
writer: Socket,
reader: Async<Socket>,
}
/// TODO
impl SelfPipe {
/// Creates a self-pipe.
fn new() -> io::Result<SelfPipe> {
let (writer, reader) = pipe()?;
writer.set_send_buffer_size(1)?;
reader.set_recv_buffer_size(1)?;
Ok(SelfPipe {
flag: AtomicBool::new(false),
writer,
reader: Async::new(reader)?,
})
}
/// Sets the flag to `true`.
// TODO: rename to raise() as in "raise a signal"? or even better: emit() or notify()
fn set(&self) {
// Publish all in-memory changes before setting the flag.
atomic::fence(Ordering::SeqCst);
// If the flag is not set...
if !self.flag.load(Ordering::SeqCst) {
// If this thread sets it...
if !self.flag.swap(true, Ordering::SeqCst) {
// Trigger an I/O event by writing a byte into the sending socket.
let _ = (&self.writer).write(&[1]);
let _ = (&self.writer).flush();
}
}
}
/// Sets the flag to `false`.
fn clear(&self) -> bool {
// Read all available bytes from the receiving socket.
while self.reader.get_ref().read(&mut [0; 64]).is_ok() {}
let value = self.flag.swap(false, Ordering::SeqCst);
// Publish all in-memory changes after clearing the flag.
atomic::fence(Ordering::SeqCst);
value
}
/// Waits until the flag is changed.
///
/// Note that this method may spuriously report changes when they didn't really happen.
async fn ready(&self) {
self.reader
.with(|_| match self.flag.load(Ordering::SeqCst) {
true => Ok(()),
false => Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
})
.await
.expect("failure while waiting on a self-pipe");
}
}
/// TODO
#[cfg(unix)]
fn pipe() -> io::Result<(Socket, Socket)> {
let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
sock2.set_nonblocking(true)?;
Ok((sock1, sock2))
}
/// TODO
#[cfg(windows)]
fn pipe() -> io::Result<(Socket, Socket)> {
// TODO The only portable way of manually triggering I/O events is to create a socket and
// send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
// See the links below for more information.
//
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
// First socket: start connecting to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.set_nodelay(true)?;
let _ = sock1.connect(&listener.local_addr()?);
// Second socket: accept a connection from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
let _ = sock2.set_nodelay(true)?;
Ok((sock1, sock2))
}

View File

@ -121,19 +121,22 @@
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
mod async_io;
mod block_on;
mod blocking;
mod context;
mod net;
mod io_event;
mod reactor;
mod run;
mod task;
mod thread_local;
mod throttle;
mod timer;
mod work_stealing;
pub use async_io::Async;
pub use block_on::block_on;
pub use blocking::{iter, reader, writer};
pub use reactor::{Async, Timer};
pub use run::run;
pub use task::Task;
pub use timer::Timer;

View File

@ -15,49 +15,25 @@ compile_error!("reactor does not support this target OS");
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::future::Future;
use std::io::{self, Read, Write};
use std::io;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool, AtomicU64, Ordering};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::time::{Duration, Instant};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::io::RawFd;
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
use std::os::windows::io::RawSocket;
use futures::future;
use once_cell::sync::Lazy;
use slab::Slab;
use socket2::{Domain, Socket, Type};
use crate::io_event::IoEvent;
use crate::throttle;
/// A registered source of I/O events.
#[derive(Debug)]
struct Source {
/// Raw file descriptor on Unix platforms.
#[cfg(unix)]
raw: RawFd,
/// Raw socket handle on Windows.
#[cfg(windows)]
raw: RawSocket,
/// The ID of this source obtain during registration.
key: usize,
/// A list of wakers representing tasks interested in events on this source.
wakers: piper::Lock<Vec<Waker>>,
/// Incremented on every I/O notification - this is only used for synchronization.
tick: AtomicU64,
}
/// The reactor driving I/O events and timers.
///
/// Every async I/O handle ("source") and every timer is registered here. Invocations of `run()`
@ -105,7 +81,7 @@ impl Reactor {
}
/// Registers an I/O source in the reactor.
fn register(
pub fn insert_io(
&self,
#[cfg(unix)] raw: RawFd,
#[cfg(windows)] raw: RawSocket,
@ -113,6 +89,23 @@ impl Reactor {
let mut sources = self.sources.lock();
let vacant = sources.vacant_entry();
#[cfg(unix)]
{
use nix::fcntl::{fcntl, FcntlArg, OFlag};
// Put the I/O handle in non-blocking mode.
let flags = fcntl(raw, FcntlArg::F_GETFL).map_err(io_err)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl(raw, FcntlArg::F_SETFL(flags)).map_err(io_err)?;
}
#[cfg(windows)]
{
// Put the I/O handle in non-blocking mode.
let socket = unsafe { Socket::from_raw_socket(raw) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
}
// Create a source and register it.
let source = Arc::new(Source {
raw,
@ -126,12 +119,38 @@ impl Reactor {
}
/// Deregisters an I/O source from the reactor.
fn deregister(&self, source: &Source) -> io::Result<()> {
pub fn remove_io(&self, source: &Source) -> io::Result<()> {
let mut sources = self.sources.lock();
sources.remove(source.key);
self.sys.deregister(source.raw)
}
/// TODO
pub fn insert_timer(&self, when: Instant, waker: Waker) -> u64 {
let mut timers = self.timers.lock();
// If this timer is going to be the earliest one, interrupt the reactor.
if let Some((first, _)) = timers.keys().next() {
if when < *first {
self.event.set();
}
}
// Generate a new ID.
static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
assert!(id < u64::max_value() / 2, "exhausted timer IDs");
// Insert this timer into the timers map.
timers.insert((when, id), waker);
id
}
/// TODO
pub fn remove_timer(&self, when: Instant, id: u64) {
self.timers.lock().remove(&(when, id));
}
/// Processes ready events without blocking.
///
/// This doesn't have strong guarantees. If there are ready events, they may or may not be
@ -221,131 +240,62 @@ impl ReactorLock<'_> {
}
}
/// Fires at the chosen point in time.
///
/// TODO
/// A registered source of I/O events.
#[derive(Debug)]
pub struct Timer {
/// A unique ID for this timer.
///
/// When this field is set to `None`, this timer is not registered in the reactor.
id: Option<u64>,
pub(crate) struct Source {
/// Raw file descriptor on Unix platforms.
#[cfg(unix)]
raw: RawFd,
/// When this timer fires.
when: Instant,
/// Raw socket handle on Windows.
#[cfg(windows)]
raw: RawSocket,
/// The ID of this source obtain during registration.
key: usize,
/// A list of wakers representing tasks interested in events on this source.
wakers: piper::Lock<Vec<Waker>>,
/// Incremented on every I/O notification - this is only used for synchronization.
tick: AtomicU64,
}
impl Timer {
/// Fires after the specified duration of time.
///
/// TODO
pub fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
impl Source {
/// Attempts a non-blocking I/O operation and registers a waker if it errors with `WouldBlock`.
pub fn poll_io<R>(
&self,
cx: &mut Context<'_>,
mut op: impl FnMut() -> io::Result<R>,
) -> Poll<io::Result<R>> {
// Throttle if the current task did too many I/O operations without yielding.
futures::ready!(throttle::poll(cx));
/// Fires at the specified instant in time.
///
/// TODO
pub fn at(when: Instant) -> Timer {
let id = None;
Timer { id, when }
}
loop {
// This number is bumped just before I/O notifications while wakers are locked.
let tick = self.tick.load(Ordering::Acquire);
/// Registers this timer in the reactor, if not already registered.
///
/// A waker associated with the current context will be stored and woken when the timer fires.
fn register(&mut self, cx: &mut Context<'_>) {
if self.id.is_none() {
let mut timers = Reactor::get().timers.lock();
// If this timer is going to be the earliest one, interrupt the reactor.
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
Reactor::get().event.set();
}
// Attempt the non-blocking operation.
match op() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
// Generate a new ID.
static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
// Lock the waker list and retry the non-blocking operation.
let mut wakers = self.wakers.lock();
// Insert this timer into the timers map.
timers.insert((self.when, id), cx.waker().clone());
self.id = Some(id);
// If the current task is already registered, return.
if wakers.iter().any(|w| w.will_wake(cx.waker())) {
return Poll::Pending;
}
// If there were no new notifications, register and return.
if self.tick.load(Ordering::Acquire) == tick {
wakers.push(cx.waker().clone());
return Poll::Pending;
}
}
}
/// Deregisters this timer from the reactor, if it was registered.
fn deregister(&mut self) {
if let Some(id) = self.id {
Reactor::get().timers.lock().remove(&(self.when, id));
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
self.deregister();
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if this timer has already fired.
if Instant::now() >= self.when {
self.deregister();
Poll::Ready(self.when)
} else {
self.register(cx);
Poll::Pending
}
}
}
/// Async I/O.
///
/// TODO
/// TODO: suggest using Shared to split
/// TODO: suggest using Lock if Async<T> is not splittable
#[derive(Debug)]
pub struct Async<T> {
/// A source registered in the reactor.
source: Arc<Source>,
/// The inner I/O handle.
io: Option<Box<T>>,
}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
///
/// TODO: explain AsRawFd and AsRawSocket
/// TODO: **warning** for unix users: the I/O handle must be compatible with epoll/kqueue!
/// Most notably, `File`, `Stdin`, `Stdout`, `Stderr` will **not** work.
pub fn new(io: T) -> io::Result<Async<T>> {
use nix::fcntl::{fcntl, FcntlArg, OFlag};
// Put the I/O handle in non-blocking mode.
let flags = fcntl(io.as_raw_fd(), FcntlArg::F_GETFL).map_err(io_err)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl(io.as_raw_fd(), FcntlArg::F_SETFL(flags)).map_err(io_err)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: Reactor::get().register(io.as_raw_fd())?,
io: Some(Box::new(io)),
})
}
}
#[cfg(unix)]
impl<T: AsRawFd> AsRawFd for Async<T> {
fn as_raw_fd(&self) -> RawFd {
self.source.raw
}
}
/// Converts a `nix::Error` into `std::io::Error`.
@ -357,286 +307,7 @@ fn io_err(err: nix::Error) -> io::Error {
}
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(io: T) -> io::Result<Async<T>> {
// Put the I/O handle in non-blocking mode.
let socket = unsafe { Socket::from_raw_socket(io.as_raw_socket()) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: Reactor::get().register(io.as_raw_socket())?,
io: Some(Box::new(io)),
})
}
}
#[cfg(windows)]
impl<T: AsRawSocket> AsRawSocket for Async<T> {
fn as_raw_socket(&self) -> RawSocket {
self.source.raw
}
}
impl<T> Async<T> {
/// Gets a reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.get_ref();
/// # std::io::Result::Ok(()) });
/// ```
pub fn get_ref(&self) -> &T {
self.io.as_ref().unwrap()
}
/// Gets a mutable reference to the inner I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let mut listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.get_mut();
/// # std::io::Result::Ok(()) });
/// ```
pub fn get_mut(&mut self) -> &mut T {
self.io.as_mut().unwrap()
}
/// Unwraps the inner non-blocking I/O handle.
///
/// # Examples
///
/// ```
/// use smol::Async;
/// use std::net::TcpListener;
///
/// # smol::run(async {
/// let listener = Async::<TcpListener>::bind("127.0.0.1:80")?;
/// let inner = listener.into_inner()?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn into_inner(mut self) -> io::Result<T> {
let io = *self.io.take().unwrap();
Reactor::get().deregister(&self.source)?;
Ok(io)
}
/// TODO
pub async fn with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_ref().unwrap();
let source = &self.source;
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut io), source)).await
}
/// TODO
pub async fn with_mut<R>(&mut self, op: impl FnMut(&mut T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_mut().unwrap();
let source = &self.source;
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut io), source)).await
}
/// Attempts a non-blocking I/O operation and registers a waker if it errors with `WouldBlock`.
fn poll_io<R>(
cx: &mut Context<'_>,
mut op: impl FnMut() -> io::Result<R>,
source: &Source,
) -> Poll<io::Result<R>> {
// Throttle if the current task did too many I/O operations without yielding.
futures::ready!(throttle::poll(cx));
loop {
// This number is bumped just before I/O notifications while wakers are locked.
let tick = source.tick.load(Ordering::Acquire);
// Attempt the non-blocking operation.
match op() {
Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
res => return Poll::Ready(res),
}
// Lock the waker list and retry the non-blocking operation.
let mut wakers = source.wakers.lock();
// If the current task is already registered, return.
if wakers.iter().any(|w| w.will_wake(cx.waker())) {
return Poll::Pending;
}
// If there were no new notifications, register and return.
if source.tick.load(Ordering::Acquire) == tick {
wakers.push(cx.waker().clone());
return Poll::Pending;
}
}
}
}
impl<T> Drop for Async<T> {
fn drop(&mut self) {
if self.io.is_some() {
// Deregister and ignore errors because destructors should not panic.
let _ = Reactor::get().deregister(&self.source);
// Drop the I/O handle to close it.
self.io.take();
}
}
}
/// A boolean flag that is set whenever a thread-local task is woken by another thread.
///
/// Every time this flag's value is changed, an I/O event is triggered.
///
/// TODO
#[derive(Clone)]
pub(crate) struct IoEvent {
pipe: Arc<SelfPipe>,
}
/// TODO
impl IoEvent {
pub fn new() -> io::Result<IoEvent> {
Ok(IoEvent {
pipe: Arc::new(SelfPipe::new()?),
})
}
pub fn set(&self) {
self.pipe.set();
}
pub fn clear(&self) -> bool {
self.pipe.clear()
}
pub async fn ready(&self) {
self.pipe.ready().await;
}
}
/// A boolean flag that triggers I/O events whenever changed.
///
/// https://cr.yp.to/docs/selfpipe.html
///
/// TODO
struct SelfPipe {
flag: AtomicBool,
writer: Socket,
reader: Async<Socket>,
}
/// TODO
impl SelfPipe {
/// Creates a self-pipe.
fn new() -> io::Result<SelfPipe> {
let (writer, reader) = pipe()?;
writer.set_send_buffer_size(1)?;
reader.set_recv_buffer_size(1)?;
Ok(SelfPipe {
flag: AtomicBool::new(false),
writer,
reader: Async::new(reader)?,
})
}
/// Sets the flag to `true`.
// TODO: rename to raise() as in "raise a signal"? or even better: emit() or notify()
fn set(&self) {
// Publish all in-memory changes before setting the flag.
atomic::fence(Ordering::SeqCst);
// If the flag is not set...
if !self.flag.load(Ordering::SeqCst) {
// If this thread sets it...
if !self.flag.swap(true, Ordering::SeqCst) {
// Trigger an I/O event by writing a byte into the sending socket.
let _ = (&self.writer).write(&[1]);
let _ = (&self.writer).flush();
}
}
}
/// Sets the flag to `false`.
fn clear(&self) -> bool {
// Read all available bytes from the receiving socket.
while self.reader.get_ref().read(&mut [0; 64]).is_ok() {}
let value = self.flag.swap(false, Ordering::SeqCst);
// Publish all in-memory changes after clearing the flag.
atomic::fence(Ordering::SeqCst);
value
}
/// Waits until the flag is changed.
///
/// Note that this method may spuriously report changes when they didn't really happen.
async fn ready(&self) {
self.reader
.with(|_| match self.flag.load(Ordering::SeqCst) {
true => Ok(()),
false => Err(io::Error::new(io::ErrorKind::WouldBlock, "")),
})
.await
.expect("failure while waiting on a self-pipe");
}
}
/// TODO
#[cfg(unix)]
fn pipe() -> io::Result<(Socket, Socket)> {
let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
sock2.set_nonblocking(true)?;
Ok((sock1, sock2))
}
/// TODO
#[cfg(windows)]
fn pipe() -> io::Result<(Socket, Socket)> {
// TODO The only portable way of manually triggering I/O events is to create a socket and
// send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
// See the links below for more information.
//
// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
// https://gist.github.com/geertj/4325783
// Create a temporary listener.
let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
listener.listen(1)?;
// First socket: start connecting to the listener.
let sock1 = Socket::new(Domain::ipv4(), Type::stream(), None)?;
sock1.set_nonblocking(true)?;
let _ = sock1.set_nodelay(true)?;
let _ = sock1.connect(&listener.local_addr()?);
// Second socket: accept a connection from the listener.
let (sock2, _) = listener.accept()?;
sock2.set_nonblocking(true)?;
let _ = sock2.set_nodelay(true)?;
Ok((sock1, sock2))
}
// ---------- epoll (Linux, Android) ----------
/// Bindings to epoll (Linux, Android).
#[cfg(any(target_os = "linux", target_os = "android"))]
mod sys {
use std::convert::TryInto;
@ -694,8 +365,7 @@ mod sys {
}
}
// ---------- kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD) ----------
/// Bindings to kqueue (macOS, iOS, FreeBSD, NetBSD, OpenBSD, DragonFly BSD).
#[cfg(any(
target_os = "macos",
target_os = "ios",
@ -791,8 +461,7 @@ mod sys {
}
}
// ---------- wepoll (Windows) ----------
/// Bindings to wepoll (Windows).
#[cfg(target_os = "windows")]
mod sys {
use std::io;

View File

@ -9,7 +9,8 @@ use std::thread::{self, ThreadId};
use crossbeam::queue::SegQueue;
use scoped_tls_hkt::scoped_thread_local;
use crate::reactor::{IoEvent, Reactor};
use crate::io_event::IoEvent;
use crate::reactor::Reactor;
use crate::task::{Runnable, Task};
use crate::throttle;

69
src/timer.rs Normal file
View File

@ -0,0 +1,69 @@
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use crate::reactor::Reactor;
/// Fires at the chosen point in time.
///
/// TODO
#[derive(Debug)]
pub struct Timer {
/// A unique ID for this timer.
///
/// When this field is set to `None`, this timer is not registered in the reactor.
id: Option<u64>,
/// When this timer fires.
when: Instant,
}
impl Timer {
/// Fires after the specified duration of time.
///
/// TODO
pub fn after(dur: Duration) -> Timer {
Timer::at(Instant::now() + dur)
}
/// Fires at the specified instant in time.
///
/// TODO
pub fn at(when: Instant) -> Timer {
let id = None;
Timer { id, when }
}
}
impl Drop for Timer {
fn drop(&mut self) {
if let Some(id) = self.id {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
}
}
impl Future for Timer {
type Output = Instant;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if the timer has already fired.
if Instant::now() >= self.when {
if let Some(id) = self.id {
// Deregister the timer from the reactor.
Reactor::get().remove_timer(self.when, id);
}
Poll::Ready(self.when)
} else {
if self.id.is_none() {
// Register the timer in the reactor.
let waker = cx.waker().clone();
self.id = Some(Reactor::get().insert_timer(self.when, waker));
}
Poll::Pending
}
}
}

View File

@ -11,7 +11,8 @@ use once_cell::sync::Lazy;
use scoped_tls_hkt::scoped_thread_local;
use slab::Slab;
use crate::reactor::{IoEvent, Reactor};
use crate::io_event::IoEvent;
use crate::reactor::Reactor;
use crate::task::{Runnable, Task};
use crate::throttle;