This commit is contained in:
Stjepan Glavina 2020-07-23 13:05:38 +02:00
parent 98e75b9759
commit b9115cc671
1 changed files with 14 additions and 20 deletions

View File

@ -53,6 +53,7 @@
use std::future::Future;
use async_channel::{unbounded, Receiver};
use async_executor::{Executor, LocalExecutor};
use easy_parallel::Parallel;
@ -107,21 +108,21 @@ pub mod prelude {
/// })
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
setup(num_cpus::get(), future)
}
#[cfg(not(feature = "tokio02"))]
fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
// A channel that coordinates shutdown when the main future completes.
let (trigger, shutdown) = async_channel::unbounded::<()>();
let (trigger, shutdown) = unbounded::<()>();
let future = async move {
let _trigger = trigger; // Dropped at the end of this async block.
future.await
};
setup(num_cpus::get(), shutdown, future)
}
#[cfg(not(feature = "tokio02"))]
fn setup<T>(num_threads: usize, shutdown: Receiver<()>, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
Parallel::new()
.each(0..num_threads, |_| ex.run(shutdown.recv()))
.finish(|| ex.enter(|| local_ex.run(future)))
@ -129,17 +130,7 @@ fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
}
#[cfg(feature = "tokio02")]
fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
// A channel that signals shutdown to the thread pool when the main future completes.
let (s, shutdown) = async_channel::unbounded::<()>();
let future = async move {
let _s = s; // Drops sender at the end of this async block.
future.await
};
fn setup<T>(num_threads: usize, shutdown: Receiver<()>, future: impl Future<Output = T>) -> T {
// A minimal tokio runtime.
let mut rt = tokio::runtime::Builder::new()
.enable_all()
@ -148,6 +139,9 @@ fn setup<T>(num_threads: usize, future: impl Future<Output = T>) -> T {
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
let ex = Executor::new();
let local_ex = LocalExecutor::new();
Parallel::new()
.add(|| ex.enter(|| rt.block_on(shutdown.recv())))
.each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv())))