From 75316e2982120c898eab8b08459cf3f826fc5a1b Mon Sep 17 00:00:00 2001 From: David Craven Date: Sat, 28 Nov 2020 20:54:01 +0100 Subject: [PATCH] Add interval api. (#41) * Add interval api. * Fix workflows. * Address review comments. * Don't tick immediately. * Increase allowed jitter for ci. --- .github/workflows/build-and-test.yaml | 8 +-- .github/workflows/lint.yaml | 6 +- .github/workflows/security.yaml | 6 +- src/lib.rs | 80 ++++++++++++++++++++++++--- tests/timer.rs | 18 +++++- 5 files changed, 100 insertions(+), 18 deletions(-) diff --git a/.github/workflows/build-and-test.yaml b/.github/workflows/build-and-test.yaml index bbbe77a..186ce39 100644 --- a/.github/workflows/build-and-test.yaml +++ b/.github/workflows/build-and-test.yaml @@ -16,14 +16,14 @@ jobs: rust: [nightly, beta, stable, 1.39.0] steps: - uses: actions/checkout@v2 - + - name: Set current week of the year in environnement if: startsWith(matrix.os, 'ubuntu') || startsWith(matrix.os, 'macOS') - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - + run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV + - name: Set current week of the year in environnement if: startsWith(matrix.os, 'windows') - run: echo "::set-env name=CURRENT_WEEK::$(Get-Date -UFormat %V)" + run: echo "CURRENT_WEEK=$(Get-Date -UFormat %V)" >> $GITHUB_ENV - name: Install latest ${{ matrix.rust }} uses: actions-rs/toolchain@v1 diff --git a/.github/workflows/lint.yaml b/.github/workflows/lint.yaml index bc3d50a..21c7963 100644 --- a/.github/workflows/lint.yaml +++ b/.github/workflows/lint.yaml @@ -11,10 +11,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - + - name: Set current week of the year in environnement - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - + run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV + - uses: actions-rs/toolchain@v1 with: toolchain: stable diff --git a/.github/workflows/security.yaml b/.github/workflows/security.yaml index 8f722e7..4d995c3 100644 --- a/.github/workflows/security.yaml +++ b/.github/workflows/security.yaml @@ -11,10 +11,10 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - + - name: Set current week of the year in environnement - run: echo "::set-env name=CURRENT_WEEK::$(date +%V)" - + run: echo "CURRENT_WEEK=$(date +%V)" >> $GITHUB_ENV + - uses: actions-rs/audit-check@v1 with: token: ${{ secrets.GITHUB_TOKEN }} diff --git a/src/lib.rs b/src/lib.rs index ebec9c3..0c5db47 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -85,6 +85,11 @@ mod reactor; pub use driver::block_on; +/// Use Duration::MAX once duration_constants are stabilized. +fn duration_max() -> Duration { + Duration::new(u64::MAX, 1_000_000_000 - 1) +} + /// A future that expires at a point in time. /// /// Timers are futures that output the [`Instant`] at which they fired. @@ -127,6 +132,9 @@ pub struct Timer { /// When this timer fires. when: Instant, + + /// The period. + period: Duration, } impl Timer { @@ -161,10 +169,8 @@ impl Timer { /// # }); /// ``` pub fn at(instant: Instant) -> Timer { - Timer { - id_and_waker: None, - when: instant, - } + // Use Duration::MAX once duration_constants are stabilized. + Timer::interval_at(instant, duration_max()) } /// Sets the timer to expire after the new duration of time. @@ -222,6 +228,47 @@ impl Timer { *id = Reactor::get().insert_timer(self.when, waker); } } + + /// Creates a timer that ticks every period. + /// + /// # Examples + /// + /// ``` + /// use async_io::Timer; + /// use futures_lite::StreamExt; + /// use std::time::{Duration, Instant}; + /// + /// # futures_lite::future::block_on(async { + /// let period = Duration::from_secs(1); + /// Timer::interval(period).next().await; + /// # }); + /// ``` + pub fn interval(period: Duration) -> Timer { + Timer::interval_at(Instant::now() + period, period) + } + + /// Creates a timer that ticks every period, starting at `start`. + /// + /// # Examples + /// + /// ``` + /// use async_io::Timer; + /// use futures_lite::StreamExt; + /// use std::time::{Duration, Instant}; + /// + /// # futures_lite::future::block_on(async { + /// let now = Instant::now(); + /// let period = Duration::from_secs(1); + /// Timer::interval_at(now, period).next().await; + /// # }); + /// ``` + pub fn interval_at(start: Instant, period: Duration) -> Timer { + Timer { + id_and_waker: None, + when: start, + period: period, + } + } } impl Drop for Timer { @@ -236,14 +283,33 @@ impl Drop for Timer { impl Future for Timer { type Output = Instant; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.poll_next(cx) { + Poll::Ready(Some(when)) => Poll::Ready(when), + Poll::Pending => Poll::Pending, + Poll::Ready(None) => unreachable!(), + } + } +} + +impl Stream for Timer { + type Item = Instant; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // Check if the timer has already fired. if Instant::now() >= self.when { if let Some((id, _)) = self.id_and_waker.take() { // Deregister the timer from the reactor. Reactor::get().remove_timer(self.when, id); } - Poll::Ready(self.when) + let when = self.when; + if let Some(next) = when.checked_add(self.period) { + self.when = next; + // Register the timer in the reactor. + let id = Reactor::get().insert_timer(self.when, cx.waker()); + self.id_and_waker = Some((id, cx.waker().clone())); + } + return Poll::Ready(Some(when)); } else { match &self.id_and_waker { None => { @@ -261,8 +327,8 @@ impl Future for Timer { } Some(_) => {} } - Poll::Pending } + Poll::Pending } } diff --git a/tests/timer.rs b/tests/timer.rs index d1a358e..cdd90db 100644 --- a/tests/timer.rs +++ b/tests/timer.rs @@ -5,7 +5,7 @@ use std::thread; use std::time::{Duration, Instant}; use async_io::Timer; -use futures_lite::{future, FutureExt}; +use futures_lite::{future, FutureExt, StreamExt}; fn spawn( f: impl Future + Send + 'static, @@ -30,6 +30,22 @@ fn smoke() { }); } +#[test] +fn interval() { + future::block_on(async { + let period = Duration::from_secs(1); + let jitter = Duration::from_millis(500); + let start = Instant::now(); + let mut timer = Timer::interval(period); + timer.next().await; + let elapsed = start.elapsed(); + assert!(elapsed >= period && elapsed - period < jitter); + timer.next().await; + let elapsed = start.elapsed(); + assert!(elapsed >= period * 2 && elapsed - period * 2 < jitter); + }); +} + #[test] fn poll_across_tasks() { future::block_on(async {