diff --git a/.evergreen/feature-combinations.sh b/.evergreen/feature-combinations.sh index 92fcc9eb4..20dc50e2f 100755 --- a/.evergreen/feature-combinations.sh +++ b/.evergreen/feature-combinations.sh @@ -6,5 +6,5 @@ export FEATURE_COMBINATIONS=( '' # default features '--no-default-features --features async-std-runtime,sync' # features that conflict w/ default features - '--features tokio-sync,zstd-compression,snappy-compression,zlib-compression,openssl-tls,aws-auth' # additive features + '--features tokio-sync,zstd-compression,snappy-compression,zlib-compression,openssl-tls,aws-auth,csfle' # additive features ) diff --git a/Cargo.toml b/Cargo.toml index 073b6a59f..e82b59047 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ zlib-compression = ["flate2"] snappy-compression = ["snap"] # DO NOT USE; see https://jira.mongodb.org/browse/RUST-569 for the status of CSFLE support in the Rust driver. -csfle = ["mongocrypt", "which"] +csfle = ["mongocrypt", "which", "rayon"] [dependencies] async-trait = "0.1.42" @@ -89,6 +89,7 @@ openssl-probe = { version = "0.1.5", optional = true } os_info = { version = "3.0.1", default-features = false } percent-encoding = "2.0.0" rand = { version = "0.8.3", features = ["small_rng"] } +rayon = { version = "1.5.3", optional = true } rustc_version_runtime = "0.2.1" rustls-pemfile = "0.3.0" serde_with = "1.3.1" diff --git a/src/client/csfle.rs b/src/client/csfle.rs index 61cfe4641..7db1c87b2 100644 --- a/src/client/csfle.rs +++ b/src/client/csfle.rs @@ -1,4 +1,5 @@ pub mod options; +mod state_machine; use std::{ path::Path, @@ -7,6 +8,7 @@ use std::{ use derivative::Derivative; use mongocrypt::Crypt; +use rayon::ThreadPool; use crate::{ error::{Error, Result}, @@ -30,21 +32,18 @@ use super::WeakClient; #[derivative(Debug)] pub(super) struct ClientState { #[derivative(Debug = "ignore")] - #[allow(dead_code)] - crypt: Crypt, + pub(crate) crypt: Crypt, mongocryptd_client: Option, aux_clients: AuxClients, opts: AutoEncryptionOptions, + crypto_threads: ThreadPool, } #[derive(Debug)] struct AuxClients { - #[allow(dead_code)] key_vault_client: WeakClient, - #[allow(dead_code)] metadata_client: Option, - #[allow(dead_code)] - internal_client: Option, + _internal_client: Option, } impl ClientState { @@ -52,12 +51,18 @@ impl ClientState { let crypt = Self::make_crypt(&opts)?; let mongocryptd_client = Self::spawn_mongocryptd_if_needed(&opts, &crypt).await?; let aux_clients = Self::make_aux_clients(client, &opts)?; + let num_cpus = std::thread::available_parallelism()?.get(); + let crypto_threads = rayon::ThreadPoolBuilder::new() + .num_threads(num_cpus) + .build() + .map_err(|e| Error::internal(format!("could not initialize thread pool: {}", e)))?; Ok(Self { crypt, mongocryptd_client, aux_clients, opts, + crypto_threads, }) } @@ -171,7 +176,7 @@ impl ClientState { Ok(AuxClients { key_vault_client, metadata_client, - internal_client, + _internal_client: internal_client, }) } } diff --git a/src/client/csfle/state_machine.rs b/src/client/csfle/state_machine.rs new file mode 100644 index 000000000..1532b1ad7 --- /dev/null +++ b/src/client/csfle/state_machine.rs @@ -0,0 +1,171 @@ +use std::convert::TryInto; + +use bson::{Document, RawDocument, RawDocumentBuf}; +use futures_util::{stream, TryStreamExt}; +use mongocrypt::ctx::{Ctx, State}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + sync::oneshot, +}; + +use crate::{ + client::options::ServerAddress, + cmap::options::StreamOptions, + error::{Error, Result}, + operation::{RawOutput, RunCommand}, + runtime::AsyncStream, + Client, +}; + +impl Client { + pub(crate) async fn run_mongocrypt_ctx( + &self, + ctx: Ctx, + db: Option<&str>, + ) -> Result { + let guard = self.inner.csfle.read().await; + let csfle = match guard.as_ref() { + Some(csfle) => csfle, + None => return Err(Error::internal("no csfle state for mongocrypt ctx")), + }; + let mut result = None; + // This needs to be a `Result` so that the `Ctx` can be temporarily owned by the processing + // thread for crypto finalization. An `Option` would also work here, but `Result` means we + // can return a helpful error if things get into a broken state rather than panicing. + let mut ctx = Ok(ctx); + loop { + let state = result_ref(&ctx)?.state()?; + match state { + State::NeedMongoCollinfo => { + let ctx = result_mut(&mut ctx)?; + let filter = raw_to_doc(ctx.mongo_op()?)?; + let metadata_client = csfle + .aux_clients + .metadata_client + .as_ref() + .and_then(|w| w.upgrade()) + .ok_or_else(|| { + Error::internal("metadata_client required for NeedMongoCollinfo state") + })?; + let db = metadata_client.database(db.as_ref().ok_or_else(|| { + Error::internal("db required for NeedMongoCollinfo state") + })?); + let mut cursor = db.list_collections(filter, None).await?; + if cursor.advance().await? { + ctx.mongo_feed(cursor.current())?; + } + ctx.mongo_done()?; + } + State::NeedMongoMarkings => { + let ctx = result_mut(&mut ctx)?; + let command = ctx.mongo_op()?.to_raw_document_buf(); + let db = db.as_ref().ok_or_else(|| { + Error::internal("db required for NeedMongoMarkings state") + })?; + let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?); + let mongocryptd_client = csfle + .mongocryptd_client + .as_ref() + .ok_or_else(|| Error::internal("mongocryptd client not found"))?; + let response = mongocryptd_client.execute_operation(op, None).await?; + ctx.mongo_feed(response.raw_body())?; + ctx.mongo_done()?; + } + State::NeedMongoKeys => { + let ctx = result_mut(&mut ctx)?; + let filter = raw_to_doc(ctx.mongo_op()?)?; + let kv_ns = &csfle.opts.key_vault_namespace; + let kv_client = csfle + .aux_clients + .key_vault_client + .upgrade() + .ok_or_else(|| Error::internal("key vault client dropped"))?; + let kv_coll = kv_client + .database(&kv_ns.db) + .collection::(&kv_ns.coll); + let mut cursor = kv_coll.find(filter, None).await?; + while cursor.advance().await? { + ctx.mongo_feed(cursor.current())?; + } + ctx.mongo_done()?; + } + State::NeedKms => { + let ctx = result_mut(&mut ctx)?; + let scope = ctx.kms_scope(); + let mut kms_ctxen: Vec> = vec![]; + while let Some(kms_ctx) = scope.next_kms_ctx() { + kms_ctxen.push(Ok(kms_ctx)); + } + stream::iter(kms_ctxen) + .try_for_each_concurrent(None, |mut kms_ctx| async move { + let endpoint = kms_ctx.endpoint()?; + let addr = ServerAddress::parse(endpoint)?; + let provider = kms_ctx.kms_provider()?; + let tls_options = csfle + .opts() + .tls_options + .as_ref() + .and_then(|tls| tls.get(&provider)) + .cloned() + .unwrap_or_default(); + let mut stream = AsyncStream::connect( + StreamOptions::builder() + .address(addr) + .tls_options(tls_options) + .build(), + ) + .await?; + stream.write_all(kms_ctx.message()?).await?; + let mut buf = vec![]; + while kms_ctx.bytes_needed() > 0 { + let buf_size = kms_ctx.bytes_needed().try_into().map_err(|e| { + Error::internal(format!("buffer size overflow: {}", e)) + })?; + buf.resize(buf_size, 0); + let count = stream.read(&mut buf).await?; + kms_ctx.feed(&buf[0..count])?; + } + Ok(()) + }) + .await?; + } + State::NeedKmsCredentials => todo!("RUST-1314"), + State::Ready => { + let (tx, rx) = oneshot::channel(); + let mut thread_ctx = std::mem::replace( + &mut ctx, + Err(Error::internal("crypto context not present")), + )?; + csfle.crypto_threads.spawn(move || { + let result = thread_ctx.finalize().map(|doc| doc.to_owned()); + let _ = tx.send((thread_ctx, result)); + }); + let (ctx_again, output) = rx + .await + .map_err(|_| Error::internal("crypto thread dropped"))?; + ctx = Ok(ctx_again); + result = Some(output?); + } + State::Done => break, + s => return Err(Error::internal(format!("unhandled state {:?}", s))), + } + } + match result { + Some(doc) => Ok(doc), + None => Err(Error::internal("libmongocrypt terminated without output")), + } + } +} + +fn result_ref(r: &Result) -> Result<&T> { + r.as_ref().map_err(Error::clone) +} + +fn result_mut(r: &mut Result) -> Result<&mut T> { + r.as_mut().map_err(|e| e.clone()) +} + +fn raw_to_doc(raw: &RawDocument) -> Result { + raw.try_into() + .map_err(|e| Error::internal(format!("could not parse raw document: {}", e))) +} diff --git a/src/client/executor.rs b/src/client/executor.rs index 731240fa5..6f08ac9ae 100644 --- a/src/client/executor.rs +++ b/src/client/executor.rs @@ -1,4 +1,8 @@ +#[cfg(feature = "csfle")] +use bson::RawDocumentBuf; use bson::{doc, RawBsonRef, RawDocument, Timestamp}; +#[cfg(feature = "csfle")] +use futures_core::future::BoxFuture; use lazy_static::lazy_static; use serde::de::DeserializeOwned; @@ -598,6 +602,21 @@ impl Client { let target_db = cmd.target_db.clone(); let serialized = op.serialize_command(cmd)?; + #[cfg(feature = "csfle")] + let serialized = { + let guard = self.inner.csfle.read().await; + if let Some(ref csfle) = *guard { + if csfle.opts().bypass_auto_encryption != Some(true) { + self.auto_encrypt(csfle, RawDocument::from_bytes(&serialized)?, &target_db) + .await? + .into_bytes() + } else { + serialized + } + } else { + serialized + } + }; let raw_cmd = RawCommand { name: cmd_name.clone(), target_db, @@ -750,6 +769,21 @@ impl Client { handler.handle_command_succeeded_event(command_succeeded_event); }); + #[cfg(feature = "csfle")] + let response = { + let guard = self.inner.csfle.read().await; + if let Some(ref csfle) = *guard { + if csfle.opts().bypass_auto_encryption != Some(true) { + let new_body = self.auto_decrypt(csfle, response.raw_body()).await?; + RawCommandResponse::new_raw(response.source, new_body) + } else { + response + } + } else { + response + } + }; + match op.handle_response(response, connection.stream_description()?) { Ok(response) => Ok(response), Err(mut err) => { @@ -765,6 +799,34 @@ impl Client { } } + #[cfg(feature = "csfle")] + fn auto_encrypt<'a>( + &'a self, + csfle: &'a super::csfle::ClientState, + command: &'a RawDocument, + target_db: &'a str, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { + let ctx = csfle + .crypt + .ctx_builder() + .build_encrypt(target_db, command)?; + self.run_mongocrypt_ctx(ctx, Some(target_db)).await + }) + } + + #[cfg(feature = "csfle")] + fn auto_decrypt<'a>( + &'a self, + csfle: &'a super::csfle::ClientState, + response: &'a RawDocument, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { + let ctx = csfle.crypt.ctx_builder().build_decrypt(response)?; + self.run_mongocrypt_ctx(ctx, None).await + }) + } + /// Start an implicit session if the operation and write concern are compatible with sessions. async fn start_implicit_session(&self, op: &T) -> Result> { match self.get_session_support_status().await? { diff --git a/src/cmap/conn/command.rs b/src/cmap/conn/command.rs index 265453feb..a6d6ddece 100644 --- a/src/cmap/conn/command.rs +++ b/src/cmap/conn/command.rs @@ -203,10 +203,11 @@ impl RawCommandResponse { pub(crate) fn new(source: ServerAddress, message: Message) -> Result { let raw = message.single_document_response()?; - Ok(Self { - source, - raw: RawDocumentBuf::from_bytes(raw)?, - }) + Ok(Self::new_raw(source, RawDocumentBuf::from_bytes(raw)?)) + } + + pub(crate) fn new_raw(source: ServerAddress, raw: RawDocumentBuf) -> Self { + Self { source, raw } } pub(crate) fn body<'a, T: Deserialize<'a>>(&'a self) -> Result { diff --git a/src/coll/mod.rs b/src/coll/mod.rs index 6916a5d84..ed9bfee0f 100644 --- a/src/coll/mod.rs +++ b/src/coll/mod.rs @@ -1189,6 +1189,10 @@ where } let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true); + #[cfg(feature = "csfle")] + let encrypted = self.client().auto_encryption_opts().await.is_some(); + #[cfg(not(feature = "csfle"))] + let encrypted = false; let mut cumulative_failure: Option = None; let mut error_labels: HashSet = Default::default(); @@ -1198,7 +1202,7 @@ where while n_attempted < ds.len() { let docs: Vec<&T> = ds.iter().skip(n_attempted).map(Borrow::borrow).collect(); - let insert = Insert::new(self.namespace(), docs, options.clone()); + let insert = Insert::new_encrypted(self.namespace(), docs, options.clone(), encrypted); match self .client() diff --git a/src/operation/abort_transaction/mod.rs b/src/operation/abort_transaction/mod.rs index 2e4f9b5ec..7a08861dd 100644 --- a/src/operation/abort_transaction/mod.rs +++ b/src/operation/abort_transaction/mod.rs @@ -5,12 +5,12 @@ use crate::{ client::session::TransactionPin, cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{Operation, Retryability}, + operation::Retryability, options::WriteConcern, selection_criteria::SelectionCriteria, }; -use super::WriteConcernOnlyBody; +use super::{OperationWithDefaults, WriteConcernOnlyBody}; pub(crate) struct AbortTransaction { write_concern: Option, @@ -26,7 +26,7 @@ impl AbortTransaction { } } -impl Operation for AbortTransaction { +impl OperationWithDefaults for AbortTransaction { type O = (); type Command = Document; diff --git a/src/operation/aggregate/change_stream.rs b/src/operation/aggregate/change_stream.rs index e6c1bb5ac..50a108cc3 100644 --- a/src/operation/aggregate/change_stream.rs +++ b/src/operation/aggregate/change_stream.rs @@ -4,7 +4,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, cursor::CursorSpecification, error::Result, - operation::{append_options, Operation, Retryability}, + operation::{append_options, OperationWithDefaults, Retryability}, options::{ChangeStreamOptions, SelectionCriteria, WriteConcern}, }; @@ -39,7 +39,7 @@ impl ChangeStreamAggregate { } } -impl Operation for ChangeStreamAggregate { +impl OperationWithDefaults for ChangeStreamAggregate { type O = (CursorSpecification, ChangeStreamData); type Command = Document; diff --git a/src/operation/aggregate/mod.rs b/src/operation/aggregate/mod.rs index b707bfc10..1bb5c9ad0 100644 --- a/src/operation/aggregate/mod.rs +++ b/src/operation/aggregate/mod.rs @@ -9,12 +9,12 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, cursor::CursorSpecification, error::Result, - operation::{append_options, remove_empty_write_concern, Operation, Retryability}, + operation::{append_options, remove_empty_write_concern, Retryability}, options::{AggregateOptions, SelectionCriteria, WriteConcern}, Namespace, }; -use super::{CursorBody, WriteConcernOnlyBody, SERVER_4_2_0_WIRE_VERSION}; +use super::{CursorBody, OperationWithDefaults, WriteConcernOnlyBody, SERVER_4_2_0_WIRE_VERSION}; pub(crate) use change_stream::ChangeStreamAggregate; @@ -46,7 +46,7 @@ impl Aggregate { // IMPORTANT: If new method implementations are added here, make sure `ChangeStreamAggregate` has // the equivalent delegations. -impl Operation for Aggregate { +impl OperationWithDefaults for Aggregate { type O = CursorSpecification; type Command = Document; diff --git a/src/operation/commit_transaction/mod.rs b/src/operation/commit_transaction/mod.rs index 1a3e4b532..3c3d455d6 100644 --- a/src/operation/commit_transaction/mod.rs +++ b/src/operation/commit_transaction/mod.rs @@ -5,7 +5,7 @@ use bson::{doc, Document}; use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{append_options, remove_empty_write_concern, Operation, Retryability}, + operation::{append_options, remove_empty_write_concern, OperationWithDefaults, Retryability}, options::{Acknowledgment, TransactionOptions, WriteConcern}, }; @@ -21,7 +21,7 @@ impl CommitTransaction { } } -impl Operation for CommitTransaction { +impl OperationWithDefaults for CommitTransaction { type O = (); type Command = Document; diff --git a/src/operation/count/mod.rs b/src/operation/count/mod.rs index 50d4dc7e5..15c0f5fe5 100644 --- a/src/operation/count/mod.rs +++ b/src/operation/count/mod.rs @@ -9,7 +9,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, coll::{options::EstimatedDocumentCountOptions, Namespace}, error::{Error, Result}, - operation::{append_options, Operation, Retryability}, + operation::{append_options, OperationWithDefaults, Retryability}, selection_criteria::SelectionCriteria, }; @@ -35,7 +35,7 @@ impl Count { } } -impl Operation for Count { +impl OperationWithDefaults for Count { type O = u64; type Command = Document; diff --git a/src/operation/count_documents/mod.rs b/src/operation/count_documents/mod.rs index 3853b944c..413059326 100644 --- a/src/operation/count_documents/mod.rs +++ b/src/operation/count_documents/mod.rs @@ -5,7 +5,7 @@ use std::convert::TryInto; use serde::Deserialize; -use super::{Operation, Retryability, SingleCursorResult}; +use super::{OperationWithDefaults, Retryability, SingleCursorResult}; use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, error::{Error, ErrorKind, Result}, @@ -75,7 +75,7 @@ impl CountDocuments { } } -impl Operation for CountDocuments { +impl OperationWithDefaults for CountDocuments { type O = u64; type Command = Document; diff --git a/src/operation/create/mod.rs b/src/operation/create/mod.rs index bda286099..d531e8ca1 100644 --- a/src/operation/create/mod.rs +++ b/src/operation/create/mod.rs @@ -7,7 +7,12 @@ use crate::{ bson::doc, cmap::{Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{append_options, remove_empty_write_concern, Operation, WriteConcernOnlyBody}, + operation::{ + append_options, + remove_empty_write_concern, + OperationWithDefaults, + WriteConcernOnlyBody, + }, options::{CreateCollectionOptions, WriteConcern}, Namespace, }; @@ -35,7 +40,7 @@ impl Create { } } -impl Operation for Create { +impl OperationWithDefaults for Create { type O = (); type Command = Document; diff --git a/src/operation/create_indexes/mod.rs b/src/operation/create_indexes/mod.rs index a0815aff2..247e6cb40 100644 --- a/src/operation/create_indexes/mod.rs +++ b/src/operation/create_indexes/mod.rs @@ -6,7 +6,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, error::{ErrorKind, Result}, index::IndexModel, - operation::{append_options, remove_empty_write_concern, Operation}, + operation::{append_options, remove_empty_write_concern, OperationWithDefaults}, options::{CreateIndexOptions, WriteConcern}, results::CreateIndexesResult, Namespace, @@ -47,7 +47,7 @@ impl CreateIndexes { } } -impl Operation for CreateIndexes { +impl OperationWithDefaults for CreateIndexes { type O = CreateIndexesResult; type Command = Document; const NAME: &'static str = "createIndexes"; diff --git a/src/operation/delete/mod.rs b/src/operation/delete/mod.rs index 7af98f73e..7458011d4 100644 --- a/src/operation/delete/mod.rs +++ b/src/operation/delete/mod.rs @@ -10,7 +10,7 @@ use crate::{ operation::{ append_options, remove_empty_write_concern, - Operation, + OperationWithDefaults, Retryability, WriteResponseBody, }, @@ -59,7 +59,7 @@ impl Delete { } } -impl Operation for Delete { +impl OperationWithDefaults for Delete { type O = DeleteResult; type Command = Document; diff --git a/src/operation/distinct/mod.rs b/src/operation/distinct/mod.rs index a48d3594b..e5f18cdca 100644 --- a/src/operation/distinct/mod.rs +++ b/src/operation/distinct/mod.rs @@ -9,7 +9,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, coll::{options::DistinctOptions, Namespace}, error::Result, - operation::{append_options, Operation, Retryability}, + operation::{append_options, OperationWithDefaults, Retryability}, selection_criteria::SelectionCriteria, }; @@ -49,7 +49,7 @@ impl Distinct { } } -impl Operation for Distinct { +impl OperationWithDefaults for Distinct { type O = Vec; type Command = Document; diff --git a/src/operation/drop_collection/mod.rs b/src/operation/drop_collection/mod.rs index 6d79b985b..7d270ef4e 100644 --- a/src/operation/drop_collection/mod.rs +++ b/src/operation/drop_collection/mod.rs @@ -7,7 +7,12 @@ use crate::{ bson::doc, cmap::{Command, RawCommandResponse, StreamDescription}, error::{Error, Result}, - operation::{append_options, remove_empty_write_concern, Operation, WriteConcernOnlyBody}, + operation::{ + append_options, + remove_empty_write_concern, + OperationWithDefaults, + WriteConcernOnlyBody, + }, options::{DropCollectionOptions, WriteConcern}, Namespace, }; @@ -35,7 +40,7 @@ impl DropCollection { } } -impl Operation for DropCollection { +impl OperationWithDefaults for DropCollection { type O = (); type Command = Document; diff --git a/src/operation/drop_database/mod.rs b/src/operation/drop_database/mod.rs index d4cfafbcc..2cd2b8cf7 100644 --- a/src/operation/drop_database/mod.rs +++ b/src/operation/drop_database/mod.rs @@ -7,7 +7,12 @@ use crate::{ bson::doc, cmap::{Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{append_options, remove_empty_write_concern, Operation, WriteConcernOnlyBody}, + operation::{ + append_options, + remove_empty_write_concern, + OperationWithDefaults, + WriteConcernOnlyBody, + }, options::{DropDatabaseOptions, WriteConcern}, }; @@ -28,7 +33,7 @@ impl DropDatabase { } } -impl Operation for DropDatabase { +impl OperationWithDefaults for DropDatabase { type O = (); type Command = Document; diff --git a/src/operation/drop_indexes/mod.rs b/src/operation/drop_indexes/mod.rs index 70c236607..dd7c9fbd5 100644 --- a/src/operation/drop_indexes/mod.rs +++ b/src/operation/drop_indexes/mod.rs @@ -5,7 +5,7 @@ use crate::{ bson::{doc, Document}, cmap::{Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{append_options, remove_empty_write_concern, Operation}, + operation::{append_options, remove_empty_write_concern, OperationWithDefaults}, options::{DropIndexOptions, WriteConcern}, Namespace, }; @@ -34,7 +34,7 @@ impl DropIndexes { } } -impl Operation for DropIndexes { +impl OperationWithDefaults for DropIndexes { type O = (); type Command = Document; const NAME: &'static str = "dropIndexes"; diff --git a/src/operation/find/mod.rs b/src/operation/find/mod.rs index ecb124c2a..f4f50141f 100644 --- a/src/operation/find/mod.rs +++ b/src/operation/find/mod.rs @@ -6,7 +6,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, cursor::CursorSpecification, error::{ErrorKind, Result}, - operation::{append_options, CursorBody, Operation, Retryability}, + operation::{append_options, CursorBody, OperationWithDefaults, Retryability}, options::{CursorType, FindOptions, SelectionCriteria}, Namespace, }; @@ -44,7 +44,7 @@ impl Find { } } -impl Operation for Find { +impl OperationWithDefaults for Find { type O = CursorSpecification; type Command = Document; const NAME: &'static str = "find"; diff --git a/src/operation/find_and_modify/mod.rs b/src/operation/find_and_modify/mod.rs index 1e9e3101f..bdc4815e0 100644 --- a/src/operation/find_and_modify/mod.rs +++ b/src/operation/find_and_modify/mod.rs @@ -21,7 +21,7 @@ use crate::{ Namespace, }, error::{ErrorKind, Result}, - operation::{append_options, remove_empty_write_concern, Operation, Retryability}, + operation::{append_options, remove_empty_write_concern, OperationWithDefaults, Retryability}, options::WriteConcern, }; @@ -95,7 +95,7 @@ where } } -impl Operation for FindAndModify +impl OperationWithDefaults for FindAndModify where T: DeserializeOwned, { diff --git a/src/operation/get_more/mod.rs b/src/operation/get_more/mod.rs index 4a9e64a27..c12f5abe7 100644 --- a/src/operation/get_more/mod.rs +++ b/src/operation/get_more/mod.rs @@ -12,7 +12,7 @@ use crate::{ cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, cursor::CursorInformation, error::{ErrorKind, Result}, - operation::Operation, + operation::OperationWithDefaults, options::SelectionCriteria, results::GetMoreResult, Namespace, @@ -44,7 +44,7 @@ impl<'conn> GetMore<'conn> { } } -impl<'conn> Operation for GetMore<'conn> { +impl<'conn> OperationWithDefaults for GetMore<'conn> { type O = GetMoreResult; type Command = Document; diff --git a/src/operation/insert/mod.rs b/src/operation/insert/mod.rs index 0cf129b28..c7d36dc12 100644 --- a/src/operation/insert/mod.rs +++ b/src/operation/insert/mod.rs @@ -11,7 +11,12 @@ use crate::{ bson_util, cmap::{Command, RawCommandResponse, StreamDescription}, error::{BulkWriteFailure, Error, ErrorKind, Result}, - operation::{remove_empty_write_concern, Operation, Retryability, WriteResponseBody}, + operation::{ + remove_empty_write_concern, + OperationWithDefaults, + Retryability, + WriteResponseBody, + }, options::{InsertManyOptions, WriteConcern}, results::InsertManyResult, Namespace, @@ -25,6 +30,7 @@ pub(crate) struct Insert<'a, T> { documents: Vec<&'a T>, inserted_ids: Vec, options: Option, + encrypted: bool, } impl<'a, T> Insert<'a, T> { @@ -32,12 +38,22 @@ impl<'a, T> Insert<'a, T> { ns: Namespace, documents: Vec<&'a T>, options: Option, + ) -> Self { + Self::new_encrypted(ns, documents, options, false) + } + + pub(crate) fn new_encrypted( + ns: Namespace, + documents: Vec<&'a T>, + options: Option, + encrypted: bool, ) -> Self { Self { ns, options, documents, inserted_ids: vec![], + encrypted, } } @@ -49,7 +65,7 @@ impl<'a, T> Insert<'a, T> { } } -impl<'a, T: Serialize> Operation for Insert<'a, T> { +impl<'a, T: Serialize> OperationWithDefaults for Insert<'a, T> { type O = InsertManyResult; type Command = InsertCommand; @@ -58,6 +74,11 @@ impl<'a, T: Serialize> Operation for Insert<'a, T> { fn build(&mut self, description: &StreamDescription) -> Result> { let mut docs = RawArrayBuf::new(); let mut size = 0; + let batch_size_limit = if self.encrypted { + 2_097_152 + } else { + description.max_bson_object_size as u64 + }; for (i, d) in self .documents @@ -92,7 +113,7 @@ impl<'a, T: Serialize> Operation for Insert<'a, T> { let doc_size = bson_util::array_entry_size_bytes(i, doc.as_bytes().len()); - if (size + doc_size) <= description.max_bson_object_size as u64 { + if (size + doc_size) <= batch_size_limit { if self.inserted_ids.len() <= i { self.inserted_ids.push(id); } diff --git a/src/operation/list_collections/mod.rs b/src/operation/list_collections/mod.rs index c49c78a68..951c165f8 100644 --- a/src/operation/list_collections/mod.rs +++ b/src/operation/list_collections/mod.rs @@ -6,7 +6,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, cursor::CursorSpecification, error::Result, - operation::{append_options, CursorBody, Operation, Retryability}, + operation::{append_options, CursorBody, OperationWithDefaults, Retryability}, options::{ListCollectionsOptions, ReadPreference, SelectionCriteria}, }; @@ -39,7 +39,7 @@ impl ListCollections { } } -impl Operation for ListCollections { +impl OperationWithDefaults for ListCollections { type O = CursorSpecification; type Command = Document; diff --git a/src/operation/list_databases/mod.rs b/src/operation/list_databases/mod.rs index c202ef4f0..8360fd78f 100644 --- a/src/operation/list_databases/mod.rs +++ b/src/operation/list_databases/mod.rs @@ -8,7 +8,7 @@ use crate::{ bson::{doc, Document}, cmap::{Command, RawCommandResponse, StreamDescription}, error::Result, - operation::{append_options, Operation, Retryability}, + operation::{append_options, OperationWithDefaults, Retryability}, options::ListDatabasesOptions, selection_criteria::{ReadPreference, SelectionCriteria}, }; @@ -43,7 +43,7 @@ impl ListDatabases { } } -impl Operation for ListDatabases { +impl OperationWithDefaults for ListDatabases { type O = Vec; type Command = Document; diff --git a/src/operation/list_indexes/mod.rs b/src/operation/list_indexes/mod.rs index 9189b62af..ff9130017 100644 --- a/src/operation/list_indexes/mod.rs +++ b/src/operation/list_indexes/mod.rs @@ -3,7 +3,7 @@ use crate::{ cmap::{Command, RawCommandResponse, StreamDescription}, cursor::CursorSpecification, error::Result, - operation::{append_options, Operation}, + operation::{append_options, OperationWithDefaults}, options::ListIndexesOptions, selection_criteria::{ReadPreference, SelectionCriteria}, Namespace, @@ -36,7 +36,7 @@ impl ListIndexes { } } -impl Operation for ListIndexes { +impl OperationWithDefaults for ListIndexes { type O = CursorSpecification; type Command = Document; diff --git a/src/operation/mod.rs b/src/operation/mod.rs index e59ce4bad..414f71370 100644 --- a/src/operation/mod.rs +++ b/src/operation/mod.rs @@ -17,6 +17,7 @@ mod insert; mod list_collections; mod list_databases; mod list_indexes; +mod raw_output; mod run_command; mod update; @@ -67,12 +68,17 @@ pub(crate) use insert::Insert; pub(crate) use list_collections::ListCollections; pub(crate) use list_databases::ListDatabases; pub(crate) use list_indexes::ListIndexes; +#[cfg(feature = "csfle")] +pub(crate) use raw_output::RawOutput; pub(crate) use run_command::RunCommand; pub(crate) use update::Update; const SERVER_4_2_0_WIRE_VERSION: i32 = 8; /// A trait modeling the behavior of a server side operation. +/// +/// No methods in this trait should have default behaviors to ensure that wrapper operations +/// replicate all behavior. Default behavior is provided by the `OperationDefault` trait. pub(crate) trait Operation { /// The output type of this operation. type O; @@ -89,15 +95,11 @@ pub(crate) trait Operation { /// Perform custom serialization of the built command. /// By default, this will just call through to the `Serialize` implementation of the command. - fn serialize_command(&mut self, cmd: Command) -> Result> { - Ok(bson::to_vec(&cmd)?) - } + fn serialize_command(&mut self, cmd: Command) -> Result>; /// Parse the response for the atClusterTime field. /// Depending on the operation, this may be found in different locations. - fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result> { - Ok(None) - } + fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result>; /// Interprets the server response to the command. fn handle_response( @@ -108,52 +110,32 @@ pub(crate) trait Operation { /// Interpret an error encountered while sending the built command to the server, potentially /// recovering. - fn handle_error(&self, error: Error) -> Result { - Err(error) - } + fn handle_error(&self, error: Error) -> Result; /// Criteria to use for selecting the server that this operation will be executed on. - fn selection_criteria(&self) -> Option<&SelectionCriteria> { - None - } + fn selection_criteria(&self) -> Option<&SelectionCriteria>; /// Whether or not this operation will request acknowledgment from the server. - fn is_acknowledged(&self) -> bool { - self.write_concern() - .map(WriteConcern::is_acknowledged) - .unwrap_or(true) - } + fn is_acknowledged(&self) -> bool; /// The write concern to use for this operation, if any. - fn write_concern(&self) -> Option<&WriteConcern> { - None - } + fn write_concern(&self) -> Option<&WriteConcern>; /// Returns whether or not this command supports the `readConcern` field. - fn supports_read_concern(&self, _description: &StreamDescription) -> bool { - false - } + fn supports_read_concern(&self, _description: &StreamDescription) -> bool; /// Whether this operation supports sessions or not. - fn supports_sessions(&self) -> bool { - true - } + fn supports_sessions(&self) -> bool; /// The level of retryability the operation supports. - fn retryability(&self) -> Retryability { - Retryability::None - } + fn retryability(&self) -> Retryability; /// Updates this operation as needed for a retry. - fn update_for_retry(&mut self) {} + fn update_for_retry(&mut self); - fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { - None - } + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle>; - fn name(&self) -> &str { - Self::NAME - } + fn name(&self) -> &str; } pub(crate) trait CommandBody: Serialize { @@ -173,6 +155,17 @@ impl CommandBody for Document { } } +impl CommandBody for RawDocumentBuf { + fn should_redact(&self) -> bool { + if let Some(Ok((command_name, _))) = self.into_iter().next() { + HELLO_COMMAND_NAMES.contains(command_name.to_lowercase().as_str()) + && self.get("speculativeAuthenticate").ok().flatten().is_some() + } else { + false + } + } +} + impl Command { pub(crate) fn should_redact(&self) -> bool { let name = self.name.to_lowercase(); @@ -398,3 +391,140 @@ macro_rules! remove_empty_write_concern { } pub(crate) use remove_empty_write_concern; + +// A mirror of the `Operation` trait, with default behavior where appropriate. Should only be +// implemented by operation types that do not delegate to other operations. +pub(crate) trait OperationWithDefaults { + /// The output type of this operation. + type O; + + /// The format of the command body constructed in `build`. + type Command: CommandBody; + + /// The name of the server side command associated with this operation. + const NAME: &'static str; + + /// Returns the command that should be sent to the server as part of this operation. + /// The operation may store some additional state that is required for handling the response. + fn build(&mut self, description: &StreamDescription) -> Result>; + + /// Perform custom serialization of the built command. + /// By default, this will just call through to the `Serialize` implementation of the command. + fn serialize_command(&mut self, cmd: Command) -> Result> { + Ok(bson::to_vec(&cmd)?) + } + + /// Parse the response for the atClusterTime field. + /// Depending on the operation, this may be found in different locations. + fn extract_at_cluster_time(&self, _response: &RawDocument) -> Result> { + Ok(None) + } + + /// Interprets the server response to the command. + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result; + + /// Interpret an error encountered while sending the built command to the server, potentially + /// recovering. + fn handle_error(&self, error: Error) -> Result { + Err(error) + } + + /// Criteria to use for selecting the server that this operation will be executed on. + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + None + } + + /// Whether or not this operation will request acknowledgment from the server. + fn is_acknowledged(&self) -> bool { + self.write_concern() + .map(WriteConcern::is_acknowledged) + .unwrap_or(true) + } + + /// The write concern to use for this operation, if any. + fn write_concern(&self) -> Option<&WriteConcern> { + None + } + + /// Returns whether or not this command supports the `readConcern` field. + fn supports_read_concern(&self, _description: &StreamDescription) -> bool { + false + } + + /// Whether this operation supports sessions or not. + fn supports_sessions(&self) -> bool { + true + } + + /// The level of retryability the operation supports. + fn retryability(&self) -> Retryability { + Retryability::None + } + + /// Updates this operation as needed for a retry. + fn update_for_retry(&mut self) {} + + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + None + } + + fn name(&self) -> &str { + Self::NAME + } +} + +impl Operation for T { + type O = T::O; + type Command = T::Command; + const NAME: &'static str = T::NAME; + fn build(&mut self, description: &StreamDescription) -> Result> { + self.build(description) + } + fn serialize_command(&mut self, cmd: Command) -> Result> { + self.serialize_command(cmd) + } + fn extract_at_cluster_time(&self, response: &RawDocument) -> Result> { + self.extract_at_cluster_time(response) + } + fn handle_response( + &self, + response: RawCommandResponse, + description: &StreamDescription, + ) -> Result { + self.handle_response(response, description) + } + fn handle_error(&self, error: Error) -> Result { + self.handle_error(error) + } + fn selection_criteria(&self) -> Option<&SelectionCriteria> { + self.selection_criteria() + } + fn is_acknowledged(&self) -> bool { + self.is_acknowledged() + } + fn write_concern(&self) -> Option<&WriteConcern> { + self.write_concern() + } + fn supports_read_concern(&self, description: &StreamDescription) -> bool { + self.supports_read_concern(description) + } + fn supports_sessions(&self) -> bool { + self.supports_sessions() + } + fn retryability(&self) -> Retryability { + self.retryability() + } + fn update_for_retry(&mut self) { + self.update_for_retry() + } + fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { + self.pinned_connection() + } + fn name(&self) -> &str { + self.name() + } +} diff --git a/src/operation/raw_output.rs b/src/operation/raw_output.rs new file mode 100644 index 000000000..d7b2b9534 --- /dev/null +++ b/src/operation/raw_output.rs @@ -0,0 +1,79 @@ +use crate::{ + cmap::{Command, RawCommandResponse, StreamDescription}, + error::Result, +}; + +use super::Operation; + +/// Forwards all implementation to the wrapped `Operation`, but returns the response unparsed and +/// unvalidated as a `RawCommandResponse`. +pub(crate) struct RawOutput(pub(crate) Op); + +impl Operation for RawOutput { + type O = RawCommandResponse; + type Command = Op::Command; + const NAME: &'static str = Op::NAME; + + fn build(&mut self, description: &StreamDescription) -> Result> { + self.0.build(description) + } + + fn serialize_command(&mut self, cmd: Command) -> Result> { + self.0.serialize_command(cmd) + } + + fn extract_at_cluster_time( + &self, + response: &bson::RawDocument, + ) -> Result> { + self.0.extract_at_cluster_time(response) + } + + fn handle_response( + &self, + response: RawCommandResponse, + _description: &StreamDescription, + ) -> Result { + Ok(response) + } + + fn handle_error(&self, error: crate::error::Error) -> Result { + Err(error) + } + + fn selection_criteria(&self) -> Option<&crate::selection_criteria::SelectionCriteria> { + self.0.selection_criteria() + } + + fn is_acknowledged(&self) -> bool { + self.0.is_acknowledged() + } + + fn write_concern(&self) -> Option<&crate::options::WriteConcern> { + self.0.write_concern() + } + + fn supports_read_concern(&self, description: &StreamDescription) -> bool { + self.0.supports_read_concern(description) + } + + fn supports_sessions(&self) -> bool { + self.0.supports_sessions() + } + + fn retryability(&self) -> super::Retryability { + self.0.retryability() + } + + fn update_for_retry(&mut self) { + self.0.update_for_retry() + } + + fn pinned_connection(&self) -> Option<&crate::cmap::conn::PinnedConnectionHandle> { + self.0.pinned_connection() + } + + fn name(&self) -> &str { + self.0.name() + } +} diff --git a/src/operation/run_command/mod.rs b/src/operation/run_command/mod.rs index ff862e1a0..c8b784a85 100644 --- a/src/operation/run_command/mod.rs +++ b/src/operation/run_command/mod.rs @@ -3,9 +3,9 @@ mod test; use std::convert::TryInto; -use bson::RawBsonRef; +use bson::{RawBsonRef, RawDocumentBuf}; -use super::{CursorBody, Operation}; +use super::{CursorBody, OperationWithDefaults}; use crate::{ bson::Document, client::SESSIONS_UNSUPPORTED_COMMANDS, @@ -18,7 +18,7 @@ use crate::{ #[derive(Debug)] pub(crate) struct RunCommand<'conn> { db: String, - command: Document, + command: RawDocumentBuf, selection_criteria: Option, write_concern: Option, pinned_connection: Option<&'conn PinnedConnectionHandle>, @@ -36,6 +36,28 @@ impl<'conn> RunCommand<'conn> { .map(|doc| bson::from_bson::(doc.clone())) .transpose()?; + Ok(Self { + db, + command: RawDocumentBuf::from_document(&command)?, + selection_criteria, + write_concern, + pinned_connection, + }) + } + + #[cfg(feature = "csfle")] + pub(crate) fn new_raw( + db: String, + command: RawDocumentBuf, + selection_criteria: Option, + pinned_connection: Option<&'conn PinnedConnectionHandle>, + ) -> Result { + let write_concern = command + .get("writeConcern")? + .and_then(|b| b.as_document()) + .map(|doc| bson::from_slice::(doc.as_bytes())) + .transpose()?; + Ok(Self { db, command, @@ -46,19 +68,23 @@ impl<'conn> RunCommand<'conn> { } fn command_name(&self) -> Option<&str> { - self.command.keys().next().map(String::as_str) + self.command + .into_iter() + .next() + .and_then(|r| r.ok()) + .map(|(k, _)| k) } } -impl<'conn> Operation for RunCommand<'conn> { +impl<'conn> OperationWithDefaults for RunCommand<'conn> { type O = Document; - type Command = Document; + type Command = RawDocumentBuf; // Since we can't actually specify a string statically here, we just put a descriptive string // that should fail loudly if accidentally passed to the server. const NAME: &'static str = "$genericRunCommand"; - fn build(&mut self, _description: &StreamDescription) -> Result { + fn build(&mut self, _description: &StreamDescription) -> Result> { let command_name = self .command_name() .ok_or_else(|| ErrorKind::InvalidArgument { diff --git a/src/operation/run_command/test.rs b/src/operation/run_command/test.rs index 901aad90d..467ccf5e1 100644 --- a/src/operation/run_command/test.rs +++ b/src/operation/run_command/test.rs @@ -1,29 +1,7 @@ use bson::Timestamp; use super::RunCommand; -use crate::{ - bson::doc, - cmap::StreamDescription, - operation::{test::handle_response_test, Operation}, -}; - -#[test] -fn build() { - let mut op = RunCommand::new("foo".into(), doc! { "hello": 1 }, None, None).unwrap(); - assert!(op.selection_criteria().is_none()); - - let command = op.build(&StreamDescription::new_testing()).unwrap(); - - assert_eq!(command.name, "hello"); - assert_eq!(command.target_db, "foo"); - assert_eq!( - command - .body - .get("hello") - .and_then(crate::bson_util::get_int), - Some(1) - ); -} +use crate::{bson::doc, operation::test::handle_response_test}; #[test] fn handle_success() { diff --git a/src/operation/update/mod.rs b/src/operation/update/mod.rs index a1b4fcda9..782a5c5d0 100644 --- a/src/operation/update/mod.rs +++ b/src/operation/update/mod.rs @@ -8,7 +8,7 @@ use crate::{ bson_util, cmap::{Command, RawCommandResponse, StreamDescription}, error::{convert_bulk_errors, Result}, - operation::{Operation, Retryability, WriteResponseBody}, + operation::{OperationWithDefaults, Retryability, WriteResponseBody}, options::{UpdateModifications, UpdateOptions, WriteConcern}, results::UpdateResult, Namespace, @@ -55,7 +55,7 @@ impl Update { } } -impl Operation for Update { +impl OperationWithDefaults for Update { type O = UpdateResult; type Command = Document;