
Commit 4c8b66d

Merge pull request #2550 from subspace/farmer-plot-cache
Farmer plot cache
2 parents 7598165 + ce41784 commit 4c8b66d

File tree

9 files changed: +657 -85 lines changed

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs (+4)

@@ -631,6 +631,10 @@ where
                 .iter()
                 .map(|single_disk_farm| single_disk_farm.piece_cache())
                 .collect(),
+            single_disk_farms
+                .iter()
+                .map(|single_disk_farm| single_disk_farm.plot_cache())
+                .collect(),
         )
         .await;
     drop(farmer_cache);
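
The hunk above adds a second collection to the same `replace_backing_caches` call. Reassembled as a rough sketch (the `single_disk_farms` and `farmer_cache` values and the binding name are assumed from the surrounding `farm.rs` code, not shown in this hunk), the call now looks roughly like this:

// Sketch only: approximate shape of the updated call in farm.rs.
// `single_disk_farms` and `farmer_cache` come from the surrounding command code.
let piece_caches = single_disk_farms
    .iter()
    .map(|single_disk_farm| single_disk_farm.piece_cache())
    .collect::<Vec<_>>();
let plot_caches = single_disk_farms
    .iter()
    .map(|single_disk_farm| single_disk_farm.plot_cache())
    .collect::<Vec<_>>();

// `replace_backing_caches` now takes both vectors; awaiting it returns a
// receiver that resolves once the worker finishes initializing the cache.
let _cache_initialized = farmer_cache
    .replace_backing_caches(piece_caches, plot_caches)
    .await;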

crates/subspace-farmer/src/farmer_cache.rs (+147 -41)
@@ -3,6 +3,7 @@ mod tests;
 
 use crate::node_client::NodeClient;
 use crate::single_disk_farm::piece_cache::{DiskPieceCache, Offset};
+use crate::single_disk_farm::plot_cache::{DiskPlotCache, MaybePieceStoredResult};
 use crate::utils::{run_future_in_dedicated_thread, AsyncJoinOnDrop};
 use event_listener_primitives::{Bag, HandlerId};
 use futures::channel::oneshot;
@@ -11,6 +12,7 @@ use futures::{select, FutureExt, StreamExt};
 use parking_lot::RwLock;
 use std::collections::{HashMap, VecDeque};
 use std::num::NonZeroU16;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
 use std::time::Duration;
 use std::{fmt, mem};
@@ -51,7 +53,7 @@ struct DiskPieceCacheState {
 #[derive(Debug)]
 enum WorkerCommand {
     ReplaceBackingCaches {
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
         acknowledgement: oneshot::Sender<()>,
     },
     ForgetKey {
@@ -102,11 +104,11 @@ where
             .expect("Always set during worker instantiation");
 
         if let Some(WorkerCommand::ReplaceBackingCaches {
-            new_caches,
+            new_piece_caches,
             acknowledgement,
         }) = worker_receiver.recv().await
         {
-            self.initialize(&piece_getter, &mut worker_state, new_caches)
+            self.initialize(&piece_getter, &mut worker_state, new_piece_caches)
                 .await;
             // Doesn't matter if receiver is still waiting for acknowledgement
             let _ = acknowledgement.send(());
@@ -163,10 +165,10 @@ where
         {
             match command {
                 WorkerCommand::ReplaceBackingCaches {
-                    new_caches,
+                    new_piece_caches,
                     acknowledgement,
                 } => {
-                    self.initialize(piece_getter, worker_state, new_caches)
+                    self.initialize(piece_getter, worker_state, new_piece_caches)
                         .await;
                     // Doesn't matter if receiver is still waiting for acknowledgement
                     let _ = acknowledgement.send(());
@@ -215,31 +217,31 @@ where
         &self,
         piece_getter: &PG,
         worker_state: &mut CacheWorkerState,
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
     ) where
         PG: PieceGetter,
     {
         info!("Initializing piece cache");
         // Pull old cache state since it will be replaced with a new one and reuse its allocations
         let cache_state = mem::take(&mut *self.caches.write());
-        let mut stored_pieces = Vec::with_capacity(new_caches.len());
-        let mut free_offsets = Vec::with_capacity(new_caches.len());
+        let mut stored_pieces = Vec::with_capacity(new_piece_caches.len());
+        let mut free_offsets = Vec::with_capacity(new_piece_caches.len());
         for mut state in cache_state {
             state.stored_pieces.clear();
             stored_pieces.push(state.stored_pieces);
             state.free_offsets.clear();
             free_offsets.push(state.free_offsets);
         }
-        stored_pieces.resize(new_caches.len(), HashMap::default());
-        free_offsets.resize(new_caches.len(), VecDeque::default());
+        stored_pieces.resize(new_piece_caches.len(), HashMap::default());
+        free_offsets.resize(new_piece_caches.len(), VecDeque::default());
 
         debug!("Collecting pieces that were in the cache before");
 
         // Build cache state of all backends
         let maybe_caches_futures = stored_pieces
             .into_iter()
             .zip(free_offsets)
-            .zip(new_caches)
+            .zip(new_piece_caches)
             .enumerate()
             .map(
                 |(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
@@ -760,8 +762,12 @@ where
 #[derive(Debug, Clone)]
 pub struct FarmerCache {
     peer_id: PeerId,
-    /// Individual disk caches where pieces are stored
-    caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
+    /// Individual dedicated piece caches
+    piece_caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
+    /// Additional piece caches
+    plot_caches: Arc<RwLock<Vec<DiskPlotCache>>>,
+    /// Next plot cache to use for storing pieces
+    next_plot_cache: Arc<AtomicUsize>,
     handlers: Arc<Handlers>,
     // We do not want to increase capacity unnecessarily on clone
     worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
@@ -782,7 +788,9 @@ impl FarmerCache {
 
         let instance = Self {
             peer_id,
-            caches: Arc::clone(&caches),
+            piece_caches: Arc::clone(&caches),
+            plot_caches: Arc::default(),
+            next_plot_cache: Arc::new(AtomicUsize::new(0)),
             handlers: Arc::clone(&handlers),
             worker_sender: Arc::new(worker_sender),
         };
@@ -801,34 +809,47 @@ impl FarmerCache {
     pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
         let maybe_piece_fut = tokio::task::spawn_blocking({
             let key = key.clone();
-            let caches = Arc::clone(&self.caches);
+            let piece_caches = Arc::clone(&self.piece_caches);
+            let plot_caches = Arc::clone(&self.plot_caches);
             let worker_sender = Arc::clone(&self.worker_sender);
 
             move || {
-                for (disk_farm_index, cache) in caches.read().iter().enumerate() {
-                    let Some(&offset) = cache.stored_pieces.get(&key) else {
-                        continue;
-                    };
-                    match cache.backend.read_piece(offset) {
-                        Ok(maybe_piece) => {
-                            return maybe_piece;
-                        }
-                        Err(error) => {
-                            error!(
-                                %error,
-                                %disk_farm_index,
-                                ?key,
-                                %offset,
-                                "Error while reading piece from cache, might be a disk corruption"
-                            );
+                {
+                    let piece_caches = piece_caches.read();
+                    for (disk_farm_index, cache) in piece_caches.iter().enumerate() {
+                        let Some(&offset) = cache.stored_pieces.get(&key) else {
+                            continue;
+                        };
+                        match cache.backend.read_piece(offset) {
+                            Ok(maybe_piece) => {
+                                return maybe_piece;
+                            }
+                            Err(error) => {
+                                error!(
+                                    %error,
+                                    %disk_farm_index,
+                                    ?key,
+                                    %offset,
+                                    "Error while reading piece from cache, might be a disk corruption"
+                                );
+
+                                if let Err(error) =
+                                    worker_sender.blocking_send(WorkerCommand::ForgetKey { key })
+                                {
+                                    trace!(%error, "Failed to send ForgetKey command to worker");
+                                }
 
-                            if let Err(error) =
-                                worker_sender.blocking_send(WorkerCommand::ForgetKey { key })
-                            {
-                                trace!(%error, "Failed to send ForgetKey command to worker");
+                                return None;
                             }
+                        }
+                    }
+                }
 
-                            return None;
+                {
+                    let plot_caches = plot_caches.read();
+                    for cache in plot_caches.iter() {
+                        if let Some(piece) = cache.read_piece(&key) {
+                            return Some(piece);
                         }
                     }
                 }
@@ -846,24 +867,92 @@ impl FarmerCache {
         }
     }
 
+    /// Try to store a piece in additional downloaded pieces, if there is space for them
+    pub async fn maybe_store_additional_piece(&self, piece_index: PieceIndex, piece: &Piece) {
+        let key = RecordKey::from(piece_index.to_multihash());
+
+        let mut should_store = false;
+        for cache in self.plot_caches.read().iter() {
+            match cache.is_piece_maybe_stored(&key) {
+                MaybePieceStoredResult::No => {
+                    // Try another one if there is any
+                }
+                MaybePieceStoredResult::Vacant => {
+                    should_store = true;
+                    break;
+                }
+                MaybePieceStoredResult::Yes => {
+                    // Already stored, nothing else left to do
+                    return;
+                }
+            }
+        }
+
+        if !should_store {
+            return;
+        }
+
+        let should_store_fut = tokio::task::spawn_blocking({
+            let plot_caches = Arc::clone(&self.plot_caches);
+            let next_plot_cache = Arc::clone(&self.next_plot_cache);
+            let piece = piece.clone();
+
+            move || {
+                let plot_caches = plot_caches.read();
+                let plot_caches_len = plot_caches.len();
+
+                // Store pieces in plots using round-robin distribution
+                for _ in 0..plot_caches_len {
+                    let plot_cache_index =
+                        next_plot_cache.fetch_add(1, Ordering::Relaxed) % plot_caches_len;
+
+                    match plot_caches[plot_cache_index].try_store_piece(piece_index, &piece) {
+                        Ok(true) => {
+                            return;
+                        }
+                        Ok(false) => {
+                            continue;
+                        }
+                        Err(error) => {
+                            error!(
+                                %error,
+                                %piece_index,
+                                %plot_cache_index,
+                                "Failed to store additional piece in cache"
+                            );
+                            continue;
+                        }
+                    }
+                }
+            }
+        });
+
+        if let Err(error) = AsyncJoinOnDrop::new(should_store_fut, true).await {
+            error!(%error, %piece_index, "Failed to store additional piece in cache");
+        }
+    }
+
     /// Initialize replacement of backing caches, returns acknowledgement receiver that can be used
     /// to identify when cache initialization has finished
     pub async fn replace_backing_caches(
         &self,
-        new_caches: Vec<DiskPieceCache>,
+        new_piece_caches: Vec<DiskPieceCache>,
+        new_plot_caches: Vec<DiskPlotCache>,
     ) -> oneshot::Receiver<()> {
         let (sender, receiver) = oneshot::channel();
         if let Err(error) = self
             .worker_sender
             .send(WorkerCommand::ReplaceBackingCaches {
-                new_caches,
+                new_piece_caches,
                 acknowledgement: sender,
             })
             .await
         {
             warn!(%error, "Failed to replace backing caches, worker exited");
         }
 
+        *self.plot_caches.write() = new_plot_caches;
+
         receiver
     }
 
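The "round-robin distribution" in the hunk above comes down to a shared `AtomicUsize` counter and a modulo over the number of plot caches. A minimal, self-contained sketch of just that selection pattern (illustrative code, not taken from the diff):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Picks the next cache index in round-robin order, as in
/// `maybe_store_additional_piece` above: `fetch_add` hands every caller a
/// unique counter value, and the modulo wraps it onto the available caches.
fn next_index(counter: &AtomicUsize, caches_len: usize) -> usize {
    counter.fetch_add(1, Ordering::Relaxed) % caches_len
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    // With three caches, successive calls cycle 0, 1, 2, 0, 1, ...
    let picks: Vec<usize> = (0..5).map(|_| next_index(&counter, 3)).collect();
    assert_eq!(picks, vec![0, 1, 2, 0, 1]);
}
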
@@ -876,10 +965,27 @@
 impl LocalRecordProvider for FarmerCache {
     fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
         // It is okay to take read lock here, writes locks are very infrequent and very short
-        for cache in self.caches.read().iter() {
-            if cache.stored_pieces.contains_key(key) {
+        for piece_cache in self.piece_caches.read().iter() {
+            if piece_cache.stored_pieces.contains_key(key) {
+                // Note: We store our own provider records locally without local addresses
+                // to avoid redundant storage and outdated addresses. Instead, these are
+                // acquired on demand when returning a `ProviderRecord` for the local node.
+                return Some(ProviderRecord {
+                    key: key.clone(),
+                    provider: self.peer_id,
+                    expires: None,
+                    addresses: Vec::new(),
+                });
+            };
+        }
+        // It is okay to take read lock here, writes locks almost never happen
+        for plot_cache in self.plot_caches.read().iter() {
+            if matches!(
+                plot_cache.is_piece_maybe_stored(key),
+                MaybePieceStoredResult::Yes
+            ) {
                 // Note: We store our own provider records locally without local addresses
-                // to avoid redundant storage and outdated addresses. Instead these are
+                // to avoid redundant storage and outdated addresses. Instead, these are
                 // acquired on demand when returning a `ProviderRecord` for the local node.
                 return Some(ProviderRecord {
                     key: key.clone(),
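
Taken together, the additions to `FarmerCache` give callers a best-effort way to park extra downloaded pieces in plot space and read them back later. A hedged usage sketch; `farmer_cache`, `piece_index`, and `piece` are assumed to come from surrounding farmer code rather than from this commit:

// Illustrative usage only; not code from this commit.
// Best effort: the piece lands in one of the plot caches (chosen round-robin)
// if any of them reports `Vacant`, and is silently skipped if all are full.
farmer_cache
    .maybe_store_additional_piece(piece_index, &piece)
    .await;

// Later reads go through `get_piece`, which checks the dedicated piece caches
// first and then falls back to the plot caches.
let key = RecordKey::from(piece_index.to_multihash());
if let Some(piece) = farmer_cache.get_piece(key).await {
    // The piece was served from one of the caches.
    drop(piece);
}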

crates/subspace-farmer/src/farmer_cache/tests.rs (+14 -8)

@@ -192,10 +192,13 @@ async fn basic() {
     tokio::spawn(farmer_cache_worker.run(piece_getter.clone()));
 
     let initialized_fut = farmer_cache
-        .replace_backing_caches(vec![
-            DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
-            DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
-        ])
+        .replace_backing_caches(
+            vec![
+                DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
+                DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
+            ],
+            vec![],
+        )
         .await;
 
     // Wait for piece cache to be initialized
@@ -375,10 +378,13 @@ async fn basic() {
 
     // Reopen with the same backing caches
     let initialized_fut = farmer_cache
-        .replace_backing_caches(vec![
-            DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
-            DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
-        ])
+        .replace_backing_caches(
+            vec![
+                DiskPieceCache::open(path1.as_ref(), 1).unwrap(),
+                DiskPieceCache::open(path2.as_ref(), 1).unwrap(),
+            ],
+            vec![],
+        )
         .await;
     drop(farmer_cache);
 