From 592ae2b354187d31a790b51ccd01168048e4ce24 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Fri, 8 Jul 2022 15:05:48 +0100 Subject: [PATCH] [metrics] adding more metrics for primary node (#461) Added more metrics for primary node across the core, header_waiter, certificate_waiter & primary to primary endpoint requests. --- narwhal/primary/src/certificate_waiter.rs | 14 +++ narwhal/primary/src/core.rs | 29 +++++-- narwhal/primary/src/header_waiter.rs | 7 +- narwhal/primary/src/metrics.rs | 101 +++++++++++++++++++++- narwhal/primary/src/primary.rs | 8 +- 5 files changed, 148 insertions(+), 11 deletions(-) diff --git a/narwhal/primary/src/certificate_waiter.rs b/narwhal/primary/src/certificate_waiter.rs index ab8ae7271108f..d673cf6fbc7a0 100644 --- a/narwhal/primary/src/certificate_waiter.rs +++ b/narwhal/primary/src/certificate_waiter.rs @@ -1,6 +1,7 @@ // Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::metrics::PrimaryMetrics; use config::Committee; use crypto::traits::VerifyingKey; use futures::{ @@ -49,6 +50,8 @@ pub struct CertificateWaiter { /// resume when we get all their dependencies. The map holds a cancellation `Sender` /// which we can use to give up on a certificate. pending: HashMap)>, + /// The metrics handler + metrics: Arc, } impl CertificateWaiter { @@ -60,6 +63,7 @@ impl CertificateWaiter { rx_reconfigure: watch::Receiver>, rx_synchronizer: Receiver>, tx_core: Sender>, + metrics: Arc, ) -> JoinHandle<()> { tokio::spawn(async move { Self { @@ -71,6 +75,7 @@ impl CertificateWaiter { rx_synchronizer, tx_core, pending: HashMap::new(), + metrics, } .run() .await; @@ -162,6 +167,15 @@ impl CertificateWaiter { } self.pending.retain(|_, (r, _)| *r > gc_round + 1); } + + self.update_metrics(); } } + + fn update_metrics(&self) { + self.metrics + .pending_elements_certificate_waiter + .with_label_values(&[&self.committee.epoch.to_string()]) + .set(self.pending.len() as i64); + } } diff --git a/narwhal/primary/src/core.rs b/narwhal/primary/src/core.rs index b78dd3d56ce38..54bf0fd823324 100644 --- a/narwhal/primary/src/core.rs +++ b/narwhal/primary/src/core.rs @@ -180,12 +180,29 @@ impl Core { #[instrument(level = "debug", skip_all)] async fn process_header(&mut self, header: &Header) -> DagResult<()> { debug!("Processing {:?}", header); + let header_source = if self.name.eq(&header.author) { + "own" + } else { + "other" + }; + // Indicate that we are processing this header. - self.processing + let inserted = self + .processing .entry(header.round) .or_insert_with(HashSet::new) .insert(header.id); + if inserted { + // Only increase the metric when the header has been seen for the first + // time. Edge case is headers received past gc_round so we might have already + // processed them, but not big issue for now. + self.metrics + .unique_headers_received + .with_label_values(&[&header.epoch.to_string(), header_source]) + .inc(); + } + // If the following condition is valid, it means we already garbage collected the parents. There is thus // no points in trying to synchronize them or vote for the header. We just need to gather the payload. if self.gc_round >= header.round { @@ -240,11 +257,6 @@ impl Core { // Store the header. self.header_store.write(header.id, header.clone()).await; - let header_source = if self.name.eq(&header.author) { - "own" - } else { - "other" - }; self.metrics .headers_processed .with_label_values(&[&header.epoch.to_string(), header_source]) @@ -579,6 +591,11 @@ impl Core { .with_label_values(&[&self.committee.epoch.to_string()]) .observe(now.elapsed().as_secs_f64()); } + + self.metrics + .core_cancel_handlers_total + .with_label_values(&[&self.committee.epoch.to_string()]) + .set(self.cancel_handlers.len() as i64); } } } diff --git a/narwhal/primary/src/header_waiter.rs b/narwhal/primary/src/header_waiter.rs index 72fed9b97d4c5..57cb72dad3466 100644 --- a/narwhal/primary/src/header_waiter.rs +++ b/narwhal/primary/src/header_waiter.rs @@ -350,11 +350,16 @@ impl HeaderWaiter { .observe(now.elapsed().as_secs_f64()); } - // measure the pending elements + // measure the pending & parent elements self.metrics .pending_elements_header_waiter .with_label_values(&[&self.committee.epoch.to_string()]) .set(self.pending.len() as i64); + + self.metrics + .parent_requests_header_waiter + .with_label_values(&[&self.committee.epoch.to_string()]) + .set(self.parent_requests.len() as i64); } } } diff --git a/narwhal/primary/src/metrics.rs b/narwhal/primary/src/metrics.rs index 1b4fd88503c06..d1576885c4320 100644 --- a/narwhal/primary/src/metrics.rs +++ b/narwhal/primary/src/metrics.rs @@ -1,20 +1,24 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crate::EndpointMetrics; +use mysten_network::metrics::MetricsCallbackProvider; use prometheus::{ default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry, }; -use std::sync::Once; +use std::{sync::Once, time::Duration}; +use tonic::Code; #[derive(Clone)] pub(crate) struct Metrics { pub(crate) endpoint_metrics: Option, + pub(crate) primary_endpoint_metrics: Option, pub(crate) node_metrics: Option, } static mut METRICS: Metrics = Metrics { endpoint_metrics: None, + primary_endpoint_metrics: None, node_metrics: None, }; static INIT: Once = Once::new(); @@ -25,15 +29,19 @@ static INIT: Once = Once::new(); pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics { unsafe { INIT.call_once(|| { - // The metrics used for the primary node endpoints + // The metrics used for the gRPC primary node endpoints we expose to the external consensus let endpoint_metrics = EndpointMetrics::new(metrics_registry); + // The metrics used for the primary-to-primary communication node endpoints + let primary_endpoint_metrics = PrimaryEndpointMetrics::new(metrics_registry); + // Essential/core metrics across the primary node let node_metrics = PrimaryMetrics::new(metrics_registry); METRICS = Metrics { node_metrics: Some(node_metrics), endpoint_metrics: Some(endpoint_metrics), + primary_endpoint_metrics: Some(primary_endpoint_metrics), } }); METRICS.clone() @@ -44,6 +52,8 @@ pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics { pub struct PrimaryMetrics { /// count number of headers that the node processed (others + own) pub headers_processed: IntCounterVec, + /// count unique number of headers that we have received for processing (others + own) + pub unique_headers_received: IntCounterVec, /// count number of headers that we suspended their processing pub headers_suspended: IntCounterVec, /// count number of certificates that the node created @@ -56,12 +66,18 @@ pub struct PrimaryMetrics { pub batches_received: IntCounterVec, /// Latency to perform a garbage collection in core module pub gc_core_latency: HistogramVec, + /// Number of cancel handlers for core module + pub core_cancel_handlers_total: IntGaugeVec, /// The current Narwhal round pub current_round: IntGaugeVec, /// Latency to perform a garbage collection in header_waiter pub gc_header_waiter_latency: HistogramVec, /// Number of elements in pending list of header_waiter pub pending_elements_header_waiter: IntGaugeVec, + /// Number of parent requests list of header_waiter + pub parent_requests_header_waiter: IntGaugeVec, + /// Number of elements in pending list of certificate_waiter + pub pending_elements_certificate_waiter: IntGaugeVec, } impl PrimaryMetrics { @@ -74,6 +90,13 @@ impl PrimaryMetrics { registry ) .unwrap(), + unique_headers_received: register_int_counter_vec_with_registry!( + "unique_headers_received", + "Number of unique headers that received for processing (others + own)", + &["epoch", "source"], + registry + ) + .unwrap(), headers_suspended: register_int_counter_vec_with_registry!( "headers_suspended", "Number of headers that node suspended processing for", @@ -116,6 +139,13 @@ impl PrimaryMetrics { registry ) .unwrap(), + core_cancel_handlers_total: register_int_gauge_vec_with_registry!( + "core_cancel_handlers_total", + "Number of cancel handlers in the core module", + &["epoch"], + registry + ) + .unwrap(), current_round: register_int_gauge_vec_with_registry!( "current_round", "Current round the node is in", @@ -137,6 +167,20 @@ impl PrimaryMetrics { registry ) .unwrap(), + parent_requests_header_waiter: register_int_gauge_vec_with_registry!( + "parent_requests_header_waiter", + "Number of parent requests in header waiter", + &["epoch"], + registry + ) + .unwrap(), + pending_elements_certificate_waiter: register_int_gauge_vec_with_registry!( + "pending_elements_certificate_waiter", + "Number of pending elements in certificate waiter", + &["epoch"], + registry + ) + .unwrap(), } } } @@ -146,3 +190,56 @@ impl Default for PrimaryMetrics { Self::new(default_registry()) } } + +#[derive(Clone)] +pub struct PrimaryEndpointMetrics { + /// Counter of requests, route is a label (ie separate timeseries per route) + requests_by_route: IntCounterVec, + /// Request latency, route is a label + req_latency_by_route: HistogramVec, +} + +impl PrimaryEndpointMetrics { + pub fn new(registry: &Registry) -> Self { + Self { + requests_by_route: register_int_counter_vec_with_registry!( + "primary_requests_by_route", + "Number of requests by route", + &["route", "status", "grpc_status_code"], + registry + ) + .unwrap(), + req_latency_by_route: register_histogram_vec_with_registry!( + "primary_req_latency_by_route", + "Latency of a request by route", + &["route", "status", "grpc_status_code"], + registry + ) + .unwrap(), + } + } +} + +impl MetricsCallbackProvider for PrimaryEndpointMetrics { + fn on_request(&self, _path: String) { + // For now we just do nothing + } + + fn on_response(&self, path: String, latency: Duration, status: u16, grpc_status_code: Code) { + let code: i32 = grpc_status_code.into(); + let labels = [path.as_str(), &status.to_string(), &code.to_string()]; + + self.requests_by_route.with_label_values(&labels).inc(); + + let req_latency_secs = latency.as_secs_f64(); + self.req_latency_by_route + .with_label_values(&labels) + .observe(req_latency_secs); + } +} + +impl Default for PrimaryEndpointMetrics { + fn default() -> Self { + Self::new(default_registry()) + } +} diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 2c35a40d9a9b0..8dbdb35b08dc5 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -10,7 +10,7 @@ use crate::{ grpc_server::ConsensusAPIGrpc, header_waiter::HeaderWaiter, helper::Helper, - metrics::{initialise_metrics, PrimaryMetrics}, + metrics::{initialise_metrics, PrimaryEndpointMetrics, PrimaryMetrics}, payload_receiver::PayloadReceiver, proposer::Proposer, state_handler::StateHandler, @@ -136,6 +136,7 @@ impl Primary { // Initialise the metrics let metrics = initialise_metrics(registry); let endpoint_metrics = metrics.endpoint_metrics.unwrap(); + let primary_endpoint_metrics = metrics.primary_endpoint_metrics.unwrap(); let node_metrics = Arc::new(metrics.node_metrics.unwrap()); // Atomic variable use to synchronize all tasks with the latest consensus round. This is only @@ -161,6 +162,7 @@ impl Primary { address.clone(), parameters.max_concurrent_requests, tx_reconfigure.subscribe(), + primary_endpoint_metrics, ); info!( "Primary {} listening to primary messages on {}", @@ -311,6 +313,7 @@ impl Primary { tx_reconfigure.subscribe(), /* rx_synchronizer */ rx_sync_certificates, /* tx_core */ tx_certificates_loopback, + node_metrics.clone(), ); // When the `Core` collects enough parent certificates, the `Proposer` generates a new header with new batch @@ -418,13 +421,14 @@ impl PrimaryReceiverHandler { address: Multiaddr, max_concurrent_requests: usize, rx_reconfigure: watch::Receiver>, + primary_endpoint_metrics: PrimaryEndpointMetrics, ) -> JoinHandle<()> { tokio::spawn(async move { let mut config = mysten_network::config::Config::new(); config.concurrency_limit_per_connection = Some(max_concurrent_requests); tokio::select! { _result = config - .server_builder() + .server_builder_with_metrics(primary_endpoint_metrics) .add_service(PrimaryToPrimaryServer::new(self)) .bind(&address) .await