From 8a0832c090625dd36e66ff7b42a76a80b421f90f Mon Sep 17 00:00:00 2001 From: John Nunley Date: Mon, 16 Oct 2023 18:50:00 -0700 Subject: [PATCH] m: Remove the thread-local executor optimization This was added in #37 as an optimization, but has since lead to many bugs. See the issues #53, #57 and #60 for more information. I do not have the bandwidth to address all of these bugs, so I'm taking the path of least resistance by just removing the problematic code. CLoses #53, #57 and #60 Signed-off-by: John Nunley --- src/lib.rs | 167 +++++------------------------------------------------ 1 file changed, 15 insertions(+), 152 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 213f11f..79428cd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,6 @@ html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png" )] -use std::cell::RefCell; use std::fmt; use std::future::Future; use std::marker::PhantomData; @@ -236,56 +235,29 @@ impl<'a> Executor<'a> { let runner = Runner::new(self.state()); let mut rng = fastrand::Rng::new(); - // Set the local queue while we're running. - LocalQueue::set(self.state(), &runner.local, { - let runner = &runner; - async move { - // A future that runs tasks forever. - let run_forever = async { - loop { - for _ in 0..200 { - let runnable = runner.runnable(&mut rng).await; - runnable.run(); - } - future::yield_now().await; - } - }; - - // Run `future` and `run_forever` concurrently until `future` completes. - future.or(run_forever).await + // A future that runs tasks forever. + let run_forever = async { + loop { + for _ in 0..200 { + let runnable = runner.runnable(&mut rng).await; + runnable.run(); + } + future::yield_now().await; } - }) - .await + }; + + // Run `future` and `run_forever` concurrently until `future` completes. + future.or(run_forever).await } /// Returns a function that schedules a runnable task when it gets woken up. fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static { let state = self.state().clone(); - // If possible, push into the current local queue and notify the ticker. + // TODO: If possible, push into the current local queue and notify the ticker. move |runnable| { - let mut runnable = Some(runnable); - - // Try to push into the local queue. - LocalQueue::with(|local_queue| { - // Make sure that we don't accidentally push to an executor that isn't ours. - if local_queue.state != &*state as *const State as usize { - return; - } - - if let Err(e) = local_queue.queue.push(runnable.take().unwrap()) { - runnable = Some(e.into_inner()); - return; - } - - local_queue.waker.wake_by_ref(); - }); - - // If the local queue push failed, just push to the global queue. - if let Some(runnable) = runnable { - state.queue.push(runnable).unwrap(); - state.notify(); - } + state.queue.push(runnable).unwrap(); + state.notify(); } } @@ -853,106 +825,6 @@ impl Drop for Runner<'_> { } } -/// The state of the currently running local queue. -struct LocalQueue { - /// The pointer to the state of the executor. - /// - /// Used to make sure we don't push runnables to the wrong executor. - state: usize, - - /// The concurrent queue. - queue: Arc>, - - /// The waker for the runnable. - waker: Waker, -} - -impl LocalQueue { - /// Run a function with the current local queue. - fn with(f: impl FnOnce(&LocalQueue) -> R) -> Option { - std::thread_local! { - /// The current local queue. - static LOCAL_QUEUE: RefCell> = RefCell::new(None); - } - - impl LocalQueue { - /// Run a function with a set local queue. - async fn set( - state: &State, - queue: &Arc>, - fut: F, - ) -> F::Output - where - F: Future, - { - // Make the `LocalQueue` structure. - let make_local_queue = |waker: &Waker| LocalQueue { - state: state as *const State as usize, - queue: queue.clone(), - waker: waker.clone(), - }; - - // Store the local queue and the current waker. - let mut old = with_waker(|waker| { - LOCAL_QUEUE.with(move |slot| slot.borrow_mut().replace(make_local_queue(waker))) - }) - .await; - - // Restore the old local queue on drop. - let _guard = CallOnDrop(move || { - let old = old.take(); - let _ = LOCAL_QUEUE.try_with(move |slot| { - *slot.borrow_mut() = old; - }); - }); - - // Pin the future. - futures_lite::pin!(fut); - - // Run it such that the waker is updated every time it's polled. - future::poll_fn(move |cx| { - LOCAL_QUEUE - .try_with({ - let waker = cx.waker(); - move |slot| { - let mut slot = slot.borrow_mut(); - let qaw = match slot.as_mut() { - None => { - // Another local queue dropped itself and replaced with None, - // we can take its place! - *slot = Some(make_local_queue(waker)); - return; - } - Some(qaw) => qaw, - }; - - // If we've been replaced, just ignore the slot. - if !Arc::ptr_eq(&qaw.queue, queue) { - return; - } - - // Update the waker, if it has changed. - if !qaw.waker.will_wake(waker) { - qaw.waker = waker.clone(); - } - } - }) - .ok(); - - // Poll the future. - fut.as_mut().poll(cx) - }) - .await - } - } - - LOCAL_QUEUE - .try_with(|local_queue| local_queue.borrow().as_ref().map(f)) - .ok() - .flatten() - } -} - /// Steals some items from one queue into another. fn steal(src: &ConcurrentQueue, dest: &ConcurrentQueue) { // Half of `src`'s length rounded up. @@ -1053,15 +925,6 @@ impl Drop for CallOnDrop { } } -/// Run a closure with the current waker. -fn with_waker R, R>(f: F) -> impl Future { - let mut f = Some(f); - future::poll_fn(move |cx| { - let f = f.take().unwrap(); - Poll::Ready(f(cx.waker())) - }) -} - fn _ensure_send_and_sync() { use futures_lite::future::pending;