diff --git a/src/lib.rs b/src/lib.rs index d4d77a5..213f11f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -885,15 +885,16 @@ impl LocalQueue { where F: Future, { + // Make the `LocalQueue` structure. + let make_local_queue = |waker: &Waker| LocalQueue { + state: state as *const State as usize, + queue: queue.clone(), + waker: waker.clone(), + }; + // Store the local queue and the current waker. let mut old = with_waker(|waker| { - LOCAL_QUEUE.with(move |slot| { - slot.borrow_mut().replace(LocalQueue { - state: state as *const State as usize, - queue: queue.clone(), - waker: waker.clone(), - }) - }) + LOCAL_QUEUE.with(move |slot| slot.borrow_mut().replace(make_local_queue(waker))) }) .await; @@ -915,7 +916,15 @@ impl LocalQueue { let waker = cx.waker(); move |slot| { let mut slot = slot.borrow_mut(); - let qaw = slot.as_mut().expect("missing local queue"); + let qaw = match slot.as_mut() { + None => { + // Another local queue dropped itself and replaced with None, + // we can take its place! + *slot = Some(make_local_queue(waker)); + return; + } + Some(qaw) => qaw, + }; // If we've been replaced, just ignore the slot. if !Arc::ptr_eq(&qaw.queue, queue) { diff --git a/tests/different_executors.rs b/tests/different_executors.rs new file mode 100644 index 0000000..afef3be --- /dev/null +++ b/tests/different_executors.rs @@ -0,0 +1,34 @@ +use async_executor::LocalExecutor; +use futures_lite::future::{block_on, pending, poll_once}; +use futures_lite::pin; +use std::cell::Cell; + +#[test] +fn shared_queue_slot() { + block_on(async { + let was_polled = Cell::new(false); + let future = async { + was_polled.set(true); + pending::<()>().await; + }; + + let ex1 = LocalExecutor::new(); + let ex2 = LocalExecutor::new(); + + // Start the futures for running forever. + let (run1, run2) = (ex1.run(pending::<()>()), ex2.run(pending::<()>())); + pin!(run1); + pin!(run2); + assert!(poll_once(run1.as_mut()).await.is_none()); + assert!(poll_once(run2.as_mut()).await.is_none()); + + // Spawn the future on executor one and then poll executor two. + ex1.spawn(future).detach(); + assert!(poll_once(run2).await.is_none()); + assert!(!was_polled.get()); + + // Poll the first one. + assert!(poll_once(run1).await.is_none()); + assert!(was_polled.get()); + }); +} diff --git a/tests/local_queue.rs b/tests/local_queue.rs new file mode 100644 index 0000000..4678366 --- /dev/null +++ b/tests/local_queue.rs @@ -0,0 +1,24 @@ +use async_executor::Executor; +use futures_lite::{future, pin}; + +#[test] +fn two_queues() { + future::block_on(async { + // Create an executor with two runners. + let ex = Executor::new(); + let (run1, run2) = ( + ex.run(future::pending::<()>()), + ex.run(future::pending::<()>()), + ); + let mut run1 = Box::pin(run1); + pin!(run2); + + // Poll them both. + assert!(future::poll_once(run1.as_mut()).await.is_none()); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + + // Drop the first one, which should leave the local queue in the `None` state. + drop(run1); + assert!(future::poll_once(run2.as_mut()).await.is_none()); + }); +}