Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce TX/RX buffers allocation #1749

Merged
merged 7 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,12 @@
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: 1,
},
allocation: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a result that shows that the first allocation has a sensible effect, which would justify the need of this configuration addition?

/// Mode for memory allocation of batches in the priority queues.
/// - "init": batches are allocated at queue initialization time.
/// - "lazy": batches are allocated when needed up to the maximum number of batches configured in the size configuration parameter.
mode: "init",
},
},
},
/// Configure the zenoh RX parameters of a link
Expand Down
15 changes: 15 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,13 @@ validated_struct::validator! {
/// The maximum time limit (in ms) a message should be retained for batching when back-pressure happens.
time_limit: u64,
},
/// Mode for memory allocation of batches in the priority queues.
/// If set to "init", all batches are allocated at queue initialization time. If set to "lazy", the batches
/// are allocated when needed, up to the maximum number of batches configured in the size configuration parameter.
pub allocation: #[derive(Default, Copy, PartialEq, Eq)]
QueueAllocConf {
pub mode: QueueAllocMode,
},
},
// Number of threads used for TX
threads: usize,
Expand Down Expand Up @@ -619,6 +626,14 @@ validated_struct::validator! {
}
}

/// Memory allocation mode for the batches of a priority queue.
///
/// Serialized in snake_case (`"init"` / `"lazy"`) per the `#[serde(rename_all)]`
/// attribute; the default mode is [`QueueAllocMode::Init`].
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum QueueAllocMode {
    /// All batches are allocated when the queue is initialized (config value `"init"`).
    #[default]
    Init,
    /// Batches are allocated on demand (config value `"lazy"`).
    Lazy,
}

