From 697ed1e53eb935dad2fcef61d505790dab317fbd Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 28 Feb 2020 15:38:52 -0500 Subject: [PATCH 1/4] Fix possible fd leak with threaded runtime This PR introduces a fix where creating many runtimes in a loop will cause an fd leak. This is because storing a strong arc within each `on_thread_start` fn will cause the runtime handle to not be deallocated when the `compat::Runtime` is dropped. To avoid this we only store a weak pointer within the `on_thread_start` fn and attempt to upgrade it to get the handle. Since, `on_thread_start` is generally always called while the tokio runtime is not dropped the upgrade should not fail. This then allows us to drop the runtime and deallocate the handle by storing the only long lived strong arc within `compat::Runtime`. I've verified this fixes our repro and doesn't leak fds on our benchmarks where we orignally discovered this issue. Closes #27 Signed-off-by: Lucio Franco --- src/runtime/threadpool/builder.rs | 28 +++++++++++++++++++++++++--- src/runtime/threadpool/mod.rs | 11 ++++++++++- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index 293eb29..125e3a4 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -163,7 +163,21 @@ impl Builder { let compat_reactor = compat_bg.reactor().clone(); let compat_sender: Arc>>> = Arc::new(RwLock::new(None)); - let compat_sender2 = compat_sender.clone(); + + // We need a weak ref here so that when we pass this into the + // `on_thread_start` closure its stored as a _weak_ ref and not + // a strong one. This is important because tokio 0.2's runtime + // should shutdown when the compat Runtime has been dropped. There + // can be a race condition where we hold onto an extra Arc which holds + // a runtime handle beyond the drop. This casuses the tokio runtime to + // not shutdown and leak fds. + // + // Tokio's threaded_scheduler will spawn threads that each contain a arced + // ref to the `on_thread_start` fn. If the runtime shutsdown but there is still + // access to a runtime handle the mio driver will not shutdown. To avoid this we + // only want the `on_thread_start` to hold a weak ref and attempt to check async if + // the runtime has been shutdown by upgrading the weak pointer. + let compat_sender2 = Arc::downgrade(&compat_sender); let mut lock = compat_sender.write().unwrap(); let runtime = self @@ -172,12 +186,19 @@ impl Builder { .enable_all() .on_thread_start(move || { // We need the threadpool's sender to set up the default tokio - // 0.1 executor. - let lock = compat_sender2.read().unwrap(); + // 0.1 executor. We also need to upgrade the weak pointer, if the + // pointer is no longer valid then the runtime has shutdown and the + // handle is no longer available. + // + // This upgrade will only fail if the compat runtime has been dropped. + let sender = compat_sender2.upgrade().expect("Compat runtime shutdown."); + let lock = sender.read().unwrap(); let compat_sender = lock .as_ref() .expect("compat executor needs to be set before the pool is run!") .clone(); + drop(lock); + drop(sender); compat::set_guards(compat_sender, &compat_timer, &compat_reactor); }) .on_thread_stop(|| { @@ -193,6 +214,7 @@ impl Builder { inner: Some(Inner { runtime, compat_bg }), idle_rx, idle, + compat_sender, }; Ok(runtime) diff --git a/src/runtime/threadpool/mod.rs b/src/runtime/threadpool/mod.rs index 0997c0b..64f6872 100644 --- a/src/runtime/threadpool/mod.rs +++ b/src/runtime/threadpool/mod.rs @@ -13,7 +13,12 @@ use super::{ use futures_01::future::Future as Future01; use futures_util::{compat::Future01CompatExt, future::FutureExt}; -use std::{fmt, future::Future, io}; +use std::{ + fmt, + future::Future, + io, + sync::{Arc, RwLock}, +}; use tokio_02::{ runtime::{self, Handle}, task::JoinHandle, @@ -43,6 +48,10 @@ pub struct Runtime { /// Idleness tracking. idle: idle::Idle, idle_rx: idle::Rx, + + // This should store the only long living strong ref to the handle + // and once Runtime is dropped it should also deallocate. + compat_sender: Arc>>>, } /// A future that resolves when the Tokio `Runtime` is shut down. From 1aa63a75f6667b310d7a680180a3d5b1a2317d3e Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 28 Feb 2020 16:25:16 -0500 Subject: [PATCH 2/4] Apply suggestions from code review Co-Authored-By: Eliza Weisman --- src/runtime/threadpool/builder.rs | 18 +++++++++--------- src/runtime/threadpool/mod.rs | 4 ++-- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index 125e3a4..5cc5f5b 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -165,15 +165,15 @@ impl Builder { Arc::new(RwLock::new(None)); // We need a weak ref here so that when we pass this into the - // `on_thread_start` closure its stored as a _weak_ ref and not + // `on_thread_start` closure it's stored as a _weak_ ref and not // a strong one. This is important because tokio 0.2's runtime - // should shutdown when the compat Runtime has been dropped. There - // can be a race condition where we hold onto an extra Arc which holds - // a runtime handle beyond the drop. This casuses the tokio runtime to - // not shutdown and leak fds. + // should shut down when the compat Runtime has been dropped. There + // can be a race condition where we hold onto an extra Arc, which holds + // a runtime handle beyond the drop. This causes the tokio 0.2 runtime to + // not shut down and leak fds. // - // Tokio's threaded_scheduler will spawn threads that each contain a arced - // ref to the `on_thread_start` fn. If the runtime shutsdown but there is still + // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced + // ref to the `on_thread_start` fn. If the runtime shuts down but there is still // access to a runtime handle the mio driver will not shutdown. To avoid this we // only want the `on_thread_start` to hold a weak ref and attempt to check async if // the runtime has been shutdown by upgrading the weak pointer. @@ -186,8 +186,8 @@ impl Builder { .enable_all() .on_thread_start(move || { // We need the threadpool's sender to set up the default tokio - // 0.1 executor. We also need to upgrade the weak pointer, if the - // pointer is no longer valid then the runtime has shutdown and the + // 0.1 executor. We also need to upgrade the weak pointer. If the + // pointer is no longer valid, then the runtime has shut down and the // handle is no longer available. // // This upgrade will only fail if the compat runtime has been dropped. diff --git a/src/runtime/threadpool/mod.rs b/src/runtime/threadpool/mod.rs index 64f6872..c92223f 100644 --- a/src/runtime/threadpool/mod.rs +++ b/src/runtime/threadpool/mod.rs @@ -49,8 +49,8 @@ pub struct Runtime { idle: idle::Idle, idle_rx: idle::Rx, - // This should store the only long living strong ref to the handle - // and once Runtime is dropped it should also deallocate. + // This should store the only long-living strong ref to the handle, + // and once the Runtime is dropped, it should also be deallocated. compat_sender: Arc>>>, } From 120025daf45cb6b47223a272d95b38d95436a5c5 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 28 Feb 2020 16:26:04 -0500 Subject: [PATCH 3/4] Update src/runtime/threadpool/builder.rs Co-Authored-By: Eliza Weisman --- src/runtime/threadpool/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index 5cc5f5b..ce96416 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -174,7 +174,7 @@ impl Builder { // // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced // ref to the `on_thread_start` fn. If the runtime shuts down but there is still - // access to a runtime handle the mio driver will not shutdown. To avoid this we + // access to a runtime handle, the mio driver will not shutdown. To avoid this we // only want the `on_thread_start` to hold a weak ref and attempt to check async if // the runtime has been shutdown by upgrading the weak pointer. let compat_sender2 = Arc::downgrade(&compat_sender); From 59b27ea18af1777b1bd81a71db62a058ebf97caa Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Fri, 28 Feb 2020 16:28:15 -0500 Subject: [PATCH 4/4] Address comments --- src/runtime/threadpool/builder.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index ce96416..5a5fb7f 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -175,8 +175,9 @@ impl Builder { // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced // ref to the `on_thread_start` fn. If the runtime shuts down but there is still // access to a runtime handle, the mio driver will not shutdown. To avoid this we - // only want the `on_thread_start` to hold a weak ref and attempt to check async if - // the runtime has been shutdown by upgrading the weak pointer. + // only want the `on_thread_start` to hold a weak ref and attempt to upgrade, since + // the runtime will still be acitve the upgrade should work. Otherwise, somehow + // tokio started a new thread after its runtime has been dropped. let compat_sender2 = Arc::downgrade(&compat_sender); let mut lock = compat_sender.write().unwrap(); @@ -191,14 +192,14 @@ impl Builder { // handle is no longer available. // // This upgrade will only fail if the compat runtime has been dropped. - let sender = compat_sender2.upgrade().expect("Compat runtime shutdown."); + let sender = compat_sender2 + .upgrade() + .expect("Runtime dropped but thread started; this is a bug!"); let lock = sender.read().unwrap(); let compat_sender = lock .as_ref() .expect("compat executor needs to be set before the pool is run!") .clone(); - drop(lock); - drop(sender); compat::set_guards(compat_sender, &compat_timer, &compat_reactor); }) .on_thread_stop(|| {