Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): checksum BlockType in block header #704

Merged
merged 3 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/storage/secondary/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ pub type Block = Bytes;
/// In RisingLight, the block encoding scheme is as follows:
///
/// ```plain
/// | block_type | cksum_type | cksum | data |
/// | 4B | 4B | 8B | variable |
/// | data | block_type | cksum_type | cksum |
/// | variable | 4B | 4B | 8B |
/// ```
pub trait BlockBuilder<A: Array> {
/// Append one data into the block.
Expand Down Expand Up @@ -118,17 +118,22 @@ impl BlockCacheKey {
}

#[derive(Default, Debug, Clone)]
pub struct BlockHeader {
pub struct BlockMeta {
pub block_type: BlockType,
pub checksum_type: ChecksumType,
pub checksum: u64,
}

pub const BLOCK_HEADER_SIZE: usize = 4 + 4 + 8;
pub const BLOCK_META_NON_CHECKSUM_SIZE: usize = 4;
pub const BLOCK_META_CHECKSUM_SIZE: usize = 4 + 8;
pub const BLOCK_META_SIZE: usize = BLOCK_META_NON_CHECKSUM_SIZE + BLOCK_META_CHECKSUM_SIZE;

impl BlockHeader {
pub fn encode(&self, buf: &mut impl BufMut) {
impl BlockMeta {
pub fn encode_except_checksum(&self, buf: &mut impl BufMut) {
buf.put_i32(self.block_type.into());
}

pub fn encode_checksum(&self, buf: &mut impl BufMut) {
buf.put_i32(self.checksum_type.into());
buf.put_u64(self.checksum);
}
Expand Down
29 changes: 18 additions & 11 deletions src/storage/secondary/block/block_index_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use risinglight_proto::rowset::block_index::BlockType;
use risinglight_proto::rowset::{BlockIndex, BlockStatistics};

use super::{BlockHeader, BLOCK_HEADER_SIZE};
use super::{BlockMeta, BLOCK_META_NON_CHECKSUM_SIZE, BLOCK_META_SIZE};
use crate::storage::secondary::{build_checksum, ColumnBuilderOptions};

/// Builds the block index.
Expand Down Expand Up @@ -46,7 +46,7 @@ impl BlockIndexBuilder {
) {
self.indexes.push(BlockIndex {
offset: column_data.len() as u64,
length: block_data.len() as u64 + BLOCK_HEADER_SIZE as u64,
length: block_data.len() as u64 + BLOCK_META_SIZE as u64,
first_rowid: self.last_row_count as u32,
row_count: (self.row_count - self.last_row_count) as u32,
/// TODO(chi): support sort key
Expand All @@ -58,22 +58,29 @@ impl BlockIndexBuilder {
// the new block will begin at the current row count
self.last_row_count = self.row_count;

self.block_header.resize(BLOCK_HEADER_SIZE, 0);
let mut block_header_ref = &mut self.block_header[..];
self.block_header.resize(BLOCK_META_SIZE, 0);
let mut block_header_nonchecksum = &mut self.block_header[..BLOCK_META_NON_CHECKSUM_SIZE];

let checksum_type = self.options.checksum_type;

BlockHeader {
let mut header = BlockMeta {
block_type,
checksum_type,
checksum: build_checksum(checksum_type, block_data),
}
.encode(&mut block_header_ref);

debug_assert!(block_header_ref.is_empty());
checksum: 0,
};
header.encode_except_checksum(&mut block_header_nonchecksum);
debug_assert!(block_header_nonchecksum.is_empty());
// add block_type to block_data
block_data.extend_from_slice(&self.block_header[..BLOCK_META_NON_CHECKSUM_SIZE]);

// calculate checksum and add
header.checksum = build_checksum(header.checksum_type, block_data);
let mut block_header_checksum = &mut self.block_header[BLOCK_META_NON_CHECKSUM_SIZE..];
header.encode_checksum(&mut block_header_checksum);
debug_assert!(block_header_checksum.is_empty());
block_data.extend_from_slice(&self.block_header[BLOCK_META_NON_CHECKSUM_SIZE..]);

// add data to the column file
column_data.append(&mut self.block_header);
column_data.append(block_data);
}

Expand Down
16 changes: 8 additions & 8 deletions src/storage/secondary/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ use bytes::Bytes;
pub use char_column_factory::*;
use moka::future::Cache;

use super::{Block, BlockCacheKey, BlockHeader, ColumnIndex, BLOCK_HEADER_SIZE};
use super::block::BLOCK_META_CHECKSUM_SIZE;
use super::{Block, BlockCacheKey, BlockMeta, ColumnIndex, BLOCK_META_SIZE};
use crate::array::Array;
use crate::storage::secondary::verify_checksum;
use crate::storage::{StorageResult, TracedStorageError};
Expand Down Expand Up @@ -143,14 +144,14 @@ impl Column {
lst_idx.offset + lst_idx.length
}

pub async fn get_block(&self, block_id: u32) -> StorageResult<(BlockHeader, Block)> {
pub async fn get_block(&self, block_id: u32) -> StorageResult<(BlockMeta, Block)> {
// It is possible that there will be multiple futures accessing
// one block not in cache concurrently, which might cause avalanche
// in cache. For now, we don't handle it.

let key = self.base_block_key.clone().block(block_id);

let mut block_header = BlockHeader::default();
let mut block_header = BlockMeta::default();
let mut do_verify_checksum = false;

// support multiple I/O backend
Expand Down Expand Up @@ -190,23 +191,22 @@ impl Column {
})
.await?;

if block.len() < BLOCK_HEADER_SIZE {
if block.len() < BLOCK_META_SIZE {
return Err(TracedStorageError::decode(
"block is smaller than header size",
));
}
let mut header = &block[..BLOCK_HEADER_SIZE];
let block_data = &block[BLOCK_HEADER_SIZE..];
let mut header = &block[block.len() - BLOCK_META_SIZE..];
block_header.decode(&mut header)?;

if do_verify_checksum {
verify_checksum(
block_header.checksum_type,
block_data,
&block[..block.len() - BLOCK_META_CHECKSUM_SIZE],
block_header.checksum,
)?;
}

Ok((block_header, block.slice(BLOCK_HEADER_SIZE..)))
Ok((block_header, block.slice(..block.len() - BLOCK_META_SIZE)))
}
}