Skip to content

Commit d264624

Browse files
authored
Add Completeness Checking Store (#404)
This verifies all file and folder digests from action results are in the CAS.
1 parent 4eab282 commit d264624

File tree

8 files changed

+619
-3
lines changed

8 files changed

+619
-3
lines changed

native-link-config/src/stores.rs

+16
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ pub enum StoreConfig {
4444
/// hash and size and the AC validate nothing.
4545
verify(Box<VerifyStore>),
4646

47+
/// Completeness checking store verifies if the
48+
/// output files & folders exist in the CAS before forwarding
49+
/// the request to the underlying store.
50+
/// Note: This store should only be used on AC stores.
51+
completeness_checking(Box<CompletenessCheckingStore>),
52+
4753
/// A compression store that will compress the data inbound and
4854
/// outbound. There will be a non-trivial cost to compress and
4955
/// decompress the data, but in many cases if the final store is
@@ -331,6 +337,16 @@ pub struct VerifyStore {
331337
pub verify_hash: bool,
332338
}
333339

340+
#[derive(Serialize, Deserialize, Debug, Clone)]
341+
pub struct CompletenessCheckingStore {
342+
/// The underlying store that will have it's results validated before sending to client.
343+
pub backend: StoreConfig,
344+
345+
/// When a request is made, the results are decoded and all output digests/files are verified
346+
/// to exist in this CAS store before returning success.
347+
pub cas_store: StoreConfig,
348+
}
349+
334350
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)]
335351
pub struct Lz4Config {
336352
/// Size of the blocks to compress.

native-link-store/BUILD.bazel

+3
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ rust_library(
1010
name = "native-link-store",
1111
srcs = [
1212
"src/ac_utils.rs",
13+
"src/completeness_checking_store.rs",
1314
"src/compression_store.rs",
1415
"src/dedup_store.rs",
1516
"src/default_store_factory.rs",
@@ -69,6 +70,7 @@ rust_test_suite(
6970
name = "integration",
7071
srcs = [
7172
"tests/ac_utils_test.rs",
73+
"tests/completeness_checking_store_test.rs",
7274
"tests/compression_store_test.rs",
7375
"tests/dedup_store_test.rs",
7476
"tests/existence_store_test.rs",
@@ -89,6 +91,7 @@ rust_test_suite(
8991
"//error",
9092
"//native-link-config",
9193
"//native-link-util",
94+
"//proto",
9295
"@crate_index//:async-lock",
9396
"@crate_index//:aws-sdk-s3",
9497
"@crate_index//:aws-smithy-runtime",

native-link-store/src/ac_utils.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::pin::Pin;
2323
use bytes::{Bytes, BytesMut};
2424
use error::{Code, Error, ResultExt};
2525
use futures::future::join;
26-
use futures::{Future, FutureExt};
26+
use futures::{Future, FutureExt, TryFutureExt};
2727
use native_link_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
2828
use native_link_util::common::{fs, DigestInfo};
2929
use native_link_util::digest_hasher::DigestHasher;
@@ -48,6 +48,14 @@ pub async fn get_and_decode_digest<T: Message + Default>(
4848
store: Pin<&dyn Store>,
4949
digest: &DigestInfo,
5050
) -> Result<T, Error> {
51+
get_size_and_decode_digest(store, digest).map_ok(|(v, _)| v).await
52+
}
53+
54+
/// Attempts to fetch the digest contents from a store into the associated proto.
55+
pub async fn get_size_and_decode_digest<T: Message + Default>(
56+
store: Pin<&dyn Store>,
57+
digest: &DigestInfo,
58+
) -> Result<(T, usize), Error> {
5159
let mut store_data_resp = store
5260
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE), Some(ESTIMATED_DIGEST_SIZE))
5361
.await;
@@ -60,8 +68,11 @@ pub async fn get_and_decode_digest<T: Message + Default>(
6068
}
6169
}
6270
let store_data = store_data_resp?;
71+
let store_data_len = store_data.len();
6372

64-
T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
73+
T::decode(store_data)
74+
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
75+
.map(|v| (v, store_data_len))
6576
}
6677

6778
/// Computes the digest of a message.

0 commit comments

Comments
 (0)