Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove atime references to FilesystemStore #1584

Merged
merged 1 commit into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions nativelink-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,9 +318,7 @@ pub enum StoreSpec {
/// Stores the data on the filesystem. This store is designed for
/// local persistent storage. Restarts of this program should restore
/// the previous state, meaning anything uploaded will be persistent
/// as long as the filesystem integrity holds. This store uses the
/// filesystem's `atime` (access time) to hold the last touched time
/// of the file(s).
/// as long as the filesystem integrity holds.
///
/// **Example JSON Config:**
/// ```json
Expand Down
1 change: 0 additions & 1 deletion nativelink-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ rust_library(
"@crates//:bytes",
"@crates//:bytes-utils",
"@crates//:const_format",
"@crates//:filetime",
"@crates//:fred",
"@crates//:futures",
"@crates//:hex",
Expand Down
1 change: 0 additions & 1 deletion nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ byteorder = { version = "1.5.0", default-features = false }
bytes = { version = "1.9.0", default-features = false }
bytes-utils = { version = "0.1.4", default-features = false }
const_format = { version = "0.2.34", default-features = false }
filetime = "0.2.25"
fred = { version = "10.0.3", default-features = false, features = [
"i-std",
"i-scripts",
Expand Down
65 changes: 21 additions & 44 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::time::SystemTime;
use async_lock::RwLock;
use async_trait::async_trait;
use bytes::BytesMut;
use filetime::{set_file_atime, FileTime};
use futures::stream::{StreamExt, TryStreamExt};
use futures::{Future, TryFutureExt};
use nativelink_config::stores::FilesystemSpec;
use nativelink_error::{make_err, make_input_err, Code, Error, ResultExt};
use nativelink_metric::MetricsComponent;
use nativelink_util::background_spawn;
use nativelink_util::buf_channel::{
make_buf_channel_pair, DropCloserReadHalf, DropCloserWriteHalf,
};
Expand All @@ -38,7 +38,6 @@ use nativelink_util::health_utils::{HealthRegistryBuilder, HealthStatus, HealthS
use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{event, Level};
Expand Down Expand Up @@ -338,33 +337,6 @@ impl LenEntry for FileEntryImpl {
self.data_size == 0
}

#[inline]
async fn touch(&self) -> bool {
let result = self
.get_file_path_locked(move |full_content_path| async move {
let full_content_path = full_content_path.clone();
spawn_blocking!("filesystem_touch_set_mtime", move || {
set_file_atime(&full_content_path, FileTime::now()).err_tip(|| {
format!("Failed to touch file in filesystem store {full_content_path:?}")
})
})
.await
.map_err(|e| {
make_err!(
Code::Internal,
"Failed to change atime of file due to spawn failing {:?}",
e
)
})?
})
.await;
if let Err(err) = result {
event!(Level::ERROR, ?err, "Failed to touch file",);
return false;
}
true
}

// unref() only triggers when an item is removed from the eviction_map. It is possible
// that another place in code has a reference to `FileEntryImpl` and may later read the
// file. To support this edge case, we first move the file to a temp file and point
Expand Down Expand Up @@ -499,20 +471,13 @@ async fn add_files_to_cache<Fe: FileEntry>(
// We need to filter out folders - we do not want to try to cache the s and d folders.
let is_file =
metadata.is_file() || !(file_name == STR_FOLDER || file_name == DIGEST_FOLDER);
let atime = match metadata.accessed() {
Ok(atime) => atime,
Err(err) => {
panic!(
"{}{}{} : {} {:?}",
"It appears this filesystem does not support access time. ",
"Please configure this program to run on a drive that supports ",
"atime",
file_name,
err
);
}
};

// Using access time is not perfect, but better than random. We do not update the
// atime when a file is actually "touched", we rely on whatever the filesystem does
// when we read the file (usually update on read).
let atime = metadata
.accessed()
.or_else(|_| metadata.modified())
.unwrap_or(SystemTime::UNIX_EPOCH);
Result::<(String, SystemTime, u64, bool), Error>::Ok((
file_name,
atime,
Expand Down Expand Up @@ -966,7 +931,19 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut temp_file = entry.read_file_part(offset, read_limit).await?;
let mut temp_file = entry.read_file_part(offset, read_limit).or_else(|err| async move {
// If the file is not found, we need to remove it from the eviction map.
if err.code == Code::NotFound {
event!(
Level::ERROR,
?err,
?key,
"Entry was in our map, but not found on disk. Removing from map as a precaution, but process probably need restarted."
);
self.evicting_map.remove(&key).await;
}
Err(err)
}).await?;

loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
Expand Down
113 changes: 5 additions & 108 deletions nativelink-store/tests/filesystem_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ use std::marker::PhantomData;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, LazyLock};
use std::time::{Duration, SystemTime};
use std::time::Duration;

use async_lock::RwLock;
use bytes::Bytes;
use filetime::{set_file_atime, FileTime};
use futures::executor::block_on;
use futures::task::Poll;
use futures::{poll, Future, FutureExt};
Expand Down Expand Up @@ -142,10 +141,6 @@ impl<Hooks: FileEntryHooks + 'static + Sync + Send> LenEntry for TestFileEntry<H
self.inner.as_ref().unwrap().is_empty()
}

async fn touch(&self) -> bool {
self.inner.as_ref().unwrap().touch().await
}

async fn unref(&self) {
Hooks::on_unref(self);
self.inner.as_ref().unwrap().unref().await;
Expand Down Expand Up @@ -570,98 +565,6 @@ async fn file_gets_cleans_up_on_cache_eviction() -> Result<(), Error> {
check_temp_empty(&temp_path).await
}

#[nativelink_test]
async fn atime_updates_on_get_part_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
})
.await?,
);
// Insert data into store.
store.update_oneshot(digest1, VALUE1.into()).await?;

let file_entry = store.get_file_entry_for_digest(&digest1).await?;
file_entry
.get_file_path_locked(move |path| async move {
// Set atime to along time ago.
set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?;

// Check to ensure it was set to zero from previous command.
assert_eq!(
fs::metadata(&path).await?.accessed()?,
SystemTime::UNIX_EPOCH
);
Ok(())
})
.await?;

// Now touch digest1.
let data = store.get_part_unchunked(digest1, 0, None).await?;
assert_eq!(data, VALUE1.as_bytes());

file_entry
.get_file_path_locked(move |path| async move {
// Ensure it was updated.
assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH);
Ok(())
})
.await?;

Ok(())
}

#[nativelink_test]
async fn eviction_drops_file_test() -> Result<(), Error> {
let digest1 = DigestInfo::try_new(HASH1, VALUE1.len())?;

let store = Box::pin(
FilesystemStore::<FileEntryImpl>::new(&FilesystemSpec {
content_path: make_temp_path("content_path"),
temp_path: make_temp_path("temp_path"),
eviction_policy: None,
..Default::default()
})
.await?,
);
// Insert data into store.
store.update_oneshot(digest1, VALUE1.into()).await?;

let file_entry = store.get_file_entry_for_digest(&digest1).await?;
file_entry
.get_file_path_locked(move |path| async move {
// Set atime to along time ago.
set_file_atime(&path, FileTime::from_system_time(SystemTime::UNIX_EPOCH))?;

// Check to ensure it was set to zero from previous command.
assert_eq!(
fs::metadata(&path).await?.accessed()?,
SystemTime::UNIX_EPOCH
);
Ok(())
})
.await?;

// Now touch digest1.
let data = store.get_part_unchunked(digest1, 0, None).await?;
assert_eq!(data, VALUE1.as_bytes());

file_entry
.get_file_path_locked(move |path| async move {
// Ensure it was updated.
assert!(fs::metadata(&path).await?.accessed()? > SystemTime::UNIX_EPOCH);
Ok(())
})
.await?;

Ok(())
}

// Test to ensure that if we are holding a reference to `FileEntry` and the contents are
// replaced, the `FileEntry` continues to use the old data.
// `FileEntry` file contents should be immutable for the lifetime of the object.
Expand Down Expand Up @@ -1150,11 +1053,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
let stored_file_path = OsString::from(format!("{content_path}/{DIGEST_FOLDER}/{digest}"));
std::fs::remove_file(stored_file_path)?;

let digest_result = store
.has(digest)
.await
.err_tip(|| "Failed to execute has")?;
assert!(digest_result.is_none());
let get_part_res = store.get_part_unchunked(digest, 0, None).await;
assert_eq!(get_part_res.unwrap_err().code, Code::NotFound);

// Repeat with a string typed key.

Expand All @@ -1168,11 +1068,8 @@ async fn deleted_file_removed_from_store() -> Result<(), Error> {
let stored_file_path = OsString::from(format!("{content_path}/{STR_FOLDER}/{STRING_NAME}"));
std::fs::remove_file(stored_file_path)?;

let string_result = store
.has(string_key)
.await
.err_tip(|| "Failed to execute has")?;
assert!(string_result.is_none());
let string_digest_get_part_res = store.get_part_unchunked(string_key, 0, None).await;
assert_eq!(string_digest_get_part_res.unwrap_err().code, Code::NotFound);

Ok(())
}
Expand Down
45 changes: 10 additions & 35 deletions nativelink-util/src/evicting_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@ pub trait LenEntry: 'static {
/// Returns `true` if `self` has zero length.
fn is_empty(&self) -> bool;

/// Called when an entry is touched. On failure, will remove the entry
/// from the map.
#[inline]
fn touch(&self) -> impl Future<Output = bool> + Send {
std::future::ready(true)
}

/// This will be called when object is removed from map.
/// Note: There may still be a reference to it held somewhere else, which
/// is why it can't be mutable. This is a good place to mark the item
Expand Down Expand Up @@ -86,11 +79,6 @@ impl<T: LenEntry + Send + Sync> LenEntry for Arc<T> {
T::is_empty(self.as_ref())
}

#[inline]
async fn touch(&self) -> bool {
self.as_ref().touch().await
}

#[inline]
async fn unref(&self) {
self.as_ref().unref().await;
Expand Down Expand Up @@ -347,27 +335,21 @@ where
};
match maybe_entry {
Some(entry) => {
// Since we are not inserting anythign we don't need to evict based
// on the size of the store.
// Note: We need to check eviction because the item might be expired
// based on the current time. In such case, we remove the item while
// we are here.
let should_evict = self.should_evict(lru_len, entry, 0, u64::MAX);
if !should_evict && peek {
*result = Some(entry.data.len());
} else if !should_evict && entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
*result = Some(entry.data.len());
} else {
if self.should_evict(lru_len, entry, 0, u64::MAX) {
*result = None;
if let Some((key, eviction_item)) = state.lru.pop_entry(key.borrow()) {
if should_evict {
event!(Level::INFO, ?key, "Item expired, evicting");
} else {
event!(Level::INFO, ?key, "Touch failed, evicting");
}
event!(Level::INFO, ?key, "Item expired, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
}
} else {
if !peek {
entry.seconds_since_anchor =
self.anchor_time.elapsed().as_secs() as i32;
}
*result = Some(entry.data.len());
}
}
None => *result = None,
Expand All @@ -385,15 +367,8 @@ where

let entry = state.lru.get_mut(key.borrow())?;

if entry.data.touch().await {
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
return Some(entry.data.clone());
}

let (key, eviction_item) = state.lru.pop_entry(key.borrow())?;
event!(Level::INFO, ?key, "Touch failed, evicting");
state.remove(key.borrow(), &eviction_item, false).await;
None
entry.seconds_since_anchor = self.anchor_time.elapsed().as_secs() as i32;
Some(entry.data.clone())
}

/// Returns the replaced item if any.
Expand Down
5 changes: 0 additions & 5 deletions nativelink-util/tests/evicting_map_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,6 @@ async fn unref_called_on_replace() -> Result<(), Error> {
unreachable!("We are not testing this functionality");
}

async fn touch(&self) -> bool {
// Do nothing. We are not testing this functionality.
true
}

async fn unref(&self) {
self.unref_called.store(true, Ordering::Relaxed);
}
Expand Down
Loading