Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1553 Add support for document sequences (OP_MSG payload type 1) #1009

Merged
merged 3 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/client/auth/sasl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl SaslStart {
body.insert("options", doc! { "skipEmptyExchange": true });
}

let mut command = Command::new("saslStart".into(), self.source, body);
let mut command = Command::new("saslStart", self.source, body);
if let Some(server_api) = self.server_api {
command.set_server_api(&server_api);
}
Expand Down Expand Up @@ -81,7 +81,7 @@ impl SaslContinue {
"payload": Binary { subtype: BinarySubtype::Generic, bytes: self.payload },
};

let mut command = Command::new("saslContinue".into(), self.source, body);
let mut command = Command::new("saslContinue", self.source, body);
if let Some(server_api) = self.server_api {
command.set_server_api(&server_api);
}
Expand Down
2 changes: 1 addition & 1 deletion src/client/auth/x509.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub(crate) fn build_client_first(
auth_command_doc.insert("username", username);
}

let mut command = Command::new("authenticate".into(), "$external".into(), auth_command_doc);
let mut command = Command::new("authenticate", "$external", auth_command_doc);
if let Some(server_api) = server_api {
command.set_server_api(server_api);
}
Expand Down
42 changes: 18 additions & 24 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ use crate::{
WatchArgs,
},
cmap::{
conn::PinnedConnectionHandle,
conn::{
wire::{next_request_id, Message},
PinnedConnectionHandle,
},
Connection,
ConnectionPool,
RawCommand,
RawCommandResponse,
},
cursor::{session::SessionCursor, Cursor, CursorSpecification},
Expand Down Expand Up @@ -573,51 +575,43 @@ impl Client {

let connection_info = connection.info();
let service_id = connection.service_id();
let request_id = crate::cmap::conn::next_request_id();
let request_id = next_request_id();

if let Some(ref server_api) = self.inner.options.server_api {
cmd.set_server_api(server_api);
}

let should_redact = cmd.should_redact();
let should_compress = cmd.should_compress();

let cmd_name = cmd.name.clone();
let target_db = cmd.target_db.clone();

let serialized = op.serialize_command(cmd)?;
#[allow(unused_mut)]
let mut message = Message::from_command(cmd, Some(request_id))?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I eliminated the RawCommand layer between Command and Message here. RawCommand contained a vec of bytes created by Operation::serialize_command with the assumption that those bytes should be sent as a single payload type 0 section when converting to Message: this no longer makes sense when commands could require multiple sections.

#[cfg(feature = "in-use-encryption-unstable")]
let serialized = {
{
let guard = self.inner.csfle.read().await;
if let Some(ref csfle) = *guard {
if csfle.opts().bypass_auto_encryption != Some(true) {
self.auto_encrypt(csfle, RawDocument::from_bytes(&serialized)?, &target_db)
.await?
.into_bytes()
} else {
serialized
let encrypted_payload = self
.auto_encrypt(csfle, &message.document_payload, &target_db)
.await?;
message.document_payload = encrypted_payload;
}
} else {
serialized
}
};
let raw_cmd = RawCommand {
name: cmd_name.clone(),
target_db,
exhaust_allowed: false,
bytes: serialized,
};
}

self.emit_command_event(|| {
let command_body = if should_redact {
Document::new()
} else {
Document::from_reader(raw_cmd.bytes.as_slice())
.unwrap_or_else(|e| doc! { "serialization error": e.to_string() })
message.get_command_document()
};
CommandEvent::Started(CommandStartedEvent {
command: command_body,
db: raw_cmd.target_db.clone(),
command_name: raw_cmd.name.clone(),
db: target_db.clone(),
command_name: cmd_name.clone(),
request_id,
connection: connection_info.clone(),
service_id,
Expand All @@ -626,7 +620,7 @@ impl Client {
.await;

let start_time = Instant::now();
let command_result = match connection.send_raw_command(raw_cmd, request_id).await {
let command_result = match connection.send_message(message, should_compress).await {
Ok(response) => {
async fn handle_response<T: Operation>(
client: &Client,
Expand Down
2 changes: 1 addition & 1 deletion src/cmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use derivative::Derivative;

pub use self::conn::ConnectionInfo;
pub(crate) use self::{
conn::{Command, Connection, RawCommand, RawCommandResponse, StreamDescription},
conn::{Command, Connection, RawCommandResponse, StreamDescription},
status::PoolGenerationSubscriber,
worker::PoolGeneration,
};
Expand Down
35 changes: 12 additions & 23 deletions src/cmap/conn.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod command;
mod stream_description;
mod wire;
pub(crate) mod wire;

use std::{
sync::Arc,
Expand Down Expand Up @@ -33,9 +33,8 @@ use crate::{
options::ServerAddress,
runtime::AsyncStream,
};
pub(crate) use command::{Command, RawCommand, RawCommandResponse};
pub(crate) use command::{Command, RawCommandResponse};
pub(crate) use stream_description::StreamDescription;
pub(crate) use wire::next_request_id;

/// User-facing information about a connection to the database.
#[derive(Clone, Debug, Serialize)]
Expand Down Expand Up @@ -273,7 +272,7 @@ impl Connection {
}
}

async fn send_message(
pub(crate) async fn send_message(
&mut self,
message: Message,
to_compress: bool,
Expand Down Expand Up @@ -318,7 +317,10 @@ impl Connection {
let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);

RawCommandResponse::new(self.address.clone(), response_message)
Ok(RawCommandResponse::new(
self.address.clone(),
response_message,
))
}

/// Executes a `Command` and returns a `CommandResponse` containing the result from the server.
Expand All @@ -332,23 +334,7 @@ impl Connection {
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let to_compress = command.should_compress();
let message = Message::with_command(command, request_id.into())?;
self.send_message(message, to_compress).await
}

/// Executes a `RawCommand` and returns a `CommandResponse` containing the result from the
/// server.
///
/// An `Ok(...)` result simply means the server received the command and that the driver
/// received the response; it does not imply anything about the success of the command
/// itself.
pub(crate) async fn send_raw_command(
&mut self,
command: RawCommand,
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let to_compress = command.should_compress();
let message = Message::with_raw_command(command, request_id.into());
let message = Message::from_command(command, request_id.into())?;
self.send_message(message, to_compress).await
}

Expand Down Expand Up @@ -379,7 +365,10 @@ impl Connection {
let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);

RawCommandResponse::new(self.address.clone(), response_message)
Ok(RawCommandResponse::new(
self.address.clone(),
response_message,
))
}

/// Gets the connection's StreamDescription.
Expand Down
76 changes: 33 additions & 43 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use bson::{RawDocument, RawDocumentBuf};
use serde::{de::DeserializeOwned, Deserialize, Serialize};

use super::wire::Message;
use super::wire::{message::DocumentSequence, Message};
use crate::{
bson::{rawdoc, Document},
bson_util::extend_raw_document_buf,
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
bson::Document,
client::{options::ServerApi, ClusterTime},
error::{Error, ErrorKind, Result},
hello::{HelloCommandResponse, HelloReply},
operation::{CommandErrorBody, CommandResponse},
Expand All @@ -14,37 +13,26 @@ use crate::{
ClientSession,
};

/// A command that has been serialized to BSON.
#[derive(Debug)]
pub(crate) struct RawCommand {
pub(crate) name: String,
pub(crate) target_db: String,
/// Whether or not the server may respond to this command multiple times via the moreToComeBit.
pub(crate) exhaust_allowed: bool,
pub(crate) bytes: Vec<u8>,
}

impl RawCommand {
pub(crate) fn should_compress(&self) -> bool {
let name = self.name.to_lowercase();
!REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
}
}

/// Driver-side model of a database command.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Serialize, Default)]
#[serde(rename_all = "camelCase")]
pub(crate) struct Command<T = Document> {
pub(crate) struct Command<T = Document>
where
T: Serialize,
{
#[serde(skip)]
pub(crate) name: String,

#[serde(skip)]
pub(crate) exhaust_allowed: bool,

#[serde(flatten)]
#[serde(skip)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAICT the new behavior (skip and extend_raw_document_buf in Message::from_command) should be equivalent to flatten?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah you're right, reverted

pub(crate) body: T,

#[serde(skip)]
pub(crate) document_sequences: Vec<DocumentSequence>,

#[serde(rename = "$db")]
pub(crate) target_db: String,

Expand All @@ -70,13 +58,17 @@ pub(crate) struct Command<T = Document> {
recovery_token: Option<Document>,
}

impl<T> Command<T> {
pub(crate) fn new(name: String, target_db: String, body: T) -> Self {
impl<T> Command<T>
where
T: Serialize,
{
pub(crate) fn new(name: impl ToString, target_db: impl ToString, body: T) -> Self {
Self {
name,
target_db,
name: name.to_string(),
target_db: target_db.to_string(),
exhaust_allowed: false,
body,
document_sequences: Vec::new(),
lsid: None,
cluster_time: None,
server_api: None,
Expand All @@ -100,6 +92,7 @@ impl<T> Command<T> {
target_db,
exhaust_allowed: false,
body,
document_sequences: Vec::new(),
lsid: None,
cluster_time: None,
server_api: None,
Expand All @@ -112,6 +105,17 @@ impl<T> Command<T> {
}
}

pub(crate) fn add_document_sequence(
&mut self,
identifier: impl ToString,
documents: Vec<RawDocumentBuf>,
) {
self.document_sequences.push(DocumentSequence {
identifier: identifier.to_string(),
documents,
});
}

pub(crate) fn set_session(&mut self, session: &ClientSession) {
self.lsid = Some(session.id().clone())
}
Expand Down Expand Up @@ -178,19 +182,6 @@ impl<T> Command<T> {
}
}

impl Command<RawDocumentBuf> {
pub(crate) fn into_bson_bytes(mut self) -> Result<Vec<u8>> {
let mut command = self.body;

// Clear the body of the command to avoid re-serializing.
self.body = rawdoc! {};
let rest_of_command = bson::to_raw_document_buf(&self)?;

extend_raw_document_buf(&mut command, rest_of_command)?;
Ok(command.into_bytes())
}
}

#[derive(Debug, Clone)]
pub(crate) struct RawCommandResponse {
pub(crate) source: ServerAddress,
Expand Down Expand Up @@ -220,9 +211,8 @@ impl RawCommandResponse {
)
}

pub(crate) fn new(source: ServerAddress, message: Message) -> Result<Self> {
let raw = message.single_document_response()?;
Ok(Self::new_raw(source, RawDocumentBuf::from_bytes(raw)?))
pub(crate) fn new(source: ServerAddress, message: Message) -> Self {
Self::new_raw(source, message.document_payload)
}

pub(crate) fn new_raw(source: ServerAddress, raw: RawDocumentBuf) -> Self {
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/conn/stream_description.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl StreamDescription {
max_bson_object_size: 16 * 1024 * 1024,
max_write_batch_size: 100_000,
hello_ok: false,
max_message_size_bytes: Default::default(),
max_message_size_bytes: 48_000_000,
service_id: None,
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/conn/wire.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
mod header;
mod message;
pub(crate) mod message;
mod util;

pub(crate) use self::{
Expand Down
Loading