Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] Refactor client storage #582

Merged
merged 4 commits into from
Feb 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions sui/src/wallet_commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl WalletCommands {
.await?
.get_single_owner_address()?;
let client_state = context.get_or_create_client_state(sender)?;
let gas_obj_ref = client_state.object_ref(*gas)?;
let gas_obj_ref = client_state.latest_object_ref(gas)?;

let (cert, effects) = client_state
.publish(path.clone(), gas_obj_ref, *gas_budget)
Expand Down Expand Up @@ -223,7 +223,7 @@ impl WalletCommands {
)?;

// Fetch the object info for the gas obj
let gas_obj_ref = client_state.object_ref(*gas)?;
let gas_obj_ref = client_state.latest_object_ref(gas)?;

// Fetch the objects for the object args
let mut object_args_refs = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl AuthorityState {
}
InputObjectKind::MoveObject((object_id, sequence_number, object_digest)) => {
fp_ensure!(
sequence_number <= SequenceNumber::max(),
sequence_number <= SequenceNumber::MAX,
SuiError::InvalidSequenceNumber
);

Expand Down
2 changes: 1 addition & 1 deletion sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ impl AuthorityStore {
.parent_sync
.iter()
// Make the max possible entry for this object ID.
.skip_prior_to(&(object_id, SEQUENCE_NUMBER_MAX, OBJECT_DIGEST_MAX))?;
.skip_prior_to(&(object_id, SequenceNumber::MAX, OBJECT_DIGEST_MAX))?;

Ok(iterator.next().and_then(|(obj_ref, tx_digest)| {
if obj_ref.0 == object_id {
Expand Down
97 changes: 22 additions & 75 deletions sui_core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,78 +248,37 @@ impl<A> ClientState<A> {
self.address
}

pub fn next_sequence_number(&self, object_id: &ObjectID) -> Result<SequenceNumber, SuiError> {
if self.store.object_sequence_numbers.contains_key(object_id)? {
Ok(self
.store
.object_sequence_numbers
.get(object_id)?
.expect("Unable to get sequence number"))
} else {
Err(SuiError::ObjectNotFound {
object_id: *object_id,
})
}
pub fn highest_known_version(&self, object_id: &ObjectID) -> Result<SequenceNumber, SuiError> {
self.latest_object_ref(object_id)
.map(|(_oid, seq_num, _digest)| seq_num)
}
pub fn object_ref(&self, object_id: ObjectID) -> Result<ObjectRef, SuiError> {
pub fn latest_object_ref(&self, object_id: &ObjectID) -> Result<ObjectRef, SuiError> {
self.store
.object_refs
.get(&object_id)?
.ok_or(SuiError::ObjectNotFound { object_id })
.get(object_id)?
.ok_or(SuiError::ObjectNotFound {
object_id: *object_id,
})
}

pub fn object_refs(&self) -> impl Iterator<Item = (ObjectID, ObjectRef)> + '_ {
self.store.object_refs.iter()
}

/// Need to remove unwraps. Found this tricky due to iterator requirements of downloader and not being able to exit from closure to top fn
/// https://github.com/MystenLabs/fastnft/issues/307
pub fn certificates(
&self,
object_id: &ObjectID,
) -> impl Iterator<Item = CertifiedTransaction> + '_ {
self.store
.object_certs
.get(object_id)
.unwrap()
.into_iter()
.flat_map(|cert_digests| {
self.store
.certificates
.multi_get(&cert_digests[..])
.unwrap()
.into_iter()
.flatten()
})
}

pub fn all_certificates(
&self,
) -> impl Iterator<Item = (TransactionDigest, CertifiedTransaction)> + '_ {
self.store.certificates.iter()
}

pub fn insert_object_info(
&mut self,
object_ref: &ObjectRef,
parent_tx_digest: &TransactionDigest,
) -> Result<(), SuiError> {
let (object_id, seq, _) = object_ref;
let mut tx_digests = self.store.object_certs.get(object_id)?.unwrap_or_default();
tx_digests.push(*parent_tx_digest);

let (object_id, _, _) = object_ref;
// Multi table atomic insert using batches
let batch = self
.store
.object_sequence_numbers
.object_refs
.batch()
.insert_batch(
&self.store.object_sequence_numbers,
std::iter::once((object_id, seq)),
)?
.insert_batch(
&self.store.object_certs,
std::iter::once((object_id, &tx_digests.to_vec())),
std::iter::once((object_ref, parent_tx_digest)),
)?
.insert_batch(
&self.store.object_refs,
Expand All @@ -331,16 +290,15 @@ impl<A> ClientState<A> {
}

pub fn remove_object_info(&mut self, object_id: &ObjectID) -> Result<(), SuiError> {
let min_for_id = (*object_id, SequenceNumber::MIN, ObjectDigest::MIN);
let max_for_id = (*object_id, SequenceNumber::MAX, ObjectDigest::MAX);

// Multi table atomic delete using batches
let batch = self
.store
.object_sequence_numbers
.object_refs
.batch()
.delete_batch(
&self.store.object_sequence_numbers,
std::iter::once(object_id),
)?
.delete_batch(&self.store.object_certs, std::iter::once(object_id))?
.delete_range(&self.store.object_certs, &min_for_id, &max_for_id)?
.delete_batch(&self.store.object_refs, std::iter::once(object_id))?;
// Execute atomic write of opers
batch.write()?;
Expand Down Expand Up @@ -398,7 +356,7 @@ where
) -> Result<(CertifiedTransaction, TransactionEffects), anyhow::Error> {
for object_kind in &transaction.input_objects() {
let object_id = object_kind.object_id();
let next_sequence_number = self.next_sequence_number(&object_id).unwrap_or_default();
let next_sequence_number = self.highest_known_version(&object_id).unwrap_or_default();
fp_ensure!(
object_kind.version() >= next_sequence_number,
SuiError::UnexpectedSequenceNumber {
Expand Down Expand Up @@ -499,11 +457,7 @@ where

for &(object_ref, owner) in effects.mutated_and_created() {
let (object_id, seq, _) = object_ref;
let old_seq = self
.store
.object_sequence_numbers
.get(&object_id)?
.unwrap_or_default();
let old_seq = self.highest_known_version(&object_id).unwrap_or_default();
// only update if data is new
if old_seq < seq {
if owner == self.address {
Expand All @@ -523,11 +477,7 @@ where
let _failed = self.download_objects_not_in_db(objs_to_download).await?;

for (object_id, seq, _) in &effects.deleted {
let old_seq = self
.store
.object_sequence_numbers
.get(object_id)?
.unwrap_or_default();
let old_seq = self.highest_known_version(object_id).unwrap_or_default();
if old_seq < *seq {
self.remove_object_info(object_id)?;
}
Expand Down Expand Up @@ -641,7 +591,6 @@ where
self.try_complete_pending_transactions().await?;
}
// update object_ids.
self.store.object_sequence_numbers.clear()?;
self.store.object_refs.clear()?;

let (active_object_certs, _deleted_refs_certs) = self
Expand All @@ -651,10 +600,8 @@ where

for (object, option_layout, option_cert) in active_object_certs {
let object_ref = object.to_object_reference();
let (object_id, sequence_number, _) = object_ref;
self.store
.object_sequence_numbers
.insert(&object_id, &sequence_number)?;
let (object_id, _seqnum, _) = object_ref;

self.store.object_refs.insert(&object_id, &object_ref)?;
if let Some(cert) = option_cert {
self.store
Expand Down Expand Up @@ -817,7 +764,7 @@ where
}

fn get_owned_objects(&self) -> Vec<ObjectID> {
self.store.object_sequence_numbers.keys().collect()
self.store.object_refs.keys().collect()
}

async fn download_owned_objects_not_in_db(&self) -> Result<BTreeSet<ObjectRef>, SuiError> {
Expand Down
5 changes: 1 addition & 4 deletions sui_core/src/client/client_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,10 @@ pub struct ClientSingleAddressStore {
// The remaining fields are used to minimize networking, and may not always be persisted locally.
/// Known certificates, indexed by TX digest.
pub certificates: DBMap<TransactionDigest, CertifiedTransaction>,
/// The known objects with it's sequence number owned by the client.
pub object_sequence_numbers: DBMap<ObjectID, SequenceNumber>,
/// Confirmed objects with it's ref owned by the client.
pub object_refs: DBMap<ObjectID, ObjectRef>,
/// Certificate <-> object id linking map.
pub object_certs: DBMap<ObjectID, Vec<TransactionDigest>>,
pub object_certs: DBMap<ObjectRef, TransactionDigest>,
/// Map from object ref to actual object to track object history
/// There can be duplicates and we never delete objects
pub objects: DBMap<ObjectRef, Object>,
Expand Down Expand Up @@ -132,7 +130,6 @@ impl ClientSingleAddressStore {
ClientSingleAddressStore {
pending_transactions: client_store::reopen_db(&db, PENDING_TRANSACTIONS_CF_NAME),
certificates: client_store::reopen_db(&db, CERT_CF_NAME),
object_sequence_numbers: client_store::reopen_db(&db, SEQ_NUMBER_CF_NAME),
object_refs: client_store::reopen_db(&db, OBJ_REF_CF_NAME),
object_certs: client_store::reopen_db(&db, TX_DIGEST_TO_CERT_CF_NAME),
objects: client_store::reopen_db(&db, OBJECT_CF_NAME),
Expand Down
Loading