diff --git a/Cargo.lock b/Cargo.lock
index 751fed588..408401584 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2199,7 +2199,6 @@ dependencies = [
  "rand",
  "serde",
  "serde_json",
- "serial_test",
  "sha2",
  "tokio",
  "tokio-stream",
@@ -2240,6 +2239,7 @@ dependencies = [
  "prost",
  "prost-types",
  "rand",
+ "rlimit",
  "serde",
  "serde_json",
  "sha2",
@@ -2280,6 +2280,7 @@ dependencies = [
  "scopeguard",
  "serde",
  "serde_json5",
+ "serial_test",
  "shlex",
  "tokio",
  "tokio-stream",
@@ -2834,6 +2835,15 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "rlimit"
+version = "0.10.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7043b63bd0cd1aaa628e476b80e6d4023a3b50eb32789f2728908107bd0c793a"
+dependencies = [
+ "libc",
+]
+
 [[package]]
 name = "roxmltree"
 version = "0.14.1"
diff --git a/nativelink-config/src/cas_server.rs b/nativelink-config/src/cas_server.rs
index f64dbe7e2..a2dfd6bc5 100644
--- a/nativelink-config/src/cas_server.rs
+++ b/nativelink-config/src/cas_server.rs
@@ -716,21 +716,6 @@ pub struct GlobalConfig {
     #[serde(deserialize_with = "convert_numeric_with_shellexpand")]
     pub max_open_files: usize,
 
-    /// If a file descriptor is idle for this many milliseconds, it will be closed.
-    /// In the event a client or store takes a long time to send or receive data
-    /// the file descriptor will be closed, and since `max_open_files` blocks new
-    /// `open_file` requests until a slot opens up, it will allow new requests to be
-    /// processed. If a read or write is attempted on a closed file descriptor, the
-    /// file will be reopened and the operation will continue.
-    ///
-    /// On services where worker(s) and scheduler(s) live in the same process, this
-    /// also prevents deadlocks if a file->file copy is happening, but cannot open
-    /// a new file descriptor because the limit has been reached.
-    ///
-    /// Default: 1000 (1 second)
-    #[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
-    pub idle_file_descriptor_timeout_millis: u64,
-
     /// This flag can be used to prevent metrics from being collected at runtime.
     /// Metrics are still able to be collected, but this flag prevents metrics that
     /// are collected at runtime (performance metrics) from being tallied. The
diff --git a/nativelink-macro/src/lib.rs b/nativelink-macro/src/lib.rs
index f37175569..98ea64e74 100644
--- a/nativelink-macro/src/lib.rs
+++ b/nativelink-macro/src/lib.rs
@@ -36,12 +36,10 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream {
         async fn #fn_name(#fn_inputs) #fn_output {
             // Error means already initialized, which is ok.
             let _ = nativelink_util::init_tracing();
-            // If already set it's ok.
-            let _ = nativelink_util::fs::set_idle_file_descriptor_timeout(std::time::Duration::from_millis(100));
 
             #[warn(clippy::disallowed_methods)]
             ::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async(
-                ::nativelink_util::__tracing::trace_span!("test"), async move {
+                ::nativelink_util::__tracing::error_span!(stringify!(#fn_name)), async move {
                     #fn_block
                 }
             )
diff --git a/nativelink-store/Cargo.toml b/nativelink-store/Cargo.toml
index a9ee0aeaa..0648a502e 100644
--- a/nativelink-store/Cargo.toml
+++ b/nativelink-store/Cargo.toml
@@ -78,9 +78,6 @@ aws-sdk-s3 = { version = "=1.68.0", features = [
   "rt-tokio",
 ], default-features = false }
 aws-smithy-runtime-api = "1.7.3"
-serial_test = { version = "3.2.0", features = [
-  "async",
-], default-features = false }
 serde_json = "1.0.135"
 fred = { version = "10.0.3", default-features = false, features = ["mocks"] }
 tracing-subscriber = { version = "0.3.19", default-features = false }
diff --git a/nativelink-store/src/fast_slow_store.rs b/nativelink-store/src/fast_slow_store.rs
index 60789e3a5..e2eec4b6f 100644
--- a/nativelink-store/src/fast_slow_store.rs
+++ b/nativelink-store/src/fast_slow_store.rs
@@ -14,6 +14,7 @@
 
 use std::borrow::BorrowMut;
 use std::cmp::{max, min};
+use std::ffi::OsString;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
@@ -224,9 +225,10 @@ impl StoreDriver for FastSlowStore {
     async fn update_with_whole_file(
         self: Pin<&Self>,
         key: StoreKey<'_>,
-        mut file: fs::ResumeableFileSlot,
+        path: OsString,
+        mut file: fs::FileSlot,
         upload_size: UploadSizeInfo,
-    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
+    ) -> Result<Option<fs::FileSlot>, Error> {
         if self
             .fast_store
             .optimized_for(StoreOptimizations::FileUpdates)
@@ -246,7 +248,7 @@ impl StoreDriver for FastSlowStore {
             }
             return self
                 .fast_store
-                .update_with_whole_file(key, file, upload_size)
+                .update_with_whole_file(key, path, file, upload_size)
                 .await;
         }
 
@@ -269,7 +271,7 @@ impl StoreDriver for FastSlowStore {
             }
             return self
                 .slow_store
-                .update_with_whole_file(key, file, upload_size)
+                .update_with_whole_file(key, path, file, upload_size)
                 .await;
         }
 
diff --git a/nativelink-store/src/filesystem_store.rs b/nativelink-store/src/filesystem_store.rs
index b5c7261dc..8e6f0a1d6 100644
--- a/nativelink-store/src/filesystem_store.rs
+++ b/nativelink-store/src/filesystem_store.rs
@@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::{Arc, Weak};
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 
 use async_lock::RwLock;
 use async_trait::async_trait;
@@ -39,8 +39,7 @@ use nativelink_util::store_trait::{
     StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
 };
 use nativelink_util::{background_spawn, spawn_blocking};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
-use tokio::time::{sleep, timeout, Sleep};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
 use tokio_stream::wrappers::ReadDirStream;
 use tracing::{event, Level};
 
@@ -168,7 +167,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
     fn make_and_open_file(
         block_size: u64,
         encoded_file_path: EncodedFilePath,
-    ) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
+    ) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
     where
         Self: Sized;
 
@@ -186,7 +185,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
         &self,
         offset: u64,
         length: u64,
-    ) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
+    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;
 
     /// This function is a safe way to extract the file name of the underlying file. To protect users from
     /// accidentally creating undefined behavior we encourage users to do the logic they need to do with
@@ -231,7 +230,7 @@ impl FileEntry for FileEntryImpl {
     async fn make_and_open_file(
         block_size: u64,
         encoded_file_path: EncodedFilePath,
-    ) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
+    ) -> Result<(FileEntryImpl, fs::FileSlot, OsString), Error> {
         let temp_full_path = encoded_file_path.get_file_path().to_os_string();
         let temp_file_result = fs::create_file(temp_full_path.clone())
             .or_else(|mut err| async {
@@ -276,30 +275,19 @@ impl FileEntry for FileEntryImpl {
         &self.encoded_file_path
     }
 
-    async fn read_file_part(
+    fn read_file_part(
         &self,
         offset: u64,
         length: u64,
-    ) -> Result<fs::ResumeableFileSlot, Error> {
-        let (mut file, full_content_path_for_debug_only) = self
-            .get_file_path_locked(|full_content_path| async move {
-                let file = fs::open_file(full_content_path.clone(), length)
-                    .await
-                    .err_tip(|| {
-                        format!("Failed to open file in filesystem store {full_content_path:?}")
-                    })?;
-                Ok((file, full_content_path))
-            })
-            .await?;
-
-        file.as_reader()
-            .await
-            .err_tip(|| "Could not seek file in read_file_part()")?
-            .get_mut()
-            .seek(SeekFrom::Start(offset))
-            .await
-            .err_tip(|| format!("Failed to seek file: {full_content_path_for_debug_only:?}"))?;
-        Ok(file)
+    ) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
+        self.get_file_path_locked(move |full_content_path| async move {
+            let file = fs::open_file(&full_content_path, offset, length)
+                .await
+                .err_tip(|| {
+                    format!("Failed to open file in filesystem store {full_content_path:?}")
+                })?;
+            Ok(file)
+        })
     }
 
     async fn get_file_path_locked<
@@ -524,6 +512,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
                         );
                     }
                 };
+
                 Result::<(String, SystemTime, u64, bool), Error>::Ok((
                     file_name,
                     atime,
@@ -668,19 +657,16 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
     #[metric(help = "Size of the configured read buffer size")]
     read_buffer_size: usize,
     weak_self: Weak<Self>,
-    sleep_fn: fn(Duration) -> Sleep,
     rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
 }
 
 impl<Fe: FileEntry> FilesystemStore<Fe> {
     pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
-        Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to))
-            .await
+        Self::new_with_timeout_and_rename_fn(spec, |from, to| std::fs::rename(from, to)).await
     }
 
     pub async fn new_with_timeout_and_rename_fn(
         spec: &FilesystemSpec,
-        sleep_fn: fn(Duration) -> Sleep,
         rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
     ) -> Result<Arc<Self>, Error> {
         async fn create_subdirs(path: &str) -> Result<(), Error> {
@@ -735,7 +721,6 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
             block_size,
             read_buffer_size,
             weak_self: weak_self.clone(),
-            sleep_fn,
             rename_fn,
         }))
     }
@@ -754,50 +739,34 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
     async fn update_file<'a>(
         self: Pin<&'a Self>,
         mut entry: Fe,
-        mut resumeable_temp_file: fs::ResumeableFileSlot,
+        mut temp_file: fs::FileSlot,
         final_key: StoreKey<'static>,
         mut reader: DropCloserReadHalf,
     ) -> Result<(), Error> {
         let mut data_size = 0;
         loop {
-            let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await
-            else {
-                // In the event we timeout, we want to close the writing file, to prevent
-                // the file descriptor left open for long periods of time.
-                // This is needed because we wrap `fs` so only a fixed number of file
-                // descriptors may be open at any given time. If we are streaming from
-                // File -> File, it can cause a deadlock if the Write file is not sending
-                // data because it is waiting for a file descriotor to open before sending data.
-                resumeable_temp_file.close_file().await.err_tip(|| {
-                    "Could not close file due to timeout in FileSystemStore::update_file"
-                })?;
-                continue;
-            };
-            let mut data = data_result.err_tip(|| "Failed to receive data in filesystem store")?;
+            let mut data = reader
+                .recv()
+                .await
+                .err_tip(|| "Failed to receive data in filesystem store")?;
             let data_len = data.len();
             if data_len == 0 {
                 break; // EOF.
             }
-            resumeable_temp_file
-                .as_writer()
-                .await
-                .err_tip(|| "in filesystem_store::update_file")?
+            temp_file
                 .write_all_buf(&mut data)
                 .await
                 .err_tip(|| "Failed to write data into filesystem store")?;
             data_size += data_len as u64;
         }
 
-        resumeable_temp_file
-            .as_writer()
-            .await
-            .err_tip(|| "in filesystem_store::update_file")?
+        temp_file
             .as_ref()
             .sync_all()
             .await
             .err_tip(|| "Failed to sync_data in filesystem store")?;
 
-        drop(resumeable_temp_file);
+        drop(temp_file);
 
         *entry.data_size_mut() = data_size;
         self.emplace_file(final_key, Arc::new(entry)).await
@@ -942,19 +911,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
     async fn update_with_whole_file(
         self: Pin<&Self>,
         key: StoreKey<'_>,
-        mut file: fs::ResumeableFileSlot,
+        path: OsString,
+        file: fs::FileSlot,
         upload_size: UploadSizeInfo,
-    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
-        let path = file.get_path().as_os_str().to_os_string();
+    ) -> Result<Option<fs::FileSlot>, Error> {
         let file_size = match upload_size {
             UploadSizeInfo::ExactSize(size) => size,
             UploadSizeInfo::MaxSize(_) => file
-                .as_reader()
-                .await
-                .err_tip(|| {
-                    format!("While getting metadata for {path:?} in update_with_whole_file")
-                })?
-                .get_ref()
                 .as_ref()
                 .metadata()
                 .await
@@ -995,7 +958,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
                 .err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
             return Ok(());
         }
-
         let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
             make_err!(
                 Code::NotFound,
@@ -1004,47 +966,21 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
             )
         })?;
         let read_limit = length.unwrap_or(u64::MAX);
-        let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
+        let mut temp_file = entry.read_file_part(offset, read_limit).await?;
 
         loop {
             let mut buf = BytesMut::with_capacity(self.read_buffer_size);
-            resumeable_temp_file
-                .as_reader()
-                .await
-                .err_tip(|| "In FileSystemStore::get_part()")?
+            temp_file
                 .read_buf(&mut buf)
                 .await
                 .err_tip(|| "Failed to read data in filesystem store")?;
             if buf.is_empty() {
                 break; // EOF.
             }
-            // In the event it takes a while to send the data to the client, we want to close the
-            // reading file, to prevent the file descriptor left open for long periods of time.
-            // Failing to do so might cause deadlocks if the receiver is unable to receive data
-            // because it is waiting for a file descriptor to open before receiving data.
-            // Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
-            // next iteration.
-            let buf_content = buf.freeze();
-            loop {
-                let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
-                tokio::pin!(sleep_fn);
-                tokio::select! {
-                    () = & mut (sleep_fn) => {
-                        resumeable_temp_file
-                            .close_file()
-                            .await
-                            .err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
-                    }
-                    res = writer.send(buf_content.clone()) => {
-                        match res {
-                            Ok(()) => break,
-                            Err(err) => {
-                                return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
-                            }
-                        }
-                    }
-                }
-            }
+            writer
+                .send(buf.freeze())
+                .await
+                .err_tip(|| "Failed to send chunk in filesystem store get_part")?;
         }
         writer
             .send_eof()
diff --git a/nativelink-store/tests/ac_utils_test.rs b/nativelink-store/tests/ac_utils_test.rs
index 131e2688d..09b23ba65 100644
--- a/nativelink-store/tests/ac_utils_test.rs
+++ b/nativelink-store/tests/ac_utils_test.rs
@@ -40,7 +40,7 @@ async fn make_temp_path(data: &str) -> OsString {
 const HASH1: &str = "0123456789abcdef000000000000000000000000000000000123456789abcdef";
 const HASH1_SIZE: i64 = 147;
 
-// Regression test for bug created when implementing ResumeableFileSlot
+// Regression test for bug created when implementing FileSlot
 // where the timeout() success condition was breaking out of the outer
 // loop resulting in the file always being created with <= 4096 bytes.
 #[nativelink_test]
@@ -62,11 +62,15 @@ async fn upload_file_to_store_with_large_file() -> Result<(), Error> {
     }
     {
         // Upload our file.
-        let resumeable_file = fs::open_file(filepath, u64::MAX).await?;
+        let file = fs::open_file(&filepath, 0, u64::MAX)
+            .await
+            .unwrap()
+            .into_inner();
         store
             .update_with_whole_file(
                 digest,
-                resumeable_file,
+                filepath,
+                file,
                 UploadSizeInfo::ExactSize(expected_data.len() as u64),
             )
             .await?;
diff --git a/nativelink-store/tests/filesystem_store_test.rs b/nativelink-store/tests/filesystem_store_test.rs
index f02b32e82..3f5864744 100644
--- a/nativelink-store/tests/filesystem_store_test.rs
+++ b/nativelink-store/tests/filesystem_store_test.rs
@@ -27,10 +27,9 @@ use filetime::{set_file_atime, FileTime};
 use futures::executor::block_on;
 use futures::task::Poll;
 use futures::{poll, Future, FutureExt};
-use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
+use nativelink_config::stores::FilesystemSpec;
 use nativelink_error::{make_err, Code, Error, ResultExt};
 use nativelink_macro::nativelink_test;
-use nativelink_store::fast_slow_store::FastSlowStore;
 use nativelink_store::filesystem_store::{
     key_from_file, EncodedFilePath, FileEntry, FileEntryImpl, FileType, FilesystemStore,
     DIGEST_FOLDER, STR_FOLDER,
@@ -44,9 +43,8 @@ use nativelink_util::{background_spawn, spawn};
 use parking_lot::Mutex;
 use pretty_assertions::assert_eq;
 use rand::{thread_rng, Rng};
-use serial_test::serial;
 use sha2::{Digest, Sha256};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
+use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Take};
 use tokio::sync::Barrier;
 use tokio::time::sleep;
 use tokio_stream::wrappers::ReadDirStream;
@@ -86,7 +84,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
     async fn make_and_open_file(
         block_size: u64,
         encoded_file_path: EncodedFilePath,
-    ) -> Result<(Self, fs::ResumeableFileSlot, OsString), Error> {
+    ) -> Result<(Self, fs::FileSlot, OsString), Error> {
         let (inner, file_slot, path) =
             FileEntryImpl::make_and_open_file(block_size, encoded_file_path).await?;
         Ok((
@@ -111,11 +109,7 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> FileEntry for TestFileEntry<
         self.inner.as_ref().unwrap().get_encoded_file_path()
     }
 
-    async fn read_file_part(
-        &self,
-        offset: u64,
-        length: u64,
-    ) -> Result<fs::ResumeableFileSlot, Error> {
+    async fn read_file_part(&self, offset: u64, length: u64) -> Result<Take<fs::FileSlot>, Error> {
         self.inner
             .as_ref()
             .unwrap()
@@ -204,13 +198,11 @@ fn make_temp_path(data: &str) -> String {
 }
 
 async fn read_file_contents(file_name: &OsStr) -> Result<Vec<u8>, Error> {
-    let mut file = fs::open_file(file_name, u64::MAX)
+    let mut file = fs::open_file(file_name, 0, u64::MAX)
         .await
         .err_tip(|| format!("Failed to open file: {file_name:?}"))?;
     let mut data = vec![];
-    file.as_reader()
-        .await?
-        .read_to_end(&mut data)
+    file.read_to_end(&mut data)
         .await
         .err_tip(|| "Error reading file to end")?;
     Ok(data)
@@ -265,7 +257,6 @@ const VALUE1: &str = "0123456789";
 const VALUE2: &str = "9876543210";
 const STRING_NAME: &str = "String_Filename";
 
-#[serial]
 #[nativelink_test]
 async fn valid_results_after_shutdown_test() -> Result<(), Error> {
     let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -282,7 +273,6 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
             })
             .await?,
         );
-
         // Insert dummy value into store.
         store.update_oneshot(digest, VALUE1.into()).await?;
 
@@ -314,7 +304,6 @@ async fn valid_results_after_shutdown_test() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
     static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -381,7 +370,6 @@ async fn temp_files_get_deleted_on_replace_test() -> Result<(), Error> {
 // This test ensures that if a file is overridden and an open stream to the file already
 // exists, the open stream will continue to work properly and when the stream is done the
 // temporary file (of the object that was deleted) is cleaned up.
-#[serial]
 #[nativelink_test]
 async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error> {
     static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -418,7 +406,7 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
     let digest1_clone = digest1;
     background_spawn!(
         "file_continues_to_stream_on_content_replace_test_store_get",
-        async move { store_clone.get(digest1_clone, writer).await },
+        async move { store_clone.get(digest1_clone, writer).await.unwrap() },
     );
 
     {
@@ -489,7 +477,6 @@ async fn file_continues_to_stream_on_content_replace_test() -> Result<(), Error>
 // Eviction has a different code path than a file replacement, so we check that if a
 // file is evicted and has an open stream on it, it will stay alive and eventually
 // get deleted.
-#[serial]
 #[nativelink_test]
 async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
     static DELETES_FINISHED: AtomicU32 = AtomicU32::new(0);
@@ -520,14 +507,14 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
     );
 
     // Insert data into store.
-    store.update_oneshot(digest1, VALUE1.into()).await?;
+    store.update_oneshot(digest1, VALUE1.into()).await.unwrap();
 
     let mut reader = {
         let (writer, reader) = make_buf_channel_pair();
         let store_clone = store.clone();
         background_spawn!(
             "file_gets_cleans_up_on_cache_eviction_store_get",
-            async move { store_clone.get(digest1, writer).await },
+            async move { store_clone.get(digest1, writer).await.unwrap() },
         );
         reader
     };
@@ -583,7 +570,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
     check_temp_empty(&temp_path).await
 }
 
-#[serial]
 #[nativelink_test]
 async fn atime_updates_on_get_part_test() -> Result<(), Error> {
     let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -630,7 +616,6 @@ async fn atime_updates_on_get_part_test() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn eviction_drops_file_test() -> Result<(), Error> {
     let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -680,7 +665,6 @@ async fn eviction_drops_file_test() -> Result<(), Error> {
 // Test to ensure that if we are holding a reference to `FileEntry` and the contents are
 // replaced, the `FileEntry` continues to use the old data.
 // `FileEntry` file contents should be immutable for the lifetime of the object.
-#[serial]
 #[nativelink_test]
 async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error> {
     let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -701,11 +685,7 @@ async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error
         // The file contents should equal our initial data.
         let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
         let mut file_contents = String::new();
-        reader
-            .as_reader()
-            .await?
-            .read_to_string(&mut file_contents)
-            .await?;
+        reader.read_to_string(&mut file_contents).await?;
         assert_eq!(file_contents, VALUE1);
     }
 
@@ -716,18 +696,13 @@ async fn digest_contents_replaced_continues_using_old_data() -> Result<(), Error
         // The file contents still equal our old data.
         let mut reader = file_entry.read_file_part(0, u64::MAX).await?;
         let mut file_contents = String::new();
-        reader
-            .as_reader()
-            .await?
-            .read_to_string(&mut file_contents)
-            .await?;
+        reader.read_to_string(&mut file_contents).await?;
         assert_eq!(file_contents, VALUE1);
     }
 
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     const SMALL_VALUE: &str = "01";
@@ -788,7 +763,6 @@ async fn eviction_on_insert_calls_unref_once() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 #[allow(clippy::await_holding_refcell_ref)]
 async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens() -> Result<(), Error>
@@ -814,18 +788,11 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()
                 let dir_entry = dir_entry?;
                 {
                     // Some filesystems won't sync automatically, so force it.
-                    let mut file_handle =
-                        fs::open_file(dir_entry.path().into_os_string(), u64::MAX)
-                            .await
-                            .err_tip(|| "Failed to open temp file")?;
+                    let file_handle = fs::open_file(dir_entry.path().into_os_string(), 0, u64::MAX)
+                        .await
+                        .err_tip(|| "Failed to open temp file")?;
                     // We don't care if it fails, this is only best attempt.
-                    let _ = file_handle
-                        .as_reader()
-                        .await?
-                        .get_ref()
-                        .as_ref()
-                        .sync_all()
-                        .await;
+                    let _ = file_handle.get_ref().as_ref().sync_all().await;
                 }
                 // Ensure we have written to the file too. This ensures we have an open file handle.
                 // Failing to do this may result in the file existing, but the `update_fut` not actually
@@ -924,7 +891,6 @@ async fn rename_on_insert_fails_due_to_filesystem_error_proper_cleanup_happens()
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn get_part_timeout_test() -> Result<(), Error> {
     let large_value = "x".repeat(1024);
@@ -940,7 +906,6 @@ async fn get_part_timeout_test() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
@@ -972,7 +937,6 @@ async fn get_part_timeout_test() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn get_part_is_zero_digest() -> Result<(), Error> {
     let digest = DigestInfo::new(Sha256::new().finalize().into(), 0);
@@ -987,7 +951,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
@@ -1014,7 +977,6 @@ async fn get_part_is_zero_digest() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn has_with_results_on_zero_digests() -> Result<(), Error> {
     async fn wait_for_empty_content_file<
@@ -1055,7 +1017,6 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
@@ -1079,7 +1040,6 @@ async fn has_with_results_on_zero_digests() -> Result<(), Error> {
 }
 
 /// Regression test for: https://github.com/TraceMachina/nativelink/issues/495.
-#[serial]
 #[nativelink_test(flavor = "multi_thread")]
 async fn update_file_future_drops_before_rename() -> Result<(), Error> {
     // Mutex can be used to signal to the rename function to pause execution.
@@ -1098,7 +1058,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
                 eviction_policy: None,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| {
                 // If someone locked our mutex, it means we need to pause, so we
                 // simply request a lock on the same mutex.
@@ -1167,7 +1126,6 @@ async fn update_file_future_drops_before_rename() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn deleted_file_removed_from_store() -> Result<(), Error> {
     let digest = DigestInfo::try_new(HASH1, VALUE1.len())?;
@@ -1182,7 +1140,6 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
@@ -1205,7 +1162,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
 
     store
         .update_oneshot(string_key.borrow(), VALUE2.into())
-        .await?;
+        .await
+        .unwrap();
 
     let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
     std::fs::remove_file(stored_file_path)?;
@@ -1224,7 +1182,6 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
 // assume block size 4K
 // 1B data size = 4K size on disk
 // 5K data size = 8K size on disk
-#[serial]
 #[nativelink_test]
 async fn get_file_size_uses_block_size() -> Result<(), Error> {
     let content_path = make_temp_path("content_path");
@@ -1244,7 +1201,6 @@ async fn get_file_size_uses_block_size() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
@@ -1260,7 +1216,6 @@ async fn get_file_size_uses_block_size() -> Result<(), Error> {
     Ok(())
 }
 
-#[serial]
 #[nativelink_test]
 async fn update_with_whole_file_closes_file() -> Result<(), Error> {
     let mut permits = vec![];
@@ -1294,87 +1249,27 @@ async fn update_with_whole_file_closes_file() -> Result<(), Error> {
     );
     store.update_oneshot(digest, value.clone().into()).await?;
 
-    let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
-    {
-        let writer = file.as_writer().await?;
-        writer.write_all(value.as_bytes()).await?;
-        writer.as_mut().sync_all().await?;
-        writer.seek(tokio::io::SeekFrom::Start(0)).await?;
-    }
-
-    store
-        .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
-        .await?;
-    Ok(())
-}
-
-#[serial]
-#[nativelink_test]
-async fn update_with_whole_file_slow_path_when_low_file_descriptors() -> Result<(), Error> {
-    let mut permits = vec![];
-    // Grab all permits to ensure only 1 permit is available.
-    {
-        wait_for_no_open_files().await?;
-        while fs::OPEN_FILE_SEMAPHORE.available_permits() > 1 {
-            permits.push(fs::get_permit().await);
-        }
-        assert_eq!(
-            fs::OPEN_FILE_SEMAPHORE.available_permits(),
-            1,
-            "Expected 1 permit to be available"
-        );
-    }
-
-    let value = "x".repeat(1024);
-
-    let digest = DigestInfo::try_new(HASH1, value.len())?;
-
-    let store = FastSlowStore::new(
-        // Note: The config is not needed for this test, so use dummy data.
-        &FastSlowSpec {
-            fast: StoreSpec::memory(MemorySpec::default()),
-            slow: StoreSpec::memory(MemorySpec::default()),
-        },
-        Store::new(
-            FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
-                content_path: make_temp_path("content_path"),
-                temp_path: make_temp_path("temp_path"),
-                read_buffer_size: 1,
-                ..Default::default()
-            })
-            .await?,
-        ),
-        Store::new(
-            FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
-                content_path: make_temp_path("content_path1"),
-                temp_path: make_temp_path("temp_path1"),
-                read_buffer_size: 1,
-                ..Default::default()
-            })
-            .await?,
-        ),
-    );
-    store.update_oneshot(digest, value.clone().into()).await?;
-
-    let temp_path = make_temp_path("temp_path2");
-    fs::create_dir_all(&temp_path).await?;
-    let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
+    let file_path = OsString::from(format!("{temp_path}/dummy_file"));
+    let mut file = fs::create_file(&file_path).await?;
     {
-        let writer = file.as_writer().await?;
-        writer.write_all(value.as_bytes()).await?;
-        writer.as_mut().sync_all().await?;
-        writer.seek(tokio::io::SeekFrom::Start(0)).await?;
+        file.write_all(value.as_bytes()).await?;
+        file.as_mut().sync_all().await?;
+        file.seek(tokio::io::SeekFrom::Start(0)).await?;
     }
 
     store
-        .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
+        .update_with_whole_file(
+            digest,
+            file_path,
+            file,
+            UploadSizeInfo::ExactSize(value.len() as u64),
+        )
         .await?;
     Ok(())
 }
 
 // Ensure that update_with_whole_file() moves the file without making a copy.
 #[cfg(target_family = "unix")]
-#[serial]
 #[nativelink_test]
 async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
     use std::os::unix::fs::MetadataExt;
@@ -1393,36 +1288,35 @@ async fn update_with_whole_file_uses_same_inode() -> Result<(), Error> {
                 read_buffer_size: 1,
                 ..Default::default()
             },
-            |_| sleep(Duration::ZERO),
             |from, to| std::fs::rename(from, to),
         )
         .await?,
     );
 
-    let mut file = fs::create_file(OsString::from(format!("{temp_path}/dummy_file"))).await?;
-    let original_inode = file
-        .as_reader()
-        .await?
-        .get_ref()
-        .as_ref()
-        .metadata()
-        .await?
-        .ino();
+    let file_path = OsString::from(format!("{temp_path}/dummy_file"));
+    let original_inode = {
+        let file = fs::create_file(&file_path).await?;
+        let original_inode = file.as_ref().metadata().await?.ino();
 
-    let result = store
-        .update_with_whole_file(digest, file, UploadSizeInfo::ExactSize(value.len() as u64))
-        .await?;
-    assert!(
-        result.is_none(),
-        "Expected filesystem store to consume the file"
-    );
+        let result = store
+            .update_with_whole_file(
+                digest,
+                file_path,
+                file,
+                UploadSizeInfo::ExactSize(value.len() as u64),
+            )
+            .await?;
+        assert!(
+            result.is_none(),
+            "Expected filesystem store to consume the file"
+        );
+        original_inode
+    };
 
     let expected_file_name = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
     let new_inode = fs::create_file(expected_file_name)
-        .await?
-        .as_reader()
-        .await?
-        .get_ref()
+        .await
+        .unwrap()
         .as_ref()
         .metadata()
         .await?
diff --git a/nativelink-util/BUILD.bazel b/nativelink-util/BUILD.bazel
index 2029ce230..d8e40c4e5 100644
--- a/nativelink-util/BUILD.bazel
+++ b/nativelink-util/BUILD.bazel
@@ -66,6 +66,7 @@ rust_library(
         "@crates//:prost",
         "@crates//:prost-types",
         "@crates//:rand",
+        "@crates//:rlimit",
         "@crates//:serde",
         "@crates//:sha2",
         "@crates//:tokio",
@@ -87,7 +88,6 @@ rust_test_suite(
         "tests/common_test.rs",
         "tests/evicting_map_test.rs",
         "tests/fastcdc_test.rs",
-        "tests/fs_test.rs",
         "tests/health_utils_test.rs",
         "tests/operation_id_tests.rs",
         "tests/origin_event_test.rs",
diff --git a/nativelink-util/Cargo.toml b/nativelink-util/Cargo.toml
index 704632614..953f1004e 100644
--- a/nativelink-util/Cargo.toml
+++ b/nativelink-util/Cargo.toml
@@ -33,6 +33,7 @@ pin-project-lite = "0.2.16"
 prost = { version = "0.13.4", default-features = false }
 prost-types = { version = "0.13.4", default-features = false }
 rand = { version = "0.8.5", default-features = false }
+rlimit = { version = "0.10.2", default-features = false }
 serde = { version = "1.0.217", default-features = false }
 sha2 = { version = "0.10.8", default-features = false }
 tokio = { version = "1.43.0", features = ["fs", "rt-multi-thread", "signal", "io-util"], default-features = false }
diff --git a/nativelink-util/src/buf_channel.rs b/nativelink-util/src/buf_channel.rs
index 178acc8fd..69f1cb6d7 100644
--- a/nativelink-util/src/buf_channel.rs
+++ b/nativelink-util/src/buf_channel.rs
@@ -372,7 +372,7 @@ impl DropCloserReadHalf {
             let mut chunk = self
                 .recv()
                 .await
-                .err_tip(|| "During first read of buf_channel::take()")?;
+                .err_tip(|| "During next read of buf_channel::take()")?;
             if chunk.is_empty() {
                 break; // EOF.
             }
diff --git a/nativelink-util/src/digest_hasher.rs b/nativelink-util/src/digest_hasher.rs
index 280311f7b..85882daec 100644
--- a/nativelink-util/src/digest_hasher.rs
+++ b/nativelink-util/src/digest_hasher.rs
@@ -25,7 +25,7 @@ use nativelink_metric::{
 use nativelink_proto::build::bazel::remote::execution::v2::digest_function::Value as ProtoDigestFunction;
 use serde::{Deserialize, Serialize};
 use sha2::{Digest, Sha256};
-use tokio::io::{AsyncRead, AsyncReadExt};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt};
 
 use crate::common::DigestInfo;
 use crate::origin_context::{ActiveOriginContext, OriginContext};
@@ -187,9 +187,10 @@ pub trait DigestHasher {
     /// the file and feed it into the hasher.
     fn digest_for_file(
         self,
-        file: fs::ResumeableFileSlot,
+        file_path: impl AsRef<std::path::Path>,
+        file: fs::FileSlot,
         size_hint: Option<u64>,
-    ) -> impl Future<Output = Result<(DigestInfo, fs::ResumeableFileSlot), Error>>;
+    ) -> impl Future<Output = Result<(DigestInfo, fs::FileSlot), Error>>;
 
     /// Utility function to compute a hash from a generic reader.
     fn compute_from_reader<R: AsyncRead + Unpin + Send>(
@@ -229,11 +230,10 @@ impl DigestHasherImpl {
     #[inline]
     async fn hash_file(
         &mut self,
-        mut file: fs::ResumeableFileSlot,
-    ) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
-        let reader = file.as_reader().await.err_tip(|| "In digest_for_file")?;
+        mut file: fs::FileSlot,
+    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
         let digest = self
-            .compute_from_reader(reader)
+            .compute_from_reader(&mut file)
             .await
             .err_tip(|| "In digest_for_file")?;
         Ok((digest, file))
@@ -263,9 +263,10 @@ impl DigestHasher for DigestHasherImpl {
 
     async fn digest_for_file(
         mut self,
-        mut file: fs::ResumeableFileSlot,
+        file_path: impl AsRef<std::path::Path>,
+        mut file: fs::FileSlot,
         size_hint: Option<u64>,
-    ) -> Result<(DigestInfo, fs::ResumeableFileSlot), Error> {
+    ) -> Result<(DigestInfo, fs::FileSlot), Error> {
         let file_position = file
             .stream_position()
             .await
@@ -280,11 +281,12 @@ impl DigestHasher for DigestHasherImpl {
                 return self.hash_file(file).await;
             }
         }
+        let file_path = file_path.as_ref().to_path_buf();
         match self.hash_func_impl {
             DigestHasherFuncImpl::Sha256(_) => self.hash_file(file).await,
             DigestHasherFuncImpl::Blake3(mut hasher) => {
                 spawn_blocking!("digest_for_file", move || {
-                    hasher.update_mmap(file.get_path()).map_err(|e| {
+                    hasher.update_mmap(file_path).map_err(|e| {
                         make_err!(Code::Internal, "Error in blake3's update_mmap: {e:?}")
                     })?;
                     Result::<_, Error>::Ok((
diff --git a/nativelink-util/src/fs.rs b/nativelink-util/src/fs.rs
index e28fbcc74..d5765b2e5 100644
--- a/nativelink-util/src/fs.rs
+++ b/nativelink-util/src/fs.rs
@@ -13,26 +13,20 @@
 // limitations under the License.
 
 use std::fs::Metadata;
-use std::io::IoSlice;
+use std::io::{IoSlice, Seek};
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::OnceLock;
 use std::task::{Context, Poll};
-use std::time::Duration;
 
-use bytes::BytesMut;
-use futures::Future;
 use nativelink_error::{make_err, Code, Error, ResultExt};
+use rlimit::increase_nofile_limit;
 /// We wrap all `tokio::fs` items in our own wrapper so we can limit the number of outstanding
 /// open files at any given time. This will greatly reduce the chance we'll hit open file limit
 /// issues.
 pub use tokio::fs::DirEntry;
-use tokio::io::{
-    AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite, ReadBuf, SeekFrom, Take,
-};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncWrite, ReadBuf, SeekFrom, Take};
 use tokio::sync::{Semaphore, SemaphorePermit};
-use tokio::time::timeout;
 use tracing::{event, Level};
 
 use crate::spawn_blocking;
@@ -40,171 +34,6 @@ use crate::spawn_blocking;
 /// Default read buffer size when reading to/from disk.
 pub const DEFAULT_READ_BUFF_SIZE: usize = 16384;
 
-type StreamPosition = u64;
-type BytesRemaining = u64;
-
-#[derive(Debug)]
-enum MaybeFileSlot {
-    Open(Take<FileSlot>),
-    Closed((StreamPosition, BytesRemaining)),
-}
-
-/// A wrapper around a generic `FileSlot`. This gives us the ability to
-/// close a file and then resume it later. Specifically useful for cases
-/// piping data from one location to another and one side is slow at
-/// reading or writing the data, we can have a timeout, close the file
-/// and then reopen it later.
-///
-/// Note: This wraps both files opened for read and write, so we always
-/// need to know how the original file was opened and the location of
-/// the file. To simplify the code significantly we always require the
-/// file to be a `Take<FileSlot>`.
-#[derive(Debug)]
-pub struct ResumeableFileSlot {
-    maybe_file_slot: MaybeFileSlot,
-    path: PathBuf,
-    is_write: bool,
-}
-
-impl ResumeableFileSlot {
-    pub fn new(file: FileSlot, path: PathBuf, is_write: bool) -> Self {
-        Self {
-            maybe_file_slot: MaybeFileSlot::Open(file.take(u64::MAX)),
-            path,
-            is_write,
-        }
-    }
-
-    pub fn new_with_take(file: Take<FileSlot>, path: PathBuf, is_write: bool) -> Self {
-        Self {
-            maybe_file_slot: MaybeFileSlot::Open(file),
-            path,
-            is_write,
-        }
-    }
-
-    /// Returns the path of the file.
-    pub fn get_path(&self) -> &Path {
-        Path::new(&self.path)
-    }
-
-    /// Returns the current read position of a file.
-    pub async fn stream_position(&mut self) -> Result<u64, Error> {
-        let file_slot = match &mut self.maybe_file_slot {
-            MaybeFileSlot::Open(file_slot) => file_slot,
-            MaybeFileSlot::Closed((pos, _)) => return Ok(*pos),
-        };
-        file_slot
-            .get_mut()
-            .inner
-            .stream_position()
-            .await
-            .err_tip(|| "Failed to get file position in digest_for_file")
-    }
-
-    pub async fn close_file(&mut self) -> Result<(), Error> {
-        let MaybeFileSlot::Open(file_slot) = &mut self.maybe_file_slot else {
-            return Ok(());
-        };
-        let position = file_slot
-            .get_mut()
-            .inner
-            .stream_position()
-            .await
-            .err_tip(|| format!("Failed to get file position {:?}", self.path))?;
-        self.maybe_file_slot = MaybeFileSlot::Closed((position, file_slot.limit()));
-        Ok(())
-    }
-
-    #[inline]
-    pub async fn as_reader(&mut self) -> Result<&mut Take<FileSlot>, Error> {
-        let (stream_position, bytes_remaining) = match self.maybe_file_slot {
-            MaybeFileSlot::Open(ref mut file_slot) => return Ok(file_slot),
-            MaybeFileSlot::Closed(pos) => pos,
-        };
-        let permit = OPEN_FILE_SEMAPHORE
-            .acquire()
-            .await
-            .map_err(|e| make_err!(Code::Internal, "Open file semaphore closed {:?}", e))?;
-        let inner = tokio::fs::OpenOptions::new()
-            .write(self.is_write)
-            .read(!self.is_write)
-            .open(&self.path)
-            .await
-            .err_tip(|| format!("Could not open after resume {:?}", self.path))?;
-        let mut file_slot = FileSlot {
-            _permit: permit,
-            inner,
-        };
-        file_slot
-            .inner
-            .seek(SeekFrom::Start(stream_position))
-            .await
-            .err_tip(|| {
-                format!(
-                    "Failed to seek to position {stream_position} {:?}",
-                    self.path
-                )
-            })?;
-
-        self.maybe_file_slot = MaybeFileSlot::Open(file_slot.take(bytes_remaining));
-        match &mut self.maybe_file_slot {
-            MaybeFileSlot::Open(file_slot) => Ok(file_slot),
-            MaybeFileSlot::Closed(_) => unreachable!(),
-        }
-    }
-
-    #[inline]
-    pub async fn as_writer(&mut self) -> Result<&mut FileSlot, Error> {
-        Ok(self.as_reader().await?.get_mut())
-    }
-
-    /// Utility function to read data from a handler and handles file descriptor
-    /// timeouts. Chunk size is based on the `buf`'s capacity.
-    /// Note: If the `handler` changes `buf`s capacity, it is responsible for reserving
-    /// more before returning.
-    pub async fn read_buf_cb<'b, T, F, Fut>(
-        &'b mut self,
-        (mut buf, mut state): (BytesMut, T),
-        mut handler: F,
-    ) -> Result<(BytesMut, T), Error>
-    where
-        F: (FnMut((BytesMut, T)) -> Fut) + 'b,
-        Fut: Future<Output = Result<(BytesMut, T), Error>> + 'b,
-    {
-        loop {
-            buf.clear();
-            self.as_reader()
-                .await
-                .err_tip(|| "Could not get reader from file slot in read_buf_cb")?
-                .read_buf(&mut buf)
-                .await
-                .err_tip(|| "Could not read chunk during read_buf_cb")?;
-            if buf.is_empty() {
-                return Ok((buf, state));
-            }
-            let handler_fut = handler((buf, state));
-            tokio::pin!(handler_fut);
-            loop {
-                match timeout(idle_file_descriptor_timeout(), &mut handler_fut).await {
-                    Ok(Ok(output)) => {
-                        (buf, state) = output;
-                        break;
-                    }
-                    Ok(Err(err)) => {
-                        return Err(err).err_tip(|| "read_buf_cb's handler returned an error")
-                    }
-                    Err(_) => {
-                        self.close_file()
-                            .await
-                            .err_tip(|| "Could not close file due to timeout in read_buf_cb")?;
-                    }
-                }
-            }
-        }
-    }
-}
-
 #[derive(Debug)]
 pub struct FileSlot {
     // We hold the permit because once it is dropped it goes back into the queue.
@@ -283,7 +112,9 @@ impl AsyncWrite for FileSlot {
     }
 }
 
-const DEFAULT_OPEN_FILE_PERMITS: usize = 10;
+// Note: If the default changes make sure you update the documentation in
+// `config/cas_server.rs`.
+pub const DEFAULT_OPEN_FILE_PERMITS: usize = 24 * 1024; // 24k.
 static TOTAL_FILE_SEMAPHORES: AtomicUsize = AtomicUsize::new(DEFAULT_OPEN_FILE_PERMITS);
 pub static OPEN_FILE_SEMAPHORE: Semaphore = Semaphore::const_new(DEFAULT_OPEN_FILE_PERMITS);
 
@@ -309,6 +140,42 @@ where
 }
 
 pub fn set_open_file_limit(limit: usize) {
+    let new_limit = {
+        // We increase the limit by 20% to give extra
+        // room for other file descriptors like sockets,
+        // pipes, and other things.
+        let fs_ulimit =
+            u64::try_from(limit.saturating_add(limit / 5)).expect("set_open_file_limit too large");
+        match increase_nofile_limit(fs_ulimit) {
+            Ok(new_fs_ulimit) => {
+                event!(
+                    Level::INFO,
+                    "set_open_file_limit({limit})::ulimit success. New fs.ulimit: {fs_ulimit} (20% increase of {limit}).",
+                );
+                usize::try_from(new_fs_ulimit).expect("new_fs_ulimit too large")
+            }
+            Err(e) => {
+                event!(
+                    Level::ERROR,
+                    "set_open_file_limit({limit})::ulimit failed. Maybe system does not have ulimits, continuing anyway. - {e:?}",
+                );
+                limit
+            }
+        }
+    };
+    if new_limit < DEFAULT_OPEN_FILE_PERMITS {
+        event!(
+            Level::WARN,
+            "set_open_file_limit({limit}) succeeded, but this is below the default limit of {DEFAULT_OPEN_FILE_PERMITS}. Will continue, but we recommend increasing the limit to at least the default.",
+        );
+    }
+    if new_limit < limit {
+        event!(
+            Level::WARN,
+            "set_open_file_limit({limit}) succeeded, but new open file limit is {new_limit}. Will continue, but likely a config or system options (ie: ulimit) needs updated.",
+        );
+    }
+
     let current_total = TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire);
     if limit < current_total {
         event!(
@@ -327,45 +194,33 @@ pub fn get_open_files_for_test() -> usize {
     TOTAL_FILE_SEMAPHORES.load(Ordering::Acquire) - OPEN_FILE_SEMAPHORE.available_permits()
 }
 
-/// How long a file descriptor can be open without being used before it is closed.
-static IDLE_FILE_DESCRIPTOR_TIMEOUT: OnceLock<Duration> = OnceLock::new();
-
-pub fn idle_file_descriptor_timeout() -> Duration {
-    *IDLE_FILE_DESCRIPTOR_TIMEOUT.get_or_init(|| Duration::MAX)
-}
-
-/// Set the idle file descriptor timeout. This is the amount of time
-/// a file descriptor can be open without being used before it is closed.
-pub fn set_idle_file_descriptor_timeout(timeout: Duration) -> Result<(), Error> {
-    IDLE_FILE_DESCRIPTOR_TIMEOUT
-        .set(timeout)
-        .map_err(|_| make_err!(Code::Internal, "idle_file_descriptor_timeout already set"))
-}
-
-pub async fn open_file(path: impl AsRef<Path>, limit: u64) -> Result<ResumeableFileSlot, Error> {
+pub async fn open_file(
+    path: impl AsRef<Path>,
+    start: u64,
+    limit: u64,
+) -> Result<Take<FileSlot>, Error> {
     let path = path.as_ref().to_owned();
-    let (permit, os_file, path) = call_with_permit(move |permit| {
-        Ok((
-            permit,
-            std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}"))?,
-            path,
-        ))
+    let (permit, os_file) = call_with_permit(move |permit| {
+        let mut os_file =
+            std::fs::File::open(&path).err_tip(|| format!("Could not open {path:?}"))?;
+        if start > 0 {
+            os_file
+                .seek(std::io::SeekFrom::Start(start))
+                .err_tip(|| format!("Could not seek to {start} in {path:?}"))?;
+        }
+        Ok((permit, os_file))
     })
     .await?;
-    Ok(ResumeableFileSlot::new_with_take(
-        FileSlot {
-            _permit: permit,
-            inner: tokio::fs::File::from_std(os_file),
-        }
-        .take(limit),
-        path,
-        false, /* is_write */
-    ))
+    Ok(FileSlot {
+        _permit: permit,
+        inner: tokio::fs::File::from_std(os_file),
+    }
+    .take(limit))
 }
 
-pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, Error> {
+pub async fn create_file(path: impl AsRef<Path>) -> Result<FileSlot, Error> {
     let path = path.as_ref().to_owned();
-    let (permit, os_file, path) = call_with_permit(move |permit| {
+    let (permit, os_file) = call_with_permit(move |permit| {
         Ok((
             permit,
             std::fs::File::options()
@@ -375,18 +230,13 @@ pub async fn create_file(path: impl AsRef<Path>) -> Result<ResumeableFileSlot, E
                 .truncate(true)
                 .open(&path)
                 .err_tip(|| format!("Could not open {path:?}"))?,
-            path,
         ))
     })
     .await?;
-    Ok(ResumeableFileSlot::new(
-        FileSlot {
-            _permit: permit,
-            inner: tokio::fs::File::from_std(os_file),
-        },
-        path,
-        true, /* is_write */
-    ))
+    Ok(FileSlot {
+        _permit: permit,
+        inner: tokio::fs::File::from_std(os_file),
+    })
 }
 
 pub async fn hard_link(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<(), Error> {
diff --git a/nativelink-util/src/store_trait.rs b/nativelink-util/src/store_trait.rs
index 4f01194b3..39fad9b9c 100644
--- a/nativelink-util/src/store_trait.rs
+++ b/nativelink-util/src/store_trait.rs
@@ -15,6 +15,7 @@
 use std::borrow::{Borrow, BorrowMut, Cow};
 use std::collections::hash_map::DefaultHasher as StdHasher;
 use std::convert::Into;
+use std::ffi::OsString;
 use std::hash::{Hash, Hasher};
 use std::ops::{Bound, RangeBounds};
 use std::pin::Pin;
@@ -23,20 +24,18 @@ use std::sync::{Arc, OnceLock};
 
 use async_trait::async_trait;
 use bytes::{Bytes, BytesMut};
-use futures::future::{select, Either};
 use futures::{join, try_join, Future, FutureExt, Stream};
 use nativelink_error::{error_if, make_err, Code, Error, ResultExt};
 use nativelink_metric::MetricsComponent;
 use rand::rngs::StdRng;
 use rand::{RngCore, SeedableRng};
 use serde::{Deserialize, Serialize};
-use tokio::io::AsyncSeekExt;
-use tokio::time::timeout;
+use tokio::io::{AsyncReadExt, AsyncSeekExt};
 
 use crate::buf_channel::{make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf};
 use crate::common::DigestInfo;
 use crate::digest_hasher::{default_digest_hasher_func, DigestHasher, DigestHasherFunc};
-use crate::fs::{self, idle_file_descriptor_timeout};
+use crate::fs;
 use crate::health_utils::{HealthRegistryBuilder, HealthStatus, HealthStatusIndicator};
 
 static DEFAULT_DIGEST_SIZE_HEALTH_CHECK: OnceLock<usize> = OnceLock::new();
@@ -79,54 +78,37 @@ pub enum UploadSizeInfo {
 pub async fn slow_update_store_with_file<S: StoreDriver + ?Sized>(
     store: Pin<&S>,
     digest: impl Into<StoreKey<'_>>,
-    file: &mut fs::ResumeableFileSlot,
+    file: &mut fs::FileSlot,
     upload_size: UploadSizeInfo,
 ) -> Result<(), Error> {
-    file.as_writer()
-        .await
-        .err_tip(|| "Failed to get writer in upload_file_to_store")?
-        .rewind()
+    file.rewind()
         .await
         .err_tip(|| "Failed to rewind in upload_file_to_store")?;
-    let (tx, rx) = make_buf_channel_pair();
+    let (mut tx, rx) = make_buf_channel_pair();
 
-    let mut update_fut = store
+    let update_fut = store
         .update(digest.into(), rx, upload_size)
         .map(|r| r.err_tip(|| "Could not upload data to store in upload_file_to_store"));
-    let read_result = {
-        let read_data_fut = async {
-            let (_, mut tx) = file
-                .read_buf_cb(
-                    (BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx),
-                    move |(chunk, mut tx)| async move {
-                        tx.send(chunk.freeze())
-                            .await
-                            .err_tip(|| "Failed to send in upload_file_to_store")?;
-                        Ok((BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE), tx))
-                    },
-                )
+    let read_data_fut = async move {
+        loop {
+            let mut buf = BytesMut::with_capacity(fs::DEFAULT_READ_BUFF_SIZE);
+            let read = file
+                .read_buf(&mut buf)
                 .await
-                .err_tip(|| "Error in upload_file_to_store::read_buf_cb section")?;
-            tx.send_eof()
-                .err_tip(|| "Could not send EOF to store in upload_file_to_store")?;
-            Ok(())
-        };
-        tokio::pin!(read_data_fut);
-        match select(&mut update_fut, read_data_fut).await {
-            Either::Left((update_result, read_data_fut)) => {
-                return update_result.merge(read_data_fut.await)
+                .err_tip(|| "Failed to read in upload_file_to_store")?;
+            if read == 0 {
+                break;
             }
-            Either::Right((read_result, _)) => read_result,
+            tx.send(buf.freeze())
+                .await
+                .err_tip(|| "Failed to send in upload_file_to_store")?;
         }
+        tx.send_eof()
+            .err_tip(|| "Could not send EOF to store in upload_file_to_store")
     };
-    if let Ok(update_result) = timeout(idle_file_descriptor_timeout(), &mut update_fut).await {
-        update_result.merge(read_result)
-    } else {
-        file.close_file()
-            .await
-            .err_tip(|| "Failed to close file in upload_file_to_store")?;
-        update_fut.await.merge(read_result)
-    }
+    tokio::pin!(read_data_fut);
+    let (update_res, read_res) = tokio::join!(update_fut, read_data_fut);
+    update_res.merge(read_res)
 }
 
 /// Optimizations that stores may want to expose to the callers.
@@ -495,18 +477,19 @@ pub trait StoreLike: Send + Sync + Sized + Unpin + 'static {
         self.as_store_driver_pin().optimized_for(optimization)
     }
 
-    /// Specialized version of `.update()` which takes a `ResumeableFileSlot`.
+    /// Specialized version of `.update()` which takes a `FileSlot`.
     /// This is useful if the underlying store can optimize the upload process
     /// when it knows the data is coming from a file.
     #[inline]
     fn update_with_whole_file<'a>(
         &'a self,
         digest: impl Into<StoreKey<'a>>,
-        file: fs::ResumeableFileSlot,
+        path: OsString,
+        file: fs::FileSlot,
         upload_size: UploadSizeInfo,
-    ) -> impl Future<Output = Result<Option<fs::ResumeableFileSlot>, Error>> + Send + 'a {
+    ) -> impl Future<Output = Result<Option<fs::FileSlot>, Error>> + Send + 'a {
         self.as_store_driver_pin()
-            .update_with_whole_file(digest.into(), file, upload_size)
+            .update_with_whole_file(digest.into(), path, file, upload_size)
     }
 
     /// Utility to send all the data to the store when you have all the bytes.
@@ -635,9 +618,10 @@ pub trait StoreDriver:
     async fn update_with_whole_file(
         self: Pin<&Self>,
         key: StoreKey<'_>,
-        mut file: fs::ResumeableFileSlot,
+        path: OsString,
+        mut file: fs::FileSlot,
         upload_size: UploadSizeInfo,
-    ) -> Result<Option<fs::ResumeableFileSlot>, Error> {
+    ) -> Result<Option<fs::FileSlot>, Error> {
         let inner_store = self.inner_store(Some(key.borrow()));
         if inner_store.optimized_for(StoreOptimizations::FileUpdates) {
             error_if!(
@@ -645,7 +629,7 @@ pub trait StoreDriver:
                 "Store::inner_store() returned self when optimization present"
             );
             return Pin::new(inner_store)
-                .update_with_whole_file(key, file, upload_size)
+                .update_with_whole_file(key, path, file, upload_size)
                 .await;
         }
         slow_update_store_with_file(self, key, &mut file, upload_size).await?;
diff --git a/nativelink-util/tests/fs_test.rs b/nativelink-util/tests/fs_test.rs
deleted file mode 100644
index c215e537f..000000000
--- a/nativelink-util/tests/fs_test.rs
+++ /dev/null
@@ -1,180 +0,0 @@
-// Copyright 2024 The NativeLink Authors. All rights reserved.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-use std::env;
-use std::ffi::OsString;
-use std::io::SeekFrom;
-use std::str::from_utf8;
-
-use nativelink_error::Error;
-use nativelink_macro::nativelink_test;
-use nativelink_util::common::fs;
-use pretty_assertions::assert_eq;
-use rand::{thread_rng, Rng};
-use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
-use tokio::sync::Semaphore;
-
-/// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if
-/// not set.
-async fn make_temp_path(data: &str) -> OsString {
-    let dir = format!(
-        "{}/{}",
-        env::var("TEST_TMPDIR").unwrap_or(env::temp_dir().to_str().unwrap().to_string()),
-        thread_rng().gen::<u64>(),
-    );
-    fs::create_dir_all(&dir).await.unwrap();
-    OsString::from(format!("{dir}/{data}"))
-}
-
-static TEST_EXCLUSIVE_SEMAPHORE: Semaphore = Semaphore::const_new(1);
-
-#[nativelink_test]
-async fn resumeable_file_slot_write_close_write_test() -> Result<(), Error> {
-    let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
-    let filename = make_temp_path("test_file.txt").await;
-    {
-        let mut file = fs::create_file(&filename).await?;
-        file.as_writer().await?.write_all(b"Hello").await?;
-        file.close_file().await?;
-        assert_eq!(fs::get_open_files_for_test(), 0);
-        file.as_writer().await?.write_all(b"Goodbye").await?;
-        assert_eq!(fs::get_open_files_for_test(), 1);
-        file.as_writer().await?.as_mut().sync_all().await?;
-    }
-    assert_eq!(fs::get_open_files_for_test(), 0);
-    {
-        let mut file = fs::open_file(&filename, u64::MAX).await?;
-        let mut contents = String::new();
-        file.as_reader()
-            .await?
-            .read_to_string(&mut contents)
-            .await?;
-        assert_eq!(contents, "HelloGoodbye");
-    }
-    Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_test() -> Result<(), Error> {
-    const DUMMYDATA: &str = "DummyDataTest";
-    let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
-    let filename = make_temp_path("test_file.txt").await;
-    {
-        let mut file = fs::create_file(&filename).await?;
-        file.as_writer()
-            .await?
-            .write_all(DUMMYDATA.as_bytes())
-            .await?;
-        file.as_writer().await?.as_mut().sync_all().await?;
-    }
-    {
-        let mut file = fs::open_file(&filename, u64::MAX).await?;
-        let mut contents = [0u8; 5];
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy");
-        }
-        file.close_file().await?;
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT");
-        }
-        file.close_file().await?;
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 3);
-            assert_eq!(from_utf8(&contents[..3]).unwrap(), "est");
-        }
-    }
-    Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_with_take_test() -> Result<(), Error> {
-    const DUMMYDATA: &str = "DummyDataTest";
-    let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
-    let filename = make_temp_path("test_file.txt").await;
-    {
-        let mut file = fs::create_file(&filename).await?;
-        file.as_writer()
-            .await?
-            .write_all(DUMMYDATA.as_bytes())
-            .await?;
-        file.as_writer().await?.as_mut().sync_all().await?;
-    }
-    {
-        let mut file = fs::open_file(&filename, 11).await?;
-        let mut contents = [0u8; 5];
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "Dummy");
-        }
-        assert_eq!(fs::get_open_files_for_test(), 1);
-        file.close_file().await?;
-        assert_eq!(fs::get_open_files_for_test(), 0);
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "DataT");
-        }
-        assert_eq!(fs::get_open_files_for_test(), 1);
-        file.close_file().await?;
-        assert_eq!(fs::get_open_files_for_test(), 0);
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1);
-            assert_eq!(from_utf8(&contents[..1]).unwrap(), "e");
-        }
-    }
-    Ok(())
-}
-
-#[nativelink_test]
-async fn resumeable_file_slot_read_close_read_with_take_and_seek_test() -> Result<(), Error> {
-    const DUMMYDATA: &str = "DummyDataTest";
-    let _permit = TEST_EXCLUSIVE_SEMAPHORE.acquire().await; // One test at a time.
-    let filename = make_temp_path("test_file.txt").await;
-    {
-        let mut file = fs::create_file(&filename).await?;
-        file.as_writer()
-            .await?
-            .write_all(DUMMYDATA.as_bytes())
-            .await?;
-        file.as_writer().await?.as_mut().sync_all().await?;
-    }
-    {
-        let mut file = fs::open_file(&filename, 11).await?;
-        file.as_reader()
-            .await?
-            .get_mut()
-            .seek(SeekFrom::Start(2))
-            .await?;
-        let mut contents = [0u8; 5];
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "mmyDa");
-        }
-        assert_eq!(fs::get_open_files_for_test(), 1);
-        file.close_file().await?;
-        assert_eq!(fs::get_open_files_for_test(), 0);
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 5);
-            assert_eq!(from_utf8(&contents[..]).unwrap(), "taTes");
-        }
-        file.close_file().await?;
-        assert_eq!(fs::get_open_files_for_test(), 0);
-        {
-            assert_eq!(file.as_reader().await?.read(&mut contents).await?, 1);
-            assert_eq!(from_utf8(&contents[..1]).unwrap(), "t");
-        }
-    }
-    Ok(())
-}
diff --git a/nativelink-worker/BUILD.bazel b/nativelink-worker/BUILD.bazel
index 111f96763..fd054831d 100644
--- a/nativelink-worker/BUILD.bazel
+++ b/nativelink-worker/BUILD.bazel
@@ -76,6 +76,7 @@ rust_test_suite(
         "@crates//:prost",
         "@crates//:prost-types",
         "@crates//:rand",
+        "@crates//:serial_test",
         "@crates//:tokio",
         "@crates//:tonic",
     ],
diff --git a/nativelink-worker/Cargo.toml b/nativelink-worker/Cargo.toml
index e136ec431..a59aa957c 100644
--- a/nativelink-worker/Cargo.toml
+++ b/nativelink-worker/Cargo.toml
@@ -39,3 +39,4 @@ hyper-util = "0.1.10"
 pretty_assertions = { version = "1.4.1", features = ["std"] }
 prost-types = { version = "0.13.4", default-features = false }
 rand = { version = "0.8.5", default-features = false }
+serial_test = { version = "3.2.0", features = ["async"], default-features = false }
diff --git a/nativelink-worker/src/running_actions_manager.rs b/nativelink-worker/src/running_actions_manager.rs
index 044f54785..f95974ea3 100644
--- a/nativelink-worker/src/running_actions_manager.rs
+++ b/nativelink-worker/src/running_actions_manager.rs
@@ -261,24 +261,17 @@ async fn upload_file(
 ) -> Result<FileInfo, Error> {
     let is_executable = is_executable(&metadata, &full_path);
     let file_size = metadata.len();
-    let resumeable_file = fs::open_file(&full_path, u64::MAX)
+    let file = fs::open_file(&full_path, 0, u64::MAX)
         .await
         .err_tip(|| format!("Could not open file {full_path:?}"))?;
 
-    let (digest, mut resumeable_file) = hasher
+    let (digest, mut file) = hasher
         .hasher()
-        .digest_for_file(resumeable_file, Some(file_size))
+        .digest_for_file(&full_path, file.into_inner(), Some(file_size))
         .await
         .err_tip(|| format!("Failed to hash file in digest_for_file failed for {full_path:?}"))?;
 
-    resumeable_file
-        .as_reader()
-        .await
-        .err_tip(|| "Could not get reader from file slot in RunningActionsManager::upload_file()")?
-        .get_mut()
-        .rewind()
-        .await
-        .err_tip(|| "Could not rewind file")?;
+    file.rewind().await.err_tip(|| "Could not rewind file")?;
 
     // Note: For unknown reasons we appear to be hitting:
     // https://github.com/rust-lang/rust/issues/92096
@@ -288,7 +281,8 @@ async fn upload_file(
         .as_store_driver_pin()
         .update_with_whole_file(
             digest.into(),
-            resumeable_file,
+            full_path.as_ref().into(),
+            file,
             UploadSizeInfo::ExactSize(digest.size_bytes()),
         )
         .await
@@ -490,7 +484,7 @@ async fn process_side_channel_file(
     let mut json_contents = String::new();
     {
         // Note: Scoping `file_slot` allows the file_slot semaphore to be released faster.
-        let mut file_slot = match fs::open_file(side_channel_file, u64::MAX).await {
+        let mut file_slot = match fs::open_file(side_channel_file, 0, u64::MAX).await {
             Ok(file_slot) => file_slot,
             Err(e) => {
                 if e.code != Code::NotFound {
@@ -500,11 +494,7 @@ async fn process_side_channel_file(
                 return Ok(None);
             }
         };
-        let reader = file_slot
-            .as_reader()
-            .await
-            .err_tip(|| "Error getting reader from side channel file (maybe permissions?)")?;
-        reader
+        file_slot
             .read_to_string(&mut json_contents)
             .await
             .err_tip(|| "Error reading side channel file")?;
@@ -992,12 +982,6 @@ impl RunningActionImpl {
                             ?err,
                             "Could not kill process",
                         );
-                    } else {
-                        event!(
-                            Level::ERROR,
-                            operation_id = ?self.operation_id,
-                            "Could not get child process id, maybe already dead?",
-                        );
                     }
                     {
                         let mut state = self.state.lock();
diff --git a/nativelink-worker/tests/local_worker_test.rs b/nativelink-worker/tests/local_worker_test.rs
index 7d0350c94..8bf0ca8f1 100644
--- a/nativelink-worker/tests/local_worker_test.rs
+++ b/nativelink-worker/tests/local_worker_test.rs
@@ -15,11 +15,9 @@
 use std::collections::HashMap;
 use std::env;
 use std::ffi::OsString;
-#[cfg(target_family = "unix")]
-use std::fs::Permissions;
 use std::io::Write;
 #[cfg(target_family = "unix")]
-use std::os::unix::fs::PermissionsExt;
+use std::os::unix::fs::OpenOptionsExt;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
@@ -75,8 +73,8 @@ fn make_temp_path(data: &str) -> String {
     )
 }
 
-#[cfg_attr(feature = "nix", ignore)]
 #[nativelink_test]
+#[cfg_attr(feature = "nix", ignore)]
 async fn platform_properties_smoke_test() -> Result<(), Error> {
     let mut platform_properties = HashMap::new();
     platform_properties.insert(
@@ -132,7 +130,7 @@ async fn platform_properties_smoke_test() -> Result<(), Error> {
 }
 
 #[nativelink_test]
-async fn reconnect_on_server_disconnect_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn reconnect_on_server_disconnect_test() -> Result<(), Error> {
     let mut test_context = setup_local_worker(HashMap::new()).await;
     let streaming_response = test_context.maybe_streaming_response.take().unwrap();
 
@@ -162,7 +160,7 @@ async fn reconnect_on_server_disconnect_test() -> Result<(), Box<dyn std::error:
 }
 
 #[nativelink_test]
-async fn kill_all_called_on_disconnect() -> Result<(), Box<dyn std::error::Error>> {
+async fn kill_all_called_on_disconnect() -> Result<(), Error> {
     let mut test_context = setup_local_worker(HashMap::new()).await;
     let streaming_response = test_context.maybe_streaming_response.take().unwrap();
 
@@ -179,11 +177,14 @@ async fn kill_all_called_on_disconnect() -> Result<(), Box<dyn std::error::Error
     let tx_stream = test_context.maybe_tx_stream.take().unwrap();
     {
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::ConnectionResult(ConnectionResult {
-                    worker_id: "foobar".to_string(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: "foobar".to_string(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -198,7 +199,7 @@ async fn kill_all_called_on_disconnect() -> Result<(), Box<dyn std::error::Error
 }
 
 #[nativelink_test]
-async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::error::Error>> {
+async fn blake3_digest_function_registerd_properly() -> Result<(), Error> {
     let mut test_context = setup_local_worker(HashMap::new()).await;
     let streaming_response = test_context.maybe_streaming_response.take().unwrap();
 
@@ -217,11 +218,14 @@ async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::
     {
         // First initialize our worker by sending the response to the connection request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::ConnectionResult(ConnectionResult {
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -245,15 +249,18 @@ async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::
     {
         // Send execution request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::StartAction(StartExecute {
-                    execute_request: Some((&action_info).into()),
-                    operation_id: String::new(),
-                    queued_timestamp: None,
-                    platform: Some(Platform::default()),
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::StartAction(StartExecute {
+                        execute_request: Some((&action_info).into()),
+                        operation_id: String::new(),
+                        queued_timestamp: None,
+                        platform: Some(Platform::default()),
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -282,7 +289,7 @@ async fn blake3_digest_function_registerd_properly() -> Result<(), Box<dyn std::
 }
 
 #[nativelink_test]
-async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn simple_worker_start_action_test() -> Result<(), Error> {
     let mut test_context = setup_local_worker(HashMap::new()).await;
     let streaming_response = test_context.maybe_streaming_response.take().unwrap();
 
@@ -301,11 +308,14 @@ async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Err
     {
         // First initialize our worker by sending the response to the connection request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::ConnectionResult(ConnectionResult {
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -329,15 +339,18 @@ async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Err
     {
         // Send execution request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::StartAction(StartExecute {
-                    execute_request: Some((&action_info).into()),
-                    operation_id: String::new(),
-                    queued_timestamp: None,
-                    platform: Some(Platform::default()),
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::StartAction(StartExecute {
+                        execute_request: Some((&action_info).into()),
+                        operation_id: String::new(),
+                        queued_timestamp: None,
+                        platform: Some(Platform::default()),
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -411,7 +424,7 @@ async fn simple_worker_start_action_test() -> Result<(), Box<dyn std::error::Err
 }
 
 #[nativelink_test]
-async fn new_local_worker_creates_work_directory_test() -> Result<(), Box<dyn std::error::Error>> {
+async fn new_local_worker_creates_work_directory_test() -> Result<(), Error> {
     let cas_store = Store::new(FastSlowStore::new(
         &FastSlowSpec {
             // Note: These are not needed for this test, so we put dummy memory stores here.
@@ -450,8 +463,7 @@ async fn new_local_worker_creates_work_directory_test() -> Result<(), Box<dyn st
 }
 
 #[nativelink_test]
-async fn new_local_worker_removes_work_directory_before_start_test(
-) -> Result<(), Box<dyn std::error::Error>> {
+async fn new_local_worker_removes_work_directory_before_start_test() -> Result<(), Error> {
     let cas_store = Store::new(FastSlowStore::new(
         &FastSlowSpec {
             // Note: These are not needed for this test, so we put dummy memory stores here.
@@ -473,8 +485,8 @@ async fn new_local_worker_removes_work_directory_before_start_test(
     fs::create_dir_all(format!("{}/{}", work_directory, "another_dir")).await?;
     let mut file =
         fs::create_file(OsString::from(format!("{}/{}", work_directory, "foo.txt"))).await?;
-    file.as_writer().await?.write_all(b"Hello, world!").await?;
-    file.as_writer().await?.as_mut().sync_all().await?;
+    file.write_all(b"Hello, world!").await?;
+    file.as_mut().sync_all().await?;
     drop(file);
     new_local_worker(
         Arc::new(LocalWorkerConfig {
@@ -498,7 +510,7 @@ async fn new_local_worker_removes_work_directory_before_start_test(
 }
 
 #[nativelink_test]
-async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::error::Error>> {
+async fn experimental_precondition_script_fails() -> Result<(), Error> {
     #[cfg(target_family = "unix")]
     const EXPECTED_MSG: &str = "Preconditions script returned status exit status: 1 - ";
     #[cfg(target_family = "windows")]
@@ -509,15 +521,31 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::err
     #[cfg(target_family = "unix")]
     let precondition_script = {
         let precondition_script = format!("{temp_path}/precondition.sh");
+        let precondition_script_tmp = format!("{precondition_script}.tmp");
+
         // We use std::fs::File here because we sometimes get strange bugs here
         // that result in: "Text file busy (os error 26)" if it is an executeable.
         // It is likley because somewhere the file descriotor does not get closed
         // in tokio's async context.
-        let mut file = std::fs::File::create(OsString::from(&precondition_script))?;
-        file.write_all(b"#!/bin/sh\nexit 1\n")?;
-        file.set_permissions(Permissions::from_mode(0o777))?;
-        file.sync_all()?;
-        drop(file);
+        {
+            // We write to a temporary file and then rename it to force the kernel
+            // to flush all related file descriptors fully before we use it.
+            let mut file = std::fs::OpenOptions::new()
+                .create(true)
+                .truncate(true)
+                .write(true)
+                .mode(0o777)
+                .open(OsString::from(&precondition_script_tmp))
+                .unwrap();
+            file.write_all(b"#!/bin/sh\nexit 1\n").unwrap();
+            file.sync_all().unwrap();
+            // Note: Github runners appear to use some kind of filesystem driver
+            // that does not sync data as expected. This is the easiest solution.
+            // See: https://github.com/pantsbuild/pants/issues/10507
+            // See: https://github.com/moby/moby/issues/9547
+            std::process::Command::new("sync").output().unwrap();
+        }
+        std::fs::rename(&precondition_script_tmp, &precondition_script).unwrap();
         precondition_script
     };
     #[cfg(target_family = "windows")]
@@ -525,12 +553,10 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::err
         let precondition_script = format!("{}/precondition.bat", temp_path);
         let mut file = std::fs::File::create(OsString::from(&precondition_script))?;
         file.write_all(b"@echo off\r\nexit 1")?;
-        file.sync_all()?;
-        drop(file);
+        file.sync_all().unwrap();
         precondition_script
     };
-    // TODO(#527) Sleep to reduce flakey chances.
-    tokio::time::sleep(Duration::from_millis(250)).await;
+
     let local_worker_config = LocalWorkerConfig {
         experimental_precondition_script: Some(precondition_script),
         ..Default::default()
@@ -554,11 +580,14 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::err
     {
         // First initialize our worker by sending the response to the connection request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::ConnectionResult(ConnectionResult {
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -582,15 +611,18 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::err
     {
         // Send execution request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::StartAction(StartExecute {
-                    execute_request: Some((&action_info).into()),
-                    operation_id: String::new(),
-                    queued_timestamp: None,
-                    platform: Some(Platform::default()),
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::StartAction(StartExecute {
+                        execute_request: Some((&action_info).into()),
+                        operation_id: String::new(),
+                        queued_timestamp: None,
+                        platform: Some(Platform::default()),
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -618,7 +650,7 @@ async fn experimental_precondition_script_fails() -> Result<(), Box<dyn std::err
 }
 
 #[nativelink_test]
-async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Error>> {
+async fn kill_action_request_kills_action() -> Result<(), Error> {
     let mut test_context = setup_local_worker(HashMap::new()).await;
 
     let streaming_response = test_context.maybe_streaming_response.take().unwrap();
@@ -638,11 +670,14 @@ async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Er
     let tx_stream = test_context.maybe_tx_stream.take().unwrap();
     {
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::ConnectionResult(ConnectionResult {
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::ConnectionResult(ConnectionResult {
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -667,15 +702,18 @@ async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Er
     {
         // Send execution request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::StartAction(StartExecute {
-                    execute_request: Some((&action_info).into()),
-                    operation_id: operation_id.to_string(),
-                    queued_timestamp: None,
-                    platform: Some(Platform::default()),
-                    worker_id: expected_worker_id.clone(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::StartAction(StartExecute {
+                        execute_request: Some((&action_info).into()),
+                        operation_id: operation_id.to_string(),
+                        queued_timestamp: None,
+                        platform: Some(Platform::default()),
+                        worker_id: expected_worker_id.clone(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
@@ -690,11 +728,14 @@ async fn kill_action_request_kills_action() -> Result<(), Box<dyn std::error::Er
     {
         // Send kill request.
         tx_stream
-            .send(Frame::data(encode_stream_proto(&UpdateForWorker {
-                update: Some(Update::KillOperationRequest(KillOperationRequest {
-                    operation_id: operation_id.to_string(),
-                })),
-            })?))
+            .send(Frame::data(
+                encode_stream_proto(&UpdateForWorker {
+                    update: Some(Update::KillOperationRequest(KillOperationRequest {
+                        operation_id: operation_id.to_string(),
+                    })),
+                })
+                .unwrap(),
+            ))
             .await
             .map_err(|e| make_input_err!("Could not send : {:?}", e))?;
     }
diff --git a/nativelink-worker/tests/running_actions_manager_test.rs b/nativelink-worker/tests/running_actions_manager_test.rs
index 647e38e2c..0347ad7b3 100644
--- a/nativelink-worker/tests/running_actions_manager_test.rs
+++ b/nativelink-worker/tests/running_actions_manager_test.rs
@@ -15,17 +15,16 @@
 use std::collections::HashMap;
 use std::env;
 use std::ffi::OsString;
-#[cfg(target_family = "unix")]
-use std::fs::Permissions;
 use std::io::{Cursor, Write};
 #[cfg(target_family = "unix")]
-use std::os::unix::fs::{MetadataExt, PermissionsExt};
+use std::os::unix::fs::{MetadataExt, OpenOptionsExt};
 use std::str::from_utf8;
 use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
 use std::sync::{Arc, LazyLock, Mutex};
+use std::task::Poll;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{poll, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
 use nativelink_config::cas_server::EnvironmentSource;
 use nativelink_config::stores::{FastSlowSpec, FilesystemSpec, MemorySpec, StoreSpec};
 use nativelink_error::{make_input_err, Code, Error, ResultExt};
@@ -60,6 +59,7 @@ use nativelink_worker::running_actions_manager::{
 use pretty_assertions::assert_eq;
 use prost::Message;
 use rand::{thread_rng, Rng};
+use serial_test::serial;
 use tokio::sync::oneshot;
 
 /// Get temporary path from either `TEST_TMPDIR` or best effort temp directory if
@@ -144,6 +144,7 @@ fn increment_clock(time: &mut SystemTime) -> SystemTime {
     previous_time
 }
 
+#[serial]
 #[nativelink_test]
 async fn download_to_directory_file_download_test() -> Result<(), Box<dyn std::error::Error>> {
     const FILE1_NAME: &str = "file1.txt";
@@ -242,6 +243,7 @@ async fn download_to_directory_file_download_test() -> Result<(), Box<dyn std::e
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn download_to_directory_folder_download_test() -> Result<(), Box<dyn std::error::Error>> {
     const DIRECTORY1_NAME: &str = "folder1";
@@ -340,6 +342,7 @@ async fn download_to_directory_folder_download_test() -> Result<(), Box<dyn std:
 
 // Windows does not support symlinks.
 #[cfg(not(target_family = "windows"))]
+#[serial]
 #[nativelink_test]
 async fn download_to_directory_symlink_download_test() -> Result<(), Box<dyn std::error::Error>> {
     const FILE_NAME: &str = "file.txt";
@@ -411,6 +414,7 @@ async fn download_to_directory_symlink_download_test() -> Result<(), Box<dyn std
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn ensure_output_files_full_directories_are_created_no_working_directory_test(
 ) -> Result<(), Box<dyn std::error::Error>> {
@@ -449,6 +453,10 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t
         let command = Command {
             arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()],
             output_files: vec!["some/path/test.txt".to_string()],
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -528,6 +536,7 @@ async fn ensure_output_files_full_directories_are_created_no_working_directory_t
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn ensure_output_files_full_directories_are_created_test(
 ) -> Result<(), Box<dyn std::error::Error>> {
@@ -568,6 +577,10 @@ async fn ensure_output_files_full_directories_are_created_test(
             arguments: vec!["touch".to_string(), "./some/path/test.txt".to_string()],
             output_files: vec!["some/path/test.txt".to_string()],
             working_directory: working_directory.to_string(),
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -648,6 +661,7 @@ async fn ensure_output_files_full_directories_are_created_test(
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -702,6 +716,10 @@ async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
             arguments,
             output_paths: vec!["test.txt".to_string()],
             working_directory: working_directory.to_string(),
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -823,6 +841,7 @@ async fn blake3_upload_files() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -877,6 +896,10 @@ async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Er
             arguments,
             output_paths: vec!["test.txt".to_string()],
             working_directory: working_directory.to_string(),
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -999,7 +1022,7 @@ async fn upload_files_from_above_cwd_test() -> Result<(), Box<dyn std::error::Er
 
 // Windows does not support symlinks.
 #[cfg(not(target_family = "windows"))]
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn upload_dir_and_symlink_test() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -1206,6 +1229,7 @@ async fn upload_dir_and_symlink_test() -> Result<(), Box<dyn std::error::Error>>
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -1251,6 +1275,10 @@ async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn std::error::Erro
             arguments,
             output_paths: vec![],
             working_directory: ".".to_string(),
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -1340,7 +1368,7 @@ async fn cleanup_happens_on_job_failure() -> Result<(), Box<dyn std::error::Erro
     Ok(())
 }
 
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -1366,11 +1394,21 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
         })?);
 
     #[cfg(target_family = "unix")]
-    let arguments = vec![
-        "sh".to_string(),
-        "-c".to_string(),
-        "sleep infinity".to_string(),
-    ];
+    let (arguments, process_started_file) = {
+        let process_started_file = {
+            let tmp_dir = make_temp_path("root_action_directory");
+            fs::create_dir_all(&tmp_dir).await.unwrap();
+            format!("{tmp_dir}/process_started")
+        };
+        (
+            vec![
+                "sh".to_string(),
+                "-c".to_string(),
+                format!("touch {process_started_file} && sleep infinity"),
+            ],
+            process_started_file,
+        )
+    };
     #[cfg(target_family = "windows")]
     // Windows is weird with timeout, so we use ping. See:
     // https://www.ibm.com/support/pages/timeout-command-run-batch-job-exits-immediately-and-returns-error-input-redirection-not-supported-exiting-process-immediately
@@ -1384,6 +1422,10 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
         arguments,
         output_paths: vec![],
         working_directory: ".".to_string(),
+        environment_variables: vec![EnvironmentVariable {
+            name: "PATH".to_string(),
+            value: std::env::var("PATH").unwrap(),
+        }],
         ..Default::default()
     };
     let command_digest = serialize_and_upload_message(
@@ -1430,16 +1472,35 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
         )
         .await?;
 
-    // Start the action and kill it at the same time.
-    let result = futures::join!(
-        run_action(running_action_impl),
-        running_actions_manager.kill_all()
-    )
-    .0?;
+    let run_action_fut = run_action(running_action_impl);
+    tokio::pin!(run_action_fut);
 
-    // Check that the action was killed.
     #[cfg(target_family = "unix")]
-    assert_eq!(9, result.exit_code);
+    loop {
+        assert_eq!(poll!(&mut run_action_fut), Poll::Pending);
+        tokio::task::yield_now().await;
+        match fs::metadata(&process_started_file).await {
+            Ok(_) => break,
+            Err(err) => {
+                assert_eq!(err.code, Code::NotFound, "Unknown error {err:?}");
+                tokio::time::sleep(Duration::from_millis(1)).await;
+            }
+        }
+    }
+
+    let result = futures::join!(run_action_fut, running_actions_manager.kill_all())
+        .0
+        .unwrap();
+
+    // Check that the action was killed.
+    #[cfg(all(target_family = "unix", not(target_os = "macos")))]
+    assert_eq!(9, result.exit_code, "Wrong exit_code - {result:?}");
+    // Mac for some reason sometimes returns 1 and 9.
+    #[cfg(all(target_family = "unix", target_os = "macos"))]
+    assert!(
+        9 == result.exit_code || 1 == result.exit_code,
+        "Wrong exit_code - {result:?}"
+    );
     // Note: Windows kill command returns exit code 1.
     #[cfg(target_family = "windows")]
     assert_eq!(1, result.exit_code);
@@ -1452,6 +1513,7 @@ async fn kill_ends_action() -> Result<(), Box<dyn std::error::Error>> {
 // print to stdout. We then check the results of both to make sure the shell script was
 // invoked and the actual command was invoked under the shell script.
 #[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn entrypoint_does_invoke_if_set() -> Result<(), Box<dyn std::error::Error>> {
     #[cfg(target_family = "unix")]
@@ -1490,24 +1552,29 @@ exit 0
         let test_wrapper_script = OsString::from(test_wrapper_dir + "/test_wrapper_script.sh");
         #[cfg(target_family = "windows")]
         let test_wrapper_script = OsString::from(test_wrapper_dir + "\\test_wrapper_script.bat");
-
-        // We use std::fs::File here because we sometimes get strange bugs here
-        // that result in: "Text file busy (os error 26)" if it is an executeable.
-        // It is likley because somewhere the file descriotor does not get closed
-        // in tokio's async context.
-        let mut test_wrapper_script_handle = std::fs::File::create(&test_wrapper_script)?;
-        test_wrapper_script_handle.write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())?;
-        #[cfg(target_family = "unix")]
-        test_wrapper_script_handle.set_permissions(Permissions::from_mode(0o777))?;
-        test_wrapper_script_handle.sync_all()?;
-        drop(test_wrapper_script_handle);
-
+        {
+            let mut file_options = std::fs::OpenOptions::new();
+            file_options.create(true);
+            file_options.truncate(true);
+            file_options.write(true);
+            #[cfg(target_family = "unix")]
+            file_options.mode(0o777);
+            let mut test_wrapper_script_handle = file_options
+                .open(OsString::from(&test_wrapper_script))
+                .unwrap();
+            test_wrapper_script_handle
+                .write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())
+                .unwrap();
+            test_wrapper_script_handle.sync_all().unwrap();
+            // Note: Github runners appear to use some kind of filesystem driver
+            // that does not sync data as expected. This is the easiest solution.
+            // See: https://github.com/pantsbuild/pants/issues/10507
+            // See: https://github.com/moby/moby/issues/9547
+            std::process::Command::new("sync").output().unwrap();
+        }
         test_wrapper_script
     };
 
-    // TODO(#527) Sleep to reduce flakey chances.
-    tokio::time::sleep(Duration::from_millis(250)).await;
-
     let running_actions_manager =
         Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
             root_action_directory: root_action_directory.clone(),
@@ -1602,6 +1669,7 @@ exit 0
 }
 
 #[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn entrypoint_injects_properties() -> Result<(), Box<dyn std::error::Error>> {
     #[cfg(target_family = "unix")]
@@ -1641,24 +1709,29 @@ exit 0
         let test_wrapper_script = OsString::from(test_wrapper_dir + "/test_wrapper_script.sh");
         #[cfg(target_family = "windows")]
         let test_wrapper_script = OsString::from(test_wrapper_dir + "\\test_wrapper_script.bat");
-
-        // We use std::fs::File here because we sometimes get strange bugs here
-        // that result in: "Text file busy (os error 26)" if it is an executeable.
-        // It is likley because somewhere the file descriotor does not get closed
-        // in tokio's async context.
-        let mut test_wrapper_script_handle = std::fs::File::create(&test_wrapper_script)?;
-        test_wrapper_script_handle.write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())?;
-        #[cfg(target_family = "unix")]
-        test_wrapper_script_handle.set_permissions(Permissions::from_mode(0o777))?;
-        test_wrapper_script_handle.sync_all()?;
-        drop(test_wrapper_script_handle);
-
+        {
+            let mut file_options = std::fs::OpenOptions::new();
+            file_options.create(true);
+            file_options.truncate(true);
+            file_options.write(true);
+            #[cfg(target_family = "unix")]
+            file_options.mode(0o777);
+            let mut test_wrapper_script_handle = file_options
+                .open(OsString::from(&test_wrapper_script))
+                .unwrap();
+            test_wrapper_script_handle
+                .write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())
+                .unwrap();
+            test_wrapper_script_handle.sync_all().unwrap();
+            // Note: Github runners appear to use some kind of filesystem driver
+            // that does not sync data as expected. This is the easiest solution.
+            // See: https://github.com/pantsbuild/pants/issues/10507
+            // See: https://github.com/moby/moby/issues/9547
+            std::process::Command::new("sync").output().unwrap();
+        }
         test_wrapper_script
     };
 
-    // TODO(#527) Sleep to reduce flakey chances.
-    tokio::time::sleep(Duration::from_millis(250)).await;
-
     let running_actions_manager =
         Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
             root_action_directory: root_action_directory.clone(),
@@ -1701,6 +1774,10 @@ exit 0
     let command = Command {
         arguments,
         working_directory: ".".to_string(),
+        environment_variables: vec![EnvironmentVariable {
+            name: "PATH".to_string(),
+            value: std::env::var("PATH").unwrap(),
+        }],
         ..Default::default()
     };
     let command_digest = serialize_and_upload_message(
@@ -1784,6 +1861,7 @@ exit 0
 }
 
 #[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn entrypoint_sends_timeout_via_side_channel() -> Result<(), Box<dyn std::error::Error>> {
     #[cfg(target_family = "unix")]
@@ -1811,24 +1889,29 @@ exit 1
         let test_wrapper_script = OsString::from(test_wrapper_dir + "/test_wrapper_script.sh");
         #[cfg(target_family = "windows")]
         let test_wrapper_script = OsString::from(test_wrapper_dir + "\\test_wrapper_script.bat");
-
-        // We use std::fs::File here because we sometimes get strange bugs here
-        // that result in: "Text file busy (os error 26)" if it is an executeable.
-        // It is likley because somewhere the file descriotor does not get closed
-        // in tokio's async context.
-        let mut test_wrapper_script_handle = std::fs::File::create(&test_wrapper_script)?;
-        test_wrapper_script_handle.write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())?;
-        #[cfg(target_family = "unix")]
-        test_wrapper_script_handle.set_permissions(Permissions::from_mode(0o777))?;
-        test_wrapper_script_handle.sync_all()?;
-        drop(test_wrapper_script_handle);
-
+        {
+            let mut file_options = std::fs::OpenOptions::new();
+            file_options.create(true);
+            file_options.truncate(true);
+            file_options.write(true);
+            #[cfg(target_family = "unix")]
+            file_options.mode(0o777);
+            let mut test_wrapper_script_handle = file_options
+                .open(OsString::from(&test_wrapper_script))
+                .unwrap();
+            test_wrapper_script_handle
+                .write_all(TEST_WRAPPER_SCRIPT_CONTENT.as_bytes())
+                .unwrap();
+            test_wrapper_script_handle.sync_all().unwrap();
+            // Note: Github runners appear to use some kind of filesystem driver
+            // that does not sync data as expected. This is the easiest solution.
+            // See: https://github.com/pantsbuild/pants/issues/10507
+            // See: https://github.com/moby/moby/issues/9547
+            std::process::Command::new("sync").output().unwrap();
+        }
         test_wrapper_script
     };
 
-    // TODO(#527) Sleep to reduce flakey chances.
-    tokio::time::sleep(Duration::from_millis(250)).await;
-
     let running_actions_manager =
         Arc::new(RunningActionsManagerImpl::new(RunningActionsManagerArgs {
             root_action_directory: root_action_directory.clone(),
@@ -1854,6 +1937,10 @@ exit 1
     let command = Command {
         arguments,
         working_directory: ".".to_string(),
+        environment_variables: vec![EnvironmentVariable {
+            name: "PATH".to_string(),
+            value: std::env::var("PATH").unwrap(),
+        }],
         ..Default::default()
     };
     let command_digest = serialize_and_upload_message(
@@ -1909,6 +1996,7 @@ exit 1
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn caches_results_in_action_cache_store() -> Result<(), Box<dyn std::error::Error>> {
     let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -1980,6 +2068,7 @@ async fn caches_results_in_action_cache_store() -> Result<(), Box<dyn std::error
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn failed_action_does_not_cache_in_action_cache() -> Result<(), Box<dyn std::error::Error>> {
     let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2051,6 +2140,7 @@ async fn failed_action_does_not_cache_in_action_cache() -> Result<(), Box<dyn st
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn success_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
     let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2150,6 +2240,7 @@ async fn success_does_cache_in_historical_results() -> Result<(), Box<dyn std::e
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn failure_does_not_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>> {
     let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2189,6 +2280,7 @@ async fn failure_does_not_cache_in_historical_results() -> Result<(), Box<dyn st
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn infra_failure_does_cache_in_historical_results() -> Result<(), Box<dyn std::error::Error>>
 {
@@ -2257,6 +2349,7 @@ async fn infra_failure_does_cache_in_historical_results() -> Result<(), Box<dyn
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn action_result_has_used_in_message() -> Result<(), Box<dyn std::error::Error>> {
     let (_, _, cas_store, ac_store) = setup_stores().await?;
@@ -2307,7 +2400,7 @@ async fn action_result_has_used_in_message() -> Result<(), Box<dyn std::error::E
     Ok(())
 }
 
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -2336,6 +2429,10 @@ async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box<dyn st
         arguments,
         output_paths: vec![],
         working_directory: ".".to_string(),
+        environment_variables: vec![EnvironmentVariable {
+            name: "PATH".to_string(),
+            value: std::env::var("PATH").unwrap(),
+        }],
         ..Default::default()
     };
     let command_digest = serialize_and_upload_message(
@@ -2598,6 +2695,7 @@ async fn ensure_worker_timeout_chooses_correct_values() -> Result<(), Box<dyn st
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -2661,6 +2759,10 @@ async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
         arguments,
         output_paths: vec![],
         working_directory: ".".to_string(),
+        environment_variables: vec![EnvironmentVariable {
+            name: "PATH".to_string(),
+            value: std::env::var("PATH").unwrap(),
+        }],
         ..Default::default()
     };
     let command_digest = serialize_and_upload_message(
@@ -2729,7 +2831,7 @@ async fn worker_times_out() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -2896,7 +2998,7 @@ async fn kill_all_waits_for_all_tasks_to_finish() -> Result<(), Box<dyn std::err
 
 /// Regression Test for Issue #675
 #[cfg(target_family = "unix")]
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn unix_executable_file_test() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -2999,6 +3101,7 @@ async fn unix_executable_file_test() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
+#[serial]
 #[nativelink_test]
 async fn action_directory_contents_are_cleaned() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
@@ -3096,11 +3199,13 @@ async fn action_directory_contents_are_cleaned() -> Result<(), Box<dyn std::erro
 
 // We've experienced deadlocks when uploading, so make only a single permit available and
 // check it's able to handle uploading some directories with some files in.
-// Be default this test is ignored because it *must* be run single threaded... to run this
-// test execute:
-// cargo test -p nativelink-worker --test running_actions_manager_test -- --test-threads=1 --ignored
+// Note: If this test is failing or timing out, check that other tests in this file
+// are also `#[serial]`.
+// TODO(allada) This is unix only only because I was lazy and didn't spend the time to
+// build the bash-like commands in windows as well.
+#[serial]
 #[nativelink_test]
-#[ignore]
+#[cfg(target_family = "unix")]
 async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
     const WORKER_ID: &str = "foo_worker_id";
 
@@ -3141,26 +3246,21 @@ async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
         },
     )?);
     let action_result = {
-        #[cfg(target_family = "unix")]
-            let arguments = vec![
-                "sh".to_string(),
-                "-c".to_string(),
-                "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr  '"
-                    .to_string(),
-            ];
-        #[cfg(target_family = "windows")]
-            let arguments = vec![
-                "cmd".to_string(),
-                "/C".to_string(),
-                // Note: Windows adds two spaces after 'set /p=XXX'.
-                "echo | set /p=123> ./test.txt & mkdir ./tst & echo | set /p=456> ./tst/tst.txt & echo | set /p=foo-stdout & echo | set /p=bar-stderr 1>&2 & exit 0"
-                    .to_string(),
-            ];
+        let arguments = vec![
+            "sh".to_string(),
+            "-c".to_string(),
+            "printf '123 ' > ./test.txt; mkdir ./tst; printf '456 ' > ./tst/tst.txt; printf 'foo-stdout '; >&2 printf 'bar-stderr  '"
+                .to_string(),
+        ];
         let working_directory = "some_cwd";
         let command = Command {
             arguments,
             output_paths: vec!["test.txt".to_string(), "tst".to_string()],
             working_directory: working_directory.to_string(),
+            environment_variables: vec![EnvironmentVariable {
+                name: "PATH".to_string(),
+                value: std::env::var("PATH").unwrap(),
+            }],
             ..Default::default()
         };
         let command_digest = serialize_and_upload_message(
@@ -3287,7 +3387,7 @@ async fn upload_with_single_permit() -> Result<(), Box<dyn std::error::Error>> {
     Ok(())
 }
 
-#[cfg_attr(feature = "nix", ignore)]
+#[serial]
 #[nativelink_test]
 async fn running_actions_manager_respects_action_timeout() -> Result<(), Box<dyn std::error::Error>>
 {
diff --git a/src/bin/nativelink.rs b/src/bin/nativelink.rs
index f3d1dc931..e3d3dea6d 100644
--- a/src/bin/nativelink.rs
+++ b/src/bin/nativelink.rs
@@ -48,7 +48,7 @@ use nativelink_service::health_server::HealthServer;
 use nativelink_service::worker_api_server::WorkerApiServer;
 use nativelink_store::default_store_factory::store_factory;
 use nativelink_store::store_manager::StoreManager;
-use nativelink_util::common::fs::{set_idle_file_descriptor_timeout, set_open_file_limit};
+use nativelink_util::common::fs::set_open_file_limit;
 use nativelink_util::digest_hasher::{set_default_digest_hasher_func, DigestHasherFunc};
 use nativelink_util::health_utils::HealthRegistryBuilder;
 use nativelink_util::metrics_utils::{set_metrics_enabled_for_this_thread, Counter};
@@ -61,7 +61,7 @@ use nativelink_util::store_trait::{
     set_default_digest_size_health_check, DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG,
 };
 use nativelink_util::task::TaskExecutor;
-use nativelink_util::{background_spawn, init_tracing, spawn, spawn_blocking};
+use nativelink_util::{background_spawn, fs, init_tracing, spawn, spawn_blocking};
 use nativelink_worker::local_worker::new_local_worker;
 use opentelemetry::metrics::MeterProvider;
 use opentelemetry_sdk::metrics::SdkMeterProvider;
@@ -1009,20 +1009,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
 
     let mut cfg = futures::executor::block_on(get_config())?;
 
-    let (mut metrics_enabled, max_blocking_threads) = {
-        // Note: If the default changes make sure you update the documentation in
-        // `config/cas_server.rs`.
-        const DEFAULT_MAX_OPEN_FILES: usize = 512;
-        // Note: If the default changes make sure you update the documentation in
-        // `config/cas_server.rs`.
-        const DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS: u64 = 1000;
+    let mut metrics_enabled = {
         let global_cfg = if let Some(global_cfg) = &mut cfg.global {
             if global_cfg.max_open_files == 0 {
-                global_cfg.max_open_files = DEFAULT_MAX_OPEN_FILES;
-            }
-            if global_cfg.idle_file_descriptor_timeout_millis == 0 {
-                global_cfg.idle_file_descriptor_timeout_millis =
-                    DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS;
+                global_cfg.max_open_files = fs::DEFAULT_OPEN_FILE_PERMITS;
             }
             if global_cfg.default_digest_size_health_check == 0 {
                 global_cfg.default_digest_size_health_check = DEFAULT_DIGEST_SIZE_HEALTH_CHECK_CFG;
@@ -1031,8 +1021,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             *global_cfg
         } else {
             GlobalConfig {
-                max_open_files: DEFAULT_MAX_OPEN_FILES,
-                idle_file_descriptor_timeout_millis: DEFAULT_IDLE_FILE_DESCRIPTOR_TIMEOUT_MILLIS,
+                max_open_files: fs::DEFAULT_OPEN_FILE_PERMITS,
                 disable_metrics: cfg.servers.iter().all(|v| {
                     let Some(service) = &v.services else {
                         return true;
@@ -1044,17 +1033,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
             }
         };
         set_open_file_limit(global_cfg.max_open_files);
-        set_idle_file_descriptor_timeout(Duration::from_millis(
-            global_cfg.idle_file_descriptor_timeout_millis,
-        ))?;
         set_default_digest_hasher_func(DigestHasherFunc::from(
             global_cfg
                 .default_digest_hash_function
                 .unwrap_or(ConfigDigestHashFunction::sha256),
         ))?;
         set_default_digest_size_health_check(global_cfg.default_digest_size_health_check)?;
-        // TODO (#513): prevent deadlocks by assigning max blocking threads number of open files * ten
-        (!global_cfg.disable_metrics, global_cfg.max_open_files * 10)
+        !global_cfg.disable_metrics
     };
     // Override metrics enabled if the environment variable is set.
     if std::env::var(METRICS_DISABLE_ENV).is_ok() {
@@ -1067,7 +1052,6 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     #[allow(clippy::disallowed_methods)]
     {
         let runtime = tokio::runtime::Builder::new_multi_thread()
-            .max_blocking_threads(max_blocking_threads)
             .enable_all()
             .on_thread_start(move || set_metrics_enabled_for_this_thread(metrics_enabled))
             .build()?;