1
1
use anyhow:: Result ;
2
2
use imbl:: OrdMap ;
3
- use rand:: prelude:: * ;
3
+ use rand:: { distributions :: WeightedIndex , prelude:: * } ;
4
4
use std:: time:: Duration ;
5
5
use tempfile:: TempDir ;
6
6
use tokio:: time:: { error:: Elapsed , timeout} ;
@@ -28,6 +28,9 @@ struct Biases {
28
28
/// When generating a key, whether it should be one that was appeared somewhere or a brand new
29
29
/// key.
30
30
new_key : f64 ,
31
+ /// Distribution used when generating a new key to decide how many bytes needs to be shared
32
+ /// with an already existing key.
33
+ new_key_distribution : WeightedIndex < usize > ,
31
34
/// When executing a workload iteration ,this is the probability of executing a rollback.
32
35
rollback : f64 ,
33
36
/// When executing a commit this is the probability of causing it to crash.
@@ -45,13 +48,30 @@ impl Biases {
45
48
commit_crash : u8 ,
46
49
rollback_crash : u8 ,
47
50
) -> Self {
51
+ // When generating a new key to be inserted in the database,
52
+ // this distribution will generate the key.
53
+ // There is a 25% chance that the key is completely random,
54
+ // half of the 25% chance that the first byte will be shared with an existing key,
55
+ // one third of the 25% chance that two bytes will be shared with an existing key,
56
+ // and so on.
57
+ //
58
+ // There are:
59
+ // + 25% probability of having a key with 0 shared bytes.
60
+ // + 48% probability of having a key with 1 to 9 shared bytes.
61
+ // + 27% probability of having a key with more than 10 shared bytes.
62
+ //
63
+ // UNWRAP: provided iterator is not empty, no item is lower than zero
64
+ // and the total sum is greater than one.
65
+ let new_key_distribution = WeightedIndex :: new ( ( 1usize ..33 ) . map ( |x| ( 32 * 32 ) / x) ) . unwrap ( ) ;
66
+
48
67
Self {
49
68
delete : ( delete as f64 ) / 100.0 ,
50
69
overflow : ( overflow as f64 ) / 100.0 ,
51
70
new_key : ( new_key as f64 ) / 100.0 ,
52
71
rollback : ( rollback as f64 ) / 100.0 ,
53
72
commit_crash : ( commit_crash as f64 ) / 100.0 ,
54
73
rollback_crash : ( rollback_crash as f64 ) / 100.0 ,
74
+ new_key_distribution,
55
75
}
56
76
}
57
77
}
@@ -138,10 +158,8 @@ impl WorkloadState {
138
158
139
159
/// Returns a KeyValueChange with a new key, a deleted or a modified one.
140
160
fn gen_key_value_change ( & mut self ) -> KeyValueChange {
141
- // TODO: sophisticated key generation.
142
- //
143
- // - Pick a key that was already generated before, but generate a key that shares some bits.
144
161
let mut key = [ 0 ; 32 ] ;
162
+ // Generate a Delete KeyValueChange
145
163
if !self . committed . state . is_empty ( ) && self . rng . gen_bool ( self . biases . delete ) {
146
164
loop {
147
165
self . rng . fill_bytes ( & mut key) ;
@@ -151,15 +169,31 @@ impl WorkloadState {
151
169
}
152
170
}
153
171
154
- if self . committed . state . is_empty ( ) || self . rng . gen_bool ( self . biases . new_key ) {
172
+ // Generate a new key KeyValueChange
173
+ if self . committed . state . is_empty ( ) {
174
+ self . rng . fill_bytes ( & mut key) ;
175
+ return KeyValueChange :: Insert ( key, self . gen_value ( ) ) ;
176
+ }
177
+
178
+ if self . rng . gen_bool ( self . biases . new_key ) {
155
179
loop {
156
180
self . rng . fill_bytes ( & mut key) ;
181
+
182
+ let Some ( next_key) = self . committed . state . get_next ( & key) . map ( |( k, _) | * k) else {
183
+ continue ;
184
+ } ;
185
+
186
+ let common_bytes =
187
+ self . rng . sample ( self . biases . new_key_distribution . clone ( ) ) as usize ;
188
+ key[ ..common_bytes] . copy_from_slice ( & next_key[ ..common_bytes] ) ;
189
+
157
190
if !self . committed . state . contains_key ( & key) {
158
191
return KeyValueChange :: Insert ( key, self . gen_value ( ) ) ;
159
192
}
160
193
}
161
194
}
162
195
196
+ // Generate an update KeyValueChange
163
197
loop {
164
198
self . rng . fill_bytes ( & mut key) ;
165
199
if let Some ( ( next_key, _) ) = self . committed . state . get_next ( & key) {
@@ -169,14 +203,12 @@ impl WorkloadState {
169
203
}
170
204
171
205
fn gen_value ( & mut self ) -> Vec < u8 > {
172
- // TODO: sophisticated value generation.
173
- //
174
- // - Different power of two sizes.
175
- // - Change it to be a non-even.
206
+ // MAX_LEAF_VALUE_SIZE is 1332,
207
+ // thus every value size bigger than this will create an overflow value.
176
208
let len = if self . rng . gen_bool ( self . biases . overflow ) {
177
- 32 * 1024
209
+ self . rng . gen_range ( 1333 .. 32 * 1024 )
178
210
} else {
179
- 32
211
+ self . rng . gen_range ( 1 .. 1333 )
180
212
} ;
181
213
let mut value = vec ! [ 0 ; len] ;
182
214
self . rng . fill_bytes ( & mut value) ;
@@ -224,7 +256,7 @@ pub struct Workload {
224
256
/// Whether to randomly sample the state after every crash or rollback.
225
257
sample_snapshot : bool ,
226
258
/// The max number of commits involved in a rollback.
227
- max_rollback_commits : usize ,
259
+ max_rollback_commits : u32 ,
228
260
/// If `Some` there are rollbacks waiting to be applied.
229
261
scheduled_rollbacks : ScheduledRollbacks ,
230
262
}
@@ -444,16 +476,8 @@ impl Workload {
444
476
Ok ( ( ) )
445
477
}
446
478
447
- fn commits_to_rollback ( & mut self ) -> usize {
448
- // TODO: n_commits should also depend on the max rollback supported by NOMT.
449
- std:: cmp:: min (
450
- self . state . rng . gen_range ( 1 ..self . max_rollback_commits ) as usize ,
451
- self . state . committed . sync_seqn as usize ,
452
- )
453
- }
454
-
455
479
async fn schedule_rollback ( & mut self , should_crash : bool ) -> anyhow:: Result < ( ) > {
456
- let n_commits_to_rollback = self . commits_to_rollback ( ) ;
480
+ let n_commits_to_rollback = self . state . rng . gen_range ( 1 .. self . max_rollback_commits ) as usize ;
457
481
if n_commits_to_rollback == 0 {
458
482
trace ! ( "No available commits to perform rollback with" ) ;
459
483
return Ok ( ( ) ) ;
@@ -582,7 +606,6 @@ impl Workload {
582
606
let agent_died_or_timeout = timeout ( TOLERANCE , agent. died ( ) ) . await ;
583
607
self . agent . take ( ) . unwrap ( ) . teardown ( ) . await ;
584
608
if let Err ( Elapsed { .. } ) = agent_died_or_timeout {
585
- // TODO: flag for investigation.
586
609
return Err ( anyhow:: anyhow!( "agent did not die" ) ) ;
587
610
}
588
611
@@ -729,7 +752,11 @@ impl Workload {
729
752
assert ! ( self . agent. is_none( ) ) ;
730
753
controller:: spawn_agent_into ( & mut self . agent ) . await ?;
731
754
let workdir = self . workdir . path ( ) . display ( ) . to_string ( ) ;
732
- let rollback = self . state . biases . rollback > 0.0 ;
755
+ let rollback = if self . state . biases . rollback > 0.0 {
756
+ Some ( self . max_rollback_commits )
757
+ } else {
758
+ None
759
+ } ;
733
760
self . agent
734
761
. as_mut ( )
735
762
. unwrap ( )
0 commit comments