Skip to content

Commit 7dd004d

Browse files
authored
Merge pull request #2483 from subspace/improve-farmer-thread-allocation
Improve farmer thread allocation
2 parents 56ff653 + cc0aff4 commit 7dd004d

File tree

4 files changed

+52
-38
lines changed

4 files changed

+52
-38
lines changed

Cargo.lock

-7
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/subspace-farmer/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ hwlocality = { version = "1.0.0-alpha.1", features = ["vendored"], optional = tr
3232
jsonrpsee = { version = "0.16.3", features = ["client"] }
3333
lru = "0.12.1"
3434
mimalloc = "0.1.39"
35-
libmimalloc-sys = { version = "0.1.35", features = ["extended"] }
35+
libmimalloc-sys = "0.1.35"
3636
num_cpus = "1.16.0"
3737
parity-scale-codec = "3.6.9"
3838
parking_lot = "0.12.1"

crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs

+24-28
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use subspace_farmer::utils::ss58::parse_ss58_reward_address;
3535
use subspace_farmer::utils::{
3636
all_cpu_cores, create_plotting_thread_pool_manager, parse_cpu_cores_sets,
3737
recommended_number_of_farming_threads, run_future_in_dedicated_thread,
38-
thread_pool_core_indices, AsyncJoinOnDrop,
38+
thread_pool_core_indices, AsyncJoinOnDrop, CpuCoreSet,
3939
};
4040
use subspace_farmer::{Identity, NodeClient, NodeRpcClient};
4141
use subspace_farmer_components::plotting::PlottedSector;
@@ -132,7 +132,8 @@ pub(crate) struct FarmingArgs {
132132
farm_during_initial_plotting: bool,
133133
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
134134
/// compute-intensive operations during proving), defaults to number of logical CPUs
135-
/// available on UMA system and number of logical CPUs in first NUMA node on NUMA system
135+
/// available on UMA system and number of logical CPUs in first NUMA node on NUMA system, but
136+
/// not more than 32 threads
136137
#[arg(long)]
137138
farming_thread_pool_size: Option<NonZeroUsize>,
138139
/// Size of one thread pool used for plotting, defaults to number of logical CPUs available
@@ -469,8 +470,8 @@ where
469470
None => farmer_app_info.protocol_info.max_pieces_in_sector,
470471
};
471472

472-
let plotting_thread_pool_core_indices;
473-
let replotting_thread_pool_core_indices;
473+
let mut plotting_thread_pool_core_indices;
474+
let mut replotting_thread_pool_core_indices;
474475
if let Some(plotting_cpu_cores) = plotting_cpu_cores {
475476
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
476477
.map_err(|error| anyhow::anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
@@ -503,6 +504,25 @@ where
503504
}
504505
replotting_thread_pool_core_indices
505506
};
507+
508+
if plotting_thread_pool_core_indices.len() > 1 {
509+
info!(
510+
l3_cache_groups = %plotting_thread_pool_core_indices.len(),
511+
"Multiple L3 cache groups detected"
512+
);
513+
514+
if plotting_thread_pool_core_indices.len() > disk_farms.len() {
515+
plotting_thread_pool_core_indices =
516+
CpuCoreSet::regroup(&plotting_thread_pool_core_indices, disk_farms.len());
517+
replotting_thread_pool_core_indices =
518+
CpuCoreSet::regroup(&replotting_thread_pool_core_indices, disk_farms.len());
519+
520+
info!(
521+
farms_count = %disk_farms.len(),
522+
"Regrouped CPU cores to match number of farms, more farms may leverage CPU more efficiently"
523+
);
524+
}
525+
}
506526
}
507527

508528
let downloading_semaphore = Arc::new(Semaphore::new(
@@ -520,30 +540,6 @@ where
520540
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
521541
.unwrap_or_else(recommended_number_of_farming_threads);
522542

523-
let all_cpu_cores = all_cpu_cores();
524-
if all_cpu_cores.len() > 1 {
525-
info!(l3_cache_groups = %all_cpu_cores.len(), "Multiple L3 cache groups detected");
526-
527-
if all_cpu_cores.len() > disk_farms.len() {
528-
warn!(
529-
l3_cache_groups = %all_cpu_cores.len(),
530-
farms_count = %disk_farms.len(),
531-
"Too few disk farms, CPU will not be utilized fully during plotting, same number \
532-
of farms as L3 cache groups or more is recommended"
533-
);
534-
}
535-
}
536-
537-
// TODO: Remove code or environment variable once identified whether it helps or not
538-
if std::env::var("NUMA_ALLOCATOR").is_ok() && all_cpu_cores.len() > 1 {
539-
unsafe {
540-
libmimalloc_sys::mi_option_set(
541-
libmimalloc_sys::mi_option_use_numa_nodes,
542-
all_cpu_cores.len() as std::ffi::c_long,
543-
);
544-
}
545-
}
546-
547543
let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len());
548544

549545
for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {

crates/subspace-farmer/src/utils.rs

+27-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use tracing::debug;
2323
#[cfg(feature = "numa")]
2424
use tracing::warn;
2525

26+
/// It doesn't make a lot of sense to have a huge number of farming threads, 32 is plenty
27+
const MAX_DEFAULT_FARMING_THREADS: usize = 32;
28+
2629
/// Joins async join handle on drop
2730
pub struct AsyncJoinOnDrop<T> {
2831
handle: Option<task::JoinHandle<T>>,
@@ -151,6 +154,27 @@ pub struct CpuCoreSet {
151154
}
152155

153156
impl CpuCoreSet {
157+
/// Regroup CPU core sets to contain at most `target_sets` sets, useful when there are many L3
158+
/// cache groups and not as many farms
159+
pub fn regroup(cpu_core_sets: &[Self], target_sets: usize) -> Vec<Self> {
160+
cpu_core_sets
161+
// Chunk CPU core sets
162+
.chunks(cpu_core_sets.len().div_ceil(target_sets))
163+
.map(|sets| Self {
164+
// Combine CPU cores
165+
cores: sets
166+
.iter()
167+
.flat_map(|set| set.cores.iter())
168+
.copied()
169+
.collect(),
170+
// Preserve topology object
171+
#[cfg(feature = "numa")]
172+
topology: sets[0].topology.clone(),
173+
})
174+
.collect()
175+
}
176+
177+
/// Get cpu core numbers in this set
154178
pub fn cpu_cores(&self) -> &[usize] {
155179
&self.cores
156180
}
@@ -201,13 +225,14 @@ pub fn recommended_number_of_farming_threads() -> usize {
201225
// Get number of CPU cores
202226
.map(|cpuset| cpuset.iter_set().count())
203227
.find(|&count| count > 0)
204-
.unwrap_or_else(num_cpus::get);
228+
.unwrap_or_else(num_cpus::get)
229+
.min(MAX_DEFAULT_FARMING_THREADS);
205230
}
206231
Err(error) => {
207232
warn!(%error, "Failed to get NUMA topology");
208233
}
209234
}
210-
num_cpus::get()
235+
num_cpus::get().min(MAX_DEFAULT_FARMING_THREADS)
211236
}
212237

213238
/// Get all cpu cores, grouped into sets according to NUMA nodes or L3 cache groups on large CPUs.

0 commit comments

Comments
 (0)