Skip to content

Commit 1486b54

Browse files
authored
Merge pull request #2585 from subspace/parallel-farm-initialization
Parallel farm initialization
2 parents 2cb7995 + 92abca3 commit 1486b54

File tree

2 files changed

+78
-64
lines changed

2 files changed

+78
-64
lines changed

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

+73-62
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use futures::stream::{FuturesOrdered, FuturesUnordered};
1212
use futures::{FutureExt, StreamExt};
1313
use parking_lot::Mutex;
1414
use prometheus_client::registry::Registry;
15+
use rayon::prelude::*;
1516
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
1617
use std::num::{NonZeroU8, NonZeroUsize};
1718
use std::path::PathBuf;
@@ -47,6 +48,7 @@ use subspace_networking::libp2p::Multiaddr;
4748
use subspace_networking::utils::piece_provider::PieceProvider;
4849
use subspace_proof_of_space::Table;
4950
use thread_priority::ThreadPriority;
51+
use tokio::runtime::Handle;
5052
use tokio::sync::Semaphore;
5153
use tracing::{debug, error, info, info_span, warn};
5254
use zeroize::Zeroizing;
@@ -528,7 +530,6 @@ where
528530
"farmer-cache-worker".to_string(),
529531
)?;
530532

531-
let mut single_disk_farms = Vec::with_capacity(disk_farms.len());
532533
let max_pieces_in_sector = match max_pieces_in_sector {
533534
Some(max_pieces_in_sector) => {
534535
if max_pieces_in_sector > farmer_app_info.protocol_info.max_pieces_in_sector {
@@ -626,69 +627,79 @@ where
626627
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
627628
.unwrap_or_else(recommended_number_of_farming_threads);
628629

629-
for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
630-
debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
631-
let node_client = NodeRpcClient::new(&node_rpc_url).await?;
632-
633-
let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
634-
SingleDiskFarmOptions {
635-
directory: disk_farm.directory.clone(),
636-
farmer_app_info: farmer_app_info.clone(),
637-
allocated_space: disk_farm.allocated_plotting_space,
638-
max_pieces_in_sector,
639-
node_client,
640-
reward_address,
641-
kzg: kzg.clone(),
642-
erasure_coding: erasure_coding.clone(),
643-
piece_getter: piece_getter.clone(),
644-
cache_percentage,
645-
downloading_semaphore: Arc::clone(&downloading_semaphore),
646-
record_encoding_concurrency,
647-
farm_during_initial_plotting,
648-
farming_thread_pool_size,
649-
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
650-
disable_farm_locking,
651-
},
652-
disk_farm_index,
653-
);
654-
655-
let single_disk_farm = match single_disk_farm_fut.await {
656-
Ok(single_disk_farm) => single_disk_farm,
657-
Err(SingleDiskFarmError::InsufficientAllocatedSpace {
658-
min_space,
659-
allocated_space,
660-
}) => {
661-
return Err(anyhow::anyhow!(
662-
"Allocated space {} ({}) is not enough, minimum is ~{} (~{}, {} bytes to be \
663-
exact)",
664-
bytesize::to_string(allocated_space, true),
665-
bytesize::to_string(allocated_space, false),
666-
bytesize::to_string(min_space, true),
667-
bytesize::to_string(min_space, false),
668-
min_space
669-
));
670-
}
671-
Err(error) => {
672-
return Err(error.into());
673-
}
674-
};
630+
let single_disk_farms = tokio::task::block_in_place(|| {
631+
let handle = Handle::current();
632+
633+
disk_farms
634+
.into_par_iter()
635+
.enumerate()
636+
.map(move |(disk_farm_index, disk_farm)| {
637+
let _tokio_handle_guard = handle.enter();
638+
639+
debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
640+
let node_client = handle.block_on(NodeRpcClient::new(&node_rpc_url))?;
641+
642+
let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
643+
SingleDiskFarmOptions {
644+
directory: disk_farm.directory.clone(),
645+
farmer_app_info: farmer_app_info.clone(),
646+
allocated_space: disk_farm.allocated_plotting_space,
647+
max_pieces_in_sector,
648+
node_client,
649+
reward_address,
650+
kzg: kzg.clone(),
651+
erasure_coding: erasure_coding.clone(),
652+
piece_getter: piece_getter.clone(),
653+
cache_percentage,
654+
downloading_semaphore: Arc::clone(&downloading_semaphore),
655+
record_encoding_concurrency,
656+
farm_during_initial_plotting,
657+
farming_thread_pool_size,
658+
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
659+
disable_farm_locking,
660+
},
661+
disk_farm_index,
662+
);
675663

676-
if !no_info {
677-
let info = single_disk_farm.info();
678-
println!("Single disk farm {disk_farm_index}:");
679-
println!(" ID: {}", info.id());
680-
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
681-
println!(" Public key: 0x{}", hex::encode(info.public_key()));
682-
println!(
683-
" Allocated space: {} ({})",
684-
bytesize::to_string(info.allocated_space(), true),
685-
bytesize::to_string(info.allocated_space(), false)
686-
);
687-
println!(" Directory: {}", disk_farm.directory.display());
688-
}
664+
let single_disk_farm = match handle.block_on(single_disk_farm_fut) {
665+
Ok(single_disk_farm) => single_disk_farm,
666+
Err(SingleDiskFarmError::InsufficientAllocatedSpace {
667+
min_space,
668+
allocated_space,
669+
}) => {
670+
return Err(anyhow::anyhow!(
671+
"Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \
672+
{} bytes to be exact)",
673+
bytesize::to_string(allocated_space, true),
674+
bytesize::to_string(allocated_space, false),
675+
bytesize::to_string(min_space, true),
676+
bytesize::to_string(min_space, false),
677+
min_space
678+
));
679+
}
680+
Err(error) => {
681+
return Err(error.into());
682+
}
683+
};
689684

690-
single_disk_farms.push(single_disk_farm);
691-
}
685+
if !no_info {
686+
let info = single_disk_farm.info();
687+
println!("Single disk farm {disk_farm_index}:");
688+
println!(" ID: {}", info.id());
689+
println!(" Genesis hash: 0x{}", hex::encode(info.genesis_hash()));
690+
println!(" Public key: 0x{}", hex::encode(info.public_key()));
691+
println!(
692+
" Allocated space: {} ({})",
693+
bytesize::to_string(info.allocated_space(), true),
694+
bytesize::to_string(info.allocated_space(), false)
695+
);
696+
println!(" Directory: {}", disk_farm.directory.display());
697+
}
698+
699+
Ok(single_disk_farm)
700+
})
701+
.collect::<Result<Vec<_>, _>>()
702+
})?;
692703

693704
// Acknowledgement is not necessary
694705
drop(

crates/subspace-farmer/src/single_disk_farm.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,9 @@ impl SingleDiskFarm {
621621
} = options;
622622
fs::create_dir_all(&directory)?;
623623

624+
let span = info_span!("", %disk_farm_index);
625+
let span_guard = span.enter();
626+
624627
let identity = Identity::open_or_create(&directory)?;
625628
let public_key = identity.public_key().to_bytes().into();
626629

@@ -918,8 +921,6 @@ impl SingleDiskFarm {
918921
(Some(sender), Some(receiver))
919922
};
920923

921-
let span = info_span!("", %disk_farm_index);
922-
923924
let plotting_join_handle = tokio::task::spawn_blocking({
924925
let sectors_metadata = Arc::clone(&sectors_metadata);
925926
let kzg = kzg.clone();
@@ -1179,6 +1180,8 @@ impl SingleDiskFarm {
11791180
Ok(())
11801181
}));
11811182

1183+
drop(span_guard);
1184+
11821185
let farm = Self {
11831186
farmer_protocol_info: farmer_app_info.protocol_info,
11841187
single_disk_farm_info,

0 commit comments

Comments
 (0)