Replace async-channel with flume

This commit replaces async-channel in some tests with flume. It appears
that async-channel doesn't work under MIRI but flume does, so we can
work around this for now by replacing it with flume.

cc https://github.com/smol-rs/async-channel/issues/85

Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
John Nunley 2024-04-08 14:10:39 -07:00 committed by GitHub
parent df82e5bb5e
commit 542831132f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 32 additions and 28 deletions

View File

@ -24,8 +24,8 @@ default = ["std"]
std = ["event-listener/std", "event-listener-strategy/std"]
[dev-dependencies]
async-channel = "2.2.0"
fastrand = "2.0.0"
flume = "0.11.0"
futures-lite = "2.0.0"
waker-fn = "1.1.0"

View File

@ -123,6 +123,8 @@ impl Barrier {
/// println!("after wait");
/// });
/// }
/// # // Wait for threads to stop.
/// # std::thread::sleep(std::time::Duration::from_secs(1));
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub fn wait_blocking(&self) -> BarrierWaitResult {

View File

@ -5,6 +5,7 @@ use async_lock::Barrier;
use futures_lite::future;
#[test]
#[cfg_attr(miri, ignore)]
fn smoke() {
future::block_on(async move {
const N: usize = 10;
@ -12,7 +13,7 @@ fn smoke() {
let barrier = Arc::new(Barrier::new(N));
for _ in 0..10 {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
for _ in 0..N - 1 {
let c = barrier.clone();
@ -21,7 +22,7 @@ fn smoke() {
thread::spawn(move || {
future::block_on(async move {
let res = c.wait().await;
tx.send(res.is_leader()).await.unwrap();
tx.send_async(res.is_leader()).await.unwrap();
})
});
}
@ -35,7 +36,7 @@ fn smoke() {
// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.recv().await.unwrap() {
if rx.recv_async().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}
@ -55,7 +56,7 @@ fn smoke_blocking() {
let barrier = Arc::new(Barrier::new(N));
for _ in 0..10 {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
for _ in 0..N - 1 {
let c = barrier.clone();
@ -63,7 +64,7 @@ fn smoke_blocking() {
thread::spawn(move || {
let res = c.wait_blocking();
tx.send_blocking(res.is_leader()).unwrap();
tx.send(res.is_leader()).unwrap();
});
}
@ -76,7 +77,7 @@ fn smoke_blocking() {
// Now, the barrier is cleared and we should get data.
for _ in 0..N - 1 {
if rx.recv().await.unwrap() {
if rx.recv_async().await.unwrap() {
assert!(!leader_found);
leader_found = true;
}

View File

@ -63,7 +63,7 @@ fn get_mut() {
#[test]
fn contention() {
future::block_on(async {
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
let tx = Arc::new(tx);
let mutex = Arc::new(Mutex::new(0i32));
@ -77,14 +77,14 @@ fn contention() {
future::block_on(async move {
let mut lock = mutex.lock().await;
*lock += 1;
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
drop(lock);
})
});
}
for _ in 0..num_tasks {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}
let lock = mutex.lock().await;

View File

@ -25,13 +25,13 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
#[cfg(not(target_family = "wasm"))]
fn spawn<T: Send + 'static>(f: impl Future<Output = T> + Send + 'static) -> future::Boxed<T> {
let (s, r) = async_channel::bounded(1);
let (s, r) = flume::bounded(1);
thread::spawn(move || {
future::block_on(async {
let _ = s.send(f.await).await;
let _ = s.send_async(f.await).await;
})
});
async move { r.recv().await.unwrap() }.boxed()
async move { r.recv_async().await.unwrap() }.boxed()
}
#[test]
@ -119,13 +119,14 @@ fn get_mut() {
}
// Miri bug; this works when async is replaced with blocking
#[cfg(not(any(target_family = "wasm", miri)))]
#[cfg(not(target_family = "wasm"))]
#[test]
#[cfg_attr(miri, ignore)]
fn contention() {
const N: u32 = 10;
const M: usize = if cfg!(miri) { 100 } else { 1000 };
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));
@ -142,25 +143,25 @@ fn contention() {
drop(rw.read().await);
}
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
});
}
future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}
});
}
// Miri bug; this works when async is replaced with blocking
#[cfg(not(any(target_family = "wasm", miri)))]
#[cfg(not(target_family = "wasm"))]
#[test]
#[cfg_attr(miri, ignore)]
fn contention_arc() {
const N: u32 = 10;
const M: usize = if cfg!(miri) { 100 } else { 1000 };
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
let tx = Arc::new(tx);
let rw = Arc::new(RwLock::new(()));
@ -177,13 +178,13 @@ fn contention_arc() {
drop(rw.read_arc().await);
}
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
});
}
future::block_on(async move {
for _ in 0..N {
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
}
});
}
@ -192,7 +193,7 @@ fn contention_arc() {
#[test]
fn writer_and_readers() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
// Spawn a writer task.
let _spawned = spawn({
@ -205,7 +206,7 @@ fn writer_and_readers() {
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
}
});
@ -228,7 +229,7 @@ fn writer_and_readers() {
}
// Wait for writer to finish.
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
let lock = lock.read().await;
assert_eq!(*lock, 1000);
});
@ -238,7 +239,7 @@ fn writer_and_readers() {
#[test]
fn writer_and_readers_arc() {
let lock = Arc::new(RwLock::new(0i32));
let (tx, rx) = async_channel::unbounded();
let (tx, rx) = flume::unbounded();
// Spawn a writer task.
let _spawned = spawn({
@ -251,7 +252,7 @@ fn writer_and_readers_arc() {
future::yield_now().await;
*lock = tmp + 1;
}
tx.send(()).await.unwrap();
tx.send_async(()).await.unwrap();
}
});
@ -274,7 +275,7 @@ fn writer_and_readers_arc() {
}
// Wait for writer to finish.
rx.recv().await.unwrap();
rx.recv_async().await.unwrap();
let lock = lock.read_arc().await;
assert_eq!(*lock, 1000);
});