m: Remove the thread-local executor optimization
This was added in #37 as an optimization, but has since lead to many bugs. See the issues #53, #57 and #60 for more information. I do not have the bandwidth to address all of these bugs, so I'm taking the path of least resistance by just removing the problematic code. CLoses #53, #57 and #60 Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
917caad8b9
commit
8a0832c090
167
src/lib.rs
167
src/lib.rs
|
@ -26,7 +26,6 @@
|
|||
html_logo_url = "https://raw.githubusercontent.com/smol-rs/smol/master/assets/images/logo_fullsize_transparent.png"
|
||||
)]
|
||||
|
||||
use std::cell::RefCell;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::marker::PhantomData;
|
||||
|
@ -236,56 +235,29 @@ impl<'a> Executor<'a> {
|
|||
let runner = Runner::new(self.state());
|
||||
let mut rng = fastrand::Rng::new();
|
||||
|
||||
// Set the local queue while we're running.
|
||||
LocalQueue::set(self.state(), &runner.local, {
|
||||
let runner = &runner;
|
||||
async move {
|
||||
// A future that runs tasks forever.
|
||||
let run_forever = async {
|
||||
loop {
|
||||
for _ in 0..200 {
|
||||
let runnable = runner.runnable(&mut rng).await;
|
||||
runnable.run();
|
||||
}
|
||||
future::yield_now().await;
|
||||
}
|
||||
};
|
||||
|
||||
// Run `future` and `run_forever` concurrently until `future` completes.
|
||||
future.or(run_forever).await
|
||||
// A future that runs tasks forever.
|
||||
let run_forever = async {
|
||||
loop {
|
||||
for _ in 0..200 {
|
||||
let runnable = runner.runnable(&mut rng).await;
|
||||
runnable.run();
|
||||
}
|
||||
future::yield_now().await;
|
||||
}
|
||||
})
|
||||
.await
|
||||
};
|
||||
|
||||
// Run `future` and `run_forever` concurrently until `future` completes.
|
||||
future.or(run_forever).await
|
||||
}
|
||||
|
||||
/// Returns a function that schedules a runnable task when it gets woken up.
|
||||
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
|
||||
let state = self.state().clone();
|
||||
|
||||
// If possible, push into the current local queue and notify the ticker.
|
||||
// TODO: If possible, push into the current local queue and notify the ticker.
|
||||
move |runnable| {
|
||||
let mut runnable = Some(runnable);
|
||||
|
||||
// Try to push into the local queue.
|
||||
LocalQueue::with(|local_queue| {
|
||||
// Make sure that we don't accidentally push to an executor that isn't ours.
|
||||
if local_queue.state != &*state as *const State as usize {
|
||||
return;
|
||||
}
|
||||
|
||||
if let Err(e) = local_queue.queue.push(runnable.take().unwrap()) {
|
||||
runnable = Some(e.into_inner());
|
||||
return;
|
||||
}
|
||||
|
||||
local_queue.waker.wake_by_ref();
|
||||
});
|
||||
|
||||
// If the local queue push failed, just push to the global queue.
|
||||
if let Some(runnable) = runnable {
|
||||
state.queue.push(runnable).unwrap();
|
||||
state.notify();
|
||||
}
|
||||
state.queue.push(runnable).unwrap();
|
||||
state.notify();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -853,106 +825,6 @@ impl Drop for Runner<'_> {
|
|||
}
|
||||
}
|
||||
|
||||
/// The state of the currently running local queue.
|
||||
struct LocalQueue {
|
||||
/// The pointer to the state of the executor.
|
||||
///
|
||||
/// Used to make sure we don't push runnables to the wrong executor.
|
||||
state: usize,
|
||||
|
||||
/// The concurrent queue.
|
||||
queue: Arc<ConcurrentQueue<Runnable>>,
|
||||
|
||||
/// The waker for the runnable.
|
||||
waker: Waker,
|
||||
}
|
||||
|
||||
impl LocalQueue {
|
||||
/// Run a function with the current local queue.
|
||||
fn with<R>(f: impl FnOnce(&LocalQueue) -> R) -> Option<R> {
|
||||
std::thread_local! {
|
||||
/// The current local queue.
|
||||
static LOCAL_QUEUE: RefCell<Option<LocalQueue>> = RefCell::new(None);
|
||||
}
|
||||
|
||||
impl LocalQueue {
|
||||
/// Run a function with a set local queue.
|
||||
async fn set<F>(
|
||||
state: &State,
|
||||
queue: &Arc<ConcurrentQueue<Runnable>>,
|
||||
fut: F,
|
||||
) -> F::Output
|
||||
where
|
||||
F: Future,
|
||||
{
|
||||
// Make the `LocalQueue` structure.
|
||||
let make_local_queue = |waker: &Waker| LocalQueue {
|
||||
state: state as *const State as usize,
|
||||
queue: queue.clone(),
|
||||
waker: waker.clone(),
|
||||
};
|
||||
|
||||
// Store the local queue and the current waker.
|
||||
let mut old = with_waker(|waker| {
|
||||
LOCAL_QUEUE.with(move |slot| slot.borrow_mut().replace(make_local_queue(waker)))
|
||||
})
|
||||
.await;
|
||||
|
||||
// Restore the old local queue on drop.
|
||||
let _guard = CallOnDrop(move || {
|
||||
let old = old.take();
|
||||
let _ = LOCAL_QUEUE.try_with(move |slot| {
|
||||
*slot.borrow_mut() = old;
|
||||
});
|
||||
});
|
||||
|
||||
// Pin the future.
|
||||
futures_lite::pin!(fut);
|
||||
|
||||
// Run it such that the waker is updated every time it's polled.
|
||||
future::poll_fn(move |cx| {
|
||||
LOCAL_QUEUE
|
||||
.try_with({
|
||||
let waker = cx.waker();
|
||||
move |slot| {
|
||||
let mut slot = slot.borrow_mut();
|
||||
let qaw = match slot.as_mut() {
|
||||
None => {
|
||||
// Another local queue dropped itself and replaced with None,
|
||||
// we can take its place!
|
||||
*slot = Some(make_local_queue(waker));
|
||||
return;
|
||||
}
|
||||
Some(qaw) => qaw,
|
||||
};
|
||||
|
||||
// If we've been replaced, just ignore the slot.
|
||||
if !Arc::ptr_eq(&qaw.queue, queue) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Update the waker, if it has changed.
|
||||
if !qaw.waker.will_wake(waker) {
|
||||
qaw.waker = waker.clone();
|
||||
}
|
||||
}
|
||||
})
|
||||
.ok();
|
||||
|
||||
// Poll the future.
|
||||
fut.as_mut().poll(cx)
|
||||
})
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
LOCAL_QUEUE
|
||||
.try_with(|local_queue| local_queue.borrow().as_ref().map(f))
|
||||
.ok()
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
|
||||
/// Steals some items from one queue into another.
|
||||
fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
|
||||
// Half of `src`'s length rounded up.
|
||||
|
@ -1053,15 +925,6 @@ impl<F: FnMut()> Drop for CallOnDrop<F> {
|
|||
}
|
||||
}
|
||||
|
||||
/// Run a closure with the current waker.
|
||||
fn with_waker<F: FnOnce(&Waker) -> R, R>(f: F) -> impl Future<Output = R> {
|
||||
let mut f = Some(f);
|
||||
future::poll_fn(move |cx| {
|
||||
let f = f.take().unwrap();
|
||||
Poll::Ready(f(cx.waker()))
|
||||
})
|
||||
}
|
||||
|
||||
fn _ensure_send_and_sync() {
|
||||
use futures_lite::future::pending;
|
||||
|
||||
|
|
Loading…
Reference in New Issue