Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Completeness Checking Store #404

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions native-link-config/src/stores.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ pub enum StoreConfig {
/// hash and size and the AC validate nothing.
verify(Box<VerifyStore>),

/// Completeness checking store verifies if the
/// output files & folders exist in the CAS before forwarding
/// the request to the underlying store.
/// Note: This store should only be used on AC stores.
completeness_checking(Box<CompletenessCheckingStore>),

/// A compression store that will compress the data inbound and
/// outbound. There will be a non-trivial cost to compress and
/// decompress the data, but in many cases if the final store is
Expand Down Expand Up @@ -331,6 +337,16 @@ pub struct VerifyStore {
pub verify_hash: bool,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CompletenessCheckingStore {
/// The underlying store that will have it's results validated before sending to client.
pub backend: StoreConfig,

/// When a request is made, the results are decoded and all output digests/files are verified
/// to exist in this CAS store before returning success.
pub cas_store: StoreConfig,
}

#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)]
pub struct Lz4Config {
/// Size of the blocks to compress.
Expand Down
3 changes: 3 additions & 0 deletions native-link-store/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ rust_library(
name = "native-link-store",
srcs = [
"src/ac_utils.rs",
"src/completeness_checking_store.rs",
"src/compression_store.rs",
"src/dedup_store.rs",
"src/default_store_factory.rs",
Expand Down Expand Up @@ -69,6 +70,7 @@ rust_test_suite(
name = "integration",
srcs = [
"tests/ac_utils_test.rs",
"tests/completeness_checking_store_test.rs",
"tests/compression_store_test.rs",
"tests/dedup_store_test.rs",
"tests/existence_store_test.rs",
Expand All @@ -89,6 +91,7 @@ rust_test_suite(
"//error",
"//native-link-config",
"//native-link-util",
"//proto",
"@crate_index//:async-lock",
"@crate_index//:aws-sdk-s3",
"@crate_index//:aws-smithy-runtime",
Expand Down
15 changes: 13 additions & 2 deletions native-link-store/src/ac_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::pin::Pin;
use bytes::{Bytes, BytesMut};
use error::{Code, Error, ResultExt};
use futures::future::join;
use futures::{Future, FutureExt};
use futures::{Future, FutureExt, TryFutureExt};
use native_link_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
use native_link_util::common::{fs, DigestInfo};
use native_link_util::digest_hasher::DigestHasher;
Expand All @@ -48,6 +48,14 @@ pub async fn get_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<T, Error> {
get_size_and_decode_digest(store, digest).map_ok(|(v, _)| v).await
}

/// Attempts to fetch the digest contents from a store into the associated proto.
pub async fn get_size_and_decode_digest<T: Message + Default>(
store: Pin<&dyn Store>,
digest: &DigestInfo,
) -> Result<(T, usize), Error> {
let mut store_data_resp = store
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE), Some(ESTIMATED_DIGEST_SIZE))
.await;
Expand All @@ -60,8 +68,11 @@ pub async fn get_and_decode_digest<T: Message + Default>(
}
}
let store_data = store_data_resp?;
let store_data_len = store_data.len();

T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
T::decode(store_data)
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
.map(|v| (v, store_data_len))
}

/// Computes the digest of a message.
Expand Down
Loading