Skip to content

Commit

Permalink
[metrics] adding more metrics for primary node (MystenLabs#461)
Browse files Browse the repository at this point in the history
Added more metrics for primary node across the core, header_waiter, certificate_waiter & primary to primary endpoint requests.
  • Loading branch information
akichidis authored Jul 8, 2022
1 parent 8053c9d commit 592ae2b
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 11 deletions.
14 changes: 14 additions & 0 deletions narwhal/primary/src/certificate_waiter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2021, Facebook, Inc. and its affiliates
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::metrics::PrimaryMetrics;
use config::Committee;
use crypto::traits::VerifyingKey;
use futures::{
Expand Down Expand Up @@ -49,6 +50,8 @@ pub struct CertificateWaiter<PublicKey: VerifyingKey> {
/// resume when we get all their dependencies. The map holds a cancellation `Sender`
/// which we can use to give up on a certificate.
pending: HashMap<HeaderDigest, (Round, Sender<()>)>,
/// The metrics handler
metrics: Arc<PrimaryMetrics>,
}

impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
Expand All @@ -60,6 +63,7 @@ impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
rx_reconfigure: watch::Receiver<Reconfigure<PublicKey>>,
rx_synchronizer: Receiver<Certificate<PublicKey>>,
tx_core: Sender<Certificate<PublicKey>>,
metrics: Arc<PrimaryMetrics>,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
Expand All @@ -71,6 +75,7 @@ impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
rx_synchronizer,
tx_core,
pending: HashMap::new(),
metrics,
}
.run()
.await;
Expand Down Expand Up @@ -162,6 +167,15 @@ impl<PublicKey: VerifyingKey> CertificateWaiter<PublicKey> {
}
self.pending.retain(|_, (r, _)| *r > gc_round + 1);
}

self.update_metrics();
}
}

/// Publishes the current number of entries in `pending` to the
/// `pending_elements_certificate_waiter` gauge, labelled with the committee's epoch.
fn update_metrics(&self) {
    let epoch_label = self.committee.epoch.to_string();
    let pending_count = self.pending.len() as i64;

    self.metrics
        .pending_elements_certificate_waiter
        .with_label_values(&[epoch_label.as_str()])
        .set(pending_count);
}
}
29 changes: 23 additions & 6 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,12 +180,29 @@ impl<PublicKey: VerifyingKey> Core<PublicKey> {
#[instrument(level = "debug", skip_all)]
async fn process_header(&mut self, header: &Header<PublicKey>) -> DagResult<()> {
debug!("Processing {:?}", header);
let header_source = if self.name.eq(&header.author) {
"own"
} else {
"other"
};

// Indicate that we are processing this header.
self.processing
let inserted = self
.processing
.entry(header.round)
.or_insert_with(HashSet::new)
.insert(header.id);

if inserted {
// Only increase the metric when the header has been seen for the first
// time. Edge case is headers received past gc_round so we might have already
// processed them, but not big issue for now.
self.metrics
.unique_headers_received
.with_label_values(&[&header.epoch.to_string(), header_source])
.inc();
}

// If the following condition is valid, it means we already garbage collected the parents. There is thus
// no points in trying to synchronize them or vote for the header. We just need to gather the payload.
if self.gc_round >= header.round {
Expand Down Expand Up @@ -240,11 +257,6 @@ impl<PublicKey: VerifyingKey> Core<PublicKey> {
// Store the header.
self.header_store.write(header.id, header.clone()).await;

let header_source = if self.name.eq(&header.author) {
"own"
} else {
"other"
};
self.metrics
.headers_processed
.with_label_values(&[&header.epoch.to_string(), header_source])
Expand Down Expand Up @@ -579,6 +591,11 @@ impl<PublicKey: VerifyingKey> Core<PublicKey> {
.with_label_values(&[&self.committee.epoch.to_string()])
.observe(now.elapsed().as_secs_f64());
}

self.metrics
.core_cancel_handlers_total
.with_label_values(&[&self.committee.epoch.to_string()])
.set(self.cancel_handlers.len() as i64);
}
}
}
7 changes: 6 additions & 1 deletion narwhal/primary/src/header_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,11 +350,16 @@ impl<PublicKey: VerifyingKey> HeaderWaiter<PublicKey> {
.observe(now.elapsed().as_secs_f64());
}

// measure the pending elements
// measure the pending & parent elements
self.metrics
.pending_elements_header_waiter
.with_label_values(&[&self.committee.epoch.to_string()])
.set(self.pending.len() as i64);

self.metrics
.parent_requests_header_waiter
.with_label_values(&[&self.committee.epoch.to_string()])
.set(self.parent_requests.len() as i64);
}
}
}
101 changes: 99 additions & 2 deletions narwhal/primary/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::EndpointMetrics;
use mysten_network::metrics::MetricsCallbackProvider;
use prometheus::{
default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
};
use std::sync::Once;
use std::{sync::Once, time::Duration};
use tonic::Code;

