Skip to content

Commit b94fd7c

Browse files
committed
Add Completeness Checking Store
This verifies that all file and folder digests associated with action results are present in the CAS. This lets us prune caches while still maintaining the completeness guarantees in the proto. The store is useful when combined with existence caching.
1 parent ab0f1ac commit b94fd7c

File tree

9 files changed

+620
-4
lines changed

9 files changed

+620
-4
lines changed

native-link-config/src/stores.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ pub enum StoreConfig {
4444
/// hash and size and the AC validate nothing.
4545
verify(Box<VerifyStore>),
4646

47+
/// Completeness checking store verifies that the
48+
/// output files & folders exist in the CAS before forwarding
49+
/// the request to the underlying store.
50+
/// Note: This store should only be used on AC stores.
51+
completeness_checking(Box<CompletenessCheckingStore>),
52+
4753
/// A compression store that will compress the data inbound and
4854
/// outbound. There will be a non-trivial cost to compress and
4955
/// decompress the data, but in many cases if the final store is
@@ -331,6 +337,16 @@ pub struct VerifyStore {
331337
pub verify_hash: bool,
332338
}
333339

340+
#[derive(Serialize, Deserialize, Debug, Clone)]
341+
pub struct CompletenessCheckingStore {
342+
/// The underlying store that will have its results validated before sending to the client.
343+
pub backend: StoreConfig,
344+
345+
/// When a request is made, the results are decoded and all output digests/files are verified
346+
/// to exist in this CAS store before returning success.
347+
pub cas_store: StoreConfig,
348+
}
349+
334350
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Clone, Copy)]
335351
pub struct Lz4Config {
336352
/// Size of the blocks to compress.

native-link-store/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ rust_library(
1010
name = "native-link-store",
1111
srcs = [
1212
"src/ac_utils.rs",
13+
"src/completeness_checking_store.rs",
1314
"src/compression_store.rs",
1415
"src/dedup_store.rs",
1516
"src/default_store_factory.rs",
@@ -69,6 +70,7 @@ rust_test_suite(
6970
name = "integration",
7071
srcs = [
7172
"tests/ac_utils_test.rs",
73+
"tests/completeness_checking_store_test.rs",
7274
"tests/compression_store_test.rs",
7375
"tests/dedup_store_test.rs",
7476
"tests/existence_store_test.rs",
@@ -89,6 +91,7 @@ rust_test_suite(
8991
"//error",
9092
"//native-link-config",
9193
"//native-link-util",
94+
"//proto",
9295
"@crate_index//:async-lock",
9396
"@crate_index//:aws-sdk-s3",
9497
"@crate_index//:aws-smithy-runtime",

native-link-store/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ futures = "0.3.28"
2323
hex = "0.4.3"
2424
hyper = { version = "0.14.27" }
2525
hyper-rustls = { version = "0.24.2", features = ["webpki-tokio"] }
26+
tracing = "0.1.40"
2627
lz4_flex = "0.11.1"
2728
parking_lot = "0.12.1"
2829
prost = "0.11.9"
@@ -34,7 +35,6 @@ tokio = { version = "1.29.1" }
3435
tokio-stream = { version = "0.1.14", features = ["fs"] }
3536
tokio-util = { version = "0.7.8" }
3637
tonic = { version = "0.9.2", features = ["gzip"] }
37-
tracing = "0.1.40"
3838
uuid = { version = "1.4.0", features = ["v4"] }
3939

4040
[dev-dependencies]

native-link-store/src/ac_utils.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use std::pin::Pin;
2323
use bytes::{Bytes, BytesMut};
2424
use error::{Code, Error, ResultExt};
2525
use futures::future::join;
26-
use futures::{Future, FutureExt};
26+
use futures::{Future, FutureExt, TryFutureExt};
2727
use native_link_util::buf_channel::{make_buf_channel_pair, DropCloserWriteHalf};
2828
use native_link_util::common::{fs, DigestInfo};
2929
use native_link_util::digest_hasher::DigestHasher;
@@ -48,6 +48,14 @@ pub async fn get_and_decode_digest<T: Message + Default>(
4848
store: Pin<&dyn Store>,
4949
digest: &DigestInfo,
5050
) -> Result<T, Error> {
51+
get_size_and_decode_digest(store, digest).map_ok(|(v, _)| v).await
52+
}
53+
54+
/// Attempts to fetch the digest contents from a store into the associated proto, also returning the length of the fetched data.
55+
pub async fn get_size_and_decode_digest<T: Message + Default>(
56+
store: Pin<&dyn Store>,
57+
digest: &DigestInfo,
58+
) -> Result<(T, usize), Error> {
5159
let mut store_data_resp = store
5260
.get_part_unchunked(*digest, 0, Some(MAX_ACTION_MSG_SIZE), Some(ESTIMATED_DIGEST_SIZE))
5361
.await;
@@ -60,8 +68,11 @@ pub async fn get_and_decode_digest<T: Message + Default>(
6068
}
6169
}
6270
let store_data = store_data_resp?;
71+
let store_data_len = store_data.len();
6372

64-
T::decode(store_data).err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
73+
T::decode(store_data)
74+
.err_tip_with_code(|e| (Code::NotFound, format!("Stored value appears to be corrupt: {}", e)))
75+
.map(|v| (v, store_data_len))
6576
}
6677

6778
/// Computes the digest of a message.

0 commit comments

Comments
 (0)