feat: Add generic collection methods (#9)

This commit is contained in:
John Nunley 2022-12-31 19:04:52 -08:00 committed by GitHub
parent 23e9387721
commit 7b7c472203
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 65 additions and 9 deletions

View File

@ -59,6 +59,7 @@
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
use std::fmt;
use std::iter::{self, FromIterator};
use std::mem;
use std::panic;
use std::process;
@ -153,26 +154,55 @@ impl<'a, T> Parallel<'a, T> {
/// let res = Parallel::new()
/// .each(1..=3, |i| 10 * i)
/// .add(|| 100)
/// .run();
/// .collect::<Vec<_>>();
///
/// assert_eq!(res, [10, 20, 30, 100]);
/// ```
pub fn run(mut self) -> Vec<T>
pub fn collect<C>(mut self) -> C
where
T: Send + 'a,
C: FromIterator<T> + Extend<T>,
{
// Get the last closure.
let f = match self.closures.pop() {
None => return Vec::new(),
None => return iter::empty().collect(),
Some(f) => f,
};
// Spawn threads, run the last closure on the current thread.
let (mut results, r) = self.finish(f);
results.push(r);
let (mut results, r) = self.finish_in::<_, _, C>(f);
results.extend(Some(r));
results
}
/// Runs each closure on a separate thread and collects their results.
///
/// Results are collected in the order in which closures were added. One of the closures always
/// runs on the main thread because there is no point in spawning an extra thread for it.
///
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
/// use std::thread;
/// use std::time::Duration;
///
/// let res = Parallel::new()
/// .each(1..=3, |i| 10 * i)
/// .add(|| 100)
/// .run();
///
/// assert_eq!(res, [10, 20, 30, 100]);
/// ```
pub fn run(self) -> Vec<T>
where
T: Send + 'a,
{
self.collect()
}
/// Finishes with a closure to run on the main thread, starts threads, and collects results.
///
/// Results are collected in the order in which closures were added.
@ -196,6 +226,35 @@ impl<'a, T> Parallel<'a, T> {
where
F: FnOnce() -> R,
T: Send + 'a,
{
self.finish_in::<_, _, Vec<T>>(f)
}
/// Finishes with a closure to run on the main thread, starts threads, and collects results into an
/// arbitrary container.
///
/// Results are collected in the order in which closures were added.
///
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
///
/// # Examples
///
/// ```
/// use easy_parallel::Parallel;
/// use std::thread;
/// use std::time::Duration;
///
/// let (res, ()) = Parallel::new()
/// .each(1..=3, |i| 10 * i)
/// .finish_in::<_, _, Vec<i32>>(|| println!("Waiting for results"));
///
/// assert_eq!(res, [10, 20, 30]);
/// ```
pub fn finish_in<F, R, C>(self, f: F) -> (C, R)
where
F: FnOnce() -> R,
T: Send + 'a,
C: FromIterator<T>,
{
// Set up a guard that aborts on panic.
let guard = NoPanic;
@ -242,10 +301,7 @@ impl<'a, T> Parallel<'a, T> {
}
// Collect the results from threads.
let mut results = Vec::new();
for receiver in receivers {
results.push(receiver.recv().unwrap());
}
let results = receivers.into_iter().map(|r| r.recv().unwrap()).collect();
// If the main closure panicked, resume its panic.
match res {