Skip to content

Commit

Permalink
refactor(storage-manager): LogLatest::lookup -> lookup_newer
Browse files Browse the repository at this point in the history
This commit optimises the method `lookup` to only lookup for newer (and
thus rename it to `lookup_newer`) Event.

As it is only looking for a newer Event, this method skips the Intervals
and SubIntervals that, by construction of the Replication Log, only
contain Events with lower timestamps.

* plugins/zenoh-plugin-storage-manager/src/replication/classification.rs:
  - Added new enumeration `EventLookup`.
  - Removed the `Interval::lookup` method as this commit made it unneeded.
  - Updated the `SubInterval::lookup` method to return the newly created
    `EventLookup` enumeration.

* plugins/zenoh-plugin-storage-manager/src/replication/core/aligner_reply.rs:
  - Removed the code to filter out an Event with an older timestamp as
    the method `LogLatest::lookup_newer` already does it.
  - Create the `EventMetadata` before calling `LogLatest::remove_event` to
    avoid multiple borrows.

* plugins/zenoh-plugin-storage-manager/src/replication/log.rs: renamed
  the method `LogLatest::lookup` to `LogLatest::lookup_newer` and
  modified its behaviour to only look for newer Event.

* plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs:
  only test if `LogLatest::lookup_newer` returned something.

Signed-off-by: Julien Loudet <julien.loudet@zettascale.tech>
  • Loading branch information
Julien Loudet committed Oct 28, 2024
1 parent dd12d95 commit 9b5c1d6
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ pub(crate) enum EventRemoval {
RemovedOlder(Event),
}

/// The `EventLookup` enumeration lists the possible outcomes when searching for an [Event] (with a
/// Timestamp) in the Replication Log.
///
/// The Timestamp allows comparing the [Event] that was found, establishing if it is Older, Newer
/// or Identical.
///
/// The Newer or Identical cases were merged as this enumeration was designed with the
/// `LogLatest::lookup_newer` method in mind, which does not need to distinguish them.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum EventLookup<'a> {
NotFound,
NewerOrIdentical(&'a Event),
Older,
}

/// An `IntervalIdx` represents the index of an `Interval`.
///
/// It is a thin wrapper around a `u64`.
Expand Down Expand Up @@ -139,17 +154,6 @@ impl Interval {
self.sub_intervals.get(sub_interval_idx)
}

/// Lookup the provided key expression and return, if found, its associated [Event].
pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
for sub_interval in self.sub_intervals.values() {
if let Some(event) = sub_interval.lookup(event_to_lookup) {
return Some(event);
}
}

None
}

