Skip to content

Commit 7a6e0bf

Browse files
committed
Added completeness checking store
1 parent 22abf90 commit 7a6e0bf

File tree

12 files changed

+1913
-94
lines changed

12 files changed

+1913
-94
lines changed

Cargo.lock

+1,375-92
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ members = [
2929
"gencargo/cas_server",
3030
"gencargo/cas_server_test",
3131
"gencargo/common",
32+
"gencargo/completeness_checking_store",
33+
"gencargo/completeness_checking_store_test",
3234
"gencargo/compression_store",
3335
"gencargo/compression_store_test",
3436
"gencargo/config",
@@ -175,6 +177,8 @@ cas = { path = "gencargo/cas" }
175177
cas_server = { path = "gencargo/cas_server" }
176178
cas_server_test = { path = "gencargo/cas_server_test" }
177179
common = { path = "gencargo/common" }
180+
completeness_checking_store = { path = "gencargo/completeness_checking_store" }
181+
completeness_checking_store_test = { path = "gencargo/completeness_checking_store_test" }
178182
compression_store = { path = "gencargo/compression_store" }
179183
compression_store_test = { path = "gencargo/compression_store_test" }
180184
config = { path = "gencargo/config" }

cas/scheduler/action_messages.rs

+47
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,53 @@ impl From<ActionResult> for ProtoActionResult {
748748
}
749749
}
750750

