Skip to content

Commit a82bdee

Browse files
authored
sync: handle panic during mpsc drop (tokio-rs#7094)
1 parent 435e390 commit a82bdee

File tree

2 files changed

+72
-2
lines changed

2 files changed

+72
-2
lines changed

tokio/src/sync/mpsc/chan.rs

+26-2
Original file line numberDiff line numberDiff line change
@@ -490,10 +490,34 @@ impl<T, S: Semaphore> Drop for Rx<T, S> {
490490

491491
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
492492
let rx_fields = unsafe { &mut *rx_fields_ptr };
493+
struct Guard<'a, T, S: Semaphore> {
494+
list: &'a mut list::Rx<T>,
495+
tx: &'a list::Tx<T>,
496+
sem: &'a S,
497+
}
498+
499+
impl<'a, T, S: Semaphore> Guard<'a, T, S> {
500+
fn drain(&mut self) {
501+
// call T's destructor.
502+
while let Some(Value(_)) = self.list.pop(self.tx) {
503+
self.sem.add_permit();
504+
}
505+
}
506+
}
493507

494-
while let Some(Value(_)) = rx_fields.list.pop(&self.inner.tx) {
495-
self.inner.semaphore.add_permit();
508+
impl<'a, T, S: Semaphore> Drop for Guard<'a, T, S> {
509+
fn drop(&mut self) {
510+
self.drain();
511+
}
496512
}
513+
514+
let mut guard = Guard {
515+
list: &mut rx_fields.list,
516+
tx: &self.inner.tx,
517+
sem: &self.inner.semaphore,
518+
};
519+
520+
guard.drain();
497521
});
498522
}
499523
}

tokio/tests/sync_mpsc.rs

+46
Original file line numberDiff line numberDiff line change
@@ -1454,4 +1454,50 @@ async fn test_is_empty_32_msgs() {
14541454
}
14551455
}
14561456

1457+
#[test]
1458+
#[cfg(not(panic = "abort"))]
1459+
fn drop_all_elements_during_panic() {
1460+
use std::sync::atomic::AtomicUsize;
1461+
use std::sync::atomic::Ordering::Relaxed;
1462+
use tokio::sync::mpsc::UnboundedReceiver;
1463+
use tokio::sync::mpsc::UnboundedSender;
1464+
1465+
static COUNTER: AtomicUsize = AtomicUsize::new(0);
1466+
1467+
struct A(bool);
1468+
impl Drop for A {
1469+
// cause a panic when inner value is `true`.
1470+
fn drop(&mut self) {
1471+
COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1472+
if self.0 {
1473+
panic!("panic!")
1474+
}
1475+
}
1476+
}
1477+
1478+
fn func(tx: UnboundedSender<A>, rx: UnboundedReceiver<A>) {
1479+
tx.send(A(true)).unwrap();
1480+
tx.send(A(false)).unwrap();
1481+
tx.send(A(false)).unwrap();
1482+
1483+
drop(rx);
1484+
1485+
// `mpsc::Rx`'s drop is called and gets panicked while dropping the first value,
1486+
// but will keep dropping following elements.
1487+
}
1488+
1489+
let (tx, rx) = mpsc::unbounded_channel();
1490+
1491+
let _ = panic::catch_unwind(panic::AssertUnwindSafe(|| {
1492+
func(tx.clone(), rx);
1493+
}));
1494+
1495+
// all A's destructor should be called at this point, even before `mpsc::Chan`'s
1496+
// drop gets called.
1497+
assert_eq!(COUNTER.load(Relaxed), 3);
1498+
1499+
drop(tx);
1500+
// `mpsc::Chan`'s drop is called, freeing the `Block` memory allocation.
1501+
}
1502+
14571503
fn is_debug<T: fmt::Debug>(_: &T) {}

0 commit comments

Comments
 (0)