diff --git a/src/reactor.rs b/src/reactor.rs index 21d6547..2f959a1 100644 --- a/src/reactor.rs +++ b/src/reactor.rs @@ -157,14 +157,14 @@ impl Reactor { /// /// This doesn't have strong guarantees. If there are ready events, they may or may not be /// processed depending on whether the reactor is locked. - pub fn poll(&self) -> io::Result<()> { - if let Some(events) = self.events.try_lock() { - let reactor = self; - let mut lock = ReactorLock { reactor, events }; + pub fn poll(&self) -> io::Result> { + if let Some(mut lock) = self.try_lock() { // React to events without blocking. - lock.react(false)?; + let n = lock.react(false)?; + Ok(Some(n)) + } else { + Ok(None) } - Ok(()) } /// Locks the reactor. @@ -173,6 +173,14 @@ impl Reactor { let events = self.events.lock().await; ReactorLock { reactor, events } } + + /// Attempts to lock the reactor. + pub fn try_lock(&self) -> Option> { + self.events.try_lock().map(|events| { + let reactor = self; + ReactorLock { reactor, events } + }) + } } /// Polls the reactor for I/O events and wakes up tasks. @@ -183,12 +191,18 @@ pub(crate) struct ReactorLock<'a> { impl ReactorLock<'_> { /// Blocks until at least one event is processed. - pub fn wait(&mut self) -> io::Result<()> { + pub fn wait(&mut self) -> io::Result { self.react(true) } + /// TODO + pub fn poll(&mut self) -> io::Result { + self.react(false) + } + /// Processes new events, optionally blocking until the first event. - fn react(&mut self, block: bool) -> io::Result<()> { + fn react(&mut self, block: bool) -> io::Result { + let mut count = 0; loop { // Fire timers and compute the timeout until the next event. let timeout = self.fire_timers(); @@ -207,7 +221,7 @@ impl ReactorLock<'_> { // The timeout was hit, so check for timers again. Ok(0) => { self.fire_timers(); - return Ok(()); + return Ok(0); // TODO } // At least one I/O event occured. @@ -225,11 +239,12 @@ impl ReactorLock<'_> { // Wake up tasks waiting on I/O. for w in wakers.drain(..) { + count += 1; // TODO: don't forget timers w.wake(); } } - return Ok(()); + return Ok(count); } // The syscall was interrupted - recompute the timeout and restart. diff --git a/src/run.rs b/src/run.rs index 4cfed85..5159fa9 100644 --- a/src/run.rs +++ b/src/run.rs @@ -128,6 +128,7 @@ pub fn run(future: impl Future) -> T { // // This way we make sure that if any changes happen that might give us new work will // unblock epoll/kevent/wepoll and let us continue the loop. + let mut step = 0; loop { // 1. Poll the main future. if let Poll::Ready(val) = throttle::setup(|| future.as_mut().poll(cx)) { @@ -137,15 +138,51 @@ pub fn run(future: impl Future) -> T { let more_local = local.execute(); // 3. Run a batch of tasks in the work-stealing executor. let more_worker = worker.execute(); + // TODO: wake every time a worker is dropped? + // 4. Poll the reactor. - reactor.poll().expect("failure while polling I/O"); + if let Some(mut reactor_lock) = reactor.try_lock() { + step = 0; + + // Clear the two I/O events. + let local_ev = local.event().clear(); + let ws_ev = ws_executor.event().clear(); + + if reactor_lock.poll().expect("failure while polling I/O") > 0 { + continue; + } + + if more_local || more_worker { + continue; + } + + // If any of the two I/O events has been triggered, continue the loop. + if local_ev || ws_ev { + continue; + } + + // Block until an I/O event occurs. + reactor_lock.wait().expect("failure while waiting on I/O"); + + local.event().clear(); + ws_executor.event().clear(); + continue; + } // If there is more work in the thread-local or the work-stealing executor, continue // the loop. if more_local || more_worker { + step = 0; continue; } + step += 1; + if step <= 2 { + std::thread::yield_now(); + continue; + } + step = 0; + // Prepare for blocking until the reactor is locked or `local.event()` is triggered. // // Note that there is no need to wait for `ws_executor.event()`. If the reactor is @@ -173,6 +210,9 @@ pub fn run(future: impl Future) -> T { // Block until an I/O event occurs. reactor_lock.wait().expect("failure while waiting on I/O"); + + local.event().clear(); + ws_executor.event().clear(); } } }) diff --git a/src/thread_local.rs b/src/thread_local.rs index 2b849f9..1f4a4d6 100644 --- a/src/thread_local.rs +++ b/src/thread_local.rs @@ -115,8 +115,7 @@ impl ThreadLocalExecutor { } } - // Poll the reactor and drain the injector queue. We do this occasionally to make - // execution more fair to all tasks involved. + // Drain the injector queue occasionally to make execution more fair. self.fetch(); } @@ -136,11 +135,8 @@ impl ThreadLocalExecutor { self.queue.borrow_mut().pop_front() } - /// Polls the reactor and moves all tasks from the injector queue into the main queue. + /// Moves all tasks from the injector queue into the main queue. fn fetch(&self) { - // The reactor might wake tasks belonging to this executor. - Reactor::get().poll().expect("failure while polling I/O"); - // Move tasks from the injector queue into the main queue. let mut queue = self.queue.borrow_mut(); while let Ok(r) = self.injector.pop() { diff --git a/src/work_stealing.rs b/src/work_stealing.rs index abde327..9167269 100644 --- a/src/work_stealing.rs +++ b/src/work_stealing.rs @@ -33,7 +33,6 @@ use scoped_tls_hkt::scoped_thread_local; use slab::Slab; use crate::io_event::IoEvent; -use crate::reactor::Reactor; use crate::task::{Runnable, Task}; use crate::throttle; @@ -180,10 +179,11 @@ impl Worker<'_> { } } - // Flush the slot, grab some tasks from the global queue, and poll the reactor. We do - // this occasionally to make execution more fair to all tasks involved. self.push(None); - self.fetch(); + if let Some(r) = retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue)) + { + self.push(Some(r)); + } } // There are likely more tasks to run. @@ -199,9 +199,6 @@ impl Worker<'_> { if let Some(r) = self.slot.replace(runnable.into()) { // If the slot had a task, push it into the queue. self.queue.push(r); - - // Notify other workers that there are stealable tasks. - self.executor.event.notify(); } } @@ -212,39 +209,14 @@ impl Worker<'_> { return Some(r); } - // If not, fetch more tasks from the injector queue, the reactor, or other workers. - self.fetch(); - - // Check the slot and the queue again. - self.slot.take().or_else(|| self.queue.pop()) - } - - /// Steals from the injector and polls the reactor, or steals from other workers if that fails. - fn fetch(&self) { // Try stealing from the global queue. if let Some(r) = retry_steal(|| self.executor.injector.steal_batch_and_pop(&self.queue)) { - // Push the task, but don't return -- let's not forget to poll the reactor. - self.push(r); + return Some(r); } - // Poll the reactor. - Reactor::get().poll().expect("failure while polling I/O"); - - // If there is at least one task in the slot, return. - if let Some(r) = self.slot.take() { - self.slot.set(Some(r)); - return; - } - - // If there is at least one task in the queue, return. - if !self.queue.is_empty() { - return; - } - - // Still no tasks found - our last hope is to steal from other workers. + // Try stealing from other workers. let stealers = self.executor.stealers.read().unwrap(); - - if let Some(r) = retry_steal(|| { + retry_steal(|| { // Pick a random starting point in the iterator list and rotate the list. let n = stealers.len(); let start = fast_random(n); @@ -258,10 +230,7 @@ impl Worker<'_> { // that's the collected result and we'll retry from the beginning. iter.map(|(_, s)| s.steal_batch_and_pop(&self.queue)) .collect() - }) { - // Push the stolen task. - self.push(r); - } + }) } }