ex: Add an example of an executor with limited tasks
Signed-off-by: John Nunley <dev@notgull.net>
This commit is contained in:
parent
2cfb6e4ed0
commit
917caad8b9
|
@ -27,6 +27,8 @@ async-channel = "1.4.1"
|
|||
async-io = "1.1.9"
|
||||
criterion = { version = "0.4.0", default-features = false, features = ["cargo_bench_support"] }
|
||||
easy-parallel = "3.1.0"
|
||||
event-listener = "3.0.0"
|
||||
fastrand = "2.0.0"
|
||||
once_cell = "1.16.0"
|
||||
|
||||
[[bench]]
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
//! An executor where you can only push a limited number of tasks.
|
||||
|
||||
use async_executor::{Executor, Task};
|
||||
use event_listener::{Event, EventListener};
|
||||
use futures_lite::pin;
|
||||
use std::{
|
||||
future::Future,
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
/// An executor where you can only push a limited number of tasks.
|
||||
struct LimitedExecutor {
|
||||
/// Inner running executor.
|
||||
executor: Executor<'static>,
|
||||
|
||||
/// Shared state.
|
||||
shared: Arc<SharedState>,
|
||||
}
|
||||
|
||||
struct SharedState {
|
||||
/// The maximum number of tasks that can be pushed.
|
||||
max: usize,
|
||||
|
||||
/// The current number of active tasks.
|
||||
active: AtomicUsize,
|
||||
|
||||
/// Event listeners for when a new task is available.
|
||||
slot_available: Event,
|
||||
}
|
||||
|
||||
impl LimitedExecutor {
|
||||
fn new(max: usize) -> Self {
|
||||
Self {
|
||||
executor: Executor::new(),
|
||||
shared: Arc::new(SharedState {
|
||||
max,
|
||||
active: AtomicUsize::new(0),
|
||||
slot_available: Event::new(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
/// Spawn a task, waiting until there is a slot available.
|
||||
async fn spawn<F: Future + Send + 'static>(&self, future: F) -> Task<F::Output>
|
||||
where
|
||||
F::Output: Send + 'static,
|
||||
{
|
||||
let listener = EventListener::new(&self.shared.slot_available);
|
||||
pin!(listener);
|
||||
|
||||
// Load the current number of active tasks.
|
||||
let mut active = self.shared.active.load(Ordering::Acquire);
|
||||
|
||||
loop {
|
||||
// Check if there is a slot available.
|
||||
if active < self.shared.max {
|
||||
// Try to set the slot to what would be the new number of tasks.
|
||||
let new_active = active + 1;
|
||||
match self.shared.active.compare_exchange(
|
||||
active,
|
||||
new_active,
|
||||
Ordering::SeqCst,
|
||||
Ordering::SeqCst,
|
||||
) {
|
||||
Ok(_) => {
|
||||
// Wrap the future in another future that decrements the active count
|
||||
// when it's done.
|
||||
let future = {
|
||||
let shared = self.shared.clone();
|
||||
async move {
|
||||
struct DecOnDrop(Arc<SharedState>);
|
||||
|
||||
impl Drop for DecOnDrop {
|
||||
fn drop(&mut self) {
|
||||
// Decrement the count and notify someone.
|
||||
self.0.active.fetch_sub(1, Ordering::SeqCst);
|
||||
self.0.slot_available.notify(usize::MAX);
|
||||
}
|
||||
}
|
||||
|
||||
let _dec = DecOnDrop(shared);
|
||||
future.await
|
||||
}
|
||||
};
|
||||
|
||||
// Wake up another waiter, in case there is one.
|
||||
self.shared.slot_available.notify(1);
|
||||
|
||||
// Spawn the task.
|
||||
return self.executor.spawn(future);
|
||||
}
|
||||
|
||||
Err(actual) => {
|
||||
// Try again.
|
||||
active = actual;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Start waiting for a slot to become available.
|
||||
if listener.as_ref().is_listening() {
|
||||
listener.as_mut().await;
|
||||
} else {
|
||||
listener.as_mut().listen();
|
||||
}
|
||||
|
||||
active = self.shared.active.load(Ordering::Acquire);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a future to completion.
|
||||
async fn run<F: Future>(&self, future: F) -> F::Output {
|
||||
self.executor.run(future).await
|
||||
}
|
||||
}
|
||||
|
||||
fn main() {
|
||||
futures_lite::future::block_on(async {
|
||||
let ex = Arc::new(LimitedExecutor::new(10));
|
||||
ex.run({
|
||||
let ex = ex.clone();
|
||||
async move {
|
||||
// Spawn a bunch of tasks that wait for a while.
|
||||
for i in 0..15 {
|
||||
ex.spawn(async move {
|
||||
async_io::Timer::after(Duration::from_millis(fastrand::u64(1..3))).await;
|
||||
println!("Waiting task #{i} finished!");
|
||||
})
|
||||
.await
|
||||
.detach();
|
||||
}
|
||||
|
||||
let (start_tx, start_rx) = async_channel::bounded::<()>(1);
|
||||
let mut current_rx = start_rx;
|
||||
|
||||
// Send the first message.
|
||||
start_tx.send(()).await.unwrap();
|
||||
|
||||
// Spawn a bunch of channel tasks that wake eachother up.
|
||||
for i in 0..25 {
|
||||
let (next_tx, next_rx) = async_channel::bounded::<()>(1);
|
||||
|
||||
ex.spawn(async move {
|
||||
current_rx.recv().await.unwrap();
|
||||
println!("Channel task {i} woken up!");
|
||||
next_tx.send(()).await.unwrap();
|
||||
println!("Channel task {i} finished!");
|
||||
})
|
||||
.await
|
||||
.detach();
|
||||
|
||||
current_rx = next_rx;
|
||||
}
|
||||
|
||||
// Wait for the last task to finish.
|
||||
current_rx.recv().await.unwrap();
|
||||
|
||||
println!("All tasks finished!");
|
||||
}
|
||||
})
|
||||
.await;
|
||||
});
|
||||
}
|
Loading…
Reference in New Issue