diff --git a/fastpay/src/client.rs b/fastpay/src/client.rs index 451324953d1ee..d297194bb48b6 100644 --- a/fastpay/src/client.rs +++ b/fastpay/src/client.rs @@ -311,9 +311,7 @@ fn find_cached_owner_by_object_id( .map(|acc| acc.address) } -fn show_object_effects(order_info_resp: OrderInfoResponse) { - let order_effects = order_info_resp.signed_effects.unwrap().effects; - +fn show_object_effects(order_effects: OrderEffects) { if order_effects.status != ExecutionStatus::Success { error!("Error publishing module: {:#?}", order_effects.status); } @@ -698,7 +696,7 @@ fn main() { recv_timeout, ) .await; - recipient_client_state.receive_object(&cert).await.unwrap(); + recipient_client_state.sync_client_state().await.unwrap(); accounts_config.update_from_state(&recipient_client_state); accounts_config .write(accounts_config_path) diff --git a/fastpay_core/src/authority_aggregator.rs b/fastpay_core/src/authority_aggregator.rs index 20c8f0e16ee2e..c041639bc00ce 100644 --- a/fastpay_core/src/authority_aggregator.rs +++ b/fastpay_core/src/authority_aggregator.rs @@ -1,8 +1,8 @@ // Copyright (c) Facebook, Inc. and its affiliates. // SPDX-License-Identifier: Apache-2.0 -use crate::{authority_client::AuthorityAPI, downloader::*}; -use async_trait::async_trait; +use crate::authority_client::AuthorityAPI; + use fastx_types::object::Object; use fastx_types::{ base_types::*, @@ -11,8 +11,7 @@ use fastx_types::{ fp_ensure, messages::*, }; -use futures::{future, StreamExt, TryFutureExt}; -use rand::seq::SliceRandom; +use futures::{future, StreamExt}; use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet}; use std::time::Duration; @@ -55,73 +54,6 @@ pub enum ReduceOutput { End(S), } -#[allow(dead_code)] -#[derive(Clone)] -struct CertificateRequester { - committee: Committee, - authority_clients: Vec, - sender: Option, -} - -impl CertificateRequester { - fn new( - committee: Committee, - authority_clients: Vec, - sender: Option, - ) -> Self { - Self { - committee, - authority_clients, - sender, - } - } -} - -#[async_trait] -impl Requester for CertificateRequester -where - A: AuthorityAPI + Send + Sync + 'static + Clone, -{ - type Key = (ObjectID, SequenceNumber); - type Value = Result; - - /// Try to find a certificate for the given sender, object_id and sequence number. - async fn query( - &mut self, - (object_id, sequence_number): (ObjectID, SequenceNumber), - ) -> Result { - // BUG(https://github.com/MystenLabs/fastnft/issues/290): This function assumes that requesting the parent cert of object seq+1 will give the cert of - // that creates the object. This is not true, as objects may be deleted and may not have a seq+1 - // to look up. - // - // The authority `handle_object_info_request` is now fixed to return the parent at seq, and not - // seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts - // query to the old (incorrect) behavious to not break tests everywhere. - let inner_sequence_number = sequence_number.increment(); - - let request = ObjectInfoRequest { - object_id, - request_sequence_number: Some(inner_sequence_number), - }; - // Sequentially try each authority in random order. - // TODO: Improve shuffle, different authorities might different amount of stake. - self.authority_clients.shuffle(&mut rand::thread_rng()); - for client in self.authority_clients.iter_mut() { - let result = client.handle_object_info_request(request.clone()).await; - if let Ok(ObjectInfoResponse { - parent_certificate: Some(certificate), - .. - }) = result - { - if certificate.check(&self.committee).is_ok() { - return Ok(certificate); - } - } - } - Err(FastPayError::ErrorWhileRequestingCertificate) - } -} - impl AuthorityAggregator where A: AuthorityAPI + Send + Sync + 'static + Clone, @@ -180,7 +112,6 @@ where // TODO: Eventually the client will store more information, and we could // first try to read certificates and parents from a local cache before // asking an authority. - // let input_objects = target_cert.certificate.order.input_objects(); let order_info = if missing_certificates.is_empty() { // Here we cover a corner case due to the nature of using consistent @@ -254,7 +185,7 @@ where /// stake, in order to bring the destination authority up to date to accept /// the certificate. The time devoted to each attempt is bounded by /// `timeout_milliseconds`. - pub async fn sync_certificate_to_authority_with_timeout( + async fn sync_certificate_to_authority_with_timeout( &self, cert: ConfirmationOrder, destination_authority: AuthorityName, @@ -397,7 +328,7 @@ where /// Return all the information in the network about a specific object, including all versions of it /// as well as all certificates that lead to the versions and the authorities at that version. - pub async fn get_object_by_id( + async fn get_object_by_id( &self, object_id: ObjectID, timeout_after_quorum: Duration, @@ -520,7 +451,13 @@ where /// This function returns a map between object references owned and authorities that hold the objects /// at this version, as well as a list of authorities that responsed to the query for the objects owned. - pub async fn get_all_owned_objects( + /// + /// We do not expose this function to users, as its output is hard for callers to interpet. In particular, + /// some of the entries in the list might be the result of a query to a byzantine authority, so further + /// sanitization and checks are necessary to rely on this information. + /// + /// Clients should use `sync_all_owned_objects` instead. + async fn get_all_owned_objects( &self, address: FastPayAddress, timeout_after_quorum: Duration, @@ -597,7 +534,13 @@ where &self, objects: &[ObjectID], timeout_after_quorum: Duration, - ) -> Result<(Vec, Vec), FastPayError> { + ) -> Result< + ( + Vec<(Object, Option)>, + Vec<(ObjectRef, Option)>, + ), + FastPayError, + > { let mut active_objects = Vec::new(); let mut deleted_objects = Vec::new(); let mut certs_to_sync = BTreeMap::new(); @@ -632,7 +575,7 @@ where // Otherwise report the authority as potentially faulty. if let Some(obj) = object_option { - active_objects.push(obj); + active_objects.push((obj, None)); } // Cannot be that the genesis contributes to deleted objects @@ -654,13 +597,13 @@ where // Add authorities that need to be updated let entry = certs_to_sync .entry(cert.order.digest()) - .or_insert((cert, HashSet::new())); + .or_insert((cert.clone(), HashSet::new())); entry.1.extend(authorites); // Return the latest version of an object, or a deleted object match object_option { - Some(obj) => active_objects.push(obj), - None => deleted_objects.push(object_ref), + Some(obj) => active_objects.push((obj, Some(cert))), + None => deleted_objects.push((object_ref, Some(cert))), } break; @@ -700,9 +643,14 @@ where &self, address: FastPayAddress, timeout_after_quorum: Duration, - ) -> Result<(Vec, Vec), FastPayError> { - // First get a map of all objects at one authority holds when at - // least a quorum of authorities is contacted. + ) -> Result< + ( + Vec<(Object, Option)>, + Vec<(ObjectRef, Option)>, + ), + FastPayError, + > { + // Contact a quorum of authorities, and return all objects they report we own. let (object_map, _authority_list) = self .get_all_owned_objects(address, timeout_after_quorum) .await?; @@ -737,6 +685,7 @@ where .await?; // Now broadcast the order to all authorities. + let digest = order.digest(); let threshold = self.committee.quorum_threshold(); let validity = self.committee.validity_threshold(); @@ -761,10 +710,34 @@ where let state = self .quorum_map_then_reduce_with_timeout( state, - |_name, client| { - // NOTE: here I assume the AuthorityClient has done - // the checks on this order response, and it is well formed. - Box::pin(async move { client.handle_order(order_ref.clone()).await }) + |name, client| { + Box::pin(async move { + let info_reponse = client.handle_order(order_ref.clone()).await?; + + // NOTE: Move these and check all responses for validity. + + if let Some(signed_order) = &info_reponse.signed_order { + signed_order.check(&self.committee)?; + fp_ensure!( + signed_order.authority == name, + FastPayError::ByzantineAuthoritySuspicion { authority: name } + ); + fp_ensure!( + signed_order.order.digest() == digest, + FastPayError::ByzantineAuthoritySuspicion { authority: name } + ); + } + + if let Some(certificate) = &info_reponse.certified_order { + certificate.check(&self.committee)?; + fp_ensure!( + certificate.order.digest() == digest, + FastPayError::ByzantineAuthoritySuspicion { authority: name } + ); + } + + Ok(info_reponse) + }) }, |mut state, name, weight, result| { Box::pin(async move { @@ -942,17 +915,16 @@ where #[cfg(test)] async fn request_certificate( &self, - sender: FastPayAddress, + _sender: FastPayAddress, object_id: ObjectID, - sequence_number: SequenceNumber, + _sequence_number: SequenceNumber, ) -> Result { - CertificateRequester::new( - self.committee.clone(), - self.authority_clients.values().cloned().collect(), - Some(sender), - ) - .query((object_id, sequence_number)) - .await + let (object_map, transaction_map) = self + .get_object_by_id(object_id, Duration::from_secs(10)) + .await?; + + let (_obj_ref, tx_digest) = object_map.keys().last().unwrap(); + return Ok(transaction_map[tx_digest].clone()); } /// Find the highest sequence number that is known to a quorum of authorities. @@ -1025,278 +997,20 @@ where }) } - /// Broadcast transaction order on each authority client. - async fn broadcast_tx_order( - &self, - order: Order, - ) -> Result<(OrderInfoResponse, CertifiedOrder), anyhow::Error> { - let committee = self.committee.clone(); - // We are not broadcasting any confirmation orders, so certificates_to_broadcast vec is empty - let (_confirmation_responses, order_votes) = self - .broadcast_and_execute(Vec::new(), |name, authority| { - let order = order.clone(); - let committee = committee.clone(); - Box::pin(async move { - match authority.handle_order(order).await { - // Check if the response is okay - Ok(response) => - // Verify we have a signed order - { - match response.clone().signed_order { - Some(inner_signed_order) => { - fp_ensure!( - inner_signed_order.authority == name, - FastPayError::ErrorWhileProcessingTransactionOrder { - err: "Signed by unexpected authority".to_string() - } - ); - inner_signed_order.check(&committee)?; - Ok(response) - } - None => Err(FastPayError::ErrorWhileProcessingTransactionOrder { - err: "Invalid order response".to_string(), - }), - } - } - Err(err) => Err(err), - } - }) - }) - .await?; - // Collate the signatures - // If we made it here, values are safe - let signatures = order_votes - .iter() - .map(|vote| { - ( - vote.signed_order.as_ref().unwrap().authority, - vote.signed_order.as_ref().unwrap().signature, - ) - }) - .collect::>(); - - let certificate = CertifiedOrder { order, signatures }; - // Certificate is valid because - // * `communicate_with_quorum` ensured a sufficient "weight" of (non-error) answers were returned by authorities. - // * each answer is a vote signed by the expected authority. - - // Assume all responses are same. Pick first - Ok((order_votes.get(0).unwrap().clone(), certificate)) - } - - /// Broadcast missing confirmation orders and execute provided authority action on each authority. - // BUG(https://github.com/MystenLabs/fastnft/issues/290): This logic for - // updating an authority that is behind is not correct, since we now have - // potentially many dependencies that need to be satisfied, not just a - // list. - async fn broadcast_and_execute<'a, V, F: 'a>( - &'a self, - certificates_to_broadcast: Vec, - action: F, - ) -> Result<(Vec<(CertifiedOrder, OrderInfoResponse)>, Vec), anyhow::Error> - where - F: Fn(AuthorityName, &'a A) -> AsyncResult<'a, V, FastPayError> + Send + Sync + Copy, - V: Clone, - { - let result = self - .communicate_with_quorum(|name, client| { - let certificates_to_broadcast = certificates_to_broadcast.clone(); - Box::pin(async move { - let mut responses = vec![]; - for certificate in certificates_to_broadcast { - responses.push(( - certificate.clone(), - client - .handle_confirmation_order(ConfirmationOrder::new(certificate)) - .await?, - )); - } - Ok((responses, action(name, client).await?)) - }) - }) - .await?; - - let action_results = result.iter().map(|(_, result)| result.clone()).collect(); - - // Assume all responses are the same, pick the first one. - let order_response = result - .iter() - .map(|(response, _)| response.clone()) - .next() - .unwrap_or_default(); - - Ok((order_response, action_results)) - } - - pub async fn update_authority_certificates( - &mut self, - sender: FastPayAddress, - inputs: &[InputObjectKind], - known_certificates: Vec<((ObjectID, SequenceNumber), FastPayResult)>, - ) -> FastPayResult>> { - let requester = CertificateRequester::new( - self.committee.clone(), - self.authority_clients.values().cloned().collect(), - Some(sender), - ); - - let (_, handle) = Downloader::start(requester, known_certificates); - self.communicate_with_quorum(|_name, client| { - let mut handle = handle.clone(); - Box::pin(async move { - // Sync certificate with authority - // Figure out which certificates this authority is missing. - let mut responses = Vec::new(); - let mut missing_certificates = Vec::new(); - for input_kind in inputs { - let object_id = input_kind.object_id(); - let target_sequence_number = input_kind.version(); - let request = ObjectInfoRequest { - object_id, - request_sequence_number: None, - }; - let response = client.handle_object_info_request(request).await?; - - let current_sequence_number = response - .object_and_lock - .ok_or(FastPayError::ObjectNotFound { object_id })? - .object - .version(); - - // Download each missing certificate in reverse order using the downloader. - let mut number = target_sequence_number.decrement(); - while let Ok(seq) = number { - if seq < current_sequence_number { - break; - } - let certificate = handle - .query((object_id, seq)) - .await - .map_err(|_| FastPayError::ErrorWhileRequestingCertificate)??; - missing_certificates.push(certificate); - number = seq.decrement(); - } - } - - // Send all missing confirmation orders. - missing_certificates.reverse(); - for certificate in missing_certificates { - responses.push(( - certificate.clone(), - client - .handle_confirmation_order(ConfirmationOrder::new(certificate)) - .await?, - )); - } - Ok(responses) - }) - }) - .await - } - - /// Broadcast confirmation orders. - /// The corresponding sequence numbers should be consecutive and increasing. - pub async fn broadcast_confirmation_orders( - &self, - certificates_to_broadcast: Vec, - ) -> Result, anyhow::Error> { - self.broadcast_and_execute(certificates_to_broadcast, |_, _| Box::pin(async { Ok(()) })) - .await - .map(|(responses, _)| responses) - } - - pub async fn request_certificates_from_authority( - &self, - known_sequence_numbers_map: BTreeMap<(ObjectID, SequenceNumber), HashSet>, - ) -> Result>, FastPayError> { - let mut sent_certificates: BTreeMap> = BTreeMap::new(); - - for ((object_id, next_sequence_number), known_sequence_numbers) in - known_sequence_numbers_map - { - let mut requester = CertificateRequester::new( - self.committee.clone(), - self.authority_clients.values().cloned().collect(), - None, - ); - - let entry = sent_certificates.entry(object_id).or_default(); - // TODO: it's inefficient to loop through sequence numbers to retrieve missing cert, rethink this logic when we change certificate storage in client. - let mut number = SequenceNumber::from(0); - while number < next_sequence_number { - if !known_sequence_numbers.contains(&number) { - let certificate = requester.query((object_id, number)).await?; - entry.push(certificate); - } - number = number.increment(); - } - } - Ok(sent_certificates) - } - pub async fn execute_transaction( &self, order: &Order, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { - let new_certificate = self.execute_transaction_without_confirmation(order).await?; - - // Confirm transfer certificate if specified. - let responses = self - .broadcast_confirmation_orders(vec![new_certificate.clone()]) + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { + let new_certificate = self + .process_order(order.clone(), Duration::from_secs(60)) + .await?; + let response = self + .process_certificate(new_certificate.clone(), Duration::from_secs(60)) .await?; - - // Find response for the current order from all the returned order responses. - let (_, response) = responses - .into_iter() - .find(|(cert, _)| cert.order == new_certificate.order) - .ok_or(FastPayError::ErrorWhileRequestingInformation)?; Ok((new_certificate, response)) } - /// Execute (or retry) an order without confirmation. Update local object states using newly created certificate. - async fn execute_transaction_without_confirmation( - &self, - order: &Order, - ) -> Result { - let result = self.broadcast_tx_order(order.clone()).await; - - let (_, new_sent_certificate) = result?; - assert_eq!(&new_sent_certificate.order, order); - // TODO: Verify that we don't need to update client objects here based on order_info_responses, - // but can do it at the caller site. - - Ok(new_sent_certificate) - } - - // TODO: This is incomplete at the moment. - // A complete algorithm is being introduced in - // https://github.com/MystenLabs/fastnft/pull/336. - pub async fn download_own_object_ids_from_random_authority( - &self, - address: FastPayAddress, - ) -> Result<(AuthorityName, Vec), FastPayError> { - let request = AccountInfoRequest { account: address }; - // Sequentially try each authority in random order. - let mut authorities: Vec<&AuthorityName> = self.authority_clients.keys().collect(); - // TODO: implement sampling according to stake distribution and using secure RNG. https://github.com/MystenLabs/fastnft/issues/128 - authorities.shuffle(&mut rand::thread_rng()); - // Authority could be byzantine, add timeout to avoid waiting forever. - for authority_name in authorities { - let authority = self.authority_clients.get(authority_name).unwrap(); - let result = timeout( - AUTHORITY_REQUEST_TIMEOUT, - authority.handle_account_info_request(request.clone()), - ) - .map_err(|_| FastPayError::ErrorWhileRequestingInformation) - .await?; - if let Ok(AccountInfoResponse { object_ids, .. }) = &result { - return Ok((*authority_name, object_ids.clone())); - } - } - Err(FastPayError::ErrorWhileRequestingInformation) - } - pub async fn get_object_info_execute( &mut self, object_info_req: ObjectInfoRequest, diff --git a/fastpay_core/src/client.rs b/fastpay_core/src/client.rs index 27a823279e577..a1d3a88e21486 100644 --- a/fastpay_core/src/client.rs +++ b/fastpay_core/src/client.rs @@ -5,11 +5,7 @@ use crate::{authority_aggregator::AuthorityAggregator, authority_client::Authori use async_trait::async_trait; use fastx_framework::build_move_package_to_bytes; use fastx_types::{ - base_types::*, - committee::Committee, - error::{FastPayError, FastPayResult}, - fp_ensure, - messages::*, + base_types::*, committee::Committee, error::FastPayError, fp_ensure, messages::*, }; use futures::future; use itertools::Itertools; @@ -19,6 +15,7 @@ use typed_store::rocks::open_cf; use typed_store::Map; use std::path::{Path, PathBuf}; +use std::time::Duration; use std::{ collections::{BTreeMap, BTreeSet, HashSet}, pin::Pin, @@ -60,17 +57,14 @@ pub trait Client { object_id: ObjectID, gas_payment: ObjectID, recipient: FastPayAddress, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error>; - - /// Receive object from FastX. - async fn receive_object(&mut self, certificate: &CertifiedOrder) -> Result<(), anyhow::Error>; + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error>; /// Try to complete all pending orders once. Return if any fails async fn try_complete_pending_orders(&mut self) -> Result<(), FastPayError>; /// Synchronise client state with a random authorities, updates all object_ids and certificates, request only goes out to one authority. /// this method doesn't guarantee data correctness, client will have to handle potential byzantine authority - async fn sync_client_state(&mut self) -> Result; + async fn sync_client_state(&mut self) -> Result<(), anyhow::Error>; /// Call move functions in the module in the given package, with args supplied async fn move_call( @@ -83,14 +77,14 @@ pub trait Client { object_arguments: Vec, pure_arguments: Vec>, gas_budget: u64, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error>; + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error>; /// Publish Move modules async fn publish( &mut self, package_source_files_path: String, gas_object_ref: ObjectRef, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error>; + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error>; /// Get the object information async fn get_object_info( @@ -240,28 +234,6 @@ impl ClientState { pub fn secret(&self) -> &dyn signature::Signer { &*self.secret } - - /// Given an order, return the list of certificates that are known by this client - /// for each object in the input of the order. - fn get_known_certificates( - &self, - sender: &FastPayAddress, - inputs: &[InputObjectKind], - ) -> Vec<((ObjectID, SequenceNumber), FastPayResult)> { - inputs - .iter() - .flat_map(|input_kind| { - self.certificates(&input_kind.object_id()) - .filter_map(move |cert| { - if cert.order.sender() == sender { - Some(((input_kind.object_id(), input_kind.version()), Ok(cert))) - } else { - None - } - }) - }) - .collect() - } } impl ClientState @@ -291,118 +263,17 @@ where Ok(reference) } - /// Make sure we have all our certificates with sequence number - /// in the range 0..self.next_sequence_number - pub async fn download_certificates( - &mut self, - ) -> Result>, FastPayError> { - let known_sequence_numbers_map = self - .store - .object_sequence_numbers - .iter() - .map(|(object_id, next_sequence_number)| { - ( - (object_id, next_sequence_number), - self.certificates(&object_id) - .flat_map(|cert| cert.order.input_objects()) - .filter_map(|object_kind| { - if object_kind.object_id() == object_id { - Some(object_kind.version()) - } else { - None - } - }) - .collect::>(), - ) - }) - .collect::>(); - self.authorities - .request_certificates_from_authority(known_sequence_numbers_map) - .await - } - - /// Update our view of certificates. Update the object_id and the next sequence number accordingly. - /// NOTE: This is only useful in the eventuality of missing local data. - /// We assume certificates to be valid and sent by us, and their sequence numbers to be unique. - fn update_certificates( - &mut self, - object_id: &ObjectID, - certificates: &[CertifiedOrder], - ) -> Result<(), FastPayError> { - for new_cert in certificates { - // Try to get object's last seq number before the mutation, default to 0 for newly created object. - let seq = new_cert - .order - .input_objects() - .iter() - .find_map(|object_kind| { - if object_id == &object_kind.object_id() { - Some(object_kind.version()) - } else { - None - } - }) - .unwrap_or_default(); - - let mut new_next_sequence_number = self.next_sequence_number(object_id)?; - if seq >= new_next_sequence_number { - new_next_sequence_number = seq.increment(); - } - let new_cert_order_digest = new_cert.order.digest(); - // Multi table atomic insert using batches - let mut batch = self - .store - .object_sequence_numbers - .batch() - .insert_batch( - &self.store.object_sequence_numbers, - std::iter::once((object_id, new_next_sequence_number)), - )? - .insert_batch( - &self.store.certificates, - std::iter::once((&new_cert_order_digest, new_cert)), - )?; - let mut certs = match self.store.object_certs.get(object_id)? { - Some(c) => c.clone(), - None => Vec::new(), - }; - if !certs.contains(&new_cert_order_digest) { - certs.push(new_cert_order_digest); - batch = batch.insert_batch( - &self.store.object_certs, - std::iter::once((object_id, certs)), - )?; - } - // Execute atomic write of opers - batch.write()?; - } - // Sanity check - let certificates_count = self.certificates(object_id).count(); - - if certificates_count == usize::from(self.next_sequence_number(object_id)?) { - Ok(()) - } else { - Err(FastPayError::UnexpectedSequenceNumber { - object_id: *object_id, - expected_sequence: SequenceNumber::from(certificates_count as u64), - }) - } - } - async fn execute_transaction_inner( &mut self, order: &Order, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { - let inputs = order.input_objects(); - let known_certificates = self.get_known_certificates(order.sender(), &inputs); - self.authorities - .update_authority_certificates(*order.sender(), &inputs, known_certificates) - .await?; + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { + let (new_certificate, effects) = self.authorities.execute_transaction(order).await?; - let resp = self.authorities.execute_transaction(order).await?; + // Update local data using new order response. + self.update_objects_from_order_info(new_certificate.clone(), effects.clone()) + .await?; - self.update_objects_from_order_info(resp.1.clone()).await?; - Ok(resp) + Ok((new_certificate, effects)) } /// Execute (or retry) an order and execute the Confirmation Order. @@ -413,7 +284,7 @@ where async fn execute_transaction( &mut self, order: Order, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { for object_kind in &order.input_objects() { let object_id = object_kind.object_id(); let next_sequence_number = self.next_sequence_number(&object_id).unwrap_or_default(); @@ -500,55 +371,51 @@ where async fn update_objects_from_order_info( &mut self, - order_info_resp: OrderInfoResponse, - ) -> Result<(), FastPayError> { - if let Some(v) = order_info_resp.signed_effects { - // The cert should be included in the response - let cert = order_info_resp.certified_order.unwrap(); - let parent_tx_digest = cert.order.digest(); - self.store.certificates.insert(&parent_tx_digest, &cert)?; - - let mut objs_to_download = Vec::new(); - - for &(object_ref, owner) in v.effects.all_mutated() { - let (object_id, seq, _) = object_ref; - let old_seq = self - .store - .object_sequence_numbers - .get(&object_id)? - .unwrap_or_default(); - // only update if data is new - if old_seq < seq { - if owner.is_address(&self.address) { - self.insert_object_info(&object_ref, &parent_tx_digest)?; - objs_to_download.push(object_ref); - } else { - self.remove_object_info(&object_id)?; - } - } else if old_seq == seq && owner.is_address(&self.address) { - // ObjectRef can be 1 version behind because it's only updated after confirmation. - self.store.object_refs.insert(&object_id, &object_ref)?; + cert: CertifiedOrder, + effects: OrderEffects, + ) -> Result<(CertifiedOrder, OrderEffects), FastPayError> { + // The cert should be included in the response + let parent_tx_digest = cert.order.digest(); + self.store.certificates.insert(&parent_tx_digest, &cert)?; + + let mut objs_to_download = Vec::new(); + + for &(object_ref, owner) in effects.all_mutated() { + let (object_id, seq, _) = object_ref; + let old_seq = self + .store + .object_sequence_numbers + .get(&object_id)? + .unwrap_or_default(); + // only update if data is new + if old_seq < seq { + if owner.is_address(&self.address) { + self.insert_object_info(&object_ref, &parent_tx_digest)?; + objs_to_download.push(object_ref); + } else { + self.remove_object_info(&object_id)?; } + } else if old_seq == seq && owner.is_address(&self.address) { + // ObjectRef can be 1 version behind because it's only updated after confirmation. + self.store.object_refs.insert(&object_id, &object_ref)?; } + } - // TODO: decide what to do with failed object downloads - // https://github.com/MystenLabs/fastnft/issues/331 - let _failed = self.download_objects_not_in_db(objs_to_download).await?; - - for (object_id, seq, _) in &v.effects.deleted { - let old_seq = self - .store - .object_sequence_numbers - .get(object_id)? - .unwrap_or_default(); - if old_seq < *seq { - self.remove_object_info(object_id)?; - } + // TODO: decide what to do with failed object downloads + // https://github.com/MystenLabs/fastnft/issues/331 + let _failed = self.download_objects_not_in_db(objs_to_download).await?; + + for (object_id, seq, _) in &effects.deleted { + let old_seq = self + .store + .object_sequence_numbers + .get(object_id)? + .unwrap_or_default(); + if old_seq < *seq { + self.remove_object_info(object_id)?; } - Ok(()) - } else { - Err(FastPayError::ErrorWhileRequestingInformation) } + Ok((cert, effects)) } /// Fetch the objects for the given list of ObjectRefs, which do not already exist in the db. @@ -604,7 +471,7 @@ where object_id: ObjectID, gas_payment: ObjectID, recipient: FastPayAddress, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { let object_ref = self .store .object_refs @@ -626,86 +493,17 @@ where gas_payment, }; let order = Order::new_transfer(transfer, &*self.secret); - let (certificate, order_info_response) = self.execute_transaction(order).await?; + let (certificate, effects) = self.execute_transaction(order).await?; + self.authorities + .process_certificate(certificate.clone(), Duration::from_secs(60)) + .await?; // remove object from local storage if the recipient is not us. if recipient != self.address { self.remove_object_info(&object_id)?; } - Ok((certificate, order_info_response)) - } - - // TODO: Revisit this and see if this method is still necessary. - // Technically we can just `sync` and fetch all changes - async fn receive_object(&mut self, certificate: &CertifiedOrder) -> Result<(), anyhow::Error> { - certificate.check(&self.authorities.committee)?; - match &certificate.order.kind { - OrderKind::Transfer(transfer) => { - fp_ensure!( - transfer.recipient == self.address, - FastPayError::IncorrectRecipientError.into() - ); - let responses = self - .authorities - .broadcast_confirmation_orders(vec![certificate.clone()]) - .await?; - - for (_, response) in responses { - self.update_objects_from_order_info(response).await?; - } - - let response = self - .get_object_info(ObjectInfoRequest { - object_id: *certificate.order.object_id(), - // TODO(https://github.com/MystenLabs/fastnft/issues/290): - // This function assumes that requesting the parent cert of object seq+1 will give the cert of - // that creates the object. This is not true, as objects may be deleted and may not have a seq+1 - // to look up. - // - // The authority `handle_object_info_request` is now fixed to return the parent at seq, and not - // seq+1. But a lot of the client code makes the above wrong assumption, and the line above reverts - // query to the old (incorrect) behavious to not break tests everywhere. - request_sequence_number: Some(transfer.object_ref.1.increment()), - }) - .await?; - - let object = &response - .object_and_lock - .ok_or(FastPayError::ObjectNotFound { - object_id: *certificate.order.object_id(), - })? - .object; - self.store - .object_refs - .insert(&object.id(), &object.to_object_reference())?; - - // Everything worked: update the local objects and certs. - let cert_order_digest = certificate.order.digest(); - if !self.store.certificates.contains_key(&cert_order_digest)? { - self.store - .object_sequence_numbers - .insert(&transfer.object_ref.0, &transfer.object_ref.1.increment())?; - let mut tx_digests = - match self.store.object_certs.get(&transfer.object_ref.0)? { - Some(c) => c, - None => Vec::new(), - }; - tx_digests.push(cert_order_digest); - self.store - .object_certs - .insert(&transfer.object_ref.0, &tx_digests.to_vec())?; - self.store - .certificates - .insert(&cert_order_digest, certificate)?; - } - - Ok(()) - } - OrderKind::Publish(_) | OrderKind::Call(_) => { - unimplemented!("receiving (?) Move call or publish") - } - } + Ok((certificate, effects)) } async fn try_complete_pending_orders(&mut self) -> Result<(), FastPayError> { @@ -726,7 +524,7 @@ where Ok(()) } - async fn sync_client_state(&mut self) -> Result { + async fn sync_client_state(&mut self) -> Result<(), anyhow::Error> { if !self.store.pending_orders.is_empty() { // Finish executing the previous orders self.try_complete_pending_orders().await?; @@ -735,24 +533,26 @@ where self.store.object_sequence_numbers.clear()?; self.store.object_refs.clear()?; - let (authority_name, object_refs) = self + let (active_object_certs, _deleted_refs_certs) = self .authorities - .download_own_object_ids_from_random_authority(self.address) + .sync_all_owned_objects(self.address, Duration::from_secs(60)) .await?; - for object_ref in object_refs { + + for (object, option_cert) in active_object_certs { + let object_ref = object.to_object_reference(); let (object_id, sequence_number, _) = object_ref; self.store .object_sequence_numbers .insert(&object_id, &sequence_number)?; self.store.object_refs.insert(&object_id, &object_ref)?; + if let Some(cert) = option_cert { + self.store + .certificates + .insert(&cert.order.digest(), &cert)?; + } } - // Recover missing certificates. - let new_certificates = self.download_certificates().await?; - for (id, certs) in new_certificates { - self.update_certificates(&id, &certs)?; - } - Ok(authority_name) + Ok(()) } async fn move_call( @@ -765,7 +565,7 @@ where object_arguments: Vec, pure_arguments: Vec>, gas_budget: u64, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { let move_call_order = Order::new_move_call( self.address, package_object_ref, @@ -785,7 +585,7 @@ where &mut self, package_source_files_path: String, gas_object_ref: ObjectRef, - ) -> Result<(CertifiedOrder, OrderInfoResponse), anyhow::Error> { + ) -> Result<(CertifiedOrder, OrderEffects), anyhow::Error> { // Try to compile the package at the given path let compiled_modules = build_move_package_to_bytes(Path::new(&package_source_files_path))?; let move_publish_order = Order::new_module( diff --git a/fastpay_core/src/downloader.rs b/fastpay_core/src/downloader.rs deleted file mode 100644 index 88c1dc21c040a..0000000000000 --- a/fastpay_core/src/downloader.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// SPDX-License-Identifier: Apache-2.0 - -use async_trait::async_trait; -use futures::{ - channel::{mpsc, oneshot}, - SinkExt, StreamExt, -}; -use std::collections::BTreeMap; - -#[cfg(test)] -#[path = "unit_tests/downloader_tests.rs"] -mod downloader_tests; - -/// An asynchronous downloader that ensures that the value for each key is requested at most once. -pub struct Downloader { - /// User-provided logics to fetch data. - requester: R, - /// Status of previous downloads, indexed by key. - downloads: BTreeMap>, - /// Command stream of the main handler. - command_receiver: mpsc::Receiver>, - /// How to send commands to the main handler. - command_sender: mpsc::Sender>, -} - -/// The underlying data-fetching mechanism to be provided by the user. -#[async_trait] -pub trait Requester { - type Key: std::cmp::Ord + Send + Sync + Clone + 'static; - type Value: std::fmt::Debug + Send + Clone + 'static; - - /// Request the value corresponding to the given key. - async fn query(&mut self, key: Self::Key) -> Self::Value; -} - -/// Channel for using code to send requests and stop the downloader task. -#[derive(Clone)] -pub struct DownloadHandle(mpsc::Sender>); - -/// A command send to the downloader task. -enum DownloadCommand { - /// A user requests a value. - Request(K, oneshot::Sender), - /// A value has been downloaded. - Publish(K, V), - /// Shut down the main handler. - Quit, -} - -/// The status of a download job. -enum DownloadStatus { - /// A value is available. - Ready(V), - /// Download is in progress. Subscribers are waiting for the result. - WaitingList(Vec>), -} - -impl DownloadHandle { - /// Allow to make new download queries and wait for the result. - pub async fn query(&mut self, key: K) -> Result { - let (callback, receiver) = oneshot::channel(); - self.0.send(DownloadCommand::Request(key, callback)).await?; - let value = receiver.await?; - Ok(value) - } - - /// Shut down the main handler. - pub async fn stop(&mut self) -> Result<(), anyhow::Error> { - self.0.send(DownloadCommand::Quit).await?; - Ok(()) - } -} - -impl Downloader { - /// Recover the content of the downloading cache. - fn finalize(self) -> impl Iterator { - self.downloads.into_iter().filter_map(|(_, v)| match v { - DownloadStatus::Ready(value) => Some(value), - _ => None, - }) - } -} - -impl Downloader -where - R: Requester + Send + Clone + 'static, - K: std::cmp::Ord + Send + Sync + Clone + 'static, - V: std::fmt::Debug + Send + Clone + 'static, -{ - /// Create a downloader as a wrapper around the given `requester`. - /// Fill the initial cache with some known values. - pub fn start( - requester: R, - known_values: I, - ) -> ( - tokio::task::JoinHandle>, - DownloadHandle, - ) - where - I: IntoIterator, - { - let (command_sender, command_receiver) = mpsc::channel(1024); - let mut downloads = BTreeMap::new(); - for (key, value) in known_values { - downloads.insert(key, DownloadStatus::Ready(value)); - } - let mut downloader = Self { - requester, - downloads, - command_receiver, - command_sender: command_sender.clone(), - }; - // Spawn a task for the main handler. - let task = tokio::spawn(async move { - downloader.run().await; - downloader.finalize() - }); - (task, DownloadHandle(command_sender)) - } - - /// Main handler. - async fn run(&mut self) { - loop { - match self.command_receiver.next().await { - Some(DownloadCommand::Request(key, callback)) => { - // Deconstruct self to help the borrow checker below. - let requester_ref = &self.requester; - let command_sender_ref = &self.command_sender; - let downloads_ref_mut = &mut self.downloads; - // Recover current download status or create a new one. - let entry = downloads_ref_mut.entry(key.clone()).or_insert_with(|| { - let mut requester = requester_ref.clone(); - let mut command_sender = command_sender_ref.clone(); - tokio::spawn(async move { - let result = requester.query(key.clone()).await; - command_sender - .send(DownloadCommand::Publish(key, result)) - .await - .unwrap_or(()) - }); - DownloadStatus::WaitingList(Vec::new()) - }); - // Process Request: either subscribe or return the result now. - match entry { - DownloadStatus::WaitingList(list) => { - list.push(callback); - } - DownloadStatus::Ready(result) => { - callback.send(result.clone()).unwrap_or(()); - } - } - } - Some(DownloadCommand::Publish(key, result)) => { - // Handle newly found result. - let status = std::mem::replace( - self.downloads.get_mut(&key).expect("Key should be present"), - DownloadStatus::Ready(result.clone()), - ); - if let DownloadStatus::WaitingList(subscribers) = status { - for callback in subscribers { - callback.send(result.clone()).unwrap_or(()); - } - } - } - _ => return, - } - } - } -} diff --git a/fastpay_core/src/lib.rs b/fastpay_core/src/lib.rs index 27c7ab62a8147..6762131c0f905 100644 --- a/fastpay_core/src/lib.rs +++ b/fastpay_core/src/lib.rs @@ -6,4 +6,3 @@ pub mod authority_aggregator; pub mod authority_client; pub mod authority_server; pub mod client; -pub mod downloader; diff --git a/fastpay_core/src/unit_tests/client_tests.rs b/fastpay_core/src/unit_tests/client_tests.rs index 280003481d271..34ddaaf160d36 100644 --- a/fastpay_core/src/unit_tests/client_tests.rs +++ b/fastpay_core/src/unit_tests/client_tests.rs @@ -6,10 +6,8 @@ use super::*; use crate::authority::{AuthorityState, AuthorityStore}; use crate::client::client_store::ClientStore; use crate::client::{Client, ClientState}; -use fastx_types::{ - object::{Object, GAS_VALUE_FOR_TESTING, OBJECT_START_VERSION}, - FASTX_FRAMEWORK_ADDRESS, -}; +use async_trait::async_trait; +use fastx_types::object::{Object, GAS_VALUE_FOR_TESTING, OBJECT_START_VERSION}; use futures::lock::Mutex; use move_core_types::ident_str; use std::{ @@ -188,7 +186,6 @@ fn order_transfer( ) } -#[allow(dead_code)] #[cfg(test)] fn order_set( src: FastPayAddress, @@ -631,7 +628,7 @@ async fn test_bidirectional_transfer() { ); // Update client2's local object data. - client2.receive_object(&certificate).await.unwrap(); + client2.sync_client_state().await.unwrap(); // Confirm sequence number are consistent between clients. assert_eq!( @@ -766,184 +763,6 @@ async fn test_client_state_sync_with_transferred_object() { assert_eq!(1, client2.store().certificates.iter().count()); } -#[tokio::test] -async fn test_client_certificate_state() { - let number_of_authorities = 1; - let (authority_clients, committee) = init_local_authorities(number_of_authorities).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let mut client2 = make_client(authority_clients.clone(), committee); - - let object_id_1 = ObjectID::random(); - let object_id_2 = ObjectID::random(); - let gas_object_id_1 = ObjectID::random(); - let gas_object_id_2 = ObjectID::random(); - - let client1_objects = vec![object_id_1, object_id_2, gas_object_id_1]; - let client2_objects = vec![gas_object_id_2]; - - let client1_objects: Vec> = (0..number_of_authorities) - .map(|_| client1_objects.clone()) - .collect(); - - let client2_objects: Vec> = (0..number_of_authorities) - .map(|_| client2_objects.clone()) - .collect(); - - fund_account( - authority_clients.values().collect(), - &mut client1, - client1_objects, - ) - .await; - - fund_account( - authority_clients.values().collect(), - &mut client2, - client2_objects, - ) - .await; - - // Transfer object to client2. - client1 - .transfer_object(object_id_1, gas_object_id_1, client2.address()) - .await - .unwrap(); - client1 - .transfer_object(object_id_2, gas_object_id_1, client2.address()) - .await - .unwrap(); - // Should have 2 certs after 2 transfer - assert_eq!(2, client1.store().certificates.iter().count()); - // Only gas_object left in account, so object_certs link should only have 1 entry - assert_eq!(1, client1.store().object_certs.iter().count()); - // it should have 2 certificates associated with the gas object - assert!(client1 - .store() - .object_certs - .contains_key(&gas_object_id_1) - .unwrap()); - assert_eq!( - 2, - client1 - .store() - .object_certs - .get(&gas_object_id_1) - .unwrap() - .unwrap() - .len() - ); - // Sequence number should be 2 for gas object after 2 mutation. - assert_eq!( - Ok(SequenceNumber::from(2)), - client1.next_sequence_number(&gas_object_id_1) - ); - - client2.sync_client_state().await.unwrap(); - - // Client 2 should retrieve 2 certificates for the 2 transactions after sync - assert_eq!(2, client2.store().certificates.iter().count()); - assert!(client2 - .store() - .object_certs - .contains_key(&object_id_1) - .unwrap()); - assert!(client2 - .store() - .object_certs - .contains_key(&object_id_2) - .unwrap()); - assert_eq!( - 1, - client2 - .store() - .object_certs - .get(&object_id_1) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - 1, - client2 - .store() - .object_certs - .get(&object_id_2) - .unwrap() - .unwrap() - .len() - ); - // Sequence number for object 1 and 2 should be 1 after 1 mutation. - assert_eq!( - Ok(SequenceNumber::from(1)), - client2.next_sequence_number(&object_id_1) - ); - assert_eq!( - Ok(SequenceNumber::from(1)), - client2.next_sequence_number(&object_id_2) - ); - // Transfer object 2 back to client 1. - client2 - .transfer_object(object_id_2, gas_object_id_2, client1.address()) - .await - .unwrap(); - - assert_eq!(3, client2.store().certificates.iter().count()); - assert!(client2 - .store() - .object_certs - .contains_key(&object_id_1) - .unwrap()); - assert!(!client2 - .store() - .object_certs - .contains_key(&object_id_2) - .unwrap()); - assert!(client2 - .store() - .object_certs - .contains_key(&gas_object_id_2) - .unwrap()); - assert_eq!( - 1, - client2 - .store() - .object_certs - .get(&object_id_1) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - 1, - client2 - .store() - .object_certs - .get(&gas_object_id_2) - .unwrap() - .unwrap() - .len() - ); - - client1.sync_client_state().await.unwrap(); - - assert_eq!(3, client1.store().certificates.iter().count()); - assert!(client1 - .store() - .object_certs - .contains_key(&object_id_2) - .unwrap()); - assert_eq!( - 2, - client1 - .store() - .object_certs - .get(&object_id_2) - .unwrap() - .unwrap() - .len() - ); -} - #[tokio::test] async fn test_move_calls_object_create() { let (authority_clients, committee) = init_local_authorities(4).await; @@ -985,8 +804,7 @@ async fn test_move_calls_object_create() { .await; // Check effects are good - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; + let (_, order_effects) = call_response.unwrap(); // Status flag should be success assert_eq!(order_effects.status, ExecutionStatus::Success); // Nothing should be deleted during a creation @@ -1040,8 +858,7 @@ async fn test_move_calls_object_transfer() { ) .await; - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; + let (_, order_effects) = call_response.unwrap(); assert_eq!(order_effects.gas_object.0 .0, gas_object_id); @@ -1082,8 +899,7 @@ async fn test_move_calls_object_transfer() { .await; // Check effects are good - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; + let (_, order_effects) = call_response.unwrap(); // Status flag should be success assert_eq!(order_effects.status, ExecutionStatus::Success); // Nothing should be deleted during a transfer @@ -1115,9 +931,10 @@ async fn test_move_calls_object_transfer() { } #[tokio::test] -async fn test_move_calls_chain_many_authority_syncronization() { +async fn test_move_calls_object_transfer_and_freeze() { let (authority_clients, committee) = init_local_authorities(4).await; let mut client1 = make_client(authority_clients.clone(), committee.clone()); + let client2 = make_client(authority_clients.clone(), committee); let object_value: u64 = 100; let gas_object_id = ObjectID::random(); @@ -1125,7 +942,7 @@ async fn test_move_calls_chain_many_authority_syncronization() { // Populate authorities with obj data let mut gas_object_ref = fund_account_with_same_objects( - authority_clients.clone().values().collect(), + authority_clients.values().collect(), &mut client1, vec![gas_object_id], ) @@ -1154,74 +971,20 @@ async fn test_move_calls_chain_many_authority_syncronization() { ) .await; - let (mut last_certificate, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - - assert_eq!(order_effects.gas_object.0 .0, gas_object_id); - + let (_, order_effects) = call_response.unwrap(); // Get the object created from the call let (new_obj_ref, _) = order_effects.created[0]; - - for value in 0u64..10u64 { - // Fetch the full object - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - gas_object_ref = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("set_value").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj.object().unwrap().to_object_reference()], - pure_args, - GAS_VALUE_FOR_TESTING / 2, - ) - .await; - - last_certificate = _call_response.unwrap().0; - } - - // For this test to work the client has updated the first 3 authorities but not the last one - // Assert this to catch any changes to the client behaviour that reqire fixing this test to still - // test sync. - - let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); - - let full_seq = authorities[2] - .1 - .handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, + // Fetch the full object + let new_obj = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, request_sequence_number: None, }) .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - assert_eq!(full_seq.1, SequenceNumber::from(11)); + .unwrap(); - let zero_seq = authorities[3] - .1 - .handle_object_info_request(ObjectInfoRequest { + gas_object_ref = client1 + .get_object_info(ObjectInfoRequest { object_id: gas_object_ref.0, request_sequence_number: None, }) @@ -1230,59 +993,58 @@ async fn test_move_calls_chain_many_authority_syncronization() { .object() .unwrap() .to_object_reference(); - assert_eq!(zero_seq.1, SequenceNumber::from(0)); - - // This is (finally) the function we want to test - // If we try to sync from the authority that does not have the data to the one - // that does not we fail. - let result = client1 - .authorities() - .sync_authority_source_to_destination( - ConfirmationOrder::new(last_certificate.clone()), - authorities[3].0, - authorities[3].0, + let pure_args = vec![bcs::to_bytes(&client2.address().to_vec()).unwrap()]; + let call_response = client1 + .move_call( + framework_obj_ref, + ident_str!("ObjectBasics").to_owned(), + ident_str!("transfer_and_freeze").to_owned(), + Vec::new(), + gas_object_ref, + vec![new_obj.object().unwrap().to_object_reference()], + pure_args, + GAS_VALUE_FOR_TESTING / 2, ) .await; - assert!(result.is_err()); + // Check effects are good + let (_, order_effects) = call_response.unwrap(); + // Status flag should be success + assert_eq!(order_effects.status, ExecutionStatus::Success); + // Nothing should be deleted during a transfer + assert!(order_effects.deleted.is_empty()); + // Item being transfered is mutated. + assert_eq!(order_effects.mutated.len(), 1); - // Here we get the list of objects known by authorities. - let (obj_map, _auths) = client1 - .authorities() - .get_all_owned_objects(client1.address(), Duration::from_secs(10)) - .await - .unwrap(); - // Check only 3 out of 4 authorities have the latest object - assert_eq!(obj_map[&full_seq].len(), 3); + let (transferred_obj_ref, _) = order_effects.mutated[0]; + assert_ne!(gas_object_ref, transferred_obj_ref); - // If we try to sync from the authority that does have the data to the one - // that does not we succeed. - let result = client1 - .authorities() - .sync_authority_source_to_destination( - ConfirmationOrder::new(last_certificate), - authorities[2].0, - authorities[3].0, - ) - .await; + assert_eq!(transferred_obj_ref.0, new_obj_ref.0); - // Here we get the list of objects known by authorities. - let (obj_map, _auths) = client1 - .authorities() - .get_all_owned_objects(client1.address(), Duration::from_secs(10)) + let transferred_obj_info = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, + request_sequence_number: None, + }) .await .unwrap(); - // Check all 4 out of 4 authorities have the latest object - assert_eq!(obj_map[&full_seq].len(), 4); - assert!(result.is_ok()); + // Confirm new owner + assert!(transferred_obj_info + .object() + .unwrap() + .owner + .is_address(&client2.address())); + + // Confirm read only + assert!(transferred_obj_info.object().unwrap().is_read_only()); } #[tokio::test] -async fn test_move_calls_chain_many_delete_authority_synchronization() { +async fn test_move_calls_object_delete() { let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); + let mut client1 = make_client(authority_clients.clone(), committee); let object_value: u64 = 100; let gas_object_id = ObjectID::random(); @@ -1290,7 +1052,7 @@ async fn test_move_calls_chain_many_delete_authority_synchronization() { // Populate authorities with obj data let mut gas_object_ref = fund_account_with_same_objects( - authority_clients.clone().values().collect(), + authority_clients.values().collect(), &mut client1, vec![gas_object_id], ) @@ -1319,61 +1081,17 @@ async fn test_move_calls_chain_many_delete_authority_synchronization() { ) .await; - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - - assert_eq!(order_effects.gas_object.0 .0, gas_object_id); - + let (_, order_effects) = call_response.unwrap(); // Get the object created from the call let (new_obj_ref, _) = order_effects.created[0]; - - for value in 0u64..20u64 { - // Fetch the full object - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - gas_object_ref = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("set_value").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj.object().unwrap().to_object_reference()], - pure_args, - GAS_VALUE_FOR_TESTING / 2, - ) - .await; - } - // Fetch the full object - let new_obj_ref = client1 + let new_obj = client1 .get_object_info(ObjectInfoRequest { object_id: new_obj_ref.0, request_sequence_number: None, }) .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); + .unwrap(); gas_object_ref = client1 .get_object_info(ObjectInfoRequest { @@ -1393,87 +1111,46 @@ async fn test_move_calls_chain_many_delete_authority_synchronization() { ident_str!("delete").to_owned(), Vec::new(), gas_object_ref, - vec![new_obj_ref], + vec![new_obj.object().unwrap().to_object_reference()], Vec::new(), GAS_VALUE_FOR_TESTING / 2, ) .await; - let last_certificate = call_response.unwrap().0; - - // For this test to work the client has updated the first 3 authorities but not the last one - // Assert this to catch any changes to the client behaviour that reqire fixing this test to still - // test sync. - - let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); - - let full_seq = authorities[2] - .1 - .handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - assert_eq!(full_seq.1, SequenceNumber::from(22)); + // Check effects are good + let (_, order_effects) = call_response.unwrap(); + // Status flag should be success + assert_eq!(order_effects.status, ExecutionStatus::Success); + // Object be deleted during a delete + assert_eq!(order_effects.deleted.len(), 1); + // No item is mutated. + assert_eq!(order_effects.mutated.len(), 0); + // Confirm the items + assert_eq!(order_effects.gas_object.0 .0, gas_object_id); - let zero_seq = authorities[3] - .1 - .handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, + // Try to fetch the deleted object + let deleted_object_resp = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, request_sequence_number: None, }) .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - assert_eq!(zero_seq.1, SequenceNumber::from(0)); - - // This is (finally) the function we want to test - - // If we try to sync from the authority that does not have the data to the one - // that does not have the data we fail. - let result = client1 - .authorities() - .sync_authority_source_to_destination( - ConfirmationOrder::new(last_certificate.clone()), - authorities[3].0, - authorities[3].0, - ) - .await; - - assert!(result.is_err()); - - // If we try to sync from the authority that does have the data to the one - // that does not we succeed. - let result = client1 - .authorities() - .sync_authority_source_to_destination( - ConfirmationOrder::new(last_certificate), - authorities[2].0, - authorities[3].0, - ) - .await; + .unwrap(); - assert!(result.is_ok()); + assert!(deleted_object_resp.object_and_lock.is_none()); } #[tokio::test] -async fn test_move_calls_chain_many_delete_authority_auto_synchronization() { +async fn test_module_publish_and_call_good() { + // Init the states let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); + let mut client1 = make_client(authority_clients.clone(), committee); - let object_value: u64 = 100; let gas_object_id = ObjectID::random(); - let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); - // Populate authorities with obj data - let mut gas_object_ref = fund_account_with_same_objects( - authority_clients.clone().values().collect(), + // Populate authorities with gas obj data + let gas_object_ref = fund_account_with_same_objects( + authority_clients.values().collect(), &mut client1, vec![gas_object_id], ) @@ -1484,165 +1161,108 @@ async fn test_move_calls_chain_many_delete_authority_auto_synchronization() { .1 .to_object_reference(); - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - let call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await; - - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - assert_eq!(order_effects.gas_object.0 .0, gas_object_id); + // Provide path to well formed package sources + let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); + hero_path.push_str("/../fastx_programmability/examples/"); - // Get the object created from the call - let (new_obj_ref, _) = order_effects.created[0]; + let pub_res = client1.publish(hero_path, gas_object_ref).await; - for value in 0u64..20u64 { - // Fetch the full object - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); + let (_, published_effects) = pub_res.unwrap(); - gas_object_ref = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("set_value").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj.object().unwrap().to_object_reference()], - pure_args, - GAS_VALUE_FOR_TESTING / 2, - ) - .await; - } + // Only package obj should be created + assert_eq!(published_effects.created.len(), 1); - // Fetch the full object - let new_obj_ref = client1 + // Verif gas obj + assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); + + let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); + assert_ne!(gas_object_ref, *new_obj_ref); + + // We now have the module obj ref + // We can inspect it + + let new_obj = client1 .get_object_info(ObjectInfoRequest { object_id: new_obj_ref.0, request_sequence_number: None, }) .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); + .unwrap(); - gas_object_ref = client1 + // Version should be 1 for all modules + assert_eq!(new_obj.object().unwrap().version(), OBJECT_START_VERSION); + // Must be immutable + assert!(new_obj.object().unwrap().is_read_only()); + + // StructTag type is not defined for package + assert!(new_obj.object().unwrap().type_().is_none()); + + // Data should be castable as a package + assert!(new_obj.object().unwrap().data.try_as_package().is_some()); + + // Retrieve latest gas obj spec + let gas_object = client1 .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, + object_id: gas_object_id, request_sequence_number: None, }) .await .unwrap() .object() .unwrap() - .to_object_reference(); + .clone(); - let call_response = client1 + let gas_object_ref = gas_object.to_object_reference(); + + //Try to call a function in TrustedCoin module + let call_resp = client1 .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("delete").to_owned(), - Vec::new(), + new_obj.object().unwrap().to_object_reference(), + ident_str!("TrustedCoin").to_owned(), + ident_str!("init").to_owned(), + vec![], gas_object_ref, - vec![new_obj_ref], - Vec::new(), - GAS_VALUE_FOR_TESTING / 2, + vec![], + vec![], + 1000, ) - .await; - - let last_certificate = call_response.unwrap().0; - - // For this test to work the client has updated the first 3 authorities but not the last one - // Assert this to catch any changes to the client behaviour that reqire fixing this test to still - // test sync. + .await + .unwrap(); - let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); + let effects = call_resp.1; + assert!(effects.status == ExecutionStatus::Success); - let full_seq = authorities[2] - .1 - .handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() + // This gets the treasury cap for the coin and gives it to the sender + let tres_cap_ref = effects + .created + .iter() + .find(|r| r.0 .0 != gas_object_ref.0) .unwrap() - .to_object_reference(); - assert_eq!(full_seq.1, SequenceNumber::from(22)); + .0; - let zero_seq = authorities[3] - .1 - .handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, + // Fetch the full obj info + let tres_cap_obj_info = client1 + .get_object_info(ObjectInfoRequest { + object_id: tres_cap_ref.0, request_sequence_number: None, }) .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - assert_eq!(zero_seq.1, SequenceNumber::from(0)); - - // This is (finally) the function we want to test - - // If we try to sync we succeed. - let result = client1 - .authorities() - .sync_certificate_to_authority_with_timeout( - ConfirmationOrder::new(last_certificate), - authorities[3].0, - Duration::from_millis(1000), // ms - DEFAULT_RETRIES, // retry - ) - .await; - - assert!(result.is_ok()); + .unwrap(); + // Confirm we own this object + assert_eq!(tres_cap_obj_info.object().unwrap().owner, gas_object.owner); } +// Pass a file in a package dir instead of the root. The builder should be able to infer the root #[tokio::test] -async fn test_move_calls_object_transfer_and_freeze() { +async fn test_module_publish_file_path() { + // Init the states let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let client2 = make_client(authority_clients.clone(), committee); + let mut client1 = make_client(authority_clients.clone(), committee); - let object_value: u64 = 100; let gas_object_id = ObjectID::random(); - let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); - // Populate authorities with obj data - let mut gas_object_ref = fund_account_with_same_objects( + // Populate authorities with gas obj data + let gas_object_ref = fund_account_with_same_objects( authority_clients.values().collect(), &mut client1, vec![gas_object_id], @@ -1654,29 +1274,27 @@ async fn test_move_calls_object_transfer_and_freeze() { .1 .to_object_reference(); - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - let call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await; + // Compile + let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - // Get the object created from the call - let (new_obj_ref, _) = order_effects.created[0]; - // Fetch the full object + // Use a path pointing to a different file + hero_path.push_str("/../fastx_programmability/examples/Hero.move"); + + let pub_resp = client1.publish(hero_path, gas_object_ref).await; + + let (_, published_effects) = pub_resp.unwrap(); + + // Only package obj should be created + assert_eq!(published_effects.created.len(), 1); + + // Verif gas + assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); + + let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); + assert_ne!(gas_object_ref, *new_obj_ref); + + // We now have the module obj ref + // We can inspect it let new_obj = client1 .get_object_info(ObjectInfoRequest { object_id: new_obj_ref.0, @@ -1685,76 +1303,91 @@ async fn test_move_calls_object_transfer_and_freeze() { .await .unwrap(); - gas_object_ref = client1 + // Version should be 1 for all modules + assert_eq!(new_obj.object().unwrap().version(), OBJECT_START_VERSION); + // Must be immutable + assert!(new_obj.object().unwrap().is_read_only()); + + // StructTag type is not defined for package + assert!(new_obj.object().unwrap().type_().is_none()); + + // Data should be castable as a package + assert!(new_obj.object().unwrap().data.try_as_package().is_some()); + + // Retrieve latest gas obj spec + let gas_object = client1 .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, + object_id: gas_object_id, request_sequence_number: None, }) .await .unwrap() .object() .unwrap() - .to_object_reference(); + .clone(); - let pure_args = vec![bcs::to_bytes(&client2.address().to_vec()).unwrap()]; - let call_response = client1 + let gas_object_ref = gas_object.to_object_reference(); + + // Even though we provided a path to Hero.move, the builder is able to find the package root + // build all in the package, including TrustedCoin module + //Try to call a function in TrustedCoin module + let call_resp = client1 .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("transfer_and_freeze").to_owned(), - Vec::new(), + new_obj.object().unwrap().to_object_reference(), + ident_str!("TrustedCoin").to_owned(), + ident_str!("init").to_owned(), + vec![], gas_object_ref, - vec![new_obj.object().unwrap().to_object_reference()], - pure_args, - GAS_VALUE_FOR_TESTING / 2, + vec![], + vec![], + 1000, ) .await; + assert!(call_resp.is_ok()); +} - // Check effects are good - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - // Status flag should be success - assert_eq!(order_effects.status, ExecutionStatus::Success); - // Nothing should be deleted during a transfer - assert!(order_effects.deleted.is_empty()); - // Item being transfered is mutated. - assert_eq!(order_effects.mutated.len(), 1); +#[tokio::test] +async fn test_module_publish_bad_path() { + // Init the states + let (authority_clients, committee) = init_local_authorities(4).await; + let mut client1 = make_client(authority_clients.clone(), committee); - let (transferred_obj_ref, _) = order_effects.mutated[0]; - assert_ne!(gas_object_ref, transferred_obj_ref); + let gas_object_id = ObjectID::random(); - assert_eq!(transferred_obj_ref.0, new_obj_ref.0); + // Populate authorities with gas obj data + let gas_object_ref = fund_account_with_same_objects( + authority_clients.values().collect(), + &mut client1, + vec![gas_object_id], + ) + .await + .iter() + .next() + .unwrap() + .1 + .to_object_reference(); - let transferred_obj_info = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); + // Compile + let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - // Confirm new owner - assert!(transferred_obj_info - .object() - .unwrap() - .owner - .is_address(&client2.address())); + // Use a bad path + hero_path.push_str("/../fastx_____programmability/examples/"); - // Confirm read only - assert!(transferred_obj_info.object().unwrap().is_read_only()); + let pub_resp = client1.publish(hero_path, gas_object_ref).await; + // Has to fail + assert!(pub_resp.is_err()); } #[tokio::test] -async fn test_move_calls_object_delete() { +async fn test_module_publish_naughty_path() { + // Init the states let (authority_clients, committee) = init_local_authorities(4).await; let mut client1 = make_client(authority_clients.clone(), committee); - let object_value: u64 = 100; let gas_object_id = ObjectID::random(); - let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); - // Populate authorities with obj data - let mut gas_object_ref = fund_account_with_same_objects( + // Populate authorities with gas obj data + let gas_object_ref = fund_account_with_same_objects( authority_clients.values().collect(), &mut client1, vec![gas_object_id], @@ -1766,934 +1399,25 @@ async fn test_move_calls_object_delete() { .1 .to_object_reference(); - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - let call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await; - - let (_, order_info_resp) = call_response.unwrap(); - // Get the object created from the call - let order_effects = order_info_resp.signed_effects.unwrap().effects; - let (new_obj_ref, _) = order_effects.created[0]; - // Fetch the full object - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - gas_object_ref = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - - let call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("delete").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj.object().unwrap().to_object_reference()], - Vec::new(), - GAS_VALUE_FOR_TESTING / 2, - ) - .await; - - // Check effects are good - let (_, order_info_resp) = call_response.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - - // Status flag should be success - assert_eq!(order_effects.status, ExecutionStatus::Success); - // Object be deleted during a delete - assert_eq!(order_effects.deleted.len(), 1); - // No item is mutated. - assert_eq!(order_effects.mutated.len(), 0); - // Confirm the items - assert_eq!(order_effects.gas_object.0 .0, gas_object_id); - - // Try to fetch the deleted object - let deleted_object_resp = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - assert!(deleted_object_resp.object_and_lock.is_none()); -} - -#[tokio::test] -async fn test_move_calls_certs() { - let (authority_clients, committee) = init_local_authorities(1).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let mut client2 = make_client(authority_clients.clone(), committee); - - let gas_object_id = ObjectID::random(); - - let framework_obj_ref = client1 - .get_object_info(ObjectInfoRequest { - object_id: FASTX_FRAMEWORK_ADDRESS, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .to_object_reference(); - - // Populate authorities with obj data - let authority_objects = vec![vec![gas_object_id]]; - - let gas_object_ref = fund_account( - authority_clients.values().collect(), - &mut client1, - authority_objects, - ) - .await - .get(&gas_object_id) - .unwrap() - .to_object_reference(); - - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let object_value: u64 = 100; - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - - // Create new object with move - let (cert, order_info_resp) = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await - .unwrap(); - let effect = order_info_resp.signed_effects.unwrap().effects; - let new_object_ref = &effect.created[0].0; - - let gas_object_ref = &effect.gas_object.0; - - let (new_object_id, _, _) = &new_object_ref; - - // Client 1 should have one certificate, one new object and one gas object, each with one associated certificate. - assert!(client1 - .store() - .certificates - .contains_key(&cert.order.digest()) - .unwrap()); - assert_eq!(1, client1.store().certificates.iter().count()); - assert_eq!(2, client1.store().object_sequence_numbers.iter().count()); - assert_eq!(2, client1.store().object_certs.iter().count()); - assert!(client1 - .store() - .object_certs - .contains_key(&gas_object_id) - .unwrap()); - assert!(client1 - .store() - .object_certs - .contains_key(new_object_id) - .unwrap()); - assert_eq!( - 1, - client1 - .store() - .object_certs - .get(&gas_object_id) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - 1, - client1 - .store() - .object_certs - .get(new_object_id) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - OBJECT_START_VERSION, - client1 - .store() - .object_sequence_numbers - .get(&gas_object_id) - .unwrap() - .unwrap() - .clone() - ); - assert_eq!( - OBJECT_START_VERSION, - client1 - .store() - .object_sequence_numbers - .get(new_object_id) - .unwrap() - .unwrap() - .clone() - ); - - // Transfer object with move - let pure_args = vec![bcs::to_bytes(&client2.address().to_vec()).unwrap()]; - let (cert, _) = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("transfer").to_owned(), - Vec::new(), - *gas_object_ref, - vec![*new_object_ref], - pure_args, - GAS_VALUE_FOR_TESTING / 2, // Make sure budget is less than gas value - ) - .await - .unwrap(); - - // Client 1 should have two certificate, one gas object, with two associated certificate. - assert!(client1 - .store() - .certificates - .contains_key(&cert.order.digest()) - .unwrap()); - assert_eq!(client1.store().certificates.iter().count(), 2); - assert_eq!(client1.store().object_sequence_numbers.iter().count(), 1); - assert_eq!(client1.store().object_certs.iter().count(), 1); - assert!(client1 - .store() - .object_certs - .contains_key(&gas_object_id) - .unwrap()); - assert_eq!( - 2, - client1 - .store() - .object_certs - .get(&gas_object_id) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - SequenceNumber::from(2), - client1 - .store() - .object_sequence_numbers - .get(&gas_object_id) - .unwrap() - .unwrap() - .clone() - ); - - // Sync client 2 - client2.sync_client_state().await.unwrap(); - - // Client 2 should have 2 certificate, one new object, with two associated certificate. - assert_eq!(2, client2.store().certificates.iter().count()); - assert_eq!(1, client2.store().object_sequence_numbers.iter().count()); - assert_eq!(1, client2.store().object_certs.iter().count()); - assert!(client2 - .store() - .object_certs - .contains_key(new_object_id) - .unwrap()); - assert_eq!( - 2, - client2 - .store() - .object_certs - .get(new_object_id) - .unwrap() - .unwrap() - .len() - ); - assert_eq!( - SequenceNumber::from(2), - client2 - .store() - .object_sequence_numbers - .get(new_object_id) - .unwrap() - .unwrap() - .clone() - ); -} - -#[tokio::test] -async fn test_module_publish_and_call_good() { - // Init the states - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee); - - let gas_object_id = ObjectID::random(); - - // Populate authorities with gas obj data - let gas_object_ref = fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![gas_object_id], - ) - .await - .iter() - .next() - .unwrap() - .1 - .to_object_reference(); - - // Provide path to well formed package sources - let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - hero_path.push_str("/../fastx_programmability/examples/"); - - let pub_res = client1.publish(hero_path, gas_object_ref).await; - - let (_, order_info_resp) = pub_res.unwrap(); - let published_effects = order_info_resp.signed_effects.unwrap().effects; - - // Only package obj should be created - assert_eq!(published_effects.created.len(), 1); - - // Verif gas obj - assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); - - let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); - assert_ne!(gas_object_ref, *new_obj_ref); - - // We now have the module obj ref - // We can inspect it - - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - // Version should be 1 for all modules - assert_eq!(new_obj.object().unwrap().version(), OBJECT_START_VERSION); - // Must be immutable - assert!(new_obj.object().unwrap().is_read_only()); - - // StructTag type is not defined for package - assert!(new_obj.object().unwrap().type_().is_none()); - - // Data should be castable as a package - assert!(new_obj.object().unwrap().data.try_as_package().is_some()); - - // Retrieve latest gas obj spec - let gas_object = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_id, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .clone(); - - let gas_object_ref = gas_object.to_object_reference(); - - //Try to call a function in TrustedCoin module - let call_resp = client1 - .move_call( - new_obj.object().unwrap().to_object_reference(), - ident_str!("TrustedCoin").to_owned(), - ident_str!("init").to_owned(), - vec![], - gas_object_ref, - vec![], - vec![], - 1000, - ) - .await - .unwrap(); - - let effects = call_resp.1.signed_effects.unwrap().effects; - assert!(effects.status == ExecutionStatus::Success); - - // This gets the treasury cap for the coin and gives it to the sender - let tres_cap_ref = effects - .created - .iter() - .find(|r| r.0 .0 != gas_object_ref.0) - .unwrap() - .0; - - // Fetch the full obj info - let tres_cap_obj_info = client1 - .get_object_info(ObjectInfoRequest { - object_id: tres_cap_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - // Confirm we own this object - assert_eq!(tres_cap_obj_info.object().unwrap().owner, gas_object.owner); -} - -// Pass a file in a package dir instead of the root. The builder should be able to infer the root -#[tokio::test] -async fn test_module_publish_file_path() { - // Init the states - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee); - - let gas_object_id = ObjectID::random(); - - // Populate authorities with gas obj data - let gas_object_ref = fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![gas_object_id], - ) - .await - .iter() - .next() - .unwrap() - .1 - .to_object_reference(); - - // Compile - let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - - // Use a path pointing to a different file - hero_path.push_str("/../fastx_programmability/examples/Hero.move"); - - let pub_resp = client1.publish(hero_path, gas_object_ref).await; - - let (_, order_info_resp) = pub_resp.unwrap(); - let published_effects = order_info_resp.signed_effects.unwrap().effects; - - // Only package obj should be created - assert_eq!(published_effects.created.len(), 1); - - // Verif gas - assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); - - let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); - assert_ne!(gas_object_ref, *new_obj_ref); - - // We now have the module obj ref - // We can inspect it - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - // Version should be 1 for all modules - assert_eq!(new_obj.object().unwrap().version(), OBJECT_START_VERSION); - // Must be immutable - assert!(new_obj.object().unwrap().is_read_only()); - - // StructTag type is not defined for package - assert!(new_obj.object().unwrap().type_().is_none()); - - // Data should be castable as a package - assert!(new_obj.object().unwrap().data.try_as_package().is_some()); - - // Retrieve latest gas obj spec - let gas_object = client1 - .get_object_info(ObjectInfoRequest { - object_id: gas_object_id, - request_sequence_number: None, - }) - .await - .unwrap() - .object() - .unwrap() - .clone(); - - let gas_object_ref = gas_object.to_object_reference(); - - // Even though we provided a path to Hero.move, the builder is able to find the package root - // build all in the package, including TrustedCoin module - //Try to call a function in TrustedCoin module - let call_resp = client1 - .move_call( - new_obj.object().unwrap().to_object_reference(), - ident_str!("TrustedCoin").to_owned(), - ident_str!("init").to_owned(), - vec![], - gas_object_ref, - vec![], - vec![], - 1000, - ) - .await; - assert!(call_resp.is_ok()); -} - -#[tokio::test] -async fn test_module_publish_bad_path() { - // Init the states - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee); - - let gas_object_id = ObjectID::random(); - - // Populate authorities with gas obj data - let gas_object_ref = fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![gas_object_id], - ) - .await - .iter() - .next() - .unwrap() - .1 - .to_object_reference(); - - // Compile - let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - - // Use a bad path - hero_path.push_str("/../fastx_____programmability/examples/"); - - let pub_resp = client1.publish(hero_path, gas_object_ref).await; - // Has to fail - assert!(pub_resp.is_err()); -} - -#[tokio::test] -async fn test_module_publish_naughty_path() { - // Init the states - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee); - - let gas_object_id = ObjectID::random(); - - // Populate authorities with gas obj data - let gas_object_ref = fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![gas_object_id], - ) - .await - .iter() - .next() - .unwrap() - .1 - .to_object_reference(); - - for ns in naughty_strings::BLNS { - // Compile - let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - - // Use a bad path - hero_path.push_str(&format!("/../{}", ns)); - - let pub_resp = client1.publish(hero_path, gas_object_ref).await; - // Has to fail - assert!(pub_resp.is_err()); - } -} - -#[test] -fn test_transfer_object_error() { - let rt = Runtime::new().unwrap(); - let (recipient, _) = get_key_pair(); - - let objects: Vec = (0..10).map(|_| ObjectID::random()).collect(); - let gas_object = ObjectID::random(); - let number_of_authorities = 4; - - let mut all_objects = objects.clone(); - all_objects.push(gas_object); - let authority_objects = (0..number_of_authorities) - .map(|_| all_objects.clone()) - .collect(); - - let mut sender = rt.block_on(init_local_client_state(authority_objects)); - - let mut objects = objects.iter(); - - // Test 1: Double spend - let object_id = *objects.next().unwrap(); - rt.block_on(sender.transfer_object(object_id, gas_object, recipient)) - .unwrap(); - let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); - - assert!(result.is_err()); - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::ObjectNotFound { .. }) - )); - - // Test 2: Object not known to authorities - let obj = Object::with_id_owner_for_testing(ObjectID::random(), sender.address()); - sender - .store() - .object_refs - .insert(&obj.id(), &obj.to_object_reference()) - .unwrap(); - sender - .store() - .object_sequence_numbers - .insert(&obj.id(), &SequenceNumber::new()) - .unwrap(); - let result = rt.block_on(sender.transfer_object(obj.id(), gas_object, recipient)); - assert!(result.is_err()); - assert!(matches!(result.unwrap_err().downcast_ref(), - Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::ObjectNotFound{..}, ..]))); - - // Test 3: invalid object digest - let object_id = *objects.next().unwrap(); - - // give object an incorrect object digest - sender - .store() - .object_refs - .insert( - &object_id, - &(object_id, SequenceNumber::new(), ObjectDigest([0; 32])), - ) - .unwrap(); - - let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); - assert!(result.is_err()); - assert!(matches!(result.unwrap_err().downcast_ref(), - Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::LockErrors{..}, ..]))); - - // Test 4: Invalid sequence number; - let object_id = *objects.next().unwrap(); - - // give object an incorrect sequence number - sender - .store() - .object_sequence_numbers - .insert(&object_id, &SequenceNumber::from(2)) - .unwrap(); - - let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); - assert!(result.is_err()); - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::UnexpectedSequenceNumber { .. }) - )); - - // Test 5: The client does not allow concurrent transfer; - let object_id = *objects.next().unwrap(); - // Fabricate a fake pending transfer - let transfer = Transfer { - sender: sender.address(), - recipient: FastPayAddress::random_for_testing_only(), - object_ref: (object_id, Default::default(), ObjectDigest::new([0; 32])), - gas_payment: (gas_object, Default::default(), ObjectDigest::new([0; 32])), - }; - sender - .lock_pending_order_objects(&Order::new( - OrderKind::Transfer(transfer), - &get_key_pair().1, - )) - .unwrap(); - - let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); - assert!(result.is_err()); - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::ConcurrentTransactionError) - )) -} - -#[tokio::test] -async fn test_receive_object_error() -> Result<(), anyhow::Error> { - let number_of_authorities = 4; - let (authority_clients, committee) = init_local_authorities(number_of_authorities).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let mut client2 = make_client(authority_clients.clone(), committee); - - let objects: Vec = (0..10).map(|_| ObjectID::random()).collect(); - let gas_object = ObjectID::random(); - let gas_object_2 = ObjectID::random(); - let mut all_objects = objects.clone(); - all_objects.push(gas_object); - fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - all_objects, - ) - .await; - fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client2, - vec![gas_object_2], - ) - .await; - - let mut objects = objects.iter(); - // Test 1: Recipient is not us. - let object_id = *objects.next().unwrap(); - let (certificate, _) = client1 - .transfer_object( - object_id, - gas_object, - FastPayAddress::random_for_testing_only(), - ) - .await?; - - let result = client2.receive_object(&certificate).await; - - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::IncorrectRecipientError) - )); - - // Test 2: Receive tempered certificate order. - let (transfer, sig) = match certificate.order { - Order { - kind: OrderKind::Transfer(t), - signature, - } => Some((t, signature)), - _ => None, - } - .unwrap(); - - let malformed_order = CertifiedOrder { - order: Order { - kind: OrderKind::Transfer(Transfer { - sender: client1.address(), - recipient: client2.address(), - object_ref: transfer.object_ref, - gas_payment: transfer.gas_payment, - }), - signature: sig, - }, - signatures: certificate.signatures, - }; - - let result = client2.receive_object(&malformed_order).await; - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::InvalidSignature { .. }) - )); - - Ok(()) -} - -#[test] -fn test_client_store() { - let store = - ClientStore::new(env::temp_dir().join(format!("CLIENT_DB_{:?}", ObjectID::random()))); - - // Make random sequence numbers - let keys_vals = (0..100) - .map(|i| (ObjectID::random(), SequenceNumber::from(i))) - .collect::>(); - // Try insert batch - store - .object_sequence_numbers - .multi_insert(keys_vals.clone().into_iter()) - .unwrap(); - - // Check the size - assert_eq!(store.object_sequence_numbers.iter().count(), 100); - - // Check that the items are all correct - keys_vals.iter().for_each(|(k, v)| { - assert_eq!(*v, store.object_sequence_numbers.get(k).unwrap().unwrap()); - }); - - // Check that are removed - store - .object_sequence_numbers - .multi_remove(keys_vals.into_iter().map(|(k, _)| k)) - .unwrap(); - - assert!(store.object_sequence_numbers.is_empty()); -} - -#[tokio::test] -async fn test_object_store() { - // Init the states - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - - let gas_object_id = ObjectID::random(); - - // Populate authorities with gas obj data - let gas_object = fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![gas_object_id], - ) - .await - .iter() - .next() - .unwrap() - .1 - .clone(); - let gas_object_ref = gas_object.clone().to_object_reference(); - // Ensure that object store is empty - assert!(client1.store().objects.is_empty()); - - // Run a few syncs to retrieve objects ids - for _ in 0..4 { - let _ = client1.sync_client_state().await.unwrap(); - } - // Try to download objects which are not already in storage - client1.download_owned_objects_not_in_db().await.unwrap(); - - // Gas object should be in storage now - assert_eq!(client1.store().objects.iter().count(), 1); - - // Verify that we indeed have the object - let gas_obj_from_store = client1 - .store() - .objects - .get(&gas_object_ref) - .unwrap() - .unwrap(); - assert_eq!(gas_obj_from_store, gas_object); - - // Provide path to well formed package sources - let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - hero_path.push_str("/../fastx_programmability/examples/"); - - let pub_res = client1.publish(hero_path, gas_object_ref).await; - - let (_, order_info_resp) = pub_res.unwrap(); - let published_effects = order_info_resp.signed_effects.unwrap().effects; - - // Only package obj should be created - assert_eq!(published_effects.created.len(), 1); - - // Verif gas obj - assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); - - let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); - assert_ne!(gas_object_ref, *new_obj_ref); - - // We now have the module obj ref - // We can inspect it - let new_obj = client1 - .get_object_info(ObjectInfoRequest { - object_id: new_obj_ref.0, - request_sequence_number: None, - }) - .await - .unwrap(); - - // Published object should be in storage now - // But also the new gas object should be in storage, so 2 new items, plus 1 from before - assert_eq!(client1.store().objects.iter().count(), 3); - - // Verify that we indeed have the new module object - let mod_obj_from_store = client1.store().objects.get(new_obj_ref).unwrap().unwrap(); - assert_eq!(mod_obj_from_store, *new_obj.object().unwrap()); -} - -#[tokio::test] -async fn test_object_store_transfer() { - let (authority_clients, committee) = init_local_authorities(4).await; - let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let mut client2 = make_client(authority_clients.clone(), committee); - - let object_id = ObjectID::random(); - let gas_object1 = ObjectID::random(); - let gas_object2 = ObjectID::random(); - - fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client1, - vec![object_id, gas_object1], - ) - .await; - fund_account_with_same_objects( - authority_clients.values().collect(), - &mut client2, - vec![gas_object2], - ) - .await; + for ns in naughty_strings::BLNS { + // Compile + let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); - // Clients should not have retrieved objects - assert_eq!(client1.store().objects.iter().count(), 0); - assert_eq!(client2.store().objects.iter().count(), 0); + // Use a bad path + hero_path.push_str(&format!("/../{}", ns)); - // Run a few syncs to populate object ids - for _ in 0..4 { - let _ = client1.sync_client_state().await.unwrap(); - let _ = client2.sync_client_state().await.unwrap(); + let pub_resp = client1.publish(hero_path, gas_object_ref).await; + // Has to fail + assert!(pub_resp.is_err()); } - - // Try to download objects which are not already in storage - client1.download_owned_objects_not_in_db().await.unwrap(); - client2.download_owned_objects_not_in_db().await.unwrap(); - - // Gas object and another object should be in storage now for client 1 - assert_eq!(client1.store().objects.iter().count(), 2); - - // Only gas object should be in storage now for client 2 - assert_eq!(client2.store().objects.iter().count(), 1); - - // Transfer object to client2. - let (certificate, _) = client1 - .transfer_object(object_id, gas_object1, client2.address()) - .await - .unwrap(); - - // Update client2's local object data. - client2.receive_object(&certificate).await.unwrap(); - - // Client 1 should not have lost its objects - // Plus it should have a new gas object - assert_eq!(client1.store().objects.iter().count(), 3); - // Client 2 should now have the new object - assert_eq!(client2.store().objects.iter().count(), 2); - - // Transfer the object back to Client1 - let (certificate, _) = client2 - .transfer_object(object_id, gas_object2, client1.address()) - .await - .unwrap(); - // Update client1's local object data. - client1.receive_object(&certificate).await.unwrap(); - - // Client 1 should have a new version of the object back - assert_eq!(client1.store().objects.iter().count(), 4); - // Client 2 should have new gas object version - assert_eq!(client2.store().objects.iter().count(), 3); } -#[tokio::test] -async fn test_transfer_pending_orders() { - let objects: Vec = (0..15).map(|_| ObjectID::random()).collect(); +#[test] +fn test_transfer_object_error() { + let rt = Runtime::new().unwrap(); + let (recipient, _) = get_key_pair(); + + let objects: Vec = (0..10).map(|_| ObjectID::random()).collect(); let gas_object = ObjectID::random(); let number_of_authorities = 4; @@ -2703,46 +1427,42 @@ async fn test_transfer_pending_orders() { .map(|_| all_objects.clone()) .collect(); - let mut sender_state = init_local_client_state(authority_objects).await; - let recipient = init_local_client_state(vec![vec![]]).await.address(); + let mut sender = rt.block_on(init_local_client_state(authority_objects)); let mut objects = objects.iter(); - // Test 1: Normal transfer + // Test 1: Double spend let object_id = *objects.next().unwrap(); - sender_state - .transfer_object(object_id, gas_object, recipient) - .await + rt.block_on(sender.transfer_object(object_id, gas_object, recipient)) .unwrap(); - // Pending order should be cleared - assert!(sender_state.store().pending_orders.is_empty()); + let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); - // Test 2: Object not known to authorities. This has no side effect - let obj = Object::with_id_owner_for_testing(ObjectID::random(), sender_state.address()); - sender_state + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err().downcast_ref(), + Some(FastPayError::ObjectNotFound { .. }) + )); + + // Test 2: Object not known to authorities + let obj = Object::with_id_owner_for_testing(ObjectID::random(), sender.address()); + sender .store() .object_refs .insert(&obj.id(), &obj.to_object_reference()) .unwrap(); - sender_state + sender .store() .object_sequence_numbers .insert(&obj.id(), &SequenceNumber::new()) .unwrap(); - let result = sender_state - .transfer_object(obj.id(), gas_object, recipient) - .await; + let result = rt.block_on(sender.transfer_object(obj.id(), gas_object, recipient)); assert!(result.is_err()); - assert!(matches!(result.unwrap_err().downcast_ref(), - Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::ObjectNotFound{..}, ..]))); - // Pending order should be cleared - assert!(sender_state.store().pending_orders.is_empty()); - // Test 3: invalid object digest. This also has no side effect + // Test 3: invalid object digest let object_id = *objects.next().unwrap(); // give object an incorrect object digest - sender_state + sender .store() .object_refs .insert( @@ -2751,58 +1471,85 @@ async fn test_transfer_pending_orders() { ) .unwrap(); - let result = sender_state - .transfer_object(object_id, gas_object, recipient) - .await; + let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); assert!(result.is_err()); - assert!(matches!(result.unwrap_err().downcast_ref(), - Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::LockErrors{..}, ..]))); - // Pending order should be cleared - assert!(sender_state.store().pending_orders.is_empty()); + // Test 4: Invalid sequence number; + let object_id = *objects.next().unwrap(); - // Test 4: Conflicting orders touching same objects + // give object an incorrect sequence number + sender + .store() + .object_sequence_numbers + .insert(&object_id, &SequenceNumber::from(2)) + .unwrap(); + + let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); + assert!(result.is_err()); + + // Test 5: The client does not allow concurrent transfer; let object_id = *objects.next().unwrap(); // Fabricate a fake pending transfer let transfer = Transfer { - sender: sender_state.address(), + sender: sender.address(), recipient: FastPayAddress::random_for_testing_only(), object_ref: (object_id, Default::default(), ObjectDigest::new([0; 32])), gas_payment: (gas_object, Default::default(), ObjectDigest::new([0; 32])), }; - // Simulate locking some objects - sender_state + sender .lock_pending_order_objects(&Order::new( OrderKind::Transfer(transfer), &get_key_pair().1, )) .unwrap(); - // Try to use those objects in another order - let result = sender_state - .transfer_object(object_id, gas_object, recipient) - .await; + + let result = rt.block_on(sender.transfer_object(object_id, gas_object, recipient)); assert!(result.is_err()); - assert!(matches!( - result.unwrap_err().downcast_ref(), - Some(FastPayError::ConcurrentTransactionError) - )); - // clear the pending orders - sender_state.store().pending_orders.clear().unwrap(); - assert_eq!(sender_state.store().pending_orders.iter().count(), 0); +} + +#[test] +fn test_client_store() { + let store = + ClientStore::new(env::temp_dir().join(format!("CLIENT_DB_{:?}", ObjectID::random()))); + + // Make random sequence numbers + let keys_vals = (0..100) + .map(|i| (ObjectID::random(), SequenceNumber::from(i))) + .collect::>(); + // Try insert batch + store + .object_sequence_numbers + .multi_insert(keys_vals.clone().into_iter()) + .unwrap(); + + // Check the size + assert_eq!(store.object_sequence_numbers.iter().count(), 100); + + // Check that the items are all correct + keys_vals.iter().for_each(|(k, v)| { + assert_eq!(*v, store.object_sequence_numbers.get(k).unwrap().unwrap()); + }); + + // Check that are removed + store + .object_sequence_numbers + .multi_remove(keys_vals.into_iter().map(|(k, _)| k)) + .unwrap(); + + assert!(store.object_sequence_numbers.is_empty()); } #[tokio::test] -async fn test_full_client_sync_move_calls() { +async fn test_object_store() { + // Init the states let (authority_clients, committee) = init_local_authorities(4).await; let mut client1 = make_client(authority_clients.clone(), committee.clone()); - let object_value: u64 = 100; let gas_object_id = ObjectID::random(); - let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); - // Populate authorities with obj data - let mut gas_object_ref = fund_account_with_same_objects( - authority_clients.clone().values().collect(), + // Populate authorities with gas obj data + let gas_object = fund_account_with_same_objects( + authority_clients.values().collect(), &mut client1, vec![gas_object_id], ) @@ -2811,241 +1558,137 @@ async fn test_full_client_sync_move_calls() { .next() .unwrap() .1 - .to_object_reference(); - - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - let call_res = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await; - - let (mut last_certificate, order_info_resp) = call_res.unwrap(); - let call_effects = order_info_resp.signed_effects.unwrap().effects; - - assert_eq!(call_effects.gas_object.0 .0, gas_object_id); - - // Get the object created from the call - let (new_obj_ref, _) = call_effects.created[0]; - - for value in 0u64..10u64 { - // Fetch the full object - let new_obj_ref = client_object(&mut client1, new_obj_ref.0).await.0; - gas_object_ref = client_object(&mut client1, gas_object_id).await.0; - - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("set_value").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj_ref], - pure_args, - GAS_VALUE_FOR_TESTING / 2, - ) - .await; + .clone(); + let gas_object_ref = gas_object.clone().to_object_reference(); + // Ensure that object store is empty + assert!(client1.store().objects.is_empty()); - last_certificate = _call_response.unwrap().0; + // Run a few syncs to retrieve objects ids + for _ in 0..4 { + let _ = client1.sync_client_state().await.unwrap(); } + // Try to download objects which are not already in storage + client1.download_owned_objects_not_in_db().await.unwrap(); + + // Gas object should be in storage now + assert_eq!(client1.store().objects.iter().count(), 1); - // For this test to work the client has updated the first 3 authorities but not the last one - // Assert this to catch any changes to the client behaviour that reqire fixing this test to still - // test sync. + // Verify that we indeed have the object + let gas_obj_from_store = client1 + .store() + .objects + .get(&gas_object_ref) + .unwrap() + .unwrap(); + assert_eq!(gas_obj_from_store, gas_object); - let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); + // Provide path to well formed package sources + let mut hero_path = env!("CARGO_MANIFEST_DIR").to_owned(); + hero_path.push_str("/../fastx_programmability/examples/"); - let (full_seq, _) = auth_object(&authorities[2].1, gas_object_id).await; - assert_eq!(full_seq.1, SequenceNumber::from(11)); + let pub_res = client1.publish(hero_path, gas_object_ref).await; - let (zero_seq, _) = auth_object(&authorities[3].1, gas_object_id).await; - assert_eq!(zero_seq.1, SequenceNumber::from(0)); + let (_, published_effects) = pub_res.unwrap(); - // This is (finally) the function we want to test + // Only package obj should be created + assert_eq!(published_effects.created.len(), 1); - // If we try to sync from the authority that does not have the data to the one - // that does not we fail. - let result = client1 - .authorities() - .sync_authority_source_to_destination( - ConfirmationOrder::new(last_certificate.clone()), - authorities[3].0, - authorities[3].0, - ) - .await; + // Verif gas obj + assert_eq!(published_effects.gas_object.0 .0, gas_object_ref.0); - assert!(result.is_err()); + let (new_obj_ref, _) = published_effects.created.get(0).unwrap(); + assert_ne!(gas_object_ref, *new_obj_ref); - // Here we get the list of objects known by authorities. - let (obj_map, _auths) = client1 - .authorities() - .get_all_owned_objects(client1.address(), Duration::from_secs(10)) + // We now have the module obj ref + // We can inspect it + let new_obj = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, + request_sequence_number: None, + }) .await .unwrap(); - // Check only 3 out of 4 authorities have the latest object - assert_eq!(obj_map[&full_seq].len(), 3); - - // We sync all the client objects - let result = client1 - .authorities() - .sync_all_owned_objects(client1.address(), Duration::from_secs(10)) - .await; - - let (active, deleted) = result.unwrap(); - assert_eq!(0, deleted.len()); - assert_eq!(2, active.len()); + // Published object should be in storage now + // But also the new gas object should be in storage, so 2 new items, plus 1 from before + assert_eq!(client1.store().objects.iter().count(), 3); - // Here we get the list of objects known by authorities. - let (obj_map, _auths) = client1 - .authorities() - .get_all_owned_objects(client1.address(), Duration::from_secs(10)) - .await - .unwrap(); - // Check all 4 out of 4 authorities have the latest object - assert_eq!(obj_map[&full_seq].len(), 4); + // Verify that we indeed have the new module object + let mod_obj_from_store = client1.store().objects.get(new_obj_ref).unwrap().unwrap(); + assert_eq!(mod_obj_from_store, *new_obj.object().unwrap()); } #[tokio::test] -async fn test_full_client_sync_delete_calls() { +async fn test_object_store_transfer() { let (authority_clients, committee) = init_local_authorities(4).await; let mut client1 = make_client(authority_clients.clone(), committee.clone()); + let mut client2 = make_client(authority_clients.clone(), committee); - let object_value: u64 = 100; - let gas_object_id = ObjectID::random(); - let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); + let object_id = ObjectID::random(); + let gas_object1 = ObjectID::random(); + let gas_object2 = ObjectID::random(); - // Populate authorities with obj data - let mut gas_object_ref = fund_account_with_same_objects( - authority_clients.clone().values().collect(), + fund_account_with_same_objects( + authority_clients.values().collect(), &mut client1, - vec![gas_object_id], + vec![object_id, gas_object1], ) - .await - .iter() - .next() - .unwrap() - .1 - .to_object_reference(); - - let gas_id = gas_object_ref.0; - - // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object - let pure_args = vec![ - object_value.to_le_bytes().to_vec(), - bcs::to_bytes(&client1.address().to_vec()).unwrap(), - ]; - let call_res = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("create").to_owned(), - Vec::new(), - gas_object_ref, - Vec::new(), - pure_args, - GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value - ) - .await; - let (_, order_info_resp) = call_res.unwrap(); - let order_effects = order_info_resp.signed_effects.unwrap().effects; - - assert_eq!(order_effects.gas_object.0 .0, gas_object_id); + .await; + fund_account_with_same_objects( + authority_clients.values().collect(), + &mut client2, + vec![gas_object2], + ) + .await; - // Get the object created from the call - let (new_obj_ref, _) = order_effects.created[0]; + // Clients should not have retrieved objects + assert_eq!(client1.store().objects.iter().count(), 0); + assert_eq!(client2.store().objects.iter().count(), 0); - for value in 0u64..20u64 { - // Fetch the full object - let new_obj_ref = client_object(&mut client1, new_obj_ref.0).await.0; - gas_object_ref = client_object(&mut client1, gas_id).await.0; - - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("set_value").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj_ref], - pure_args, - GAS_VALUE_FOR_TESTING / 2, - ) - .await; + // Run a few syncs to populate object ids + for _ in 0..4 { + let _ = client1.sync_client_state().await.unwrap(); + let _ = client2.sync_client_state().await.unwrap(); } - // Fetch the full object - let new_obj_ref = client_object(&mut client1, new_obj_ref.0).await.0; - gas_object_ref = client_object(&mut client1, gas_id).await.0; - - // We sync before we delete - let result = client1 - .authorities() - .sync_all_owned_objects(client1.address(), Duration::from_secs(10)) - .await; - - let (active, deleted) = result.unwrap(); - assert_eq!(0, deleted.len()); - assert_eq!(2, active.len()); + // Try to download objects which are not already in storage + client1.download_owned_objects_not_in_db().await.unwrap(); + client2.download_owned_objects_not_in_db().await.unwrap(); - let _call_response = client1 - .move_call( - framework_obj_ref, - ident_str!("ObjectBasics").to_owned(), - ident_str!("delete").to_owned(), - Vec::new(), - gas_object_ref, - vec![new_obj_ref], - Vec::new(), - GAS_VALUE_FOR_TESTING / 2, - ) - .await; + // Gas object and another object should be in storage now for client 1 + assert_eq!(client1.store().objects.iter().count(), 2); - // For this test to work the client has updated the first 3 authorities but not the last one - // Assert this to catch any changes to the client behaviour that reqire fixing this test to still - // test sync. + // Only gas object should be in storage now for client 2 + assert_eq!(client2.store().objects.iter().count(), 1); - let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); + // Transfer object to client2. + let _certificate = client1 + .transfer_object(object_id, gas_object1, client2.address()) + .await + .unwrap(); - let (full_seq, _) = auth_object(&authorities[2].1, gas_id).await; - assert_eq!(full_seq.1, SequenceNumber::from(22)); + // Update client2's local object data. + client2.sync_client_state().await.unwrap(); - let (zero_seq, _) = auth_object(&authorities[3].1, gas_id).await; - assert_eq!(zero_seq.1, SequenceNumber::from(21)); + // Client 1 should not have lost its objects + // Plus it should have a new gas object + assert_eq!(client1.store().objects.iter().count(), 3); + // Client 2 should now have the new object + assert_eq!(client2.store().objects.iter().count(), 1); - // This is (finally) the function we want to test - // Here we get the list of objects known by authorities. - let (obj_map, _auths) = client1 - .authorities() - .get_all_owned_objects(client1.address(), Duration::from_secs(10)) + // Transfer the object back to Client1 + let _certificate = client2 + .transfer_object(object_id, gas_object2, client1.address()) .await .unwrap(); - // Check only 3 out of 4 authorities have the latest object - assert_eq!(obj_map[&full_seq].len(), 3); - - // We sync all the client objects - let result = client1 - .authorities() - .sync_all_owned_objects(client1.address(), Duration::from_secs(10)) - .await; - let (active, deleted) = result.unwrap(); + // Update client1's local object data. + client1.sync_client_state().await.unwrap(); - assert_eq!(1, deleted.len()); - assert_eq!(1, active.len()); + // Client 1 should have a new version of the object back + assert_eq!(client1.store().objects.iter().count(), 3); + // Client 2 should have new gas object version + assert_eq!(client2.store().objects.iter().count(), 2); } // A helper function to make tests less verbose @@ -3065,6 +1708,7 @@ async fn client_object(client: &mut dyn Client, object_id: ObjectID) -> (ObjectR } // A helper function to make tests less verbose +#[allow(dead_code)] async fn auth_object(authority: &LocalAuthorityClient, object_id: ObjectID) -> (ObjectRef, Object) { let response = authority .handle_object_info_request(ObjectInfoRequest::from(object_id)) @@ -3447,7 +2091,7 @@ async fn test_sync_all_owned_objects() { 2, owned_object .iter() - .filter(|o| o.owner.is_address(&client1.address())) + .filter(|(o, _)| o.owner.is_address(&client1.address())) .count() ); } @@ -3599,3 +2243,103 @@ async fn test_process_certificate() { let new_object_ref = client_object(&mut client1, new_ref_1.0).await.0; assert_eq!(SequenceNumber::from(2), new_object_ref.1); } + +#[tokio::test] +async fn test_transfer_pending_orders() { + let objects: Vec = (0..15).map(|_| ObjectID::random()).collect(); + let gas_object = ObjectID::random(); + let number_of_authorities = 4; + + let mut all_objects = objects.clone(); + all_objects.push(gas_object); + let authority_objects = (0..number_of_authorities) + .map(|_| all_objects.clone()) + .collect(); + + let mut sender_state = init_local_client_state(authority_objects).await; + let recipient = init_local_client_state(vec![vec![]]).await.address(); + + let mut objects = objects.iter(); + + // Test 1: Normal transfer + let object_id = *objects.next().unwrap(); + sender_state + .transfer_object(object_id, gas_object, recipient) + .await + .unwrap(); + // Pending order should be cleared + assert!(sender_state.store().pending_orders.is_empty()); + + // Test 2: Object not known to authorities. This has no side effect + let obj = Object::with_id_owner_for_testing(ObjectID::random(), sender_state.address()); + sender_state + .store() + .object_refs + .insert(&obj.id(), &obj.to_object_reference()) + .unwrap(); + sender_state + .store() + .object_sequence_numbers + .insert(&obj.id(), &SequenceNumber::new()) + .unwrap(); + let result = sender_state + .transfer_object(obj.id(), gas_object, recipient) + .await; + assert!(result.is_err()); + // assert!(matches!(result.unwrap_err().downcast_ref(), + // Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::ObjectNotFound{..}, ..]))); + // Pending order should be cleared + assert!(sender_state.store().pending_orders.is_empty()); + + // Test 3: invalid object digest. This also has no side effect + let object_id = *objects.next().unwrap(); + + // give object an incorrect object digest + sender_state + .store() + .object_refs + .insert( + &object_id, + &(object_id, SequenceNumber::new(), ObjectDigest([0; 32])), + ) + .unwrap(); + + let result = sender_state + .transfer_object(object_id, gas_object, recipient) + .await; + assert!(result.is_err()); + //assert!(matches!(result.unwrap_err().downcast_ref(), + // Some(FastPayError::QuorumNotReached {errors, ..}) if matches!(errors.as_slice(), [FastPayError::LockErrors{..}, ..]))); + + // Pending order should be cleared + assert!(sender_state.store().pending_orders.is_empty()); + + // Test 4: Conflicting orders touching same objects + let object_id = *objects.next().unwrap(); + // Fabricate a fake pending transfer + let transfer = Transfer { + sender: sender_state.address(), + recipient: FastPayAddress::random_for_testing_only(), + object_ref: (object_id, Default::default(), ObjectDigest::new([0; 32])), + gas_payment: (gas_object, Default::default(), ObjectDigest::new([0; 32])), + }; + // Simulate locking some objects + sender_state + .lock_pending_order_objects(&Order::new( + OrderKind::Transfer(transfer), + &get_key_pair().1, + )) + .unwrap(); + // Try to use those objects in another order + let result = sender_state + .transfer_object(object_id, gas_object, recipient) + .await; + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err().downcast_ref(), + Some(FastPayError::ConcurrentTransactionError) + )); + // clear the pending orders + sender_state.store().pending_orders.clear().unwrap(); + assert_eq!(sender_state.store().pending_orders.iter().count(), 0); +} diff --git a/fastpay_core/src/unit_tests/downloader_tests.rs b/fastpay_core/src/unit_tests/downloader_tests.rs deleted file mode 100644 index 5bcda6acd2c12..0000000000000 --- a/fastpay_core/src/unit_tests/downloader_tests.rs +++ /dev/null @@ -1,47 +0,0 @@ -// Copyright (c) Facebook, Inc. and its affiliates. -// SPDX-License-Identifier: Apache-2.0 - -use super::*; -use async_trait::async_trait; -use std::sync::{ - atomic::{AtomicU32, Ordering}, - Arc, -}; -use tokio::runtime::Runtime; - -#[derive(Clone)] -struct LocalRequester(Arc); - -impl LocalRequester { - fn new() -> Self { - Self(Arc::new(AtomicU32::new(0))) - } -} - -#[async_trait] -impl Requester for LocalRequester { - type Key = &'static str; - type Value = u32; - - async fn query(&mut self, _key: Self::Key) -> Self::Value { - self.0.fetch_add(1, Ordering::Relaxed) - } -} - -#[test] -fn test_local_downloader() { - let rt = Runtime::new().unwrap(); - rt.block_on(async move { - let requester = LocalRequester::new(); - let (task, mut handle) = Downloader::start(requester, vec![("a", 10), ("d", 11)]); - assert_eq!(handle.query("b").await.unwrap(), 0); - assert_eq!(handle.query("a").await.unwrap(), 10); - assert_eq!(handle.query("d").await.unwrap(), 11); - assert_eq!(handle.query("c").await.unwrap(), 1); - assert_eq!(handle.query("b").await.unwrap(), 0); - handle.stop().await.unwrap(); - let values: Vec<_> = task.await.unwrap().collect(); - // Cached values are returned ordered by keys. - assert_eq!(values, vec![10, 0, 1, 11]); - }); -}