Skip to content

Commit 8540019

Browse files
committed
Add Completeness Checking Store
1 parent 22abf90 commit 8540019

35 files changed

+2344
-180
lines changed

Cargo.lock

+1,375-92
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+8
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ members = [
2929
"gencargo/cas_server",
3030
"gencargo/cas_server_test",
3131
"gencargo/common",
32+
"gencargo/completeness_checking_store",
33+
"gencargo/completeness_checking_store_test",
3234
"gencargo/compression_store",
3335
"gencargo/compression_store_test",
3436
"gencargo/config",
@@ -40,6 +42,8 @@ members = [
4042
"gencargo/evicting_map",
4143
"gencargo/evicting_map_test",
4244
"gencargo/execution_server",
45+
"gencargo/existence_store",
46+
"gencargo/existence_store_test",
4347
"gencargo/fast_slow_store",
4448
"gencargo/fast_slow_store_test",
4549
"gencargo/fastcdc",
@@ -175,6 +179,8 @@ cas = { path = "gencargo/cas" }
175179
cas_server = { path = "gencargo/cas_server" }
176180
cas_server_test = { path = "gencargo/cas_server_test" }
177181
common = { path = "gencargo/common" }
182+
completeness_checking_store = { path = "gencargo/completeness_checking_store" }
183+
completeness_checking_store_test = { path = "gencargo/completeness_checking_store_test" }
178184
compression_store = { path = "gencargo/compression_store" }
179185
compression_store_test = { path = "gencargo/compression_store_test" }
180186
config = { path = "gencargo/config" }
@@ -186,6 +192,8 @@ error = { path = "gencargo/error" }
186192
evicting_map = { path = "gencargo/evicting_map" }
187193
evicting_map_test = { path = "gencargo/evicting_map_test" }
188194
execution_server = { path = "gencargo/execution_server" }
195+
existence_store = { path = "gencargo/existence_store" }
196+
existence_store_test = { path = "gencargo/existence_store_test" }
189197
fast_slow_store = { path = "gencargo/fast_slow_store" }
190198
fast_slow_store_test = { path = "gencargo/fast_slow_store_test" }
191199
fastcdc = { path = "gencargo/fastcdc" }

cas/grpc_service/tests/worker_api_server_test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use proto::build::bazel::remote::execution::v2::{
2828
ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, OutputDirectory, OutputFile,
2929
OutputSymlink,
3030
};
31-
use proto::com::github::allada::turbo_cache::remote_execution::{
31+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3232
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteResult, KeepAliveRequest,
3333
SupportedProperties,
3434
};

cas/grpc_service/worker_api_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use common::DigestInfo;
2929
use config::cas_server::WorkerApiConfig;
3030
use error::{make_err, Code, Error, ResultExt};
3131
use platform_property_manager::PlatformProperties;
32-
use proto::com::github::allada::turbo_cache::remote_execution::{
32+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3333
execute_result, worker_api_server::WorkerApi, worker_api_server::WorkerApiServer as Server, ExecuteResult,
3434
GoingAwayRequest, KeepAliveRequest, SupportedProperties, UpdateForWorker,
3535
};

cas/scheduler/action_messages.rs

+47
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,53 @@ impl From<ActionResult> for ProtoActionResult {
748748
}
749749
}
750750

751+
impl From<ProtoActionResult> for ActionResult {
752+
fn from(val: ProtoActionResult) -> Self {
753+
let output_file_symlinks = val
754+
.output_file_symlinks
755+
.into_iter()
756+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
757+
.collect::<Vec<_>>();
758+
759+
let output_directory_symlinks = val
760+
.output_directory_symlinks
761+
.into_iter()
762+
.map(|output_symlink| SymlinkInfo::try_from(output_symlink).unwrap())
763+
.collect::<Vec<_>>();
764+
765+
Self {
766+
output_files: val
767+
.output_files
768+
.into_iter()
769+
.map(|output_file| output_file.try_into().unwrap())
770+
.collect(),
771+
output_folders: val
772+
.output_directories
773+
.into_iter()
774+
.map(|output_directory| output_directory.try_into().unwrap())
775+
.collect(),
776+
output_file_symlinks,
777+
output_directory_symlinks,
778+
exit_code: val.exit_code,
779+
stdout_digest: val
780+
.stdout_digest
781+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
782+
.unwrap_or(DigestInfo::empty_digest()),
783+
stderr_digest: val
784+
.stderr_digest
785+
.map(|digest| digest.try_into().unwrap_or(DigestInfo::empty_digest()))
786+
.unwrap_or_else(DigestInfo::empty_digest),
787+
execution_metadata: val
788+
.execution_metadata
789+
.map(|metadata| metadata.try_into().unwrap_or(ExecutionMetadata::default()))
790+
.unwrap_or_default(),
791+
server_logs: Default::default(),
792+
error: None,
793+
message: String::new(),
794+
}
795+
}
796+
}
797+
751798
impl TryFrom<ExecuteResponse> for ActionStage {
752799
type Error = Error;
753800

cas/scheduler/tests/simple_scheduler_test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use common::DigestInfo;
2727
use error::{make_err, Code, Error, ResultExt};
2828
use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
2929
use proto::build::bazel::remote::execution::v2::{digest_function, ExecuteRequest};
30-
use proto::com::github::allada::turbo_cache::remote_execution::{
30+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3131
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
3232
};
3333
use scheduler::{ActionScheduler, WorkerScheduler};

cas/scheduler/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use action_messages::ActionInfo;
2323
use error::{make_err, make_input_err, Code, Error, ResultExt};
2424
use metrics_utils::{CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent};
2525
use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
26-
use proto::com::github::allada::turbo_cache::remote_execution::{
26+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
2727
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
2828
};
2929
use tokio::sync::mpsc::UnboundedSender;

cas/store/BUILD

+88-1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ rust_library(
3535
":store",
3636
":traits",
3737
":verify_store",
38+
":completeness_checking_store",
39+
":existence_store",
3840
"//config",
3941
"//util:error",
4042
"//util:metrics_utils",
@@ -125,18 +127,65 @@ rust_library(
125127
],
126128
)
127129

130+
rust_library(
131+
name = "completeness_checking_store",
132+
srcs = ["completeness_checking_store.rs"],
133+
proc_macro_deps = ["@crate_index//:async-trait"],
134+
visibility = ["//cas:__pkg__"],
135+
deps = [
136+
":traits",
137+
":ac_utils",
138+
"//proto",
139+
"//config",
140+
"//util:buf_channel",
141+
"//util:common",
142+
"//util:error",
143+
"//util:metrics_utils",
144+
"//cas/scheduler:action_messages",
145+
"@crate_index//:futures",
146+
"@crate_index//:hashbrown",
147+
"@crate_index//:hex",
148+
"@crate_index//:sha2",
149+
"@crate_index//:tokio",
150+
],
151+
)
152+
128153
rust_library(
129154
name = "verify_store",
130155
srcs = ["verify_store.rs"],
131156
proc_macro_deps = ["@crate_index//:async-trait"],
132157
visibility = ["//cas:__pkg__"],
133158
deps = [
134159
":traits",
160+
":ac_utils",
161+
"//proto",
162+
"//config",
163+
"//util:buf_channel",
164+
"//util:common",
165+
"//util:error",
166+
"//util:metrics_utils",
167+
"@crate_index//:hashbrown",
168+
"@crate_index//:hex",
169+
"@crate_index//:sha2",
170+
"@crate_index//:tokio",
171+
],
172+
)
173+
174+
rust_library(
175+
name = "existence_store",
176+
srcs = ["existence_store.rs"],
177+
proc_macro_deps = ["@crate_index//:async-trait"],
178+
visibility = ["//cas:__pkg__"],
179+
deps = [
180+
":traits",
181+
":ac_utils",
182+
"//proto",
135183
"//config",
136184
"//util:buf_channel",
137185
"//util:common",
138186
"//util:error",
139187
"//util:metrics_utils",
188+
"@crate_index//:hashbrown",
140189
"@crate_index//:hex",
141190
"@crate_index//:sha2",
142191
"@crate_index//:tokio",
@@ -388,6 +437,27 @@ rust_test(
388437
],
389438
)
390439

440+
rust_test(
441+
name = "completeness_checking_store_test",
442+
srcs = ["tests/completeness_checking_store_test.rs"],
443+
deps = [
444+
":memory_store",
445+
":traits",
446+
":completeness_checking_store",
447+
":store",
448+
"//proto",
449+
"//config",
450+
"//util:buf_channel",
451+
"//util:common",
452+
"//util:error",
453+
"//cas/scheduler:action_messages",
454+
"@crate_index//:prost",
455+
"@crate_index//:futures",
456+
"@crate_index//:pretty_assertions",
457+
"@crate_index//:tokio",
458+
],
459+
)
460+
391461
rust_test(
392462
name = "verify_store_test",
393463
srcs = ["tests/verify_store_test.rs"],
@@ -405,6 +475,23 @@ rust_test(
405475
],
406476
)
407477

478+
rust_test(
479+
name = "existence_store_test",
480+
srcs = ["tests/existence_store_test.rs"],
481+
deps = [
482+
":memory_store",
483+
":traits",
484+
":existence_store",
485+
"//config",
486+
"//util:buf_channel",
487+
"//util:common",
488+
"//util:error",
489+
"@crate_index//:futures",
490+
"@crate_index//:pretty_assertions",
491+
"@crate_index//:tokio",
492+
],
493+
)
494+
408495
rust_test(
409496
name = "s3_store_test",
410497
srcs = ["tests/s3_store_test.rs"],
@@ -498,4 +585,4 @@ rust_test(
498585
"@crate_index//:rand",
499586
"@crate_index//:tokio",
500587
],
501-
)
588+
)
+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2023 The Turbo Cache Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::pin::Pin;
16+
use std::sync::Arc;
17+
18+
use ac_utils::get_and_decode_digest;
19+
use action_messages::ActionResult;
20+
use async_trait::async_trait;
21+
use futures::stream::{self, StreamExt, TryStreamExt};
22+
use proto::build::bazel::remote::execution::v2::{ActionResult as ProtoActionResult, Tree};
23+
24+
use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
25+
use common::DigestInfo;
26+
use error::{Error, ResultExt};
27+
use traits::{StoreTrait, UploadSizeInfo};
28+
29+
pub struct CompletenessCheckingStore {
30+
cas_store: Arc<dyn StoreTrait>,
31+
}
32+
33+
impl CompletenessCheckingStore {
34+
pub fn new(_ac_store: Arc<dyn StoreTrait>, cas_store: Arc<dyn StoreTrait>) -> Self {
35+
CompletenessCheckingStore { cas_store }
36+
}
37+
38+
fn pin_cas(&self) -> Pin<&dyn StoreTrait> {
39+
Pin::new(self.cas_store.as_ref())
40+
}
41+
}
42+
43+
#[async_trait]
44+
impl StoreTrait for CompletenessCheckingStore {
45+
async fn has_with_results(
46+
self: Pin<&Self>,
47+
digests: &[DigestInfo],
48+
_results: &mut [Option<usize>],
49+
) -> Result<(), Error> {
50+
// The proto promises that all results will exist in the CAS when
51+
// requested using the associated actions. However we currently allow
52+
// stores to prune themselves which violates this condition, because the
53+
// action cache and CAS are different. Therefore we need this completeness checking
54+
// store to check that all results exist in the CAS before we allow the the action result
55+
// to be returned to the client.
56+
// * Take the root digest which is the serialzied action proto hash
57+
// * Deserialize the action proto
58+
// * Check files in the root tree exist in the CAS but there's directories that needs
59+
// to be traversed as the directories can have directories and files in them and esure all
60+
// of them exist in the CAS.
61+
62+
for digest in digests {
63+
let action_result: ActionResult = get_and_decode_digest::<ProtoActionResult>(self.pin_cas(), digest)
64+
.await?
65+
.try_into()
66+
.err_tip(|| "Action result could not be converted in completeness checking store has")?;
67+
68+
// Something that takes a stream of iterators and flattens it into a stream of items
69+
// 'temp' is a stream of digests obtained from the output folders of the action result
70+
let _temp =
71+
stream::iter(action_result.output_folders)
72+
.then(|directory| async move {
73+
get_and_decode_digest::<Tree>(self.pin_cas(), &directory.tree_digest).await
74+
})
75+
.map_ok(|tree| async {
76+
let trees =
77+
tree.children
78+
.into_iter()
79+
.chain(tree.root.into_iter())
80+
.flat_map(|child_directory| {
81+
child_directory.files.into_iter().filter_map(|file| file.digest)
82+
});
83+
84+
let item_stream = stream::iter(trees);
85+
item_stream
86+
.map(|file_digest| async move {
87+
let digest_info =
88+
DigestInfo::try_new(&file_digest.hash, file_digest.size_bytes as usize)?;
89+
self.pin_cas().has(digest_info).await
90+
})
91+
.buffer_unordered(10)
92+
.try_collect::<Vec<_>>()
93+
.await
94+
});
95+
}
96+
97+
Ok(())
98+
}
99+
100+
async fn update(
101+
self: Pin<&Self>,
102+
digest: DigestInfo,
103+
reader: DropCloserReadHalf,
104+
size_info: UploadSizeInfo,
105+
) -> Result<(), Error> {
106+
self.pin_cas().update(digest, reader, size_info).await
107+
}
108+
109+
async fn get_part_ref(
110+
self: Pin<&Self>,
111+
digest: DigestInfo,
112+
writer: &mut DropCloserWriteHalf,
113+
offset: usize,
114+
length: Option<usize>,
115+
) -> Result<(), Error> {
116+
self.pin_cas().get_part_ref(digest, writer, offset, length).await
117+
}
118+
119+
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
120+
Box::new(self)
121+
}
122+
}

0 commit comments

Comments
 (0)