ex: Use Semaphore instead of manual event-listener

Whoops, I accidentally reinvented a semaphore and made the example a lot
more complicated than it needed to be.

Signed-off-by: John Nunley <dev@notgull.net>
Author: John Nunley, 2024-01-08 16:01:07 -08:00 (committed by GitHub)
Commit: 6c70369102 (parent: 57fcc2d991)
2 changed files with 16 additions and 88 deletions
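
For context, here is a minimal, self-contained sketch (not part of this commit) of the permit-per-task pattern the example switches to: a permit is acquired from async-lock's Semaphore via acquire_arc before each spawn, moved into the spawned task, and released when the task finishes, so at most N tasks are in flight at once. The two-permit limit, the five dummy tasks, and the block_on/run driver below are illustrative assumptions rather than code from the repository.

use async_executor::Executor;
use async_lock::Semaphore;
use std::sync::Arc;

fn main() {
    let ex = Executor::new();
    // Illustrative limit: at most two tasks may hold a permit at once.
    let semaphore = Arc::new(Semaphore::new(2));

    futures_lite::future::block_on(ex.run(async {
        let mut tasks = Vec::new();
        for i in 0..5 {
            // Wait for a free slot before spawning the next task.
            let permit = semaphore.acquire_arc().await;
            tasks.push(ex.spawn(async move {
                println!("task {i} is running");
                // Dropping the permit frees the slot for the next spawn.
                drop(permit);
            }));
        }
        for task in tasks {
            task.await;
        }
    }));
}

Because SemaphoreGuardArc owns an Arc to the semaphore, the permit can outlive the spawning call and be dropped inside the task itself, which is what makes the manual SharedState bookkeeping in the old example unnecessary.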

Cargo.toml

@@ -28,9 +28,9 @@ futures-lite = { version = "2.0.0", default-features = false, features = ["std"]
 [dev-dependencies]
 async-channel = "2.0.0"
 async-io = "2.1.0"
+async-lock = "3.0.0"
 criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] }
 easy-parallel = "3.1.0"
-event-listener = "3.0.0"
 fastrand = "2.0.0"
 futures-lite = "2.0.0"
 once_cell = "1.16.0"

examples/limit.rs

@@ -1,46 +1,23 @@
 //! An executor where you can only push a limited number of tasks.
 
 use async_executor::{Executor, Task};
-use event_listener::{Event, EventListener};
-use futures_lite::pin;
-use std::{
-    future::Future,
-    sync::{
-        atomic::{AtomicUsize, Ordering},
-        Arc,
-    },
-    time::Duration,
-};
+use async_lock::Semaphore;
+use std::{future::Future, sync::Arc, time::Duration};
 
 /// An executor where you can only push a limited number of tasks.
 struct LimitedExecutor {
     /// Inner running executor.
     executor: Executor<'static>,
 
-    /// Shared state.
-    shared: Arc<SharedState>,
-}
-
-struct SharedState {
-    /// The maximum number of tasks that can be pushed.
-    max: usize,
-
-    /// The current number of active tasks.
-    active: AtomicUsize,
-
-    /// Event listeners for when a new task is available.
-    slot_available: Event,
+    /// Semaphore limiting the number of tasks.
+    semaphore: Arc<Semaphore>,
 }
 
 impl LimitedExecutor {
     fn new(max: usize) -> Self {
         Self {
             executor: Executor::new(),
-            shared: Arc::new(SharedState {
-                max,
-                active: AtomicUsize::new(0),
-                slot_available: Event::new(),
-            }),
+            semaphore: Semaphore::new(max).into(),
         }
     }
@@ -49,67 +26,18 @@ impl LimitedExecutor {
     where
         F::Output: Send + 'static,
     {
-        let listener = EventListener::new(&self.shared.slot_available);
-        pin!(listener);
+        // Wait for a semaphore permit.
+        let permit = self.semaphore.acquire_arc().await;
 
-        // Load the current number of active tasks.
-        let mut active = self.shared.active.load(Ordering::Acquire);
+        // Wrap it into a new future.
+        let future = async move {
+            let result = future.await;
+            drop(permit);
+            result
+        };
 
-        loop {
-            // Check if there is a slot available.
-            if active < self.shared.max {
-                // Try to set the slot to what would be the new number of tasks.
-                let new_active = active + 1;
-                match self.shared.active.compare_exchange(
-                    active,
-                    new_active,
-                    Ordering::SeqCst,
-                    Ordering::SeqCst,
-                ) {
-                    Ok(_) => {
-                        // Wrap the future in another future that decrements the active count
-                        // when it's done.
-                        let future = {
-                            let shared = self.shared.clone();
-                            async move {
-                                struct DecOnDrop(Arc<SharedState>);
-
-                                impl Drop for DecOnDrop {
-                                    fn drop(&mut self) {
-                                        // Decrement the count and notify someone.
-                                        self.0.active.fetch_sub(1, Ordering::SeqCst);
-                                        self.0.slot_available.notify(usize::MAX);
-                                    }
-                                }
-
-                                let _dec = DecOnDrop(shared);
-                                future.await
-                            }
-                        };
-
-                        // Wake up another waiter, in case there is one.
-                        self.shared.slot_available.notify(1);
-
-                        // Spawn the task.
-                        return self.executor.spawn(future);
-                    }
-
-                    Err(actual) => {
-                        // Try again.
-                        active = actual;
-                    }
-                }
-            } else {
-                // Start waiting for a slot to become available.
-                if listener.as_ref().is_listening() {
-                    listener.as_mut().await;
-                } else {
-                    listener.as_mut().listen();
-                }
-
-                active = self.shared.active.load(Ordering::Acquire);
-            }
-        }
+        // Spawn the task.
+        self.executor.spawn(future)
     }
 
     /// Run a future to completion.