WIP: Modularize benches #1321

Merged · 5 commits · Apr 14, 2022

Changes from all commits

2 changes: 1 addition & 1 deletion .github/workflows/bench.yml
@@ -70,7 +70,7 @@ jobs:
       - name: run benchmark
         run: |
           set -o pipefail
-          cargo run --release --bin microbench 2>&1 | huniq | tee -a artifacts/bench_results.txt
+          cargo run --release --bin bench microbench throughput 2>&1 | huniq | tee -a artifacts/bench_results.txt
       - name: retrieve benchmark results
         id: get-comment-body
         run: |
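This change folds the two standalone binaries, microbench and microbench_latency, into a single bench binary whose benchmark is chosen by subcommand. Judging from the invocations visible in this diff (bench.yml above and scripts/bench_sweep.py below), usage looks roughly like:

    cargo run --release --bin bench microbench throughput
    cargo run --release --bin bench microbench latency --period-us 1000 --chunk-size 200 --num-chunks 10

The flags and their defaults are defined in the new benchmark CLI types; see the sketch after sui/src/bench.rs below.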
2 changes: 1 addition & 1 deletion network_utils/src/network.rs
@@ -19,7 +19,7 @@ use futures::StreamExt;
 use tokio::task::JoinError;
 use tokio::time;

-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct NetworkClient {
     base_address: String,
     base_port: u16,
12 changes: 6 additions & 6 deletions scripts/bench_sweep.py
@@ -5,7 +5,7 @@
 from string import Template

 cmd_template = Template(
-    "../target/release/microbench_latency --period-us $period_us --chunk-size $chunk_size --num-chunks $num_chunks")
+    "../target/release/bench microbench latency --period-us $period_us --chunk-size $chunk_size --num-chunks $num_chunks")

 def get_avg_latency(period_us, chunk_size, num_chunks):
     cmd = cmd_template.substitute(
@@ -15,11 +15,11 @@ def get_avg_latency(period_us, chunk_size, num_chunks):
     output, error = process.communicate()

     resp = output.decode("utf-8")
-    res = ast.literal_eval(resp)
+
+    # Example output: `Average Latency 6577.06 us @ 100000 tps`
+    res = float(resp.split(" ")[2])
     print(res)
-    # Pick upper half at steady state
-    res = res[len(res)//2:]
-    return sum(res)/len(res)
+    return res


 def plot(vals):
@@ -32,7 +32,7 @@ def plot(vals):
 lats = []
 for i in range(10):
     chunk_size = 200 * (i+1)
-    period_us = 10000
+    period_us = 1000
     num_chunks = 10
     thr = chunk_size*1000*1000/period_us
     avg_lat_ms = get_avg_latency(period_us, chunk_size, num_chunks)/1000
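For orientation: thr = chunk_size*1000*1000/period_us is the offered load in transactions per second, since chunk_size transactions are injected every period_us microseconds. At the new period_us = 1000, the first sweep iteration (chunk_size = 200) offers 200 * 1,000,000 / 1,000 = 200,000 tps, ten times the load of the old period_us = 10000 setting.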
8 changes: 2 additions & 6 deletions sui/Cargo.toml
@@ -67,12 +67,8 @@ reqwest = { version = "0.11.10", features=["json","serde_json", "blocking"]}
 tracing-test = "0.2.1"

 [[bin]]
-name = "microbench"
-path = "src/microbench.rs"
-
-[[bin]]
-name = "microbench_latency"
-path = "src/microbench_latency.rs"
+name = "bench"
+path = "src/bench.rs"

 [[bin]]
 name = "wallet"
22 changes: 22 additions & 0 deletions sui/src/bench.rs
@@ -0,0 +1,22 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![deny(warnings)]

use clap::*;

use sui::benchmark::{bench_types, run_benchmark};
use tracing::subscriber::set_global_default;
use tracing_subscriber::EnvFilter;

fn main() {

[Review thread on fn main() — a sketch of where the CLI arguments are defined follows this file]

Contributor: What are the input parameters that this function is expecting?

Contributor Author: It uses CLI args. Am I supposed to specify something here?

Contributor: Where are the CLI arguments defined?

    let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
    let subscriber_builder =
        tracing_subscriber::fmt::Subscriber::builder().with_env_filter(env_filter);
    let subscriber = subscriber_builder.with_writer(std::io::stderr).finish();
    set_global_default(subscriber).expect("Failed to set subscriber");
    let benchmark = bench_types::Benchmark::parse();

    let r = run_benchmark(benchmark);
    println!("{}", r);
}
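Note that main() installs the tracing subscriber with a stderr writer, so logs go to stderr while stdout carries only the printed benchmark result — presumably what lets scripts/bench_sweep.py parse stdout directly.

The review thread above asks where the CLI arguments are defined: they are parsed by clap via the Benchmark type in bench_types, part of the new sui/src/benchmark module and not shown in this diff. As a rough, hypothetical sketch only, reconstructed from the fields that benchmark.rs below consumes (buffer_size, send_timeout_us, tcp_connections, the MicroBenchmark subcommand with Throughput/Latency variants, and so on), the clap 3 derive definitions would look something like this; all default values here are placeholders, not the crate's actual defaults:

use clap::{Parser, Subcommand};

#[derive(Parser)]
pub struct Benchmark {
    /// 0 means "use num_cpus::get()" (see run_microbenchmark)
    #[clap(long, default_value = "0")]
    pub tcp_connections: usize,
    #[clap(long, default_value = "65000")]
    pub buffer_size: usize,
    #[clap(long, default_value = "40000000")]
    pub send_timeout_us: u64,
    #[clap(long, default_value = "40000000")]
    pub recv_timeout_us: u64,
    #[clap(long, default_value = "2000")]
    pub batch_size: usize,
    #[clap(long)]
    pub use_move: bool,
    #[clap(long, default_value = "1")]
    pub committee_size: usize,
    #[clap(long, default_value = "1")]
    pub db_cpus: usize,
    #[clap(subcommand)]
    pub bench_type: BenchmarkType,
}

#[derive(Subcommand)]
pub enum BenchmarkType {
    /// Invoked as `bench microbench <throughput|latency> ...`
    #[clap(name = "microbench")]
    MicroBenchmark {
        #[clap(long, default_value = "127.0.0.1")]
        host: String,
        #[clap(long, default_value = "9555")]
        port: u16,
        #[clap(subcommand)]
        type_: MicroBenchmarkType,
    },
}

#[derive(Subcommand)]
pub enum MicroBenchmarkType {
    Throughput {
        #[clap(long, default_value = "100000")]
        num_transactions: usize,
    },
    Latency {
        #[clap(long, default_value = "100")]
        num_chunks: usize,
        #[clap(long, default_value = "1000")]
        chunk_size: usize,
        #[clap(long, default_value = "10000")]
        period_us: u64,
    },
}

With clap's derive, the kebab-case long flags (--period-us, --chunk-size, --num-chunks) seen in bench_sweep.py fall out of the snake_case field names automatically.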
267 changes: 267 additions & 0 deletions sui/src/benchmark.rs
@@ -0,0 +1,267 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

#![deny(warnings)]

use futures::{join, StreamExt};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use std::thread;
use std::{thread::sleep, time::Duration};
use sui_core::authority_client::AuthorityClient;
use sui_network::network::{NetworkClient, NetworkServer};
use sui_types::batch::UpdateItem;
use sui_types::messages::{BatchInfoRequest, BatchInfoResponseItem};
use sui_types::serialize::*;
use tokio::runtime::{Builder, Runtime};
use tracing::*;

pub mod bench_types;
pub mod load_generator;
pub mod transaction_creator;
use crate::benchmark::bench_types::{Benchmark, BenchmarkType};
use crate::benchmark::load_generator::{
    calculate_throughput, check_transaction_response, send_tx_chunks, spawn_authority_server,
    FixedRateLoadGenerator,
};
use crate::benchmark::transaction_creator::TransactionCreator;

use self::bench_types::{BenchmarkResult, MicroBenchmarkResult, MicroBenchmarkType};

const FOLLOWER_BATCH_SIZE: u64 = 10_000;

pub fn run_benchmark(benchmark: Benchmark) -> BenchmarkResult {
    // Only microbenchmarks are supported for now
    BenchmarkResult::MicroBenchmark(run_microbenchmark(benchmark))
}

fn run_microbenchmark(benchmark: Benchmark) -> MicroBenchmarkResult {
    let (host, port, type_) = match benchmark.bench_type {
        BenchmarkType::MicroBenchmark { host, port, type_ } => (host, port, type_),
    };

    let network_client = NetworkClient::new(
        host.clone(),
        port,
        benchmark.buffer_size,
        Duration::from_micros(benchmark.send_timeout_us),
        Duration::from_micros(benchmark.recv_timeout_us),
    );
    let network_server = NetworkServer::new(host, port, benchmark.buffer_size);
    let connections = if benchmark.tcp_connections > 0 {
        benchmark.tcp_connections
    } else {
        num_cpus::get()
    };

    match type_ {
        MicroBenchmarkType::Throughput { num_transactions } => run_throughput_microbench(
            network_client,
            network_server,
            connections,
            benchmark.batch_size,
            benchmark.use_move,
            num_transactions,
            benchmark.committee_size,
            benchmark.db_cpus,
        ),
        MicroBenchmarkType::Latency {
            num_chunks,
            chunk_size,
            period_us,
        } => run_latency_microbench(
            network_client,
            network_server,
            connections,
            benchmark.use_move,
            benchmark.committee_size,
            benchmark.db_cpus,
            num_chunks,
            chunk_size,
            period_us,
        ),
    }
}

fn run_throughput_microbench(
    network_client: NetworkClient,
    network_server: NetworkServer,
    connections: usize,
    batch_size: usize,
    use_move: bool,
    num_transactions: usize,
    committee_size: usize,
    db_cpus: usize,
) -> MicroBenchmarkResult {
    assert_eq!(
        num_transactions % batch_size,
        0,
        "num_transactions must be a multiple of batch_size",
    );
    // In order to simplify things, we send chunks on each connection and try to ensure all connections have equal load
    assert!(
        (num_transactions % connections) == 0,
        "num_transactions {} must be a multiple of the number of TCP connections {}",
        num_transactions,
        connections
    );
    let mut tx_cr = TransactionCreator::new(committee_size, db_cpus);

    let chunk_size = batch_size * connections;
    let txes = tx_cr.generate_transactions(
        connections,
        use_move,
        chunk_size,
        num_transactions / chunk_size,
    );

    // Make a multi-threaded runtime for the authority
    thread::spawn(move || {
        get_multithread_runtime().block_on(async move {
            let server = spawn_authority_server(network_server, tx_cr.authority_state).await;
            if let Err(e) = server.join().await {
                error!("Server ended with an error: {e}");
            }
        });
    });

    // Wait for the server to start
    sleep(Duration::from_secs(3));

    // Follower to observe batches
    let follower_network_client = network_client.clone();
    thread::spawn(move || {
        get_multithread_runtime()
            .block_on(async move { run_follower(follower_network_client).await });
    });

    sleep(Duration::from_secs(3));

    // Run the load
    let (elapsed, resp) = get_multithread_runtime()
        .block_on(async move { send_tx_chunks(txes, network_client, connections).await });

    let _: Vec<_> = resp
        .par_iter()
        .map(|q| check_transaction_response(deserialize_message(&(q.as_ref().unwrap())[..])))
        .collect();
    MicroBenchmarkResult::Throughput {
        chunk_throughput: calculate_throughput(num_transactions, elapsed),
    }
}
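To make the chunking arithmetic concrete with hypothetical numbers (not defaults from this PR): num_transactions = 16000, batch_size = 1000 and connections = 8 give chunk_size = 8000, so generate_transactions produces 16000 / 8000 = 2 chunks and both asserts pass. Note the asserts do not by themselves force num_transactions to be a multiple of chunk_size: 24000 with batch_size = 1000 and connections = 16 passes both checks, yet 24000 / 16000 truncates to 1 chunk, silently dropping 8000 transactions.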

fn run_latency_microbench(
    network_client: NetworkClient,
    network_server: NetworkServer,
    connections: usize,
    use_move: bool,
    committee_size: usize,
    db_cpus: usize,
    num_chunks: usize,
    chunk_size: usize,
    period_us: u64,
) -> MicroBenchmarkResult {
    // In order to simplify things, we send chunks on each connection and try to ensure all connections have equal load
    assert!(
        (num_chunks * chunk_size % connections) == 0,
        "num_chunks * chunk_size {} must be a multiple of the number of TCP connections {}",
        num_chunks * chunk_size,
        connections
    );
    let mut tx_cr = TransactionCreator::new(committee_size, db_cpus);

    // These TXes are used to load the network
    let load_gen_txes = tx_cr.generate_transactions(connections, use_move, chunk_size, num_chunks);

    // These are tracer TXes used for measuring latency
    let tracer_txes = tx_cr.generate_transactions(1, use_move, 1, num_chunks);

    // Make a multi-threaded runtime for the authority
    thread::spawn(move || {
        get_multithread_runtime().block_on(async move {
            let server = spawn_authority_server(network_server, tx_cr.authority_state).await;
            if let Err(e) = server.join().await {
                error!("Server ended with an error: {e}");
            }
        });
    });

    // Wait for the server to start
    sleep(Duration::from_secs(3));
    let runtime = get_multithread_runtime();
    // Prep the generators
    let (mut load_gen, mut tracer_gen) = runtime.block_on(async move {
        join!(
            FixedRateLoadGenerator::new(
                load_gen_txes,
                period_us,
                network_client.clone(),
                connections,
            ),
            FixedRateLoadGenerator::new(tracer_txes, period_us, network_client, 1),
        )
    });

    // Run the load gen and tracers
    let (load_latencies, tracer_latencies) =
        runtime.block_on(async move { join!(load_gen.start(), tracer_gen.start()) });

    MicroBenchmarkResult::Latency {
        load_chunk_size: chunk_size,
        load_latencies,
        tick_period_us: period_us as usize,
        chunk_latencies: tracer_latencies,
    }
}

async fn run_follower(network_client: NetworkClient) {
    // We spawn a second client that listens to the batch interface
    let _batch_client_handle = tokio::task::spawn(async move {
        let authority_client = AuthorityClient::new(network_client);

        let mut start = 0;

        loop {
            let receiver = authority_client
                .handle_batch_streaming_as_stream(BatchInfoRequest {
                    start,
                    end: start + FOLLOWER_BATCH_SIZE,
                })
                .await;

            if let Err(e) = &receiver {
                error!("Listener error: {:?}", e);
                break;
            }
            let mut receiver = receiver.unwrap();

            info!("Start batch listener at sequence: {}.", start);
            while let Some(item) = receiver.next().await {
                match item {
                    Ok(BatchInfoResponseItem(UpdateItem::Transaction((_tx_seq, _tx_digest)))) => {
                        start = _tx_seq + 1;
                    }
                    Ok(BatchInfoResponseItem(UpdateItem::Batch(_signed_batch))) => {
                        info!(
                            "Client received batch up to sequence {}",
                            _signed_batch.batch.next_sequence_number
                        );
                    }
                    Err(err) => {
                        error!("{:?}", err);
                        break;
                    }
                }
            }
        }
    });
}
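The follower tails the batch stream in windows: each BatchInfoRequest asks for sequences [start, start + FOLLOWER_BATCH_SIZE), start advances past every transaction sequence it sees, and when a stream ends the outer loop re-subscribes from the updated start, so batch updates are observed continuously for the duration of the benchmark.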

fn get_multithread_runtime() -> Runtime {
    Builder::new_multi_thread()
        .enable_all()
        .thread_stack_size(32 * 1024 * 1024)
        .worker_threads(usize::min(num_cpus::get(), 24))
        .build()
        .unwrap()
}