Skip to content

Commit

Permalink
feat(network): add metrics for quinn connections
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Mar 3, 2025
1 parent 604bbc3 commit e066007
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 20 deletions.
101 changes: 99 additions & 2 deletions network/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;

use bytes::Bytes;
use metrics::Label;
use quinn::{ConnectionError, SendDatagramError};
use tycho_util::metrics::spawn_metrics_loop;
use webpki::types::CertificateDer;

use crate::network::crypto::peer_id_from_certificate;
Expand All @@ -16,16 +19,106 @@ pub struct Connection {
request_meta: Arc<InboundRequestMeta>,
}

macro_rules! emit_gauges {
($prefix:literal, $stats:expr, $labels:expr, [ $($field:ident),* $(,)? ]) => {
$(
metrics::gauge!(concat!($prefix, stringify!($field)), $labels.clone())
.set($stats.$field as f64);
)*
};
}

impl Connection {
pub fn with_peer_id(inner: quinn::Connection, origin: Direction, peer_id: PeerId) -> Self {
Self {
let connection = Self {
request_meta: Arc::new(InboundRequestMeta {
peer_id,
origin,
remote_address: inner.remote_address(),
}),
inner,
}
};

let connection_ref = Arc::new(connection.clone());
let peer_id = peer_id.to_string();
let remote_addr = connection.remote_address().to_string();

spawn_metrics_loop(&connection_ref, Duration::from_secs(5), move |conn| {
let peer_id = peer_id.clone();
let remote_addr = remote_addr.clone();
let labels = vec![
Label::new("peer_id", peer_id),
Label::new("addr", remote_addr),
];

async move {
let stats = conn.stats();

metrics::gauge!("tycho_network_connection_rtt_ms", labels.clone())
.set(stats.path.rtt.as_millis() as f64);

metrics::gauge!("tycho_network_connection_invalid_messages", labels.clone()).set(
stats.frame_rx.connection_close as f64 + stats.frame_rx.reset_stream as f64,
);

emit_gauges!("tycho_network_connection_", stats.path, labels, [
cwnd, // Congestion window size
congestion_events, // Network congestion indicators
lost_packets, // Total packet loss
lost_bytes, // Byte-level loss tracking
sent_packets // Baseline for loss calculations
]);

emit_gauges!("tycho_network_connection_rx_", stats.udp_rx, labels, [
datagrams, bytes
]);

emit_gauges!("tycho_network_connection_tx_", stats.udp_tx, labels, [
datagrams, bytes
]);

// Frame RX
emit_gauges!(
"tycho_network_connection_rx_",
stats.frame_rx,
labels.clone(),
[
acks, // Ack volume
crypto, // Handshake overhead
connection_close, // Unexpected termination
data_blocked, // Flow control limits
datagram, // Main message carrier
max_data, // Connection-level flow control
max_stream_data, // Stream-level flow control
ping, // Keepalives
reset_stream, // Stream errors
stream_data_blocked, // Per-stream bottlenecks
streams_blocked_bidi, // Stream quota issues
stop_sending, // Critical flow control
stream // Stream-based messaging
]
);

// Frame TX
emit_gauges!("tycho_network_connection_tx_", stats.frame_tx, labels, [
acks,
crypto,
connection_close,
data_blocked,
datagram,
max_data,
max_stream_data,
ping,
reset_stream,
stream_data_blocked,
streams_blocked_bidi,
stop_sending,
stream
]);
}
});

connection
}

pub fn request_meta(&self) -> &Arc<InboundRequestMeta> {
Expand Down Expand Up @@ -81,6 +174,10 @@ impl Connection {
pub async fn read_datagram(&self) -> Result<Bytes, ConnectionError> {
self.inner.read_datagram().await
}

pub fn stats(&self) -> quinn::ConnectionStats {
self.inner.stats()
}
}

impl std::fmt::Debug for Connection {
Expand Down
3 changes: 3 additions & 0 deletions scripts/check-metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def process_metric_arg(
blacklisted_dirs: [str],
):
arg = arg.split(",")[0].strip() # Remove labels
if "$" in arg:
# It's a macro
return
if arg.startswith('"') and arg.endswith('"'):
# It's a string literal
metric_names.add(arg[1:-1])
Expand Down
Loading

0 comments on commit e066007

Please sign in to comment.