diff --git a/fastpay/src/server_lib.rs b/fastpay/src/server_lib.rs index 437fa749ce285..6b11abe9a2124 100644 --- a/fastpay/src/server_lib.rs +++ b/fastpay/src/server_lib.rs @@ -108,6 +108,12 @@ impl MessageHandler for RunningServerState { .handle_object_info_request(*message) .await .map(|info| Some(serialize_object_info_response(&info))), + SerializedMessage::OrderInfoReq(message) => self + .server + .state + .handle_order_info_request(*message) + .await + .map(|info| Some(serialize_order_info(&info))), _ => Err(FastPayError::UnexpectedMessage), } } diff --git a/fastpay_core/src/authority.rs b/fastpay_core/src/authority.rs index 8aa963d8d0514..34ca745405a87 100644 --- a/fastpay_core/src/authority.rs +++ b/fastpay_core/src/authority.rs @@ -235,17 +235,26 @@ impl AuthorityState { .map(|(_, object)| object) .collect(); + let mut transaction_dependencies: BTreeSet<_> = inputs + .iter() + .map(|object| object.previous_transaction) + .collect(); + // Insert into the certificates map let mut tx_ctx = TxContext::new(order.sender(), transaction_digest); let gas_object_id = *order.gas_payment_object_id(); let (temporary_store, status) = self.execute_order(order, inputs, &mut tx_ctx)?; + // Remove from dependencies the generic hash + transaction_dependencies.remove(&TransactionDigest::genesis()); + // Update the database in an atomic manner let to_signed_effects = temporary_store.to_signed_effects( &self.name, &self.secret, &transaction_digest, + transaction_dependencies.into_iter().collect(), status, &gas_object_id, ); @@ -259,7 +268,7 @@ impl AuthorityState { mut inputs: Vec, tx_ctx: &mut TxContext, ) -> FastPayResult<(AuthorityTemporaryStore, ExecutionStatus)> { - let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs); + let mut temporary_store = AuthorityTemporaryStore::new(self, &inputs, tx_ctx.digest()); // unwraps here are safe because we built `inputs` let mut gas_object = inputs.pop().unwrap(); @@ -345,6 +354,13 @@ impl AuthorityState { Ok(ExecutionStatus::Success) } + pub async fn handle_order_info_request( + &self, + request: OrderInfoRequest, + ) -> Result { + self.make_order_info(&request.transaction_digest).await + } + pub async fn handle_account_info_request( &self, request: AccountInfoRequest, diff --git a/fastpay_core/src/authority/temporary_store.rs b/fastpay_core/src/authority/temporary_store.rs index 85328ce79d225..53d7ba35d6464 100644 --- a/fastpay_core/src/authority/temporary_store.rs +++ b/fastpay_core/src/authority/temporary_store.rs @@ -12,6 +12,7 @@ pub type InnerTemporaryStore = ( pub struct AuthorityTemporaryStore { object_store: Arc, + tx_digest: TransactionDigest, objects: BTreeMap, active_inputs: Vec, // Inputs that are not read only // TODO: We need to study whether it's worth to optimize the lookup of @@ -29,9 +30,11 @@ impl AuthorityTemporaryStore { pub fn new( authority_state: &AuthorityState, _input_objects: &'_ [Object], + tx_digest: TransactionDigest, ) -> AuthorityTemporaryStore { AuthorityTemporaryStore { object_store: authority_state._database.clone(), + tx_digest, objects: _input_objects.iter().map(|v| (v.id(), v.clone())).collect(), active_inputs: _input_objects .iter() @@ -92,6 +95,7 @@ impl AuthorityTemporaryStore { authority_name: &AuthorityName, secret: &KeyPair, transaction_digest: &TransactionDigest, + transaction_dependencies: Vec, status: ExecutionStatus, gas_object_id: &ObjectID, ) -> SignedOrderEffects { @@ -119,6 +123,7 @@ impl AuthorityTemporaryStore { .collect(), gas_object: (gas_object.to_object_reference(), gas_object.owner), events: self.events.clone(), + dependencies: transaction_dependencies, }; let signature = Signature::new(&effects, secret); @@ -196,7 +201,7 @@ impl Storage for AuthorityTemporaryStore { caller. */ - fn write_object(&mut self, object: Object) { + fn write_object(&mut self, mut object: Object) { // Check it is not read-only #[cfg(test)] // Movevm should ensure this if let Some(existing_object) = self.read_object(&object.id()) { @@ -207,6 +212,9 @@ impl Storage for AuthorityTemporaryStore { } } + // The adapter is not very disciplined at filling in the correct + // previous transaction digest, so we ensure it is correct here. + object.previous_transaction = self.tx_digest; self.written.insert(object.id(), object); } diff --git a/fastpay_core/src/client.rs b/fastpay_core/src/client.rs index b2d3b9f993256..5ffc7569720cc 100644 --- a/fastpay_core/src/client.rs +++ b/fastpay_core/src/client.rs @@ -59,6 +59,12 @@ pub trait AuthorityClient { &self, request: ObjectInfoRequest, ) -> Result; + + /// Handle Object information requests for this account. + async fn handle_order_info_request( + &self, + request: OrderInfoRequest, + ) -> Result; } #[async_trait] @@ -99,6 +105,18 @@ impl AuthorityClient for network::Client { ) .await } + + /// Handle Object information requests for this account. + async fn handle_order_info_request( + &self, + request: OrderInfoRequest, + ) -> Result { + self.send_recv_bytes( + serialize_order_info_request(&request), + order_info_deserializer, + ) + .await + } } pub struct ClientState { @@ -402,7 +420,7 @@ where source_authority: AuthorityName, destination_authority: AuthorityName, ) -> Result<(), FastPayError> { - let source_client = self.authority_clients[&source_authority].clone(); + let mut source_client = self.authority_clients[&source_authority].clone(); let mut destination_client = self.authority_clients[&destination_authority].clone(); // This represents a stack of certificates that we need to register with the @@ -444,40 +462,58 @@ where // TODO: Eventually the client will store more information, and we could // first try to read certificates and parents from a local cache before // asking an authority. - let input_objects = target_cert.certificate.order.input_objects(); + // let input_objects = target_cert.certificate.order.input_objects(); + + let order_info = if missing_certificates.is_empty() { + // Here we cover a corner case due to the nature of using consistent + // broadcast: it is possible for the client to have a certificate + // signed by some authority, before the authority has processed the + // certificate. This can only happen to a certificate for objects + // not used in another certificicate, hence it can only be the case + // for the very first certificate we try to sync. For this reason for + // this one instead of asking for the effects of a previous execution + // we send the cert for execution. Since execution is idempotent this + // is ok. + + source_client + .handle_confirmation_order(target_cert.clone()) + .await? + } else { + // Unlike the previous case if a certificate created an object that + // was involved in the processing of another certificate the previous + // cert must have been processed, so here we just ask for the effects + // of such an execution. + + source_client + .handle_order_info_request(OrderInfoRequest { + transaction_digest: cert_digest, + }) + .await? + }; // Put back the target cert missing_certificates.push(target_cert); + let signed_effects = &order_info + .signed_effects + .ok_or(FastPayError::AuthorityInformationUnavailable)?; - for object_kind in input_objects { - // Request the parent certificate from the authority. - let object_info_response = source_client - .handle_object_info_request(ObjectInfoRequest { - object_id: object_kind.object_id(), - request_sequence_number: Some(object_kind.version()), - }) - .await; - - let object_info = match object_info_response { - Ok(object_info) => object_info, - // Here we cover the case the object genuinely has no parent. - Err(FastPayError::ParentNotfound { .. }) => { - continue; - } - Err(e) => return Err(e), - }; - - let returned_certificate = object_info - .parent_certificate - .ok_or(FastPayError::AuthorityInformationUnavailable)?; - let returned_digest = returned_certificate.order.digest(); - + for returned_digest in &signed_effects.effects.dependencies { // We check that we are not processing twice the same certificate, as // it would be common if two objects used by one order, were also both // mutated by the same preceeding order. - if !candidate_certificates.contains(&returned_digest) { + if !candidate_certificates.contains(returned_digest) { // Add this cert to the set we have processed - candidate_certificates.insert(returned_digest); + candidate_certificates.insert(*returned_digest); + + let inner_order_info = source_client + .handle_order_info_request(OrderInfoRequest { + transaction_digest: *returned_digest, + }) + .await?; + + let returned_certificate = inner_order_info + .certified_order + .ok_or(FastPayError::AuthorityInformationUnavailable)?; // Check & Add it to the list of certificates to sync returned_certificate.check(&self.committee).map_err(|_| { diff --git a/fastpay_core/src/unit_tests/authority_tests.rs b/fastpay_core/src/unit_tests/authority_tests.rs index ec70883310530..4c278ac052fc0 100644 --- a/fastpay_core/src/unit_tests/authority_tests.rs +++ b/fastpay_core/src/unit_tests/authority_tests.rs @@ -886,6 +886,16 @@ async fn test_handle_confirmation_order_idempotent() { // this is valid because we're checking the authority state does not change the certificate compare_order_info_responses(&info, &info2); + + // Now check the order info request is also the same + let info3 = authority_state + .handle_order_info_request(OrderInfoRequest { + transaction_digest: certified_transfer_order.order.digest(), + }) + .await + .unwrap(); + + compare_order_info_responses(&info, &info3); } #[tokio::test] diff --git a/fastpay_core/src/unit_tests/client_tests.rs b/fastpay_core/src/unit_tests/client_tests.rs index c11a36e609d98..bb700b12dc93d 100644 --- a/fastpay_core/src/unit_tests/client_tests.rs +++ b/fastpay_core/src/unit_tests/client_tests.rs @@ -79,6 +79,17 @@ impl AuthorityClient for LocalAuthorityClient { let x = state.lock().await.handle_object_info_request(request).await; x } + + /// Handle Object information requests for this account. + async fn handle_order_info_request( + &self, + request: OrderInfoRequest, + ) -> Result { + let state = self.0.clone(); + + let result = state.lock().await.handle_order_info_request(request).await; + result + } } impl LocalAuthorityClient { @@ -1027,21 +1038,6 @@ async fn test_move_calls_chain_many_authority_syncronization() { .unwrap() .to_object_reference(); - /* Turn on if you want to observe the sequence numbers */ - /* - for (_auth_name, auth_client) in &authority_clients { - let single_client_response = auth_client.handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object - .to_object_reference(); - println!("{:?} - {:?}", _auth_name, single_client_response.1); - } - */ - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; let _call_response = client1 .move_call( @@ -1057,6 +1053,7 @@ async fn test_move_calls_chain_many_authority_syncronization() { .await; last_certificate = _call_response.unwrap().0; + println!("EXECUTE: {:?}", last_certificate.order.digest()); } // For this test to work the client has updated the first 3 authorities but not the last one @@ -1094,7 +1091,7 @@ async fn test_move_calls_chain_many_authority_syncronization() { // This is (finally) the function we want to test // If we try to sync from the authority that does not have the data to the one - // that does we fail. + // that does not we fail. let result = client1 .sync_authority_source_to_destination( ConfirmationOrder::new(last_certificate.clone()), @@ -1118,9 +1115,6 @@ async fn test_move_calls_chain_many_authority_syncronization() { assert!(result.is_ok()); } -// BUG(https://github.com/MystenLabs/fastnft/issues/282) -// We ignore this test due to a known bug. -#[ignore] #[tokio::test] async fn test_move_calls_chain_many_delete_authority_synchronization() { let (authority_clients, committee) = init_local_authorities(4).await; @@ -1189,21 +1183,6 @@ async fn test_move_calls_chain_many_delete_authority_synchronization() { .unwrap() .to_object_reference(); - /* Turn on if you want to observe the sequence numbers */ - /* - for (_auth_name, auth_client) in &authority_clients { - let single_client_response = auth_client.handle_object_info_request(ObjectInfoRequest { - object_id: gas_object_ref.0, - request_sequence_number: None, - }) - .await - .unwrap() - .object - .to_object_reference(); - println!("{:?} - {:?}", _auth_name, single_client_response.1); - } - */ - let pure_args = vec![bcs::to_bytes(&value).unwrap()]; let _call_response = client1 .move_call( @@ -1313,14 +1292,174 @@ async fn test_move_calls_chain_many_delete_authority_synchronization() { ) .await; - // BUG(https://github.com/MystenLabs/fastnft/issues/282) - // - // Here we get: - // ERROR: ObjectNotFound { object_id: 258E1B39E8300FB459A6A68785958C142A901D48 } - if let Err(e) = &result { - println!("ERROR: {:?}", e); + assert!(result.is_ok()); +} + +#[tokio::test] +async fn test_move_calls_chain_many_delete_authority_auto_synchronization() { + let (authority_clients, committee) = init_local_authorities(4).await; + let mut client1 = make_client(authority_clients.clone(), committee.clone()); + // let client2 = make_client(authority_clients.clone(), committee); + + let object_value: u64 = 100; + let gas_object_id = ObjectID::random(); + let framework_obj_ref = client1.get_framework_object_ref().await.unwrap(); + + // Populate authorities with obj data + let mut gas_object_ref = fund_account_with_same_objects( + authority_clients.clone().values().collect(), + &mut client1, + vec![gas_object_id], + ) + .await + .iter() + .next() + .unwrap() + .1 + .to_object_reference(); + + // When creating an ObjectBasics object, we provide the value (u64) and address which will own the object + let pure_args = vec![ + object_value.to_le_bytes().to_vec(), + bcs::to_bytes(&client1.address.to_vec()).unwrap(), + ]; + let call_response = client1 + .move_call( + framework_obj_ref, + ident_str!("ObjectBasics").to_owned(), + ident_str!("create").to_owned(), + Vec::new(), + gas_object_ref, + Vec::new(), + pure_args, + GAS_VALUE_FOR_TESTING - 1, // Make sure budget is less than gas value + ) + .await; + + let (_, order_effects) = call_response.unwrap(); + assert_eq!(order_effects.gas_object.0 .0, gas_object_id); + + // Get the object created from the call + let (new_obj_ref, _) = order_effects.created[0]; + + for value in 0u64..20u64 { + // Fetch the full object + let new_obj = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, + request_sequence_number: None, + }) + .await + .unwrap(); + + gas_object_ref = client1 + .get_object_info(ObjectInfoRequest { + object_id: gas_object_ref.0, + request_sequence_number: None, + }) + .await + .unwrap() + .object() + .unwrap() + .to_object_reference(); + + let pure_args = vec![bcs::to_bytes(&value).unwrap()]; + let _call_response = client1 + .move_call( + framework_obj_ref, + ident_str!("ObjectBasics").to_owned(), + ident_str!("set_value").to_owned(), + Vec::new(), + gas_object_ref, + vec![new_obj.object().unwrap().to_object_reference()], + pure_args, + GAS_VALUE_FOR_TESTING / 2, + ) + .await; } + // Fetch the full object + let new_obj_ref = client1 + .get_object_info(ObjectInfoRequest { + object_id: new_obj_ref.0, + request_sequence_number: None, + }) + .await + .unwrap() + .object() + .unwrap() + .to_object_reference(); + + gas_object_ref = client1 + .get_object_info(ObjectInfoRequest { + object_id: gas_object_ref.0, + request_sequence_number: None, + }) + .await + .unwrap() + .object() + .unwrap() + .to_object_reference(); + + let call_response = client1 + .move_call( + framework_obj_ref, + ident_str!("ObjectBasics").to_owned(), + ident_str!("delete").to_owned(), + Vec::new(), + gas_object_ref, + vec![new_obj_ref], + Vec::new(), + GAS_VALUE_FOR_TESTING / 2, + ) + .await; + + let last_certificate = call_response.unwrap().0; + + // For this test to work the client has updated the first 3 authorities but not the last one + // Assert this to catch any changes to the client behaviour that reqire fixing this test to still + // test sync. + + let authorities: Vec<_> = authority_clients.clone().into_iter().collect(); + + let full_seq = authorities[2] + .1 + .handle_object_info_request(ObjectInfoRequest { + object_id: gas_object_ref.0, + request_sequence_number: None, + }) + .await + .unwrap() + .object() + .unwrap() + .to_object_reference(); + assert_eq!(full_seq.1, SequenceNumber::from(22)); + + let zero_seq = authorities[3] + .1 + .handle_object_info_request(ObjectInfoRequest { + object_id: gas_object_ref.0, + request_sequence_number: None, + }) + .await + .unwrap() + .object() + .unwrap() + .to_object_reference(); + assert_eq!(zero_seq.1, SequenceNumber::from(0)); + + // This is (finally) the function we want to test + + // If we try to sync we succeed. + let result = client1 + .sync_certificate_to_authority_with_timeout( + ConfirmationOrder::new(last_certificate), + authorities[3].0, + 1000, // ms + 2, // retry + ) + .await; + assert!(result.is_ok()); } diff --git a/fastx_types/src/messages.rs b/fastx_types/src/messages.rs index 7d1bd13f282fc..b149669478f7d 100644 --- a/fastx_types/src/messages.rs +++ b/fastx_types/src/messages.rs @@ -158,6 +158,11 @@ impl ObjectInfoResponse { } } +#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize)] +pub struct OrderInfoRequest { + pub transaction_digest: TransactionDigest, +} + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct OrderInfoResponse { // The signed order response to handle_order @@ -218,6 +223,8 @@ pub struct OrderEffects { pub gas_object: (ObjectRef, Authenticator), /// The events emitted during execution. Note that only successful transactions emit events pub events: Vec, + /// The set of transaction digests this order depends on. + pub dependencies: Vec, } impl OrderEffects { diff --git a/fastx_types/src/serialize.rs b/fastx_types/src/serialize.rs index d580d70c415e7..92b7818326fa6 100644 --- a/fastx_types/src/serialize.rs +++ b/fastx_types/src/serialize.rs @@ -22,6 +22,7 @@ pub enum SerializedMessage { ObjectInfoReq(Box), ObjectInfoResp(Box), OrderResp(Box), + OrderInfoReq(Box), } // This helper structure is only here to avoid cloning while serializing commands. @@ -39,6 +40,7 @@ enum ShallowSerializedMessage<'a> { ObjectInfoReq(&'a ObjectInfoRequest), ObjectInfoResp(&'a ObjectInfoResponse), OrderResp(&'a OrderInfoResponse), + OrderInfoReq(&'a OrderInfoRequest), } fn serialize_into(writer: W, msg: &T) -> Result<(), anyhow::Error> @@ -105,6 +107,10 @@ pub fn serialize_object_info_response(value: &ObjectInfoResponse) -> Vec { serialize(&ShallowSerializedMessage::ObjectInfoResp(value)) } +pub fn serialize_order_info_request(value: &OrderInfoRequest) -> Vec { + serialize(&ShallowSerializedMessage::OrderInfoReq(value)) +} + pub fn serialize_vote(value: &SignedOrder) -> Vec { serialize(&ShallowSerializedMessage::Vote(value)) }