751+
impl From<ProtoActionResult> for ActionResult {
752+
fn from(val: ProtoActionResult) -> Self {
753+
let output_file_symlinks = val
754+
.output_file_symlinks
755+
.into_iter()
756+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
757+
.collect::<Vec<_>>();
758+
759+
let output_directory_symlinks = val
760+
.output_directory_symlinks
761+
.into_iter()
762+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
763+
.collect::<Vec<_>>();
764+
765+
Self {
766+
output_files: val
767+
.output_files
768+
.into_iter()
769+
.map(|output_file| output_file.try_into().unwrap())
770+
.collect(),
771+
output_folders: val
772+
.output_directories
773+
.into_iter()
774+
.map(|output_directory| output_directory.try_into().unwrap())
775+
.collect(),
776+
output_file_symlinks,
777+
output_directory_symlinks,
778+
exit_code: val.exit_code,
779+
stdout_digest: val
780+
.stdout_digest
781+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
782+
.unwrap_or(DigestInfo::empty_digest()),
783+
stderr_digest: val
784+
.stderr_digest
785+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
786+
.unwrap_or_else(DigestInfo::empty_digest),
787+
execution_metadata: val
788+
.execution_metadata
789+
.map(|metadata| metadata.try_into().unwrap_or(ExecutionMetadata::default()))
790+
.unwrap_or_default(),
791+
server_logs: Default::default(),
792+
error: None,
793+
message: String::new(),
794+
}
795+
}
796+
}
797+
751798
impl TryFrom<ExecuteResponse> for ActionStage {
752799
type Error = Error;
753800

cas/store/BUILD

+49-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ rust_library(
3535
":store",
3636
":traits",
3737
":verify_store",
38+
":completeness_checking_store",
3839
"//config",
3940
"//util:error",
4041
"//util:metrics_utils",
@@ -125,18 +126,44 @@ rust_library(
125126
],
126127
)
127128

129+
rust_library(
130+
name = "completeness_checking_store",
131+
srcs = ["completeness_checking_store.rs"],
132+
proc_macro_deps = ["@crate_index//:async-trait"],
133+
visibility = ["//cas:__pkg__"],
134+
deps = [
135+
":traits",
136+
":ac_utils",
137+
"//proto",
138+
"//config",
139+
"//util:buf_channel",
140+
"//util:common",
141+
"//util:error",
142+
"//util:metrics_utils",
143+
"//cas/scheduler:action_messages",
144+
"@crate_index//:futures",
145+
"@crate_index//:hashbrown",
146+
"@crate_index//:hex",
147+
"@crate_index//:sha2",
148+
"@crate_index//:tokio",
149+
],
150+
)
151+
128152
rust_library(
129153
name = "verify_store",
130154
srcs = ["verify_store.rs"],
131155
proc_macro_deps = ["@crate_index//:async-trait"],
132156
visibility = ["//cas:__pkg__"],
133157
deps = [
134158
":traits",
159+
":ac_utils",
160+
"//proto",
135161
"//config",
136162
"//util:buf_channel",
137163
"//util:common",
138164
"//util:error",
139165
"//util:metrics_utils",
166+
"@crate_index//:hashbrown",
140167
"@crate_index//:hex",
141168
"@crate_index//:sha2",
142169
"@crate_index//:tokio",
@@ -388,6 +415,27 @@ rust_test(
388415
],
389416
)
390417

418+
rust_test(
419+
name = "completeness_checking_store_test",
420+
srcs = ["tests/completeness_checking_store_test.rs"],
421+
deps = [
422+
":memory_store",
423+
":traits",
424+
":completeness_checking_store",
425+
":store",
426+
"//proto",
427+
"//config",
428+
"//util:buf_channel",
429+
"//util:common",
430+
"//util:error",
431+
"//cas/scheduler:action_messages",
432+
"@crate_index//:prost",
433+
"@crate_index//:futures",
434+
"@crate_index//:pretty_assertions",
435+
"@crate_index//:tokio",
436+
],
437+
)
438+
391439
rust_test(
392440
name = "verify_store_test",
393441
srcs = ["tests/verify_store_test.rs"],
@@ -498,4 +546,4 @@ rust_test(
498546
"@crate_index//:rand",
499547
"@crate_index//:tokio",
500548
],
501-
)
549+
)
+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::pin::Pin;
16+
use std::sync::Arc;
17+
18+
use ac_utils::get_and_decode_digest;
19+
use action_messages::ActionResult;
20+
use async_trait::async_trait;
21+
use futures::stream::{self, StreamExt, TryStreamExt};
22+
use proto::build::bazel::remote::execution::v2::{ActionResult as ProtoActionResult, Tree};
23+
24+
use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
25+
use common::DigestInfo;
26+
use error::{Error, ResultExt};
27+
use traits::{StoreTrait, UploadSizeInfo};
28+
29+
pub struct CompletenessCheckingStore {
30+
cas_store: Arc<dyn StoreTrait>,
31+
}
32+
33+
impl CompletenessCheckingStore {
34+
pub fn new(_ac_store: Arc<dyn StoreTrait>, cas_store: Arc<dyn StoreTrait>) -> Self {
35+
CompletenessCheckingStore { cas_store }
36+
}
37+
38+
fn pin_cas(&self) -> Pin<&dyn StoreTrait> {
39+
Pin::new(self.cas_store.as_ref())
40+
}
41+
}
42+
43+
#[async_trait]
44+
impl StoreTrait for CompletenessCheckingStore {
45+
async fn has_with_results(
46+
self: Pin<&Self>,
47+
digests: &[DigestInfo],
48+
_results: &mut [Option<usize>],
49+
) -> Result<(), Error> {
50+
// The proto promises that all results will exist in the CAS when
51+
// requested using the associated actions. However we currently allow
52+
// stores to prune themselves which violates this condition, because the
53+
// action cache and CAS are different. Therefore we need this completeness checking
54+
// store to check that all results exist in the CAS before we allow the the action result
55+
// to be returned to the client.
56+
// * Take the root digest which is the serialzied action proto hash
57+
// * Deserialize the action proto
58+
// * Check files in the root tree exist in the CAS but there's directories that needs
59+
// to be traversed as the directories can have directories and files in them and esure all
60+
// of them exist in the CAS.
61+
62+
for digest in digests {
63+
let action_result: ActionResult = get_and_decode_digest::<ProtoActionResult>(self.pin_cas(), digest)
64+
.await?
65+
.try_into()
66+
.err_tip(|| "Action result could not be converted in completeness checking store has")?;
67+
68+
// Something that takes a stream of iterators and flattens it into a stream of items
69+
// 'temp' is a stream of digests obtained from the output folders of the action result
70+
let _temp =
71+
stream::iter(action_result.output_folders)
72+
.then(|directory| async move {
73+
get_and_decode_digest::<Tree>(self.pin_cas(), &directory.tree_digest).await
74+
})
75+
.map_ok(|tree| async {
76+
let trees =
77+
tree.children
78+
.into_iter()
79+
.chain(tree.root.into_iter())
80+
.flat_map(|child_directory| {
81+
child_directory.files.into_iter().filter_map(|file| file.digest)
82+
});
83+
84+
let item_stream = stream::iter(trees);
85+
item_stream
86+
.map(|file_digest| async move {
87+
let digest_info =
88+
DigestInfo::try_new(&file_digest.hash, file_digest.size_bytes as usize)?;
89+
self.pin_cas().has(digest_info).await
90+
})
91+
.buffer_unordered(10)
92+
.try_collect::<Vec<_>>()
93+
.await
94+
});
95+
}
96+
97+
Ok(())
98+
}
99+
100+
async fn update(
101+
self: Pin<&Self>,
102+
digest: DigestInfo,
103+
reader: DropCloserReadHalf,
104+
size_info: UploadSizeInfo,
105+
) -> Result<(), Error> {
106+
self.pin_cas().update(digest, reader, size_info).await
107+
}
108+
109+
async fn get_part_ref(
110+
self: Pin<&Self>,
111+
digest: DigestInfo,
112+
writer: &mut DropCloserWriteHalf,
113+
offset: usize,
114+
length: Option<usize>,
115+
) -> Result<(), Error> {
116+
self.pin_cas().get_part_ref(digest, writer, offset, length).await
117+
}
118+
119+
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
120+
Box::new(self)
121+
}
122+
}

cas/store/default_store_factory.rs

+5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818
use futures::stream::FuturesOrdered;
1919
use futures::{Future, TryStreamExt};
2020

21+
use completeness_checking_store::CompletenessCheckingStore;
2122
use compression_store::CompressionStore;
2223
use config::{self, stores::StoreConfig};
2324
use dedup_store::DedupStore;
@@ -45,6 +46,10 @@ pub fn store_factory<'a>(
4546
let store: Arc<dyn Store> = match backend {
4647
StoreConfig::memory(config) => Arc::new(MemoryStore::new(config)),
4748
StoreConfig::s3_store(config) => Arc::new(S3Store::new(config)?),
49+
StoreConfig::completeness_checking_store(config) => Arc::new(CompletenessCheckingStore::new(
50+
store_factory(&config.backend, store_manager, None).await?,
51+
store_factory(&config.backend, store_manager, None).await?,
52+
)),
4853
StoreConfig::verify(config) => Arc::new(VerifyStore::new(
4954
config,
5055
store_factory(&config.backend, store_manager, None).await?,

0 commit comments

Comments
 (0)