Skip to content

Commit 8b47982

Browse files
authored
Merge branch 'main' into completeness-checking-store-restructuring
2 parents 73d8ee7 + e8e6701 commit 8b47982

28 files changed

+423
-88
lines changed

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ members = [
4242
"gencargo/evicting_map",
4343
"gencargo/evicting_map_test",
4444
"gencargo/execution_server",
45+
"gencargo/existence_store",
46+
"gencargo/existence_store_test",
4547
"gencargo/fast_slow_store",
4648
"gencargo/fast_slow_store_test",
4749
"gencargo/fastcdc",
@@ -190,6 +192,8 @@ error = { path = "gencargo/error" }
190192
evicting_map = { path = "gencargo/evicting_map" }
191193
evicting_map_test = { path = "gencargo/evicting_map_test" }
192194
execution_server = { path = "gencargo/execution_server" }
195+
existence_store = { path = "gencargo/existence_store" }
196+
existence_store_test = { path = "gencargo/existence_store_test" }
193197
fast_slow_store = { path = "gencargo/fast_slow_store" }
194198
fast_slow_store_test = { path = "gencargo/fast_slow_store_test" }
195199
fastcdc = { path = "gencargo/fastcdc" }

cas/grpc_service/tests/worker_api_server_test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use proto::build::bazel::remote::execution::v2::{
2828
ActionResult as ProtoActionResult, ExecuteResponse, ExecutedActionMetadata, LogFile, OutputDirectory, OutputFile,
2929
OutputSymlink,
3030
};
31-
use proto::com::github::allada::turbo_cache::remote_execution::{
31+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3232
execute_result, update_for_worker, worker_api_server::WorkerApi, ExecuteResult, KeepAliveRequest,
3333
SupportedProperties,
3434
};

cas/grpc_service/worker_api_server.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use common::DigestInfo;
2929
use config::cas_server::WorkerApiConfig;
3030
use error::{make_err, Code, Error, ResultExt};
3131
use platform_property_manager::PlatformProperties;
32-
use proto::com::github::allada::turbo_cache::remote_execution::{
32+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3333
execute_result, worker_api_server::WorkerApi, worker_api_server::WorkerApiServer as Server, ExecuteResult,
3434
GoingAwayRequest, KeepAliveRequest, SupportedProperties, UpdateForWorker,
3535
};

cas/scheduler/tests/simple_scheduler_test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use common::DigestInfo;
2727
use error::{make_err, Code, Error, ResultExt};
2828
use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
2929
use proto::build::bazel::remote::execution::v2::{digest_function, ExecuteRequest};
30-
use proto::com::github::allada::turbo_cache::remote_execution::{
30+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
3131
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
3232
};
3333
use scheduler::{ActionScheduler, WorkerScheduler};

cas/scheduler/worker.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use action_messages::ActionInfo;
2323
use error::{make_err, make_input_err, Code, Error, ResultExt};
2424
use metrics_utils::{CollectorState, CounterWithTime, FuncCounterWrapper, MetricsComponent};
2525
use platform_property_manager::{PlatformProperties, PlatformPropertyValue};
26-
use proto::com::github::allada::turbo_cache::remote_execution::{
26+
use proto::com::github::trace_machina::turbo_cache::remote_execution::{
2727
update_for_worker, ConnectionResult, StartExecute, UpdateForWorker,
2828
};
2929
use tokio::sync::mpsc::UnboundedSender;

cas/store/BUILD

+38
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,27 @@ rust_library(
171171
],
172172
)
173173

174+
rust_library(
175+
name = "existence_store",
176+
srcs = ["existence_store.rs"],
177+
proc_macro_deps = ["@crate_index//:async-trait"],
178+
visibility = ["//cas:__pkg__"],
179+
deps = [
180+
":traits",
181+
":ac_utils",
182+
"//proto",
183+
"//config",
184+
"//util:buf_channel",
185+
"//util:common",
186+
"//util:error",
187+
"//util:metrics_utils",
188+
"@crate_index//:hashbrown",
189+
"@crate_index//:hex",
190+
"@crate_index//:sha2",
191+
"@crate_index//:tokio",
192+
],
193+
)
194+
174195
rust_library(
175196
name = "fast_slow_store",
176197
srcs = ["fast_slow_store.rs"],
@@ -454,6 +475,23 @@ rust_test(
454475
],
455476
)
456477

478+
rust_test(
479+
name = "existence_store_test",
480+
srcs = ["tests/existence_store_test.rs"],
481+
deps = [
482+
":memory_store",
483+
":traits",
484+
":existence_store",
485+
"//config",
486+
"//util:buf_channel",
487+
"//util:common",
488+
"//util:error",
489+
"@crate_index//:futures",
490+
"@crate_index//:pretty_assertions",
491+
"@crate_index//:tokio",
492+
],
493+
)
494+
457495
rust_test(
458496
name = "s3_store_test",
459497
srcs = ["tests/s3_store_test.rs"],

cas/store/default_store_factory.rs

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use compression_store::CompressionStore;
2323
use config::{self, stores::StoreConfig};
2424
use dedup_store::DedupStore;
2525
use error::Error;
26+
use existence_store::ExistenceStore;
2627
use fast_slow_store::FastSlowStore;
2728
use filesystem_store::FilesystemStore;
2829
use grpc_store::GrpcStore;
@@ -63,6 +64,9 @@ pub fn store_factory<'a>(
6364
store_factory(&config.index_store, store_manager, None).await?,
6465
store_factory(&config.content_store, store_manager, None).await?,
6566
)),
67+
StoreConfig::existence_store(config) => Arc::new(ExistenceStore::new(
68+
store_factory(&config.inner, store_manager, None).await?,
69+
)),
6670
StoreConfig::fast_slow(config) => Arc::new(FastSlowStore::new(
6771
config,
6872
store_factory(&config.fast, store_manager, None).await?,

cas/store/existence_store.rs

+105
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2022 The Turbo Cache Authors. All rights reserved.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use hashbrown::HashSet;
16+
use std::pin::Pin;
17+
use std::sync::{Arc, Mutex};
18+
19+
use async_trait::async_trait;
20+
21+
use buf_channel::{DropCloserReadHalf, DropCloserWriteHalf};
22+
use common::DigestInfo;
23+
use error::Error;
24+
use traits::{StoreTrait, UploadSizeInfo};
25+
26+
pub struct ExistenceStore {
27+
inner_store: Arc<dyn StoreTrait>,
28+
pub existence_cache: Mutex<HashSet<DigestInfo>>,
29+
}
30+
31+
impl ExistenceStore {
32+
pub fn new(inner_store: Arc<dyn StoreTrait>) -> Self {
33+
Self {
34+
inner_store,
35+
// TODO (BlakeHatch):
36+
// Consider using RwLock in a future commit.
37+
// Since HashSet implements Send and Sync this should
38+
// be a drop-in replacement in theory.
39+
// Make sure benchmark is done to justify.
40+
existence_cache: Mutex::new(HashSet::new()),
41+
}
42+
}
43+
44+
fn pin_inner(&self) -> Pin<&dyn StoreTrait> {
45+
Pin::new(self.inner_store.as_ref())
46+
}
47+
}
48+
49+
#[async_trait]
50+
impl StoreTrait for ExistenceStore {
51+
async fn has_with_results(
52+
self: Pin<&Self>,
53+
digests: &[DigestInfo],
54+
results: &mut [Option<usize>],
55+
) -> Result<(), Error> {
56+
let mut pruned_digests = Vec::new();
57+
58+
for (i, digest) in digests.iter().enumerate() {
59+
if self.existence_cache.lock().unwrap().contains(digest) {
60+
results[i] = Some(1);
61+
} else {
62+
pruned_digests.push(*digest);
63+
}
64+
}
65+
66+
if !pruned_digests.is_empty() {
67+
let mut inner_results = vec![None; pruned_digests.len()];
68+
self.pin_inner()
69+
.has_with_results(&pruned_digests, &mut inner_results)
70+
.await?;
71+
72+
for (i, result) in inner_results.iter().enumerate() {
73+
if result.is_some() {
74+
self.existence_cache.lock().unwrap().insert(pruned_digests[i]);
75+
results[i] = Some(1);
76+
}
77+
}
78+
}
79+
80+
Ok(())
81+
}
82+
83+
async fn update(
84+
self: Pin<&Self>,
85+
digest: DigestInfo,
86+
reader: DropCloserReadHalf,
87+
size_info: UploadSizeInfo,
88+
) -> Result<(), Error> {
89+
self.pin_inner().update(digest, reader, size_info).await
90+
}
91+
92+
async fn get_part_ref(
93+
self: Pin<&Self>,
94+
digest: DigestInfo,
95+
writer: &mut DropCloserWriteHalf,
96+
offset: usize,
97+
length: Option<usize>,
98+
) -> Result<(), Error> {
99+
self.pin_inner().get_part_ref(digest, writer, offset, length).await
100+
}
101+
102+
fn as_any(self: Arc<Self>) -> Box<dyn std::any::Any + Send> {
103+
Box::new(self)
104+
}
105+
}

cas/store/grpc_store.rs

+79-38
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use prost::Message;
2525
use proto::build::bazel::remote::execution::v2::digest_function;
2626
use rand::{rngs::OsRng, Rng};
2727
use tokio::time::sleep;
28+
use tonic::transport::Channel;
2829
use tonic::{transport, IntoRequest, Request, Response, Streaming};
2930
use uuid::Uuid;
3031

@@ -111,16 +112,20 @@ impl GrpcStore {
111112
})
112113
}
113114

