From 3d494796402f41e5819fb62e03c43c783e1c6721 Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 02:14:42 -0400 Subject: [PATCH 01/16] Remote benchmarking --- sui/src/benchmark.rs | 11 +- sui/src/benchmark/bench_types.rs | 26 +- sui/src/benchmark/load_generator.rs | 2 - sui/src/benchmark/multi_load_generator.rs | 313 ++++++++++++++++++++++ sui/src/benchmark/transaction_creator.rs | 197 +++++++------- sui/src/benchmark/validator_preparer.rs | 72 +++-- sui/src/bin/bench.rs | 7 +- sui/src/bin/bench_configure.rs | 158 +++++++++++ sui/src/bin/remote_load_generator.rs | 183 +++++++++++++ sui/src/config.rs | 31 ++- sui/src/sui_commands.rs | 35 ++- sui_types/src/base_types.rs | 61 +++++ 12 files changed, 957 insertions(+), 139 deletions(-) create mode 100644 sui/src/benchmark/multi_load_generator.rs create mode 100644 sui/src/bin/bench_configure.rs create mode 100644 sui/src/bin/remote_load_generator.rs diff --git a/sui/src/benchmark.rs b/sui/src/benchmark.rs index bbf7789f242d2..bcc6f7b6b25e1 100644 --- a/sui/src/benchmark.rs +++ b/sui/src/benchmark.rs @@ -17,6 +17,7 @@ use tokio::runtime::{Builder, Runtime}; use tracing::{error, info}; pub mod bench_types; pub mod load_generator; +pub mod multi_load_generator; pub mod transaction_creator; pub mod validator_preparer; use crate::benchmark::bench_types::{Benchmark, BenchmarkType}; @@ -54,7 +55,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult { } else { num_cpus::get() }; - let validator_preparer = ValidatorPreparer::new( + let validator_preparer = ValidatorPreparer::new_for_local( benchmark.running_mode, benchmark.working_dir, benchmark.committee_size, @@ -68,7 +69,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult { network_server, connections, benchmark.batch_size, - benchmark.use_move, + !benchmark.use_native, num_transactions, validator_preparer, ), @@ -80,7 +81,7 @@ fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult { network_client, network_server, connections, - benchmark.use_move, + !benchmark.use_native, num_chunks, chunk_size, period_us, @@ -118,6 +119,7 @@ fn run_throughout_microbench( use_move, batch_size * connections, num_transactions / chunk_size, + None, &mut validator_preparer, ); @@ -182,12 +184,13 @@ fn run_latency_microbench( use_move, chunk_size, num_chunks, + None, &mut validator_preparer, ); // These are tracer TXes used for measuring latency let tracer_txes = - tx_cr.generate_transactions(1, use_move, 1, num_chunks, &mut validator_preparer); + tx_cr.generate_transactions(1, use_move, 1, num_chunks, None, &mut validator_preparer); validator_preparer.deploy_validator(_network_server); diff --git a/sui/src/benchmark/bench_types.rs b/sui/src/benchmark/bench_types.rs index 4d666fc213d9d..50ef8c42cc274 100644 --- a/sui/src/benchmark/bench_types.rs +++ b/sui/src/benchmark/bench_types.rs @@ -1,12 +1,18 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::config::Config; + use super::load_generator::calculate_throughput; use clap::*; +use serde::{Deserialize, Serialize}; +use serde_with::serde_as; use std::default::Default; use std::path::PathBuf; use strum_macros::EnumString; use sui_network::transport; +use sui_types::base_types::ObjectID; +use sui_types::crypto::KeyPair; #[derive(Debug, Clone, Parser)] #[clap( @@ -34,7 +40,7 @@ pub struct Benchmark { pub db_cpus: usize, /// Use Move orders #[clap(long, global = true)] - pub use_move: bool, + pub use_native: bool, #[clap(long, default_value = "2000", global = true)] pub batch_size: usize, @@ -74,8 +80,9 @@ pub enum BenchmarkType { #[derive(Debug, Parser, Clone, Copy, ArgEnum, EnumString)] #[clap(rename_all = "kebab-case")] pub enum RunningMode { - LocalSingleValidatorThread, - LocalSingleValidatorProcess, + SingleValidatorThread, + SingleValidatorProcess, + RemoteValidator, } #[derive(Debug, Clone, Parser, Eq, PartialEq, EnumString)] @@ -172,3 +179,16 @@ impl std::fmt::Display for MicroBenchmarkResult { } } } + +#[serde_as] +#[derive(Serialize, Deserialize)] +pub struct RemoteLoadGenConfig { + /// Account keypair to use for transactions + pub account_keypair: KeyPair, + /// ObjectID offset for transaction objects + pub object_id_offset: ObjectID, + /// Network config path which point to validators + pub network_cfg_path: PathBuf, +} + +impl Config for RemoteLoadGenConfig {} diff --git a/sui/src/benchmark/load_generator.rs b/sui/src/benchmark/load_generator.rs index 17a60dd8d6fdd..93e5825a9a5e3 100644 --- a/sui/src/benchmark/load_generator.rs +++ b/sui/src/benchmark/load_generator.rs @@ -94,8 +94,6 @@ pub struct FixedRateLoadGenerator { pub chunk_size_per_task: usize, } -// new -> ready -> start - impl FixedRateLoadGenerator { pub async fn new( transactions: Vec, diff --git a/sui/src/benchmark/multi_load_generator.rs b/sui/src/benchmark/multi_load_generator.rs new file mode 100644 index 0000000000000..1166c960f2907 --- /dev/null +++ b/sui/src/benchmark/multi_load_generator.rs @@ -0,0 +1,313 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +#![deny(warnings)] + +use anyhow::Error; +use bytes::{Bytes, BytesMut}; +use futures::channel::mpsc::{channel as MpscChannel, Receiver, Sender as MpscSender}; +use futures::stream::StreamExt; +use futures::SinkExt; + +use rayon::prelude::*; +use std::io; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use sui_network::network::{NetworkClient}; +use sui_types::committee::Committee; +use sui_types::{messages::*, serialize::*}; +use tokio::sync::Notify; +use tokio::time; +use tracing::{error, info}; + +use crate::config::{ NetworkConfig}; + + +pub fn check_transaction_response(reply_message: Result) { + match reply_message { + Ok(SerializedMessage::TransactionResp(res)) => { + if let Some(e) = res.signed_effects { + if matches!(e.effects.status, ExecutionStatus::Failure { .. }) { + info!("Execution Error {:?}", e.effects.status); + } + } + } + Err(err) => { + error!("Received Error {:?}", err); + } + Ok(q) => error!("Received invalid response {:?}", q), + }; +} + +pub async fn send_tx_chunks( + tx_chunks: Vec, + net_client: NetworkClient, + conn: usize, +) -> (u128, Vec>) { + let time_start = Instant::now(); + + let tx_resp = net_client + .batch_send(tx_chunks, conn, 0) + .map(|x| x.unwrap()) + .concat() + .await; + + let elapsed = time_start.elapsed().as_micros(); + + (elapsed, tx_resp) +} + +async fn send_tx_chunks_notif( + notif: Arc, + tx_chunk: Vec, + result_chann_tx: &mut MpscSender<(u128, usize)>, + net_client: NetworkClient, + stake: usize, + conn: usize, +) { + notif.notified().await; + let r = send_tx_chunks(tx_chunk, net_client, conn).await; + result_chann_tx.send((r.0, stake)).await.unwrap(); + + let _: Vec<_> = + r.1.par_iter() + .map(|q| check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..]))) + .collect(); +} + +async fn send_tx_for_quorum( + notif: Arc, + order_chunk: Vec, + conf_chunk: Vec, + + result_chann_tx: &mut MpscSender, + net_clients: Vec<(NetworkClient, usize)>, + conn: usize, + quorum_threshold: usize, +) { + // For receiving info back from the subtasks + let (order_chann_tx, mut order_chann_rx) = MpscChannel(net_clients.len() * 2); + + // Send intent orders to 3f+1 + let order_start_notifier = Arc::new(Notify::new()); + for (net_client, stake) in net_clients.clone() { + // This is for sending a start signal to the subtasks + let notif = order_start_notifier.clone(); + // This is for getting the elapsed time + let mut ch_tx = order_chann_tx.clone(); + // Chunk to send for order_ + let chunk = order_chunk.clone(); + + tokio::spawn(async move { + send_tx_chunks_notif(notif, chunk, &mut ch_tx, net_client.clone(), stake, conn).await; + println!("Spawn for order {:?} with stake {}", net_client, stake); + }); + } + drop(order_chann_tx); + + // Wait for timer tick + notif.notified().await; + // Notify all the subtasks + order_start_notifier.notify_waiters(); + // Start timer + let time_start = Instant::now(); + + // Wait for 2f+1 by stake + let mut total = 0; + + while let Some(v) = time::timeout(Duration::from_secs(10), order_chann_rx.next()) + .await + .unwrap() + { + total += v.1; + if total >= quorum_threshold { + break; + } + } + if total < quorum_threshold { + panic!("Quorum threshold not reached for orders") + } + + println!("order {}", total); + // Confirmation step + let (conf_chann_tx, mut conf_chann_rx) = MpscChannel(net_clients.len() * 2); + + // Send the confs + let mut handles = vec![]; + for (net_client, stake) in net_clients { + let chunk = conf_chunk.clone(); + let mut chann_tx = conf_chann_tx.clone(); + handles.push(tokio::spawn(async move { + let r = send_tx_chunks(chunk, net_client.clone(), conn).await; + println!("Spawn for conf {:?} with stake {}", net_client, stake); + match chann_tx.send((r.0, stake)).await { + Ok(_) => (), + Err(e) => { + if !e.is_disconnected() { + panic!("Send failed! {:?}", net_client) + } + } + } + + let _: Vec<_> = + r.1.par_iter() + .map(|q| { + check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..])) + }) + .collect(); + })); + } + drop(conf_chann_tx); + + // Reset counter + total = 0; + while let Some(v) = time::timeout(Duration::from_secs(10), conf_chann_rx.next()) + .await + .unwrap() + { + total += v.1; + if total >= quorum_threshold { + break; + } + } + if total < quorum_threshold { + panic!("Quorum threshold not reached for confirmation") + } + + println!("conf {}", total); + + let elapsed = time_start.elapsed().as_micros(); + + // Send the total time over + result_chann_tx.send(elapsed).await.unwrap(); +} + +pub struct MultiFixedRateLoadGenerator { + /// The time between sending transactions chunks + /// Anything below 10ms causes degradation in resolution + pub period_us: u64, + + //pub network_config: Vec, + + pub tick_notifier: Arc, + + /// Number of TCP connections to open + pub connections: usize, + + pub transactions: Vec, + + pub results_chann_rx: Receiver, + + /// This is the chunk size actually assigned for each tick per task + /// It is 2*chunk_size due to order and confirmation steps + pub chunk_size_per_task: usize, +} + +impl MultiFixedRateLoadGenerator { + pub async fn new( + transactions: Vec, + period_us: u64, + connections: usize, + + network_cfg: &NetworkConfig, + recv_timeout: Duration, + send_timeout: Duration, + ) -> Self { + let network_clients_stake: Vec<(NetworkClient, usize)> = network_cfg + .authorities + .iter() + .map(|q| { + ( + NetworkClient::new( + q.host.clone(), + q.port, + network_cfg.buffer_size, + send_timeout, + recv_timeout, + ), + q.stake, + ) + }) + .collect(); + let committee_quorum_threshold = Committee::from(network_cfg).quorum_threshold(); + let mut handles = vec![]; + let tick_notifier = Arc::new(Notify::new()); + + let (result_chann_tx, results_chann_rx) = MpscChannel(transactions.len() * 2); + + let conn = connections; + // Spin up a bunch of worker tasks + // Give each task + // Step by 2*conn due to order+confirmation, with `conn` tcp connections + // Take up to 2*conn for each task + let num_chunks_per_task = conn * 2; + for tx_chunk in transactions[..].chunks(num_chunks_per_task) { + let notif = tick_notifier.clone(); + let mut result_chann_tx = result_chann_tx.clone(); + let tx_chunk = tx_chunk.to_vec(); + let clients = network_clients_stake.clone(); + + let mut order_chunk = vec![]; + let mut conf_chunk = vec![]; + + for ch in tx_chunk[..].chunks(2) { + order_chunk.push(ch[0].clone()); + conf_chunk.push(ch[1].clone()); + } + + handles.push(tokio::spawn(async move { + send_tx_for_quorum( + notif, + order_chunk, + conf_chunk, + &mut result_chann_tx, + clients, + conn, + committee_quorum_threshold, + ) + .await; + })); + } + + drop(result_chann_tx); + + Self { + period_us, + transactions, + connections, + results_chann_rx, + tick_notifier, + chunk_size_per_task: num_chunks_per_task, + //network_config: network_cfg.authorities, + } + } + + pub async fn start(&mut self) -> Vec { + let mut interval = time::interval(Duration::from_micros(self.period_us)); + let mut count = 0; + loop { + tokio::select! { + _ = interval.tick() => { + self.tick_notifier.notify_one(); + count += self.chunk_size_per_task; + if count >= self.transactions.len() { + break; + } + } + } + } + let mut times = Vec::new(); + while let Some(v) = time::timeout(Duration::from_secs(10), self.results_chann_rx.next()) + .await + .unwrap_or(None) + { + times.push(v); + } + + times + } +} + +pub fn calculate_throughput(num_items: usize, elapsed_time_us: u128) -> f64 { + 1_000_000.0 * num_items as f64 / elapsed_time_us as f64 +} diff --git a/sui/src/benchmark/transaction_creator.rs b/sui/src/benchmark/transaction_creator.rs index e213715eef17e..99a2b168dbab2 100644 --- a/sui/src/benchmark/transaction_creator.rs +++ b/sui/src/benchmark/transaction_creator.rs @@ -12,7 +12,8 @@ use sui_types::crypto::{get_key_pair, AuthoritySignature, KeyPair, PublicKeyByte use sui_types::SUI_FRAMEWORK_ADDRESS; use sui_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*}; -const OBJECT_ID_OFFSET: usize = 10000; +const OBJECT_ID_OFFSET: &str = "0x10000"; +const GAS_PER_TX: u64 = 10000000; /// Create a transaction for object transfer /// This can either use the Move path or the native path @@ -47,17 +48,13 @@ fn make_transfer_transaction( } /// Creates an object for use in the microbench -fn create_object(object_id: ObjectID, owner: SuiAddress, use_move: bool) -> Object { - if use_move { - Object::with_id_owner_gas_coin_object_for_testing( - object_id, - SequenceNumber::new(), - owner, - 1, - ) - } else { - Object::with_id_owner_for_testing(object_id, owner) - } +fn create_gas_object(object_id: ObjectID, owner: SuiAddress) -> Object { + Object::with_id_owner_gas_coin_object_for_testing( + object_id, + SequenceNumber::new(), + owner, + GAS_PER_TX, + ) } /// This builds, signs a cert and serializes it @@ -80,39 +77,6 @@ fn make_serialized_cert( serialized_certificate } -fn make_gas_objects( - address: SuiAddress, - tx_count: usize, - batch_size: usize, - obj_id_offset: usize, - use_move: bool, -) -> Vec<(Vec, Object)> { - (0..tx_count) - .into_par_iter() - .map(|x| { - let mut objects = vec![]; - for i in 0..batch_size { - let mut obj_id = [0; 20]; - obj_id[..8] - .clone_from_slice(&(obj_id_offset + x * batch_size + i).to_be_bytes()[..8]); - objects.push(create_object(ObjectID::from(obj_id), address, use_move)); - } - - let mut gas_object_id = [0; 20]; - gas_object_id[8..16].clone_from_slice(&(obj_id_offset + x).to_be_bytes()[..8]); - let gas_object = Object::with_id_owner_gas_coin_object_for_testing( - ObjectID::from(gas_object_id), - SequenceNumber::new(), - address, - 2000000, - ); - assert!(gas_object.version() == SequenceNumber::from(0)); - - (objects, gas_object) - }) - .collect() -} - fn make_serialized_transactions( address: SuiAddress, keypair: KeyPair, @@ -171,52 +135,8 @@ fn make_serialized_transactions( .collect() } -fn make_transactions( - address: SuiAddress, - key_pair: KeyPair, - chunk_size: usize, - num_chunks: usize, - conn: usize, - use_move: bool, - object_id_offset: usize, - auth_keys: &[(PublicKeyBytes, KeyPair)], - committee: &Committee, -) -> (Vec, Vec) { - assert_eq!(chunk_size % conn, 0); - let batch_size_per_conn = chunk_size / conn; - - // The batch-adjusted number of transactions - let batch_tx_count = num_chunks * conn; - // Only need one gas object per batch - let account_gas_objects: Vec<_> = make_gas_objects( - address, - batch_tx_count, - batch_size_per_conn, - object_id_offset, - use_move, - ); - - // Bulk load objects - let all_objects: Vec<_> = account_gas_objects - .clone() - .into_iter() - .flat_map(|(objects, gas)| objects.into_iter().chain(std::iter::once(gas))) - .collect(); - - let serialized_txes = make_serialized_transactions( - address, - key_pair, - committee, - &account_gas_objects, - auth_keys, - batch_size_per_conn, - use_move, - ); - (serialized_txes, all_objects) -} - pub struct TransactionCreator { - pub object_id_offset: usize, + pub object_id_offset: ObjectID, } impl Default for TransactionCreator { @@ -228,9 +148,12 @@ impl Default for TransactionCreator { impl TransactionCreator { pub fn new() -> Self { Self { - object_id_offset: OBJECT_ID_OFFSET, + object_id_offset: ObjectID::from_hex_literal(OBJECT_ID_OFFSET).unwrap(), } } + pub fn new_with_offset(object_id_offset: ObjectID) -> Self { + Self { object_id_offset } + } pub fn generate_transactions( &mut self, @@ -238,10 +161,15 @@ impl TransactionCreator { use_move: bool, chunk_size: usize, num_chunks: usize, + sender: Option<&KeyPair>, validator_preparer: &mut ValidatorPreparer, ) -> Vec { - let (address, keypair) = get_key_pair(); - let (signed_txns, txn_objects) = make_transactions( + let (address, keypair) = if let Some(a) = sender { + (SuiAddress::from(a.public_key_bytes()), a.copy()) + } else { + get_key_pair() + }; + let (signed_txns, txn_objects) = self.make_transactions( address, keypair, chunk_size, @@ -253,10 +181,89 @@ impl TransactionCreator { &validator_preparer.committee, ); - self.object_id_offset += chunk_size * num_chunks; - validator_preparer.update_objects_for_validator(txn_objects, address); signed_txns } + + fn make_gas_objects( + &mut self, + address: SuiAddress, + tx_count: usize, + batch_size: usize, + obj_id_offset: ObjectID, + ) -> Vec<(Vec, Object)> { + let total_count = tx_count * batch_size; + let mut objects = vec![]; + let mut gas_objects = vec![]; + // Objects to be transferred + ObjectID::in_range(obj_id_offset, total_count as u64) + .unwrap() + .iter() + .for_each(|q| objects.push(create_gas_object(*q, address))); + + // Objects for payment + let next_offset = objects[objects.len() - 1].id(); + + ObjectID::in_range(next_offset.next_increment().unwrap(), tx_count as u64) + .unwrap() + .iter() + .for_each(|q| gas_objects.push(create_gas_object(*q, address))); + + self.object_id_offset = gas_objects[gas_objects.len() - 1] + .id() + .next_increment() + .unwrap(); + + objects[..] + .chunks(batch_size) + .into_iter() + .map(|q| q.to_vec()) + .zip(gas_objects.into_iter()) + .collect::>() + } + + fn make_transactions( + &mut self, + address: SuiAddress, + key_pair: KeyPair, + chunk_size: usize, + num_chunks: usize, + conn: usize, + use_move: bool, + object_id_offset: ObjectID, + auth_keys: &[(PublicKeyBytes, KeyPair)], + committee: &Committee, + ) -> (Vec, Vec) { + assert_eq!(chunk_size % conn, 0); + let batch_size_per_conn = chunk_size / conn; + + // The batch-adjusted number of transactions + let batch_tx_count = num_chunks * conn; + // Only need one gas object per batch + let account_gas_objects: Vec<_> = self.make_gas_objects( + address, + batch_tx_count, + batch_size_per_conn, + object_id_offset, + ); + + // Bulk load objects + let all_objects: Vec<_> = account_gas_objects + .clone() + .into_iter() + .flat_map(|(objects, gas)| objects.into_iter().chain(std::iter::once(gas))) + .collect(); + + let serialized_txes = make_serialized_transactions( + address, + key_pair, + committee, + &account_gas_objects, + auth_keys, + batch_size_per_conn, + use_move, + ); + (serialized_txes, all_objects) + } } diff --git a/sui/src/benchmark/validator_preparer.rs b/sui/src/benchmark/validator_preparer.rs index acea0bb8956b5..183ed62360395 100644 --- a/sui/src/benchmark/validator_preparer.rs +++ b/sui/src/benchmark/validator_preparer.rs @@ -5,7 +5,7 @@ use crate::benchmark::bench_types::RunningMode; use crate::benchmark::load_generator::spawn_authority_server; -use crate::config::{AccountConfig, ObjectConfig}; +use crate::config::{AccountConfig, NetworkConfig, ObjectConfig}; use crate::config::{Config, GenesisConfig}; use rocksdb::Options; use std::env; @@ -64,10 +64,27 @@ pub enum ValidatorConfig { working_dir: PathBuf, validator_process: Option, }, + RemoteValidatorConfig, } impl ValidatorPreparer { - pub fn new( + pub fn new_for_remote(network_config: &NetworkConfig) -> Self { + let keys = network_config + .authorities + .iter() + .map(|q| (*q.key_pair.public_key_bytes(), q.key_pair.copy())) + .collect::>(); + let committee = Committee::from(network_config); + + Self { + running_mode: RunningMode::RemoteValidator, + keys, + main_authority_address_hex: "".to_string(), + committee, + validator_config: ValidatorConfig::RemoteValidatorConfig, + } + } + pub fn new_for_local( running_mode: RunningMode, working_dir: Option, committee_size: usize, @@ -91,7 +108,7 @@ impl ValidatorPreparer { let committee = Committee::new(0, keys.iter().map(|(k, _)| (*k, 1)).collect()); match running_mode { - RunningMode::LocalSingleValidatorProcess => { + RunningMode::SingleValidatorProcess => { // Honor benchmark's host:port setting genesis_config.authorities[0].port = validator_port; genesis_config.authorities[0].host = validator_host.into(); @@ -108,7 +125,7 @@ impl ValidatorPreparer { } } - RunningMode::LocalSingleValidatorThread => { + RunningMode::SingleValidatorThread => { // Pick the first validator and create state. let public_auth0 = keys[0].0; let secret_auth0 = keys[0].1.copy(); @@ -134,12 +151,13 @@ impl ValidatorPreparer { }, } } + RunningMode::RemoteValidator => panic!("Use new_for_remote"), } } pub fn deploy_validator(&mut self, _network_server: NetworkServer) { match self.running_mode { - RunningMode::LocalSingleValidatorProcess => { + RunningMode::SingleValidatorProcess => { if let ValidatorConfig::LocalSingleValidatorProcessConfig { working_dir, genesis_config, @@ -166,7 +184,7 @@ impl ValidatorPreparer { panic!("Invalid validator config in local-single-validator-process mode"); } } - RunningMode::LocalSingleValidatorThread => { + RunningMode::SingleValidatorThread => { if let ValidatorConfig::LocalSingleValidatorThreadConfig { authority_state, authority_store: _, @@ -187,6 +205,7 @@ impl ValidatorPreparer { panic!("Invalid validator config in local-single-validator-thread mode"); } } + RunningMode::RemoteValidator => (), } // Wait for server start sleep(Duration::from_secs(3)); @@ -194,7 +213,7 @@ impl ValidatorPreparer { pub fn update_objects_for_validator(&mut self, objects: Vec, address: SuiAddress) { match self.running_mode { - RunningMode::LocalSingleValidatorProcess => { + RunningMode::SingleValidatorProcess => { let all_objects: Vec = objects .iter() .map(|object| ObjectConfig { @@ -215,12 +234,13 @@ impl ValidatorPreparer { .push(AccountConfig { address: Some(address), gas_objects: all_objects, + gas_object_ranges: None, }) } else { panic!("invalid validator config in local-single-validator-process mode"); } } - RunningMode::LocalSingleValidatorThread => { + RunningMode::SingleValidatorThread => { if let ValidatorConfig::LocalSingleValidatorThreadConfig { authority_state: _, authority_store, @@ -233,29 +253,29 @@ impl ValidatorPreparer { panic!("invalid validator config in local-single-validator-thread mode"); } } + + // Nothing to do here. Remote machine must be provisioned separately + RunningMode::RemoteValidator => (), } } pub fn clean_up(&mut self) { - match self.running_mode { - RunningMode::LocalSingleValidatorProcess => { - info!("Cleaning up local validator process..."); - if let ValidatorConfig::LocalSingleValidatorProcessConfig { - working_dir: _, - genesis_config: _, - validator_process, - } = &mut self.validator_config - { - validator_process - .take() - .unwrap() - .kill() - .expect("Failed to kill validator process"); - } else { - panic!("invalid validator config in local-single-validator-process mode"); - } + if let RunningMode::SingleValidatorProcess = self.running_mode { + info!("Cleaning up local validator process..."); + if let ValidatorConfig::LocalSingleValidatorProcessConfig { + working_dir: _, + genesis_config: _, + validator_process, + } = &mut self.validator_config + { + validator_process + .take() + .unwrap() + .kill() + .expect("Failed to kill validator process"); + } else { + panic!("invalid validator config in local-single-validator-process mode"); } - RunningMode::LocalSingleValidatorThread => {} } } } diff --git a/sui/src/bin/bench.rs b/sui/src/bin/bench.rs index 38cb6be743dcb..021f103fbc3ac 100644 --- a/sui/src/bin/bench.rs +++ b/sui/src/bin/bench.rs @@ -42,8 +42,8 @@ fn main() { fn running_mode_pre_check(benchmark: &bench_types::Benchmark) { match benchmark.running_mode { - bench_types::RunningMode::LocalSingleValidatorThread => {} - bench_types::RunningMode::LocalSingleValidatorProcess => match &benchmark.working_dir { + bench_types::RunningMode::SingleValidatorThread => {} + bench_types::RunningMode::SingleValidatorProcess => match &benchmark.working_dir { Some(path) => { assert!( path.clone().join(VALIDATOR_BINARY_NAME).is_file(), @@ -52,5 +52,8 @@ fn running_mode_pre_check(benchmark: &bench_types::Benchmark) { } None => panic!("working-dir option is required in local-single-authority-process mode"), }, + bench_types::RunningMode::RemoteValidator => { + unimplemented!("Remote benchmarks not supported through this entrypoint") + } } } diff --git a/sui/src/bin/bench_configure.rs b/sui/src/bin/bench_configure.rs new file mode 100644 index 0000000000000..3d3e61e59cc81 --- /dev/null +++ b/sui/src/bin/bench_configure.rs @@ -0,0 +1,158 @@ +#![deny(warnings)] + +use clap::*; +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::Path, +}; +use sui::{ + benchmark::bench_types::RemoteLoadGenConfig, + config::{ + AccountConfig, AuthorityPrivateInfo, Config, GenesisConfig, NetworkConfig, + ObjectConfigRange, + }, +}; +use sui_types::{base_types::ObjectID, crypto::get_key_pair}; + +const BUFFER_SIZE: usize = 650000; + +#[derive(Debug, Parser)] +#[clap( + name = "Sui Distributed Benchmark Config Creator", + about = "Creates the config files for distributed benchmarking" +)] +pub struct DistributedBenchmarkConfigurator { + /// List of space separated strings of the form: host:port:stake, example 127.0.0.1:8080:5 + #[clap(long, multiple_values = true)] + pub host_port_stake_triplets: Vec, + #[clap(long, default_value = "0x0000000000000000000000000010000000000000")] + pub object_id_offset: ObjectID, + #[clap(long, default_value = "4")] + pub number_of_generators: usize, + #[clap(long, default_value = "1000000")] + pub number_of_txes_per_generator: usize, +} +fn main() { + let bch = DistributedBenchmarkConfigurator::parse(); + + let mut authorities = vec![]; + let mut authorities_copy = vec![]; + + for b in bch.host_port_stake_triplets { + let (host, port, stake) = parse_host_port_stake_triplet(&b); + let (addr, kp) = get_key_pair(); + let db_path = format!("DB_{}", addr); + let path = Path::new(&db_path); + + let host_bytes: Vec = host + .split('.') + .into_iter() + .map(|q| q.parse::().unwrap()) + .collect(); + let consensus_address = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new( + host_bytes[0], + host_bytes[1], + host_bytes[2], + host_bytes[3], + )), + port + 1, + ); + + let auth = AuthorityPrivateInfo { + address: addr, + key_pair: kp, + host, + port, + db_path: path.to_path_buf(), + stake, + consensus_address, + }; + + authorities.push(auth.copy()); + authorities_copy.push(auth); + } + + // For each load generator, create an address as the source of transfers + let mut accounts = vec![]; + let mut obj_id_offset = bch.object_id_offset; + let mut account_private_info = vec![]; + for _ in 0..bch.number_of_generators { + let (address, kp) = get_key_pair(); + + let range_cfg = ObjectConfigRange { + offset: obj_id_offset, + count: bch.number_of_txes_per_generator as u64, + gas_value: u64::MAX, + }; + + account_private_info.push((kp, obj_id_offset)); + + // Ensure no overlap + obj_id_offset = obj_id_offset + .advance(bch.number_of_txes_per_generator) + .unwrap(); + + let account = AccountConfig { + address: Some(address), + gas_objects: vec![], + gas_object_ranges: Some(vec![range_cfg]), + }; + accounts.push(account); + } + + // Create and save the genesis configs for the validators + let genesis_config = GenesisConfig { + authorities, + accounts, + move_packages: vec![], + sui_framework_lib_path: Path::new("../../sui_programmability/framework").to_path_buf(), + move_framework_lib_path: Path::new("../../sui_programmability/framework/deps/move-stdlib") + .to_path_buf(), + }; + let genesis_path = Path::new("distributed_bench_genesis.conf"); + genesis_config.persisted(genesis_path).save().unwrap(); + + let network_config = NetworkConfig { + epoch: 0, + authorities: authorities_copy, + buffer_size: BUFFER_SIZE, + loaded_move_packages: vec![], + }; + let network_path = Path::new("distributed_bench_network.conf"); + network_config.persisted(network_path).save().unwrap(); + + // Generate the configs for each load generator + for (i, (kp, offset)) in account_private_info.into_iter().enumerate() { + let lg = RemoteLoadGenConfig { + account_keypair: kp, + object_id_offset: offset, + network_cfg_path: network_path.to_path_buf(), + }; + let path_str = format!("load_gen_{}.conf", i); + let load_gen_path = Path::new(&path_str); + + lg.persisted(load_gen_path).save().unwrap(); + } +} + +fn parse_host_port_stake_triplet(s: &str) -> (String, u16, usize) { + let tokens: Vec = s.split(':').into_iter().map(|t| t.to_owned()).collect(); + assert_eq!(tokens.len(), 3); + + let host = tokens[0].clone(); + + #[allow(clippy::needless_collect)] + let host_bytes = host + .split('.') + .into_iter() + .map(|q| q.parse::().unwrap()) + .collect::>(); + assert_eq!(host_bytes.len(), 4); + + ( + host, + tokens[1].parse::().unwrap(), + tokens[2].parse::().unwrap(), + ) +} diff --git a/sui/src/bin/remote_load_generator.rs b/sui/src/bin/remote_load_generator.rs new file mode 100644 index 0000000000000..82e3d70b631da --- /dev/null +++ b/sui/src/bin/remote_load_generator.rs @@ -0,0 +1,183 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +use clap::*; +use futures::join; +use sui::benchmark::bench_types::{MicroBenchmarkResult, RemoteLoadGenConfig}; + +use std::panic; +use std::path::PathBuf; +use std::time::Duration; +use sui::benchmark::multi_load_generator::MultiFixedRateLoadGenerator; +use sui::benchmark::transaction_creator::TransactionCreator; +use sui::benchmark::validator_preparer::ValidatorPreparer; +use sui::config::{NetworkConfig, PersistedConfig}; +use sui_types::base_types::ObjectID; +use sui_types::crypto::KeyPair; +use tokio::runtime::Builder; + +#[derive(Debug, Parser)] +#[clap( + name = "Sui Distributed Benchmark", + about = "Benchmark of the Sui authorities on remote machines" +)] +pub struct DistributedBenchmark { + /// Timeout for sending queries (us) + #[clap(long, default_value = "40000000", global = true)] + pub send_timeout_us: u64, + /// Timeout for receiving responses (us) + #[clap(long, default_value = "40000000", global = true)] + pub recv_timeout_us: u64, + + /// Number of connections to the server + #[clap(long, default_value = "0", global = true)] + pub tcp_connections: usize, + /// Number of database cpus + #[clap(long, default_value = "1", global = true)] + pub db_cpus: usize, + + /// Use Move orders + #[clap(long, global = true)] + pub use_native: bool, + /// Number of chunks to send + #[clap(long, default_value = "100")] + pub num_chunks: usize, + /// Size of chunks per tick + #[clap(long, default_value = "1000")] + pub chunk_size: usize, + /// The time between each tick. Default 10ms + #[clap(long, default_value = "10000")] + pub period_us: u64, + + /// Config file for remote validators + #[clap(long)] + pub remote_config: PathBuf, +} + +pub fn main() { + let benchmark = DistributedBenchmark::parse(); + + let remote_config: RemoteLoadGenConfig = + PersistedConfig::read(&benchmark.remote_config).unwrap(); + + let network_config: NetworkConfig = + PersistedConfig::read(&remote_config.network_cfg_path).unwrap(); + + let validator_preparer = ValidatorPreparer::new_for_remote(&network_config); + let connections = if benchmark.tcp_connections > 0 { + benchmark.tcp_connections + } else { + num_cpus::get() + }; + let g = run_latency_microbench( + connections, + !benchmark.use_native, + benchmark.num_chunks, + benchmark.chunk_size, + benchmark.period_us, + remote_config.object_id_offset, + &remote_config.account_keypair, + network_config, + Duration::from_micros(benchmark.send_timeout_us), + Duration::from_micros(benchmark.recv_timeout_us), + validator_preparer, + ); + println!("{:?}", g); +} + +fn run_latency_microbench( + connections: usize, + use_move: bool, + num_chunks: usize, + chunk_size: usize, + period_us: u64, + + object_id_offset: ObjectID, + sender: &KeyPair, + + network_config: NetworkConfig, + + send_timeout: Duration, + recv_timeout: Duration, + + mut validator_preparer: ValidatorPreparer, +) -> MicroBenchmarkResult { + // In order to simplify things, we send chunks on each connection and try to ensure all connections have equal load + assert!( + (num_chunks * chunk_size % connections) == 0, + "num_transactions must {} be multiple of number of TCP connections {}", + num_chunks * chunk_size, + connections + ); + + // This ensures that the load generator is run at a specific object ID offset which the validators must have provisioned. + let mut tx_cr = TransactionCreator::new_with_offset(object_id_offset); + + // These TXes are to load the network + let load_gen_txes = tx_cr.generate_transactions( + connections, + use_move, + chunk_size, + num_chunks, + Some(sender), + &mut validator_preparer, + ); + + // These are probe TXes used for measuring latency + let probe_txes = tx_cr.generate_transactions( + 1, + use_move, + 1, + num_chunks, + Some(sender), + &mut validator_preparer, + ); + + let result = panic::catch_unwind(|| { + let runtime = Builder::new_multi_thread() + .enable_all() + .thread_stack_size(32 * 1024 * 1024) + .worker_threads(usize::min(num_cpus::get(), 24)) + .build() + .unwrap(); + // Prep the generators + let (mut load_gen, mut probe_gen) = runtime.block_on(async move { + join!( + MultiFixedRateLoadGenerator::new( + load_gen_txes, + period_us, + connections, + &network_config, + recv_timeout, + send_timeout, + ), + MultiFixedRateLoadGenerator::new( + probe_txes, + period_us, + 1, + &network_config, + recv_timeout, + send_timeout, + ), + ) + }); + + // Run the load gen and probes + let (load_latencies, probe_latencies) = + runtime.block_on(async move { join!(load_gen.start(), probe_gen.start()) }); + + (load_latencies, probe_latencies) + }); + + match result { + Ok((load_latencies, probe_latencies)) => MicroBenchmarkResult::Latency { + load_chunk_size: chunk_size, + load_latencies, + tick_period_us: period_us as usize, + chunk_latencies: probe_latencies, + }, + Err(err) => { + panic::resume_unwind(err); + } + } +} diff --git a/sui/src/config.rs b/sui/src/config.rs index 42c1cc931ac89..57c6cc5c02ad8 100644 --- a/sui/src/config.rs +++ b/sui/src/config.rs @@ -56,6 +56,20 @@ pub struct AuthorityPrivateInfo { pub consensus_address: SocketAddr, } +impl AuthorityPrivateInfo { + pub fn copy(&self) -> Self { + Self { + key_pair: self.key_pair.copy(), + address: self.address, + host: self.host.clone(), + port: self.port, + db_path: self.db_path.clone(), + stake: self.stake, + consensus_address: self.consensus_address, + } + } +} + // Custom deserializer with optional default fields impl<'de> Deserialize<'de> for AuthorityPrivateInfo { fn deserialize(deserializer: D) -> Result @@ -231,7 +245,7 @@ pub struct GenesisConfig { impl Config for GenesisConfig {} -#[derive(Serialize, Deserialize, Default)] +#[derive(Serialize, Deserialize, Default, Debug)] #[serde(default)] pub struct AccountConfig { #[serde( @@ -241,8 +255,20 @@ pub struct AccountConfig { )] pub address: Option, pub gas_objects: Vec, + pub gas_object_ranges: Option>, } -#[derive(Serialize, Deserialize)] + +#[derive(Serialize, Deserialize, Debug)] +pub struct ObjectConfigRange { + /// Starting object id + pub offset: ObjectID, + /// Number of object ids + pub count: u64, + /// Gas value per object id + pub gas_value: u64, +} + +#[derive(Serialize, Deserialize, Debug)] pub struct ObjectConfig { #[serde(default = "ObjectID::random")] pub object_id: ObjectID, @@ -295,6 +321,7 @@ impl GenesisConfig { accounts.push(AccountConfig { address: None, gas_objects: objects, + gas_object_ranges: Some(Vec::new()), }) } Ok(Self { diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index b3c4e085739a7..8ac4c2aab1ff6 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -15,7 +15,7 @@ use move_binary_format::CompiledModule; use move_package::BuildConfig; use narwhal_config::{Committee as ConsensusCommittee, Parameters as ConsensusParameters}; use narwhal_crypto::ed25519::Ed25519PublicKey; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, BTreeSet}; use std::fs; use std::path::Path; use std::path::PathBuf; @@ -27,7 +27,7 @@ use sui_core::authority_server::AuthorityServer; use sui_core::consensus_adapter::ConsensusListener; use sui_network::transport::SpawnedServer; use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE; -use sui_types::base_types::decode_bytes_hex; +use sui_types::base_types::{decode_bytes_hex, ObjectID}; use sui_types::base_types::encode_bytes_hex; use sui_types::base_types::{SequenceNumber, SuiAddress, TxContext}; use sui_types::committee::Committee; @@ -356,6 +356,7 @@ pub async fn genesis( let mut addresses = Vec::new(); let mut preload_modules: Vec> = Vec::new(); let mut preload_objects = Vec::new(); + let mut all_preload_objects_set = BTreeSet::new(); info!("Creating accounts and gas objects...",); @@ -368,13 +369,37 @@ pub async fn genesis( }; addresses.push(address); + let mut preload_objects_map = BTreeMap::new(); - for object_conf in account.gas_objects { + // Populate gas itemized objects + account.gas_objects.iter().for_each(|q| { + if !all_preload_objects_set.contains(&q.object_id) { + preload_objects_map.insert(q.object_id, q.gas_value); + } + }); + + // Populate ranged gas objects + if let Some(ranges) = account.gas_object_ranges { + for rg in ranges { + let ids = ObjectID::in_range(rg.offset, rg.count)?; + + for obj_id in ids { + if !preload_objects_map.contains_key(&obj_id) + && !all_preload_objects_set.contains(&obj_id) + { + preload_objects_map.insert(obj_id, rg.gas_value); + all_preload_objects_set.insert(obj_id); + } + } + } + } + + for (object_id, value) in preload_objects_map { let new_object = Object::with_id_owner_gas_coin_object_for_testing( - object_conf.object_id, + object_id, SequenceNumber::new(), address, - object_conf.gas_value, + value, ); preload_objects.push(new_object); } diff --git a/sui_types/src/base_types.rs b/sui_types/src/base_types.rs index 97fccc2a4aa3f..65fe87521ad64 100644 --- a/sui_types/src/base_types.rs +++ b/sui_types/src/base_types.rs @@ -1,6 +1,7 @@ // Copyright (c) 2021, Facebook, Inc. and its affiliates // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use anyhow::anyhow; use base64ct::Encoding; use std::collections::{HashMap, HashSet}; use std::convert::{TryFrom, TryInto}; @@ -519,6 +520,66 @@ impl ObjectID { .map_err(|_| ObjectIDParseError::TryFromSliceError) .map(ObjectID::from) } + + /// Incremenent the ObjectID by usize IDs, assuming the ObjectID hex is a number represented as an array of bytes + pub fn advance(&self, step: usize) -> Result { + let mut curr_vec = self.as_slice().to_vec(); + let mut step_copy = step; + + let mut carry = 0; + for idx in (0..Self::LENGTH).rev() { + if step_copy == 0 { + // Nothing else to do + break; + } + // Extract the relevant part + let g = (step_copy % 0x100) as u16; + // Shift to next group + step_copy >>= 8; + let mut val = curr_vec[idx] as u16; + (carry, val) = ((val + carry + g) / 0x100, (val + carry + g) % 0x100); + curr_vec[idx] = val as u8; + } + + if carry > 0 { + return Err(anyhow!("Increment will cause overflow")); + } + ObjectID::from_bytes(curr_vec).map_err(|w| w.into()) + } + + /// Incremenent the ObjectID by one, assuming the ObjectID hex is a number represented as an array of bytes + pub fn next_increment(&self) -> Result { + let mut prev_val = self.as_slice().to_vec(); + let mx = [0xFF; Self::LENGTH]; + + if prev_val == mx { + return Err(anyhow!("Increment will cause overflow")); + } + + // This logic increments the integer representation of an ObjectID u8 array + for idx in (0..Self::LENGTH).rev() { + if prev_val[idx] == 0xFF { + prev_val[idx] = 0; + } else { + prev_val[idx] += 1; + break; + }; + } + ObjectID::from_bytes(prev_val.clone()).map_err(|w| w.into()) + } + + /// Create `count` object IDs starting with one at `offset` + pub fn in_range(offset: ObjectID, count: u64) -> Result, anyhow::Error> { + let mut ret = Vec::new(); + let mut prev = offset; + for o in 0..count { + if o != 0 { + prev = prev.next_increment()?; + } + ret.push(prev); + } + Ok(ret) + } } #[derive(PartialEq, Clone, Debug, thiserror::Error)] From 1e0b42320eec965d69cf73385f1a9a6fd5da778f Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 03:30:09 -0400 Subject: [PATCH 02/16] use u64 max gas --- sui/src/benchmark/transaction_creator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sui/src/benchmark/transaction_creator.rs b/sui/src/benchmark/transaction_creator.rs index 99a2b168dbab2..e67dd360134f2 100644 --- a/sui/src/benchmark/transaction_creator.rs +++ b/sui/src/benchmark/transaction_creator.rs @@ -53,7 +53,7 @@ fn create_gas_object(object_id: ObjectID, owner: SuiAddress) -> Object { object_id, SequenceNumber::new(), owner, - GAS_PER_TX, + u64::MAX, ) } From 5f6aaf6cf50452d3b2921c7e7f8cab6ef2e9be08 Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 03:30:47 -0400 Subject: [PATCH 03/16] use u64 max gas --- sui/src/benchmark/transaction_creator.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sui/src/benchmark/transaction_creator.rs b/sui/src/benchmark/transaction_creator.rs index e67dd360134f2..93d5371a4d20b 100644 --- a/sui/src/benchmark/transaction_creator.rs +++ b/sui/src/benchmark/transaction_creator.rs @@ -13,7 +13,7 @@ use sui_types::SUI_FRAMEWORK_ADDRESS; use sui_types::{base_types::*, committee::*, messages::*, object::Object, serialize::*}; const OBJECT_ID_OFFSET: &str = "0x10000"; -const GAS_PER_TX: u64 = 10000000; +const GAS_PER_TX: u64 = u64::MAX; /// Create a transaction for object transfer /// This can either use the Move path or the native path @@ -53,7 +53,7 @@ fn create_gas_object(object_id: ObjectID, owner: SuiAddress) -> Object { object_id, SequenceNumber::new(), owner, - u64::MAX, + GAS_PER_TX, ) } From aa01684584bb384cf7efb3fdd0fc8587459fdf7e Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 03:50:27 -0400 Subject: [PATCH 04/16] Remove prints --- sui/src/benchmark/multi_load_generator.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sui/src/benchmark/multi_load_generator.rs b/sui/src/benchmark/multi_load_generator.rs index 1166c960f2907..22bc12be07e06 100644 --- a/sui/src/benchmark/multi_load_generator.rs +++ b/sui/src/benchmark/multi_load_generator.rs @@ -100,7 +100,6 @@ async fn send_tx_for_quorum( tokio::spawn(async move { send_tx_chunks_notif(notif, chunk, &mut ch_tx, net_client.clone(), stake, conn).await; - println!("Spawn for order {:?} with stake {}", net_client, stake); }); } drop(order_chann_tx); @@ -128,7 +127,6 @@ async fn send_tx_for_quorum( panic!("Quorum threshold not reached for orders") } - println!("order {}", total); // Confirmation step let (conf_chann_tx, mut conf_chann_rx) = MpscChannel(net_clients.len() * 2); @@ -139,7 +137,6 @@ async fn send_tx_for_quorum( let mut chann_tx = conf_chann_tx.clone(); handles.push(tokio::spawn(async move { let r = send_tx_chunks(chunk, net_client.clone(), conn).await; - println!("Spawn for conf {:?} with stake {}", net_client, stake); match chann_tx.send((r.0, stake)).await { Ok(_) => (), Err(e) => { @@ -174,8 +171,6 @@ async fn send_tx_for_quorum( panic!("Quorum threshold not reached for confirmation") } - println!("conf {}", total); - let elapsed = time_start.elapsed().as_micros(); // Send the total time over From 09925268ecd08740e143609786b8abab899dc4fa Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 16:58:26 -0400 Subject: [PATCH 05/16] SImpler genesis --- faucet/src/test_utils.rs | 2 +- sui/src/bin/validator.rs | 60 +++++++++++++++++++++---------- sui/src/sui_commands.rs | 20 ++++++++--- sui/src/unit_tests/sui_network.rs | 2 +- 4 files changed, 58 insertions(+), 26 deletions(-) diff --git a/faucet/src/test_utils.rs b/faucet/src/test_utils.rs index 10989f991fb48..35ed828521567 100644 --- a/faucet/src/test_utils.rs +++ b/faucet/src/test_utils.rs @@ -65,7 +65,7 @@ pub async fn start_test_network( .collect(); genesis_config.authorities = authorities; - let (network_config, accounts, mut keystore) = genesis(genesis_config).await?; + let (network_config, accounts, mut keystore) = genesis(genesis_config, None).await?; let network = SuiNetwork::start(&network_config).await?; let network_config = network_config.persisted(&network_path); diff --git a/sui/src/bin/validator.rs b/sui/src/bin/validator.rs index 3acdf01d71ead..e15199cf20fd4 100644 --- a/sui/src/bin/validator.rs +++ b/sui/src/bin/validator.rs @@ -58,20 +58,11 @@ async fn main() -> Result<(), anyhow::Error> { let network_config_path = sui_config_dir()?.join(SUI_NETWORK_CONFIG); - let network_config = match (network_config_path.exists(), cfg.force_genesis) { - (true, false) => PersistedConfig::::read(&network_config_path)?, - - // If network.conf is missing, or if --force-genesis is true, we run genesis. - _ => { - let genesis_conf: GenesisConfig = PersistedConfig::read(&cfg.genesis_config_path)?; - let (network_config, _, _) = genesis(genesis_conf).await?; - network_config - } - }; + let genesis_conf: GenesisConfig = PersistedConfig::read(&cfg.genesis_config_path)?; - let net_cfg = if let Some(address) = cfg.address { + let authority = if let Some(address) = cfg.address { // Find the network config for this validator - network_config + genesis_conf .authorities .iter() .find(|x| SuiAddress::from(x.key_pair.public_key_bytes()) == address) @@ -82,21 +73,52 @@ async fn main() -> Result<(), anyhow::Error> { ) })? } else if let Some(index) = cfg.validator_idx { - &network_config.authorities[index] + &genesis_conf.authorities[index] } else { return Err(anyhow!("Must supply either --address of --validator-idx")); }; + let genesis_conf_copy: GenesisConfig = PersistedConfig::read(&cfg.genesis_config_path)?; + + let network_config = match (network_config_path.exists(), cfg.force_genesis) { + (true, false) => PersistedConfig::::read(&network_config_path)?, + + // If network.conf is missing, or if --force-genesis is true, we run genesis. + _ => { + let (network_config, _, _) = + genesis(genesis_conf_copy, Some(authority.address)).await?; + network_config + } + }; + + // let net_cfg = if let Some(address) = cfg.address { + // // Find the network config for this validator + // network_config + // .authorities + // .iter() + // .find(|x| SuiAddress::from(x.key_pair.public_key_bytes()) == address) + // .ok_or_else(|| { + // anyhow!( + // "Network configs must include config for address {}", + // address + // ) + // })? + // } else if let Some(index) = cfg.validator_idx { + // &network_config.authorities[index] + // } else { + // return Err(anyhow!("Must supply either --address of --validator-idx")); + // }; + let listen_address = cfg .listen_address - .unwrap_or(format!("{}:{}", net_cfg.host, net_cfg.port)); + .unwrap_or(format!("{}:{}", authority.host, authority.port)); info!( "authority {:?} listening on {} (public addr: {}:{})", - net_cfg.key_pair.public_key_bytes(), + authority.key_pair.public_key_bytes(), listen_address, - net_cfg.host, - net_cfg.port + authority.host, + authority.port ); let consensus_committee = network_config.make_narwhal_committee(); @@ -107,10 +129,10 @@ async fn main() -> Result<(), anyhow::Error> { }; let consensus_store_path = sui_config_dir()? .join(CONSENSUS_DB_NAME) - .join(encode_bytes_hex(net_cfg.key_pair.public_key_bytes())); + .join(encode_bytes_hex(authority.key_pair.public_key_bytes())); if let Err(e) = make_server( - net_cfg, + authority, &Committee::from(&network_config), network_config.buffer_size, &consensus_committee, diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index bf58edafc42fe..58dbc4745959d 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -27,8 +27,8 @@ use sui_core::authority_server::AuthorityServer; use sui_core::consensus_adapter::ConsensusListener; use sui_network::transport::SpawnedServer; use sui_network::transport::DEFAULT_MAX_DATAGRAM_SIZE; -use sui_types::base_types::{decode_bytes_hex, ObjectID}; use sui_types::base_types::encode_bytes_hex; +use sui_types::base_types::{decode_bytes_hex, ObjectID}; use sui_types::base_types::{SequenceNumber, SuiAddress, TxContext}; use sui_types::committee::Committee; use sui_types::error::SuiResult; @@ -189,7 +189,7 @@ impl SuiCommand { return Ok(()); } - let (network_config, accounts, mut keystore) = genesis(genesis_conf).await?; + let (network_config, accounts, mut keystore) = genesis(genesis_conf, None).await?; info!("Network genesis completed."); let network_config = network_config.persisted(&network_path); network_config.save()?; @@ -335,11 +335,15 @@ impl SuiNetwork { pub async fn genesis( genesis_conf: GenesisConfig, + single_address: Option, ) -> Result<(NetworkConfig, Vec, SuiKeystore), anyhow::Error> { - info!( - "Creating {} new authorities...", + let num_to_provision = if single_address.is_none() { genesis_conf.authorities.len() - ); + } else { + 1 + }; + + info!("Creating {} new authorities...", num_to_provision); let mut network_config = NetworkConfig { epoch: 0, @@ -449,6 +453,12 @@ pub async fn genesis( let committee = Committee::new(network_config.epoch, voting_right); for authority in &network_config.authorities { + if let Some(addr) = single_address { + if addr != authority.address { + continue; + } + } + make_server_with_genesis_ctx( authority, &committee, diff --git a/sui/src/unit_tests/sui_network.rs b/sui/src/unit_tests/sui_network.rs index 7d46eb2553553..4a532648e23a2 100644 --- a/sui/src/unit_tests/sui_network.rs +++ b/sui/src/unit_tests/sui_network.rs @@ -28,7 +28,7 @@ pub async fn start_test_network( .map(|info| AuthorityPrivateInfo { port: 0, ..info }) .collect(); - let (network_config, accounts, mut keystore) = genesis(genesis_config).await?; + let (network_config, accounts, mut keystore) = genesis(genesis_config, None).await?; let network = SuiNetwork::start(&network_config).await?; let network_config = network_config.persisted(&network_path); From 664f5aca8e87090886229f28a5e9126dd4796982 Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 18:38:46 -0400 Subject: [PATCH 06/16] Fix out of order bug --- sui/src/benchmark/bench_types.rs | 2 +- sui_core/src/authority.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sui/src/benchmark/bench_types.rs b/sui/src/benchmark/bench_types.rs index 50ef8c42cc274..a0f60c6ddfad6 100644 --- a/sui/src/benchmark/bench_types.rs +++ b/sui/src/benchmark/bench_types.rs @@ -46,7 +46,7 @@ pub struct Benchmark { #[clap( arg_enum, - default_value = "local-single-validator-thread", + default_value = "single-validator-thread", global = true, ignore_case = true )] diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index d2980ffd8f72a..4d4d965bb9603 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -189,7 +189,7 @@ impl AuthorityState { let transaction_digest = transaction.digest(); // Ensure an idempotent answer. - if self._database.transaction_exists(&transaction_digest)? { + if self._database.transaction_exists(&transaction_digest)? || self._database.effects_exists(&transaction_digest)? { let transaction_info = self.make_transaction_info(&transaction_digest).await?; return Ok(transaction_info); } From 43e4e45ce9df5443f24fd35006079dffba63ab94 Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 19:17:53 -0400 Subject: [PATCH 07/16] Not fail on disconnecr --- sui/src/benchmark/multi_load_generator.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/sui/src/benchmark/multi_load_generator.rs b/sui/src/benchmark/multi_load_generator.rs index 22bc12be07e06..f6cf44dc16ebd 100644 --- a/sui/src/benchmark/multi_load_generator.rs +++ b/sui/src/benchmark/multi_load_generator.rs @@ -13,15 +13,14 @@ use rayon::prelude::*; use std::io; use std::sync::Arc; use std::time::{Duration, Instant}; -use sui_network::network::{NetworkClient}; +use sui_network::network::NetworkClient; use sui_types::committee::Committee; use sui_types::{messages::*, serialize::*}; use tokio::sync::Notify; use tokio::time; use tracing::{error, info}; -use crate::config::{ NetworkConfig}; - +use crate::config::NetworkConfig; pub fn check_transaction_response(reply_message: Result) { match reply_message { @@ -66,8 +65,17 @@ async fn send_tx_chunks_notif( conn: usize, ) { notif.notified().await; - let r = send_tx_chunks(tx_chunk, net_client, conn).await; - result_chann_tx.send((r.0, stake)).await.unwrap(); + let r = send_tx_chunks(tx_chunk, net_client.clone(), conn).await; + + match result_chann_tx.send((r.0, stake)).await { + Ok(_) => (), + Err(e) => { + // Disconnect is okay since we may leave f running + if !e.is_disconnected() { + panic!("Send failed! {:?}", net_client) + } + } + } let _: Vec<_> = r.1.par_iter() @@ -140,6 +148,7 @@ async fn send_tx_for_quorum( match chann_tx.send((r.0, stake)).await { Ok(_) => (), Err(e) => { + // Disconnect is okay since we may leave f running if !e.is_disconnected() { panic!("Send failed! {:?}", net_client) } @@ -183,7 +192,6 @@ pub struct MultiFixedRateLoadGenerator { pub period_us: u64, //pub network_config: Vec, - pub tick_notifier: Arc, /// Number of TCP connections to open From d2d4b84ef34fa0540a7e3860cc8d34d46ec9b4cc Mon Sep 17 00:00:00 2001 From: ade Date: Wed, 27 Apr 2022 22:19:22 -0400 Subject: [PATCH 08/16] improve genesis objectload --- sui/src/sui_commands.rs | 31 ++++++++++++++++++++++++++++--- sui_core/src/authority.rs | 6 ++++++ 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index 58dbc4745959d..9b9d2d08bacff 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -523,9 +523,34 @@ async fn make_server_with_genesis_ctx( ) .await; - for object in preload_objects { - state.insert_genesis_object(object.clone()).await; - } + // use tokio::sync::broadcast; + + // let (obj_chann_tx, mut obj_chann_rx) = broadcast::channel(preload_objects.len()); + + // for _ in 0..50 { + // let mut chann_rx = obj_chann_tx.subscribe(); + // tokio::spawn(async move { + // loop { + // match chann_rx.recv().await { + // Ok(o) => state.insert_genesis_object(o).await, + // Err(RecvError) => break, + // Err(e) => panic!(""), + // } + // } + // }); + // } + + // for p in preload_objects { + // obj_chann_tx.send(*p).unwrap(); + // } + + state + .insert_genesis_objects_bulk_unsafe(&preload_objects.iter().collect::>()) + .await; + // for object in preload_objects { + + // state.insert_genesis_object(object.clone()).await; + // } let (tx_sui_to_consensus, _rx_sui_to_consensus) = channel(1); Ok(AuthorityServer::new( diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index 4d4d965bb9603..f75c95fd97008 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -644,6 +644,12 @@ impl AuthorityState { .expect("TODO: propagate the error") } + pub async fn insert_genesis_objects_bulk_unsafe(&self, objects: &[&Object]) { + self._database + .bulk_object_insert(objects) + .expect("TODO: propagate the error") + } + /// Persist the Genesis package to DB along with the side effects for module initialization async fn store_package_and_init_modules_for_genesis( &self, From 2251d0a2781f0d8437b90b7ae50a102b647be12e Mon Sep 17 00:00:00 2001 From: ade Date: Thu, 28 Apr 2022 19:13:57 -0400 Subject: [PATCH 09/16] Error expl --- sui_core/src/authority/authority_store.rs | 16 ++++++++++------ sui_types/src/error.rs | 15 +++++++++++++-- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index 1c83b8a3aede7..bcd20f0ccf39b 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -294,10 +294,11 @@ impl Deserialize<'de>> &self, object_ref: &ObjectRef, ) -> Result>, SuiError> { - let transaction_option = self - .transaction_lock - .get(object_ref)? - .ok_or(SuiError::TransactionLockDoesNotExist)?; + let transaction_option = self.transaction_lock.get(object_ref)?.ok_or( + SuiError::TransactionLockDoesNotExistGet { + object_ref: *object_ref, + }, + )?; match transaction_option { Some(tx_digest) => Ok(Some( @@ -488,7 +489,8 @@ impl Deserialize<'de>> for lock in locks { // The object / version must exist, and therefore lock initialized. - let lock = lock.ok_or(SuiError::TransactionLockDoesNotExist)?; + let lock = + lock.ok_or(SuiError::TransactionLockDoesNotExistSet { digest: tx_digest })?; if let Some(previous_tx_digest) = lock { if previous_tx_digest != tx_digest { @@ -739,7 +741,9 @@ impl Deserialize<'de>> // TODO: maybe we could just check if the certificate is there instead? let locks = self.transaction_lock.multi_get(&active_inputs[..])?; for object_lock in locks { - object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?; + object_lock.ok_or(SuiError::TransactionLockDoesNotExistUpdate { + digest: transaction_digest, + })?; } if let Some(next_seq) = seq_opt { diff --git a/sui_types/src/error.rs b/sui_types/src/error.rs index a677156f60772..d35be304d2b61 100644 --- a/sui_types/src/error.rs +++ b/sui_types/src/error.rs @@ -215,8 +215,19 @@ pub enum SuiError { InvalidTxUpdate, #[error("Attempt to re-initialize a transaction lock.")] TransactionLockExists, - #[error("Attempt to set an non-existing transaction lock.")] - TransactionLockDoesNotExist, + + + #[error("GET: Attempt to set an non-existing transaction lock..[{:?}].", object_ref)] + TransactionLockDoesNotExistGet { object_ref: ObjectRef }, + + + #[error("SET Attempt to set an non-existing transaction lock.[{:?}].", digest)] + TransactionLockDoesNotExistSet { digest: TransactionDigest }, + + #[error("UPDATE Attempt to set an non-existing transaction lock..[{:?}].", digest)] + TransactionLockDoesNotExistUpdate { digest: TransactionDigest }, + + #[error("Attempt to reset a set transaction lock to a different value.")] TransactionLockReset, #[error("Could not find the referenced transaction [{:?}].", digest)] From 43f08fbaf005071dd5f7aacb33805f118cf80a2e Mon Sep 17 00:00:00 2001 From: ade Date: Thu, 28 Apr 2022 21:38:23 -0400 Subject: [PATCH 10/16] simplify print --- sui/src/benchmark.rs | 2 +- sui/src/benchmark/bench_types.rs | 24 +++++++++++-- sui/src/bin/remote_load_generator.rs | 50 ++++++++-------------------- sui/src/bin/validator.rs | 16 ++++----- 4 files changed, 44 insertions(+), 48 deletions(-) diff --git a/sui/src/benchmark.rs b/sui/src/benchmark.rs index bcc6f7b6b25e1..d74d36d748ee8 100644 --- a/sui/src/benchmark.rs +++ b/sui/src/benchmark.rs @@ -218,7 +218,7 @@ fn run_latency_microbench( validator_preparer.clean_up(); match result { - Ok((load_latencies, tracer_latencies)) => MicroBenchmarkResult::Latency { + Ok((load_latencies, tracer_latencies)) => MicroBenchmarkResult::CombinedLatency { load_chunk_size: chunk_size, load_latencies, tick_period_us: period_us as usize, diff --git a/sui/src/benchmark/bench_types.rs b/sui/src/benchmark/bench_types.rs index a0f60c6ddfad6..66dc805e91fc0 100644 --- a/sui/src/benchmark/bench_types.rs +++ b/sui/src/benchmark/bench_types.rs @@ -143,12 +143,17 @@ pub enum MicroBenchmarkResult { Throughput { chunk_throughput: f64, }, - Latency { + CombinedLatency { load_chunk_size: usize, tick_period_us: usize, load_latencies: Vec, chunk_latencies: Vec, }, + Latency { + load_chunk_size: usize, + tick_period_us: usize, + latencies: Vec, + }, } impl std::fmt::Display for MicroBenchmarkResult { @@ -157,7 +162,7 @@ impl std::fmt::Display for MicroBenchmarkResult { MicroBenchmarkResult::Throughput { chunk_throughput } => { write!(f, "Throughout: {} tps", chunk_throughput) } - MicroBenchmarkResult::Latency { + MicroBenchmarkResult::CombinedLatency { chunk_latencies, load_chunk_size, tick_period_us: tick_period, @@ -176,6 +181,21 @@ impl std::fmt::Display for MicroBenchmarkResult { chunk_latencies.len() ) } + MicroBenchmarkResult::Latency { + load_chunk_size, + tick_period_us, + latencies, + } => { + let tracer_avg = latencies.iter().sum::() as f64 / latencies.len() as f64; + + write!( + f, + "Average Latency {} us @ {} tps ({} samples)", + tracer_avg, + calculate_throughput(*load_chunk_size, *tick_period_us as u128), + latencies.len() + ) + } } } } diff --git a/sui/src/bin/remote_load_generator.rs b/sui/src/bin/remote_load_generator.rs index 82e3d70b631da..a26b5702bc561 100644 --- a/sui/src/bin/remote_load_generator.rs +++ b/sui/src/bin/remote_load_generator.rs @@ -123,16 +123,6 @@ fn run_latency_microbench( &mut validator_preparer, ); - // These are probe TXes used for measuring latency - let probe_txes = tx_cr.generate_transactions( - 1, - use_move, - 1, - num_chunks, - Some(sender), - &mut validator_preparer, - ); - let result = panic::catch_unwind(|| { let runtime = Builder::new_multi_thread() .enable_all() @@ -141,40 +131,26 @@ fn run_latency_microbench( .build() .unwrap(); // Prep the generators - let (mut load_gen, mut probe_gen) = runtime.block_on(async move { - join!( - MultiFixedRateLoadGenerator::new( - load_gen_txes, - period_us, - connections, - &network_config, - recv_timeout, - send_timeout, - ), - MultiFixedRateLoadGenerator::new( - probe_txes, - period_us, - 1, - &network_config, - recv_timeout, - send_timeout, - ), - ) + let mut load_gen = runtime.block_on(async move { + join!(MultiFixedRateLoadGenerator::new( + load_gen_txes, + period_us, + connections, + &network_config, + recv_timeout, + send_timeout, + )) }); - // Run the load gen and probes - let (load_latencies, probe_latencies) = - runtime.block_on(async move { join!(load_gen.start(), probe_gen.start()) }); - - (load_latencies, probe_latencies) + // Run the load gen + runtime.block_on(async move { join!(load_gen.0.start()) }) }); match result { - Ok((load_latencies, probe_latencies)) => MicroBenchmarkResult::Latency { + Ok(load_latencies) => MicroBenchmarkResult::Latency { load_chunk_size: chunk_size, - load_latencies, tick_period_us: period_us as usize, - chunk_latencies: probe_latencies, + latencies: load_latencies.0, }, Err(err) => { panic::resume_unwind(err); diff --git a/sui/src/bin/validator.rs b/sui/src/bin/validator.rs index e15199cf20fd4..5f85fbd3cc802 100644 --- a/sui/src/bin/validator.rs +++ b/sui/src/bin/validator.rs @@ -113,14 +113,6 @@ async fn main() -> Result<(), anyhow::Error> { .listen_address .unwrap_or(format!("{}:{}", authority.host, authority.port)); - info!( - "authority {:?} listening on {} (public addr: {}:{})", - authority.key_pair.public_key_bytes(), - listen_address, - authority.host, - authority.port - ); - let consensus_committee = network_config.make_narwhal_committee(); let consensus_parameters = ConsensusParameters { max_header_delay: 5_000, @@ -131,6 +123,14 @@ async fn main() -> Result<(), anyhow::Error> { .join(CONSENSUS_DB_NAME) .join(encode_bytes_hex(authority.key_pair.public_key_bytes())); + info!( + "Initializing authority {:?} listening on {} (public addr: {}:{})", + authority.key_pair.public_key_bytes(), + listen_address, + authority.host, + authority.port + ); + if let Err(e) = make_server( authority, &Committee::from(&network_config), From 76e46335a371655be3a79ad65bc7f63e4465e0a4 Mon Sep 17 00:00:00 2001 From: ade Date: Thu, 28 Apr 2022 23:28:37 -0400 Subject: [PATCH 11/16] authority fix --- sui_core/src/authority.rs | 36 +++++++++++++++++++++++++++++------- 1 file changed, 29 insertions(+), 7 deletions(-) diff --git a/sui_core/src/authority.rs b/sui_core/src/authority.rs index f75c95fd97008..c9d6cd5f6b56d 100644 --- a/sui_core/src/authority.rs +++ b/sui_core/src/authority.rs @@ -179,17 +179,13 @@ impl AuthorityState { } } - /// Initiate a new transaction. - pub async fn handle_transaction( + async fn handle_transaction_impl( &self, transaction: Transaction, + transaction_digest: TransactionDigest, ) -> Result { - // Check the sender's signature. - transaction.check_signature()?; - let transaction_digest = transaction.digest(); - // Ensure an idempotent answer. - if self._database.transaction_exists(&transaction_digest)? || self._database.effects_exists(&transaction_digest)? { + if self._database.transaction_exists(&transaction_digest)? { let transaction_info = self.make_transaction_info(&transaction_digest).await?; return Ok(transaction_info); } @@ -225,6 +221,32 @@ impl AuthorityState { self.make_transaction_info(&transaction_digest).await } + /// Initiate a new transaction. + pub async fn handle_transaction( + &self, + transaction: Transaction, + ) -> Result { + // Check the sender's signature. + transaction.check_signature()?; + let transaction_digest = transaction.digest(); + + let response = self + .handle_transaction_impl(transaction, transaction_digest) + .await; + match response { + Ok(r) => Ok(r), + // If we see an error, it is possible that a certificate has already been processed. + // In that case, we could still return Ok to avoid showing confusing errors. + Err(err) => { + if self._database.effects_exists(&transaction_digest)? { + Ok(self.make_transaction_info(&transaction_digest).await?) + } else { + Err(err) + } + } + } + } + /// Confirm a transfer. pub async fn handle_confirmation_transaction( &self, From ef642736a7260b08d968ea2e91df745362e0abc5 Mon Sep 17 00:00:00 2001 From: ade Date: Mon, 2 May 2022 15:32:29 -0400 Subject: [PATCH 12/16] Revert error --- sui_core/src/authority/authority_store.rs | 17 ++++++----------- sui_types/src/error.rs | 15 ++------------- 2 files changed, 8 insertions(+), 24 deletions(-) diff --git a/sui_core/src/authority/authority_store.rs b/sui_core/src/authority/authority_store.rs index bcd20f0ccf39b..36d8b2d12c2be 100644 --- a/sui_core/src/authority/authority_store.rs +++ b/sui_core/src/authority/authority_store.rs @@ -294,11 +294,10 @@ impl Deserialize<'de>> &self, object_ref: &ObjectRef, ) -> Result>, SuiError> { - let transaction_option = self.transaction_lock.get(object_ref)?.ok_or( - SuiError::TransactionLockDoesNotExistGet { - object_ref: *object_ref, - }, - )?; + let transaction_option = self + .transaction_lock + .get(object_ref)? + .ok_or(SuiError::TransactionLockDoesNotExist)?; match transaction_option { Some(tx_digest) => Ok(Some( @@ -489,9 +488,7 @@ impl Deserialize<'de>> for lock in locks { // The object / version must exist, and therefore lock initialized. - let lock = - lock.ok_or(SuiError::TransactionLockDoesNotExistSet { digest: tx_digest })?; - + let lock = lock.ok_or(SuiError::TransactionLockDoesNotExist)?; if let Some(previous_tx_digest) = lock { if previous_tx_digest != tx_digest { warn!(prev_tx_digest =? previous_tx_digest, @@ -741,9 +738,7 @@ impl Deserialize<'de>> // TODO: maybe we could just check if the certificate is there instead? let locks = self.transaction_lock.multi_get(&active_inputs[..])?; for object_lock in locks { - object_lock.ok_or(SuiError::TransactionLockDoesNotExistUpdate { - digest: transaction_digest, - })?; + object_lock.ok_or(SuiError::TransactionLockDoesNotExist)?; } if let Some(next_seq) = seq_opt { diff --git a/sui_types/src/error.rs b/sui_types/src/error.rs index d35be304d2b61..a677156f60772 100644 --- a/sui_types/src/error.rs +++ b/sui_types/src/error.rs @@ -215,19 +215,8 @@ pub enum SuiError { InvalidTxUpdate, #[error("Attempt to re-initialize a transaction lock.")] TransactionLockExists, - - - #[error("GET: Attempt to set an non-existing transaction lock..[{:?}].", object_ref)] - TransactionLockDoesNotExistGet { object_ref: ObjectRef }, - - - #[error("SET Attempt to set an non-existing transaction lock.[{:?}].", digest)] - TransactionLockDoesNotExistSet { digest: TransactionDigest }, - - #[error("UPDATE Attempt to set an non-existing transaction lock..[{:?}].", digest)] - TransactionLockDoesNotExistUpdate { digest: TransactionDigest }, - - + #[error("Attempt to set an non-existing transaction lock.")] + TransactionLockDoesNotExist, #[error("Attempt to reset a set transaction lock to a different value.")] TransactionLockReset, #[error("Could not find the referenced transaction [{:?}].", digest)] From c67b42be802ecae98809b5dc59b540f47379fa67 Mon Sep 17 00:00:00 2001 From: ade Date: Tue, 3 May 2022 00:05:49 -0400 Subject: [PATCH 13/16] Synced to main --- sui/src/benchmark/transaction_creator.rs | 44 ------------------------ sui/src/sui_commands.rs | 26 +------------- 2 files changed, 1 insertion(+), 69 deletions(-) diff --git a/sui/src/benchmark/transaction_creator.rs b/sui/src/benchmark/transaction_creator.rs index 157bfe1a92a7d..332baea996a38 100644 --- a/sui/src/benchmark/transaction_creator.rs +++ b/sui/src/benchmark/transaction_creator.rs @@ -130,50 +130,6 @@ fn make_serialized_transactions( .collect() } -// fn make_transactions2( -// address: SuiAddress, -// key_pair: KeyPair, -// chunk_size: usize, -// num_chunks: usize, -// conn: usize, -// use_move: bool, -// object_id_offset: usize, -// auth_keys: &[KeyPair], -// committee: &Committee, -// ) -> (Vec, Vec) { -// assert_eq!(chunk_size % conn, 0); -// let batch_size_per_conn = chunk_size / conn; - -// // The batch-adjusted number of transactions -// let batch_tx_count = num_chunks * conn; -// // Only need one gas object per batch -// let account_gas_objects: Vec<_> = make_gas_objects( -// address, -// batch_tx_count, -// batch_size_per_conn, -// object_id_offset, -// use_move, -// ); - -// // Bulk load objects -// let all_objects: Vec<_> = account_gas_objects -// .clone() -// .into_iter() -// .flat_map(|(objects, gas)| objects.into_iter().chain(std::iter::once(gas))) -// .collect(); - -// let serialized_txes = make_serialized_transactions( -// address, -// key_pair, -// committee, -// &account_gas_objects, -// auth_keys, -// batch_size_per_conn, -// use_move, -// ); -// (serialized_txes, all_objects) -// } - pub struct TransactionCreator { pub object_id_offset: ObjectID, } diff --git a/sui/src/sui_commands.rs b/sui/src/sui_commands.rs index ce3d4a8d4cb4a..6930cfc7e38e3 100644 --- a/sui/src/sui_commands.rs +++ b/sui/src/sui_commands.rs @@ -578,34 +578,10 @@ async fn make_server_with_genesis_ctx( ) .await; - // use tokio::sync::broadcast; - - // let (obj_chann_tx, mut obj_chann_rx) = broadcast::channel(preload_objects.len()); - - // for _ in 0..50 { - // let mut chann_rx = obj_chann_tx.subscribe(); - // tokio::spawn(async move { - // loop { - // match chann_rx.recv().await { - // Ok(o) => state.insert_genesis_object(o).await, - // Err(RecvError) => break, - // Err(e) => panic!(""), - // } - // } - // }); - // } - - // for p in preload_objects { - // obj_chann_tx.send(*p).unwrap(); - // } - + // Okay to do this since we're at genesis state .insert_genesis_objects_bulk_unsafe(&preload_objects.iter().collect::>()) .await; - // for object in preload_objects { - - // state.insert_genesis_object(object.clone()).await; - // } let (tx_sui_to_consensus, _rx_sui_to_consensus) = channel(1); Ok(AuthorityServer::new( From a35f59d9576fc27a69bfd58429ef251eb525e793 Mon Sep 17 00:00:00 2001 From: ade Date: Tue, 3 May 2022 00:42:09 -0400 Subject: [PATCH 14/16] license --- sui/src/bin/bench_configure.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sui/src/bin/bench_configure.rs b/sui/src/bin/bench_configure.rs index 765e6f48f8cb4..a9920eb57f503 100644 --- a/sui/src/bin/bench_configure.rs +++ b/sui/src/bin/bench_configure.rs @@ -1,3 +1,6 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + #![deny(warnings)] use clap::*; From 009eb7f1e18b3d7a79de9ec649ec95a92ff82440 Mon Sep 17 00:00:00 2001 From: ade Date: Tue, 3 May 2022 03:09:20 -0400 Subject: [PATCH 15/16] fix test --- sui/src/unit_tests/sui_network.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sui/src/unit_tests/sui_network.rs b/sui/src/unit_tests/sui_network.rs index d4d93f1dbf90b..8c8f6ff26ce67 100644 --- a/sui/src/unit_tests/sui_network.rs +++ b/sui/src/unit_tests/sui_network.rs @@ -61,6 +61,10 @@ pub async fn start_test_network( .map(|info| AuthorityPrivateInfo { port: 0, ..info }) .collect(); + let (network_config, accounts, mut keystore) = genesis(genesis_config, None).await?; + let key_pair_refs = key_pairs.iter().collect::>(); + let network = SuiNetwork::start(&network_config, key_pair_refs).await?; + let network_config = network_config.persisted(&network_path); network_config.save()?; keystore.set_path(&keystore_path); From 9e110b7ef57088ab3caac0cd6c068af9ec832870 Mon Sep 17 00:00:00 2001 From: ade Date: Tue, 3 May 2022 03:50:18 -0400 Subject: [PATCH 16/16] Code cleanup --- sui/src/benchmark.rs | 1 - sui/src/benchmark/load_generator.rs | 269 ++++++++++++++++- sui/src/benchmark/multi_load_generator.rs | 333 ---------------------- sui/src/bin/remote_load_generator.rs | 2 +- 4 files changed, 269 insertions(+), 336 deletions(-) delete mode 100644 sui/src/benchmark/multi_load_generator.rs diff --git a/sui/src/benchmark.rs b/sui/src/benchmark.rs index 7ce9d85308846..c6c5473914963 100644 --- a/sui/src/benchmark.rs +++ b/sui/src/benchmark.rs @@ -18,7 +18,6 @@ use tokio::runtime::{Builder, Runtime}; use tracing::{error, info}; pub mod bench_types; pub mod load_generator; -pub mod multi_load_generator; pub mod transaction_creator; pub mod validator_preparer; use crate::benchmark::{ diff --git a/sui/src/benchmark/load_generator.rs b/sui/src/benchmark/load_generator.rs index 03a830909fc08..1833ad73a5d95 100644 --- a/sui/src/benchmark/load_generator.rs +++ b/sui/src/benchmark/load_generator.rs @@ -20,10 +20,12 @@ use sui_core::{ authority_server::{AuthorityServer, AuthorityServerHandle}, }; use sui_network::network::{NetworkClient, NetworkServer}; -use sui_types::{messages::*, serialize::*}; +use sui_types::{committee::Committee, messages::*, serialize::*}; use tokio::{sync::Notify, time}; use tracing::{error, info}; +use crate::config::NetworkConfig; + pub fn check_transaction_response(reply_message: Result) { match reply_message { Ok(SerializedMessage::TransactionResp(res)) => { @@ -202,3 +204,268 @@ pub async fn spawn_authority_server( pub fn calculate_throughput(num_items: usize, elapsed_time_us: u128) -> f64 { 1_000_000.0 * num_items as f64 / elapsed_time_us as f64 } + +async fn send_tx_chunks_for_quorum_notif( + notif: Arc, + tx_chunk: Vec, + result_chann_tx: &mut MpscSender<(u128, usize)>, + net_client: NetworkClient, + stake: usize, + conn: usize, +) { + notif.notified().await; + let r = send_tx_chunks(tx_chunk, net_client.clone(), conn).await; + + match result_chann_tx.send((r.0, stake)).await { + Ok(_) => (), + Err(e) => { + // Disconnect is okay since we may leave f running + if !e.is_disconnected() { + panic!("Send failed! {:?}", net_client) + } + } + } + + let _: Vec<_> = + r.1.par_iter() + .map(|q| check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..]))) + .collect(); +} + +async fn send_tx_for_quorum( + notif: Arc, + order_chunk: Vec, + conf_chunk: Vec, + + result_chann_tx: &mut MpscSender, + net_clients: Vec<(NetworkClient, usize)>, + conn: usize, + quorum_threshold: usize, +) { + // For receiving info back from the subtasks + let (order_chann_tx, mut order_chann_rx) = MpscChannel(net_clients.len() * 2); + + // Send intent orders to 3f+1 + let order_start_notifier = Arc::new(Notify::new()); + for (net_client, stake) in net_clients.clone() { + // This is for sending a start signal to the subtasks + let notif = order_start_notifier.clone(); + // This is for getting the elapsed time + let mut ch_tx = order_chann_tx.clone(); + // Chunk to send for order_ + let chunk = order_chunk.clone(); + + tokio::spawn(async move { + send_tx_chunks_for_quorum_notif( + notif, + chunk, + &mut ch_tx, + net_client.clone(), + stake, + conn, + ) + .await; + }); + } + drop(order_chann_tx); + + // Wait for timer tick + notif.notified().await; + // Notify all the subtasks + order_start_notifier.notify_waiters(); + // Start timer + let time_start = Instant::now(); + + // Wait for 2f+1 by stake + let mut total = 0; + + while let Some(v) = time::timeout(Duration::from_secs(10), order_chann_rx.next()) + .await + .unwrap() + { + total += v.1; + if total >= quorum_threshold { + break; + } + } + if total < quorum_threshold { + panic!("Quorum threshold not reached for orders") + } + + // Confirmation step + let (conf_chann_tx, mut conf_chann_rx) = MpscChannel(net_clients.len() * 2); + + // Send the confs + let mut handles = vec![]; + for (net_client, stake) in net_clients { + let chunk = conf_chunk.clone(); + let mut chann_tx = conf_chann_tx.clone(); + handles.push(tokio::spawn(async move { + let r = send_tx_chunks(chunk, net_client.clone(), conn).await; + match chann_tx.send((r.0, stake)).await { + Ok(_) => (), + Err(e) => { + // Disconnect is okay since we may leave f running + if !e.is_disconnected() { + panic!("Send failed! {:?}", net_client) + } + } + } + + let _: Vec<_> = + r.1.par_iter() + .map(|q| { + check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..])) + }) + .collect(); + })); + } + drop(conf_chann_tx); + + // Reset counter + total = 0; + while let Some(v) = time::timeout(Duration::from_secs(10), conf_chann_rx.next()) + .await + .unwrap() + { + total += v.1; + if total >= quorum_threshold { + break; + } + } + if total < quorum_threshold { + panic!("Quorum threshold not reached for confirmation") + } + + let elapsed = time_start.elapsed().as_micros(); + + // Send the total time over + result_chann_tx.send(elapsed).await.unwrap(); +} + +pub struct MultiFixedRateLoadGenerator { + /// The time between sending transactions chunks + /// Anything below 10ms causes degradation in resolution + pub period_us: u64, + + //pub network_config: Vec, + pub tick_notifier: Arc, + + /// Number of TCP connections to open + pub connections: usize, + + /// Transactions to be sent + pub transactions: Vec, + + /// Results are sent over this channel + pub results_chann_rx: Receiver, + + /// This is the chunk size actually assigned for each tick per task + /// It is 2*chunk_size due to order and confirmation steps + pub chunk_size_per_task: usize, +} + +impl MultiFixedRateLoadGenerator { + pub async fn new( + transactions: Vec, + period_us: u64, + connections: usize, + + network_cfg: &NetworkConfig, + recv_timeout: Duration, + send_timeout: Duration, + ) -> Self { + let network_clients_stake: Vec<(NetworkClient, usize)> = network_cfg + .authorities + .iter() + .map(|q| { + ( + NetworkClient::new( + q.host.clone(), + q.port, + network_cfg.buffer_size, + send_timeout, + recv_timeout, + ), + q.stake, + ) + }) + .collect(); + let committee_quorum_threshold = Committee::from(network_cfg).quorum_threshold(); + let mut handles = vec![]; + let tick_notifier = Arc::new(Notify::new()); + + let (result_chann_tx, results_chann_rx) = MpscChannel(transactions.len() * 2); + + let conn = connections; + // Spin up a bunch of worker tasks + // Give each task + // Step by 2*conn due to order+confirmation, with `conn` tcp connections + // Take up to 2*conn for each task + let num_chunks_per_task = conn * 2; + for tx_chunk in transactions[..].chunks(num_chunks_per_task) { + let notif = tick_notifier.clone(); + let mut result_chann_tx = result_chann_tx.clone(); + let tx_chunk = tx_chunk.to_vec(); + let clients = network_clients_stake.clone(); + + let mut order_chunk = vec![]; + let mut conf_chunk = vec![]; + + for ch in tx_chunk[..].chunks(2) { + order_chunk.push(ch[0].clone()); + conf_chunk.push(ch[1].clone()); + } + + handles.push(tokio::spawn(async move { + send_tx_for_quorum( + notif, + order_chunk, + conf_chunk, + &mut result_chann_tx, + clients, + conn, + committee_quorum_threshold, + ) + .await; + })); + } + + drop(result_chann_tx); + + Self { + period_us, + transactions, + connections, + results_chann_rx, + tick_notifier, + chunk_size_per_task: num_chunks_per_task, + //network_config: network_cfg.authorities, + } + } + + pub async fn start(&mut self) -> Vec { + let mut interval = time::interval(Duration::from_micros(self.period_us)); + let mut count = 0; + loop { + tokio::select! { + _ = interval.tick() => { + self.tick_notifier.notify_one(); + count += self.chunk_size_per_task; + if count >= self.transactions.len() { + break; + } + } + } + } + let mut times = Vec::new(); + while let Some(v) = time::timeout(Duration::from_secs(10), self.results_chann_rx.next()) + .await + .unwrap_or(None) + { + times.push(v); + } + + times + } +} diff --git a/sui/src/benchmark/multi_load_generator.rs b/sui/src/benchmark/multi_load_generator.rs deleted file mode 100644 index fb4dc17acb70c..0000000000000 --- a/sui/src/benchmark/multi_load_generator.rs +++ /dev/null @@ -1,333 +0,0 @@ -// Copyright (c) 2022, Mysten Labs, Inc. -// SPDX-License-Identifier: Apache-2.0 - -#![deny(warnings)] - -use anyhow::Error; -use bytes::Bytes; -use futures::channel::mpsc::{channel as MpscChannel, Receiver, Sender as MpscSender}; -use futures::stream::StreamExt; -use futures::SinkExt; - -use rayon::prelude::*; -use std::io; -use std::sync::Arc; -use std::time::{Duration, Instant}; -use sui_core::authority_client::{AuthorityAPI, NetworkAuthorityClient}; -use sui_network::network::NetworkClient; -use sui_types::committee::Committee; -use sui_types::{messages::*, serialize::*}; -use tokio::sync::Notify; -use tokio::time; -use tracing::{error, info}; - -use crate::config::NetworkConfig; - -pub fn check_transaction_response(reply_message: Result) { - match reply_message { - Ok(SerializedMessage::TransactionResp(res)) => { - if let Some(e) = res.signed_effects { - if matches!(e.effects.status, ExecutionStatus::Failure { .. }) { - info!("Execution Error {:?}", e.effects.status); - } - } - } - Err(err) => { - error!("Received Error {:?}", err); - } - Ok(q) => error!("Received invalid response {:?}", q), - }; -} - -pub async fn send_tx_chunks( - tx_chunks: Vec, - net_client: NetworkClient, - _conn: usize, -) -> (u128, Vec, io::Error>>) { - let time_start = Instant::now(); - - // This probably isn't going to be as fast so we probably want to provide away to send a batch - // of txns to the authority at a time - let client = NetworkAuthorityClient::new(net_client); - let mut tx_resp = Vec::new(); - for tx in tx_chunks { - let message = deserialize_message(&tx[..]).unwrap(); - let resp = match message { - SerializedMessage::Transaction(transaction) => client - .handle_transaction(*transaction) - .await - .map(|resp| serialize_transaction_info(&resp)) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), - SerializedMessage::Cert(cert) => client - .handle_confirmation_transaction(ConfirmationTransaction { certificate: *cert }) - .await - .map(|resp| serialize_transaction_info(&resp)) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)), - _ => panic!("unexpected message type"), - }; - tx_resp.push(resp); - } - - let elapsed = time_start.elapsed().as_micros(); - - (elapsed, tx_resp) -} - -async fn send_tx_chunks_notif( - notif: Arc, - tx_chunk: Vec, - result_chann_tx: &mut MpscSender<(u128, usize)>, - net_client: NetworkClient, - stake: usize, - conn: usize, -) { - notif.notified().await; - let r = send_tx_chunks(tx_chunk, net_client.clone(), conn).await; - - match result_chann_tx.send((r.0, stake)).await { - Ok(_) => (), - Err(e) => { - // Disconnect is okay since we may leave f running - if !e.is_disconnected() { - panic!("Send failed! {:?}", net_client) - } - } - } - - let _: Vec<_> = - r.1.par_iter() - .map(|q| check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..]))) - .collect(); -} - -async fn send_tx_for_quorum( - notif: Arc, - order_chunk: Vec, - conf_chunk: Vec, - - result_chann_tx: &mut MpscSender, - net_clients: Vec<(NetworkClient, usize)>, - conn: usize, - quorum_threshold: usize, -) { - // For receiving info back from the subtasks - let (order_chann_tx, mut order_chann_rx) = MpscChannel(net_clients.len() * 2); - - // Send intent orders to 3f+1 - let order_start_notifier = Arc::new(Notify::new()); - for (net_client, stake) in net_clients.clone() { - // This is for sending a start signal to the subtasks - let notif = order_start_notifier.clone(); - // This is for getting the elapsed time - let mut ch_tx = order_chann_tx.clone(); - // Chunk to send for order_ - let chunk = order_chunk.clone(); - - tokio::spawn(async move { - send_tx_chunks_notif(notif, chunk, &mut ch_tx, net_client.clone(), stake, conn).await; - }); - } - drop(order_chann_tx); - - // Wait for timer tick - notif.notified().await; - // Notify all the subtasks - order_start_notifier.notify_waiters(); - // Start timer - let time_start = Instant::now(); - - // Wait for 2f+1 by stake - let mut total = 0; - - while let Some(v) = time::timeout(Duration::from_secs(10), order_chann_rx.next()) - .await - .unwrap() - { - total += v.1; - if total >= quorum_threshold { - break; - } - } - if total < quorum_threshold { - panic!("Quorum threshold not reached for orders") - } - - // Confirmation step - let (conf_chann_tx, mut conf_chann_rx) = MpscChannel(net_clients.len() * 2); - - // Send the confs - let mut handles = vec![]; - for (net_client, stake) in net_clients { - let chunk = conf_chunk.clone(); - let mut chann_tx = conf_chann_tx.clone(); - handles.push(tokio::spawn(async move { - let r = send_tx_chunks(chunk, net_client.clone(), conn).await; - match chann_tx.send((r.0, stake)).await { - Ok(_) => (), - Err(e) => { - // Disconnect is okay since we may leave f running - if !e.is_disconnected() { - panic!("Send failed! {:?}", net_client) - } - } - } - - let _: Vec<_> = - r.1.par_iter() - .map(|q| { - check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..])) - }) - .collect(); - })); - } - drop(conf_chann_tx); - - // Reset counter - total = 0; - while let Some(v) = time::timeout(Duration::from_secs(10), conf_chann_rx.next()) - .await - .unwrap() - { - total += v.1; - if total >= quorum_threshold { - break; - } - } - if total < quorum_threshold { - panic!("Quorum threshold not reached for confirmation") - } - - let elapsed = time_start.elapsed().as_micros(); - - // Send the total time over - result_chann_tx.send(elapsed).await.unwrap(); -} - -pub struct MultiFixedRateLoadGenerator { - /// The time between sending transactions chunks - /// Anything below 10ms causes degradation in resolution - pub period_us: u64, - - //pub network_config: Vec, - pub tick_notifier: Arc, - - /// Number of TCP connections to open - pub connections: usize, - - pub transactions: Vec, - - pub results_chann_rx: Receiver, - - /// This is the chunk size actually assigned for each tick per task - /// It is 2*chunk_size due to order and confirmation steps - pub chunk_size_per_task: usize, -} - -impl MultiFixedRateLoadGenerator { - pub async fn new( - transactions: Vec, - period_us: u64, - connections: usize, - - network_cfg: &NetworkConfig, - recv_timeout: Duration, - send_timeout: Duration, - ) -> Self { - let network_clients_stake: Vec<(NetworkClient, usize)> = network_cfg - .authorities - .iter() - .map(|q| { - ( - NetworkClient::new( - q.host.clone(), - q.port, - network_cfg.buffer_size, - send_timeout, - recv_timeout, - ), - q.stake, - ) - }) - .collect(); - let committee_quorum_threshold = Committee::from(network_cfg).quorum_threshold(); - let mut handles = vec![]; - let tick_notifier = Arc::new(Notify::new()); - - let (result_chann_tx, results_chann_rx) = MpscChannel(transactions.len() * 2); - - let conn = connections; - // Spin up a bunch of worker tasks - // Give each task - // Step by 2*conn due to order+confirmation, with `conn` tcp connections - // Take up to 2*conn for each task - let num_chunks_per_task = conn * 2; - for tx_chunk in transactions[..].chunks(num_chunks_per_task) { - let notif = tick_notifier.clone(); - let mut result_chann_tx = result_chann_tx.clone(); - let tx_chunk = tx_chunk.to_vec(); - let clients = network_clients_stake.clone(); - - let mut order_chunk = vec![]; - let mut conf_chunk = vec![]; - - for ch in tx_chunk[..].chunks(2) { - order_chunk.push(ch[0].clone()); - conf_chunk.push(ch[1].clone()); - } - - handles.push(tokio::spawn(async move { - send_tx_for_quorum( - notif, - order_chunk, - conf_chunk, - &mut result_chann_tx, - clients, - conn, - committee_quorum_threshold, - ) - .await; - })); - } - - drop(result_chann_tx); - - Self { - period_us, - transactions, - connections, - results_chann_rx, - tick_notifier, - chunk_size_per_task: num_chunks_per_task, - //network_config: network_cfg.authorities, - } - } - - pub async fn start(&mut self) -> Vec { - let mut interval = time::interval(Duration::from_micros(self.period_us)); - let mut count = 0; - loop { - tokio::select! { - _ = interval.tick() => { - self.tick_notifier.notify_one(); - count += self.chunk_size_per_task; - if count >= self.transactions.len() { - break; - } - } - } - } - let mut times = Vec::new(); - while let Some(v) = time::timeout(Duration::from_secs(10), self.results_chann_rx.next()) - .await - .unwrap_or(None) - { - times.push(v); - } - - times - } -} - -pub fn calculate_throughput(num_items: usize, elapsed_time_us: u128) -> f64 { - 1_000_000.0 * num_items as f64 / elapsed_time_us as f64 -} diff --git a/sui/src/bin/remote_load_generator.rs b/sui/src/bin/remote_load_generator.rs index aa7e5aa3136d0..b083445b892c8 100644 --- a/sui/src/bin/remote_load_generator.rs +++ b/sui/src/bin/remote_load_generator.rs @@ -4,11 +4,11 @@ use clap::*; use futures::join; use sui::benchmark::bench_types::{MicroBenchmarkResult, RemoteLoadGenConfig}; +use sui::benchmark::load_generator::MultiFixedRateLoadGenerator; use std::panic; use std::path::PathBuf; use std::time::Duration; -use sui::benchmark::multi_load_generator::MultiFixedRateLoadGenerator; use sui::benchmark::transaction_creator::TransactionCreator; use sui::benchmark::validator_preparer::ValidatorPreparer; use sui::config::{NetworkConfig, PersistedConfig};