mirror of https://github.com/smol-rs/blocking
Gracefully handle the inability to spawn threads, take 2 (#31)
* Gracefully handle the inability to spawn threads * Ensure the thread limit is never zero
This commit is contained in:
parent
59b51b6e35
commit
dbdef7e55b
|
@ -21,3 +21,4 @@ async-task = "4.0.2"
|
|||
atomic-waker = "1.0.0"
|
||||
fastrand = "1.3.4"
|
||||
futures-lite = "1.11.0"
|
||||
log = "0.4.17"
|
||||
|
|
32
src/lib.rs
32
src/lib.rs
|
@ -83,6 +83,7 @@ use std::env;
|
|||
use std::fmt;
|
||||
use std::io::{self, Read, Seek, SeekFrom, Write};
|
||||
use std::mem;
|
||||
use std::num::NonZeroUsize;
|
||||
use std::panic;
|
||||
use std::pin::Pin;
|
||||
use std::slice;
|
||||
|
@ -120,9 +121,6 @@ struct Executor {
|
|||
|
||||
/// Used to put idle threads to sleep and wake them up when new work comes in.
|
||||
cvar: Condvar,
|
||||
|
||||
/// Maximum number of threads in the pool
|
||||
thread_limit: usize,
|
||||
}
|
||||
|
||||
/// Inner state of the blocking executor.
|
||||
|
@ -139,6 +137,9 @@ struct Inner {
|
|||
|
||||
/// The queue of blocking tasks.
|
||||
queue: VecDeque<Runnable>,
|
||||
|
||||
/// Maximum number of threads in the pool
|
||||
thread_limit: NonZeroUsize,
|
||||
}
|
||||
|
||||
impl Executor {
|
||||
|
@ -167,9 +168,9 @@ impl Executor {
|
|||
idle_count: 0,
|
||||
thread_count: 0,
|
||||
queue: VecDeque::new(),
|
||||
thread_limit: NonZeroUsize::new(thread_limit).unwrap(),
|
||||
}),
|
||||
cvar: Condvar::new(),
|
||||
thread_limit,
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -232,7 +233,9 @@ impl Executor {
|
|||
fn grow_pool(&'static self, mut inner: MutexGuard<'static, Inner>) {
|
||||
// If runnable tasks greatly outnumber idle threads and there aren't too many threads
|
||||
// already, then be aggressive: wake all idle threads and spawn one more thread.
|
||||
while inner.queue.len() > inner.idle_count * 5 && inner.thread_count < self.thread_limit {
|
||||
while inner.queue.len() > inner.idle_count * 5
|
||||
&& inner.thread_count < inner.thread_limit.get()
|
||||
{
|
||||
// The new thread starts in idle state.
|
||||
inner.idle_count += 1;
|
||||
inner.thread_count += 1;
|
||||
|
@ -245,10 +248,25 @@ impl Executor {
|
|||
let id = ID.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
// Spawn the new thread.
|
||||
thread::Builder::new()
|
||||
if let Err(e) = thread::Builder::new()
|
||||
.name(format!("blocking-{}", id))
|
||||
.spawn(move || self.main_loop())
|
||||
.unwrap();
|
||||
{
|
||||
// We were unable to spawn the thread, so we need to undo the state changes.
|
||||
log::error!("Failed to spawn a blocking thread: {}", e);
|
||||
inner.idle_count -= 1;
|
||||
inner.thread_count -= 1;
|
||||
|
||||
// The current number of threads is likely to be the system's upper limit, so update
|
||||
// thread_limit accordingly.
|
||||
inner.thread_limit = {
|
||||
let new_limit = inner.thread_count;
|
||||
|
||||
// If the limit is about to be set to zero, set it to one instead so that if,
|
||||
// in the future, we are able to spawn more threads, we will be able to do so.
|
||||
NonZeroUsize::new(new_limit).unwrap_or_else(|| NonZeroUsize::new(1).unwrap())
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue