Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1384 Implement auto encryption #717

Merged
merged 35 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9d05d23
api outline
abr-egn Jul 25, 2022
5bd3324
wip
abr-egn Jul 25, 2022
56c4160
shift operation defaults to mirror trait
abr-egn Jul 25, 2022
1349cff
needmongocollinfo
abr-egn Jul 25, 2022
d1ccd79
wip
abr-egn Jul 26, 2022
1709fd3
raw runcommand
abr-egn Jul 26, 2022
c7d02d9
local boxed run_mongocrypt_ctx
abr-egn Jul 27, 2022
99f2c18
dedicated ctx thread wip
abr-egn Jul 27, 2022
21f0189
dedicated ctx thread scaffolding
abr-egn Jul 27, 2022
e118f52
cleanup
abr-egn Jul 27, 2022
36c75db
catchup
abr-egn Jul 27, 2022
f586c44
tidy error handling
abr-egn Jul 27, 2022
7244004
needmongokeys
abr-egn Jul 28, 2022
2646bed
start to rejigger to new ctx
abr-egn Jul 29, 2022
edeaab4
progress
abr-egn Jul 29, 2022
04a2fac
select runtimes for ctx spawning
abr-egn Aug 1, 2022
d0f974b
needkms
abr-egn Aug 2, 2022
ed668cc
fill back in previous states
abr-egn Aug 3, 2022
9c98ecb
placeholder for needkmscredentials
abr-egn Aug 3, 2022
8a0cd31
auto decrypt
abr-egn Aug 3, 2022
54362a6
rustfmt
abr-egn Aug 3, 2022
edf0924
put crypto finalization on a thread pool
abr-egn Aug 4, 2022
bf78b11
format and clippy
abr-egn Aug 4, 2022
15006f1
fix comment
abr-egn Aug 4, 2022
e1b9ad8
point back to main mongocrypt repo
abr-egn Aug 4, 2022
293cfb9
move thread pool to client state
abr-egn Aug 8, 2022
d7acff9
review updates
abr-egn Aug 10, 2022
6bbd8c1
fix tests and include csfle in feature testing
abr-egn Aug 11, 2022
859967a
actually use tls_options
abr-egn Aug 12, 2022
0dedc5b
fix mongocrypt dep
abr-egn Aug 12, 2022
8169c6a
use advance/current
abr-egn Aug 15, 2022
40cffb9
comment rawoutput
abr-egn Aug 15, 2022
d270393
review updates
abr-egn Aug 17, 2022
614bd78
fix test imports
abr-egn Aug 17, 2022
0b007b5
fix rustfmt
abr-egn Aug 17, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .evergreen/feature-combinations.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@
export FEATURE_COMBINATIONS=(
'' # default features
'--no-default-features --features async-std-runtime,sync' # features that conflict w/ default features
'--features tokio-sync,zstd-compression,snappy-compression,zlib-compression,openssl-tls,aws-auth' # additive features
'--features tokio-sync,zstd-compression,snappy-compression,zlib-compression,openssl-tls,aws-auth,csfle' # additive features
)
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ zlib-compression = ["flate2"]
snappy-compression = ["snap"]

# DO NOT USE; see https://jira.mongodb.org/browse/RUST-569 for the status of CSFLE support in the Rust driver.
csfle = ["mongocrypt", "which"]
csfle = ["mongocrypt", "which", "rayon"]

[dependencies]
async-trait = "0.1.42"
Expand All @@ -89,6 +89,7 @@ openssl-probe = { version = "0.1.5", optional = true }
os_info = { version = "3.0.1", default-features = false }
percent-encoding = "2.0.0"
rand = { version = "0.8.3", features = ["small_rng"] }
rayon = { version = "1.5.3", optional = true }
rustc_version_runtime = "0.2.1"
rustls-pemfile = "0.3.0"
serde_with = "1.3.1"
Expand Down
19 changes: 12 additions & 7 deletions src/client/csfle.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod options;
mod state_machine;

