diff --git a/Cargo.toml b/Cargo.toml index 13e3df4..35c82a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ blocking = "0.5.0" easy-parallel = "3.1.0" futures-lite = "0.1.9" num_cpus = "1.13.0" +cfg-if = "0.1.10" [dependencies.tokio] version = "0.2.22" diff --git a/examples/ctrl-c.rs b/examples/ctrl-c.rs index 523c792..0c190d0 100644 --- a/examples/ctrl-c.rs +++ b/examples/ctrl-c.rs @@ -12,7 +12,7 @@ fn main() { // Set a handler that sends a message through a channel. let (s, ctrl_c) = async_channel::bounded(100); let handle = move || { - let _ = future::poll_once(s.send(())); + let _ = s.try_send(()); }; ctrlc::set_handler(handle).unwrap(); diff --git a/src/lib.rs b/src/lib.rs index b625fa6..d6a8a9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -51,10 +51,11 @@ #![forbid(unsafe_code)] #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +use std::env; use std::future::Future; -use async_channel::{unbounded, Receiver}; use async_executor::{Executor, LocalExecutor}; +use cfg_if::cfg_if; use easy_parallel::Parallel; #[doc(inline)] @@ -86,14 +87,59 @@ pub mod prelude { }; } -/// Starts executors and runs the future. +/// Starts a thread-local executor and then runs the future. +/// +/// # Examples +/// +/// ``` +/// use smol::Task; +/// +/// smol::block_on(async { +/// let task = Task::local(async { +/// println!("Hello world"); +/// }); +/// task.await; +/// }) +/// ``` +pub fn block_on(future: impl Future) -> T { + let local_ex = LocalExecutor::new(); + + cfg_if! { + if #[cfg(not(feature = "tokio02"))] { + local_ex.run(future) + } else { + // A minimal tokio runtime to support libraries depending on it. + let mut rt = tokio::runtime::Builder::new() + .enable_all() + .basic_scheduler() + .build() + .expect("cannot start tokio runtime"); + let handle = rt.handle().clone(); + + // A channel that coordinates shutdown when the main future completes. + let (trigger, shutdown) = async_channel::unbounded::<()>(); + let future = async move { + let _trigger = trigger; // Dropped at the end of this async block. + future.await + }; + + Parallel::new() + .add(|| rt.block_on(shutdown.recv())) + .finish(|| handle.enter(|| local_ex.run(future))) + .1 + } + } +} + +/// Starts a thread-local and a multi-threaded executor and then runs the future. /// /// This function runs two executors at the same time: /// /// 1. The current thread runs a [`LocalExecutor`] and the main `future` on it. /// 2. A thread pool runs an [`Executor`] until the main `future` completes. /// -/// The number of spawned threads matches the number of logical CPU cores on the system. +/// The number of spawned threads matches the number of logical CPU cores on the system, but it can +/// be overriden by setting the `SMOL_THREADS` environment variable. /// /// # Examples /// @@ -109,42 +155,43 @@ pub mod prelude { /// ``` pub fn run(future: impl Future) -> T { // A channel that coordinates shutdown when the main future completes. - let (trigger, shutdown) = unbounded::<()>(); + let (trigger, shutdown) = async_channel::unbounded::<()>(); let future = async move { let _trigger = trigger; // Dropped at the end of this async block. future.await }; - setup(num_cpus::get(), shutdown, future) -} - -#[cfg(not(feature = "tokio02"))] -fn setup(num_threads: usize, shutdown: Receiver<()>, future: impl Future) -> T { - let ex = Executor::new(); - let local_ex = LocalExecutor::new(); - - Parallel::new() - .each(0..num_threads, |_| ex.run(shutdown.recv())) - .finish(|| ex.enter(|| local_ex.run(future))) - .1 -} - -#[cfg(feature = "tokio02")] -fn setup(num_threads: usize, shutdown: Receiver<()>, future: impl Future) -> T { - // A minimal tokio runtime. - let mut rt = tokio::runtime::Builder::new() - .enable_all() - .basic_scheduler() - .build() - .expect("cannot start tokio runtime"); - let handle = rt.handle().clone(); + let num_threads = { + // Parse SMOL_THREADS or use the number of CPU cores on the system. + env::var("SMOL_THREADS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or_else(|| num_cpus::get()) + }; let ex = Executor::new(); let local_ex = LocalExecutor::new(); - Parallel::new() - .add(|| ex.enter(|| rt.block_on(shutdown.recv()))) - .each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv()))) - .finish(|| handle.enter(|| ex.enter(|| local_ex.run(future)))) - .1 + cfg_if! { + if #[cfg(not(feature = "tokio02"))] { + Parallel::new() + .each(0..num_threads, |_| ex.run(shutdown.recv())) + .finish(|| ex.enter(|| local_ex.run(future))) + .1 + } else { + // A minimal tokio runtime to support libraries depending on it. + let mut rt = tokio::runtime::Builder::new() + .enable_all() + .basic_scheduler() + .build() + .expect("cannot start tokio runtime"); + let handle = rt.handle().clone(); + + Parallel::new() + .add(|| ex.enter(|| rt.block_on(shutdown.recv()))) + .each(0..num_threads, |_| handle.enter(|| ex.run(shutdown.recv()))) + .finish(|| handle.enter(|| ex.enter(|| local_ex.run(future)))) + .1 + } + } }