Skip to content

Commit 42a45c2

Browse files
committed
feat(torture): add rollback
1 parent 0e1eb30 commit 42a45c2

File tree

6 files changed

+108
-29
lines changed

6 files changed

+108
-29
lines changed

torture/src/agent.rs

+17
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,17 @@ pub async fn run(input: UnixStream) -> Result<()> {
110110
})
111111
.await?;
112112
}
113+
ToAgent::Rollback(n_blocks) => {
114+
let start = std::time::Instant::now();
115+
agent.rollback(n_blocks)?;
116+
tracing::info!("rollback took {:?}", start.elapsed().as_millis());
117+
stream
118+
.send(Envelope {
119+
reqno,
120+
message: ToSupervisor::Ack,
121+
})
122+
.await?;
123+
}
113124
ToAgent::Query(key) => {
114125
let value = agent.query(key)?;
115126
stream
@@ -189,6 +200,7 @@ impl Agent {
189200
o.path(workdir.join("nomt_db"));
190201
o.bitbox_seed(init.bitbox_seed);
191202
o.hashtable_buckets(500_000);
203+
o.rollback(init.rollback);
192204
let nomt = Nomt::open(o)?;
193205
Ok(Self {
194206
workdir,
@@ -217,6 +229,11 @@ impl Agent {
217229
Ok(())
218230
}
219231

232+
/// Rolls back `n_blocks` blocks by forwarding the request to `self.nomt.rollback`.
///
/// Any error reported by that call is propagated to the caller; on success the
/// call's result is discarded and `Ok(())` is returned.
fn rollback(&mut self, n_blocks: usize) -> Result<()> {
233+
self.nomt.rollback(n_blocks)?;
234+
Ok(())
235+
}
236+
220237
fn query(&mut self, key: message::Key) -> Result<Option<message::Value>> {
221238
let value = self.nomt.read(key)?;
222239
Ok(value)

torture/src/message.rs

+5
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ pub struct InitPayload {
4646
///
4747
/// Only used upon creation of a new NOMT db.
4848
pub bitbox_seed: [u8; 16],
49+
/// Whether the agent is supposed to handle rollbacks.
50+
pub rollback: bool,
4951
}
5052

5153
/// The supervisor sends this message to the child process to indicate that the child should
@@ -85,6 +87,9 @@ pub enum ToAgent {
8587
/// The supervisor sends this message to the child process to indicate that the child should
8688
/// commit.
8789
Commit(CommitPayload),
90+
/// The supervisor sends this message to the child process to indicate that the child should
91+
/// perform a rollback of the specified number of blocks.
92+
Rollback(usize),
8893
/// The supervisor sends this message to the child process to query the value of a given key.
8994
Query(Key),
9095
/// The supervisor sends this message to the child process to query the current sequence number

torture/src/supervisor/cli.rs

+15-2
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ pub struct WorkloadParams {
2222
/// The probability of a delete operation as opposed to an insert operation.
2323
///
2424
/// Accepted values are in the range of 0 to 100
25-
#[clap(default_value = "1")]
25+
#[clap(default_value = "10")]
2626
#[clap(value_parser=clap::value_parser!(u8).range(0..=100))]
2727
#[arg(long = "workload-delete-bias", short = 'd')]
2828
pub delete: u8,
@@ -31,7 +31,7 @@ pub struct WorkloadParams {
3131
/// overflow pages.
3232
///
3333
/// Accepted values are in the range of 0 to 100
34-
#[clap(default_value = "1")]
34+
#[clap(default_value = "10")]
3535
#[clap(value_parser=clap::value_parser!(u8).range(0..=100))]
3636
#[arg(long = "workload-overflow-bias", short = 'o')]
3737
pub overflow: u8,
@@ -71,6 +71,19 @@ pub struct WorkloadParams {
7171
#[arg(long = "workload-commit-crash")]
7272
pub crash: u8,
7373

74+
/// The probability of executing a rollback instead of exercising a new commit.
75+
///
76+
/// Accepted values are in the range of 0 to 100
77+
#[clap(default_value = "5")]
78+
#[clap(value_parser=clap::value_parser!(u8).range(0..=100))]
79+
#[arg(long = "workload-rollback")]
80+
pub rollback: u8,
81+
82+
/// The maximum number of blocks involved in a rollback.
83+
#[clap(default_value = "10")]
84+
#[arg(long = "workload-max-rollback-blocks")]
85+
pub max_rollback_blocks: usize,
86+
7487
/// Whether to ensure the correct application of the changeset after every commit.
7588
#[clap(default_value = "false")]
7689
#[arg(long = "ensure-changeset")]

torture/src/supervisor/comms.rs

-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use tokio::{
2222
use tokio_serde::{formats::SymmetricalBincode, SymmetricallyFramed};
2323
use tokio_stream::StreamExt;
2424
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
25-
use tracing::trace;
2625

2726
use crate::message::{self, Envelope, ToAgent, ToSupervisor, MAX_ENVELOPE_SIZE};
2827

torture/src/supervisor/controller.rs

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ impl SpawnedAgentController {
3333
workdir: String,
3434
workload_id: u64,
3535
bitbox_seed: [u8; 16],
36+
rollback: bool,
3637
) -> Result<()> {
3738
// Assign a unique ID to the agent.
3839
static AGENT_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -43,6 +44,7 @@ impl SpawnedAgentController {
4344
id,
4445
workdir,
4546
bitbox_seed,
47+
rollback,
4648
}))
4749
.await?;
4850
Ok(())

torture/src/supervisor/workload.rs

+69-26
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@ struct Biases {
3030
new_key: f64,
3131
/// When exercising a new commit, the probability of causing it to crash.
3232
crash: f64,
33+
/// The probability of executing a rollback instead of exercising a new commit.
34+
rollback: f64,
3335
}
3436

3537
impl Biases {
36-
fn new(delete: u8, overflow: u8, new_key: u8, crash: u8) -> Self {
38+
fn new(delete: u8, overflow: u8, new_key: u8, crash: u8, rollback: u8) -> Self {
3739
Self {
3840
delete: (delete as f64) / 100.0,
3941
overflow: (overflow as f64) / 100.0,
4042
new_key: (new_key as f64) / 100.0,
4143
crash: (crash as f64) / 100.0,
44+
rollback: (rollback as f64) / 100.0,
4245
}
4346
}
4447
}
@@ -68,8 +71,8 @@ struct WorkloadState {
6871
/// If true, the size of each commit will be within 0 and self.size,
6972
/// otherwise it will always be workload-size.
7073
random_size: bool,
71-
/// The values that were committed.
72-
committed: Snapshot,
74+
/// All committed key values, divided in Snapshots.
75+
committed: Vec<Snapshot>,
7376
}
7477

7578
impl WorkloadState {
@@ -79,7 +82,7 @@ impl WorkloadState {
7982
biases,
8083
size,
8184
random_size,
82-
committed: Snapshot::empty(),
85+
committed: vec![Snapshot::empty()],
8386
}
8487
}
8588

@@ -90,7 +93,7 @@ impl WorkloadState {
9093
}
9194

9295
fn gen_commit(&mut self) -> (Snapshot, Vec<KeyValueChange>) {
93-
let mut snapshot = self.committed.clone();
96+
let mut snapshot = self.last_snapshot().clone();
9497
snapshot.sync_seqn += 1;
9598

9699
let num_changes = if self.random_size {
@@ -129,27 +132,27 @@ impl WorkloadState {
129132
//
130133
// - Pick a key that was already generated before, but generate a key that shares some bits.
131134
let mut key = [0; 32];
132-
if !self.committed.state.is_empty() && self.rng.gen_bool(self.biases.delete) {
135+
if !self.last_snapshot().state.is_empty() && self.rng.gen_bool(self.biases.delete) {
133136
loop {
134137
self.rng.fill_bytes(&mut key);
135-
if let Some((next_key, Some(_))) = self.committed.state.get_next(&key) {
138+
if let Some((next_key, Some(_))) = self.last_snapshot().state.get_next(&key) {
136139
return KeyValueChange::Delete(*next_key);
137140
}
138141
}
139142
}
140143

141-
if self.committed.state.is_empty() || self.rng.gen_bool(self.biases.new_key) {
144+
if self.last_snapshot().state.is_empty() || self.rng.gen_bool(self.biases.new_key) {
142145
loop {
143146
self.rng.fill_bytes(&mut key);
144-
if !self.committed.state.contains_key(&key) {
147+
if !self.last_snapshot().state.contains_key(&key) {
145148
return KeyValueChange::Insert(key, self.gen_value());
146149
}
147150
}
148151
}
149152

150153
loop {
151154
self.rng.fill_bytes(&mut key);
152-
if let Some((next_key, _)) = self.committed.state.get_next(&key) {
155+
if let Some((next_key, _)) = self.last_snapshot().state.get_next(&key) {
153156
return KeyValueChange::Insert(*next_key, self.gen_value());
154157
}
155158
}
@@ -174,7 +177,12 @@ impl WorkloadState {
174177
}
175178

176179
fn commit(&mut self, snapshot: Snapshot) {
177-
self.committed = snapshot;
180+
self.committed.push(snapshot);
181+
}
182+
183+
/// Returns a reference to the last element of `self.committed`, i.e. the most
/// recently pushed snapshot.
///
/// # Panics
///
/// Panics if `self.committed` is empty.
fn last_snapshot(&self) -> &Snapshot {
184+
// UNWRAP: self.committed always contains at least one snapshot (it is created
// holding a single empty snapshot).
185+
self.committed.last().unwrap()
178186
}
179187
}
180188

@@ -206,6 +214,8 @@ pub struct Workload {
206214
ensure_changeset: bool,
207215
/// Whether to ensure the correctness of the state after every crash.
208216
ensure_snapshot: bool,
217+
/// The max amount of blocks involved in a rollback.
218+
max_rollback_blocks: usize,
209219
}
210220

211221
impl Workload {
@@ -227,6 +237,7 @@ impl Workload {
227237
workload_params.overflow,
228238
workload_params.new_key,
229239
workload_params.crash,
240+
workload_params.rollback,
230241
);
231242
let mut state = WorkloadState::new(
232243
seed,
@@ -246,6 +257,7 @@ impl Workload {
246257
n_successfull_commit: 0,
247258
ensure_changeset: workload_params.ensure_changeset,
248259
ensure_snapshot: workload_params.ensure_snapshot,
260+
max_rollback_blocks: workload_params.max_rollback_blocks,
249261
}
250262
}
251263

@@ -277,19 +289,21 @@ impl Workload {
277289
async fn run_iteration(&mut self) -> Result<()> {
278290
let agent = self.agent.as_ref().unwrap();
279291
let rr = agent.rr().clone();
280-
// TODO: make the choice of the exercise more sophisticated.
281-
//
282-
// - commits should be much more frequent.
283-
// - crashes should be less frequent.
284-
// - rollbacks should be less frequent.
285-
let exercise_crash = self.state.rng.gen_bool(self.state.biases.crash);
286-
if exercise_crash {
287-
trace!("run_iteration, should crash");
288-
self.exercise_commit_crashing(&rr).await?;
289-
} else {
292+
293+
if self.state.rng.gen_bool(self.state.biases.rollback) {
290294
trace!("run_iteration");
291-
self.exercise_commit(&rr).await?;
295+
self.exercise_rollback(&rr).await?;
296+
return Ok(());
292297
}
298+
299+
if self.state.rng.gen_bool(self.state.biases.crash) {
300+
trace!("run_iteration");
301+
self.exercise_commit_crashing(&rr).await?;
302+
return Ok(());
303+
}
304+
305+
trace!("run_iteration");
306+
self.exercise_commit(&rr).await?;
293307
Ok(())
294308
}
295309

@@ -366,6 +380,7 @@ impl Workload {
366380
// possibilities of crashing during sync.
367381
crash_time = (crash_time as f64 * 0.98) as u64;
368382

383+
trace!("exercising crash");
369384
rr.send_request(crate::message::ToAgent::Commit(
370385
crate::message::CommitPayload {
371386
changeset: changeset.clone(),
@@ -421,7 +436,7 @@ impl Workload {
421436
KeyValueChange::Insert(key, _value) => {
422437
// The current value must be equal to the previous one.
423438
let current_value = rr.send_request_query(*key).await?;
424-
match self.state.committed.state.get(key) {
439+
match self.state.last_snapshot().state.get(key) {
425440
None | Some(None) if current_value.is_some() => {
426441
return Err(anyhow::anyhow!("New inserted item should not be present"));
427442
}
@@ -435,7 +450,7 @@ impl Workload {
435450
}
436451
KeyValueChange::Delete(key) => {
437452
// UNWRAP: Non existing keys are not deleted.
438-
let prev_value = self.state.committed.state.get(key).unwrap();
453+
let prev_value = self.state.last_snapshot().state.get(key).unwrap();
439454
assert!(prev_value.is_some());
440455
if rr.send_request_query(*key).await?.as_ref() != prev_value.as_ref() {
441456
return Err(anyhow::anyhow!(
@@ -453,7 +468,13 @@ impl Workload {
453468
return Ok(());
454469
}
455470

456-
for (i, (key, expected_value)) in (self.state.committed.state.iter()).enumerate() {
471+
if dbg!(self.state.last_snapshot().sync_seqn) != dbg!(rr.send_query_sync_seqn().await?) {
472+
return Err(anyhow::anyhow!(
473+
"Unexpected sync_seqn while ensuring snapshot validity"
474+
));
475+
}
476+
477+
for (i, (key, expected_value)) in (self.state.last_snapshot().state.iter()).enumerate() {
457478
let value = rr.send_request_query(*key).await?;
458479
if &value != expected_value {
459480
return Err(anyhow::anyhow!(
@@ -468,14 +489,36 @@ impl Workload {
468489
Ok(())
469490
}
470491

492+
async fn exercise_rollback(&mut self, rr: &comms::RequestResponse) -> anyhow::Result<()> {
493+
// TODO: n_blocks should also depend on the max rollback supported by nomt.
494+
let mut n_blocks = self.state.rng.gen_range(1..self.max_rollback_blocks) as usize;
495+
let blocks_avaiable_to_rollback = self.state.committed.len() - 1;
496+
n_blocks = std::cmp::min(blocks_avaiable_to_rollback, n_blocks);
497+
let last_sync_seqn = self.state.last_snapshot().sync_seqn;
498+
self.state
499+
.committed
500+
.truncate(self.state.committed.len() - n_blocks);
501+
// The application of a rollback counts as increased sync_seq.
502+
self.state.committed.last_mut().unwrap().sync_seqn = last_sync_seqn + 1;
503+
504+
trace!("performing rollback of {} blocks", n_blocks);
505+
rr.send_request(crate::message::ToAgent::Rollback(n_blocks))
506+
.await?;
507+
508+
self.ensure_snapshot_validity(rr).await?;
509+
510+
Ok(())
511+
}
512+
471513
async fn spawn_new_agent(&mut self) -> anyhow::Result<()> {
472514
assert!(self.agent.is_none());
473515
controller::spawn_agent_into(&mut self.agent).await?;
474516
let workdir = self.workdir.path().display().to_string();
517+
let rollback = self.state.biases.rollback > 0.0;
475518
self.agent
476519
.as_mut()
477520
.unwrap()
478-
.init(workdir, self.workload_id, self.bitbox_seed)
521+
.init(workdir, self.workload_id, self.bitbox_seed, dbg!(rollback))
479522
.await?;
480523

481524
self.ensure_snapshot_validity(self.agent.as_ref().unwrap().rr())

0 commit comments

Comments
 (0)