diff --git a/src/io_event.rs b/src/io_event.rs
index 377a06c..38f4e96 100644
--- a/src/io_event.rs
+++ b/src/io_event.rs
@@ -55,7 +55,7 @@ struct SelfPipe {
 impl SelfPipe {
     /// Creates a self-pipe.
     fn new() -> io::Result<SelfPipe> {
-        let (writer, reader) = pipe()?;
+        let (writer, reader) = socket_pair()?;
         writer.set_send_buffer_size(1)?;
         reader.set_recv_buffer_size(1)?;
         Ok(SelfPipe {
@@ -109,7 +109,7 @@ impl SelfPipe {
 
 /// TODO
 #[cfg(unix)]
-fn pipe() -> io::Result<(Socket, Socket)> {
+fn socket_pair() -> io::Result<(Socket, Socket)> {
     let (sock1, sock2) = Socket::pair(Domain::unix(), Type::stream(), None)?;
     sock1.set_nonblocking(true)?;
     sock2.set_nonblocking(true)?;
@@ -117,16 +117,15 @@ fn pipe() -> io::Result<(Socket, Socket)> {
 }
 
 /// TODO
+/// TODO The only portable way of manually triggering I/O events is to create a socket and
+/// send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
+/// See the links below for more information.
+///
+/// https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
+/// https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
+/// https://gist.github.com/geertj/4325783
 #[cfg(windows)]
-fn pipe() -> io::Result<(Socket, Socket)> {
-    // TODO The only portable way of manually triggering I/O events is to create a socket and
-    // send/receive dummy data on it. This pattern is also known as "the self-pipe trick".
-    // See the links below for more information.
-    //
-    // https://github.com/python-trio/trio/blob/master/trio/_core/_wakeup_socketpair.py
-    // https://stackoverflow.com/questions/24933411/how-to-emulate-socket-socketpair-on-windows
-    // https://gist.github.com/geertj/4325783
-
+fn socket_pair() -> io::Result<(Socket, Socket)> {
     // Create a temporary listener.
     let listener = Socket::new(Domain::ipv4(), Type::stream(), None)?;
     listener.bind(&SocketAddr::from(([127, 0, 0, 1], 0)).into())?;
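The rename from `pipe()` to `socket_pair()` matches what both platform variants actually construct: a connected pair of sockets rather than a POSIX pipe. For readers unfamiliar with the trick the new doc comment describes, here is a minimal sketch of the Windows-style fallback using only `std::net` (the patch itself uses `socket2`'s `Socket`, which additionally lets it shrink the send/receive buffers to one byte; names here are illustrative):

```rust
use std::io::{self, Read, Write};
use std::net::{SocketAddr, TcpListener, TcpStream};

/// Emulates `socketpair()` where none exists: connect a TCP stream to a
/// loopback listener and accept the other end of the same connection.
fn socket_pair() -> io::Result<(TcpStream, TcpStream)> {
    // Bind a temporary listener to an OS-assigned port on localhost.
    let listener = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))?;
    let addr = listener.local_addr()?;

    // Connect to it and accept, yielding two ends of one connection.
    let writer = TcpStream::connect(addr)?;
    let (reader, _) = listener.accept()?;

    // Both ends become non-blocking, as in the patch.
    writer.set_nonblocking(true)?;
    reader.set_nonblocking(true)?;
    Ok((writer, reader))
}

fn main() -> io::Result<()> {
    let (mut writer, mut reader) = socket_pair()?;

    // "Set" the event by sending a dummy byte (a failed send just means
    // the buffer is full, i.e. the event is already set)...
    let _ = writer.write(&[1]);

    // ...and "clear" it by draining the reader until it would block.
    let mut buf = [0u8; 64];
    while let Ok(n) = reader.read(&mut buf) {
        if n == 0 {
            break;
        }
    }
    Ok(())
}
```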
diff --git a/src/lib.rs b/src/lib.rs
index dd9ea1f..6af13d7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -80,6 +80,9 @@
 //! Finally, you can mix this runtime with [async-std][compat-async-std] and [tokio][compat-tokio]
 //! to use runtime-dependent libraries like [surf][compat-surf] and [reqwest][compat-reqwest].
 //!
+//! TODO: See [here][compat] for an example of using this runtime with libraries like async-std,
+//! tokio, reqwest, and surf.
+//!
 //! [epoll]: https://en.wikipedia.org/wiki/Epoll
 //! [kqueue]: https://en.wikipedia.org/wiki/Kqueue
 //! [wepoll]: https://github.com/piscisaureus/wepoll
diff --git a/src/reactor.rs b/src/reactor.rs
index f7937e9..90c91c4 100644
--- a/src/reactor.rs
+++ b/src/reactor.rs
@@ -129,20 +129,19 @@ impl Reactor {
     pub fn insert_timer(&self, when: Instant, waker: Waker) -> u64 {
         let mut timers = self.timers.lock();
 
-        // If this timer is going to be the earliest one, interrupt the reactor.
-        if let Some((first, _)) = timers.keys().next() {
-            if when < *first {
-                self.event.set();
-            }
-        }
-
-        // Generate a new ID.
+        // Generate a new timer ID.
         static ID_GENERATOR: AtomicU64 = AtomicU64::new(1);
         let id = ID_GENERATOR.fetch_add(1, Ordering::Relaxed);
         assert!(id < u64::max_value() / 2, "exhausted timer IDs");
 
         // Insert this timer into the timers map.
         timers.insert((when, id), waker);
+
+        // If this timer is now the earliest one, interrupt the reactor.
+        if timers.keys().next().map(|(when, _)| *when) == Some(when) {
+            self.event.set();
+        }
+
         id
     }
 
@@ -189,26 +188,27 @@ impl ReactorLock<'_> {
        // TODO: document this function.......................
        self.reactor.event.clear();
 
-        let next_timer = {
+        let timeout = {
             // Split timers into ready and pending timers.
             let mut timers = self.reactor.timers.lock();
-            let pending = timers.split_off(&(Instant::now(), 0));
+            let now = Instant::now();
+            let pending = timers.split_off(&(now, 0));
             let ready = mem::replace(&mut *timers, pending);
 
+            let timeout = if ready.is_empty() && block {
+                // Calculate the timeout till the first timer fires.
+                timers.keys().next().map(|(when, _)| when.saturating_duration_since(now))
+            } else {
+                // If there are ready timers or this poll doesn't block, the timeout is zero.
+                Some(Duration::from_secs(0))
+            };
+
             // Wake up tasks waiting on timers.
             for (_, waker) in ready {
                 waker.wake();
             }
 
-            // Find when the next timer fires.
-            timers.keys().next().map(|(when, _)| *when)
-        };
-        let timeout = if block {
-            // Calculate the timeout till the first timer fires.
-            next_timer.map(|when| when.saturating_duration_since(Instant::now()))
-        } else {
-            // If this poll doesn't block, the timeout is zero.
-            Some(Duration::from_secs(0))
+            timeout
         };
 
         // Block on I/O events.
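Both reactor changes lean on the timer map being a `BTreeMap` keyed by `(Instant, u64)`: iteration runs in deadline order, `split_off(&(now, 0))` cleaves off everything not yet due, and the first remaining key is the earliest pending timer. A self-contained sketch of the reordered timeout logic (function and type names here are illustrative, not the crate's API):

```rust
use std::collections::BTreeMap;
use std::mem;
use std::time::{Duration, Instant};

// Stand-in for the reactor's timer map: keys are (deadline, unique ID),
// so equal deadlines don't collide and iteration runs in deadline order.
type Timers = BTreeMap<(Instant, u64), &'static str>;

/// Splits off the ready timers and computes the poll timeout in one pass,
/// mirroring the reordered logic in the patched `poll`.
fn ready_and_timeout(timers: &mut Timers, block: bool) -> (Timers, Option<Duration>) {
    let now = Instant::now();

    // `split_off` keeps keys >= (now, 0); everything earlier stays behind
    // and is swapped out as the set of ready timers.
    let pending = timers.split_off(&(now, 0));
    let ready = mem::replace(timers, pending);

    // Zero timeout if a timer is already ready or the caller won't block;
    // otherwise sleep until the earliest pending timer (None = no limit).
    let timeout = if ready.is_empty() && block {
        timers
            .keys()
            .next()
            .map(|(when, _)| when.saturating_duration_since(now))
    } else {
        Some(Duration::from_secs(0))
    };

    (ready, timeout)
}

fn main() {
    let mut timers = Timers::new();
    let now = Instant::now();
    timers.insert((now - Duration::from_millis(10), 1), "already due");
    timers.insert((now + Duration::from_secs(5), 2), "later");

    let (ready, timeout) = ready_and_timeout(&mut timers, true);
    assert_eq!(ready.len(), 1);
    // A timer is ready, so the subsequent poll must not block at all.
    assert_eq!(timeout, Some(Duration::from_secs(0)));
}
```

The same ordering argument applies to the `insert_timer` change: inserting first and then checking whether the new key became the map's minimum is equivalent to the old "compare against the previous minimum" check, and it additionally fires the interrupt when the map was previously empty.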
diff --git a/src/run.rs b/src/run.rs
index f790080..0426630 100644
--- a/src/run.rs
+++ b/src/run.rs
@@ -33,6 +33,7 @@ pub fn run<T>(future: impl Future<Output = T>) -> T {
     let local = ThreadLocalExecutor::new();
     let ws_executor = WorkStealingExecutor::get();
     let worker = ws_executor.worker();
+    let reactor = Reactor::get();
 
     // Create a waker that triggers an I/O event in the thread-local scheduler.
     let ev = local.event().clone();
@@ -52,17 +53,17 @@
     // 1. `future` - the main future.
     // 2. `local` - the thread-local executor.
     // 3. `worker` - the work-stealing executor.
-    // 4. `Reactor::get()` - the reactor.
+    // 4. `reactor` - the reactor.
     //
     // When all four components are out of work, we block the current thread on
     // epoll/kevent/wepoll. If new work comes in that isn't naturally triggered by an I/O event
     // registered with `Async` handles, we use `IoEvent`s to simulate an I/O event that will
     // unblock the thread:
     //
-    // - When the main future is woken, `local.event` is triggered.
-    // - When thread-local executor gets new work, `local.event` is triggered.
-    // - When work-stealing executor gets new work, `ws_executor.event` is triggered.
-    // - When a new earliest timer is registered, `Reactor::get().event` is triggered.
+    // - When the main future is woken, `local.event()` is triggered.
+    // - When the thread-local executor gets new work, `local.event()` is triggered.
+    // - When the work-stealing executor gets new work, `ws_executor.event()` is triggered.
+    // - When a new earliest timer is registered, `reactor.event()` is triggered.
     //
     // This way we make sure that any changes that might give us new work will unblock
     // epoll/kevent/wepoll and let us continue the loop.
@@ -76,7 +77,7 @@
             // 3. Run a batch of tasks in the work-stealing executor.
             let more_worker = worker.execute();
             // 4. Poll the reactor.
-            Reactor::get().poll().expect("failure while polling I/O");
+            reactor.poll().expect("failure while polling I/O");
 
             // If there is more work in the thread-local or the work-stealing executor, continue
             // the loop.
@@ -84,22 +85,22 @@
                 continue;
             }
 
-            // Prepare for blocking until the reactor is locked or `local.event` is triggered.
+            // Prepare for blocking until the reactor is locked or `local.event()` is triggered.
             //
-            // Note that there is no need to wait for `executor.event`. If the reactor is locked
-            // immediately, we'll check for the I/O event right after that anyway.
+            // Note that there is no need to wait for `ws_executor.event()`. If the reactor is
+            // locked immediately, we'll check for the I/O event right after that anyway.
             //
             // If some other worker is holding the reactor locked, it will be unblocked as soon as
             // the I/O event is triggered. Then, another worker will be allowed to lock the
             // reactor, and will be unblocked if there is more work to do. Every worker triggers
-            // `worker.executor.event` each time it finds a runnable task.
-            let lock = Reactor::get().lock();
+            // `ws_executor.event()` each time it finds a runnable task.
+            let lock = reactor.lock();
             let ready = local.event().ready();
             futures::pin_mut!(lock);
             futures::pin_mut!(ready);
 
-            // Block until either the reactor is locked or `local.event` is triggered.
-            if let Either::Left((mut reactor, _)) = block_on(future::select(lock, ready)) {
+            // Block until either the reactor is locked or `local.event()` is triggered.
+            if let Either::Left((mut reactor_lock, _)) = block_on(future::select(lock, ready)) {
                 // Clear the two I/O events.
                 let local_ev = local.event().clear();
                 let ws_ev = ws_executor.event().clear();
@@ -110,7 +111,7 @@
                 }
 
                 // Block until an I/O event occurs.
-                reactor.wait().expect("failure while waiting on I/O");
+                reactor_lock.wait().expect("failure while waiting on I/O");
             }
         }
     })
diff --git a/src/timer.rs b/src/timer.rs
index d33280e..70fb14e 100644
--- a/src/timer.rs
+++ b/src/timer.rs
@@ -39,7 +39,7 @@ impl Timer {
 
 impl Drop for Timer {
     fn drop(&mut self) {
-        if let Some(id) = self.id {
+        if let Some(id) = self.id.take() {
             // Deregister the timer from the reactor.
             Reactor::get().remove_timer(self.when, id);
         }
@@ -52,7 +52,7 @@ impl Future for Timer {
     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
         // Check if the timer has already fired.
         if Instant::now() >= self.when {
-            if let Some(id) = self.id {
+            if let Some(id) = self.id.take() {
                 // Deregister the timer from the reactor.
                 Reactor::get().remove_timer(self.when, id);
             }
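The switch to `self.id.take()` is what makes deregistration idempotent: `poll` consumes the ID when the timer fires, so the `Drop` impl no longer issues a second `remove_timer` call for a timer that already cleaned up after itself. A toy illustration of the idiom (the free-standing `remove_timer` here is a hypothetical stand-in for the reactor call):

```rust
use std::time::Instant;

// Hypothetical stand-in for `Reactor::get().remove_timer(when, id)`.
fn remove_timer(when: Instant, id: u64) {
    println!("deregistered timer {} scheduled for {:?}", id, when);
}

struct Timer {
    id: Option<u64>,
    when: Instant,
}

impl Timer {
    /// Mirrors the fixed `Future::poll`: called when the timer fires.
    fn fire(&mut self) {
        // `take()` moves the ID out and leaves `None` behind, so the
        // timer is deregistered at most once.
        if let Some(id) = self.id.take() {
            remove_timer(self.when, id);
        }
    }
}

impl Drop for Timer {
    fn drop(&mut self) {
        // Without `take()` in `fire`, this would deregister again.
        if let Some(id) = self.id.take() {
            remove_timer(self.when, id);
        }
    }
}

fn main() {
    let mut timer = Timer { id: Some(7), when: Instant::now() };
    timer.fire();
    // Dropping after firing prints nothing: the ID is already gone.
}
```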
diff --git a/src/work_stealing.rs b/src/work_stealing.rs
index 2885169..98e88db 100644
--- a/src/work_stealing.rs
+++ b/src/work_stealing.rs
@@ -1,4 +1,25 @@
 //! The work-stealing executor.
+//!
+//! Tasks created by [`Task::spawn()`] go into this executor. Any thread calling [`run()`]
+//! initializes a `Worker` that participates in work stealing, which is allowed to run any task in
+//! this executor or in other workers.
+//!
+//! Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
+//! implement [`Send`].
+//!
+//! There is only one global instance of this type, accessible by [`WorkStealingExecutor::get()`].
+//!
+//! Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
+//! invocations of [`run()`] used the same global task queue all the time, they would constantly
+//! "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
+//! retrying queue operations in compare-and-swap loops.
+//!
+//! The solution is to have a separate queue in each invocation of [`run()`], called a "worker".
+//! Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
+//! either grab a batch of tasks from the main global queue, or steal tasks from other workers.
+//! Of course, work-stealing still causes contention in some cases, but much less often.
+//!
+//! More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
 
 use std::cell::Cell;
 use std::future::Future;
@@ -25,27 +46,6 @@ use crate::throttle;
 scoped_thread_local!(static WORKER: for<'a> &'a Worker<'a>);
 
 /// The global work-stealing executor.
-///
-/// Tasks created by `Task::spawn()` go into this executor. Any calling `run()` initializes a
-/// `Worker` that participates in work stealing, which is allowed to run any task in this executor
-/// or in other workers.
-///
-/// Since tasks can be stolen by any worker and thus move from thread to thread, their futures must
-/// implement `Send`.
-///
-/// There is only one global instance of this type, accessible by `WorkStealingExecutor::get()`.
-///
-/// Work stealing is a strategy that reduces contention in a multi-threaded environment. If all
-/// invocations of `run()` used the same global task queue all the time, they would constantly
-/// "step on each other's toes", causing a lot of CPU cache traffic and too often waste time
-/// retrying queue operations in compare-and-swap loops.
-///
-/// The solution is to have a separate queue in each invocation of `run()`, called a "worker".
-/// Each thread is primarily using its own worker. Once there are no more tasks in the worker, we
-/// either grab a batch of tasks from the main global queue, or steal tasks from other workers.
-/// Of course, work-stealing still causes contention in some cases, but much less often.
-///
-/// More about work stealing: https://en.wikipedia.org/wiki/Work_stealing
 pub(crate) struct WorkStealingExecutor {
     /// When a thread that is not inside `run()` spawns or wakes a task, it goes into this queue.
     injector: deque::Injector<Runnable>,
@@ -88,7 +88,8 @@ impl WorkStealingExecutor {
         } else {
             // If scheduling from a non-worker thread, push into the injector queue.
             self.injector.push(runnable);
-            // Trigger an I/O event to let workers know that a task has been scheduled.
+
+            // Notify workers that there is a task in the injector queue.
             self.event.set();
         }
     };
@@ -159,7 +160,11 @@ impl Worker<'_> {
                     return false;
                 }
                 Some(r) => {
-                    // Notify other workers that there may be more tasks.
+                    // Notify other workers that there may be stealable tasks.
+                    //
+                    // This is necessary because `pop()` sometimes re-shuffles tasks between
+                    // queues, which races with other workers looking for tasks. They might
+                    // believe there are no tasks while there really are, so we notify here.
                    self.executor.event.set();
 
                    // Run the task.
@@ -194,6 +199,9 @@ impl Worker<'_> {
         if let Some(r) = self.slot.replace(runnable.into()) {
             // If the slot had a task, push it into the queue.
             self.queue.push(r);
+
+            // Notify other workers that there are stealable tasks.
+            self.executor.event.set();
         }
     }
 
@@ -203,8 +211,10 @@ impl Worker<'_> {
         if let Some(r) = self.slot.take().or_else(|| self.queue.pop()) {
             return Some(r);
         }
+
         // If not, fetch more tasks from the injector queue, the reactor, or other workers.
         self.fetch();
+
         // Check the slot and the queue again.
         self.slot.take().or_else(|| self.queue.pop())
     }
@@ -225,6 +235,7 @@ impl Worker<'_> {
             self.slot.set(Some(r));
             return;
         }
+
         // If there is at least one task in the queue, return.
         if !self.queue.is_empty() {
             return;
         }
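The lookup order the new comments spell out (own slot and queue first, then the injector queue, then other workers) is the classic `crossbeam-deque` pattern. A sketch of one search round, assuming the executor's `deque::` types come from `crossbeam-deque` and leaving out the slot and the reactor poll that the real `fetch` interleaves:

```rust
use crossbeam_deque::{Injector, Steal, Stealer, Worker};

/// One round of task lookup: local queue, then the global injector,
/// then stealing batches from other workers.
fn find_task<T>(
    local: &Worker<T>,
    injector: &Injector<T>,
    stealers: &[Stealer<T>],
) -> Option<T> {
    // Fast path: our own queue.
    if let Some(task) = local.pop() {
        return Some(task);
    }

    // Grab a whole batch from the injector, keeping one task for ourselves.
    loop {
        match injector.steal_batch_and_pop(local) {
            Steal::Success(task) => return Some(task),
            Steal::Empty => break,
            Steal::Retry => continue, // lost a race; try again
        }
    }

    // Finally, try to steal batches from other workers.
    for stealer in stealers {
        loop {
            match stealer.steal_batch_and_pop(local) {
                Steal::Success(task) => return Some(task),
                Steal::Empty => break,
                Steal::Retry => continue,
            }
        }
    }
    None
}

fn main() {
    let injector = Injector::new();
    injector.push("task");
    let local: Worker<&str> = Worker::new_fifo();
    assert_eq!(find_task(&local, &injector, &[]), Some("task"));
}
```

Batch stealing is also why the added notifications matter: moving tasks between queues races with other workers' searches, so raising the event after a `pop()` or a displaced slot keeps them from sleeping through work that just became visible.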
@@ -259,12 +270,14 @@ impl Drop for Worker<'_> {
         // Unregister the worker.
         self.executor.stealers.write().unwrap().remove(self.key);
 
-        // Flush the slot.
-        self.push(None);
+        // Move the task in the slot into the injector queue.
+        if let Some(r) = self.slot.take() {
+            r.schedule();
+        }
 
         // Move all tasks in this worker's queue into the injector queue.
         while let Some(r) = self.queue.pop() {
-            self.executor.injector.push(r);
+            r.schedule();
         }
     }
 }
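Going through `r.schedule()` on drop, rather than `self.executor.injector.push(r)`, means orphaned tasks take the same path as any task scheduled from a non-worker thread: per the `schedule` closure earlier in this diff, they land in the injector queue *and* the event is set, so sleeping workers wake up to claim them. A toy illustration with hypothetical types:

```rust
use std::collections::VecDeque;

// Hypothetical task handle; scheduling re-enqueues it globally and
// raises the wake-up event, like the non-worker branch of `schedule`.
struct Runnable(&'static str);

impl Runnable {
    fn schedule(self) {
        println!("injector <- {:?}; event set", self.0);
    }
}

struct Worker {
    slot: Option<Runnable>,
    queue: VecDeque<Runnable>,
}

impl Drop for Worker {
    fn drop(&mut self) {
        // Re-schedule instead of pushing into the injector directly, so
        // other workers are notified about the orphaned tasks.
        if let Some(r) = self.slot.take() {
            r.schedule();
        }
        while let Some(r) = self.queue.pop_front() {
            r.schedule();
        }
    }
}

fn main() {
    let mut worker = Worker { slot: Some(Runnable("slot task")), queue: VecDeque::new() };
    worker.queue.push_back(Runnable("queued task"));
    // Dropping the worker re-schedules both tasks.
}
```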