This commit is contained in:
Stjepan Glavina 2020-02-16 15:58:06 +01:00
parent 57ef197d0e
commit c97162ae82
2 changed files with 20 additions and 4 deletions

View File

@ -20,6 +20,7 @@ pin-utils = "0.1.0-alpha.4"
scopeguard = "1.0.0"
slab = "0.4.2"
socket2 = "0.3.11"
async-std = { version = "1", features = ["unstable"] }
[target.'cfg(unix)'.dependencies]
nix = "0.16.1"

View File

@ -67,10 +67,12 @@ thread_local! {
static WORKER: RefCell<Option<Worker<Runnable>>> = RefCell::new(None);
}
use async_std::sync::{channel, Receiver, Sender};
struct Executor {
injector: Injector<Runnable>,
stealers: ShardedLock<Vec<(ThreadId, Stealer<Runnable>)>>,
interrupt: Async<IoFlag>,
hot: (Sender<()>, Receiver<()>),
}
impl Executor {
@ -79,6 +81,7 @@ impl Executor {
injector: Injector::new(),
stealers: ShardedLock::new(Vec::new()),
interrupt: IoFlag::create().and_then(Async::nonblocking)?,
hot: channel(1),
})
}
@ -111,7 +114,7 @@ impl Executor {
if let Some(r) = self.find_quick() {
return Some(r);
}
self.poll_quick().unwrap();
self.poll_quick(false).unwrap();
if let Some(r) = self.find_quick() {
return Some(r);
}
@ -139,7 +142,7 @@ impl Executor {
while !io_flag.get() {
if runs > 50 {
runs = 0;
self.poll_quick()?;
self.poll_quick(true)?;
} else if let Some(runnable) = self.find_runnable() {
runs += 1;
let _ = catch_unwind(|| runnable.run());
@ -160,7 +163,14 @@ impl Executor {
});
pin_utils::pin_mut!(flag_ready);
let poller_ready = REACTOR.lock();
let poller_ready = async {
if let Some(poller) = REACTOR.try_lock() {
poller
} else {
self.hot.1.recv().await;
REACTOR.lock().await
}
};
pin_utils::pin_mut!(poller_ready);
// TODO: use piper::select! here
@ -186,10 +196,15 @@ impl Executor {
}
// TODO: return number of events?
fn poll_quick(&self) -> io::Result<()> {
fn poll_quick(&self, check: bool) -> io::Result<()> {
fire_timers();
if let Some(mut poller) = REACTOR.try_lock() {
poller.poll(Some(Duration::from_secs(0)))?;
if check {
use futures_util::future::FutureExt;
self.hot.0.send(()).now_or_never();
}
}
Ok(())
}