Skip to content

Commit

Permalink
feat: introduce an override for atomic batch operations
Browse files Browse the repository at this point in the history
Signed-off-by: ljedrz <ljedrz@gmail.com>
  • Loading branch information
ljedrz committed Jan 11, 2024
1 parent 6b2a814 commit 6a17280
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 6 deletions.
8 changes: 8 additions & 0 deletions ledger/store/src/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ pub trait BlockStorage<N: Network>: 'static + Clone + Send + Sync {
self.transaction_store().finish_atomic()
}

fn flip_atomic_override(&self) -> Result<bool> {
self.state_root_map().flip_atomic_override()
}

/// Stores the given `(state root, block)` pair into storage.
fn insert(&self, state_root: N::StateRoot, block: &Block<N>) -> Result<()> {
// Prepare the confirmed transactions.
Expand Down Expand Up @@ -1147,6 +1151,10 @@ impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
pub fn dev(&self) -> Option<u16> {
self.storage.dev()
}

pub fn flip_atomic_override(&self) -> Result<bool> {
self.storage.flip_atomic_override()
}
}

impl<N: Network, B: BlockStorage<N>> BlockStore<N, B> {
Expand Down
12 changes: 12 additions & 0 deletions ledger/store/src/helpers/memory/internal/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,18 @@ impl<

Ok(())
}

///
/// The atomic override can be used to merge disjoint atomic write batches.
/// When enabled, the subsequent atomic write batches no longer automatically
/// perform a write at the end of their scope; instead, they only extend the
/// pending write batch until `flip_atomic_override` is called again.
/// The returned boolean indicates the current state of the override (`true`
/// means it was enabled, `false` that it was disabled).
///
fn flip_atomic_override(&self) -> Result<bool> {
Ok(false)
}
}

impl<
Expand Down
18 changes: 16 additions & 2 deletions ledger/store/src/helpers/rocksdb/internal/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ impl<
// Ensure that the atomic batch is empty.
assert!(self.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty.
assert!(self.database.atomic_batch.lock().is_empty());
if !self.database.atomic_override.load(Ordering::SeqCst) {
assert!(self.database.atomic_batch.lock().is_empty());
}
}

///
Expand Down Expand Up @@ -218,7 +220,7 @@ impl<

// If we're at depth 0, it is the final call to `finish_atomic` and the
// atomic write batch can be physically executed.
if previous_atomic_depth == 1 {
if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
// Empty the collection of pending operations.
let batch = mem::take(&mut *self.database.atomic_batch.lock());
// Execute all the operations atomically.
Expand All @@ -229,6 +231,18 @@ impl<

Ok(())
}

///
/// The atomic override can be used to merge disjoint atomic write batches.
/// When enabled, the subsequent atomic write batches no longer automatically
/// perform a write at the end of their scope; instead, they only extend the
/// pending write batch until `flip_atomic_override` is called again.
/// The returned boolean indicates the current state of the override (`true`
/// means it was enabled, `false` that it was disabled).
///
fn flip_atomic_override(&self) -> Result<bool> {
self.database.flip_atomic_override()
}
}

impl<
Expand Down
21 changes: 20 additions & 1 deletion ledger/store/src/helpers/rocksdb/internal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ use serde::{de::DeserializeOwned, Serialize};
use std::{
borrow::Borrow,
marker::PhantomData,
mem,
ops::Deref,
sync::{
atomic::{AtomicBool, AtomicUsize},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
Expand Down Expand Up @@ -81,6 +82,8 @@ pub struct RocksDB {
/// The depth of the current atomic write batch; it gets incremented with every call
/// to `start_atomic` and decremented with each call to `finish_atomic`.
pub(super) atomic_depth: Arc<AtomicUsize>,
/// TODO
pub(super) atomic_override: Arc<AtomicBool>,
}

impl Deref for RocksDB {
Expand Down Expand Up @@ -125,6 +128,7 @@ impl Database for RocksDB {
dev,
atomic_batch: Default::default(),
atomic_depth: Default::default(),
atomic_override: Default::default(),
})
})?
.clone();
Expand Down Expand Up @@ -189,6 +193,20 @@ impl Database for RocksDB {
}

