This commit is contained in:
Stjepan Glavina 2020-04-09 17:12:14 +02:00
parent cf59f7b7d7
commit 8c03f01c0a
1 changed files with 124 additions and 159 deletions

View File

@ -24,7 +24,7 @@ use std::mem;
use std::net::{SocketAddr, TcpListener, TcpStream, ToSocketAddrs, UdpSocket};
use std::panic;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool, Ordering};
use std::sync::atomic::{self, AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use std::thread::{self, ThreadId};
@ -167,27 +167,24 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
loop {
flag.clear();
match future.as_mut().poll(cx) {
Poll::Ready(val) => return val,
Poll::Pending => {}
if let Poll::Ready(val) = future.as_mut().poll(cx) {
return val;
}
while !flag.get() {
let more_local = LOCAL_EXECUTOR.with(|ex| ex.execute());
let more_worker = WORKER.set(&worker, || worker.execute());
if more_local || more_worker {
REACTOR.poll_quick().expect("failure while polling I/O");
} else {
if !more_local && !more_worker {
let lock = REACTOR.lock();
let ready = flag.ready();
futures::pin_mut!(lock);
futures::pin_mut!(ready);
// Block until either the reactor is locked or the flag is set.
if let Either::Left((mut poller, _)) = block_on(future::select(lock, ready)) {
if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
if !flag.get() {
poller.poll().expect("failure while polling I/O");
reactor.wait().expect("failure while polling I/O");
}
}
}
@ -262,12 +259,11 @@ impl LocalExecutor {
/// Performs some work and returns `true` if there is more work to do.
fn execute(&self) -> bool {
for _ in 0..100 {
match self.pop() {
let runnable = match self.pop() {
None => return false,
Some(runnable) => {
use_throttle(|| runnable.run());
}
}
Some(r) => r,
};
use_throttle(|| runnable.run());
}
self.fetch();
true
@ -284,7 +280,7 @@ impl LocalExecutor {
/// TODO Moves all tasks from the remote queue into the main queue.
fn fetch(&self) {
REACTOR.poll_quick().expect("failure while polling I/O");
REACTOR.poll().expect("failure while polling I/O");
let mut queue = self.queue.borrow_mut();
while let Ok(r) = self.remote.pop() {
@ -331,8 +327,8 @@ impl GlobalExecutor {
Task(Some(handle))
}
/// Registers a new worker.
fn worker(&self) -> Worker {
// Register a new worker.
let id = thread::current().id();
match self.stealers.write().unwrap().entry(id) {
Entry::Occupied(_) => panic!("recursive `run()`"),
@ -365,20 +361,18 @@ struct Worker {
impl Worker {
/// Performs some work and returns `true` if there is more work to do.
fn execute(&self) -> bool {
let mut step = 0;
for _ in 0..100 {
let runnable = match self.pop() {
None => return false,
Some(r) => r,
};
step += 1;
if use_throttle(|| runnable.run()) || step >= 10 {
step = 0;
self.push(None);
for _ in 0..10 {
for _ in 0..10 {
let runnable = match self.pop().or_else(|| self.steal()) {
None => return false,
Some(r) => r,
};
if use_throttle(|| runnable.run()) {
break;
}
}
self.push(None);
}
self.fetch();
true
}
@ -399,21 +393,21 @@ impl Worker {
if let Some(r) = self.slot.take().or_else(|| self.worker.pop()) {
return Some(r);
}
self.fetch();
self.slot.take().or_else(|| self.worker.pop())
}
fn fetch(&self) {
// Try stealing from the global queue.
if let Some(r) = retry(|| GLOBAL_EXECUTOR.injector.steal_batch_and_pop(&self.worker)) {
// A task may have been pushed into the local queue - we need to interrupt.
if let Some(()) = retry(|| GLOBAL_EXECUTOR.injector.steal_batch(&self.worker)) {
// A task has been pushed into the local queue - we need to interrupt.
INTERRUPT.set();
return Some(r);
}
// Poll the reactor.
REACTOR.poll().expect("failure while polling I/O");
}
// Poll the reactor and check if any new tasks were scheduled.
REACTOR.poll_quick().expect("failure while polling I/O");
if let Some(r) = self.slot.take().or_else(|| self.worker.pop()) {
return Some(r);
}
// Try stealing from other workers.
fn steal(&self) -> Option<Runnable> {
let stealers = GLOBAL_EXECUTOR.stealers.read().unwrap();
if let Some(r) = retry(|| {
stealers
@ -427,14 +421,6 @@ impl Worker {
}
None
}
fn fetch(&self) {
if let Some(()) = retry(|| GLOBAL_EXECUTOR.injector.steal_batch(&self.worker)) {
// A task has been pushed into the local queue - we need to interrupt.
INTERRUPT.set();
}
REACTOR.poll_quick().expect("failure while polling I/O");
}
}
impl Drop for Worker {
@ -747,7 +733,7 @@ struct Reactor {
sys: sys::Reactor,
sources: piper::Lock<Slab<Arc<Source>>>,
events: piper::Mutex<sys::Events>,
timers: piper::Lock<BTreeMap<(Instant, usize), Waker>>,
timers: piper::Lock<BTreeMap<(Instant, u64), Waker>>,
}
impl Reactor {
@ -788,57 +774,47 @@ impl Reactor {
/// Processes ready events without blocking.
///
/// This call provides no guarantees and should only be used for the purpose of optimization.
fn poll_quick(&self) -> io::Result<()> {
fn poll(&self) -> io::Result<()> {
if let Some(events) = self.events.try_lock() {
let mut poller = Poller {
reactor: self,
events,
};
poller.poll_quick()?;
let reactor = self;
let mut guard = ReactorGuard { reactor, events };
// React to events without blocking.
guard.react(false)?;
}
Ok(())
}
/// Locks the reactor for polling.
async fn lock(&self) -> Poller<'_> {
Poller {
reactor: self,
events: self.events.lock().await,
}
async fn lock(&self) -> ReactorGuard<'_> {
let reactor = self;
let events = self.events.lock().await;
ReactorGuard { reactor, events }
}
}
/// Polls the reactor for I/O events and wakes up tasks.
struct Poller<'a> {
struct ReactorGuard<'a> {
reactor: &'a Reactor,
events: piper::MutexGuard<'a, sys::Events>,
}
impl Poller<'_> {
impl ReactorGuard<'_> {
/// Blocks until at least one event is processed.
fn poll(&mut self) -> io::Result<()> {
self.poll_internal(true)
fn wait(&mut self) -> io::Result<()> {
self.react(true)
}
/// Processes ready events without blocking.
fn poll_quick(&mut self) -> io::Result<()> {
self.poll_internal(false)
}
fn poll_internal(&mut self, block: bool) -> io::Result<()> {
fn react(&mut self, block: bool) -> io::Result<()> {
let next_timer = {
let now = Instant::now();
let mut timers = self.reactor.timers.lock();
// Split timers into ready and pending timers.
let pending = timers.split_off(&(now, 0));
let mut timers = self.reactor.timers.lock();
let pending = timers.split_off(&(Instant::now(), 0));
let ready = mem::replace(&mut *timers, pending);
// Wake up tasks waiting on timers.
for (_, waker) in ready {
waker.wake();
}
// Find when the next timer fires.
timers.keys().next().map(|(when, _)| *when)
};
@ -854,7 +830,7 @@ impl Poller<'_> {
// Block on I/O events.
loop {
match self.reactor.sys.poll(&mut self.events, timeout) {
match self.reactor.sys.wait(&mut self.events, timeout) {
Ok(0) => return Ok(()),
Ok(_) => break,
Err(err) if err.kind() == io::ErrorKind::Interrupted => {}
@ -865,7 +841,7 @@ impl Poller<'_> {
// Iterate over sources in the event list.
let sources = self.reactor.sources.lock();
for source in self.events.iter().filter_map(|i| sources.get(i)) {
// I/O events may unregister sources, so we need to re-register.
// I/O events may deregister sources, so we need to re-register.
self.reactor.sys.reregister(source.raw, source.index)?;
// Wake up tasks waiting on I/O.
@ -879,11 +855,13 @@ impl Poller<'_> {
// ----- Timer -----
static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
/// Fires at a certain point in time.
#[derive(Debug)]
pub struct Timer {
id: Option<u64>,
when: Instant,
inserted: bool,
}
impl Timer {
@ -894,27 +872,38 @@ impl Timer {
/// Fires at the specified instant in time.
pub fn at(when: Instant) -> Timer {
Timer {
when,
inserted: false,
let id = None;
Timer { id, when }
}
fn register(&mut self, cx: &mut Context<'_>) {
if self.id.is_none() {
let mut timers = REACTOR.timers.lock();
// If this timer is going to be the earliest one, interrupt the reactor.
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
INTERRUPT.set();
}
}
// Insert this timer into the timers map.
let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
timers.insert((self.when, id), cx.waker().clone());
self.id = Some(id);
}
}
/// Returns an unique identifier for this timer.
///
/// This method assumes the timer is pinned, even though it takes a mutable reference.
fn key(&mut self) -> (Instant, usize) {
let address = self as *mut Timer as usize;
(self.when, address)
fn deregister(&mut self) {
if let Some(id) = self.id {
REACTOR.timers.lock().remove(&(self.when, id));
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
// If this timer is in the timers map, remove it.
if self.inserted {
REACTOR.timers.lock().remove(&self.key());
}
self.deregister();
}
}
@ -924,35 +913,11 @@ impl Future for Timer {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Check if this timer has already fired.
if Instant::now() >= self.when {
if self.inserted {
REACTOR.timers.lock().remove(&self.key());
self.inserted = false;
}
self.deregister();
return Poll::Ready(self.when);
}
// Check if this timer has been inserted into the timers map.
if !self.inserted {
let mut timers = REACTOR.timers.lock();
let mut is_earliest = false;
if let Some((first, _)) = timers.keys().next() {
if self.when < *first {
is_earliest = true;
}
}
// Insert this timer into the timers map.
let waker = cx.waker().clone();
timers.insert(self.key(), waker);
self.inserted = true;
// If this timer is now the earliest one, interrupt the reactor.
if is_earliest {
drop(timers);
INTERRUPT.set();
}
}
self.register(cx);
Poll::Pending
}
}
@ -964,25 +929,25 @@ impl Future for Timer {
/// TODO: does not work with files!
#[derive(Debug)]
pub struct Async<T> {
inner: Option<Box<T>>,
io: Option<Box<T>>,
source: Arc<Source>,
}
#[cfg(unix)]
impl<T: AsRawFd> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(inner: T) -> io::Result<Async<T>> {
pub fn new(io: T) -> io::Result<Async<T>> {
use nix::fcntl::{fcntl, FcntlArg, OFlag};
// Put the I/O handle in non-blocking mode.
let flags = fcntl(inner.as_raw_fd(), FcntlArg::F_GETFL).map_err(io_err)?;
let flags = fcntl(io.as_raw_fd(), FcntlArg::F_GETFL).map_err(io_err)?;
let flags = OFlag::from_bits_truncate(flags) | OFlag::O_NONBLOCK;
fcntl(inner.as_raw_fd(), FcntlArg::F_SETFL(flags)).map_err(io_err)?;
fcntl(io.as_raw_fd(), FcntlArg::F_SETFL(flags)).map_err(io_err)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: REACTOR.register(inner.as_raw_fd())?,
inner: Some(Box::new(inner)),
source: REACTOR.register(io.as_raw_fd())?,
io: Some(Box::new(io)),
})
}
}
@ -1006,15 +971,15 @@ fn io_err(err: nix::Error) -> io::Error {
#[cfg(windows)]
impl<T: AsRawSocket> Async<T> {
/// Converts a non-blocking I/O handle into an async I/O handle.
pub fn new(inner: T) -> io::Result<Async<T>> {
pub fn new(io: T) -> io::Result<Async<T>> {
// Put the I/O handle in non-blocking mode.
let socket = unsafe { Socket::from_raw_socket(inner.as_raw_socket()) };
let socket = unsafe { Socket::from_raw_socket(io.as_raw_socket()) };
mem::ManuallyDrop::new(socket).set_nonblocking(true)?;
// Register the I/O handle in the reactor.
Ok(Async {
source: REACTOR.register(inner.as_raw_socket())?,
inner: Some(Box::new(inner)),
source: REACTOR.register(io.as_raw_socket())?,
io: Some(Box::new(io)),
})
}
}
@ -1029,33 +994,33 @@ impl<T: AsRawSocket> AsRawSocket for Async<T> {
impl<T> Async<T> {
/// Gets a reference to the inner I/O handle.
pub fn get_ref(&self) -> &T {
self.inner.as_ref().unwrap()
self.io.as_ref().unwrap()
}
/// Gets a mutable reference to the inner I/O handle.
pub fn get_mut(&mut self) -> &mut T {
self.inner.as_mut().unwrap()
self.io.as_mut().unwrap()
}
/// Extracts the inner non-blocking I/O handle.
pub fn into_inner(mut self) -> io::Result<T> {
let inner = *self.inner.take().unwrap();
let io = *self.io.take().unwrap();
REACTOR.deregister(&self.source)?;
Ok(inner)
Ok(io)
}
pub async fn with<R>(&self, op: impl FnMut(&T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut inner = self.inner.as_ref().unwrap();
let mut io = self.io.as_ref().unwrap();
let wakers = &self.source.wakers;
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut inner), wakers)).await
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut io), wakers)).await
}
pub async fn with_mut<R>(&mut self, op: impl FnMut(&mut T) -> io::Result<R>) -> io::Result<R> {
let mut op = op;
let mut inner = self.inner.as_mut().unwrap();
let mut io = self.io.as_mut().unwrap();
let wakers = &self.source.wakers;
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut inner), wakers)).await
future::poll_fn(|cx| Self::poll_io(cx, || op(&mut io), wakers)).await
}
fn poll_io<R>(
@ -1088,11 +1053,11 @@ impl<T> Async<T> {
impl<T> Drop for Async<T> {
fn drop(&mut self) {
if self.inner.is_some() {
if self.io.is_some() {
// Destructors should not panic.
let _ = REACTOR.deregister(&self.source);
// Drop and close the source.
self.inner.take();
// Drop the I/O handle to close it.
self.io.take();
}
}
}
@ -1108,7 +1073,7 @@ impl<T: Read> AsyncRead for Async<T> {
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.with_mut(|inner| inner.read(buf)))
poll_future(cx, self.with_mut(|io| io.read(buf)))
}
}
@ -1121,7 +1086,7 @@ where
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.with(|inner| (&*inner).read(buf)))
poll_future(cx, self.with(|io| (&*io).read(buf)))
}
}
@ -1131,11 +1096,11 @@ impl<T: Write> AsyncWrite for Async<T> {
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.with_mut(|inner| inner.write(buf)))
poll_future(cx, self.with_mut(|io| io.write(buf)))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
poll_future(cx, self.with_mut(|inner| inner.flush()))
poll_future(cx, self.with_mut(|io| io.flush()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -1152,11 +1117,11 @@ where
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
poll_future(cx, self.with(|inner| (&*inner).write(buf)))
poll_future(cx, self.with(|io| (&*io).write(buf)))
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
poll_future(cx, self.with(|inner| (&*inner).flush()))
poll_future(cx, self.with(|io| (&*io).flush()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
@ -1176,7 +1141,7 @@ impl Async<TcpListener> {
/// Accepts a new incoming connection.
pub async fn accept(&self) -> io::Result<(Async<TcpStream>, SocketAddr)> {
let (stream, addr) = self.with(|inner| inner.accept()).await?;
let (stream, addr) = self.with(|io| io.accept()).await?;
Ok((Async::new(stream)?, addr))
}
@ -1221,14 +1186,14 @@ impl Async<TcpStream> {
res => res.map(|_| ()),
};
// The stream becomes writable when connected.
stream.with(|inner| wait_connect(inner)).await?;
stream.with(|io| wait_connect(io)).await?;
Ok(stream)
}
/// Receives data from the stream without removing it from the buffer.
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|inner| inner.peek(buf)).await
self.with(|io| io.peek(buf)).await
}
}
@ -1245,32 +1210,32 @@ impl Async<UdpSocket> {
/// Sends data to the specified address.
pub async fn send_to<A: Into<SocketAddr>>(&self, buf: &[u8], addr: A) -> io::Result<usize> {
let addr = addr.into();
self.with(|inner| inner.send_to(buf, addr)).await
self.with(|io| io.send_to(buf, addr)).await
}
/// Sends data to the socket's peer.
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.with(|inner| inner.send(buf)).await
self.with(|io| io.send(buf)).await
}
/// Receives data from the socket.
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.with(|inner| inner.recv_from(buf)).await
self.with(|io| io.recv_from(buf)).await
}
/// Receives data from the socket's peer.
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|inner| inner.recv(buf)).await
self.with(|io| io.recv(buf)).await
}
/// Receives data without removing it from the buffer.
pub async fn peek_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
self.with(|inner| inner.peek_from(buf)).await
self.with(|io| io.peek_from(buf)).await
}
/// Receives data from the socket's peer without removing it from the buffer.
pub async fn peek(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|inner| inner.peek(buf)).await
self.with(|io| io.peek(buf)).await
}
}
@ -1284,7 +1249,7 @@ impl Async<UnixListener> {
/// Accepts a new incoming connection.
pub async fn accept(&self) -> io::Result<(Async<UnixStream>, UnixSocketAddr)> {
let (stream, addr) = self.with(|inner| inner.accept()).await?;
let (stream, addr) = self.with(|io| io.accept()).await?;
Ok((Async::new(stream)?, addr))
}
@ -1335,22 +1300,22 @@ impl Async<UnixDatagram> {
/// Sends data to the specified address.
pub async fn send_to<P: AsRef<Path>>(&self, buf: &[u8], path: P) -> io::Result<usize> {
self.with(|inner| inner.send_to(buf, &path)).await
self.with(|io| io.send_to(buf, &path)).await
}
/// Sends data to the socket's peer.
pub async fn send(&self, buf: &[u8]) -> io::Result<usize> {
self.with(|inner| inner.send(buf)).await
self.with(|io| io.send(buf)).await
}
/// Receives data from the socket.
pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, UnixSocketAddr)> {
self.with(|inner| inner.recv_from(buf)).await
self.with(|io| io.recv_from(buf)).await
}
/// Receives data from the socket's peer.
pub async fn recv(&self, buf: &mut [u8]) -> io::Result<usize> {
self.with(|inner| inner.recv(buf)).await
self.with(|io| io.recv(buf)).await
}
}
@ -1477,7 +1442,7 @@ mod sys {
pub fn deregister(&self, fd: RawFd) -> io::Result<()> {
epoll_ctl(self.0, EpollOp::EpollCtlDel, fd, None).map_err(io_err)
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms = timeout
.and_then(|t| t.as_millis().try_into().ok())
.unwrap_or(-1);
@ -1572,7 +1537,7 @@ mod sys {
}
Ok(())
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout_ms: Option<usize> = timeout.and_then(|t| t.as_millis().try_into().ok());
let timeout = timeout_ms.map(|ms| libc::timespec {
tv_sec: (ms / 1000) as libc::time_t,
@ -1630,7 +1595,7 @@ mod sys {
let _ = self.0.deregister(&As(sock));
Ok(())
}
pub fn poll(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
pub fn wait(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
events.0.clear();
self.0.poll(&mut events.0, timeout)
}