Refactor
This commit is contained in:
parent
86b340b50f
commit
13084b77ee
|
@ -0,0 +1,229 @@
|
|||
use std::cell::Cell;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures_lite::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
use crate::reactor::Reactor;
|
||||
|
||||
/// Number of currently active `block_on()` invocations.
|
||||
static BLOCK_ON_COUNT: AtomicUsize = AtomicUsize::new(0);
|
||||
|
||||
/// Unparker for the "async-io" thread.
|
||||
static UNPARKER: Lazy<parking::Unparker> = Lazy::new(|| {
|
||||
let (parker, unparker) = parking::pair();
|
||||
|
||||
// Spawn a helper thread driving the reactor.
|
||||
//
|
||||
// Note that this thread is not exactly necessary, it's only here to help push things
|
||||
// forward if there are no `Parker`s around or if `Parker`s are just idling and never
|
||||
// parking.
|
||||
thread::Builder::new()
|
||||
.name("async-io".to_string())
|
||||
.spawn(move || main_loop(parker))
|
||||
.expect("cannot spawn async-io thread");
|
||||
|
||||
unparker
|
||||
});
|
||||
|
||||
/// Initializes the "async-io" thread.
|
||||
pub(crate) fn init() {
|
||||
Lazy::force(&UNPARKER);
|
||||
}
|
||||
|
||||
/// The main loop for the "async-io" thread.
|
||||
fn main_loop(parker: parking::Parker) {
|
||||
// The last observed reactor tick.
|
||||
let mut last_tick = 0;
|
||||
// Number of sleeps since this thread has called `react()`.
|
||||
let mut sleeps = 0u64;
|
||||
|
||||
loop {
|
||||
let tick = Reactor::get().ticker();
|
||||
|
||||
if last_tick == tick {
|
||||
let reactor_lock = if sleeps >= 10 {
|
||||
// If no new ticks have occurred for a while, stop sleeping and spinning in
|
||||
// this loop and just block on the reactor lock.
|
||||
Some(Reactor::get().lock())
|
||||
} else {
|
||||
Reactor::get().try_lock()
|
||||
};
|
||||
|
||||
if let Some(mut reactor_lock) = reactor_lock {
|
||||
log::trace!("main_loop: waiting on I/O");
|
||||
reactor_lock.react(None).ok();
|
||||
last_tick = Reactor::get().ticker();
|
||||
sleeps = 0;
|
||||
}
|
||||
} else {
|
||||
last_tick = tick;
|
||||
}
|
||||
|
||||
if BLOCK_ON_COUNT.load(Ordering::SeqCst) > 0 {
|
||||
// Exponential backoff from 50us to 10ms.
|
||||
let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
|
||||
.get(sleeps as usize)
|
||||
.unwrap_or(&10_000);
|
||||
|
||||
log::trace!("main_loop: sleeping for {} us", delay_us);
|
||||
if parker.park_timeout(Duration::from_micros(*delay_us)) {
|
||||
log::trace!("main_loop: notified");
|
||||
|
||||
// If notified before timeout, reset the last tick and the sleep counter.
|
||||
last_tick = Reactor::get().ticker();
|
||||
sleeps = 0;
|
||||
} else {
|
||||
sleeps += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current thread on a future, processing I/O events when idle.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_io::Timer;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// async_io::block_on(async {
|
||||
/// // This timer will likely be processed by the current
|
||||
/// // thread rather than the fallback "async-io" thread.
|
||||
/// Timer::after(Duration::from_millis(1)).await;
|
||||
/// });
|
||||
/// ```
|
||||
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
|
||||
log::trace!("block_on()");
|
||||
|
||||
// Increment `BLOCK_ON_COUNT` so that the "async-io" thread becomes less aggressive.
|
||||
BLOCK_ON_COUNT.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// Make sure to decrement `BLOCK_ON_COUNT` at the end and wake the "async-io" thread.
|
||||
let _guard = CallOnDrop(|| {
|
||||
BLOCK_ON_COUNT.fetch_sub(1, Ordering::SeqCst);
|
||||
UNPARKER.unpark();
|
||||
});
|
||||
|
||||
// Parker and unparker for notifying the current thread.
|
||||
let (p, u) = parking::pair();
|
||||
// This boolean is set to `true` when the current thread is blocked on I/O.
|
||||
let io_blocked = Arc::new(AtomicBool::new(false));
|
||||
|
||||
thread_local! {
|
||||
// Indicates that the current thread is polling I/O, but not necessarily blocked on it.
|
||||
static IO_POLLING: Cell<bool> = Cell::new(false);
|
||||
}
|
||||
|
||||
// Prepare the waker.
|
||||
let waker = waker_fn({
|
||||
let io_blocked = io_blocked.clone();
|
||||
move || {
|
||||
if u.unpark() {
|
||||
// Check if waking from another thread and if currently blocked on I/O.
|
||||
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
|
||||
Reactor::get().notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let cx = &mut Context::from_waker(&waker);
|
||||
pin!(future);
|
||||
|
||||
loop {
|
||||
// Poll the future.
|
||||
if let Poll::Ready(t) = future.as_mut().poll(cx) {
|
||||
log::trace!("block_on: completed");
|
||||
return t;
|
||||
}
|
||||
|
||||
// Check if a notification was received.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
|
||||
// Try grabbing a lock on the reactor to process I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
// First let wakers know this parker is processing I/O events.
|
||||
IO_POLLING.with(|io| io.set(true));
|
||||
let _guard = CallOnDrop(|| {
|
||||
IO_POLLING.with(|io| io.set(false));
|
||||
});
|
||||
|
||||
// Process available I/O events.
|
||||
reactor_lock.react(Some(Duration::from_secs(0))).ok();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Try grabbing a lock on the reactor to wait on I/O.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
// Record the instant at which the lock was grabbed.
|
||||
let start = Instant::now();
|
||||
|
||||
loop {
|
||||
// First let wakers know this parker is blocked on I/O.
|
||||
IO_POLLING.with(|io| io.set(true));
|
||||
io_blocked.store(true, Ordering::SeqCst);
|
||||
let _guard = CallOnDrop(|| {
|
||||
IO_POLLING.with(|io| io.set(false));
|
||||
io_blocked.store(false, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Check if a notification has been received before `io_blocked` was updated
|
||||
// because in that case the reactor won't receive a wakeup.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait for I/O events.
|
||||
log::trace!("block_on: waiting on I/O");
|
||||
reactor_lock.react(None).ok();
|
||||
|
||||
// Check if a notification has been received.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if this thread been handling I/O events for a long time.
|
||||
if start.elapsed() > Duration::from_micros(500) {
|
||||
log::trace!("block_on: stops hogging the reactor");
|
||||
|
||||
// This thread is clearly processing I/O events for some other threads
|
||||
// because it didn't get a notification yet. It's best to stop hogging the
|
||||
// reactor and give other threads a chance to process I/O events for
|
||||
// themselves.
|
||||
drop(reactor_lock);
|
||||
|
||||
// Unpark the "async-io" thread in case no other thread is ready to start
|
||||
// processing I/O events. This way we prevent a potential latency spike.
|
||||
UNPARKER.unpark();
|
||||
|
||||
// Wait for a notification.
|
||||
p.park();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Wait for an actual notification.
|
||||
log::trace!("block_on: sleep until notification");
|
||||
p.park();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a closure when dropped.
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
impl<F: Fn()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
19
src/lib.rs
19
src/lib.rs
|
@ -81,25 +81,10 @@ use futures_lite::{future, pin, ready};
|
|||
|
||||
use crate::reactor::{Reactor, Source};
|
||||
|
||||
mod driver;
|
||||
mod reactor;
|
||||
|
||||
/// Blocks the current thread on a future, processing I/O events when idle.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use async_io::Timer;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// async_io::block_on(async {
|
||||
/// // This timer will likely be processed by the current
|
||||
/// // thread rather than the fallback "async-io" thread.
|
||||
/// Timer::after(Duration::from_millis(1)).await;
|
||||
/// });
|
||||
/// ```
|
||||
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
|
||||
Reactor::get().block_on(future)
|
||||
}
|
||||
pub use driver::block_on;
|
||||
|
||||
/// A future that expires at a point in time.
|
||||
///
|
||||
|
|
267
src/reactor.rs
267
src/reactor.rs
|
@ -1,4 +1,3 @@
|
|||
use std::cell::Cell;
|
||||
use std::collections::BTreeMap;
|
||||
use std::io;
|
||||
use std::mem;
|
||||
|
@ -7,10 +6,9 @@ use std::os::unix::io::RawFd;
|
|||
#[cfg(windows)]
|
||||
use std::os::windows::io::RawSocket;
|
||||
use std::panic;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, MutexGuard};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use std::thread;
|
||||
use std::task::{Poll, Waker};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use concurrent_queue::ConcurrentQueue;
|
||||
|
@ -18,7 +16,6 @@ use futures_lite::*;
|
|||
use once_cell::sync::Lazy;
|
||||
use polling::{Event, Poller};
|
||||
use vec_arena::Arena;
|
||||
use waker_fn::waker_fn;
|
||||
|
||||
const READ: usize = 0;
|
||||
const WRITE: usize = 1;
|
||||
|
@ -27,12 +24,6 @@ const WRITE: usize = 1;
|
|||
///
|
||||
/// There is only one global instance of this type, accessible by [`Reactor::get()`].
|
||||
pub(crate) struct Reactor {
|
||||
/// Number of active `block_on()`s.
|
||||
block_on_count: AtomicUsize,
|
||||
|
||||
/// Unparks the "async-io" thread.
|
||||
thread_unparker: parking::Unparker,
|
||||
|
||||
/// Portable bindings to epoll/kqueue/event ports/wepoll.
|
||||
///
|
||||
/// This is where I/O is polled, producing I/O events.
|
||||
|
@ -72,21 +63,8 @@ impl Reactor {
|
|||
/// Returns a reference to the reactor.
|
||||
pub(crate) fn get() -> &'static Reactor {
|
||||
static REACTOR: Lazy<Reactor> = Lazy::new(|| {
|
||||
let (parker, unparker) = parking::pair();
|
||||
|
||||
// Spawn a helper thread driving the reactor.
|
||||
//
|
||||
// Note that this thread is not exactly necessary, it's only here to help push things
|
||||
// forward if there are no `Parker`s around or if `Parker`s are just idling and never
|
||||
// parking.
|
||||
thread::Builder::new()
|
||||
.name("async-io".to_string())
|
||||
.spawn(move || Reactor::get().main_loop(parker))
|
||||
.expect("cannot spawn async-io thread");
|
||||
|
||||
crate::driver::init();
|
||||
Reactor {
|
||||
block_on_count: AtomicUsize::new(0),
|
||||
thread_unparker: unparker,
|
||||
poller: Poller::new().expect("cannot initialize I/O event notification"),
|
||||
ticker: AtomicUsize::new(0),
|
||||
sources: Mutex::new(Arena::new()),
|
||||
|
@ -98,174 +76,9 @@ impl Reactor {
|
|||
&REACTOR
|
||||
}
|
||||
|
||||
/// The main loop for the "async-io" thread.
|
||||
fn main_loop(&self, parker: parking::Parker) {
|
||||
// The last observed reactor tick.
|
||||
let mut last_tick = 0;
|
||||
// Number of sleeps since this thread has called `react()`.
|
||||
let mut sleeps = 0u64;
|
||||
|
||||
loop {
|
||||
let tick = self.ticker.load(Ordering::SeqCst);
|
||||
|
||||
if last_tick == tick {
|
||||
let reactor_lock = if sleeps >= 10 {
|
||||
// If no new ticks have occurred for a while, stop sleeping and spinning in
|
||||
// this loop and just block on the reactor lock.
|
||||
Some(self.lock())
|
||||
} else {
|
||||
self.try_lock()
|
||||
};
|
||||
|
||||
if let Some(mut reactor_lock) = reactor_lock {
|
||||
log::trace!("main_loop: waiting on I/O");
|
||||
reactor_lock.react(None).ok();
|
||||
last_tick = self.ticker.load(Ordering::SeqCst);
|
||||
sleeps = 0;
|
||||
}
|
||||
} else {
|
||||
last_tick = tick;
|
||||
}
|
||||
|
||||
if self.block_on_count.load(Ordering::SeqCst) > 0 {
|
||||
// Exponential backoff from 50us to 10ms.
|
||||
let delay_us = [50, 75, 100, 250, 500, 750, 1000, 2500, 5000]
|
||||
.get(sleeps as usize)
|
||||
.unwrap_or(&10_000);
|
||||
|
||||
log::trace!("main_loop: sleeping for {} us", delay_us);
|
||||
if parker.park_timeout(Duration::from_micros(*delay_us)) {
|
||||
log::trace!("main_loop: notified");
|
||||
|
||||
// If notified before timeout, reset the last tick and the sleep counter.
|
||||
last_tick = self.ticker.load(Ordering::SeqCst);
|
||||
sleeps = 0;
|
||||
} else {
|
||||
sleeps += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Blocks the current thread on a future, processing I/O events when idle.
|
||||
pub(crate) fn block_on<T>(&self, future: impl Future<Output = T>) -> T {
|
||||
log::trace!("block_on()");
|
||||
|
||||
// Increment `block_on_count` so that the "async-io" thread becomes less aggressive.
|
||||
self.block_on_count.fetch_add(1, Ordering::SeqCst);
|
||||
|
||||
// Make sure to decrement `block_on_count` at the end and wake the "async-io" thread.
|
||||
let _guard = CallOnDrop(|| {
|
||||
Reactor::get().block_on_count.fetch_sub(1, Ordering::SeqCst);
|
||||
Reactor::get().thread_unparker.unpark();
|
||||
});
|
||||
|
||||
// Parker and unparker for notifying the current thread.
|
||||
let (p, u) = parking::pair();
|
||||
// This boolean is set to `true` when the current thread is blocked on I/O.
|
||||
let io_blocked = Arc::new(AtomicBool::new(false));
|
||||
|
||||
thread_local! {
|
||||
// Indicates that the current thread is polling I/O, but not necessarily blocked on it.
|
||||
static IO_POLLING: Cell<bool> = Cell::new(false);
|
||||
}
|
||||
|
||||
// Prepare the waker.
|
||||
let waker = waker_fn({
|
||||
let io_blocked = io_blocked.clone();
|
||||
move || {
|
||||
if u.unpark() {
|
||||
// Check if waking from another thread and if currently blocked on I/O.
|
||||
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
|
||||
Reactor::get().notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
let cx = &mut Context::from_waker(&waker);
|
||||
pin!(future);
|
||||
|
||||
loop {
|
||||
// Poll the future.
|
||||
if let Poll::Ready(t) = future.as_mut().poll(cx) {
|
||||
log::trace!("block_on: completed");
|
||||
return t;
|
||||
}
|
||||
|
||||
// Check if a notification was received.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
|
||||
// Try grabbing a lock on the reactor to process I/O events.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
// First let wakers know this parker is processing I/O events.
|
||||
IO_POLLING.with(|io| io.set(true));
|
||||
let _guard = CallOnDrop(|| {
|
||||
IO_POLLING.with(|io| io.set(false));
|
||||
});
|
||||
|
||||
// Process available I/O events.
|
||||
reactor_lock.react(Some(Duration::from_secs(0))).ok();
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Try grabbing a lock on the reactor to wait on I/O.
|
||||
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
|
||||
// Record the instant at which the lock was grabbed.
|
||||
let start = Instant::now();
|
||||
|
||||
loop {
|
||||
// First let wakers know this parker is blocked on I/O.
|
||||
IO_POLLING.with(|io| io.set(true));
|
||||
io_blocked.store(true, Ordering::SeqCst);
|
||||
let _guard = CallOnDrop(|| {
|
||||
IO_POLLING.with(|io| io.set(false));
|
||||
io_blocked.store(false, Ordering::SeqCst);
|
||||
});
|
||||
|
||||
// Check if a notification has been received before `io_blocked` was updated
|
||||
// because in that case the reactor won't receive a wakeup.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
break;
|
||||
}
|
||||
|
||||
// Wait for I/O events.
|
||||
log::trace!("block_on: waiting on I/O");
|
||||
reactor_lock.react(None).ok();
|
||||
|
||||
// Check if a notification has been received.
|
||||
if p.park_timeout(Duration::from_secs(0)) {
|
||||
log::trace!("block_on: notified");
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if this thread been handling I/O events for a long time.
|
||||
if start.elapsed() > Duration::from_micros(500) {
|
||||
log::trace!("block_on: stops hogging the reactor");
|
||||
|
||||
// This thread is clearly processing I/O events for some other threads
|
||||
// because it didn't get a notification yet. It's best to stop hogging the
|
||||
// reactor and give other threads a chance to process I/O events for
|
||||
// themselves.
|
||||
drop(reactor_lock);
|
||||
|
||||
// Unpark the "async-io" thread in case no other thread is ready to start
|
||||
// processing I/O events. This way we prevent a potential latency spike.
|
||||
self.thread_unparker.unpark();
|
||||
|
||||
// Wait for a notification.
|
||||
p.park();
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Wait for an actual notification.
|
||||
log::trace!("block_on: sleep until notification");
|
||||
p.park();
|
||||
}
|
||||
}
|
||||
/// Returns the current ticker.
|
||||
pub(crate) fn ticker(&self) -> usize {
|
||||
self.ticker.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Registers an I/O source in the reactor.
|
||||
|
@ -333,19 +146,19 @@ impl Reactor {
|
|||
}
|
||||
|
||||
/// Notifies the thread blocked on the reactor.
|
||||
fn notify(&self) {
|
||||
pub(crate) fn notify(&self) {
|
||||
self.poller.notify().expect("failed to notify reactor");
|
||||
}
|
||||
|
||||
/// Locks the reactor, potentially blocking if the lock is held by another thread.
|
||||
fn lock(&self) -> ReactorLock<'_> {
|
||||
pub(crate) fn lock(&self) -> ReactorLock<'_> {
|
||||
let reactor = self;
|
||||
let events = self.events.lock().unwrap();
|
||||
ReactorLock { reactor, events }
|
||||
}
|
||||
|
||||
/// Attempts to lock the reactor.
|
||||
fn try_lock(&self) -> Option<ReactorLock<'_>> {
|
||||
pub(crate) fn try_lock(&self) -> Option<ReactorLock<'_>> {
|
||||
self.events.try_lock().ok().map(|events| {
|
||||
let reactor = self;
|
||||
ReactorLock { reactor, events }
|
||||
|
@ -408,14 +221,14 @@ impl Reactor {
|
|||
}
|
||||
|
||||
/// A lock on the reactor.
|
||||
struct ReactorLock<'a> {
|
||||
pub(crate) struct ReactorLock<'a> {
|
||||
reactor: &'a Reactor,
|
||||
events: MutexGuard<'a, Vec<Event>>,
|
||||
}
|
||||
|
||||
impl ReactorLock<'_> {
|
||||
/// Processes new events, blocking until the first event or the timeout.
|
||||
fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
pub(crate) fn react(&mut self, timeout: Option<Duration>) -> io::Result<()> {
|
||||
let mut wakers = Vec::new();
|
||||
|
||||
// Process ready timers.
|
||||
|
@ -546,6 +359,7 @@ impl Direction {
|
|||
self.waker.is_none() && self.wakers.is_empty()
|
||||
}
|
||||
|
||||
/// Moves all wakers into a `Vec`.
|
||||
fn drain_into(&mut self, dst: &mut Vec<Waker>) {
|
||||
if let Some(w) = self.waker.take() {
|
||||
dst.push(w);
|
||||
|
@ -555,6 +369,30 @@ impl Direction {
|
|||
}
|
||||
|
||||
impl Source {
|
||||
/// Registers a waker from `AsyncRead`.
|
||||
///
|
||||
/// If a different waker is already registered, it gets replaced and woken.
|
||||
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
|
||||
self.register(READ, waker)
|
||||
}
|
||||
|
||||
/// Registers a waker from `AsyncWrite`.
|
||||
///
|
||||
/// If a different waker is already registered, it gets replaced and woken.
|
||||
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
|
||||
self.register(WRITE, waker)
|
||||
}
|
||||
|
||||
/// Waits until the I/O source is readable.
|
||||
pub(crate) async fn readable(&self) -> io::Result<()> {
|
||||
self.ready(READ).await
|
||||
}
|
||||
|
||||
/// Waits until the I/O source is writable.
|
||||
pub(crate) async fn writable(&self) -> io::Result<()> {
|
||||
self.ready(WRITE).await
|
||||
}
|
||||
|
||||
/// Registers a waker from `AsyncRead` or `AsyncWrite`.
|
||||
///
|
||||
/// If a different waker is already registered, it gets replaced and woken.
|
||||
|
@ -586,20 +424,6 @@ impl Source {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
/// Registers a waker from `AsyncRead`.
|
||||
///
|
||||
/// If a different waker is already registered, it gets replaced and woken.
|
||||
pub(crate) fn register_reader(&self, waker: &Waker) -> io::Result<()> {
|
||||
self.register(READ, waker)
|
||||
}
|
||||
|
||||
/// Registers a waker from `AsyncWrite`.
|
||||
///
|
||||
/// If a different waker is already registered, it gets replaced and woken.
|
||||
pub(crate) fn register_writer(&self, waker: &Waker) -> io::Result<()> {
|
||||
self.register(WRITE, waker)
|
||||
}
|
||||
|
||||
/// Waits until the I/O source is readable or writable.
|
||||
async fn ready(&self, dir: usize) -> io::Result<()> {
|
||||
let mut ticks = None;
|
||||
|
@ -648,23 +472,4 @@ impl Source {
|
|||
})
|
||||
.await
|
||||
}
|
||||
|
||||
/// Waits until the I/O source is readable.
|
||||
pub(crate) async fn readable(&self) -> io::Result<()> {
|
||||
self.ready(READ).await
|
||||
}
|
||||
|
||||
/// Waits until the I/O source is writable.
|
||||
pub(crate) async fn writable(&self) -> io::Result<()> {
|
||||
self.ready(WRITE).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Runs a closure when dropped.
|
||||
struct CallOnDrop<F: Fn()>(F);
|
||||
|
||||
impl<F: Fn()> Drop for CallOnDrop<F> {
|
||||
fn drop(&mut self) {
|
||||
(self.0)();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue