Skip to content

Commit 8dbaf54

Browse files
committed
add compaction tests and fix bugs in compaction
Signed-off-by: Alex Chi <iskyzh@gmail.com>
1 parent 971d0b1 commit 8dbaf54

23 files changed

+375
-38
lines changed

.config/nextest.toml

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[profile.default]
2+
slow-timeout = { period = "10s", terminate-after = 3 }

mini-lsm-book/src/week2-01-compaction.md

+7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,13 @@ In this chapter, you will:
88
* Implement the logic to update the LSM states and manage SST files on the filesystem.
99
* Update LSM read path to incorporate the LSM levels.
1010

11+
To copy the test cases into the starter code and run them:
12+
13+
```
14+
cargo x copy-test --week 2 --day 1
15+
cargo x scheck
16+
```
17+
1118
## Task 1: Compaction Implementation
1219

1320
In this task, you will implement the core logic of doing a compaction -- merge sort a set of SST files into a sorted run. You will need to modify:

mini-lsm-book/src/week2-02-simple.md

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ In this chapter, you will:
77
* Implement a simple leveled compaction strategy and simulate it on the compaction simulator.
88
* Start compaction as a background task and implement a compaction trigger in the system.
99

10+
To copy the test cases into the starter code and run them:
11+
12+
```
13+
cargo x copy-test --week 2 --day 2
14+
cargo x scheck
15+
```
16+
1017
## Task 1: Simple Leveled Compaction
1118

1219
In this chapter, we are going to implement our first compaction strategy -- simple leveled compaction. In this task, you will need to modify:

mini-lsm-book/src/week2-03-tiered.md

+7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ In this chapter, you will:
99

1010
The tiered compaction we talk about in this chapter is the same as RocksDB's universal compaction. We will use these two terms interchangeably.
1111

12+
To copy the test cases into the starter code and run them:
13+
14+
```
15+
cargo x copy-test --week 2 --day 3
16+
cargo x scheck
17+
```
18+
1219
## Task 1: Universal Compaction
1320

1421
In this chapter, you will implement RocksDB's universal compaction, which belongs to the tiered family of compaction strategies. Similar to the simple leveled compaction strategy, we only use the number of files as the indicator in this compaction strategy. And when we trigger the compaction jobs, we always include a full sorted run (tier) in the compaction job.

mini-lsm-book/src/week2-04-leveled.md

+7
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,13 @@ In this chapter, you will:
77
* Implement a leveled compaction strategy and simulate it on the compaction simulator.
88
* Incorporate leveled compaction strategy into the system.
99

10+
To copy the test cases into the starter code and run them:
11+
12+
```
13+
cargo x copy-test --week 2 --day 4
14+
cargo x scheck
15+
```
16+
1017
## Task 1: Leveled Compaction
1118

1219
In week 2 day 2, you implemented the simple leveled compaction strategy. However, the implementation has a few problems:

mini-lsm-mvcc/src/compact.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl CompactionController {
9898
}
9999
}
100100

