This commit is contained in:
Stjepan Glavina 2020-02-10 03:11:04 +01:00
parent 1c404a210c
commit b370f4f117
1 changed files with 62 additions and 76 deletions

View File

@ -11,13 +11,6 @@ use std::future::Future;
use std::io::{self, Read, Write};
use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
#[cfg(unix)]
use std::os::unix::{
io::AsRawFd,
net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream},
};
#[cfg(windows)]
use std::os::windows::io::AsRawSocket;
use std::panic::catch_unwind;
use std::path::Path;
use std::pin::Pin;
@ -93,7 +86,7 @@ impl Reactor {
fn poll(&self) -> io::Result<()> {
let interrupted = self.reset();
let next_timer = self.registry.poll_timers();
let next_timer = self.registry.fire_timers();
let timeout = if interrupted {
Some(Duration::from_secs(0))
@ -105,7 +98,7 @@ impl Reactor {
}
fn poll_quick(&self) -> io::Result<()> {
self.registry.poll_timers();
self.registry.fire_timers();
self.registry.wait_io(Some(Duration::from_secs(0)))?;
Ok(())
}
@ -236,7 +229,7 @@ impl Registry {
Ok(())
}
fn poll_timers(&self) -> Option<Instant> {
fn fire_timers(&self) -> Option<Instant> {
let now = Instant::now();
let (ready, next_timer) = {
let mut timers = self.timers.lock();
@ -457,53 +450,50 @@ fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);
let timeout = Duration::from_secs(1);
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let mut runnable = match THREAD_POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
thread::spawn(move || {
loop {
let mut runnable = match THREAD_POOL.receiver.recv_timeout(timeout) {
Ok(runnable) => runnable,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
// Stop the thread.
return;
}
};
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match THREAD_POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
// Stop the thread.
return;
}
};
SLEEPING.fetch_add(1, Ordering::SeqCst);
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
})
.expect("cannot start a blocking thread");
loop {
let _ = catch_unwind(|| runnable.run());
// Try taking another runnable if there are any available.
runnable = match THREAD_POOL.receiver.try_recv() {
Ok(runnable) => runnable,
Err(_) => break,
};
}
// If there is at least one sleeping thread, stop this thread instead of putting it
// to sleep.
if SLEEPING.load(Ordering::SeqCst) > 0 {
return;
}
SLEEPING.fetch_add(1, Ordering::SeqCst);
}
});
}
/// Blocks on a single future.
@ -672,7 +662,7 @@ pub struct Async<T> {
}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
impl<T: std::os::unix::io::AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
@ -683,7 +673,7 @@ impl<T: AsRawFd> Async<T> {
}
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
impl<T: std::os::windows::io::AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn nonblocking(source: T) -> io::Result<Async<T>> {
Ok(Async {
@ -984,7 +974,10 @@ impl Async<UdpSocket> {
}
}
#[cfg(any(unix))]
#[cfg(unix)]
use std::os::unix::net::{SocketAddr as UnixSocketAddr, UnixDatagram, UnixListener, UnixStream};
#[cfg(unix)]
impl Async<UnixListener> {
/// Creates a listener bound to the specified path.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixListener>> {
@ -1011,7 +1004,7 @@ impl Async<UnixListener> {
}
}
#[cfg(any(unix))]
#[cfg(unix)]
impl Async<UnixStream> {
/// Connects to the specified path.
pub fn connect<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixStream>> {
@ -1029,7 +1022,7 @@ impl Async<UnixStream> {
}
}
#[cfg(any(unix))]
#[cfg(unix)]
impl Async<UnixDatagram> {
/// Creates a socket bound to the specified path.
pub fn bind<P: AsRef<Path>>(path: P) -> io::Result<Async<UnixDatagram>> {
@ -1098,9 +1091,9 @@ mod sys {
pub struct Poller(RawFd);
impl Poller {
pub fn create() -> io::Result<Poller> {
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC)
.map(Poller)
.map_err(io_err)
Ok(Poller(
epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).map_err(io_err)?,
))
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
let ev = &mut EpollEvent::new(flags(), index as u64);
@ -1120,6 +1113,12 @@ mod sys {
Ok(events.len)
}
}
fn flags() -> EpollFlags {
EpollFlags::EPOLLET | EpollFlags::EPOLLIN | EpollFlags::EPOLLOUT | EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
pub struct Events {
list: Box<[EpollEvent]>,
@ -1140,21 +1139,11 @@ mod sys {
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> EpollFlags {
EpollFlags::EPOLLET | EpollFlags::EPOLLIN | EpollFlags::EPOLLOUT | EpollFlags::EPOLLRDHUP
}
fn io_err(err: impl std::error::Error + Send + Sync + 'static) -> io::Error {
io::Error::new(io::ErrorKind::Other, Box::new(err))
}
}
// ----- Windows (WSAPoll) -----
@ -1184,7 +1173,7 @@ mod sys {
pub struct Poller(wepoll::Epoll);
impl Poller {
pub fn create() -> io::Result<Poller> {
wepoll::Epoll::new().map(Poller)
Ok(Poller(wepoll::Epoll::new()?))
}
pub fn register(&self, source: RawSource, index: usize) -> io::Result<()> {
self.0.register(source, flags(), index as u64)
@ -1200,6 +1189,10 @@ mod sys {
self.0.poll(events, timeout)
}
}
fn flags() -> wepoll::EventFlag {
use wepoll::EventFlag::*;
ONESHOT | IN | OUT | RDHUP
}
pub struct Events(wepoll::Events);
impl Events {
@ -1214,16 +1207,9 @@ mod sys {
})
}
}
pub struct Event {
pub is_read: bool,
pub is_write: bool,
pub index: usize,
}
#[inline]
fn flags() -> wepoll::EventFlag {
use wepoll::EventFlag::*;
ONESHOT | IN | OUT | RDHUP
}
}