Add Parallel::finish()
This commit is contained in:
parent
e78d9ab79c
commit
d8499b9844
57
src/lib.rs
57
src/lib.rs
|
@ -157,17 +157,46 @@ impl<'a, T> Parallel<'a, T> {
|
|||
///
|
||||
/// assert_eq!(res, [10, 20, 30, 100]);
|
||||
/// ```
|
||||
pub fn run(self) -> Vec<T>
|
||||
pub fn run(mut self) -> Vec<T>
|
||||
where
|
||||
T: Send + 'a,
|
||||
{
|
||||
// Get the first closure.
|
||||
let mut closures = self.closures.into_iter();
|
||||
let f = match closures.next() {
|
||||
// Get the last closure.
|
||||
let f = match self.closures.pop() {
|
||||
None => return Vec::new(),
|
||||
Some(f) => f,
|
||||
};
|
||||
|
||||
// Spawn threads, run the last closure on the current thread.
|
||||
let (mut results, r) = self.finish(f);
|
||||
results.push(r);
|
||||
results
|
||||
}
|
||||
|
||||
/// Finishes with a closure to run on the main thread, starts threads, and collects results.
|
||||
///
|
||||
/// Results are collected in the order in which closures were added.
|
||||
///
|
||||
/// If a closure panics, panicking will resume in the main thread after all threads are joined.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```
|
||||
/// use easy_parallel::Parallel;
|
||||
/// use std::thread;
|
||||
/// use std::time::Duration;
|
||||
///
|
||||
/// let (res, ()) = Parallel::new()
|
||||
/// .each(1..=3, |i| 10 * i)
|
||||
/// .finish(|| println!("Waiting for results"));
|
||||
///
|
||||
/// assert_eq!(res, [10, 20, 30]);
|
||||
/// ```
|
||||
pub fn finish<F, R>(self, f: F) -> (Vec<T>, R)
|
||||
where
|
||||
F: FnOnce() -> R,
|
||||
T: Send + 'a,
|
||||
{
|
||||
// Set up a guard that aborts on panic.
|
||||
let guard = NoPanic;
|
||||
|
||||
|
@ -178,7 +207,7 @@ impl<'a, T> Parallel<'a, T> {
|
|||
let mut receivers = Vec::new();
|
||||
|
||||
// Spawn a thread for each closure after the first one.
|
||||
for f in closures {
|
||||
for f in self.closures.into_iter() {
|
||||
// Wrap into a closure that sends the result back.
|
||||
let (sender, receiver) = mpsc::channel();
|
||||
let f = move || sender.send(f()).unwrap();
|
||||
|
@ -192,14 +221,10 @@ impl<'a, T> Parallel<'a, T> {
|
|||
receivers.push(receiver);
|
||||
}
|
||||
|
||||
let mut results = Vec::new();
|
||||
let mut last_err = None;
|
||||
|
||||
// Run the first closure on the main thread.
|
||||
match panic::catch_unwind(panic::AssertUnwindSafe(f)) {
|
||||
Ok(r) => results.push(r),
|
||||
Err(err) => last_err = Some(err),
|
||||
}
|
||||
// Run the main closure on the main thread.
|
||||
let res = panic::catch_unwind(panic::AssertUnwindSafe(f));
|
||||
|
||||
// Join threads and save the last panic if there was one.
|
||||
for h in handles {
|
||||
|
@ -211,16 +236,22 @@ impl<'a, T> Parallel<'a, T> {
|
|||
// Drop the guard because we may resume a panic now.
|
||||
drop(guard);
|
||||
|
||||
// If a closure panicked, resume the last panic.
|
||||
// If a thread has panicked, resume the last collected panic.
|
||||
if let Some(err) = last_err {
|
||||
panic::resume_unwind(err);
|
||||
}
|
||||
|
||||
// Collect the results from threads.
|
||||
let mut results = Vec::new();
|
||||
for receiver in receivers {
|
||||
results.push(receiver.recv().unwrap());
|
||||
}
|
||||
results
|
||||
|
||||
// If the main closure panicked, resume its panic.
|
||||
match res {
|
||||
Ok(r) => (results, r),
|
||||
Err(err) => panic::resume_unwind(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -26,3 +26,15 @@ fn squares() {
|
|||
|
||||
assert_eq!(squares, [100, 400, 900]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn finish() {
|
||||
let v = vec![10, 20, 30];
|
||||
|
||||
let (squares, len) = Parallel::new()
|
||||
.each(0..v.len(), |i| v[i] * v[i])
|
||||
.finish(|| v.len());
|
||||
|
||||
assert_eq!(squares, [100, 400, 900]);
|
||||
assert_eq!(len, 3);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue