Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RUST-1306 Move Compression enum behind compression feature flags #1055

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,14 @@ buildvariants:

- name: compression
display_name: "Compression"
patchable: false
# patchable: false
run_on:
- rhel87-small
expansions:
COMPRESSION: true
AUTH: auth
SSL: ssl
tasks:
- .rapid .replicaset
- .compression

- name: stable-api
display_name: "Stable API V1"
Expand Down Expand Up @@ -872,15 +871,38 @@ tasks:
TOPOLOGY: sharded_cluster
- func: "run driver test suite"

- name: test-compression
- name: test-zstd-compression
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's not a ton of value in testing all of the compressors together IMO, and because the compressors list in the client options is considered in priority-order, separating these into different variants makes it easier to get full coverage of each compressor when running the test suite.

tags: [compression]
commands:
- func: "bootstrap mongo-orchestration"
vars:
MONGODB_VERSION: rapid
TOPOLOGY: replica_set
- func: "run driver test suite"
vars:
ZSTD: true

- name: test-zlib-compression
tags: [compression]
commands:
- func: "bootstrap mongo-orchestration"
vars:
MONGODB_VERSION: rapid
TOPOLOGY: replica_set
- func: "run driver test suite"
vars:
ZLIB: true

- name: test-snappy-compression
tags: [compression]
commands:
- func: "bootstrap mongo-orchestration"
vars:
MONGODB_VERSION: rapid
TOPOLOGY: replica_set
- func: "run driver test suite"
vars:
COMPRESSION: true
SNAPPY: true

- name: test-aws-auth-regular-credentials
tags: [aws-auth]
Expand Down Expand Up @@ -1485,10 +1507,12 @@ functions:
include_expansions_in_env:
- PROJECT_DIRECTORY
- OPENSSL
- COMPRESSION
- MONGODB_URI
- MONGODB_API_VERSION
- PATH
- ZSTD
- ZLIB
- SNAPPY

"run sync tests":
- command: subprocess.exec
Expand Down
12 changes: 10 additions & 2 deletions .evergreen/run-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,16 @@ if [ "$OPENSSL" = true ]; then
FEATURE_FLAGS+=("openssl-tls")
fi

if [ "$COMPRESSION" = true ]; then
FEATURE_FLAGS+=("snappy-compression", "zlib-compression", "zstd-compression")
if [ "$ZSTD" = true ]; then
FEATURE_FLAGS+=("zstd-compression")
fi

if [ "$ZLIB" = true ]; then
FEATURE_FLAGS+=("zlib-compression")
fi

if [ "$SNAPPY" = true ]; then
FEATURE_FLAGS+=("snappy-compression")
fi