101+
#[derive(Debug, Clone)]
101102
pub enum CompactionOptions {
102103
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
103104
/// Compaction)
@@ -309,25 +310,25 @@ impl LsmStorageInner {
309310
};
310311
println!("running compaction task: {:?}", task);
311312
let sstables = self.compact(&task)?;
312-
let files_added = sstables.len();
313313
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
314314
let ssts_to_remove = {
315315
let state_lock = self.state_lock.lock();
316+
let mut snapshot = self.state.read().as_ref().clone();
317+
let mut new_sst_ids = Vec::new();
318+
for file_to_add in sstables {
319+
new_sst_ids.push(file_to_add.sst_id());
320+
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
321+
assert!(result.is_none());
322+
}
316323
let (mut snapshot, files_to_remove) = self
317324
.compaction_controller
318-
.apply_compaction_result(&self.state.read(), &task, &output);
325+
.apply_compaction_result(&snapshot, &task, &output);
319326
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
320327
for file_to_remove in &files_to_remove {
321328
let result = snapshot.sstables.remove(file_to_remove);
322329
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
323330
ssts_to_remove.push(result.unwrap());
324331
}
325-
let mut new_sst_ids = Vec::new();
326-
for file_to_add in sstables {
327-
new_sst_ids.push(file_to_add.sst_id());
328-
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
329-
assert!(result.is_none());
330-
}
331332
let mut state = self.state.write();
332333
*state = Arc::new(snapshot);
333334
drop(state);
@@ -339,9 +340,10 @@ impl LsmStorageInner {
339340
ssts_to_remove
340341
};
341342
println!(
342-
"compaction finished: {} files removed, {} files added",
343+
"compaction finished: {} files removed, {} files added, output={:?}",
343344
ssts_to_remove.len(),
344-
files_added
345+
output.len(),
346+
output
345347
);
346348
for sst in ssts_to_remove {
347349
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;

mini-lsm-mvcc/src/compact/leveled.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ impl LeveledCompactionController {
124124
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
125125
target_level_size
126126
.iter()
127-
.map(|x| format!("{}MB", x / 1024 / 1024))
127+
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
128128
.collect::<Vec<_>>(),
129129
real_level_size
130130
.iter()
131-
.map(|x| format!("{}MB", x / 1024 / 1024))
131+
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
132132
.collect::<Vec<_>>(),
133133
base_level,
134134
);

mini-lsm-mvcc/src/compact/simple_leveled.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashSet;
2+
13
use serde::{Deserialize, Serialize};
24

35
use crate::lsm_storage::LsmStorageState;
@@ -95,12 +97,20 @@ impl SimpleLeveledCompactionController {
9597
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
9698
snapshot.levels[upper_level - 1].1.clear();
9799
} else {
98-
assert_eq!(
99-
task.upper_level_sst_ids, snapshot.l0_sstables,
100-
"sst mismatched"
101-
);
102-
files_to_remove.extend(&snapshot.l0_sstables);
103-
snapshot.l0_sstables.clear();
100+
files_to_remove.extend(&task.upper_level_sst_ids);
101+
let mut l0_ssts_compacted = task
102+
.upper_level_sst_ids
103+
.iter()
104+
.copied()
105+
.collect::<HashSet<_>>();
106+
let new_l0_sstables = snapshot
107+
.l0_sstables
108+
.iter()
109+
.copied()
110+
.filter(|x| !l0_ssts_compacted.remove(x))
111+
.collect::<Vec<_>>();
112+
assert!(l0_ssts_compacted.is_empty());
113+
snapshot.l0_sstables = new_l0_sstables;
104114
}
105115
assert_eq!(
106116
task.lower_level_sst_ids,

mini-lsm-mvcc/src/lsm_storage.rs

+10
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ impl LsmStorageOptions {
9494
num_memtable_limit: 2,
9595
}
9696
}
97+
98+
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
99+
Self {
100+
block_size: 4096,
101+
target_sst_size: 1 << 20, // 1MB
102+
compaction_options,
103+
enable_wal: false,
104+
num_memtable_limit: 2,
105+
}
106+
}
97107
}
98108

99109
fn range_overlap(

mini-lsm-mvcc/src/tests.rs

+3
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ mod week1_day5;
77
mod week1_day6;
88
mod week1_day7;
99
mod week2_day1;
10+
mod week2_day2;
11+
mod week2_day3;
12+
mod week2_day4;

mini-lsm-mvcc/src/tests/week2_day2.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../mini-lsm/src/tests/week2_day2.rs

mini-lsm-mvcc/src/tests/week2_day3.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../mini-lsm/src/tests/week2_day3.rs

mini-lsm-mvcc/src/tests/week2_day4.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
../../../mini-lsm/src/tests/week2_day4.rs

mini-lsm-starter/src/lsm_storage.rs

+10
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ impl LsmStorageOptions {
9090
num_memtable_limit: 2,
9191
}
9292
}
93+
94+
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
95+
Self {
96+
block_size: 4096,
97+
target_sst_size: 1 << 20, // 1MB
98+
compaction_options,
99+
enable_wal: false,
100+
num_memtable_limit: 2,
101+
}
102+
}
93103
}
94104

95105
/// The storage interface of the LSM tree.

mini-lsm/src/compact.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl CompactionController {
9898
}
9999
}
100100

