@@ -30,15 +30,18 @@ struct Biases {
30
30
new_key : f64 ,
31
31
/// When exercising a new commit, the probability of causing it to crash.
32
32
crash : f64 ,
33
+ /// Instead of exercising a new commit, this is a probability of executing a rollback.
34
+ rollback : f64 ,
33
35
}
34
36
35
37
impl Biases {
36
- fn new ( delete : u8 , overflow : u8 , new_key : u8 , crash : u8 ) -> Self {
38
+ fn new ( delete : u8 , overflow : u8 , new_key : u8 , crash : u8 , rollback : u8 ) -> Self {
37
39
Self {
38
40
delete : ( delete as f64 ) / 100.0 ,
39
41
overflow : ( overflow as f64 ) / 100.0 ,
40
42
new_key : ( new_key as f64 ) / 100.0 ,
41
43
crash : ( crash as f64 ) / 100.0 ,
44
+ rollback : ( rollback as f64 ) / 100.0 ,
42
45
}
43
46
}
44
47
}
@@ -68,8 +71,8 @@ struct WorkloadState {
68
71
/// If true, the size of each commit will be within 0 and self.size,
69
72
/// otherwise it will always be workload-size.
70
73
random_size : bool ,
71
- /// The values that were committed .
72
- committed : Snapshot ,
74
+ /// All committed key values, divided in Snapshots .
75
+ committed : Vec < Snapshot > ,
73
76
}
74
77
75
78
impl WorkloadState {
@@ -79,7 +82,7 @@ impl WorkloadState {
79
82
biases,
80
83
size,
81
84
random_size,
82
- committed : Snapshot :: empty ( ) ,
85
+ committed : vec ! [ Snapshot :: empty( ) ] ,
83
86
}
84
87
}
85
88
@@ -90,7 +93,7 @@ impl WorkloadState {
90
93
}
91
94
92
95
fn gen_commit ( & mut self ) -> ( Snapshot , Vec < KeyValueChange > ) {
93
- let mut snapshot = self . committed . clone ( ) ;
96
+ let mut snapshot = self . last_snapshot ( ) . clone ( ) ;
94
97
snapshot. sync_seqn += 1 ;
95
98
96
99
let num_changes = if self . random_size {
@@ -129,27 +132,27 @@ impl WorkloadState {
129
132
//
130
133
// - Pick a key that was already generated before, but generate a key that shares some bits.
131
134
let mut key = [ 0 ; 32 ] ;
132
- if !self . committed . state . is_empty ( ) && self . rng . gen_bool ( self . biases . delete ) {
135
+ if !self . last_snapshot ( ) . state . is_empty ( ) && self . rng . gen_bool ( self . biases . delete ) {
133
136
loop {
134
137
self . rng . fill_bytes ( & mut key) ;
135
- if let Some ( ( next_key, Some ( _) ) ) = self . committed . state . get_next ( & key) {
138
+ if let Some ( ( next_key, Some ( _) ) ) = self . last_snapshot ( ) . state . get_next ( & key) {
136
139
return KeyValueChange :: Delete ( * next_key) ;
137
140
}
138
141
}
139
142
}
140
143
141
- if self . committed . state . is_empty ( ) || self . rng . gen_bool ( self . biases . new_key ) {
144
+ if self . last_snapshot ( ) . state . is_empty ( ) || self . rng . gen_bool ( self . biases . new_key ) {
142
145
loop {
143
146
self . rng . fill_bytes ( & mut key) ;
144
- if !self . committed . state . contains_key ( & key) {
147
+ if !self . last_snapshot ( ) . state . contains_key ( & key) {
145
148
return KeyValueChange :: Insert ( key, self . gen_value ( ) ) ;
146
149
}
147
150
}
148
151
}
149
152
150
153
loop {
151
154
self . rng . fill_bytes ( & mut key) ;
152
- if let Some ( ( next_key, _) ) = self . committed . state . get_next ( & key) {
155
+ if let Some ( ( next_key, _) ) = self . last_snapshot ( ) . state . get_next ( & key) {
153
156
return KeyValueChange :: Insert ( * next_key, self . gen_value ( ) ) ;
154
157
}
155
158
}
@@ -176,7 +179,12 @@ impl WorkloadState {
176
179
}
177
180
178
181
fn commit ( & mut self , snapshot : Snapshot ) {
179
- self . committed = snapshot;
182
+ self . committed . push ( snapshot) ;
183
+ }
184
+
185
+ fn last_snapshot ( & self ) -> & Snapshot {
186
+ // UNWRAP: self.committed contains always at least one empty snapshot.
187
+ self . committed . last ( ) . unwrap ( )
180
188
}
181
189
}
182
190
@@ -208,6 +216,8 @@ pub struct Workload {
208
216
ensure_changeset : bool ,
209
217
/// Whether to ensure the correctness of the state after every crash.
210
218
ensure_snapshot : bool ,
219
+ /// The max number of blocks involved in a rollback.
220
+ max_rollback_blocks : usize ,
211
221
}
212
222
213
223
impl Workload {
@@ -229,6 +239,7 @@ impl Workload {
229
239
workload_params. overflow ,
230
240
workload_params. new_key ,
231
241
workload_params. crash ,
242
+ workload_params. rollback ,
232
243
) ;
233
244
let mut state = WorkloadState :: new (
234
245
seed,
@@ -248,6 +259,7 @@ impl Workload {
248
259
n_successfull_commit : 0 ,
249
260
ensure_changeset : workload_params. ensure_changeset ,
250
261
ensure_snapshot : workload_params. ensure_snapshot ,
262
+ max_rollback_blocks : workload_params. max_rollback_blocks ,
251
263
}
252
264
}
253
265
@@ -279,19 +291,19 @@ impl Workload {
279
291
async fn run_iteration ( & mut self ) -> Result < ( ) > {
280
292
let agent = self . agent . as_ref ( ) . unwrap ( ) ;
281
293
let rr = agent. rr ( ) . clone ( ) ;
282
- // TODO: make the choice of the exercise more sophisticated.
283
- //
284
- // - commits should be much more frequent.
285
- // - crashes should be less frequent.
286
- // - rollbacks should be less frequent.
287
- let exercise_crash = self . state . rng . gen_bool ( self . state . biases . crash ) ;
288
- if exercise_crash {
289
- trace ! ( "run_iteration, should crash" ) ;
294
+ trace ! ( "run_iteration" ) ;
295
+
296
+ if self . state . rng . gen_bool ( self . state . biases . rollback ) {
297
+ self . exercise_rollback ( & rr ) . await ? ;
298
+ return Ok ( ( ) ) ;
299
+ }
300
+
301
+ if self . state . rng . gen_bool ( self . state . biases . crash ) {
290
302
self . exercise_commit_crashing ( & rr) . await ?;
291
- } else {
292
- trace ! ( "run_iteration" ) ;
293
- self . exercise_commit ( & rr) . await ?;
303
+ return Ok ( ( ) ) ;
294
304
}
305
+
306
+ self . exercise_commit ( & rr) . await ?;
295
307
Ok ( ( ) )
296
308
}
297
309
@@ -368,6 +380,7 @@ impl Workload {
368
380
// possibilities of crashing during sync.
369
381
crash_time = ( crash_time as f64 * 0.98 ) as u64 ;
370
382
383
+ trace ! ( "exercising crash" ) ;
371
384
rr. send_request ( crate :: message:: ToAgent :: Commit (
372
385
crate :: message:: CommitPayload {
373
386
changeset : changeset. clone ( ) ,
@@ -423,7 +436,7 @@ impl Workload {
423
436
KeyValueChange :: Insert ( key, _value) => {
424
437
// The current value must be equal to the previous one.
425
438
let current_value = rr. send_request_query ( * key) . await ?;
426
- match self . state . committed . state . get ( key) {
439
+ match self . state . last_snapshot ( ) . state . get ( key) {
427
440
None | Some ( None ) if current_value. is_some ( ) => {
428
441
return Err ( anyhow:: anyhow!( "New inserted item should not be present" ) ) ;
429
442
}
@@ -437,7 +450,7 @@ impl Workload {
437
450
}
438
451
KeyValueChange :: Delete ( key) => {
439
452
// UNWRAP: Non existing keys are not deleted.
440
- let prev_value = self . state . committed . state . get ( key) . unwrap ( ) ;
453
+ let prev_value = self . state . last_snapshot ( ) . state . get ( key) . unwrap ( ) ;
441
454
assert ! ( prev_value. is_some( ) ) ;
442
455
if rr. send_request_query ( * key) . await ?. as_ref ( ) != prev_value. as_ref ( ) {
443
456
return Err ( anyhow:: anyhow!(
@@ -455,7 +468,13 @@ impl Workload {
455
468
return Ok ( ( ) ) ;
456
469
}
457
470
458
- for ( i, ( key, expected_value) ) in ( self . state . committed . state . iter ( ) ) . enumerate ( ) {
471
+ if self . state . last_snapshot ( ) . sync_seqn != rr. send_query_sync_seqn ( ) . await ? {
472
+ return Err ( anyhow:: anyhow!(
473
+ "Unexpected sync_seqn while ensuring snapshot validity"
474
+ ) ) ;
475
+ }
476
+
477
+ for ( i, ( key, expected_value) ) in ( self . state . last_snapshot ( ) . state . iter ( ) ) . enumerate ( ) {
459
478
let value = rr. send_request_query ( * key) . await ?;
460
479
if & value != expected_value {
461
480
return Err ( anyhow:: anyhow!(
@@ -470,14 +489,43 @@ impl Workload {
470
489
Ok ( ( ) )
471
490
}
472
491
492
+ async fn exercise_rollback ( & mut self , rr : & comms:: RequestResponse ) -> anyhow:: Result < ( ) > {
493
+ // TODO: n_blocks should also depend on the max rollback supported by NOMT.
494
+ let n_snapshots = self . state . committed . len ( ) ;
495
+ let n_blocks_to_rollback = std:: cmp:: min (
496
+ self . state . rng . gen_range ( 1 ..self . max_rollback_blocks ) as usize ,
497
+ n_snapshots - 1 ,
498
+ ) ;
499
+
500
+ let last_sync_seqn = self . state . last_snapshot ( ) . sync_seqn ;
501
+ self . state
502
+ . committed
503
+ . truncate ( n_snapshots - n_blocks_to_rollback) ;
504
+ // The application of a rollback counts as increased sync_seq.
505
+ self . state . committed . last_mut ( ) . unwrap ( ) . sync_seqn = last_sync_seqn + 1 ;
506
+
507
+ if n_blocks_to_rollback == 0 {
508
+ trace ! ( "No available blocks to perform rollback with" ) ;
509
+ return Ok ( ( ) ) ;
510
+ }
511
+
512
+ trace ! ( "exercising rollback of {} blocks" , n_blocks_to_rollback) ;
513
+ rr. send_request ( crate :: message:: ToAgent :: Rollback ( n_blocks_to_rollback) )
514
+ . await ?;
515
+
516
+ self . ensure_snapshot_validity ( rr) . await ?;
517
+ Ok ( ( ) )
518
+ }
519
+
473
520
async fn spawn_new_agent ( & mut self ) -> anyhow:: Result < ( ) > {
474
521
assert ! ( self . agent. is_none( ) ) ;
475
522
controller:: spawn_agent_into ( & mut self . agent ) . await ?;
476
523
let workdir = self . workdir . path ( ) . display ( ) . to_string ( ) ;
524
+ let rollback = self . state . biases . rollback > 0.0 ;
477
525
self . agent
478
526
. as_mut ( )
479
527
. unwrap ( )
480
- . init ( workdir, self . workload_id , self . bitbox_seed )
528
+ . init ( workdir, self . workload_id , self . bitbox_seed , rollback )
481
529
. await ?;
482
530
483
531
self . ensure_snapshot_validity ( self . agent . as_ref ( ) . unwrap ( ) . rr ( ) )
0 commit comments