use std::{
path::Path,
Expand All @@ -7,6 +8,7 @@ use std::{

use derivative::Derivative;
use mongocrypt::Crypt;
use rayon::ThreadPool;

use crate::{
error::{Error, Result},
Expand All @@ -30,34 +32,37 @@ use super::WeakClient;
#[derivative(Debug)]
pub(super) struct ClientState {
#[derivative(Debug = "ignore")]
#[allow(dead_code)]
crypt: Crypt,
pub(crate) crypt: Crypt,
mongocryptd_client: Option<Client>,
aux_clients: AuxClients,
opts: AutoEncryptionOptions,
crypto_threads: ThreadPool,
}

#[derive(Debug)]
struct AuxClients {
#[allow(dead_code)]
key_vault_client: WeakClient,
#[allow(dead_code)]
metadata_client: Option<WeakClient>,
#[allow(dead_code)]
internal_client: Option<Client>,
_internal_client: Option<Client>,
}

impl ClientState {
pub(super) async fn new(client: &Client, opts: AutoEncryptionOptions) -> Result<Self> {
let crypt = Self::make_crypt(&opts)?;
let mongocryptd_client = Self::spawn_mongocryptd_if_needed(&opts, &crypt).await?;
let aux_clients = Self::make_aux_clients(client, &opts)?;
let num_cpus = std::thread::available_parallelism()?.get();
let crypto_threads = rayon::ThreadPoolBuilder::new()
.num_threads(num_cpus)
.build()
.map_err(|e| Error::internal(format!("could not initialize thread pool: {}", e)))?;

Ok(Self {
crypt,
mongocryptd_client,
aux_clients,
opts,
crypto_threads,
})
}

Expand Down Expand Up @@ -171,7 +176,7 @@ impl ClientState {
Ok(AuxClients {
key_vault_client,
metadata_client,
internal_client,
_internal_client: internal_client,
})
}
}
Expand Down
171 changes: 171 additions & 0 deletions src/client/csfle/state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::convert::TryInto;

use bson::{Document, RawDocument, RawDocumentBuf};
use futures_util::{stream, TryStreamExt};
use mongocrypt::ctx::{Ctx, State};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
sync::oneshot,
};

use crate::{
client::options::ServerAddress,
cmap::options::StreamOptions,
error::{Error, Result},
operation::{RawOutput, RunCommand},
runtime::AsyncStream,
Client,
};

impl Client {
pub(crate) async fn run_mongocrypt_ctx(
&self,
ctx: Ctx,
db: Option<&str>,
) -> Result<RawDocumentBuf> {
let guard = self.inner.csfle.read().await;
let csfle = match guard.as_ref() {
Some(csfle) => csfle,
None => return Err(Error::internal("no csfle state for mongocrypt ctx")),
};
let mut result = None;
// This needs to be a `Result` so that the `Ctx` can be temporarily owned by the processing
// thread for crypto finalization. An `Option` would also work here, but `Result` means we
// can return a helpful error if things get into a broken state rather than panicing.
let mut ctx = Ok(ctx);
loop {
let state = result_ref(&ctx)?.state()?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result_ref and result_mut are needed because as_ref/as_mut on Results also applies to the error type, which causes an error when using ? because the types don't match the return type.

match state {
State::NeedMongoCollinfo => {
let ctx = result_mut(&mut ctx)?;
let filter = raw_to_doc(ctx.mongo_op()?)?;
let metadata_client = csfle
.aux_clients
.metadata_client
.as_ref()
.and_then(|w| w.upgrade())
.ok_or_else(|| {
Error::internal("metadata_client required for NeedMongoCollinfo state")
})?;
let db = metadata_client.database(db.as_ref().ok_or_else(|| {
Error::internal("db required for NeedMongoCollinfo state")
})?);
let mut cursor = db.list_collections(filter, None).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this operation is actually supposed to use the metadata client to avoid potential deadlocks, as described in this section of the spec. I see the integration guide mentions using the encrypted MongoClient, but I think we maybe forgot to update that back when SPEC-1768 was done (if so, it would be good to make a PR to update it for future readers).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed. I'll send a PR for the integration guide.

if cursor.advance().await? {
ctx.mongo_feed(cursor.current())?;
}
ctx.mongo_done()?;
}
State::NeedMongoMarkings => {
let ctx = result_mut(&mut ctx)?;
let command = ctx.mongo_op()?.to_raw_document_buf();
let db = db.as_ref().ok_or_else(|| {
Error::internal("db required for NeedMongoMarkings state")
})?;
let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?);
let mongocryptd_client = csfle
.mongocryptd_client
.as_ref()
.ok_or_else(|| Error::internal("mongocryptd client not found"))?;
let response = mongocryptd_client.execute_operation(op, None).await?;
ctx.mongo_feed(response.raw_body())?;
ctx.mongo_done()?;
}
State::NeedMongoKeys => {
let ctx = result_mut(&mut ctx)?;
let filter = raw_to_doc(ctx.mongo_op()?)?;
let kv_ns = &csfle.opts.key_vault_namespace;
let kv_client = csfle
.aux_clients
.key_vault_client
.upgrade()
.ok_or_else(|| Error::internal("key vault client dropped"))?;
let kv_coll = kv_client
.database(&kv_ns.db)
.collection::<RawDocumentBuf>(&kv_ns.coll);
let mut cursor = kv_coll.find(filter, None).await?;
while cursor.advance().await? {
ctx.mongo_feed(cursor.current())?;
}
ctx.mongo_done()?;
}
State::NeedKms => {
let ctx = result_mut(&mut ctx)?;
let scope = ctx.kms_scope();
let mut kms_ctxen: Vec<Result<_>> = vec![];
while let Some(kms_ctx) = scope.next_kms_ctx() {
kms_ctxen.push(Ok(kms_ctx));
}
stream::iter(kms_ctxen)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just using spawn for concurrency here wouldn't work because that requires a 'static bound; it's a little odd that the non-static equivalent is only found when operating on streams, but 🤷

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the stream versoin works basically the same as FuturesUnordered: the current thread polls all the futures, rather than having potentially many threads doing it in parallel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, there it is. Yup, it's polling rather than parallel, which seems fine for this because these futures are going to be IO-bound anyway.

I'm going to stick with using the stream method rather than FuturesUnordered directly because the semantics of try_for_each_concurrent are very handy here.

.try_for_each_concurrent(None, |mut kms_ctx| async move {
let endpoint = kms_ctx.endpoint()?;
let addr = ServerAddress::parse(endpoint)?;
let provider = kms_ctx.kms_provider()?;
let tls_options = csfle
.opts()
.tls_options
.as_ref()
.and_then(|tls| tls.get(&provider))
.cloned()
.unwrap_or_default();
let mut stream = AsyncStream::connect(
StreamOptions::builder()
.address(addr)
.tls_options(tls_options)
.build(),
)
.await?;
stream.write_all(kms_ctx.message()?).await?;
let mut buf = vec![];
while kms_ctx.bytes_needed() > 0 {
let buf_size = kms_ctx.bytes_needed().try_into().map_err(|e| {
Error::internal(format!("buffer size overflow: {}", e))
})?;
buf.resize(buf_size, 0);
let count = stream.read(&mut buf).await?;
kms_ctx.feed(&buf[0..count])?;
}
Ok(())
})
.await?;
}
State::NeedKmsCredentials => todo!("RUST-1314"),
State::Ready => {
let (tx, rx) = oneshot::channel();
let mut thread_ctx = std::mem::replace(
&mut ctx,
Err(Error::internal("crypto context not present")),
)?;
csfle.crypto_threads.spawn(move || {
let result = thread_ctx.finalize().map(|doc| doc.to_owned());
let _ = tx.send((thread_ctx, result));
});
let (ctx_again, output) = rx
.await
.map_err(|_| Error::internal("crypto thread dropped"))?;
ctx = Ok(ctx_again);
result = Some(output?);
}
State::Done => break,
s => return Err(Error::internal(format!("unhandled state {:?}", s))),
}
}
match result {
Some(doc) => Ok(doc),
None => Err(Error::internal("libmongocrypt terminated without output")),
}
}
}

fn result_ref<T>(r: &Result<T>) -> Result<&T> {
r.as_ref().map_err(Error::clone)
}

fn result_mut<T>(r: &mut Result<T>) -> Result<&mut T> {
r.as_mut().map_err(|e| e.clone())
}

fn raw_to_doc(raw: &RawDocument) -> Result<Document> {
raw.try_into()
.map_err(|e| Error::internal(format!("could not parse raw document: {}", e)))
}
62 changes: 62 additions & 0 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
#[cfg(feature = "csfle")]
use bson::RawDocumentBuf;
use bson::{doc, RawBsonRef, RawDocument, Timestamp};
#[cfg(feature = "csfle")]
use futures_core::future::BoxFuture;
use lazy_static::lazy_static;
use serde::de::DeserializeOwned;

Expand Down Expand Up @@ -598,6 +602,21 @@ impl Client {
let target_db = cmd.target_db.clone();

let serialized = op.serialize_command(cmd)?;
#[cfg(feature = "csfle")]
let serialized = {
let guard = self.inner.csfle.read().await;
if let Some(ref csfle) = *guard {
if csfle.opts().bypass_auto_encryption != Some(true) {
self.auto_encrypt(csfle, RawDocument::from_bytes(&serialized)?, &target_db)
.await?
.into_bytes()
} else {
serialized
}
} else {
serialized
}
};
let raw_cmd = RawCommand {
name: cmd_name.clone(),
target_db,
Expand Down Expand Up @@ -750,6 +769,21 @@ impl Client {
handler.handle_command_succeeded_event(command_succeeded_event);
});

#[cfg(feature = "csfle")]
let response = {
let guard = self.inner.csfle.read().await;
if let Some(ref csfle) = *guard {
if csfle.opts().bypass_auto_encryption != Some(true) {
let new_body = self.auto_decrypt(csfle, response.raw_body()).await?;
RawCommandResponse::new_raw(response.source, new_body)
} else {
response
}
} else {
response
}
};

match op.handle_response(response, connection.stream_description()?) {
Ok(response) => Ok(response),
Err(mut err) => {
Expand All @@ -765,6 +799,34 @@ impl Client {
}
}

#[cfg(feature = "csfle")]
fn auto_encrypt<'a>(
&'a self,
csfle: &'a super::csfle::ClientState,
command: &'a RawDocument,
target_db: &'a str,
) -> BoxFuture<'a, Result<RawDocumentBuf>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be boxed because it's technically recursive (run_mongocrypt_ctx can execute operations).

Box::pin(async move {
let ctx = csfle
.crypt
.ctx_builder()
.build_encrypt(target_db, command)?;
self.run_mongocrypt_ctx(ctx, Some(target_db)).await
})
}

#[cfg(feature = "csfle")]
fn auto_decrypt<'a>(
&'a self,
csfle: &'a super::csfle::ClientState,
response: &'a RawDocument,
) -> BoxFuture<'a, Result<RawDocumentBuf>> {
Box::pin(async move {
let ctx = csfle.crypt.ctx_builder().build_decrypt(response)?;
self.run_mongocrypt_ctx(ctx, None).await
})
}

