Skip to content

Commit

Permalink
feat(block-util): use rayon for get_entry_by_id
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Feb 26, 2025
1 parent c0e5d15 commit 6718dfb
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 29 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions block-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ bytes = { workspace = true }
everscale-types = { workspace = true, features = ["blake3", "rayon"] }
hex = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
thiserror = { workspace = true }
tl-proto = { workspace = true }

Expand Down
39 changes: 30 additions & 9 deletions block-util/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! * Archive entry data
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
Expand Down Expand Up @@ -90,19 +91,39 @@ impl Archive {
}
}

// TODO: Make async
pub fn get_entry_by_id(
&self,
/// NOTE: Takes up to a magnitude of seconds to run on large blocks.
pub async fn get_entry_by_id(
self: &Arc<Self>,
id: &BlockId,
) -> Result<(BlockStuffAug, BlockProofStuffAug, QueueDiffStuffAug), ArchiveError> {
// TODO: Rayon go brr
let block = self.get_block_by_id(id)?;
let proof = self.get_proof_by_id(id)?;
let queue_diff = self.get_queue_diff_by_id(id)?;

Ok((block, proof, queue_diff))
let this = self.clone();
let id = *id;

let (block, proof, queue_diff) = tycho_util::sync::rayon_run(move || {
let mut block_res = None;
let mut proof_res = None;
let mut diff_res = None;
rayon::scope(|s| {
s.spawn(|_| {
proof_res = Some(this.get_proof_by_id(&id));
diff_res = Some(this.get_queue_diff_by_id(&id));
});

block_res = Some(this.get_block_by_id(&id));
});

(
block_res.expect("scope must finish"),
proof_res.expect("scope must finish"),
diff_res.expect("scope must finish"),
)
})
.await;

Ok((block?, proof?, queue_diff?))
}

/// NOTE: Takes up to a magnitude of seconds to run on large blocks.
pub fn get_block_by_id(&self, id: &BlockId) -> Result<BlockStuffAug, ArchiveError> {
let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?;
entry
Expand Down
4 changes: 2 additions & 2 deletions core/src/block_strider/provider/archive_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,11 @@ impl ArchiveBlockProvider {

async fn checked_get_entry_by_id(
&self,
archive: &Archive,
archive: &Arc<Archive>,
mc_block_id: &BlockId,
block_id: &BlockId,
) -> Result<BlockStuffAug> {
let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id) {
let (block, ref proof, ref queue_diff) = match archive.get_entry_by_id(block_id).await {
Ok(entry) => entry,
Err(e) => anyhow::bail!("archive is corrupted: {e:?}"),
};
Expand Down
25 changes: 14 additions & 11 deletions core/tests/archives.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::Result;
use bytes::BytesMut;
use bytesize::ByteSize;
Expand Down Expand Up @@ -41,7 +43,7 @@ impl StateSubscriber for DummySubscriber {
}

pub struct ArchiveProvider {
archive: Archive,
archive: Arc<Archive>,
proof_checker: ProofChecker,
}

Expand All @@ -52,10 +54,11 @@ impl ArchiveProvider {
block_id,
} = block_id_relation;

let (ref block, ref proof, ref queue_diff) = match self.archive.get_entry_by_id(block_id) {
Ok(entry) => entry,
Err(e) => return Some(Err(e.into())),
};
let (ref block, ref proof, ref queue_diff) =
match self.archive.get_entry_by_id(block_id).await {
Ok(entry) => entry,
Err(e) => return Some(Err(e.into())),
};

match self
.proof_checker
Expand Down Expand Up @@ -249,7 +252,7 @@ async fn archives() -> Result<()> {

// Archive provider
let archive_data = utils::read_file("archive_1.bin")?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

let archive_provider = ArchiveProvider {
archive,
Expand All @@ -258,7 +261,7 @@ async fn archives() -> Result<()> {

// Next archive provider
let next_archive_data = utils::read_file("archive_2.bin")?;
let next_archive = utils::parse_archive(&next_archive_data)?;
let next_archive = utils::parse_archive(&next_archive_data).map(Arc::new)?;

let next_archive_provider = ArchiveProvider {
archive: next_archive,
Expand All @@ -267,7 +270,7 @@ async fn archives() -> Result<()> {

// Last archive provider
let last_archive_data = utils::read_file("archive_3.bin")?;
let last_archive = utils::parse_archive(&last_archive_data)?;
let last_archive = utils::parse_archive(&last_archive_data).map(Arc::new)?;

let last_archive_provider = ArchiveProvider {
archive: last_archive,
Expand Down Expand Up @@ -360,7 +363,7 @@ async fn heavy_archives() -> Result<()> {
// Archive provider
let archive_path = integration_test_path.join("archive_1.bin");
let archive_data = std::fs::read(archive_path)?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

let archive_provider = ArchiveProvider {
archive,
Expand All @@ -370,7 +373,7 @@ async fn heavy_archives() -> Result<()> {
// Next archive provider
let next_archive_path = integration_test_path.join("archive_2.bin");
let next_archive_data = std::fs::read(next_archive_path)?;
let next_archive = utils::parse_archive(&next_archive_data)?;
let next_archive = utils::parse_archive(&next_archive_data).map(Arc::new)?;

let next_archive_provider = ArchiveProvider {
archive: next_archive,
Expand All @@ -380,7 +383,7 @@ async fn heavy_archives() -> Result<()> {
// Last archive provider
let last_archive_path = integration_test_path.join("archive_3.bin");
let last_archive_data = std::fs::read(last_archive_path)?;
let last_archive = utils::parse_archive(&last_archive_data)?;
let last_archive = utils::parse_archive(&last_archive_data).map(Arc::new)?;

let last_archive_provider = ArchiveProvider {
archive: last_archive,
Expand Down
4 changes: 2 additions & 2 deletions core/tests/overlay_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ async fn overlay_server_blocks() -> Result<()> {
.build();

let archive_data = utils::read_file("archive_1.bin")?;
let archive = utils::parse_archive(&archive_data)?;
let archive = utils::parse_archive(&archive_data).map(Arc::new)?;

for block_id in archive.blocks.keys() {
if block_id.shard.is_masterchain() {
Expand All @@ -248,7 +248,7 @@ async fn overlay_server_blocks() -> Result<()> {
.await?;

let (archive_block, archive_proof, archive_queue_diff) =
archive.get_entry_by_id(block_id)?;
archive.get_entry_by_id(block_id).await?;

if let Some(block_full) = &result.data {
let block = BlockStuff::deserialize_checked(block_id, &block_full.block_data)?;
Expand Down
6 changes: 4 additions & 2 deletions core/tests/storage/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::{Context, Result};
use tempfile::TempDir;
use tycho_storage::{NewBlockMeta, Storage};
Expand Down Expand Up @@ -29,10 +31,10 @@ pub(crate) async fn init_storage() -> Result<(Storage, TempDir)> {

// Init blocks
let archive_data = utils::read_file("archive_1.bin")?;
let block_provider = utils::parse_archive(&archive_data)?;
let block_provider = utils::parse_archive(&archive_data).map(Arc::new)?;

for block_id in block_provider.mc_block_ids.values() {
let (block, proof, diff) = block_provider.get_entry_by_id(block_id)?;
let (block, proof, diff) = block_provider.get_entry_by_id(block_id).await?;

let info = block.load_info().context("Failed to load block info")?;
let meta = NewBlockMeta {
Expand Down
4 changes: 1 addition & 3 deletions core/tests/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ pub(crate) fn parse_archive(data: &[u8]) -> Result<Archive> {
let mut decompressed = Vec::new();
decoder.write(data, &mut decompressed)?;

let archive = Archive::new(decompressed)?;

Ok(archive)
Archive::new(decompressed)
}

pub(crate) fn read_file(filename: &str) -> Result<Vec<u8>> {
Expand Down

0 comments on commit 6718dfb

Please sign in to comment.