Skip to content

Commit dcbd530

Browse files
committed
Add completeness checking store
1 parent db724c0 commit dcbd530

File tree

11 files changed

+674
-1313
lines changed

11 files changed

+674
-1313
lines changed

Cargo.lock

+70-1,310
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ members = [
2929
"gencargo/cas_server",
3030
"gencargo/cas_server_test",
3131
"gencargo/common",
32+
"gencargo/completeness_checking_store",
33+
"gencargo/completeness_checking_store_test",
3234
"gencargo/compression_store",
3335
"gencargo/compression_store_test",
3436
"gencargo/config",
@@ -177,6 +179,8 @@ cas = { path = "gencargo/cas" }
177179
cas_server = { path = "gencargo/cas_server" }
178180
cas_server_test = { path = "gencargo/cas_server_test" }
179181
common = { path = "gencargo/common" }
182+
completeness_checking_store = { path = "gencargo/completeness_checking_store" }
183+
completeness_checking_store_test = { path = "gencargo/completeness_checking_store_test" }
180184
compression_store = { path = "gencargo/compression_store" }
181185
compression_store_test = { path = "gencargo/compression_store_test" }
182186
config = { path = "gencargo/config" }

cas/scheduler/action_messages.rs

+47
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,53 @@ impl From<ActionResult> for ProtoActionResult {
748748
}
749749
}
750750

