Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync: Remove readiness assertion in `watch::Receiver::changed() #2839

Merged
merged 3 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions tokio/src/sync/tests/loom_watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,30 @@ use loom::thread;
#[test]
fn smoke() {
loom::model(|| {
let (tx, mut rx) = watch::channel(1);
let (tx, mut rx1) = watch::channel(1);
let mut rx2 = rx1.clone();
let mut rx3 = rx1.clone();
let mut rx4 = rx1.clone();
let mut rx5 = rx1.clone();

let th = thread::spawn(move || {
tx.send(2).unwrap();
});

block_on(rx.changed()).unwrap();
assert_eq!(*rx.borrow(), 2);
block_on(rx1.changed()).unwrap();
assert_eq!(*rx1.borrow(), 2);

block_on(rx2.changed()).unwrap();
assert_eq!(*rx2.borrow(), 2);

block_on(rx3.changed()).unwrap();
assert_eq!(*rx3.borrow(), 2);

block_on(rx4.changed()).unwrap();
assert_eq!(*rx4.borrow(), 2);

block_on(rx5.changed()).unwrap();
assert_eq!(*rx5.borrow(), 2);

th.join().unwrap();
})
Expand Down
24 changes: 16 additions & 8 deletions tokio/src/sync/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,14 +251,22 @@ impl<T> Receiver<T> {
let notified = self.shared.notify_rx.notified();
pin!(notified);

// Polling the future once is guaranteed to return `Pending` as `watch`
// only notifies using `notify_waiters`.
crate::future::poll_fn(|cx| {
let res = Pin::new(&mut notified).poll(cx);
assert!(!res.is_ready());
Poll::Ready(())
})
.await;
// Polling the future here has dual purpose. The first one is to register
// the waiter so when `notify_waiters` is called it is notified. The second
// is to cover the case where another instance of `Notiified` has been dropped
// without receiving its notification. If that was the case polling the
// future for the first time will use this "lost" notification and return
// `Ready` immediatelly without registering any waiter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a couple typos and grammar nits:

Suggested change
// Polling the future here has dual purpose. The first one is to register
// the waiter so when `notify_waiters` is called it is notified. The second
// is to cover the case where another instance of `Notiified` has been dropped
// without receiving its notification. If that was the case polling the
// future for the first time will use this "lost" notification and return
// `Ready` immediatelly without registering any waiter
// Polling the future here has a dual purpose. The first one is to register
// the waiter so that it is notified when `notify_waiters` is called. The second
// is to cover the case where another instance of `Notified` has been dropped
// without receiving its notification. If this has happened, polling the future
// for the first time will use this "lost" notification and return `Ready`
// immediately without registering any waiter.

let aquired_lost_notification =
crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) {
Poll::Ready(()) => Poll::Ready(true),
Poll::Pending => Poll::Ready(false),
})
.await;

if aquired_lost_notification {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, take it or leave it: "acquired_lost_notification" is kind of a mouthful, what about something like

Suggested change
let aquired_lost_notification =
crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) {
Poll::Ready(()) => Poll::Ready(true),
Poll::Pending => Poll::Ready(false),
})
.await;
if aquired_lost_notification {
let already_notified =
crate::future::poll_fn(|cx| match Pin::new(&mut notified).poll(cx) {
Poll::Ready(()) => Poll::Ready(true),
Poll::Pending => Poll::Ready(false),
})
.await;
if already_notified {

(also, "acquired" is spelled wrong in the code, so we should fix that even if we don't change the name :) )

return Ok(());
}

if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
return ret;
Expand Down