Fix a bug where TLS would become None (#55)

* Fix a bug where TLS would become None

The bug is invoked as follows:

- Runner 1 is created and stores the current version of the TLS
  LOCAL_QUEUE variable, which is None.
- Runner 2 is also created. It stores the current version of the TLS
  variable as well, which is Runner 1's queue.
- Runner 1 is dropped. It stores None into the LOCAL_QUEUE variable.
- Runner 2 tries to run. It reads from the LOCAL_QUEUE variable, sees
  that it is None, and panics.

This could be solved by just not using the local queue if the variable
is None. However, we can do one better; if the slot is open, we can
optimize the runner by replacing it with our own queue. This should
allow for the local queue to be used more often.

Closes #54

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2023-09-27 20:01:15 -07:00 committed by GitHub
parent 77b5b169c5
commit 4154ad2190
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 75 additions and 8 deletions

View File

@ -885,15 +885,16 @@ impl LocalQueue {
where
F: Future,
{
// Make the `LocalQueue` structure.
let make_local_queue = |waker: &Waker| LocalQueue {
state: state as *const State as usize,
queue: queue.clone(),
waker: waker.clone(),
};
// Store the local queue and the current waker.
let mut old = with_waker(|waker| {
LOCAL_QUEUE.with(move |slot| {
slot.borrow_mut().replace(LocalQueue {
state: state as *const State as usize,
queue: queue.clone(),
waker: waker.clone(),
})
})
LOCAL_QUEUE.with(move |slot| slot.borrow_mut().replace(make_local_queue(waker)))
})
.await;
@ -915,7 +916,15 @@ impl LocalQueue {
let waker = cx.waker();
move |slot| {
let mut slot = slot.borrow_mut();
let qaw = slot.as_mut().expect("missing local queue");
let qaw = match slot.as_mut() {
None => {
// Another local queue dropped itself and replaced with None,
// we can take its place!
*slot = Some(make_local_queue(waker));
return;
}
Some(qaw) => qaw,
};
// If we've been replaced, just ignore the slot.
if !Arc::ptr_eq(&qaw.queue, queue) {

View File

@ -0,0 +1,34 @@
use async_executor::LocalExecutor;
use futures_lite::future::{block_on, pending, poll_once};
use futures_lite::pin;
use std::cell::Cell;
#[test]
fn shared_queue_slot() {
block_on(async {
let was_polled = Cell::new(false);
let future = async {
was_polled.set(true);
pending::<()>().await;
};
let ex1 = LocalExecutor::new();
let ex2 = LocalExecutor::new();
// Start the futures for running forever.
let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>()));
pin!(run1);
pin!(run2);
assert!(poll_once(run1.as_mut()).await.is_none());
assert!(poll_once(run2.as_mut()).await.is_none());
// Spawn the future on executor one and then poll executor two.
ex1.spawn(future).detach();
assert!(poll_once(run2).await.is_none());
assert!(!was_polled.get());
// Poll the first one.
assert!(poll_once(run1).await.is_none());
assert!(was_polled.get());
});
}

24
tests/local_queue.rs Normal file
View File

@ -0,0 +1,24 @@
use async_executor::Executor;
use futures_lite::{future, pin};
#[test]
fn two_queues() {
future::block_on(async {
// Create an executor with two runners.
let ex = Executor::new();
let (run1, run2) = (
ex.run(future::pending::<()>()),
ex.run(future::pending::<()>()),
);
let mut run1 = Box::pin(run1);
pin!(run2);
// Poll them both.
assert!(future::poll_once(run1.as_mut()).await.is_none());
assert!(future::poll_once(run2.as_mut()).await.is_none());
// Drop the first one, which should leave the local queue in the `None` state.
drop(run1);
assert!(future::poll_once(run2.as_mut()).await.is_none());
});
}