impl Default for PermissionsConf {
fn default() -> Self {
PermissionsConf {
Expand Down
40 changes: 33 additions & 7 deletions io/zenoh-transport/src/common/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use zenoh_buffers::{
ZBuf,
};
use zenoh_codec::{transport::batch::BatchError, WCodec, Zenoh080};
use zenoh_config::QueueSizeConf;
use zenoh_config::{QueueAllocConf, QueueAllocMode, QueueSizeConf};
use zenoh_core::zlock;
use zenoh_protocol::{
core::Priority,
Expand All @@ -55,6 +55,8 @@ const RBLEN: usize = QueueSizeConf::MAX;
struct StageInRefill {
n_ref_r: Waiter,
s_ref_r: RingBufferReader<WBatch, RBLEN>,
batch_config: (usize, BatchConfig),
batch_allocs: usize,
}

#[derive(Debug)]
Expand All @@ -68,7 +70,14 @@ impl std::error::Error for TransportClosed {}

impl StageInRefill {
/// Obtains a write batch for the ingress stage.
///
/// First tries to reuse a recycled batch from the refill ring buffer. If the
/// ring buffer is empty and fewer than the configured maximum number of
/// batches (`batch_config.0`) have been allocated so far, lazily allocates a
/// fresh batch with the stored [`BatchConfig`] (`batch_config.1`). Returns
/// `None` only when the refill buffer is empty and the allocation budget is
/// exhausted.
fn pull(&mut self) -> Option<WBatch> {
    match self.s_ref_r.pull() {
        Some(b) => Some(b),
        // NOTE: lazy-allocation path — only taken until `batch_allocs`
        // reaches the configured queue size; afterwards callers must wait
        // for batches to be refilled.
        None if self.batch_allocs < self.batch_config.0 => {
            self.batch_allocs += 1;
            Some(WBatch::new(self.batch_config.1))
        }
        None => None,
    }
}

fn wait(&self) -> bool {
Expand Down Expand Up @@ -637,6 +646,7 @@ pub(crate) struct TransmissionPipelineConf {
pub(crate) wait_before_close: Duration,
pub(crate) batching_enabled: bool,
pub(crate) batching_time_limit: Duration,
pub(crate) queue_alloc: QueueAllocConf,
}

// A 2-stage transmission pipeline
Expand Down Expand Up @@ -667,10 +677,14 @@ impl TransmissionPipeline {
// Create the refill ring buffer
// This is a SPSC ring buffer
let (mut s_ref_w, s_ref_r) = RingBuffer::<WBatch, RBLEN>::init();
// Fill the refill ring buffer with batches
for _ in 0..*num {
let batch = WBatch::new(config.batch);
assert!(s_ref_w.push(batch).is_none());
let mut batch_allocs = 0;
if *config.queue_alloc.mode() == QueueAllocMode::Init {
// Fill the refill ring buffer with batches
for _ in 0..*num {
let batch = WBatch::new(config.batch);
batch_allocs += 1;
assert!(s_ref_w.push(batch).is_none());
}
}
// Create the channel for notifying that new batches are in the refill ring buffer
// This is a SPSC channel
Expand All @@ -689,7 +703,12 @@ impl TransmissionPipeline {
});

stage_in.push(Mutex::new(StageIn {
s_ref: StageInRefill { n_ref_r, s_ref_r },
s_ref: StageInRefill {
n_ref_r,
s_ref_r,
batch_config: (*num, config.batch),
batch_allocs,
},
s_out: StageInOut {
n_out_w: n_out_w.clone(),
s_out_w,
Expand Down Expand Up @@ -963,6 +982,7 @@ mod tests {
ZBuf,
};
use zenoh_codec::{RCodec, Zenoh080};
use zenoh_config::{QueueAllocConf, QueueAllocMode};
use zenoh_protocol::{
core::{Bits, CongestionControl, Encoding, Priority},
network::{ext, Push},
Expand All @@ -988,6 +1008,9 @@ mod tests {
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
queue_alloc: QueueAllocConf {
mode: QueueAllocMode::Init,
},
};

const CONFIG_NOT_STREAMED: TransmissionPipelineConf = TransmissionPipelineConf {
Expand All @@ -1002,6 +1025,9 @@ mod tests {
wait_before_drop: (Duration::from_millis(1), Duration::from_millis(1024)),
wait_before_close: Duration::from_secs(5),
batching_time_limit: Duration::from_micros(1),
queue_alloc: QueueAllocConf {
mode: QueueAllocMode::Init,
},
};

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
Expand Down
12 changes: 11 additions & 1 deletion io/zenoh-transport/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration};

use rand::{RngCore, SeedableRng};
use tokio::sync::Mutex as AsyncMutex;
use zenoh_config::{Config, LinkRxConf, QueueConf, QueueSizeConf};
use zenoh_config::{Config, LinkRxConf, QueueAllocConf, QueueConf, QueueSizeConf};
use zenoh_crypto::{BlockCipher, PseudoRng};
use zenoh_link::NewLinkChannelSender;
use zenoh_protocol::{
Expand Down Expand Up @@ -112,6 +112,7 @@ pub struct TransportManagerConfig {
pub wait_before_close: Duration,
pub queue_size: [usize; Priority::NUM],
pub queue_backoff: Duration,
pub queue_alloc: QueueAllocConf,
pub defrag_buff_size: usize,
pub link_rx_buffer_size: usize,
pub unicast: TransportManagerConfigUnicast,
Expand Down Expand Up @@ -143,6 +144,7 @@ pub struct TransportManagerBuilder {
wait_before_drop: (Duration, Duration),
wait_before_close: Duration,
queue_size: QueueSizeConf,
queue_alloc: QueueAllocConf,
defrag_buff_size: usize,
link_rx_buffer_size: usize,
unicast: TransportManagerBuilderUnicast,
Expand Down Expand Up @@ -191,6 +193,11 @@ impl TransportManagerBuilder {
self
}

/// Sets the queue allocation configuration (batch allocation mode) for this
/// builder. Builder-style setter: consumes and returns `self` for chaining.
pub fn queue_alloc(mut self, queue_alloc: QueueAllocConf) -> Self {
    self.queue_alloc = queue_alloc;
    self
}

pub fn wait_before_drop(mut self, wait_before_drop: (Duration, Duration)) -> Self {
self.wait_before_drop = wait_before_drop;
self
Expand Down Expand Up @@ -266,6 +273,7 @@ impl TransportManagerBuilder {
));
self = self.wait_before_close(duration_from_i64us(*cc_block.wait_before_close()));
self = self.queue_size(link.tx().queue().size().clone());
self = self.queue_alloc(*link.tx().queue().allocation());
self = self.tx_threads(*link.tx().threads());
self = self.protocols(link.protocols().clone());

Expand Down Expand Up @@ -326,6 +334,7 @@ impl TransportManagerBuilder {
wait_before_close: self.wait_before_close,
queue_size,
queue_backoff: self.batching_time_limit,
queue_alloc: self.queue_alloc,
defrag_buff_size: self.defrag_buff_size,
link_rx_buffer_size: self.link_rx_buffer_size,
unicast: unicast.config,
Expand Down Expand Up @@ -377,6 +386,7 @@ impl Default for TransportManagerBuilder {
),
wait_before_close: duration_from_i64us(*cc_block.wait_before_close()),
queue_size: queue.size,
queue_alloc: queue.allocation,
batching_time_limit: Duration::from_millis(backoff),
defrag_buff_size: *link_rx.max_message_size(),
link_rx_buffer_size: *link_rx.buffer_size(),
Expand Down
6 changes: 4 additions & 2 deletions io/zenoh-transport/src/multicast/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl TransportLinkMulticastUniversal {
wait_before_close: self.transport.manager.config.wait_before_close,
batching_enabled: self.transport.manager.config.batching,
batching_time_limit: self.transport.manager.config.queue_backoff,
queue_alloc: self.transport.manager.config.queue_alloc,
};
// The pipeline
let (producer, consumer) = TransmissionPipeline::make(tpc, &priority_tx);
Expand Down Expand Up @@ -543,8 +544,9 @@ async fn rx_task(
// The pool of buffers
let mtu = link.inner.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}

let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
Expand Down
5 changes: 3 additions & 2 deletions io/zenoh-transport/src/unicast/lowlatency/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ impl TransportUnicastLowlatency {
let pool = {
let mtu = link_rx.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link_rx} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}
zenoh_sync::RecyclingObjectPool::new(n, move || vec![0_u8; mtu].into_boxed_slice())
};
Expand Down
6 changes: 4 additions & 2 deletions io/zenoh-transport/src/unicast/universal/link.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl TransportLinkUnicastUniversal {
wait_before_close: transport.manager.config.wait_before_close,
batching_enabled: transport.manager.config.batching,
batching_time_limit: transport.manager.config.queue_backoff,
queue_alloc: transport.manager.config.queue_alloc,
};

// The pipeline
Expand Down Expand Up @@ -261,8 +262,9 @@ async fn rx_task(
// The pool of buffers
let mtu = link.config.batch.mtu as usize;
let mut n = rx_buffer_size / mtu;
if rx_buffer_size % mtu != 0 {
n += 1;
if n == 0 {
tracing::debug!("RX configured buffer of {rx_buffer_size} bytes is too small for {link} that has an MTU of {mtu} bytes. Defaulting to {mtu} bytes for RX buffer.");
n = 1;
}

let pool = RecyclingObjectPool::new(n, || vec![0_u8; mtu].into_boxed_slice());
Expand Down