/// Bundle of every metrics group the primary node registers.
///
/// Each field is `None` in the initial `METRICS` static and is set to `Some` by
/// `initialise_metrics`.
#[derive(Clone)]
pub(crate) struct Metrics {
/// Metrics for the gRPC primary node endpoints exposed to the external consensus.
pub(crate) endpoint_metrics: Option<EndpointMetrics>,
/// Metrics for the primary-to-primary communication endpoints.
pub(crate) primary_endpoint_metrics: Option<PrimaryEndpointMetrics>,
/// Essential/core metrics across the primary node.
pub(crate) node_metrics: Option<PrimaryMetrics>,
}

// Process-wide metrics holder. It starts with every group unset and is filled in by
// `initialise_metrics`, which performs the write inside `INIT.call_once`.
// NOTE(review): this is a `static mut`, so every access has to go through `unsafe`;
// a safe once-initialised cell (e.g. `std::sync::OnceLock`) looks like a candidate
// replacement — confirm against the toolchain version in use.
static mut METRICS: Metrics = Metrics {
endpoint_metrics: None,
primary_endpoint_metrics: None,
node_metrics: None,
};
// Guards the one-time initialisation of `METRICS`.
static INIT: Once = Once::new();
Expand All @@ -25,15 +29,19 @@ static INIT: Once = Once::new();
pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics {
unsafe {
INIT.call_once(|| {
// The metrics used for the primary node endpoints
// The metrics used for the gRPC primary node endpoints we expose to the external consensus
let endpoint_metrics = EndpointMetrics::new(metrics_registry);

// The metrics used for the primary-to-primary communication node endpoints
let primary_endpoint_metrics = PrimaryEndpointMetrics::new(metrics_registry);

// Essential/core metrics across the primary node
let node_metrics = PrimaryMetrics::new(metrics_registry);

METRICS = Metrics {
node_metrics: Some(node_metrics),
endpoint_metrics: Some(endpoint_metrics),
primary_endpoint_metrics: Some(primary_endpoint_metrics),
}
});
METRICS.clone()
Expand All @@ -44,6 +52,8 @@ pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics {
pub struct PrimaryMetrics {
/// count number of headers that the node processed (others + own)
pub headers_processed: IntCounterVec,
/// count unique number of headers that we have received for processing (others + own)
pub unique_headers_received: IntCounterVec,
/// count number of headers that we suspended their processing
pub headers_suspended: IntCounterVec,
/// count number of certificates that the node created
Expand All @@ -56,12 +66,18 @@ pub struct PrimaryMetrics {
pub batches_received: IntCounterVec,
/// Latency to perform a garbage collection in core module
pub gc_core_latency: HistogramVec,
/// Number of cancel handlers for core module
pub core_cancel_handlers_total: IntGaugeVec,
/// The current Narwhal round
pub current_round: IntGaugeVec,
/// Latency to perform a garbage collection in header_waiter
pub gc_header_waiter_latency: HistogramVec,
/// Number of elements in pending list of header_waiter
pub pending_elements_header_waiter: IntGaugeVec,
/// Number of elements in the parent requests list of header_waiter
pub parent_requests_header_waiter: IntGaugeVec,
/// Number of elements in pending list of certificate_waiter
pub pending_elements_certificate_waiter: IntGaugeVec,
}

impl PrimaryMetrics {
Expand All @@ -74,6 +90,13 @@ impl PrimaryMetrics {
registry
)
.unwrap(),
unique_headers_received: register_int_counter_vec_with_registry!(
"unique_headers_received",
"Number of unique headers that received for processing (others + own)",
&["epoch", "source"],
registry
)
.unwrap(),
headers_suspended: register_int_counter_vec_with_registry!(
"headers_suspended",
"Number of headers that node suspended processing for",
Expand Down Expand Up @@ -116,6 +139,13 @@ impl PrimaryMetrics {
registry
)
.unwrap(),
core_cancel_handlers_total: register_int_gauge_vec_with_registry!(
"core_cancel_handlers_total",
"Number of cancel handlers in the core module",
&["epoch"],
registry
)
.unwrap(),
current_round: register_int_gauge_vec_with_registry!(
"current_round",
"Current round the node is in",
Expand All @@ -137,6 +167,20 @@ impl PrimaryMetrics {
registry
)
.unwrap(),
parent_requests_header_waiter: register_int_gauge_vec_with_registry!(
"parent_requests_header_waiter",
"Number of parent requests in header waiter",
&["epoch"],
registry
)
.unwrap(),
pending_elements_certificate_waiter: register_int_gauge_vec_with_registry!(
"pending_elements_certificate_waiter",
"Number of pending elements in certificate waiter",
&["epoch"],
registry
)
.unwrap(),
}
}
}
Expand All @@ -146,3 +190,56 @@ impl Default for PrimaryMetrics {
Self::new(default_registry())
}
}

/// Request metrics for the primary-to-primary endpoints.
///
/// Both metrics are registered with the `route`, `status` and `grpc_status_code` labels
/// and are updated from the `MetricsCallbackProvider::on_response` callback.
#[derive(Clone)]
pub struct PrimaryEndpointMetrics {
/// Counter of requests, route is a label (ie separate timeseries per route)
requests_by_route: IntCounterVec,
/// Request latency, route is a label
req_latency_by_route: HistogramVec,
}

impl PrimaryEndpointMetrics {
    /// Creates the per-route request counter and latency histogram and registers
    /// both with `registry`.
    ///
    /// # Panics
    ///
    /// Panics if either metric fails to register (the registration result is unwrapped).
    pub fn new(registry: &Registry) -> Self {
        // Both metrics share the same label set.
        let label_names = ["route", "status", "grpc_status_code"];

        let requests_by_route = register_int_counter_vec_with_registry!(
            "primary_requests_by_route",
            "Number of requests by route",
            &label_names,
            registry
        )
        .unwrap();

        let req_latency_by_route = register_histogram_vec_with_registry!(
            "primary_req_latency_by_route",
            "Latency of a request by route",
            &label_names,
            registry
        )
        .unwrap();

        Self {
            requests_by_route,
            req_latency_by_route,
        }
    }
}

impl MetricsCallbackProvider for PrimaryEndpointMetrics {
    /// Called when a request arrives; nothing is recorded at this point.
    fn on_request(&self, _path: String) {
        // For now we just do nothing
    }

    /// Records one completed request: bumps the request counter and observes the
    /// latency (in seconds) under the `(route, status, grpc_status_code)` labels.
    fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code) {
        // Label values must be strings, so render the numeric codes first.
        let status_label = status.to_string();
        let grpc_code_label = i32::from(grpc_status_code).to_string();
        let label_values = [
            path.as_str(),
            status_label.as_str(),
            grpc_code_label.as_str(),
        ];

        self.requests_by_route
            .with_label_values(&label_values)
            .inc();

        self.req_latency_by_route
            .with_label_values(&label_values)
            .observe(latency.as_secs_f64());
    }
}

impl Default for PrimaryEndpointMetrics {
fn default() -> Self {
Self::new(default_registry())
}
}
8 changes: 6 additions & 2 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
grpc_server::ConsensusAPIGrpc,
header_waiter::HeaderWaiter,
helper::Helper,
metrics::{initialise_metrics, PrimaryMetrics},
metrics::{initialise_metrics, PrimaryEndpointMetrics, PrimaryMetrics},
payload_receiver::PayloadReceiver,
proposer::Proposer,
state_handler::StateHandler,
Expand Down Expand Up @@ -136,6 +136,7 @@ impl Primary {
// Initialise the metrics
let metrics = initialise_metrics(registry);
let endpoint_metrics = metrics.endpoint_metrics.unwrap();
let primary_endpoint_metrics = metrics.primary_endpoint_metrics.unwrap();
let node_metrics = Arc::new(metrics.node_metrics.unwrap());

// Atomic variable use to synchronize all tasks with the latest consensus round. This is only
Expand All @@ -161,6 +162,7 @@ impl Primary {
address.clone(),
parameters.max_concurrent_requests,
tx_reconfigure.subscribe(),
primary_endpoint_metrics,
);
info!(
"Primary {} listening to primary messages on {}",
Expand Down Expand Up @@ -311,6 +313,7 @@ impl Primary {
tx_reconfigure.subscribe(),
/* rx_synchronizer */ rx_sync_certificates,
/* tx_core */ tx_certificates_loopback,
node_metrics.clone(),
);

// When the `Core` collects enough parent certificates, the `Proposer` generates a new header with new batch
Expand Down Expand Up @@ -418,13 +421,14 @@ impl<PublicKey: VerifyingKey> PrimaryReceiverHandler<PublicKey> {
address: Multiaddr,
max_concurrent_requests: usize,
rx_reconfigure: watch::Receiver<Reconfigure<PublicKey>>,
primary_endpoint_metrics: PrimaryEndpointMetrics,
) -> JoinHandle<()> {
tokio::spawn(async move {
let mut config = mysten_network::config::Config::new();
config.concurrency_limit_per_connection = Some(max_concurrent_requests);
tokio::select! {
_result = config
.server_builder()
.server_builder_with_metrics(primary_endpoint_metrics)
.add_service(PrimaryToPrimaryServer::new(self))
.bind(&address)
.await
Expand Down

0 comments on commit 592ae2b

Please sign in to comment.