115+
fn get_retry_config(&self) -> impl Iterator<Item = Duration> + '_ {
116+
ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
117+
.map(|d| (self.jitter_fn)(d))
118+
.take(self.retry.max_retries) // Remember this is number of retries, so will run max_retries + 1.
119+
}
120+
114121
async fn perform_request<F, Fut, R, I>(&self, input: I, mut request: F) -> Result<R, Error>
115122
where
116123
F: FnMut(I) -> Fut + Send + Copy,
117124
Fut: Future<Output = Result<R, Error>> + Send,
118125
R: Send,
119126
I: Send + Clone,
120127
{
121-
let retry_config = ExponentialBackoff::new(Duration::from_millis(self.retry.delay as u64))
122-
.map(|d| (self.jitter_fn)(d))
123-
.take(self.retry.max_retries); // Remember this is number of retries, so will run max_retries + 1.
128+
let retry_config = self.get_retry_config();
124129
self.retrier
125130
.retry(
126131
retry_config,
@@ -263,50 +268,86 @@ impl GrpcStore {
263268
"CAS operation on AC store"
264269
);
265270

266-
let mut client = self.bytestream_client.clone();
267-
268-
let error = Arc::new(Mutex::new(None));
269-
struct LocalState {
271+
struct LocalState<T, E>
272+
where
273+
T: Stream<Item = Result<WriteRequest, E>> + Unpin + Send + 'static,
274+
E: Into<Error> + 'static,
275+
{
270276
instance_name: String,
271-
error: Arc<Mutex<Option<Error>>>,
277+
error: Mutex<Option<Error>>,
278+
read_stream: Mutex<Option<WriteRequestStreamWrapper<T, E>>>,
279+
client: ByteStreamClient<Channel>,
272280
}
273281

274-
let local_state = LocalState {
282+
let local_state = Arc::new(LocalState {
275283
instance_name: self.instance_name.clone(),
276-
error: error.clone(),
277-
};
284+
error: Mutex::new(None),
285+
read_stream: Mutex::new(Some(stream)),
286+
client: self.bytestream_client.clone(),
287+
});
278288

279-
let stream = unfold((stream, local_state), move |(mut stream, local_state)| async {
280-
let maybe_message = stream.next().await;
281-
if let Ok(maybe_message) = maybe_message {
282-
if let Some(mut message) = maybe_message {
283-
// `resource_name` pattern is: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}".
284-
let first_slash_pos = match message.resource_name.find('/') {
285-
Some(pos) => pos,
286-
None => {
287-
log::error!("{}", "Resource name should follow pattern {instance_name}/uploads/{uuid}/blobs/{hash}/{size}");
289+
let retry_config = self.get_retry_config();
290+
let result = self
291+
.retrier
292+
.retry(
293+
retry_config,
294+
unfold(local_state, move |local_state| async move {
295+
let stream = unfold((None, local_state.clone()), move |(stream, local_state)| async {
296+
// Only consume the stream on the first request to read,
297+
// then pass it for future requests in the unfold.
298+
let Some(mut stream) = stream.or_else(|| local_state.read_stream.lock().take()) else {
299+
return None;
300+
};
301+
let maybe_message = stream.next().await;
302+
if let Ok(maybe_message) = maybe_message {
303+
if let Some(mut message) = maybe_message {
304+
// `resource_name` pattern is: "{instance_name}/uploads/{uuid}/blobs/{hash}/{size}".
305+
let first_slash_pos = match message.resource_name.find('/') {
306+
Some(pos) => pos,
307+
None => {
308+
log::error!("{}", "Resource name should follow pattern {instance_name}/uploads/{uuid}/blobs/{hash}/{size}");
309+
return None;
310+
}
311+
};
312+
message.resource_name = format!(
313+
"{}/{}",
314+
&local_state.instance_name,
315+
message.resource_name.get((first_slash_pos + 1)..).unwrap()
316+
);
317+
return Some((message, (Some(stream), local_state)));
318+
}
288319
return None;
289320
}
321+
// TODO(allada) I'm sure there's a way to do this without a mutex, but rust can be super
322+
// picky with borrowing through a stream await.
323+
*local_state.error.lock() = Some(maybe_message.unwrap_err());
324+
None
325+
});
326+
327+
let result = local_state.client.clone()
328+
.write(stream)
329+
.await
330+
.err_tip(|| "in GrpcStore::write");
331+
332+
// If the stream has been consumed, don't retry, but
333+
// otherwise it's ok to try again.
334+
let result = if local_state.read_stream.lock().is_some() {
335+
result.map_or_else(RetryResult::Retry, RetryResult::Ok)
336+
} else {
337+
result.map_or_else(RetryResult::Err, RetryResult::Ok)
290338
};
291-
message.resource_name = format!(
292-
"{}/{}",
293-
&local_state.instance_name,
294-
message.resource_name.get((first_slash_pos + 1)..).unwrap()
295-
);
296-
return Some((message, (stream, local_state)));
297-
}
298-
return None;
299-
}
300-
// TODO(allada) I'm sure there's a way to do this without a mutex, but rust can be super
301-
// picky with borrowing through a stream await.
302-
*local_state.error.lock() = Some(maybe_message.unwrap_err());
303-
None
304-
});
305339

306-
let result = client.write(stream).await.err_tip(|| "in GrpcStore::write")?;
307-
if let Some(err) = error.lock().take() {
308-
return Err(err);
309-
}
340+
// If there was an error with the stream, then don't retry.
341+
let result = if let Some(err) = local_state.error.lock().take() {
342+
RetryResult::Err(err)
343+
} else {
344+
result
345+
};
346+
347+
Some((result, local_state))
348+
}),
349+
)
350+
.await?;
310351
Ok(result)
311352
}
312353

0 commit comments

Comments
 (0)