Collect results in run()
This commit is contained in:
parent
364eb2fa4e
commit
7d1c36b2fc
75
src/lib.rs
75
src/lib.rs
|
@ -21,16 +21,19 @@
|
|||
//! assert_eq!(*m.get_mut().unwrap(), 2);
|
||||
//! ```
|
||||
//!
|
||||
//! Print each number in a vector on a different thread:
|
||||
//! Square each number of a vector on a different thread:
|
||||
//!
|
||||
//! ```
|
||||
//! use easy_parallel::Parallel;
|
||||
//!
|
||||
//! let v = vec![10, 20, 30];
|
||||
//!
|
||||
//! Parallel::new()
|
||||
//! .each(0..v.len(), |i| println!("{}", v[i]))
|
||||
//! let mut squares = Parallel::new()
|
||||
//! .each(0..v.len(), |i| v[i] * v[i])
|
||||
//! .run();
|
||||
//!
|
||||
//! squares.sort();
|
||||
//! assert_eq!(squares, [100, 400, 900]);
|
||||
//! ```
|
||||
|
||||
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
|
||||
|
@ -40,20 +43,21 @@ use std::marker::PhantomData;
|
|||
use std::mem;
|
||||
use std::panic;
|
||||
use std::process;
|
||||
use std::sync::mpsc;
|
||||
use std::thread;
|
||||
|
||||
/// A builder that runs closures in parallel.
|
||||
#[derive(Default)]
|
||||
#[must_use]
|
||||
pub struct Parallel<'a> {
|
||||
pub struct Parallel<'a, T> {
|
||||
/// Closures to run.
|
||||
closures: Vec<Box<dyn FnOnce() + Send + 'a>>,
|
||||
closures: Vec<Box<dyn FnOnce() -> T + Send + 'a>>,
|
||||
|
||||
/// Makes the lifetime invariant.
|
||||
_marker: PhantomData<&'a mut &'a ()>,
|
||||
}
|
||||
|
||||
impl<'a> Parallel<'a> {
|
||||
impl<'a, T> Parallel<'a, T> {
|
||||
/// Creates a builder for running closures in parallel.
|
||||
///
|
||||
/// # Examples
|
||||
|
@ -61,9 +65,9 @@ impl<'a> Parallel<'a> {
|
|||
/// ```
|
||||
/// use easy_parallel::Parallel;
|
||||
///
|
||||
/// let p = Parallel::new();
|
||||
/// let p = Parallel::<()>::new();
|
||||
/// ```
|
||||
pub fn new() -> Parallel<'a> {
|
||||
pub fn new() -> Parallel<'a, T> {
|
||||
Parallel {
|
||||
closures: Vec::new(),
|
||||
_marker: PhantomData,
|
||||
|
@ -81,9 +85,10 @@ impl<'a> Parallel<'a> {
|
|||
/// .add(|| println!("hello from a thread"))
|
||||
/// .run();
|
||||
/// ```
|
||||
pub fn add<F>(mut self, f: F) -> Parallel<'a>
|
||||
pub fn add<F>(mut self, f: F) -> Parallel<'a, T>
|
||||
where
|
||||
F: FnOnce() + Send + 'a,
|
||||
F: FnOnce() -> T + Send + 'a,
|
||||
T: Send + 'a,
|
||||
{
|
||||
self.closures.push(Box::new(f));
|
||||
self
|
||||
|
@ -99,13 +104,14 @@ impl<'a> Parallel<'a> {
|
|||
/// use easy_parallel::Parallel;
|
||||
///
|
||||
/// Parallel::new()
|
||||
/// .each(0..5, |i| println!("thread #{}", i))
|
||||
/// .each(0..5, |i| println!("hello from thread #{}", i))
|
||||
/// .run();
|
||||
/// ```
|
||||
pub fn each<T, I, F>(mut self, iter: I, f: F) -> Parallel<'a>
|
||||
pub fn each<A, I, F>(mut self, iter: I, f: F) -> Parallel<'a, T>
|
||||
where
|
||||
I: IntoIterator<Item = T>,
|
||||
F: FnOnce(T) + Clone + Send + 'a,
|
||||
I: IntoIterator<Item = A>,
|
||||
F: FnOnce(A) -> T + Clone + Send + 'a,
|
||||
A: Send + 'a,
|
||||
T: Send + 'a,
|
||||
{
|
||||
for t in iter.into_iter() {
|
||||
|
@ -115,7 +121,9 @@ impl<'a> Parallel<'a> {
|
|||
self
|
||||
}
|
||||
|
||||
/// Spawns a thread for each closure and waits from them to complete.
|
||||
/// Spawns a thread for each closure and collects their results.
|
||||
///
|
||||
/// Results are collected in the order in which threads completed.
|
||||
///
|
||||
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
|
||||
///
|
||||
|
@ -123,28 +131,41 @@ impl<'a> Parallel<'a> {
|
|||
///
|
||||
/// ```
|
||||
/// use easy_parallel::Parallel;
|
||||
/// use std::thread;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// Parallel::new()
|
||||
/// .add(|| println!("thread #1"))
|
||||
/// .add(|| println!("thread #2"))
|
||||
/// let res = Parallel::new()
|
||||
/// .each(0..5, |i| {
|
||||
/// thread::sleep(Duration::from_secs(5 - i));
|
||||
/// i
|
||||
/// })
|
||||
/// .run();
|
||||
///
|
||||
/// // Threads finish in reverse order because of how they sleep.
|
||||
/// assert_eq!(res, [4, 3, 2, 1, 0]);
|
||||
/// ```
|
||||
pub fn run(self) {
|
||||
pub fn run(self) -> Vec<T>
|
||||
where
|
||||
T: Send + 'a,
|
||||
{
|
||||
// Set up a guard that aborts on a panic.
|
||||
let guard = NoPanic;
|
||||
|
||||
// Join handles for spawned threads.
|
||||
let mut handles = Vec::new();
|
||||
|
||||
// Channel to collect results from spawned threads.
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
|
||||
// Spawn a thread for each closure.
|
||||
for f in self.closures {
|
||||
// Convert the `FnOnce()` closure into a `FnMut()` closure.
|
||||
let mut f = Some(f);
|
||||
let f = move || (f.take().unwrap())();
|
||||
// Wrap into a closure that sends the result back.
|
||||
let sender = sender.clone();
|
||||
let f = move || sender.send(f()).unwrap();
|
||||
|
||||
// Erase the `'a` lifetime.
|
||||
let f: Box<dyn FnMut() + Send + 'a> = Box::new(f);
|
||||
let f: Box<dyn FnMut() + Send + 'static> = unsafe { mem::transmute(f) };
|
||||
let f: Box<dyn FnOnce() + Send + 'a> = Box::new(f);
|
||||
let f: Box<dyn FnOnce() + Send + 'static> = unsafe { mem::transmute(f) };
|
||||
|
||||
// Spawn a thread for the closure.
|
||||
handles.push(thread::spawn(f));
|
||||
|
@ -166,10 +187,14 @@ impl<'a> Parallel<'a> {
|
|||
if let Some(err) = last_err {
|
||||
panic::resume_unwind(err);
|
||||
}
|
||||
|
||||
// Collect the results into a `Vec`.
|
||||
drop(sender);
|
||||
receiver.into_iter().collect()
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Debug for Parallel<'_> {
|
||||
impl<T> fmt::Debug for Parallel<'_, T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("Parallel")
|
||||
.field("len", &self.closures.len())
|
||||
|
|
|
@ -15,3 +15,15 @@ fn smoke() {
|
|||
|
||||
assert_eq!(m.into_inner().unwrap(), 10 + 20 + v.iter().sum::<i32>());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn squares() {
|
||||
let v = vec![10, 20, 30];
|
||||
|
||||
let mut squares = Parallel::new()
|
||||
.each(0..v.len(), |i| v[i] * v[i])
|
||||
.run();
|
||||
|
||||
squares.sort();
|
||||
assert_eq!(squares, [100, 400, 900]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue