Add block_on() and SMOL_THREADS

This commit is contained in:
Stjepan Glavina 2020-07-27 13:59:47 +02:00
parent ead5867905
commit 974c4cd218
3 changed files with 81 additions and 33 deletions

View File

@ -24,6 +24,7 @@ blocking = "0.5.0"
easy-parallel = "3.1.0"
futures-lite = "0.1.9"
num_cpus = "1.13.0"
cfg-if = "0.1.10"
[dependencies.tokio]
version = "0.2.22"

View File

@ -12,7 +12,7 @@ fn main() {
// Set a handler that sends a message through a channel.
let (s, ctrl_c) = async_channel::bounded(100);
let handle = move || {
let _ = future::poll_once(s.send(()));
let _ = s.try_send(());
};
ctrlc::set_handler(handle).unwrap();

View File

@ -51,10 +51,11 @@
#![forbid(unsafe_code)]
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::env;
use std::future::Future;
use async_channel::{unbounded, Receiver};
use async_executor::{Executor, LocalExecutor};
use cfg_if::cfg_if;
use easy_parallel::Parallel;
#[doc(inline)]
@ -86,14 +87,59 @@ pub mod prelude {
};
}
/// Starts executors and runs the future.
/// Starts a thread-local executor and then runs the future.
///
/// # Examples
///
/// ```
/// use smol::Task;
///
/// smol::block_on(async {
/// let task = Task::local(async {
/// println!("Hello world");
/// });
/// task.await;
/// })
/// ```
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
let local_ex = LocalExecutor::new();
cfg_if! {
if #[cfg(not(feature = "tokio02"))] {
local_ex.run(future)
} else {
// A minimal tokio runtime to support libraries depending on it.
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
// A channel that coordinates shutdown when the main future completes.
let (trigger, shutdown) = async_channel::unbounded::<()>();
let future = async move {
let _trigger = trigger; // Dropped at the end of this async block.
future.await
};
Parallel::new()
.add(|| rt.block_on(shutdown.recv()))
.finish(|| handle.enter(|| local_ex.run(future)))
.1
}
}
}
/// Starts a thread-local and a multi-threaded executor and then runs the future.
///
/// This function runs two executors at the same time:
///
/// 1. The current thread runs a [`LocalExecutor`] and the main `future` on it.
/// 2. A thread pool runs an [`Executor`] until the main `future` completes.
///
/// The number of spawned threads matches the number of logical CPU cores on the system.
/// The number of spawned threads matches the number of logical CPU cores on the system, but it can
/// be overriden by setting the `SMOL_THREADS` environment variable.
///
/// # Examples
///
@ -109,42 +155,43 @@ pub mod prelude {
/// ```
pub fn run<T>(future: impl Future<Output = T>) -> T {
// A channel that coordinates shutdown when the main future completes.
let (trigger, shutdown) = unbounded::<()>();
let (trigger, shutdown) = async_channel::unbounded::<()>();
let future = async move {
let _trigger = trigger; // Dropped at the end of this async block.
future.await
};
setup(num_cpus::get(), shutdown, future)
}
#[cfg(not(feature = "tokio02"))]
fn setup<T>(num_threads: usize, shutdown: Receiver<()>, future: impl Future<Output = T>) -> T {
let ex = Executor::new();
let local_ex = LocalExecutor::new();
Parallel::new()
.each(0..num_threads, |_| ex.run(shutdown.recv()))
.finish(|| ex.enter(|| local_ex.run(future)))
.1
}
#[cfg(feature = "tokio02")]
fn setup<T>(num_threads: usize, shutdown: Receiver<()>, future: impl Future<Output = T>) -> T {
// A minimal tokio runtime.
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
let num_threads = {
// Parse SMOL_THREADS or use the number of CPU cores on the system.
env::var("SMOL_THREADS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_else(|| num_cpus::get())
};
let ex = Executor::new();
let local_ex = LocalExecutor::new();
Parallel::new()
.add(|| ex.enter(|| rt.block_on(shutdown.recv())))
.each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv())))
.finish(|| handle.enter(|| ex.enter(|| local_ex.run(future))))
.1
cfg_if! {
if #[cfg(not(feature = "tokio02"))] {
Parallel::new()
.each(0..num_threads, |_| ex.run(shutdown.recv()))
.finish(|| ex.enter(|| local_ex.run(future)))
.1
} else {
// A minimal tokio runtime to support libraries depending on it.
let mut rt = tokio::runtime::Builder::new()
.enable_all()
.basic_scheduler()
.build()
.expect("cannot start tokio runtime");
let handle = rt.handle().clone();
Parallel::new()
.add(|| ex.enter(|| rt.block_on(shutdown.recv())))
.each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv())))
.finish(|| handle.enter(|| ex.enter(|| local_ex.run(future))))
.1
}
}
}