Skip to content

Commit b2d67ef

Browse files
committed
time: fix wake-up with interval on Ready (#5551)
When `tokio::time::Interval::poll_tick()` returns `Poll::Pending`, it schedules itself for being woken up again through the waker of the passed context, which is correct behavior. However when `Poll::Ready(_)` is returned, the interval timer should be reset but not scheduled to be woken up again as this is up to the caller. This commit fixes the bug by introducing a `reset_without_reregister` method on `TimerEntry` which is called by `Intervall::poll_tick(cx)` in case the delay poll returns `Poll::Ready(_)`.
1 parent f177aad commit b2d67ef

File tree

4 files changed

+108
-1
lines changed

4 files changed

+108
-1
lines changed

tokio/src/runtime/time/entry.rs

+11
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,17 @@ impl TimerEntry {
543543
}
544544
}
545545

546+
pub(crate) fn reset_without_reregister(mut self: Pin<&mut Self>, new_time: Instant) {
547+
unsafe { self.as_mut().get_unchecked_mut() }.deadline = new_time;
548+
unsafe { self.as_mut().get_unchecked_mut() }.registered = false;
549+
550+
let tick = self.driver().time_source().deadline_to_tick(new_time);
551+
552+
if self.inner().extend_expiration(tick).is_ok() {
553+
return;
554+
}
555+
}
556+
546557
pub(crate) fn poll_elapsed(
547558
mut self: Pin<&mut Self>,
548559
cx: &mut Context<'_>,

tokio/src/time/interval.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,10 @@ impl Interval {
482482
timeout + self.period
483483
};
484484

485-
self.delay.as_mut().reset(next);
485+
// When we arrive here, the internal delay returned `Poll::Ready`.
486+
// Reset the delay but do not register it. It should be registered with
487+
// the next call to [`poll_tick`].
488+
self.delay.as_mut().reset_without_reregister(next);
486489

487490
// Return the time when we were scheduled to tick
488491
Poll::Ready(timeout)

tokio/src/time/sleep.rs

+20
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,26 @@ impl Sleep {
353353
self.reset_inner(deadline)
354354
}
355355

356+
/// Resets the `Sleep` instance to a new deadline without reregistering it
357+
/// to be woken up.
358+
///
359+
/// Calling this function allows changing the instant at which the `Sleep`
360+
/// future completes without having to create new associated state and
361+
/// without having it registered. This is required in e.g. the
362+
/// [crate::time::Interval] where we want to reset the internal [Sleep]
363+
/// without having it wake up the last task that polled it.
364+
///
365+
/// This function can be called both before and after the future has
366+
/// completed.
367+
///
368+
/// To call this method, you will usually combine the call with
369+
/// [`Pin::as_mut`], which lets you call the method without consuming the
370+
/// `Sleep` itself.
371+
pub fn reset_without_reregister(self: Pin<&mut Self>, deadline: Instant) {
372+
let mut me = self.project();
373+
me.entry.as_mut().reset_without_reregister(deadline);
374+
}
375+
356376
fn reset_inner(self: Pin<&mut Self>, deadline: Instant) {
357377
let mut me = self.project();
358378
me.entry.as_mut().reset(deadline);

tokio/tests/time_interval.rs

+73
Original file line numberDiff line numberDiff line change
@@ -209,3 +209,76 @@ fn poll_next(interval: &mut task::Spawn<time::Interval>) -> Poll<Instant> {
209209
fn ms(n: u64) -> Duration {
210210
Duration::from_millis(n)
211211
}
212+
213+
mod tmp_tests {
214+
use std::{
215+
pin::Pin,
216+
task::{Context, Poll},
217+
time::Instant,
218+
};
219+
220+
use crate::time::Interval;
221+
use futures::{pin_mut, Stream, StreamExt};
222+
223+
struct IntervalStreamer {
224+
start: Instant,
225+
counter: u32,
226+
timer: Interval,
227+
}
228+
229+
impl Stream for IntervalStreamer {
230+
type Item = u32;
231+
232+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
233+
let this = Pin::into_inner(self);
234+
235+
if this.counter > 12 {
236+
return Poll::Ready(None);
237+
}
238+
239+
match this.timer.poll_tick(cx) {
240+
Poll::Pending => {
241+
println!(
242+
"Timer returned Poll::Pending after {:?}",
243+
this.start.elapsed()
244+
);
245+
Poll::Pending
246+
}
247+
Poll::Ready(_) => {
248+
println!(
249+
"Timer returned Poll::Ready after {:?}",
250+
this.start.elapsed()
251+
);
252+
this.counter += 1;
253+
if this.counter % 4 == 0 {
254+
Poll::Ready(Some(this.counter))
255+
} else {
256+
// Schedule this task for wake-up
257+
cx.waker().wake_by_ref();
258+
Poll::Pending
259+
}
260+
}
261+
}
262+
}
263+
}
264+
265+
#[tokio::test]
266+
async fn reset_without_reregister() {
267+
let stream = IntervalStreamer {
268+
start: Instant::now(),
269+
counter: 0,
270+
timer: crate::time::interval(std::time::Duration::from_millis(10)),
271+
};
272+
273+
pin_mut!(stream);
274+
275+
let mut results = Vec::with_capacity(4);
276+
while let Some(item) = stream.next().await {
277+
println!("Stream yielded an item: {}", item);
278+
results.push(item);
279+
}
280+
281+
dbg!(&results);
282+
assert_eq!(results, vec![4, 8, 12]);
283+
}
284+
}

0 commit comments

Comments
 (0)