Skip to content

Commit bd4ccae

Browse files
authored
time: add abstraction for RwLock to remove poisoning aspect (tokio-rs#6807)
With tokio-rs#6779 we removed unnecessary allocations from the timerwheel by wrapping it in an `std::sync::RwLock`. Since the `Mutex` used in this part of the project uses an abstraction in `loom::sync::Mutex` to get rid of the poisoning aspects of `std::sync::Mutex` the same should probably be done for the used read-write lock struct. This commit introduces an abstraction to get rid of the poisoning aspects of `std::sync::RwLock` by introducing a wrapper to the `loom::sync` module similar to `loom::sync::Mutex`. Refs: tokio-rs#6779
1 parent 4ed0fa2 commit bd4ccae

File tree

7 files changed

+114
-44
lines changed

7 files changed

+114
-44
lines changed

tokio/src/loom/mocked.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ pub(crate) use loom::*;
22

33
pub(crate) mod sync {
44

5-
pub(crate) use loom::sync::MutexGuard;
5+
pub(crate) use loom::sync::{MutexGuard, RwLockReadGuard, RwLockWriteGuard};
66

77
#[derive(Debug)]
88
pub(crate) struct Mutex<T>(loom::sync::Mutex<T>);
@@ -30,6 +30,38 @@ pub(crate) mod sync {
3030
self.0.get_mut().unwrap()
3131
}
3232
}
33+
34+
#[derive(Debug)]
35+
pub(crate) struct RwLock<T>(loom::sync::RwLock<T>);
36+
37+
#[allow(dead_code)]
38+
impl<T> RwLock<T> {
39+
#[inline]
40+
pub(crate) fn new(t: T) -> Self {
41+
Self(loom::sync::RwLock::new(t))
42+
}
43+
44+
#[inline]
45+
pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> {
46+
self.0.read().unwrap()
47+
}
48+
49+
#[inline]
50+
pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
51+
self.0.try_read().ok()
52+
}
53+
54+
#[inline]
55+
pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> {
56+
self.0.write().unwrap()
57+
}
58+
59+
#[inline]
60+
pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
61+
self.0.try_write().ok()
62+
}
63+
}
64+
3365
pub(crate) use loom::sync::*;
3466

3567
pub(crate) mod atomic {

tokio/src/loom/std/mod.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod barrier;
88
mod mutex;
99
#[cfg(all(feature = "parking_lot", not(miri)))]
1010
mod parking_lot;
11+
mod rwlock;
1112
mod unsafe_cell;
1213

1314
pub(crate) mod cell {
@@ -64,11 +65,14 @@ pub(crate) mod sync {
6465

6566
#[cfg(not(all(feature = "parking_lot", not(miri))))]
6667
#[allow(unused_imports)]
67-
pub(crate) use std::sync::{Condvar, MutexGuard, RwLock, RwLockReadGuard, WaitTimeoutResult};
68+
pub(crate) use std::sync::{Condvar, MutexGuard, RwLockReadGuard, WaitTimeoutResult};
6869

6970
#[cfg(not(all(feature = "parking_lot", not(miri))))]
7071
pub(crate) use crate::loom::std::mutex::Mutex;
7172

73+
#[cfg(not(all(feature = "parking_lot", not(miri))))]
74+
pub(crate) use crate::loom::std::rwlock::RwLock;
75+
7276
pub(crate) mod atomic {
7377
pub(crate) use crate::loom::std::atomic_u16::AtomicU16;
7478
pub(crate) use crate::loom::std::atomic_u32::AtomicU32;

tokio/src/loom/std/parking_lot.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,20 @@ impl<T> RwLock<T> {
9696
RwLock(PhantomData, parking_lot::RwLock::new(t))
9797
}
9898

99-
pub(crate) fn read(&self) -> LockResult<RwLockReadGuard<'_, T>> {
100-
Ok(RwLockReadGuard(PhantomData, self.1.read()))
99+
pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> {
100+
RwLockReadGuard(PhantomData, self.1.read())
101101
}
102102

103-
pub(crate) fn write(&self) -> LockResult<RwLockWriteGuard<'_, T>> {
104-
Ok(RwLockWriteGuard(PhantomData, self.1.write()))
103+
pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
104+
Some(RwLockReadGuard(PhantomData, self.1.read()))
105+
}
106+
107+
pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> {
108+
RwLockWriteGuard(PhantomData, self.1.write())
109+
}
110+
111+
pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
112+
Some(RwLockWriteGuard(PhantomData, self.1.write()))
105113
}
106114
}
107115

tokio/src/loom/std/rwlock.rs

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use std::sync::{self, RwLockReadGuard, RwLockWriteGuard, TryLockError};
2+
3+
/// Adapter for `std::sync::RwLock` that removes the poisoning aspects
4+
/// from its api.
5+
#[derive(Debug)]
6+
pub(crate) struct RwLock<T: ?Sized>(sync::RwLock<T>);
7+
8+
#[allow(dead_code)]
9+
impl<T> RwLock<T> {
10+
#[inline]
11+
pub(crate) fn new(t: T) -> Self {
12+
Self(sync::RwLock::new(t))
13+
}
14+
15+
#[inline]
16+
pub(crate) fn read(&self) -> RwLockReadGuard<'_, T> {
17+
match self.0.read() {
18+
Ok(guard) => guard,
19+
Err(p_err) => p_err.into_inner(),
20+
}
21+
}
22+
23+
#[inline]
24+
pub(crate) fn try_read(&self) -> Option<RwLockReadGuard<'_, T>> {
25+
match self.0.try_read() {
26+
Ok(guard) => Some(guard),
27+
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
28+
Err(TryLockError::WouldBlock) => None,
29+
}
30+
}
31+
32+
#[inline]
33+
pub(crate) fn write(&self) -> RwLockWriteGuard<'_, T> {
34+
match self.0.write() {
35+
Ok(guard) => guard,
36+
Err(p_err) => p_err.into_inner(),
37+
}
38+
}
39+
40+
#[inline]
41+
pub(crate) fn try_write(&self) -> Option<RwLockWriteGuard<'_, T>> {
42+
match self.0.try_write() {
43+
Ok(guard) => Some(guard),
44+
Err(TryLockError::Poisoned(p_err)) => Some(p_err.into_inner()),
45+
Err(TryLockError::WouldBlock) => None,
46+
}
47+
}
48+
}

