Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix possible fd leak with threaded runtime #28

Merged
merged 4 commits into from
Feb 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions src/runtime/threadpool/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,22 @@ impl Builder {
let compat_reactor = compat_bg.reactor().clone();
let compat_sender: Arc<RwLock<Option<super::CompatSpawner<tokio_02::runtime::Handle>>>> =
Arc::new(RwLock::new(None));
let compat_sender2 = compat_sender.clone();

// We need a weak ref here so that when we pass this into the
// `on_thread_start` closure it's stored as a _weak_ ref and not
// a strong one. This is important because tokio 0.2's runtime
// should shut down when the compat Runtime has been dropped. There
// can be a race condition where we hold onto an extra Arc, which holds
// a runtime handle beyond the drop. This causes the tokio 0.2 runtime to
// not shut down and leak fds.
//
// Tokio 0.2's threaded_scheduler will spawn threads that each contain an arced
// ref to the `on_thread_start` fn. If the runtime shuts down but there is still
// access to a runtime handle, the mio driver will not shutdown. To avoid this we
// only want the `on_thread_start` to hold a weak ref and attempt to upgrade, since
// the runtime will still be acitve the upgrade should work. Otherwise, somehow
// tokio started a new thread after its runtime has been dropped.
let compat_sender2 = Arc::downgrade(&compat_sender);
let mut lock = compat_sender.write().unwrap();

let runtime = self
Expand All @@ -172,8 +187,15 @@ impl Builder {
.enable_all()
.on_thread_start(move || {
// We need the threadpool's sender to set up the default tokio
// 0.1 executor.
let lock = compat_sender2.read().unwrap();
// 0.1 executor. We also need to upgrade the weak pointer. If the
// pointer is no longer valid, then the runtime has shut down and the
// handle is no longer available.
//
// This upgrade will only fail if the compat runtime has been dropped.
let sender = compat_sender2
.upgrade()
.expect("Runtime dropped but thread started; this is a bug!");
let lock = sender.read().unwrap();
let compat_sender = lock
.as_ref()
.expect("compat executor needs to be set before the pool is run!")
Expand All @@ -193,6 +215,7 @@ impl Builder {
inner: Some(Inner { runtime, compat_bg }),
idle_rx,
idle,
compat_sender,
};

Ok(runtime)
Expand Down
11 changes: 10 additions & 1 deletion src/runtime/threadpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,12 @@ use super::{

use futures_01::future::Future as Future01;
use futures_util::{compat::Future01CompatExt, future::FutureExt};
use std::{fmt, future::Future, io};
use std::{
fmt,
future::Future,
io,
sync::{Arc, RwLock},
};
use tokio_02::{
runtime::{self, Handle},
task::JoinHandle,
Expand Down Expand Up @@ -43,6 +48,10 @@ pub struct Runtime {
/// Idleness tracking.
idle: idle::Idle,
idle_rx: idle::Rx,

// This should store the only long-living strong ref to the handle,
// and once the Runtime is dropped, it should also be deallocated.
compat_sender: Arc<RwLock<Option<CompatSpawner<tokio_02::runtime::Handle>>>>,
}

/// A future that resolves when the Tokio `Runtime` is shut down.
Expand Down