diff --git a/src/storage/secondary/block.rs b/src/storage/secondary/block.rs
index 0628d62e6..9f61f586c 100644
--- a/src/storage/secondary/block.rs
+++ b/src/storage/secondary/block.rs
@@ -51,8 +51,8 @@ pub type Block = Bytes;
 /// In RisingLight, the block encoding scheme is as follows:
 ///
 /// ```plain
-/// | block_type | cksum_type | cksum | data     |
-/// | 4B         | 4B         | 8B    | variable |
+/// | data     | block_type | cksum_type | cksum |
+/// | variable | 4B         | 4B         | 8B    |
 /// ```
 pub trait BlockBuilder {
     /// Append one data into the block.
@@ -118,17 +118,22 @@ impl BlockCacheKey {
 }
 
 #[derive(Default, Debug, Clone)]
-pub struct BlockHeader {
+pub struct BlockMeta {
     pub block_type: BlockType,
     pub checksum_type: ChecksumType,
     pub checksum: u64,
 }
 
-pub const BLOCK_HEADER_SIZE: usize = 4 + 4 + 8;
+pub const BLOCK_META_NON_CHECKSUM_SIZE: usize = 4;
+pub const BLOCK_META_CHECKSUM_SIZE: usize = 4 + 8;
+pub const BLOCK_META_SIZE: usize = BLOCK_META_NON_CHECKSUM_SIZE + BLOCK_META_CHECKSUM_SIZE;
 
-impl BlockHeader {
-    pub fn encode(&self, buf: &mut impl BufMut) {
+impl BlockMeta {
+    pub fn encode_except_checksum(&self, buf: &mut impl BufMut) {
         buf.put_i32(self.block_type.into());
+    }
+
+    pub fn encode_checksum(&self, buf: &mut impl BufMut) {
         buf.put_i32(self.checksum_type.into());
         buf.put_u64(self.checksum);
     }
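The block meta moves from a header to a footer, and its encoding is split so the checksum fields can be written after the checksum has been computed over the payload plus `block_type`. Below is a minimal, self-contained sketch of the new layout; the `encode_block`/`decode_meta` helpers and `META_*` constants are illustrative names (not RisingLight APIs), and plain `i32`/`u64` stand in for `BlockType`/`ChecksumType`:

```rust
// Sketch of the new on-disk layout: `| data | block_type | cksum_type | cksum |`.
use bytes::{Buf, BufMut, Bytes, BytesMut};

const META_NON_CHECKSUM: usize = 4; // block_type
const META_CHECKSUM: usize = 4 + 8; // cksum_type + cksum
const META: usize = META_NON_CHECKSUM + META_CHECKSUM;

fn encode_block(data: &[u8], block_type: i32, cksum_type: i32, cksum: u64) -> Bytes {
    let mut buf = BytesMut::with_capacity(data.len() + META);
    buf.extend_from_slice(data); // payload first
    buf.put_i32(block_type);     // non-checksum part of the meta
    buf.put_i32(cksum_type);     // checksum part of the meta
    buf.put_u64(cksum);
    buf.freeze()
}

fn decode_meta(block: &[u8]) -> (i32, i32, u64) {
    // the meta now lives in the last 16 bytes of the block
    let mut tail = &block[block.len() - META..];
    (tail.get_i32(), tail.get_i32(), tail.get_u64())
}

fn main() {
    let block = encode_block(b"hello", 1, 2, 42);
    assert_eq!(decode_meta(&block), (1, 2, 42));
    assert_eq!(&block[..block.len() - META], b"hello");
}
```

With the meta at the tail, the payload and `block_type` sit contiguously in the buffer, which is exactly the range the new checksum covers (see the builder change below).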
diff --git a/src/storage/secondary/block/block_index_builder.rs b/src/storage/secondary/block/block_index_builder.rs
index 0fa33ee44..d24f89af1 100644
--- a/src/storage/secondary/block/block_index_builder.rs
+++ b/src/storage/secondary/block/block_index_builder.rs
@@ -3,7 +3,7 @@
 use risinglight_proto::rowset::block_index::BlockType;
 use risinglight_proto::rowset::{BlockIndex, BlockStatistics};
 
-use super::{BlockHeader, BLOCK_HEADER_SIZE};
+use super::{BlockMeta, BLOCK_META_NON_CHECKSUM_SIZE, BLOCK_META_SIZE};
 use crate::storage::secondary::{build_checksum, ColumnBuilderOptions};
 
 /// Builds the block index.
@@ -46,7 +46,7 @@ impl BlockIndexBuilder {
     ) {
         self.indexes.push(BlockIndex {
             offset: column_data.len() as u64,
-            length: block_data.len() as u64 + BLOCK_HEADER_SIZE as u64,
+            length: block_data.len() as u64 + BLOCK_META_SIZE as u64,
             first_rowid: self.last_row_count as u32,
             row_count: (self.row_count - self.last_row_count) as u32,
             /// TODO(chi): support sort key
@@ -58,22 +58,29 @@ impl BlockIndexBuilder {
         // the new block will begin at the current row count
        self.last_row_count = self.row_count;
 
-        self.block_header.resize(BLOCK_HEADER_SIZE, 0);
-        let mut block_header_ref = &mut self.block_header[..];
+        self.block_header.resize(BLOCK_META_SIZE, 0);
+        let mut block_header_nonchecksum = &mut self.block_header[..BLOCK_META_NON_CHECKSUM_SIZE];
 
         let checksum_type = self.options.checksum_type;
-        BlockHeader {
+        let mut header = BlockMeta {
             block_type,
             checksum_type,
-            checksum: build_checksum(checksum_type, block_data),
-        }
-        .encode(&mut block_header_ref);
-
-        debug_assert!(block_header_ref.is_empty());
+            checksum: 0,
+        };
+        header.encode_except_checksum(&mut block_header_nonchecksum);
+        debug_assert!(block_header_nonchecksum.is_empty());
+        // add block_type to block_data
+        block_data.extend_from_slice(&self.block_header[..BLOCK_META_NON_CHECKSUM_SIZE]);
+
+        // calculate checksum and add
+        header.checksum = build_checksum(header.checksum_type, block_data);
+        let mut block_header_checksum = &mut self.block_header[BLOCK_META_NON_CHECKSUM_SIZE..];
+        header.encode_checksum(&mut block_header_checksum);
+        debug_assert!(block_header_checksum.is_empty());
+        block_data.extend_from_slice(&self.block_header[BLOCK_META_NON_CHECKSUM_SIZE..]);
 
         // add data to the column file
-        column_data.append(&mut self.block_header);
         column_data.append(block_data);
     }
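The builder now seals a block in two phases: append the non-checksum part of the meta (`block_type`) to the payload, compute the checksum over that extended buffer, then append `cksum_type` and `cksum`. A rough sketch of that ordering, assuming a hypothetical `seal_block` helper and a trivial stand-in checksum in place of `build_checksum`:

```rust
// The important ordering: block_type is appended to the payload *before* the
// checksum is computed, so the checksum covers `data + block_type`.
use bytes::BufMut;

fn stand_in_checksum(data: &[u8]) -> u64 {
    // placeholder; RisingLight dispatches on ChecksumType via build_checksum
    data.iter().map(|&b| b as u64).sum()
}

fn seal_block(block_data: &mut Vec<u8>, block_type: i32, cksum_type: i32) {
    // phase 1: append the non-checksum part of the meta (block_type)
    block_data.put_i32(block_type);
    // phase 2: checksum everything appended so far, then append cksum_type + cksum
    let cksum = stand_in_checksum(block_data);
    block_data.put_i32(cksum_type);
    block_data.put_u64(cksum);
}
```

After `seal_block(&mut data, 1, 2)`, `data` is laid out as `| payload | block_type | cksum_type | cksum |`, and the stored checksum covers `data + block_type`, which is also the range the reader verifies in the next file.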
diff --git a/src/storage/secondary/column.rs b/src/storage/secondary/column.rs
index 240c94e28..91897eb0e 100644
--- a/src/storage/secondary/column.rs
+++ b/src/storage/secondary/column.rs
@@ -37,7 +37,8 @@ use bytes::Bytes;
 pub use char_column_factory::*;
 use moka::future::Cache;
 
-use super::{Block, BlockCacheKey, BlockHeader, ColumnIndex, BLOCK_HEADER_SIZE};
+use super::block::BLOCK_META_CHECKSUM_SIZE;
+use super::{Block, BlockCacheKey, BlockMeta, ColumnIndex, BLOCK_META_SIZE};
 use crate::array::Array;
 use crate::storage::secondary::verify_checksum;
 use crate::storage::{StorageResult, TracedStorageError};
@@ -143,14 +144,14 @@ impl Column {
         lst_idx.offset + lst_idx.length
     }
 
-    pub async fn get_block(&self, block_id: u32) -> StorageResult<(BlockHeader, Block)> {
+    pub async fn get_block(&self, block_id: u32) -> StorageResult<(BlockMeta, Block)> {
         // It is possible that there will be multiple futures accessing
         // one block not in cache concurrently, which might cause avalanche
         // in cache. For now, we don't handle it.
 
         let key = self.base_block_key.clone().block(block_id);
 
-        let mut block_header = BlockHeader::default();
+        let mut block_header = BlockMeta::default();
         let mut do_verify_checksum = false;
 
         // support multiple I/O backend
@@ -190,23 +191,22 @@ impl Column {
             })
             .await?;
 
-        if block.len() < BLOCK_HEADER_SIZE {
+        if block.len() < BLOCK_META_SIZE {
             return Err(TracedStorageError::decode(
                 "block is smaller than header size",
             ));
         }
-        let mut header = &block[..BLOCK_HEADER_SIZE];
-        let block_data = &block[BLOCK_HEADER_SIZE..];
+        let mut header = &block[block.len() - BLOCK_META_SIZE..];
         block_header.decode(&mut header)?;
 
         if do_verify_checksum {
             verify_checksum(
                 block_header.checksum_type,
-                block_data,
+                &block[..block.len() - BLOCK_META_CHECKSUM_SIZE],
                 block_header.checksum,
             )?;
         }
 
-        Ok((block_header, block.slice(BLOCK_HEADER_SIZE..)))
+        Ok((block_header, block.slice(..block.len() - BLOCK_META_SIZE)))
     }
 }
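On the read path, the meta is decoded from the tail of the block and the checksum is verified over everything except the checksum fields themselves. A reader-side sketch that mirrors `seal_block` above (again with stand-in types and a stand-in checksum rather than RisingLight's `verify_checksum`):

```rust
// Decode the meta from the block tail, verify the checksum over
// `data + block_type`, and hand back only the payload.
use bytes::{Buf, Bytes};

const META_NON_CHECKSUM: usize = 4;
const META_CHECKSUM: usize = 4 + 8;
const META: usize = META_NON_CHECKSUM + META_CHECKSUM;

fn stand_in_checksum(data: &[u8]) -> u64 {
    data.iter().map(|&b| b as u64).sum()
}

fn open_block(block: Bytes) -> Result<(i32, Bytes), String> {
    if block.len() < META {
        return Err("block is smaller than meta size".into());
    }
    // the meta sits in the last 16 bytes
    let mut tail = &block[block.len() - META..];
    let block_type = tail.get_i32();
    let _cksum_type = tail.get_i32();
    let cksum = tail.get_u64();
    // checksum covers everything except the checksum fields themselves
    if stand_in_checksum(&block[..block.len() - META_CHECKSUM]) != cksum {
        return Err("checksum mismatch".into());
    }
    // strip the meta before returning the payload
    Ok((block_type, block.slice(..block.len() - META)))
}
```

A round-trip through `seal_block` and `open_block` exercises both ends of the new layout without touching the real column format.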