Skip to content

Commit

Permalink
[testing] introduce the Cluster struct & integration test for causal …
Browse files Browse the repository at this point in the history
…completion (MystenLabs#487)
  • Loading branch information
akichidis authored Jul 14, 2022
1 parent ea68404 commit ec7592e
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 108 deletions.
3 changes: 1 addition & 2 deletions narwhal/network/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use futures::FutureExt;
use multiaddr::Multiaddr;
use rand::{prelude::SliceRandom as _, rngs::SmallRng, SeedableRng as _};
use std::collections::HashMap;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio::{runtime::Handle, task::JoinHandle};
use tonic::transport::Channel;
use types::{
BincodeEncodedPayload, PrimaryMessage, PrimaryToPrimaryClient, PrimaryToWorkerClient,
Expand Down
3 changes: 1 addition & 2 deletions narwhal/network/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use futures::FutureExt;
use multiaddr::Multiaddr;
use rand::{prelude::SliceRandom as _, rngs::SmallRng, SeedableRng as _};
use std::collections::HashMap;
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio::{runtime::Handle, task::JoinHandle};
use tonic::transport::Channel;
use types::{BincodeEncodedPayload, WorkerMessage, WorkerToWorkerClient};

Expand Down
56 changes: 56 additions & 0 deletions narwhal/node/src/execution_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use async_trait::async_trait;
use executor::{ExecutionIndices, ExecutionState, ExecutionStateError};
use thiserror::Error;

/// A minimal execution engine that accepts every transaction and does nothing
/// with it.
pub struct SimpleExecutionState;

#[async_trait]
impl ExecutionState for SimpleExecutionState {
    type Transaction = String;
    type Error = SimpleExecutionError;

    /// Always reports the default [`ExecutionIndices`]; never fails.
    async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
        let initial_indices = ExecutionIndices::default();
        Ok(initial_indices)
    }

    /// Ignores both the indices and the transaction and returns an empty
    /// byte vector; never fails.
    async fn handle_consensus_transaction(
        &self,
        _execution_indices: ExecutionIndices,
        _transaction: Self::Transaction,
    ) -> Result<Vec<u8>, Self::Error> {
        let empty_output: Vec<u8> = Vec::new();
        Ok(empty_output)
    }

    /// Unconditionally answers `true`.
    fn ask_consensus_write_lock(&self) -> bool {
        true
    }

    /// No-op: this engine holds no state.
    fn release_consensus_write_lock(&self) {}
}

/// The error type used by [`SimpleExecutionState`]: a two-variant enum whose
/// display messages are generated by `thiserror`.
#[derive(Debug, Error)]
pub enum SimpleExecutionError {
    /// The variant reported as a node error by `node_error`.
    #[error("Something went wrong in the authority")]
    ServerError,

    /// The variant that is not reported as a node error by `node_error`.
    #[error("The client made something bad")]
    ClientError,
}

#[async_trait]
impl ExecutionStateError for SimpleExecutionError {
    /// Returns `true` only for [`SimpleExecutionError::ServerError`].
    fn node_error(&self) -> bool {
        // Kept exhaustive on purpose so a new variant forces a decision here.
        match self {
            Self::ClientError => false,
            Self::ServerError => true,
        }
    }

    /// Renders the error through its `Display` implementation.
    fn to_string(&self) -> String {
        // Fully qualified so the call resolves to `ToString` rather than
        // recursing into this trait method.
        ToString::to_string(self)
    }
}
1 change: 1 addition & 0 deletions narwhal/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use types::{
};
use worker::{metrics::initialise_metrics, Worker};

pub mod execution_state;
pub mod metrics;

/// All the data stores of the node.
Expand Down
58 changes: 2 additions & 56 deletions narwhal/node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,17 @@

use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use async_trait::async_trait;
use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand};
use config::{Committee, Import, Parameters, WorkerId};
use crypto::{ed25519::Ed25519KeyPair, generate_production_keypair, traits::KeyPair};
use executor::{
ExecutionIndices, ExecutionState, ExecutionStateError, SerializedTransaction, SubscriberResult,
};
use executor::{SerializedTransaction, SubscriberResult};
use futures::future::join_all;
use node::{
execution_state::SimpleExecutionState,
metrics::{primary_metrics_registry, start_prometheus_server, worker_metrics_registry},
Node, NodeStorage,
};
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver};
use tracing::{info, subscriber::set_global_default};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
Expand Down Expand Up @@ -214,54 +211,3 @@ async fn analyze(mut rx_output: Receiver<(SubscriberResult<Vec<u8>>, SerializedT
// NOTE: Notify the user that its transaction has been processed.
}
}

/// A minimal execution engine that accepts every transaction and does nothing
/// with it.
struct SimpleExecutionState;

#[async_trait]
impl ExecutionState for SimpleExecutionState {
    type Transaction = String;
    type Error = SimpleExecutionError;

