-
Notifications
You must be signed in to change notification settings - Fork 173
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RUST-1384 Implement auto encryption #717
Changes from 32 commits
9d05d23
5bd3324
56c4160
1349cff
d1ccd79
1709fd3
c7d02d9
99f2c18
21f0189
e118f52
36c75db
f586c44
7244004
2646bed
edeaab4
04a2fac
d0f974b
ed668cc
9c98ecb
8a0cd31
54362a6
edf0924
bf78b11
15006f1
e1b9ad8
293cfb9
d7acff9
6bbd8c1
859967a
0dedc5b
8169c6a
40cffb9
d270393
614bd78
0b007b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
use std::convert::TryInto; | ||
|
||
use bson::{Document, RawDocument, RawDocumentBuf}; | ||
use futures_util::{stream, TryStreamExt}; | ||
use mongocrypt::ctx::{Ctx, State}; | ||
use tokio::{ | ||
io::{AsyncReadExt, AsyncWriteExt}, | ||
sync::oneshot, | ||
}; | ||
|
||
use crate::{ | ||
client::options::ServerAddress, | ||
cmap::options::StreamOptions, | ||
error::{Error, Result}, | ||
operation::{RawOutput, RunCommand}, | ||
runtime::AsyncStream, | ||
Client, | ||
}; | ||
|
||
impl Client { | ||
pub(crate) async fn run_mongocrypt_ctx( | ||
&self, | ||
ctx: Ctx, | ||
db: Option<&str>, | ||
) -> Result<RawDocumentBuf> { | ||
let guard = self.inner.csfle.read().await; | ||
let csfle = match guard.as_ref() { | ||
Some(csfle) => csfle, | ||
None => return Err(Error::internal("no csfle state for mongocrypt ctx")), | ||
}; | ||
let mut result = None; | ||
// This needs to be a `Result` so that the `Ctx` can be temporarily owned by the processing | ||
// thread for crypto finalization. An `Option` would also work here, but `Result` means we | ||
// can return a helpful error if things get into a broken state rather than panicing. | ||
let mut ctx = Ok(ctx); | ||
loop { | ||
let state = result_ref(&ctx)?.state()?; | ||
match state { | ||
State::NeedMongoCollinfo => { | ||
let ctx = result_mut(&mut ctx)?; | ||
let filter = raw_to_doc(ctx.mongo_op()?)?; | ||
let metadata_client = csfle | ||
.aux_clients | ||
.metadata_client | ||
.as_ref() | ||
.and_then(|w| w.upgrade()) | ||
patrickfreed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.ok_or_else(|| { | ||
Error::internal("metadata_client required for NeedMongoCollinfo state") | ||
})?; | ||
let db = metadata_client.database(db.as_ref().ok_or_else(|| { | ||
Error::internal("db required for NeedMongoCollinfo state") | ||
})?); | ||
let mut cursor = db.list_collections(filter, None).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this operation is actually supposed to use the metadata client to avoid potential deadlocks, as described in this section of the spec. I see the integration guide mentions using the encrypted There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch, fixed. I'll send a PR for the integration guide. |
||
if cursor.advance().await? { | ||
ctx.mongo_feed(cursor.current())?; | ||
} | ||
ctx.mongo_done()?; | ||
} | ||
State::NeedMongoMarkings => { | ||
let ctx = result_mut(&mut ctx)?; | ||
let command = ctx.mongo_op()?.to_raw_document_buf(); | ||
let db = db.as_ref().ok_or_else(|| { | ||
Error::internal("db required for NeedMongoMarkings state") | ||
})?; | ||
let op = RawOutput(RunCommand::new_raw(db.to_string(), command, None, None)?); | ||
let mongocryptd_client = csfle | ||
.mongocryptd_client | ||
.as_ref() | ||
.ok_or_else(|| Error::internal("mongocryptd client not found"))?; | ||
let response = mongocryptd_client.execute_operation(op, None).await?; | ||
ctx.mongo_feed(response.raw_body())?; | ||
ctx.mongo_done()?; | ||
} | ||
State::NeedMongoKeys => { | ||
let ctx = result_mut(&mut ctx)?; | ||
let filter = raw_to_doc(ctx.mongo_op()?)?; | ||
let kv_ns = &csfle.opts.key_vault_namespace; | ||
let kv_client = csfle | ||
.aux_clients | ||
.key_vault_client | ||
.upgrade() | ||
.ok_or_else(|| Error::internal("key vault client dropped"))?; | ||
let kv_coll = kv_client | ||
.database(&kv_ns.db) | ||
.collection::<RawDocumentBuf>(&kv_ns.coll); | ||
let mut cursor = kv_coll.find(filter, None).await?; | ||
while cursor.advance().await? { | ||
ctx.mongo_feed(cursor.current())?; | ||
} | ||
ctx.mongo_done()?; | ||
} | ||
State::NeedKms => { | ||
let ctx = result_mut(&mut ctx)?; | ||
let scope = ctx.kms_scope(); | ||
let mut kms_ctxen: Vec<Result<_>> = vec![]; | ||
while let Some(kms_ctx) = scope.next_kms_ctx() { | ||
kms_ctxen.push(Ok(kms_ctx)); | ||
} | ||
stream::iter(kms_ctxen) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the stream versoin works basically the same as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, there it is. Yup, it's polling rather than parallel, which seems fine for this because these futures are going to be IO-bound anyway. I'm going to stick with using the stream method rather than |
||
.try_for_each_concurrent(None, |mut kms_ctx| async move { | ||
let endpoint = kms_ctx.endpoint()?; | ||
let addr = ServerAddress::parse(endpoint)?; | ||
let provider = kms_ctx.kms_provider()?; | ||
let tls_options = csfle | ||
.opts() | ||
.tls_options | ||
.as_ref() | ||
.and_then(|tls| tls.get(&provider)) | ||
.cloned() | ||
.unwrap_or_default(); | ||
let mut stream = AsyncStream::connect( | ||
StreamOptions::builder() | ||
.address(addr) | ||
.tls_options(tls_options) | ||
.build(), | ||
) | ||
.await?; | ||
stream.write_all(kms_ctx.message()?).await?; | ||
let mut buf = vec![]; | ||
while kms_ctx.bytes_needed() > 0 { | ||
let buf_size = kms_ctx.bytes_needed().try_into().map_err(|e| { | ||
Error::internal(format!("buffer size overflow: {}", e)) | ||
})?; | ||
buf.resize(buf_size, 0); | ||
let count = stream.read(&mut buf).await?; | ||
kms_ctx.feed(&buf[0..count])?; | ||
} | ||
Ok(()) | ||
}) | ||
.await?; | ||
} | ||
State::NeedKmsCredentials => todo!("RUST-1314"), | ||
State::Ready => { | ||
let (tx, rx) = oneshot::channel(); | ||
let mut thread_ctx = std::mem::replace( | ||
&mut ctx, | ||
Err(Error::internal("crypto context not present")), | ||
)?; | ||
csfle.crypto_threads.spawn(move || { | ||
let result = thread_ctx.finalize().map(|doc| doc.to_owned()); | ||
let _ = tx.send((thread_ctx, result)); | ||
}); | ||
let (ctx_again, output) = rx | ||
.await | ||
.map_err(|_| Error::internal("crypto thread dropped"))?; | ||
patrickfreed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ctx = Ok(ctx_again); | ||
result = Some(output?); | ||
} | ||
State::Done => break, | ||
s => return Err(Error::internal(format!("unhandled state {:?}", s))), | ||
} | ||
} | ||
match result { | ||
Some(doc) => Ok(doc), | ||
None => Err(Error::internal("libmongocrypt terminated without output")), | ||
} | ||
} | ||
} | ||
|
||
fn result_ref<T>(r: &Result<T>) -> Result<&T> { | ||
r.as_ref().map_err(Error::clone) | ||
} | ||
|
||
fn result_mut<T>(r: &mut Result<T>) -> Result<&mut T> { | ||
r.as_mut().map_err(|e| e.clone()) | ||
} | ||
|
||
fn raw_to_doc(raw: &RawDocument) -> Result<Document> { | ||
raw.try_into() | ||
.map_err(|e| Error::internal(format!("could not parse raw document: {}", e))) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,8 @@ | ||
#[cfg(feature = "csfle")] | ||
use bson::RawDocumentBuf; | ||
use bson::{doc, RawBsonRef, RawDocument, Timestamp}; | ||
#[cfg(feature = "csfle")] | ||
use futures_core::future::BoxFuture; | ||
use lazy_static::lazy_static; | ||
use serde::de::DeserializeOwned; | ||
|
||
|
@@ -598,6 +602,21 @@ impl Client { | |
let target_db = cmd.target_db.clone(); | ||
|
||
let serialized = op.serialize_command(cmd)?; | ||
#[cfg(feature = "csfle")] | ||
let serialized = { | ||
let guard = self.inner.csfle.read().await; | ||
if let Some(ref csfle) = *guard { | ||
if csfle.opts().bypass_auto_encryption != Some(true) { | ||
self.auto_encrypt(csfle, RawDocument::from_bytes(&serialized)?, &target_db) | ||
.await? | ||
.into_bytes() | ||
} else { | ||
serialized | ||
} | ||
} else { | ||
serialized | ||
} | ||
}; | ||
let raw_cmd = RawCommand { | ||
name: cmd_name.clone(), | ||
target_db, | ||
|
@@ -750,6 +769,21 @@ impl Client { | |
handler.handle_command_succeeded_event(command_succeeded_event); | ||
}); | ||
|
||
#[cfg(feature = "csfle")] | ||
let response = { | ||
let guard = self.inner.csfle.read().await; | ||
if let Some(ref csfle) = *guard { | ||
if csfle.opts().bypass_auto_encryption != Some(true) { | ||
let new_body = self.auto_decrypt(csfle, response.raw_body()).await?; | ||
RawCommandResponse::new_raw(response.source, new_body) | ||
} else { | ||
response | ||
} | ||
} else { | ||
response | ||
} | ||
}; | ||
|
||
match op.handle_response(response, connection.stream_description()?) { | ||
Ok(response) => Ok(response), | ||
Err(mut err) => { | ||
|
@@ -765,6 +799,34 @@ impl Client { | |
} | ||
} | ||
|
||
#[cfg(feature = "csfle")] | ||
fn auto_encrypt<'a>( | ||
&'a self, | ||
csfle: &'a super::csfle::ClientState, | ||
command: &'a RawDocument, | ||
target_db: &'a str, | ||
) -> BoxFuture<'a, Result<RawDocumentBuf>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be boxed because it's technically recursive ( |
||
Box::pin(async move { | ||
let ctx = csfle | ||
.crypt | ||
.ctx_builder() | ||
.build_encrypt(target_db, command)?; | ||
self.run_mongocrypt_ctx(ctx, Some(target_db)).await | ||
patrickfreed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
}) | ||
} | ||
|
||
#[cfg(feature = "csfle")] | ||
fn auto_decrypt<'a>( | ||
&'a self, | ||
csfle: &'a super::csfle::ClientState, | ||
response: &'a RawDocument, | ||
) -> BoxFuture<'a, Result<RawDocumentBuf>> { | ||
Box::pin(async move { | ||
let ctx = csfle.crypt.ctx_builder().build_decrypt(response)?; | ||
self.run_mongocrypt_ctx(ctx, None).await | ||
}) | ||
} | ||
|
||
/// Start an implicit session if the operation and write concern are compatible with sessions. | ||
async fn start_implicit_session<T: Operation>(&self, op: &T) -> Result<Option<ClientSession>> { | ||
match self.get_session_support_status().await? { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
result_ref
andresult_mut
are needed becauseas_ref
/as_mut
onResults
also applies to the error type, which causes an error when using?
because the types don't match the return type.