/// Returns an [HashMap] of the index and [Fingerprint] of all the [SubInterval]s contained in
/// this [Interval].
pub(crate) fn sub_intervals_fingerprints(&self) -> HashMap<SubIntervalIdx, Fingerprint> {
Expand Down Expand Up @@ -397,8 +401,25 @@ impl SubInterval {
EventRemoval::NotFound
}

fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
self.events.get(&event_to_lookup.log_key())
/// Looks up the key expression of the provided `event_to_lookup` in this [SubInterval] and,
/// depending on its timestamp and if an [Event] has been found, returns the [EventLookup].
///
/// If the Event in the Replication Log has the same or a greater timestamp than
/// `event_to_lookup` then `NewerOrIdentical` is returned. If its timestamp is lower then
/// `Older` is returned.
///
/// If this SubInterval contains no Event with the same key expression, `NotFound` is returned.
pub(crate) fn lookup(&self, event_to_lookup: &EventMetadata) -> EventLookup {
match self.events.get(&event_to_lookup.log_key()) {
Some(event) => {
if event.timestamp >= event_to_lookup.timestamp {
EventLookup::NewerOrIdentical(event)
} else {
EventLookup::Older
}
}
None => EventLookup::NotFound,
}
}

fn remove_event(&mut self, event_to_remove: &EventMetadata) -> Option<Event> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,7 @@ impl Replication {
return false;
};

let maybe_newer_event = replication_log_guard
.lookup(replica_event)
.and_then(|log_event| {
if log_event.timestamp < replica_event.timestamp {
None
} else {
Some(log_event)
}
})
.cloned();
let maybe_newer_event = replication_log_guard.lookup_newer(replica_event);

match (maybe_wildcard_update, maybe_newer_event) {
// No Event in the Replication Log or Wildcard Update, we go on, we need to process it.
Expand Down Expand Up @@ -491,8 +482,9 @@ impl Replication {
.await;
}

let log_event_metadata = log_event.into();
if let Some(mut removed_event) =
replication_log_guard.remove_event(&(&log_event).into())
replication_log_guard.remove_event(&log_event_metadata)
{
removed_event.set_timestamp_and_action(
replica_event.timestamp,
Expand Down
49 changes: 43 additions & 6 deletions plugins/zenoh-plugin-storage-manager/src/replication/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh::{key_expr::OwnedKeyExpr, sample::SampleKind, time::Timestamp, Result
use zenoh_backend_traits::config::ReplicaConfig;

use super::{
classification::{EventRemoval, Interval, IntervalIdx},
classification::{EventLookup, EventRemoval, Interval, IntervalIdx},
configuration::Configuration,
digest::{Digest, Fingerprint},
};
Expand Down Expand Up @@ -364,15 +364,52 @@ impl LogLatest {
&self.configuration
}

/// Lookup the provided key expression and, if found, return its associated [Event].
pub fn lookup(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
/// Returns the [Event] from the Replication Log with a newer Timestamp and the same key
/// expression, or None if no such Event exists in the Replication Log.
///
/// To speed up this lookup, only the intervals that contain Events with a more recent Timestamp
/// are visited.
///
/// # ⚠️ Caveat
///
/// It is not because this method returns `None` that the Replication Log does not contain any
/// Event with the same key expression. It could return None and *still contain* an Event with
/// the same key expression.
pub fn lookup_newer(&self, event_to_lookup: &EventMetadata) -> Option<&Event> {
if !self.bloom_filter_event.check(&event_to_lookup.log_key()) {
return None;
}

for interval in self.intervals.values().rev() {
if let Some(event) = interval.lookup(event_to_lookup) {
return Some(event);
let Ok((event_interval_idx, event_sub_interval_idx)) = self
.configuration
.get_time_classification(&event_to_lookup.timestamp)
else {
tracing::error!(
"Fatal error: failed to compute the time classification of < {event_to_lookup:?} >"
);
return None;
};

for (interval_idx, interval) in self
.intervals
.iter()
.filter(|(&idx, _)| idx >= event_interval_idx)
{
for (_, sub_interval) in interval.sub_intervals().filter(|(&sub_idx, _)| {
// All sub-intervals must be visited except in one Interval: the Interval in which
// the `event_to_lookup` belongs. In this specific Interval, only the SubIntervals
// with a greater index should be visited (lower index means lower timestamp).
if *interval_idx == event_interval_idx {
return sub_idx >= event_sub_interval_idx;
}

true
}) {
match sub_interval.lookup(event_to_lookup) {
EventLookup::NotFound => continue,
EventLookup::NewerOrIdentical(event) => return Some(event),
EventLookup::Older => return None,
}
}
}

Expand Down
11 changes: 7 additions & 4 deletions plugins/zenoh-plugin-storage-manager/src/storages_mgt/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,13 @@ impl StorageService {
}

if let Some(replication_log) = &self.cache_latest.replication_log {
if let Some(event) = replication_log.read().await.lookup(new_event) {
if new_event.timestamp <= event.timestamp {
return None;
}
if replication_log
.read()
.await
.lookup_newer(new_event)
.is_some()
{
return None;
}
} else {
let mut storage = self.storage.lock().await;
Expand Down

0 comments on commit 9b5c1d6

Please sign in to comment.