751+
impl From<ProtoActionResult> for ActionResult {
752+
fn from(val: ProtoActionResult) -> Self {
753+
let output_file_symlinks = val
754+
.output_file_symlinks
755+
.into_iter()
756+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
757+
.collect::<Vec<_>>();
758+
759+
let output_directory_symlinks = val
760+
.output_directory_symlinks
761+
.into_iter()
762+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
763+
.collect::<Vec<_>>();
764+
765+
Self {
766+
output_files: val
767+
.output_files
768+
.into_iter()
769+
.map(|output_file| output_file.try_into().unwrap())
770+
.collect(),
771+
output_folders: val
772+
.output_directories
773+
.into_iter()
774+
.map(|output_directory| output_directory.try_into().unwrap())
775+
.collect(),
776+
output_file_symlinks,
777+
output_directory_symlinks,
778+
exit_code: val.exit_code,
779+
stdout_digest: val
780+
.stdout_digest
781+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
782+
.unwrap_or(DigestInfo::empty_digest()),
783+
stderr_digest: val
784+
.stderr_digest
785+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
786+
.unwrap_or_else(DigestInfo::empty_digest),
787+
execution_metadata: val
788+
.execution_metadata
789+
.map(|metadata| metadata.try_into().unwrap_or(ExecutionMetadata::default()))
790+
.unwrap_or_default(),
791+
server_logs: Default::default(),
792+
error: None,
793+
message: String::new(),
794+
}
795+
}
796+
}
797+
751798
impl TryFrom<ExecuteResponse> for ActionStage {
752799
type Error = Error;
753800

cas/store/BUILD

+46-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ rust_library(
3131
":ref_store",
3232
":s3_store",
3333
":shard_store",
34+
":completeness_checking_store",
3435
":size_partitioning_store",
3536
":store",
3637
":traits",
@@ -126,6 +127,29 @@ rust_library(
126127
],
127128
)
128129

130+
# Library target for the completeness checking store.
# Deps are grouped as package-local, workspace, then external crates,
# each group sorted alphabetically.
rust_library(
    name = "completeness_checking_store",
    srcs = ["completeness_checking_store.rs"],
    proc_macro_deps = ["@crate_index//:async-trait"],
    visibility = ["//cas:__pkg__"],
    deps = [
        ":ac_utils",
        ":traits",
        "//cas/scheduler:action_messages",
        "//config",
        "//proto",
        "//util:buf_channel",
        "//util:common",
        "//util:error",
        "//util:metrics_utils",
        "@crate_index//:futures",
        "@crate_index//:hashbrown",
        "@crate_index//:hex",
        "@crate_index//:sha2",
        "@crate_index//:tokio",
    ],
)
152+
129153
rust_library(
130154
name = "verify_store",
131155
srcs = ["verify_store.rs"],
@@ -500,6 +524,27 @@ rust_test(
500524
],
501525
)
502526

527+
# Test target for the completeness checking store.
# Deps are grouped as package-local, workspace, then external crates,
# each group sorted alphabetically.
rust_test(
    name = "completeness_checking_store_test",
    srcs = ["tests/completeness_checking_store_test.rs"],
    deps = [
        ":completeness_checking_store",
        ":memory_store",
        ":store",
        ":traits",
        "//cas/scheduler:action_messages",
        "//config",
        "//proto",
        "//util:buf_channel",
        "//util:common",
        "//util:error",
        "@crate_index//:futures",
        "@crate_index//:pretty_assertions",
        "@crate_index//:prost",
        "@crate_index//:tokio",
    ],
)
547+
503548
rust_test(
504549
name = "filesystem_store_test",
505550
srcs = ["tests/filesystem_store_test.rs"],
@@ -537,4 +582,4 @@ rust_test(
537582
"@crate_index//:rand",
538583
"@crate_index//:tokio",
539584
],
540-
)
585+
)
+185
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::pin::Pin;
16+
use std::sync::Arc;
17+
18+
use ac_utils::get_and_decode_digest;
19+
use action_messages::ActionResult;
20+
use async_trait::async_trait;
21+
use futures::stream::FuturesUnordered;
22+
use futures::FutureExt;
23+
use futures::{
24+
future::BoxFuture,
25+
stream::{StreamExt, TryStreamExt},
26+
};
27+
use proto::build::bazel::remote::execution::v2::{ActionResult as ProtoActionResult, Directory as ProtoDirectory};
28+
29+
use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
30+
use common::DigestInfo;
31+
use error::{
32+
//make_err, Code,
33+
Error,
34+
ResultExt,
35+
};
36+
use traits::{StoreTrait, UploadSizeInfo};
37+
38+
/// Aggressively check if the digests of files exist in the cas. This function
39+
/// will spawn unbounded number of futures check all of the files. The store itself
40+
/// should be rate limited if spawning too many requests at once is an issue.
41+
// Sadly we cannot use `async fn` here because the rust compiler cannot determine the auto traits
42+
// of the future. So we need to force this function to return a dynamic future instead.
43+
// see: https://github.com/rust-lang/rust/issues/78649
44+
pub fn check_directory_files_in_cas<'a>(
45+
cas_store: Pin<&'a dyn StoreTrait>,
46+
digest: &'a DigestInfo,
47+
current_directory: &'a str,
48+
) -> BoxFuture<'a, Result<(), Error>> {
49+
async move {
50+
let directory = get_and_decode_digest::<ProtoDirectory>(cas_store, digest)
51+
.await
52+
.err_tip(|| "Converting digest to Directory")?;
53+
let mut futures = FuturesUnordered::new();
54+
55+
for file in directory.files {
56+
let digest: DigestInfo = file
57+
.digest
58+
.err_tip(|| "Expected Digest to exist in Directory::file::digest")?
59+
.try_into()
60+
.err_tip(|| "In Directory::file::digest")?;
61+
// Maybe could be made more efficient
62+
futures.push(cas_store.has(digest).boxed());
63+
}
64+
65+
for directory in directory.directories {
66+
let digest: DigestInfo = directory
67+
.digest
68+
.err_tip(|| "Expected Digest to exist in Directory::directories::digest")?
69+
.try_into()
70+
.err_tip(|| "In Directory::file::digest")?;
71+
let new_directory_path = format!("{}/{}", current_directory, directory.name);
72+
futures.push(
73+
async move {
74+
check_directory_files_in_cas(cas_store, &digest, &new_directory_path)
75+
.await
76+
.err_tip(|| format!("in traverse_ : {new_directory_path}"))?;
77+
Ok(Some(1))
78+
}
79+
.boxed(),
80+
);
81+
}
82+
83+
while futures.try_next().await?.is_some() {}
84+
Ok(())
85+
}
86+
.boxed()
87+
}
88+
89+
pub struct CompletenessCheckingStore {
90+
cas_store: Arc<dyn StoreTrait>,
91+
}
92+
93+
impl CompletenessCheckingStore {
94+
pub fn new(_ac_store: Arc<dyn StoreTrait>, cas_store: Arc<dyn StoreTrait>) -> Self {
95+
CompletenessCheckingStore { cas_store }
96+
}
97+
98+
fn pin_cas(&self) -> Pin<&dyn StoreTrait> {
99+
Pin::new(self.cas_store.as_ref())
100+
}
101+
}
102+
103+
#[async_trait]
104+
impl StoreTrait for CompletenessCheckingStore {
105+
async fn has_with_results(
106+
self: Pin<&Self>,
107+
digests: &[DigestInfo],
108+
_results: &mut [Option<usize>],
109+
) -> Result<(), Error> {
110+
// The proto promises that all results will exist in the CAS when
111+
// requested using the associated actions. However we currently allow
112+
// stores to prune themselves which violates this condition, because the
113+
// action cache and CAS are different. Therefore we need this completeness checking
114+
// store to check that all results exist in the CAS before we allow the the action result
115+
// to be returned to the client.
116+
// * Take the root digest which is the serialzied action proto hash
117+
// * Deserialize the action proto
118+
// * Check files in the root tree exist in the CAS but there's directories that needs
119+
// to be traversed as the directories can have directories and files in them and esure all
120+
// of them exist in the CAS.
121+
122+
let pin_cas = self.pin_cas();
123+
let mut futures = FuturesUnordered::new();
124+
125+
for digest in digests {
126+
let action_result: ActionResult = get_and_decode_digest::<ProtoActionResult>(pin_cas, digest)
127+
.await?
128+
.try_into()
129+
.err_tip(|| "Action result could not be converted in completeness checking store")?;
130+
131+
for output_file in &action_result.output_files {
132+
let file = output_file.digest;
133+
let file_digest = DigestInfo::try_new(&file.hash_str(), file.size_bytes as usize)?;
134+
futures.push(
135+
async move {
136+
if let Err(e) = check_directory_files_in_cas(pin_cas, &file_digest, "").await {
137+
eprintln!("Error: {:?}", e);
138+
}
139+
}
140+
.boxed(),
141+
);
142+
}
143+
144+
for output_directory in action_result.output_folders {
145+
let path = output_directory.path;
146+
let tree_digest = output_directory.tree_digest;
147+
futures.push(
148+
async move {
149+
if let Err(e) = check_directory_files_in_cas(pin_cas, &tree_digest, &path).await {
150+
eprintln!("Error: {:?}", e);
151+
}
152+
}
153+
.boxed(),
154+
);
155+
}
156+
}
157+
158+
while (futures.next().await).is_some() {}
159+
160+
Ok(())
161+
}
162+
163+
async fn update(
164+
self: Pin<&Self>,
165+
digest: DigestInfo,
166+
reader: DropCloserReadHalf,
167+
size_info: UploadSizeInfo,
168+
) -> Result<(), Error> {
169+
self.pin_cas().update(digest, reader, size_info).await
170+
}
171+
172+
async fn get_part_ref(
173+
self: Pin<&Self>,
174+
digest: DigestInfo,
175+
writer: &mut DropCloserWriteHalf,
176+
offset: usize,
177+
length: Option<usize>,
178+
) -> Result<(), Error> {
179+
self.pin_cas().get_part_ref(digest, writer, offset, length).await
180+
}
181+
182+
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
183+
Box::new(self)
184+
}
185+
}

cas/store/default_store_factory.rs

+5
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use shard_store::ShardStore;
3434
use size_partitioning_store::SizePartitioningStore;
3535
use store::{Store, StoreManager};
3636
use verify_store::VerifyStore;
37+
use completeness_checking_store::CompletenessCheckingStore;
3738

3839
type FutureMaybeStore<'a> = Box<dyn Future<Output = Result<Arc<dyn Store>, Error>> + 'a>;
3940

@@ -46,6 +47,10 @@ pub fn store_factory<'a>(
4647
let store: Arc<dyn Store> = match backend {
4748
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
4849
StoreConfig::s3_store(config) => Arc::new(S3Store::new(config)?),
50+
StoreConfig::completeness_checking_store(config) => Arc::new(CompletenessCheckingStore::new(
51+
store_factory(&config.backend, store_manager, None).await?,
52+
store_factory(&config.backend, store_manager, None).await?,
53+
)),
4954
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
5055
config,
5156
store_factory(&config.backend, store_manager, None).await?,

0 commit comments

Comments
 (0)