    /// Always reports the default [`ExecutionIndices`]; never fails.
    async fn load_execution_indices(&self) -> Result<ExecutionIndices, Self::Error> {
        let initial_indices = ExecutionIndices::default();
        Ok(initial_indices)
    }

    /// Ignores both the indices and the transaction and returns an empty
    /// byte vector; never fails.
    async fn handle_consensus_transaction(
        &self,
        _execution_indices: ExecutionIndices,
        _transaction: Self::Transaction,
    ) -> Result<Vec<u8>, Self::Error> {
        let empty_output: Vec<u8> = Vec::new();
        Ok(empty_output)
    }

    /// Unconditionally answers `true`.
    fn ask_consensus_write_lock(&self) -> bool {
        true
    }

    /// No-op: this engine holds no state.
    fn release_consensus_write_lock(&self) {}
}

/// The error type used by [`SimpleExecutionState`]: a two-variant enum whose
/// display messages are generated by `thiserror`.
#[derive(Debug, Error)]
pub enum SimpleExecutionError {
    /// The variant reported as a node error by `node_error`.
    #[error("Something went wrong in the authority")]
    ServerError,

    /// The variant that is not reported as a node error by `node_error`.
    #[error("The client made something bad")]
    ClientError,
}

#[async_trait]
impl ExecutionStateError for SimpleExecutionError {
    /// Returns `true` only for [`SimpleExecutionError::ServerError`].
    fn node_error(&self) -> bool {
        // Kept exhaustive on purpose so a new variant forces a decision here.
        match self {
            Self::ClientError => false,
            Self::ServerError => true,
        }
    }

    /// Renders the error through its `Display` implementation.
    fn to_string(&self) -> String {
        // Fully qualified so the call resolves to `ToString` rather than
        // recursing into this trait method.
        ToString::to_string(self)
    }
}
5 changes: 5 additions & 0 deletions narwhal/primary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ tempfile = "3.3.0"
test_utils = { path = "../test_utils" }
tracing-test = "0.2.2"
worker = { path = "../worker" }
async-trait = "0.1.56"
executor = { path = "../executor" }
thiserror = "1.0.31"
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["time", "env-filter"] }