/// Start an implicit session if the operation and write concern are compatible with sessions.
async fn start_implicit_session<T: Operation>(&self, op: &T) -> Result<Option<ClientSession>> {
match self.get_session_support_status().await? {
Expand Down
9 changes: 5 additions & 4 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,11 @@ impl RawCommandResponse {

pub(crate) fn new(source: ServerAddress, message: Message) -> Result<Self> {
let raw = message.single_document_response()?;
Ok(Self {
source,
raw: RawDocumentBuf::from_bytes(raw)?,
})
Ok(Self::new_raw(source, RawDocumentBuf::from_bytes(raw)?))
}

pub(crate) fn new_raw(source: ServerAddress, raw: RawDocumentBuf) -> Self {
Self { source, raw }
}

pub(crate) fn body<'a, T: Deserialize<'a>>(&'a self) -> Result<T> {
Expand Down
6 changes: 5 additions & 1 deletion src/coll/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,10 @@ where
}

let ordered = options.as_ref().and_then(|o| o.ordered).unwrap_or(true);
#[cfg(feature = "csfle")]
let encrypted = self.client().auto_encryption_opts().await.is_some();
#[cfg(not(feature = "csfle"))]
let encrypted = false;

let mut cumulative_failure: Option<BulkWriteFailure> = None;
let mut error_labels: HashSet<String> = Default::default();
Expand All @@ -1198,7 +1202,7 @@ where

while n_attempted < ds.len() {
let docs: Vec<&T> = ds.iter().skip(n_attempted).map(Borrow::borrow).collect();
let insert = Insert::new(self.namespace(), docs, options.clone());
let insert = Insert::new_encrypted(self.namespace(), docs, options.clone(), encrypted);

match self
.client()
Expand Down
Loading