Skip to content

Commit

Permalink
make the file trait more portable
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 19, 2020
1 parent 45e6207 commit 55a15e3
Show file tree
Hide file tree
Showing 8 changed files with 522 additions and 202 deletions.
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,18 @@ serde_closure = "0.3"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Blob", "Performance", "Response", "Window"] }

[dev-dependencies]
either = { version = "1.5", features = ["serde"] }
rand = "0.7"
serde_json = "1.0"
streaming_algorithms = "0.3"
tokio = { version = "0.2", features = ["macros", "time"] }
wasm-bindgen-test = "0.3"

[build-dependencies]
rustversion = "1.0"
Expand Down
18 changes: 10 additions & 8 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use async_trait::async_trait;
use futures::future::BoxFuture;
use futures::{future, future::LocalBoxFuture, FutureExt};
use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -227,17 +227,17 @@ impl S3Page {
impl Page for S3Page {
type Error = IoError;

fn len(&self) -> u64 {
self.inner.len
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
future::ready(Ok(self.inner.len)).boxed_local()
}
fn set_len(&self, _len: u64) -> Result<(), Self::Error> {
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
let self_ = S3Page {
inner: self.inner.clone(),
};
Box::pin(async move {
let len = len.min(usize::try_from(self_.inner.len.saturating_sub(offset)).unwrap());
let mut buf_ = vec![0; len].into_boxed_slice();
let mut buf = &mut *buf_;
let len: u64 = len.try_into().unwrap();
Expand Down Expand Up @@ -279,7 +279,9 @@ impl Page for S3Page {
Ok(buf_)
})
}
fn write(&self, _offset: u64, _buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, _offset: u64, _buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
todo!("Tracking at https://github.com/constellation-rs/amadeus/issues/61")
}
}
7 changes: 7 additions & 0 deletions amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,15 @@ serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.3"
streaming_algorithms = "0.3"
sum = { version = "0.1", features = ["futures", "serde"] }
tokio = "0.2"
walkdir = "2.2"
widestring = "0.4"

[target.'cfg(target_arch = "wasm32")'.dependencies]
js-sys = "0.3"
wasm-bindgen = "0.2"
wasm-bindgen-futures = "0.4"
web-sys = { version = "0.3", features = ["Blob", "Response", "Window"] }

[build-dependencies]
rustversion = "1.0"
65 changes: 30 additions & 35 deletions amadeus-core/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
mod local;

use async_trait::async_trait;
use futures::{future::BoxFuture, ready};
use futures::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use std::{
convert::TryFrom, error::Error, ffi, fmt, future::Future, io, pin::Pin, sync::Arc, task::{Context, Poll}
Expand Down Expand Up @@ -208,14 +208,16 @@ pub trait Partition: Clone + fmt::Debug + ProcessSend + 'static {
async fn pages(self) -> Result<Vec<Self::Page>, Self::Error>;
}
#[allow(clippy::len_without_is_empty)]
#[async_trait]
pub trait Page: Send {
pub trait Page {
type Error: Error + Clone + PartialEq + Into<io::Error> + ProcessSend + 'static;

fn len(&self) -> u64;
fn set_len(&self, len: u64) -> Result<(), Self::Error>;
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>>;
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>>;
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>>;
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>>;
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>>;

fn reader(self) -> Reader<Self>
where
Expand All @@ -225,43 +227,43 @@ pub trait Page: Send {
}
}

#[async_trait]
impl<T: ?Sized> Page for &T
where
T: Page + Sync,
T: Page,
{
type Error = T::Error;

fn len(&self) -> u64 {
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
(**self).len()
}
fn set_len(&self, len: u64) -> Result<(), Self::Error> {
(**self).set_len(len)
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
(**self).read(offset, len)
}
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
(**self).write(offset, buf)
}
}
#[async_trait]
impl<T: ?Sized> Page for Arc<T>
where
T: Page + Sync,
T: Page,
{
type Error = T::Error;

fn len(&self) -> u64 {
fn len(&self) -> LocalBoxFuture<'static, Result<u64, Self::Error>> {
(**self).len()
}
fn set_len(&self, len: u64) -> Result<(), Self::Error> {
(**self).set_len(len)
}
fn read(&self, offset: u64, len: usize) -> BoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
fn read(
&self, offset: u64, len: usize,
) -> LocalBoxFuture<'static, Result<Box<[u8]>, Self::Error>> {
(**self).read(offset, len)
}
fn write(&self, offset: u64, buf: Box<[u8]>) -> BoxFuture<'static, Result<(), Self::Error>> {
fn write(
&self, offset: u64, buf: Box<[u8]>,
) -> LocalBoxFuture<'static, Result<(), Self::Error>> {
(**self).write(offset, buf)
}
}
Expand All @@ -274,8 +276,7 @@ where
#[pin]
page: P,
#[pin]
pending: Option<BoxFuture<'static, Result<Box<[u8]>, P::Error>>>,
pending_len: Option<usize>,
pending: Option<LocalBoxFuture<'static, Result<Box<[u8]>, P::Error>>>,
offset: u64,
}
#[allow(clippy::len_without_is_empty)]
Expand All @@ -287,13 +288,9 @@ where
Self {
page,
pending: None,
pending_len: None,
offset: 0,
}
}
pub fn len(&self) -> u64 {
self.page.len()
}
}
impl<P> futures::io::AsyncRead for Reader<P>
where
Expand All @@ -305,22 +302,20 @@ where
let mut self_ = self.project();
if self_.pending.is_none() {
let start = *self_.offset;
let len = usize::try_from((self_.page.len() - start).min(buf.len() as u64)).unwrap();
let len = buf.len();
let len = len.min(PAGE_SIZE);
let pending = self_.page.read(start, len);
*self_.pending = Some(pending);
*self_.pending_len = Some(len);
}
let ret = ready!(self_.pending.as_mut().as_pin_mut().unwrap().poll(cx));
*self_.pending = None;
let len = self_.pending_len.take().unwrap();
let ret = ret
.map(|buf_| {
buf[..len].copy_from_slice(&buf_);
len
buf[..buf_.len()].copy_from_slice(&buf_);
buf_.len()
})
.map_err(Into::into);
*self_.offset += u64::try_from(len).unwrap();
*self_.offset += u64::try_from(ret.as_ref().ok().cloned().unwrap_or(0)).unwrap();
Poll::Ready(ret)
}
}
Expand Down
Loading

0 comments on commit 55a15e3

Please sign in to comment.