101+
#[derive(Debug, Clone)]
101102
pub enum CompactionOptions {
102103
/// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
103104
/// Compaction)
@@ -309,25 +310,25 @@ impl LsmStorageInner {
309310
};
310311
println!("running compaction task: {:?}", task);
311312
let sstables = self.compact(&task)?;
312-
let files_added = sstables.len();
313313
let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
314314
let ssts_to_remove = {
315315
let state_lock = self.state_lock.lock();
316+
let mut snapshot = self.state.read().as_ref().clone();
317+
let mut new_sst_ids = Vec::new();
318+
for file_to_add in sstables {
319+
new_sst_ids.push(file_to_add.sst_id());
320+
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
321+
assert!(result.is_none());
322+
}
316323
let (mut snapshot, files_to_remove) = self
317324
.compaction_controller
318-
.apply_compaction_result(&self.state.read(), &task, &output);
325+
.apply_compaction_result(&snapshot, &task, &output);
319326
let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
320327
for file_to_remove in &files_to_remove {
321328
let result = snapshot.sstables.remove(file_to_remove);
322329
assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
323330
ssts_to_remove.push(result.unwrap());
324331
}
325-
let mut new_sst_ids = Vec::new();
326-
for file_to_add in sstables {
327-
new_sst_ids.push(file_to_add.sst_id());
328-
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
329-
assert!(result.is_none());
330-
}
331332
let mut state = self.state.write();
332333
*state = Arc::new(snapshot);
333334
drop(state);
@@ -339,9 +340,10 @@ impl LsmStorageInner {
339340
ssts_to_remove
340341
};
341342
println!(
342-
"compaction finished: {} files removed, {} files added",
343+
"compaction finished: {} files removed, {} files added, output={:?}",
343344
ssts_to_remove.len(),
344-
files_added
345+
output.len(),
346+
output
345347
);
346348
for sst in ssts_to_remove {
347349
std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;

mini-lsm/src/compact/leveled.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,11 @@ impl LeveledCompactionController {
124124
"target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
125125
target_level_size
126126
.iter()
127-
.map(|x| format!("{}MB", x / 1024 / 1024))
127+
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
128128
.collect::<Vec<_>>(),
129129
real_level_size
130130
.iter()
131-
.map(|x| format!("{}MB", x / 1024 / 1024))
131+
.map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
132132
.collect::<Vec<_>>(),
133133
base_level,
134134
);

mini-lsm/src/compact/simple_leveled.rs

+16-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashSet;
2+
13
use serde::{Deserialize, Serialize};
24

35
use crate::lsm_storage::LsmStorageState;
@@ -95,12 +97,20 @@ impl SimpleLeveledCompactionController {
9597
files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
9698
snapshot.levels[upper_level - 1].1.clear();
9799
} else {
98-
assert_eq!(
99-
task.upper_level_sst_ids, snapshot.l0_sstables,
100-
"sst mismatched"
101-
);
102-
files_to_remove.extend(&snapshot.l0_sstables);
103-
snapshot.l0_sstables.clear();
100+
files_to_remove.extend(&task.upper_level_sst_ids);
101+
let mut l0_ssts_compacted = task
102+
.upper_level_sst_ids
103+
.iter()
104+
.copied()
105+
.collect::<HashSet<_>>();
106+
let new_l0_sstables = snapshot
107+
.l0_sstables
108+
.iter()
109+
.copied()
110+
.filter(|x| !l0_ssts_compacted.remove(x))
111+
.collect::<Vec<_>>();
112+
assert!(l0_ssts_compacted.is_empty());
113+
snapshot.l0_sstables = new_l0_sstables;
104114
}
105115
assert_eq!(
106116
task.lower_level_sst_ids,

mini-lsm/src/lsm_storage.rs

+10
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ impl LsmStorageOptions {
9494
num_memtable_limit: 2,
9595
}
9696
}
97+
98+
pub fn default_for_week2_test(compaction_options: CompactionOptions) -> Self {
99+
Self {
100+
block_size: 4096,
101+
target_sst_size: 1 << 20, // 1MB
102+
compaction_options,
103+
enable_wal: false,
104+
num_memtable_limit: 2,
105+
}
106+
}
97107
}
98108

99109
fn range_overlap(

mini-lsm/src/tests.rs

+3
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,6 @@ mod week1_day5;
77
mod week1_day6;
88
mod week1_day7;
99
mod week2_day1;
10+
mod week2_day2;
11+
mod week2_day3;
12+
mod week2_day4;

0 commit comments

Comments
 (0)