export SESSION_TEST_REQUIRE_MONGOCRYPTD=true
Expand Down
86 changes: 65 additions & 21 deletions src/client/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@ use serde_with::skip_serializing_none;
use strsim::jaro_winkler;
use typed_builder::TypedBuilder;

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
use crate::options::Compressor;
#[cfg(test)]
use crate::srv::LookupHosts;
use crate::{
bson::{doc, Bson, Document},
client::auth::{AuthMechanism, Credential},
compression::Compressor,
concern::{Acknowledgment, ReadConcern, WriteConcern},
error::{Error, ErrorKind, Result},
event::EventHandler,
Expand Down Expand Up @@ -389,10 +394,15 @@ pub struct ClientOptions {
#[builder(default)]
pub app_name: Option<String>,

/// The compressors that the Client is willing to use in the order they are specified
/// in the configuration. The Client sends this list of compressors to the server.
/// The server responds with the intersection of its supported list of compressors.
/// The order of compressors indicates preference of compressors.
/// The allowed compressors to use to compress messages sent to and decompress messages
/// received from the server. This list should be specified in priority order, as the
/// compressor used for messages will be the first compressor in this list that is also
/// supported by the server selected for operations.
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
#[builder(default)]
#[serde(skip)]
pub compressors: Option<Vec<Compressor>>,
Expand Down Expand Up @@ -836,6 +846,11 @@ pub struct ConnectionString {
/// By default, connections will not be closed due to being idle.
pub max_idle_time: Option<Duration>,

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
/// The compressors that the Client is willing to use in the order they are specified
/// in the configuration. The Client sends this list of compressors to the server.
/// The server responds with the intersection of its supported list of compressors.
Expand Down Expand Up @@ -1344,6 +1359,11 @@ impl ClientOptions {
max_idle_time: conn_str.max_idle_time,
max_connecting: conn_str.max_connecting,
server_selection_timeout: conn_str.server_selection_timeout,
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
compressors: conn_str.compressors,
connect_timeout: conn_str.connect_timeout,
retry_reads: conn_str.retry_reads,
Expand Down Expand Up @@ -1415,6 +1435,11 @@ impl ClientOptions {
}
}

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
if let Some(ref compressors) = self.compressors {
for compressor in compressors {
compressor.validate()?;
Expand Down Expand Up @@ -1483,12 +1508,19 @@ impl ClientOptions {
if self.hosts.is_empty() {
self.hosts = other.hosts;
}

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
merge_options!(other, self, [compressors]);

merge_options!(
other,
self,
[
app_name,
compressors,
cmap_event_handler,
command_event_handler,
connect_timeout,
Expand Down Expand Up @@ -1939,15 +1971,23 @@ impl ConnectionString {
}
}

// If zlib and zlib_compression_level are specified then write zlib_compression_level into
// zlib enum
if let (Some(compressors), Some(zlib_compression_level)) =
(self.compressors.as_mut(), parts.zlib_compression)
{
for compressor in compressors {
compressor.write_zlib_level(zlib_compression_level)
#[cfg(feature = "zlib-compression")]
if let Some(zlib_compression_level) = parts.zlib_compression {
if let Some(compressors) = self.compressors.as_mut() {
for compressor in compressors {
compressor.write_zlib_level(zlib_compression_level)?;
}
}
}
#[cfg(not(feature = "zlib-compression"))]
if parts.zlib_compression.is_some() {
return Err(ErrorKind::InvalidArgument {
message: "zlibCompressionLevel may not be specified without the zlib-compression \
feature flag enabled"
.into(),
}
.into());
}

Ok(parts)
}
Expand Down Expand Up @@ -2078,16 +2118,20 @@ impl ConnectionString {
}
parts.auth_mechanism_properties = Some(doc);
}
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
"compressors" => {
let compressors = value
.split(',')
.filter_map(|x| Compressor::parse_str(x).ok())
.collect::<Vec<Compressor>>();
self.compressors = if compressors.is_empty() {
None
} else {
Some(compressors)
let mut compressors: Option<Vec<Compressor>> = None;
for compressor in value.split(',') {
let compressor = Compressor::from_str(compressor)?;
compressors
.get_or_insert_with(Default::default)
.push(compressor);
}
self.compressors = compressors;
}
k @ "connecttimeoutms" => {
self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));
Expand Down
17 changes: 13 additions & 4 deletions src/client/options/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ use bson::UuidRepresentation;
use pretty_assertions::assert_eq;
use serde::Deserialize;

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
use crate::options::Compressor;
use crate::{
bson::{Bson, Document},
client::options::{ClientOptions, ConnectionString, ServerAddress},
error::ErrorKind,
options::Compressor,
test::run_spec_test,
Client,
};
Expand Down Expand Up @@ -172,9 +177,13 @@ async fn run_test(test_file: TestFile) {
.filter(|(ref key, _)| json_options.contains_key(key))
.collect();

// This is required because compressor is not serialize, but the spec tests
// still expect to see serialized compressors.
// This hardcodes the compressors into the options.
// Compressor does not implement Serialize, so add the compressor names to the
// options manually.
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
if let Some(compressors) = options.compressors {
options_doc.insert(
"compressors",
Expand Down
53 changes: 40 additions & 13 deletions src/cmap/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ use tokio::{
sync::{mpsc, Mutex},
};

#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
use crate::options::Compressor;

use self::wire::{Message, MessageFlags};
use super::manager::PoolManager;
use crate::{
bson::oid::ObjectId,
cmap::PoolGeneration,
compression::Compressor,
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventEmitter,
Expand Down Expand Up @@ -97,12 +103,13 @@ pub(crate) struct Connection {

stream: BufStream<AsyncStream>,

/// Compressor that the client will use before sending messages.
/// This compressor does not get used to decompress server messages.
/// The client will decompress server messages using whichever compressor
/// the server indicates in its message. This compressor is the first
/// compressor in the client's compressor list that also appears in the
/// server's compressor list.
/// Compressor to use to compress outgoing messages. This compressor is not used to decompress
/// incoming messages from the server.
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
pub(super) compressor: Option<Compressor>,

/// If the connection is pinned to a cursor or transaction, the channel sender to return this
Expand Down Expand Up @@ -141,6 +148,11 @@ impl Connection {
stream_description: None,
error: None,
pinned_sender: None,
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
compressor: None,
more_to_come: false,
oidc_token_gen_id: std::sync::RwLock::new(0),
Expand Down Expand Up @@ -272,7 +284,8 @@ impl Connection {
pub(crate) async fn send_message(
&mut self,
message: Message,
to_compress: bool,
// This value is only read if a compression feature flag is enabled.
#[allow(unused_variables)] can_compress: bool,
) -> Result<RawCommandResponse> {
if self.more_to_come {
return Err(Error::internal(format!(
Expand All @@ -283,16 +296,25 @@ impl Connection {

self.command_executing = true;

// If the client has agreed on a compressor with the server, and the command
// is the right type of command, then compress the message.
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
let write_result = match self.compressor {
Some(ref compressor) if to_compress => {
Some(ref compressor) if can_compress => {
message
.write_compressed_to(&mut self.stream, compressor)
.write_op_compressed_to(&mut self.stream, compressor)
.await
}
_ => message.write_to(&mut self.stream).await,
_ => message.write_op_msg_to(&mut self.stream).await,
};
#[cfg(all(
not(feature = "zstd-compression"),
not(feature = "zlib-compression"),
not(feature = "snappy-compression")
))]
let write_result = message.write_op_msg_to(&mut self.stream).await;

if let Err(ref err) = write_result {
self.error = Some(err.clone());
Expand Down Expand Up @@ -430,6 +452,11 @@ impl Connection {
pool_manager: None,
ready_and_available_time: None,
pinned_sender: self.pinned_sender.clone(),
#[cfg(any(
feature = "zstd-compression",
feature = "zlib-compression",
feature = "snappy-compression"
))]
compressor: self.compressor.clone(),
more_to_come: false,
oidc_token_gen_id: std::sync::RwLock::new(0),
Expand Down
Loading