impl RocksDB {
fn flip_atomic_override(&self) -> Result<bool> {
// https://github.com/rust-lang/rust/issues/98485
let previous_value = self.atomic_override.load(Ordering::SeqCst);

if previous_value {
let batch = mem::take(&mut *self.atomic_batch.lock());
self.rocksdb.write(batch)?;
}

self.atomic_override.store(!previous_value, Ordering::SeqCst);

Ok(!previous_value)
}

/// Opens the test database.
#[cfg(any(test, feature = "test"))]
pub fn open_testing(temp_dir: std::path::PathBuf, dev: Option<u16>) -> Result<Self> {
Expand Down Expand Up @@ -238,6 +256,7 @@ impl RocksDB {
dev,
atomic_batch: Default::default(),
atomic_depth: Default::default(),
atomic_override: Default::default(),
})
}?;

Expand Down
6 changes: 4 additions & 2 deletions ledger/store/src/helpers/rocksdb/internal/nested_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ impl<
// Ensure that the atomic batch is empty.
assert!(self.atomic_batch.lock().is_empty());
// Ensure that the database atomic batch is empty.
assert!(self.database.atomic_batch.lock().is_empty());
if !self.database.atomic_override.load(Ordering::SeqCst) {
assert!(self.database.atomic_batch.lock().is_empty());
}
}

///
Expand Down Expand Up @@ -321,7 +323,7 @@ impl<

// If we're at depth 0, it is the final call to `finish_atomic` and the
// atomic write batch can be physically executed.
if previous_atomic_depth == 1 {
if previous_atomic_depth == 1 && !self.database.atomic_override.load(Ordering::SeqCst) {
// Empty the collection of pending operations.
let batch = mem::take(&mut *self.database.atomic_batch.lock());
// Execute all the operations atomically.
Expand Down
10 changes: 10 additions & 0 deletions ledger/store/src/helpers/traits/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,16 @@ pub trait Map<
/// Finishes an atomic operation, performing all the queued writes.
///
fn finish_atomic(&self) -> Result<()>;

///
/// The atomic override can be used to merge disjoint atomic write batches.
/// When enabled, the subsequent atomic write batches no longer automatically
/// perform a write at the end of their scope; instead, they only extend the
/// pending write batch until `flip_atomic_override` is called again.
/// The returned boolean indicates the current state of the override (`true`
/// means it was enabled, `false` that it was disabled).
///
fn flip_atomic_override(&self) -> Result<bool>;
}

/// A trait representing map-like storage operations with read-only capabilities.
Expand Down
17 changes: 16 additions & 1 deletion synthesizer/src/vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,27 @@ impl<N: Network, C: ConsensusStorage<N>> VM<N, C> {
// Attention: The following order is crucial because if 'finalize' fails, we can rollback the block.
// If one first calls 'finalize', then calls 'insert(block)' and it fails, there is no way to rollback 'finalize'.

// Enable the atomic batch override, so that both the insertion and finalization belong to a single batch.
#[cfg(feature = "rocks")]
assert!(self.block_store().flip_atomic_override()?);

// First, insert the block.
self.block_store().insert(block)?;
// Next, finalize the transactions.
match self.finalize(state, block.ratifications(), block.solutions(), block.transactions()) {
Ok(_ratified_finalize_operations) => Ok(()),
Ok(_ratified_finalize_operations) => {
// Disable the atomic batch override, executing it.
#[cfg(feature = "rocks")]
assert!(!self.block_store().flip_atomic_override()?);
Ok(())
}
Err(finalize_error) => {
// Rewind the pending operations related to block insertion.
#[cfg(feature = "rocks")]
self.block_store().atomic_rewind();
// Disable the atomic batch override.
#[cfg(feature = "rocks")]
assert!(!self.block_store().flip_atomic_override()?);
// Rollback the block.
self.block_store().remove_last_n(1).map_err(|removal_error| {
// Log the finalize error.
Expand Down

0 comments on commit 6a17280

Please sign in to comment.