Add TryFrom for Async

This commit is contained in:
Stjepan Glavina 2020-09-14 16:07:06 +02:00
parent 82c7e59107
commit 061a96ef8d
2 changed files with 29 additions and 5 deletions

View File

@ -708,6 +708,24 @@ impl<T> Drop for Async<T> {
}
}
#[cfg(windows)]
impl<T: AsRawFd> TryFrom<T> for Async<T> {
type Error = io::Error;
fn try_from(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
#[cfg(windows)]
impl<T: AsRawSocket> TryFrom<T> for Async<T> {
type Error = io::Error;
fn try_from(io: T) -> io::Result<Async<T>> {
Async::new(io)
}
}
impl<T: Read> AsyncRead for Async<T> {
fn poll_read(
mut self: Pin<&mut Self>,

View File

@ -30,7 +30,9 @@ pub(crate) struct Reactor {
/// Unparks the "async-io" thread.
thread_unparker: parking::Unparker,
/// Bindings to epoll/kqueue/event ports/wepoll.
/// Portable bindings to epoll/kqueue/event ports/wepoll.
///
/// This is where I/O is polled, producing I/O events.
poller: Poller,
/// Ticker bumped before polling.
@ -45,6 +47,8 @@ pub(crate) struct Reactor {
sources: Mutex<Arena<Arc<Source>>>,
/// Temporary storage for I/O events when polling the reactor.
///
/// Holding a lock on this event list implies the exclusive right to poll I/O.
events: Mutex<Vec<Event>>,
/// An ordered map of registered timers.
@ -103,8 +107,8 @@ impl Reactor {
if last_tick == tick {
let reactor_lock = if sleeps >= 10 {
// If no new ticks have occurred for a while, stop sleeping and
// spinning in this loop and just block on the reactor lock.
// If no new ticks have occurred for a while, stop sleeping and spinning in
// this loop and just block on the reactor lock.
Some(self.lock())
} else {
self.try_lock()
@ -129,7 +133,8 @@ impl Reactor {
log::trace!("main_loop: sleeping for {} us", delay_us);
if parker.park_timeout(Duration::from_micros(*delay_us)) {
log::trace!("main_loop: notified");
// If woken before timeout, reset the last tick and sleep counter.
// If notified before timeout, reset the last tick and the sleep counter.
last_tick = self.ticker.load(Ordering::SeqCst);
sleeps = 0;
} else {
@ -236,6 +241,7 @@ impl Reactor {
// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
log::trace!("block_on: stops hogging the reactor");
// This thread is clearly processing I/O events for some other threads
// because it didn't get a notification yet. It's best to stop hogging the
// reactor and give other threads a chance to process I/O events for
@ -252,8 +258,8 @@ impl Reactor {
}
}
} else {
log::trace!("block_on: sleep until notification");
// Wait for an actual notification.
log::trace!("block_on: sleep until notification");
p.park();
}
}