Skip to content

Commit c4f4acf

Browse files
RUST-1400 GridFS download methods
1 parent 55cd3d9 commit c4f4acf

24 files changed

+3928
-126
lines changed

Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" }
7878
chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] }
7979
derivative = "2.1.1"
8080
flate2 = { version = "1.0", optional = true }
81+
futures-io = "0.3.21"
8182
futures-core = "0.3.14"
8283
futures-util = { version = "0.3.14", features = ["io"] }
8384
futures-executor = "0.3.14"
@@ -149,7 +150,7 @@ features = ["dangerous_configuration"]
149150

150151
[dependencies.tokio-util]
151152
version = "0.7.0"
152-
features = ["io"]
153+
features = ["io", "compat"]
153154

154155
[dependencies.uuid]
155156
version = "1.1.2"
@@ -162,6 +163,7 @@ ctrlc = "3.2.2"
162163
derive_more = "0.99.13"
163164
function_name = "0.2.1"
164165
futures = "0.3"
166+
hex = "0.4"
165167
home = "0.5"
166168
lambda_runtime = "0.6.0"
167169
pretty_assertions = "1.3.0"

src/db/mod.rs

+2-24
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,7 @@ use crate::{
1919
concern::{ReadConcern, WriteConcern},
2020
cursor::Cursor,
2121
error::{Error, ErrorKind, Result},
22-
gridfs::{
23-
options::GridFsBucketOptions,
24-
GridFsBucket,
25-
DEFAULT_BUCKET_NAME,
26-
DEFAULT_CHUNK_SIZE_BYTES,
27-
},
22+
gridfs::{options::GridFsBucketOptions, GridFsBucket},
2823
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
2924
options::{
3025
AggregateOptions,
@@ -573,23 +568,6 @@ impl Database {
573568

574569
/// Creates a new GridFsBucket in the database with the given options.
575570
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
576-
let mut options = options.into().unwrap_or_default();
577-
options.read_concern = options
578-
.read_concern
579-
.or_else(|| self.read_concern().cloned());
580-
options.write_concern = options
581-
.write_concern
582-
.or_else(|| self.write_concern().cloned());
583-
options.selection_criteria = options
584-
.selection_criteria
585-
.or_else(|| self.selection_criteria().cloned());
586-
options.bucket_name = options
587-
.bucket_name
588-
.or_else(|| Some(DEFAULT_BUCKET_NAME.to_string()));
589-
options.chunk_size_bytes = options.chunk_size_bytes.or(Some(DEFAULT_CHUNK_SIZE_BYTES));
590-
GridFsBucket {
591-
db: self.clone(),
592-
options,
593-
}
571+
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
594572
}
595573
}

src/error.rs

+5
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ pub enum ErrorKind {
457457
#[non_exhaustive]
458458
DnsResolve { message: String },
459459

460+
/// A GridFS error occurred.
461+
#[error("{message}")]
462+
#[non_exhaustive]
463+
GridFS { message: String },
464+
460465
#[error("Internal error: {message}")]
461466
#[non_exhaustive]
462467
Internal { message: String },

src/gridfs.rs

+98-88
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,71 @@
1-
#![allow(dead_code, unused_variables)]
21
// TODO(RUST-1395) Remove these allows.
2+
#![allow(dead_code, unused_variables)]
33

4+
mod download;
45
pub mod options;
56

67
use core::task::{Context, Poll};
7-
use std::pin::Pin;
8+
use std::{
9+
pin::Pin,
10+
sync::{atomic::AtomicBool, Arc},
11+
};
12+
13+
use serde::{Deserialize, Serialize};
14+
use tokio::io::ReadBuf;
815

916
use crate::{
17+
bson::{doc, oid::ObjectId, Binary, Bson, DateTime, Document},
1018
concern::{ReadConcern, WriteConcern},
1119
cursor::Cursor,
1220
error::Result,
13-
selection_criteria::SelectionCriteria,
21+
options::SelectionCriteria,
22+
Collection,
1423
Database,
1524
};
16-
use bson::{oid::ObjectId, Bson, DateTime, Document};
25+
1726
use options::*;
18-
use serde::{Deserialize, Serialize};
19-
use tokio::io::ReadBuf;
2027

2128
pub const DEFAULT_BUCKET_NAME: &str = "fs";
2229
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
2330

2431
// Contained in a "chunks" collection for each user file
32+
#[derive(Debug, Deserialize, Serialize)]
2533
struct Chunk {
34+
#[serde(rename = "_id")]
2635
id: ObjectId,
2736
files_id: Bson,
2837
n: u32,
2938
// default size is 255 KiB
30-
data: Vec<u8>,
39+
data: Binary,
3140
}
3241

3342
/// A collection in which information about stored files is stored. There will be one files
3443
/// collection document per stored file.
35-
#[derive(Serialize, Deserialize)]
44+
#[derive(Debug, Deserialize, Serialize)]
45+
#[serde(rename_all = "camelCase")]
3646
pub struct FilesCollectionDocument {
37-
id: Bson,
38-
length: i64,
39-
chunk_size: u32,
40-
upload_date: DateTime,
41-
filename: String,
42-
metadata: Document,
47+
#[serde(rename = "_id")]
48+
pub id: Bson,
49+
pub length: u64,
50+
pub chunk_size: u32,
51+
pub upload_date: DateTime,
52+
pub filename: Option<String>,
53+
#[serde(skip_serializing_if = "Option::is_none")]
54+
pub metadata: Option<Document>,
55+
}
56+
57+
#[derive(Debug)]
58+
struct GridFsBucketInner {
59+
options: GridFsBucketOptions,
60+
files: Collection<FilesCollectionDocument>,
61+
chunks: Collection<Chunk>,
62+
created_indexes: AtomicBool,
4363
}
4464

4565
/// Struct for storing GridFS managed files within a [`Database`].
66+
#[derive(Debug, Clone)]
4667
pub struct GridFsBucket {
47-
// Contains a "chunks" collection
48-
pub(crate) db: Database,
49-
pub(crate) options: GridFsBucketOptions,
68+
inner: Arc<GridFsBucketInner>,
5069
}
5170

5271
// TODO: RUST-1395 Add documentation and example code for this struct.
@@ -134,30 +153,67 @@ impl tokio::io::AsyncRead for GridFsDownloadStream {
134153
}
135154
}
136155

137-
impl futures_util::io::AsyncRead for GridFsDownloadStream {
138-
fn poll_read(
139-
self: Pin<&mut Self>,
140-
cx: &mut Context<'_>,
141-
buf: &mut [u8],
142-
) -> Poll<core::result::Result<usize, futures_util::io::Error>> {
143-
todo!()
156+
impl GridFsBucket {
157+
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
158+
if options.read_concern.is_none() {
159+
options.read_concern = db.read_concern().cloned();
160+
}
161+
if options.write_concern.is_none() {
162+
options.write_concern = db.write_concern().cloned();
163+
}
164+
if options.selection_criteria.is_none() {
165+
options.selection_criteria = db.selection_criteria().cloned();
166+
}
167+
168+
let bucket_name = options
169+
.bucket_name
170+
.as_deref()
171+
.unwrap_or(DEFAULT_BUCKET_NAME);
172+
173+
let files = db.collection::<FilesCollectionDocument>(&format!("{}.files", bucket_name));
174+
let chunks = db.collection::<Chunk>(&format!("{}.chunks", bucket_name));
175+
176+
GridFsBucket {
177+
inner: Arc::new(GridFsBucketInner {
178+
options,
179+
files,
180+
chunks,
181+
created_indexes: AtomicBool::new(false),
182+
}),
183+
}
144184
}
145-
}
146185

147-
impl GridFsBucket {
148186
/// Gets the read concern of the [`GridFsBucket`].
149187
pub fn read_concern(&self) -> Option<&ReadConcern> {
150-
self.options.read_concern.as_ref()
188+
self.inner.options.read_concern.as_ref()
151189
}
152190

153191
/// Gets the write concern of the [`GridFsBucket`].
154192
pub fn write_concern(&self) -> Option<&WriteConcern> {
155-
self.options.write_concern.as_ref()
193+
self.inner.options.write_concern.as_ref()
156194
}
157195

158196
/// Gets the selection criteria of the [`GridFsBucket`].
159197
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
160-
self.options.selection_criteria.as_ref()
198+
self.inner.options.selection_criteria.as_ref()
199+
}
200+
201+
/// Gets the chunk size in bytes for the [`GridFsBucket`].
202+
fn chunk_size_bytes(&self) -> u32 {
203+
self.inner
204+
.options
205+
.chunk_size_bytes
206+
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES)
207+
}
208+
209+
/// Gets a handle to the files collection for the [`GridFsBucket`].
210+
fn files(&self) -> &Collection<FilesCollectionDocument> {
211+
&self.inner.files
212+
}
213+
214+
/// Gets a handle to the chunks collection for the [`GridFsBucket`].
215+
fn chunks(&self) -> &Collection<Chunk> {
216+
&self.inner.chunks
161217
}
162218

163219
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
@@ -173,19 +229,6 @@ impl GridFsBucket {
173229
todo!()
174230
}
175231

176-
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
177-
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
178-
///
179-
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
180-
pub async fn open_upload_stream(
181-
&self,
182-
filename: String,
183-
options: impl Into<Option<GridFsUploadOptions>>,
184-
) -> Result<GridFsUploadStream> {
185-
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
186-
.await
187-
}
188-
189232
/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
190233
/// `tokio` crate's `AsyncRead` trait for the `source`.
191234
pub async fn upload_from_tokio_reader_with_id(
@@ -244,6 +287,19 @@ impl GridFsBucket {
244287
.await
245288
}
246289

290+
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
291+
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
292+
///
293+
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
294+
pub async fn open_upload_stream(
295+
&self,
296+
filename: String,
297+
options: impl Into<Option<GridFsUploadOptions>>,
298+
) -> Result<GridFsUploadStream> {
299+
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
300+
.await
301+
}
302+
247303
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
248304
/// the contents of the stored file specified by `id`.
249305
pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> {
@@ -261,52 +317,6 @@ impl GridFsBucket {
261317
todo!()
262318
}
263319

264-
/// Downloads the contents of the stored file specified by `id` and writes
265-
/// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite`
266-
/// trait for the `destination`.
267-
pub async fn download_to_tokio_writer(
268-
&self,
269-
id: Bson,
270-
destination: impl tokio::io::AsyncWrite,
271-
) {
272-
todo!()
273-
}
274-
275-
/// Downloads the contents of the stored file specified by `id` and writes
276-
/// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite`
277-
/// trait for the `destination`.
278-
pub async fn download_to_futures_0_3_writer(
279-
&self,
280-
id: Bson,
281-
destination: impl futures_util::AsyncWrite,
282-
) {
283-
todo!()
284-
}
285-
286-
/// Downloads the contents of the stored file specified by `filename` and by
287-
/// the revision in `options` and writes the contents to the `destination`. Uses the
288-
/// `tokio` crate's `AsyncWrite` trait for the `destination`.
289-
pub async fn download_to_tokio_writer_by_name(
290-
&self,
291-
filename: String,
292-
destination: impl tokio::io::AsyncWrite,
293-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
294-
) {
295-
todo!()
296-
}
297-
298-
/// Downloads the contents of the stored file specified by `filename` and by
299-
/// the revision in `options` and writes the contents to the `destination`. Uses the
300-
/// `futures-0.3` crate's `AsyncWrite` trait for the `destination`.
301-
pub async fn download_to_futures_0_3_writer_by_name(
302-
&self,
303-
filename: String,
304-
destination: impl futures_util::AsyncWrite,
305-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
306-
) {
307-
todo!()
308-
}
309-
310320
/// Given an `id`, deletes the stored file's files collection document and
311321
/// associated chunks from a [`GridFsBucket`].
312322
pub async fn delete(&self, id: Bson) {

0 commit comments

Comments
 (0)