[features]
benchmark = []
8 changes: 6 additions & 2 deletions narwhal/primary/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl<PublicKey: VerifyingKey> Core<PublicKey> {
#[async_recursion]
#[instrument(level = "debug", skip_all)]
async fn process_header(&mut self, header: &Header<PublicKey>) -> DagResult<()> {
debug!("Processing {:?}", header);
debug!("Processing {:?} round:{:?}", header, header.round);
let header_source = if self.name.eq(&header.author) {
"own"
} else {
Expand Down Expand Up @@ -345,7 +345,11 @@ impl<PublicKey: VerifyingKey> Core<PublicKey> {
#[async_recursion]
#[instrument(level = "debug", skip_all)]
async fn process_certificate(&mut self, certificate: Certificate<PublicKey>) -> DagResult<()> {
debug!("Processing {:?}", certificate);
debug!(
"Processing {:?} round:{:?}",
certificate,
certificate.round()
);

// Let the proposer draw early conclusions from a certificate at this round and epoch, without its
// parents or payload (which we may not have yet).
Expand Down
34 changes: 11 additions & 23 deletions narwhal/primary/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use prometheus::{
default_registry, register_histogram_vec_with_registry, register_int_counter_vec_with_registry,
register_int_gauge_vec_with_registry, HistogramVec, IntCounterVec, IntGaugeVec, Registry,
};
use std::{sync::Once, time::Duration};
use std::time::Duration;
use tonic::Code;

#[derive(Clone)]
Expand All @@ -16,35 +16,23 @@ pub(crate) struct Metrics {
pub(crate) node_metrics: Option<PrimaryMetrics>,
}

static mut METRICS: Metrics = Metrics {
endpoint_metrics: None,
primary_endpoint_metrics: None,
node_metrics: None,
};
static INIT: Once = Once::new();

/// Initialises the metrics. Should be called only once when the primary
/// node is initialised, otherwise it will lead to erroneously creating
/// multiple registries.
pub(crate) fn initialise_metrics(metrics_registry: &Registry) -> Metrics {
unsafe {
INIT.call_once(|| {
// The metrics used for the gRPC primary node endpoints we expose to the external consensus
let endpoint_metrics = EndpointMetrics::new(metrics_registry);
// The metrics used for the gRPC primary node endpoints we expose to the external consensus
let endpoint_metrics = EndpointMetrics::new(metrics_registry);

// The metrics used for the primary-to-primary communication node endpoints
let primary_endpoint_metrics = PrimaryEndpointMetrics::new(metrics_registry);
// The metrics used for the primary-to-primary communication node endpoints
let primary_endpoint_metrics = PrimaryEndpointMetrics::new(metrics_registry);

// Essential/core metrics across the primary node
let node_metrics = PrimaryMetrics::new(metrics_registry);
// Essential/core metrics across the primary node
let node_metrics = PrimaryMetrics::new(metrics_registry);

METRICS = Metrics {
node_metrics: Some(node_metrics),
endpoint_metrics: Some(endpoint_metrics),
primary_endpoint_metrics: Some(primary_endpoint_metrics),
}
});
METRICS.clone()
Metrics {
node_metrics: Some(node_metrics),
endpoint_metrics: Some(endpoint_metrics),
primary_endpoint_metrics: Some(primary_endpoint_metrics),
}
}

Expand Down
98 changes: 98 additions & 0 deletions narwhal/primary/tests/causal_completion_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use std::time::Duration;
use test_utils::cluster::Cluster;
use tracing::{info, subscriber::set_global_default};
use tracing_subscriber::filter::{EnvFilter, LevelFilter};

/// Integration test for causal completion.
///
/// Starts a four-node cluster, checks through the
/// `narwhal_primary_current_round` gauge that every node moved past round 1,
/// then stops node 0, lets the remaining nodes run, restarts node 0 and
/// verifies that its round gauge moves past round 1 again.
///
/// The test is `#[ignore]`d, so it only runs when explicitly requested.
#[ignore]
#[tokio::test]
async fn test_read_causal_signed_certificates() {
    const CURRENT_ROUND_METRIC: &str = "narwhal_primary_current_round";

    // Enable debug tracing so we can easily observe the nodes' logs.
    setup_tracing();

    let mut cluster = Cluster::new(None);

    // Start the cluster.
    let nodes = cluster.start(4).await;

    // Let the primaries advance a little bit.
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Ensure all nodes advanced.
    for node in nodes {
        // Tracks whether the gauge was reported at all; without this the
        // check below would pass vacuously when the metric is missing.
        let mut round_metric_found = false;

        for metric in node.registry.gather() {
            if metric.get_name() == CURRENT_ROUND_METRIC {
                round_metric_found = true;

                let value = metric.get_metric().first().unwrap().get_gauge().get_value();

                info!("Metrics name {} -> {:?}", metric.get_name(), value);

                // If the current round is increasing then it means that the
                // node is proposing.
                assert!(value > 1.0, "Node didn't progress further than the round 1");
            }
        }

        assert!(
            round_metric_found,
            "Metric {} was not reported by the node",
            CURRENT_ROUND_METRIC
        );
    }

    // Now stop node 0.
    cluster.stop_node(0);

    // Let the other primaries advance.
    tokio::time::sleep(Duration::from_secs(10)).await;

    // Now start validator 0 again.
    let node = cluster.start_node(0).await.unwrap();

    // Now check that the current round advances. Give it the opportunity over
    // a few iterations. If the metric hasn't picked up by then, we know that
    // the node can't make progress.
    let mut node_made_progress = false;
    'poll: for _ in 0..10 {
        tokio::time::sleep(Duration::from_secs(1)).await;

        for metric in node.registry.gather() {
            if metric.get_name() == CURRENT_ROUND_METRIC {
                let value = metric.get_metric().first().unwrap().get_gauge().get_value();

                info!("Metrics name {} -> {:?}", metric.get_name(), value);

                // If the current round is increasing then it means that the
                // node starts catching up and is proposing.
                if value > 1.0 {
                    node_made_progress = true;
                    // Leave the polling loop as well, not just the metric
                    // scan, so we stop sleeping once progress is confirmed.
                    break 'poll;
                }
            }
        }
    }

    assert!(
        node_made_progress,
        "Node 0 didn't make progress - causal completion didn't succeed"
    );
}

/// Installs a global tracing subscriber that writes to stderr.
///
/// The filter is taken from the default environment when
/// `EnvFilter::try_from_default_env` succeeds. Otherwise a built-in filter is
/// used: `debug` overall, with `h2`, `tower`, `hyper` and `tonic::transport`
/// limited to `info`.
///
/// # Panics
///
/// Panics if the built-in directives fail to parse or if setting the global
/// default subscriber fails.
fn setup_tracing() {
    let tracing_level = "debug";
    let network_tracing_level = "info";

    // Built eagerly, exactly like the environment-independent default it
    // represents, so a malformed directive string is caught on every run.
    let directives = format!(
        "{tracing_level},h2={network_tracing_level},tower={network_tracing_level},hyper={network_tracing_level},tonic::transport={network_tracing_level}"
    );
    let fallback_filter = EnvFilter::builder()
        .with_default_directive(LevelFilter::INFO.into())
        .parse(directives)
        .unwrap();

    // The environment wins over the built-in filter.
    let env_filter = EnvFilter::try_from_default_env().unwrap_or(fallback_filter);

    let subscriber = tracing_subscriber::fmt::Subscriber::builder()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .finish();

    set_global_default(subscriber).expect("Failed to set subscriber");
}
Loading

0 comments on commit ec7592e

Please sign in to comment.