This commit is contained in:
Stjepan Glavina 2020-02-04 14:24:31 +01:00
parent a69c103470
commit dfb80d05fb
1 changed files with 20 additions and 28 deletions

View File

@ -32,6 +32,7 @@ struct Runtime {
epoll: RawFd,
entries: Mutex<Slab<Arc<Entry>>>,
timers: Mutex<BTreeMap<(Instant, usize), Waker>>,
queue: channel::Sender<Task>,
}
static RT: Lazy<Runtime> = Lazy::new(|| {
@ -71,10 +72,21 @@ static RT: Lazy<Runtime> = Lazy::new(|| {
}
});
let (sender, receiver) = channel::unbounded::<Task>();
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || {
receiver.iter().for_each(|task| {
let _ = catch_unwind(|| task.run());
})
});
}
Runtime {
epoll: epoll_create1(EpollCreateFlags::EPOLL_CLOEXEC).unwrap(),
entries: Mutex::new(Slab::new()),
timers: Mutex::new(BTreeMap::new()),
queue: sender,
}
});
@ -107,24 +119,6 @@ where
// drop(pool); // stops the threadpool!
}
/// A queue that holds scheduled tasks.
static QUEUE: Lazy<channel::Sender<Task>> = Lazy::new(|| {
// Create a queue.
let (sender, receiver) = channel::unbounded::<Task>();
// Spawn executor threads the first time the queue is created.
for _ in 0..num_cpus::get().max(1) {
let receiver = receiver.clone();
thread::spawn(move || {
receiver.iter().for_each(|task| {
let _ = catch_unwind(|| task.run());
})
});
}
sender
});
/// A spawned future and its current state.
type Task = async_task::Task<()>;
@ -135,7 +129,7 @@ where
R: Send + 'static,
{
// Create a task and schedule it for execution.
let (task, handle) = async_task::spawn(future, |t| QUEUE.send(t).unwrap(), ());
let (task, handle) = async_task::spawn(future, |t| RT.queue.send(t).unwrap(), ());
task.schedule();
// Return a join handle that retrieves the output of the future.
@ -158,20 +152,18 @@ impl<R> Future for JoinHandle<R> {
// ----- Timer -----
pub fn timer(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
pub struct Timer {
when: Instant,
inserted: bool,
}
impl Timer {
pub fn after(dur: Duration) -> Timer {
Timer {
when: Instant::now() + dur,
inserted: false,
}
}
}
impl Drop for Timer {
fn drop(&mut self) {
if self.inserted {