Implement local()

This commit is contained in:
Stjepan Glavina 2020-02-20 15:18:08 +01:00
parent 785fa9d436
commit 5634a098e5
2 changed files with 57 additions and 8 deletions

View File

@ -9,6 +9,7 @@ license = "MIT OR Apache-2.0"
[dependencies]
async-task = "1.3.0"
crossbeam-deque = "0.7.2"
crossbeam-queue = "0.1.2"
crossbeam-utils = "0.7.0"
futures-core = "0.3.3"
futures-io = "0.3.3"

View File

@ -37,6 +37,7 @@ use std::{
};
use crossbeam_deque::{Injector, Steal, Stealer, Worker};
use crossbeam_queue::SegQueue;
use crossbeam_utils::sync::{Parker, ShardedLock};
use futures_core::stream::Stream;
use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite};
@ -51,7 +52,6 @@ use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type};
// TODO: fix unwraps
// TODO: if epoll/kqueue/wepoll gets EINTR, then retry - or maybe just call notify()
// TODO: catch panics in wake() and Waker::drop()
// TODO: readme for inspiration: https://github.com/piscisaureus/wepoll
@ -72,6 +72,11 @@ thread_local! {
struct Processor {
slot: Cell<Option<Runnable>>,
worker: Worker<Runnable>,
local: RefCell<VecDeque<Runnable>>,
remote: Arc<SegQueue<Runnable>>,
id: ThreadId,
waker: Arc<Async<IoFlag>>,
stealable: Arc<Async<IoFlag>>,
}
@ -81,6 +86,11 @@ impl Processor {
Ok(Processor {
slot: Cell::new(None),
worker: Worker::new_fifo(),
local: RefCell::new(VecDeque::new()),
remote: Arc::new(SegQueue::new()),
id: thread::current().id(),
waker: Arc::new(Async::nonblocking(IoFlag::create()?)?),
stealable: stealable.clone(),
})
@ -136,6 +146,13 @@ impl Processor {
}
}
}
fn drain_remote(&self) {
let mut local = self.local.borrow_mut();
while let Ok(r) = self.remote.pop() {
local.push_back(r);
}
}
}
struct Executor {
@ -164,6 +181,14 @@ impl Executor {
}
fn find_runnable(&self, proc: &Processor) -> io::Result<Option<Runnable>> {
if let Some(r) = proc.local.borrow_mut().pop_front() {
return Ok(Some(r));
}
proc.drain_remote();
if let Some(r) = proc.local.borrow_mut().pop_front() {
return Ok(Some(r));
}
// First look for a task in the local queue.
if let Some(r) = proc.pop() {
return Ok(Some(r));
@ -328,8 +353,30 @@ impl<T: 'static> Task<T> {
where
T: 'static,
{
// TODO: panic if not called inside a worker started with run()
todo!()
PROCESSOR.with(|proc| {
let proc = proc
.get_or_try_init(|| Processor::create(&EXECUTOR.stealable))
.expect("unexpected I/O error in `local()`");
let id = proc.id;
let remote = proc.remote.clone();
let schedule = move |runnable| {
PROCESSOR.with(|proc| match proc.get() {
Some(proc) if proc.id == id => {
proc.local.borrow_mut().push_back(runnable);
}
Some(_) | None => {
remote.push(runnable);
EXECUTOR.interrupt();
}
});
};
let (runnable, handle) = async_task::spawn_local(future, schedule, ());
runnable.schedule();
Task(Some(handle))
})
}
}
@ -573,15 +620,16 @@ impl Timer {
}
}
fn id(&mut self) -> usize {
self as *mut Timer as usize
fn key(&mut self) -> (Instant, usize) {
let id = self as *mut Timer as usize;
(self.when, id)
}
}
impl Drop for Timer {
fn drop(&mut self) {
if self.inserted {
TIMERS.lock().map.remove(&(self.when, self.id()));
TIMERS.lock().map.remove(&self.key());
}
}
}
@ -593,7 +641,7 @@ impl Future for Timer {
let mut timers = TIMERS.lock();
if Instant::now() >= self.when {
timers.map.remove(&(self.when, self.id()));
timers.map.remove(&self.key());
return Poll::Ready(self.when);
}
@ -606,7 +654,7 @@ impl Future for Timer {
}
let waker = cx.waker().clone();
timers.map.insert((self.when, self.id()), waker);
timers.map.insert(self.key(), waker);
self.inserted = true;
if is_earliest {