tokio/src/runtime/time/mod.rs

+6-28
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ pub(crate) use source::TimeSource;
2020
mod wheel;
2121

2222
use crate::loom::sync::atomic::{AtomicBool, Ordering};
23-
use crate::loom::sync::Mutex;
23+
use crate::loom::sync::{Mutex, RwLock};
2424
use crate::runtime::driver::{self, IoHandle, IoStack};
2525
use crate::time::error::Error;
2626
use crate::time::{Clock, Duration};
2727
use crate::util::WakeList;
2828

2929
use crate::loom::sync::atomic::AtomicU64;
3030
use std::fmt;
31-
use std::sync::RwLock;
3231
use std::{num::NonZeroU64, ptr::NonNull};
3332

3433
struct AtomicOptionNonZeroU64(AtomicU64);
@@ -199,12 +198,7 @@ impl Driver {
199198

200199
// Finds out the min expiration time to park.
201200
let expiration_time = {
202-
let mut wheels_lock = rt_handle
203-
.time()
204-
.inner
205-
.wheels
206-
.write()
207-
.expect("Timer wheel shards poisoned");
201+
let mut wheels_lock = rt_handle.time().inner.wheels.write();
208202
let expiration_time = wheels_lock
209203
.0
210204
.iter_mut()
@@ -324,11 +318,7 @@ impl Handle {
324318
// Returns the next wakeup time of this shard.
325319
pub(self) fn process_at_sharded_time(&self, id: u32, mut now: u64) -> Option<u64> {
326320
let mut waker_list = WakeList::new();
327-
let mut wheels_lock = self
328-
.inner
329-
.wheels
330-
.read()
331-
.expect("Timer wheel shards poisoned");
321+
let mut wheels_lock = self.inner.wheels.read();
332322
let mut lock = wheels_lock.lock_sharded_wheel(id);
333323

334324
if now < lock.elapsed() {
@@ -355,11 +345,7 @@ impl Handle {
355345

356346
waker_list.wake_all();
357347

358-
wheels_lock = self
359-
.inner
360-
.wheels
361-
.read()
362-
.expect("Timer wheel shards poisoned");
348+
wheels_lock = self.inner.wheels.read();
363349
lock = wheels_lock.lock_sharded_wheel(id);
364350
}
365351
}
@@ -384,11 +370,7 @@ impl Handle {
384370
/// `add_entry` must not be called concurrently.
385371
pub(self) unsafe fn clear_entry(&self, entry: NonNull<TimerShared>) {
386372
unsafe {
387-
let wheels_lock = self
388-
.inner
389-
.wheels
390-
.read()
391-
.expect("Timer wheel shards poisoned");
373+
let wheels_lock = self.inner.wheels.read();
392374
let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());
393375

394376
if entry.as_ref().might_be_registered() {
@@ -412,11 +394,7 @@ impl Handle {
412394
entry: NonNull<TimerShared>,
413395
) {
414396
let waker = unsafe {
415-
let wheels_lock = self
416-
.inner
417-
.wheels
418-
.read()
419-
.expect("Timer wheel shards poisoned");
397+
let wheels_lock = self.inner.wheels.read();
420398

421399
let mut lock = wheels_lock.lock_sharded_wheel(entry.as_ref().shard_id());
422400

tokio/src/sync/broadcast.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -599,7 +599,7 @@ impl<T> Sender<T> {
599599
tail.pos = tail.pos.wrapping_add(1);
600600

601601
// Get the slot
602-
let mut slot = self.shared.buffer[idx].write().unwrap();
602+
let mut slot = self.shared.buffer[idx].write();
603603

604604
// Track the position
605605
slot.pos = pos;
@@ -695,7 +695,7 @@ impl<T> Sender<T> {
695695
while low < high {
696696
let mid = low + (high - low) / 2;
697697
let idx = base_idx.wrapping_add(mid) & self.shared.mask;
698-
if self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0 {
698+
if self.shared.buffer[idx].read().rem.load(SeqCst) == 0 {
699699
low = mid + 1;
700700
} else {
701701
high = mid;
@@ -737,7 +737,7 @@ impl<T> Sender<T> {
737737
let tail = self.shared.tail.lock();
738738

739739
let idx = (tail.pos.wrapping_sub(1) & self.shared.mask as u64) as usize;
740-
self.shared.buffer[idx].read().unwrap().rem.load(SeqCst) == 0
740+
self.shared.buffer[idx].read().rem.load(SeqCst) == 0
741741
}
742742

743743
/// Returns the number of active receivers.
@@ -1057,7 +1057,7 @@ impl<T> Receiver<T> {
10571057
let idx = (self.next & self.shared.mask as u64) as usize;
10581058

10591059
// The slot holding the next value to read
1060-
let mut slot = self.shared.buffer[idx].read().unwrap();
1060+
let mut slot = self.shared.buffer[idx].read();
10611061

10621062
if slot.pos != self.next {
10631063
// Release the `slot` lock before attempting to acquire the `tail`
@@ -1074,7 +1074,7 @@ impl<T> Receiver<T> {
10741074
let mut tail = self.shared.tail.lock();
10751075

10761076
// Acquire slot lock again
1077-
slot = self.shared.buffer[idx].read().unwrap();
1077+
slot = self.shared.buffer[idx].read();
10781078

10791079
// Make sure the position did not change. This could happen in the
10801080
// unlikely event that the buffer is wrapped between dropping the

tokio/src/sync/watch.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -575,7 +575,7 @@ impl<T> Receiver<T> {
575575
/// assert_eq!(*rx.borrow(), "hello");
576576
/// ```
577577
pub fn borrow(&self) -> Ref<'_, T> {
578-
let inner = self.shared.value.read().unwrap();
578+
let inner = self.shared.value.read();
579579

580580
// After obtaining a read-lock no concurrent writes could occur
581581
// and the loaded version matches that of the borrowed reference.
@@ -622,7 +622,7 @@ impl<T> Receiver<T> {
622622
/// [`changed`]: Receiver::changed
623623
/// [`borrow`]: Receiver::borrow
624624
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
625-
let inner = self.shared.value.read().unwrap();
625+
let inner = self.shared.value.read();
626626

627627
// After obtaining a read-lock no concurrent writes could occur
628628
// and the loaded version matches that of the borrowed reference.
@@ -813,7 +813,7 @@ impl<T> Receiver<T> {
813813
let mut closed = false;
814814
loop {
815815
{
816-
let inner = self.shared.value.read().unwrap();
816+
let inner = self.shared.value.read();
817817

818818
let new_version = self.shared.state.load().version();
819819
let has_changed = self.version != new_version;
@@ -1087,7 +1087,7 @@ impl<T> Sender<T> {
10871087
{
10881088
{
10891089
// Acquire the write lock and update the value.
1090-
let mut lock = self.shared.value.write().unwrap();
1090+
let mut lock = self.shared.value.write();
10911091

10921092
// Update the value and catch possible panic inside func.
10931093
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| modify(&mut lock)));
@@ -1164,7 +1164,7 @@ impl<T> Sender<T> {
11641164
/// assert_eq!(*tx.borrow(), "hello");
11651165
/// ```
11661166
pub fn borrow(&self) -> Ref<'_, T> {
1167-
let inner = self.shared.value.read().unwrap();
1167+
let inner = self.shared.value.read();
11681168

11691169
// The sender/producer always sees the current version
11701170
let has_changed = false;

0 commit comments

Comments
 (0)