diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index 293eb29..5a5fb7f 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -163,7 +163,22 @@ impl Builder { let compat_reactor = compat_bg.reactor().clone(); let compat_sender: Arc>>> = Arc::new(RwLock::new(None)); - let compat_sender2 = compat_sender.clone(); + + // We need a weak ref here so that when we pass this into the + // `on_thread_start` closure it's stored as a _weak_ ref and not + // a strong one. This is important because tokio 0.2's runtime + // should shut down when the compat Runtime has been dropped. There + // can be a race condition where we hold onto an extra Arc, which holds + // a runtime handle beyond the drop. This causes the tokio 0.2 runtime to + // not shut down and leak fds. + // + // Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced + // ref to the `on_thread_start` fn. If the runtime shuts down but there is still + // access to a runtime handle, the mio driver will not shutdown. To avoid this we + // only want the `on_thread_start` to hold a weak ref and attempt to upgrade, since + // the runtime will still be acitve the upgrade should work. Otherwise, somehow + // tokio started a new thread after its runtime has been dropped. + let compat_sender2 = Arc::downgrade(&compat_sender); let mut lock = compat_sender.write().unwrap(); let runtime = self @@ -172,8 +187,15 @@ impl Builder { .enable_all() .on_thread_start(move || { // We need the threadpool's sender to set up the default tokio - // 0.1 executor. - let lock = compat_sender2.read().unwrap(); + // 0.1 executor. We also need to upgrade the weak pointer. If the + // pointer is no longer valid, then the runtime has shut down and the + // handle is no longer available. + // + // This upgrade will only fail if the compat runtime has been dropped. + let sender = compat_sender2 + .upgrade() + .expect("Runtime dropped but thread started; this is a bug!"); + let lock = sender.read().unwrap(); let compat_sender = lock .as_ref() .expect("compat executor needs to be set before the pool is run!") @@ -193,6 +215,7 @@ impl Builder { inner: Some(Inner { runtime, compat_bg }), idle_rx, idle, + compat_sender, }; Ok(runtime) diff --git a/src/runtime/threadpool/mod.rs b/src/runtime/threadpool/mod.rs index 0997c0b..c92223f 100644 --- a/src/runtime/threadpool/mod.rs +++ b/src/runtime/threadpool/mod.rs @@ -13,7 +13,12 @@ use super::{ use futures_01::future::Future as Future01; use futures_util::{compat::Future01CompatExt, future::FutureExt}; -use std::{fmt, future::Future, io}; +use std::{ + fmt, + future::Future, + io, + sync::{Arc, RwLock}, +}; use tokio_02::{ runtime::{self, Handle}, task::JoinHandle, @@ -43,6 +48,10 @@ pub struct Runtime { /// Idleness tracking. idle: idle::Idle, idle_rx: idle::Rx, + + // This should store the only long-living strong ref to the handle, + // and once the Runtime is dropped, it should also be deallocated. + compat_sender: Arc>>>, } /// A future that resolves when the Tokio `Runtime` is shut down.