More docs

This commit is contained in:
Stjepan Glavina 2020-04-16 16:09:22 +02:00
parent 90f0d901e5
commit 07cf232cc9
3 changed files with 276 additions and 90 deletions

View File

@ -7,7 +7,7 @@ description = "WIP"
license = "MIT OR Apache-2.0"
[dependencies]
async-task = "2.1.1"
async-task = "3.0.0"
crossbeam = "0.7.3"
futures = { version = "0.3.4", default-features = false, features = ["std"] }
once_cell = "1.3.1"

View File

@ -1,3 +1,7 @@
# smol
A small and fast async runtime.
## Goals
* Small - Fits into a single source file.

View File

@ -9,6 +9,8 @@
//! 3. Blocking executor for tasks created by [`Task::blocking()`], [`blocking!`], [`iter()`],
//! [`reader()`] and [`writer()`].
//!
//! Blocking executor is the only one that spawns threads.
//!
//! ## Reactor
//!
//! To wait for the next I/O event, the reactor calls [epoll] on Linux/Android, [kqueue] on
@ -17,14 +19,17 @@
//! The [`Async`] type registers an I/O handle in the reactor and is able to convert its blocking
//! operations into async operations.
//!
//! The [`Timer`] type registers a timer in the reactor that will fire at a certain instant in
//! The [`Timer`] type registers a timer in the reactor that will fire at the chosen point in
//! time.
//!
//! ## Running
//!
//! Function [`run()`] simultaneously runs the thread-local executor, the work-stealing executor,
//! and polls the reactor for I/O events and timers. At least one thread has to be calling
//! [`run()`] in order for futures waiting on I/O and timers to get notified.
//! Function [`run()`] simultaneously runs the thread-local executor, runs the work-stealing
//! executor, and polls the reactor for I/O events and timers. At least one thread has to be
//! calling [`run()`] in order for futures waiting on I/O and timers to get notified.
//!
//! If you want a multithreaded runtime, just call [`run()`] from multiple threads. See [here TODO
//! link to example] for an example.
//!
//! There is also [`block_on()`], which simply blocks the thread until a future completes, but it
//! doesn't do anything else besides that.
@ -53,12 +58,60 @@
//! }
//! ```
//!
//! For more examples, look inside the [examples] directory.
//! Look inside the [examples] directory for more.
//!
//! You can use this runtime to read a [file][read-file] or
//! [directory][read-directory], [spawn][process-run] a process or read its
//! [output][process-output], use a timer to [sleep][timer-sleep] or set a
//! [timeout][timer-timeout], catch the [Ctrl-C][ctrl-c] signal and [gracefully shut down] TODO.
//!
//! You can also implement a simple TCP [client][tcp-client]/[server][tcp-server], a chat
//! [client][chat-client]/[server][chat-server] over TCP, a [web crawler][web-crawler], a simple
//! HTTP/TLS [client][simple-client]/[server][simple-server], a hyper
//! [client][hyper-client]/[server][hyper-server], an async-h1
//! [client][async-h1-client]/[server][async-h1-server], a WebSocket/TLS
//! [client][websocket-client]/[server][websocket-server].
//!
//! You can use libraries like [inotify], [timerfd], [signal-hook], and [uds_windows] in async
//! manner.
//!
//! You can also mix `smol` with [async-std] and [tokio], or use runtime-specific libraries like
//! [reqwest].
//!
//! [epoll]: https://en.wikipedia.org/wiki/Epoll
//! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
//! [WSAPoll]: https://docs.microsoft.com/en-us/windows/win32/api/winsock2/nf-winsock2-wsapoll
//!
//! [examples]: https://github.com/stjepang/smol/tree/master/examples
//! [async-h1]: https://docs.rs/async-h1
//! [hyper]: https://docs.rs/hyper
//! [www.rust-lang.org]: https://www.rust-lang.org/
//! [cat]: https://en.wikipedia.org/wiki/Cat_(Unix)
//!
//! [ctrl-c]: https://github.com/stjepang/smol/blob/master/examples/ctrl-c.rs
//! [read-file]: https://github.com/stjepang/smol/blob/master/examples/read-file.rs
//! [read-directory]: https://github.com/stjepang/smol/blob/master/examples/read-directory.rs
//! [timer-sleep]: https://github.com/stjepang/smol/blob/master/examples/timer-sleep.rs
//! [timer-timeout]: https://github.com/stjepang/smol/blob/master/examples/timer-timeout.rs
//! [process-run]: https://github.com/stjepang/smol/blob/master/examples/process-run.rs
//! [process-output]: https://github.com/stjepang/smol/blob/master/examples/process-output.rs
//! [tcp-client]: https://github.com/stjepang/smol/blob/master/examples/tcp-client.rs
//! [tcp-server]: https://github.com/stjepang/smol/blob/master/examples/tcp-server.rs
//! [simple-client]: https://github.com/stjepang/smol/blob/master/examples/simple-client.rs
//! [simple-server]: https://github.com/stjepang/smol/blob/master/examples/simple-server.rs
//! [async-h1-client]: https://github.com/stjepang/smol/blob/master/examples/async-h1-client.rs
//! [async-h1-server]: https://github.com/stjepang/smol/blob/master/examples/async-h1-server.rs
//! [hyper-client]: https://github.com/stjepang/smol/blob/master/examples/hyper-client.rs
//! [hyper-server]: https://github.com/stjepang/smol/blob/master/examples/hyper-server.rs
//! [websocket-client]: https://github.com/stjepang/smol/blob/master/examples/websocket-client.rs
//! [websocket-server]: https://github.com/stjepang/smol/blob/master/examples/websocket-server.rs
//! [chat-client]: https://github.com/stjepang/smol/blob/master/examples/chat-client.rs
//! [chat-server]: https://github.com/stjepang/smol/blob/master/examples/chat-server.rs
//! [web-crawler]: https://github.com/stjepang/smol/blob/master/examples/web-crawler.rs
//! [inotify]: https://github.com/stjepang/smol/blob/master/examples/linux-inotify.rs
//! [timerfd]: https://github.com/stjepang/smol/blob/master/examples/linux-timerfd.rs
//! [signal-hook]: https://github.com/stjepang/smol/blob/master/examples/unix-signal.rs
//! [uds_windows]: https://github.com/stjepang/smol/blob/master/examples/windows-uds.rs
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
@ -112,6 +165,17 @@ use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type};
use std::sync::{Condvar, Mutex, MutexGuard};
// TODO: explain the implementation and major components:
// - the Task struct
// - thread-local executor
// - work-stealing executor
// - reactor
// - Timer
// - Async
// - the IoEvent
// - blocking executor
// - sys module
// ---------- The task system ----------
/// A runnable future, ready for execution.
@ -162,7 +226,7 @@ pub struct Task<T>(Option<async_task::JoinHandle<T, ()>>);
impl<T: 'static> Task<T> {
/// Spawns a future onto the thread-local executor.
///
/// Panics if the current thread is not inside an invocation [`run()`].
/// Panics if the current thread is not inside an invocation of [`run()`].
///
/// # Examples
///
@ -294,7 +358,12 @@ impl<T> Future for Task<T> {
}
/// Blocks on a single future.
///
/// TODO
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
// The implementation of this function is explained by the following blog post:
// https://stjepang.github.io/2020/01/25/build-your-own-block-on.html
thread_local! {
// Parker and waker associated with the current thread.
static CACHE: RefCell<(Parker, Waker)> = {
@ -321,64 +390,124 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
}
/// Executes all futures until the main one completes.
///
/// TODO a thread-pool example with num_cpus::get().max(1)
pub fn run<T>(future: impl Future<Output = T>) -> T {
if THREAD_LOCAL_EXECUTOR.is_set() || WORKER.is_set() {
// If this thread is already inside an executor, panic.
if WORKER.is_set() {
panic!("recursive `run()`");
}
// Create a thread-local executor and a worker in the work-stealing executor.
let local = ThreadLocalExecutor::new();
let worker = WorkStealingExecutor::get().worker();
// TODO: Explain how this runs three executors (block_on, local, ws)
// -> it's like concurrent block_on(), ThreadLocalExecutor::execute(), Worker::execute()
// we alternate between those three executors
// Create a waker that triggers an I/O event in the thread-local scheduler.
let ev = local.event.clone();
let waker = async_task::waker_fn(move || ev.set());
let cx = &mut Context::from_waker(&waker);
futures::pin_mut!(future);
// Set up thread-locals.
// TODO: can local_event be inside local_executor?
// TODO: also spawn_event inside workstealing?
// TODO: rename workstealing to workstealingexecutor?
// Set up the thread-locals before execution begins.
THREAD_LOCAL_EXECUTOR.set(&local, || {
WORKER.set(&worker, || {
// We "drive" four components at the same time, treating them all fairly and making
// sure none of them get starved:
//
// 1. `future` - the main future.
// 2. `local - the thread-local executor.
// 3. `worker` - the work-stealing executor.
// 4. `Reactor::get()` - the reactor.
//
// When all four components are out of work, we block the current thread on
// epoll/kevent/WSAPoll. If new work comes in that isn't naturally triggered by an
// I/O event registered with `Async` handles, we use `IoEvent`s to simulate an I/O
// event that will unblock the thread:
//
// - When the main future is woken, `local.event` is triggered.
// - When thread-local executor gets new work, `local.event` is triggered.
// - When work-stealing executor gets new work, `worker.executor.event` is triggered.
// - When a new earliest timer is registered, `Reactor::get().event` is triggered.
//
// This way we make sure that if any changes happen that might give us new work will
// unblock epoll/kevent/WSAPoll and let us continue the loop.
loop {
// 1. Poll the main future.
if let Poll::Ready(val) = use_throttle(|| future.as_mut().poll(cx)) {
return val;
}
// 2. Run a batch of tasks in the thread-local executor.
let more_local = local.execute();
// 3. Run a batch of tasks in the work-stealing executor.
let more_worker = worker.execute();
// 4. Poll the reactor.
Reactor::get().poll().expect("failure while polling I/O");
if !local.event.clear() && !more_local && !more_worker {
let lock = Reactor::get().lock();
let ready = local.event.ready();
futures::pin_mut!(lock);
futures::pin_mut!(ready);
// If there is more work in the thread-local or the work-stealing executor,
// continue the loop.
if more_local || more_worker {
continue;
}
// Block until either the reactor is locked or a local event occurs.
if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
if !local.event.clear() && !worker.event.clear() {
reactor.wait().expect("failure while polling I/O");
}
// Prepare for blocking until the reactor is locked or `local.event` is triggered.
//
// Note that there is no need to wait for `worker.executor.event`. If the reactor
// is locked immediately, we'll check for the I/O event right after that anyway.
//
// If some other worker is holding the reactor locked, it will be unblocked as soon
// as the I/O event is triggered. Then, another worker will be allowed to lock the
// reactor, and will be unblocked if there is more work to do. Every worker
// triggers `worker.executor.event` every time it finds a runnable task.
let lock = Reactor::get().lock();
let ready = local.event.ready();
futures::pin_mut!(lock);
futures::pin_mut!(ready);
// Block until either the reactor is locked or `local.event` is triggered.
if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
// Clear the two I/O events.
let local_ev = local.event.clear();
let worker_ev = worker.executor.event.clear();
// If any of the two I/O events has been triggered, continue the loop.
if local_ev || worker_ev {
continue;
}
// Block until an I/O event occurs.
reactor.wait().expect("failure while waiting on I/O");
}
}
})
})
}
// Limits the number of I/O operations a task can perform in one go.
// Number of times the current task is allowed to poll I/O operations.
//
// When this budget is used up, I/O operations will wake the current task and return
// `Poll::Pending`.
//
// This thread-local is set before running any task.
scoped_thread_local!(static BUDGET: Cell<u32>);
/// Runs a task and returns `true` if it was throttled.
fn use_throttle<T>(run: impl FnOnce() -> T) -> T {
BUDGET.set(&Cell::new(200), run)
/// Sets an I/O budget for polling a future.
///
/// Once this budget is exceeded, polled I/O operations will always wake the current task and
/// return `Poll::Pending`.
///
/// We throttle I/O this way in order to prevent futures from running for
/// too long and thus starving other futures.
fn use_throttle<T>(poll: impl FnOnce() -> T) -> T {
// This is a fairly arbitrary number that seems to work well in practice.
BUDGET.set(&Cell::new(200), poll)
}
/// Returns `Poll::Pending` if the I/O budget has been used up.
fn poll_throttle(cx: &mut Context<'_>) -> Poll<()> {
// Decrement the budget and check if it was zero.
if BUDGET.is_set() && BUDGET.with(|b| b.replace(b.get().saturating_sub(1))) == 0 {
// Make sure to wake the current task. The task is not *really* pending, we're just
// artificially throttling it to let other tasks be run.
cx.waker().wake_by_ref();
return Poll::Pending;
}
@ -387,77 +516,99 @@ fn poll_throttle(cx: &mut Context<'_>) -> Poll<()> {
// ---------- Thread-local executor ----------
// An executor for thread-local tasks.
// The thread-local executor.
//
// Thread-local tasks are spawned by calling `Task::local()` and do not have to implement `Send`.
// They can only be run by the same thread that created them.
// This thread-local is only set while inside `run()`.
scoped_thread_local!(static THREAD_LOCAL_EXECUTOR: ThreadLocalExecutor);
/// A queue of thread-local tasks.
/// An executor for thread-local tasks.
///
/// Thread-local tasks are spawned by calling `Task::local()` and their futures do not have to
/// implement `Send`. They can only be run by the same thread that created them.
struct ThreadLocalExecutor {
/// The main task queue.
queue: RefCell<VecDeque<Runnable>>,
/// When another thread wakes a task belonging to this executor, it goes into this queue.
injector: Arc<SegQueue<Runnable>>,
/// An I/O event that is triggered when another thread wakes a task belonging to this executor.
event: IoEvent,
}
impl ThreadLocalExecutor {
/// Creates a new thread-local executor.
fn new() -> ThreadLocalExecutor {
ThreadLocalExecutor {
queue: RefCell::new(VecDeque::new()),
injector: Arc::new(SegQueue::new()),
event: IoEvent::create().expect("cannot create a self-pipe"), // TODO: Result
event: IoEvent::create().expect("cannot create an `IoEvent`"),
}
}
/// Spawns a task onto this executor.
fn spawn<T: 'static>(&self, future: impl Future<Output = T> + 'static) -> Task<T> {
let id = thread_id();
let injector = self.injector.clone();
let event = self.event.clone();
// The function that schedules a runnable task.
let schedule = move |runnable| {
if thread_id() == id {
// If scheduling from the original thread, push into the main queue.
THREAD_LOCAL_EXECUTOR.with(|ex| ex.queue.borrow_mut().push_back(runnable));
} else {
// If scheduling from a remote thread, push into the injector queue.
// If scheduling from a different thread, push into the injector queue.
injector.push(runnable);
// The original thread may be currently polling so let's interrupt it.
// Trigger an I/O event to let the original thread know that a task has been
// scheduled. If that thread is inside epoll/kqueue/WSAPoll, an I/O event will wake
// it up.
event.set();
}
};
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Executes a batch of tasks and returns `true` if there is more work to do.
/// Executes a batch of tasks and returns `true` if there are more tasks to run.
fn execute(&self) -> bool {
for _ in 0..200 {
match self.pop() {
None => return false,
Some(r) => {
use_throttle(|| r.run());
for _ in 0..4 {
for _ in 0..50 {
// Find the next task to run.
match self.pop() {
None => return false,
Some(r) => {
use_throttle(|| r.run());
}
}
}
// Poll the reactor and drain the injector queue every now and then.
self.fetch();
}
self.fetch();
true
}
/// Pops the next runnable to run.
/// Pops the next task to run.
fn pop(&self) -> Option<Runnable> {
// Check if there is a task in the main queue.
if let Some(r) = self.queue.borrow_mut().pop_front() {
return Some(r);
}
// Otherwise, fetch more tasks from the reactor or the injector queue.
self.fetch();
// Check the main queue again.
self.queue.borrow_mut().pop_front()
}
/// TODO Moves all tasks from the remote queue into the main queue.
/// Polls the reactor and moves all tasks from the injector queue into the main queue.
fn fetch(&self) {
// The reactor might wake tasks belonging to this executor.
Reactor::get().poll().expect("failure while polling I/O");
// Move tasks from the injector queue into the main queue.
let mut queue = self.queue.borrow_mut();
while let Ok(r) = self.injector.pop() {
queue.push_back(r);
@ -465,88 +616,113 @@ impl ThreadLocalExecutor {
}
}
/// Same as `std::thread::current().id()`, but more efficient.
fn thread_id() -> ThreadId {
thread_local! {
static ID: ThreadId = thread::current().id();
}
ID.with(|id| *id)
ID.try_with(|id| *id)
.unwrap_or_else(|_| thread::current().id())
}
// ---------- Global work-stealing executor ----------
// ---------- Work-stealing executor ----------
/// The work-stealing executor.
// The current thread's worker.
//
// Other threads may steal tasks from this worker through its associated stealer that was
// registered in the work-stealing executor.
//
// This thread-local is only set while inside `run()`.
scoped_thread_local!(static WORKER: Worker);
/// The global work-stealing executor.
///
/// Tasks created by `Task::spawn()` go into this executor and can be executed by any thread that
/// calls `run()`. For that reason, such tasks must implement `Send`. Even if a task is polled once
/// on a certain executor thread, it may afterwards get stolen by a different thread and continue
/// execution there.
/// Tasks created by `Task::spawn()` go into this executor. Any calling `run()` initializes a
/// `Worker` that participates in work stealing, which is allowed to run any task in this executor
/// or in other workers.
///
/// Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
/// implement `Send`.
///
/// Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
/// invocations of `run()` used the same global task queue all the time, they would constantly
/// "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
/// retrying queue operations in compare-and-swap loops.
///
/// The solution is to have a separate queue in each invocation of `run()`, called a "worker".
/// Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
/// either grab a batch of tasks from the main global queue, or steal tasks from other workers.
/// Of course, work-stealing also causes some contention, but much less so.
///
/// More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
struct WorkStealingExecutor {
/// When a thread that is not inside `run()` spawns or wakes a task, it goes into this queue.
injector: deque::Injector<Runnable>,
/// Registered handles for stealing tasks from workers.
stealers: ShardedLock<Slab<deque::Stealer<Runnable>>>,
/// An I/O event that is triggered whenever there might be available tasks to run.
event: IoEvent,
}
impl WorkStealingExecutor {
/// Returns a reference to the global work-stealing executor.
fn get() -> &'static WorkStealingExecutor {
static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| {
WorkStealingExecutor {
injector: deque::Injector::new(),
stealers: ShardedLock::new(Slab::new()),
event: IoEvent::create().unwrap(), // TODO: Result
}
static EXECUTOR: Lazy<WorkStealingExecutor> = Lazy::new(|| WorkStealingExecutor {
injector: deque::Injector::new(),
stealers: ShardedLock::new(Slab::new()),
event: IoEvent::create().expect("cannot create an `IoEvent`"),
});
&EXECUTOR
}
/// Spawns a task onto this executor.
fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// The function that schedules a runnable task.
let schedule = move |runnable| {
if WORKER.is_set() {
// If scheduling from a worker thread, push into the worker's queue.
WORKER.with(|w| w.push(runnable));
} else {
// If scheduling from a non-worker thread, push into the injector queue.
self.injector.push(runnable);
// A task has been pushed into the global queue - we need to interrupt.
// Trigger an I/O event to let workers know that a task has been scheduled.
self.event.set();
}
};
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, schedule, ());
runnable.schedule();
Task(Some(handle))
}
/// Registers a new worker.
///
/// The worker will automatically deregister itself when dropped.
fn worker(&'static self) -> Worker {
let mut stealers = self.stealers.write().unwrap();
let vacant = stealers.vacant_entry();
// Create a worker and put its stealer handle into the executor.
let worker = Worker {
id: vacant.key(),
slot: Cell::new(None),
worker: deque::Worker::new_fifo(),
executor: self,
event: self.event.clone(),
};
vacant.insert(worker.worker.stealer());
worker
}
}
// A worker that executes global tasks.
//
// Tasks created by `Task::spawn()` go into the work-stealing executor. It has a single global
// queue called "injector". However, if all invocations of `run()` pop tasks from the injector
// queue at the same time, that will cause contention that slows everything down.
//
// To reduce contention, each `run()` registers a "worker", which is its own queue containing a
// handful of tasks taken from the injector queue. Then, each worker pops tasks from its own queue.
// If a worker ends up with an empty queue, it will try to steal a batch of tasks from the injector
// queue or from other workers.
//
// This thread-local variable is a handle to the current thread's worker. Other threads may steal
// tasks from it through its associated stealer that was registered in the work-stealing executor.
scoped_thread_local!(static WORKER: Worker);
// TODO
/// A queue of some stealable global tasks.
///
@ -556,23 +732,30 @@ struct Worker {
slot: Cell<Option<Runnable>>,
worker: deque::Worker<Runnable>,
executor: &'static WorkStealingExecutor,
event: IoEvent,
}
impl Worker {
/// Executes a batch of tasks and returns `true` if there is more work to do.
/// Executes a batch of tasks and returns `true` if there are more tasks to run.
fn execute(&self) -> bool {
for _ in 0..4 {
for _ in 0..50 {
// Find the next task to run.
match self.pop() {
None => return false,
Some(r) => {
// Notify other workers that there may be more tasks.
self.executor.event.set();
// Run the task.
if use_throttle(|| r.run()) {
// If the budget was used up TODO
self.push(None);
}
}
}
}
// TODO Poll the reactor and drain the injector queue every now and then.
self.push(None);
self.fetch();
}
@ -585,7 +768,7 @@ impl Worker {
Some(runnable) => {
self.worker.push(runnable);
// A task has been pushed into the local queue - we need to interrupt.
self.event.set();
// self.executor.event.set();
}
}
}
@ -604,7 +787,7 @@ impl Worker {
if let Some(r) = ws_retry(|| self.executor.injector.steal_batch_and_pop(&self.worker)) {
self.push(r); // TODO: optimize interrupts
// A task has been pushed into the local queue - we need to interrupt.
self.event.set();
// self.executor.event.set();
}
// Poll the reactor.
@ -632,7 +815,7 @@ impl Worker {
}) {
self.slot.set(Some(r));
// A task may have been pushed into the local queue - we need to interrupt.
self.event.set();
// self.event.set();
}
}
}
@ -706,6 +889,7 @@ struct Reactor {
sources: piper::Lock<Slab<Arc<Source>>>,
events: piper::Mutex<sys::Events>,
timers: piper::Lock<BTreeMap<(Instant, u64), Waker>>,
event: Lazy<IoEvent>,
}
impl Reactor {
@ -716,17 +900,12 @@ impl Reactor {
sources: piper::Lock::new(Slab::new()),
events: piper::Mutex::new(sys::Events::new()),
timers: piper::Lock::new(BTreeMap::new()),
event: Lazy::new(|| IoEvent::create().expect("cannot create an `IoEvent`")),
}
});
&REACTOR
}
fn event() -> &'static IoEvent {
static EVENT: Lazy<IoEvent> = Lazy::new(|| IoEvent::create().unwrap()); // TODO: result
Reactor::get(); // Make sure the reactor is initialized before the event.
&EVENT
}
/// Registers an I/O source in the reactor.
fn register(
&self,
@ -786,7 +965,7 @@ impl ReactorLock<'_> {
}
fn react(&mut self, block: bool) -> io::Result<()> {
Reactor::event().clear();
self.reactor.event.clear();
let next_timer = {
// Split timers into ready and pending timers.
@ -841,7 +1020,7 @@ impl ReactorLock<'_> {
// ---------- Timer ----------
/// Fires at a certain point in time.
/// Fires at the chosen point in time.
#[derive(Debug)]
pub struct Timer {
id: Option<u64>,
@ -867,7 +1046,7 @@ impl Timer {
// If this timer is going to be the earliest one, interrupt the reactor.
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
Reactor::event().set();
Reactor::get().event.set();
}
}
@ -996,6 +1175,7 @@ impl<T> Async<T> {
Ok(io)
}
/// TODO
pub async fn with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_ref().unwrap();
@ -1003,6 +1183,7 @@ impl<T> Async<T> {
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut io), source)).await
}
/// TODO
pub async fn with_mut<R>(&mut self, op: impl FnMut(&mut T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut io = self.io.as_mut().unwrap();
@ -1307,7 +1488,7 @@ impl Async<UnixDatagram> {
}
}
// ---------- The self-pipe trick ----------
// ---------- Self-pipe ----------
/// A boolean flag that is set whenever a thread-local task is woken by another thread.
///
@ -1468,11 +1649,12 @@ impl BlockingExecutor {
&EXECUTOR
}
/// Spawns a blocking task onto the thread pool.
/// Spawns a task onto this executor.
fn spawn<T: Send + 'static>(
&'static self,
future: impl Future<Output = T> + Send + 'static,
) -> Task<T> {
// Create a task, schedule it, and return its `Task` handle.
let (runnable, handle) = async_task::spawn(future, move |r| self.schedule(r), ());
runnable.schedule();
Task(Some(handle))