Skip to content

Commit

Permalink
all but parquet and constellation working on stable
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 14, 2020
1 parent 001b75d commit 7785643
Show file tree
Hide file tree
Showing 25 changed files with 867 additions and 1,049 deletions.
8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
doc = ["amadeus-core/doc"]
nightly = ["amadeus-core/nightly"]

[package.metadata.docs.rs]
features = ["doc", "constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]
features = ["nightly", "constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.1", path = "amadeus-core" }
Expand All @@ -53,7 +52,7 @@ futures = "0.3"
num_cpus = "1.13"
pin-project = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_closure = { version = "0.2", default-features = false }
serde_closure = "0.3"
serde_traitobject = { version = "0.2", optional = true }
tokio = { version = "0.2", features = ["rt-threaded", "rt-util", "blocking"] }

Expand All @@ -67,6 +66,9 @@ tokio = { version = "0.2", features = ["macros", "time"] }
[patch.crates-io]
vec-utils = { version = "*", git = "https://github.com/alecmocatta/vec-utils", branch = "stable" }
streaming_algorithms = { version = "*", git = "https://github.com/alecmocatta/streaming_algorithms", branch = "stable" }
serde_closure = { version = "*", git = "https://github.com/alecmocatta/serde_closure", branch = "nameable" }
# serde_closure = { version = "*", path = "../serde_closure" }
# serde_traitobject = { version = "*", path = "../serde_traitobject" }

[[example]]
name = "cloudfront_logs"
Expand Down
5 changes: 4 additions & 1 deletion amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ edition = "2018"
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
nightly = []

[dependencies]
amadeus-core = { version = "=0.3.1", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.1", path = "../amadeus-types" }
Expand All @@ -31,7 +34,7 @@ once_cell = "1.0"
rusoto_core = "0.44"
rusoto_credential = "0.44"
rusoto_s3 = "0.44"
serde_closure = { version = "0.2", default-features = false }
serde_closure = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
url = { version = "2.1", features = ["serde"] }
Expand Down
130 changes: 69 additions & 61 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

use async_compression::futures::bufread::GzipDecoder;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use futures::{future, io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use futures::{future, io::BufReader, AsyncBufReadExt, FutureExt, Stream, StreamExt, TryStreamExt};
use http::{Method, StatusCode};
use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::FnMut;
use serde_closure::FnMutNamed;
use std::{
convert::identity, io::{self}, time::Duration
};
Expand Down Expand Up @@ -52,20 +52,76 @@ impl Cloudfront {
})
}
}

// Stream of parsed rows (or errors) returned by `Closure` for a single object key.
// Without the `nightly` feature the stream is type-erased behind a pinned box so that the
// alias has a nameable concrete type; with `nightly` it is an opaque `impl Trait` alias.
#[cfg(not(feature = "nightly"))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<CloudfrontRow, AwsError>> + Send>>;
#[cfg(feature = "nightly")]
type Output = impl Stream<Item = Result<CloudfrontRow, AwsError>> + Send;

// Named closure type (built with `serde_closure::FnMutNamed`) that is passed to `flat_map`.
// Environment: the AWS credentials, region and bucket. Argument: one object key.
// Result: a stream with one `Result<CloudfrontRow, AwsError>` per non-comment log line.
FnMutNamed! {
    pub type Closure<> = |self, credentials: AwsCredentials, region: AwsRegion, bucket: String|key=> String| -> Output where {
        // Clone the captured environment so the `async move` block owns everything it uses.
        let credentials = self.credentials.clone();
        let region = self.region.clone();
        let bucket = self.bucket.clone();
        #[allow(clippy::let_and_return)]
        let stream = async move {
            let client = S3Client::new_with(
                Ref(once_cell::sync::Lazy::force(&RUSOTO_DISPATCHER)),
                credentials,
                region,
            );
            // Fetch the object through the `retry` helper, converting a failure into `AwsError`.
            let response = retry(|| {
                client.get_object(GetObjectRequest {
                    bucket: bucket.clone(),
                    key: key.clone(),
                    ..GetObjectRequest::default()
                })
            })
            .await
            .map_err(AwsError::from);
            let rows = response.map(|res| {
                let raw = BufReader::new(TryStreamExt::into_async_read(res.body.unwrap()));
                let mut gunzip = GzipDecoder::new(raw); // Content-Encoding isn't set, so decode manually
                gunzip.multiple_members(true);
                BufReader::new(gunzip)
                    .lines()
                    // Drop lines whose first non-whitespace character is '#'; keep read errors
                    // so that they are reported by the next stage.
                    .filter(|line: &Result<String, io::Error>| {
                        let keep = match line {
                            Ok(text) => text.chars().find(|c| !c.is_whitespace()) != Some('#'),
                            Err(_) => true,
                        };
                        future::ready(keep)
                    })
                    // Parse each surviving line; turn an I/O error into an `AwsError`.
                    .then(|line: Result<String, io::Error>| async {
                        match line {
                            Ok(text) => Ok(CloudfrontRow::from_line(&text)),
                            Err(err) => Err(AwsError::from(err)),
                        }
                    })
            });
            ResultExpandIter::new(rows)
        }
        .flatten_stream()
        // Collapse the request-level and line-level results into a single `Result`.
        .map(|nested: Result<Result<CloudfrontRow, _>, _>| nested.and_then(identity));
        // Without `nightly`, `Output` is a boxed trait object, so box the stream to match.
        #[cfg(not(feature = "nightly"))]
        let stream = stream.boxed();
        stream
    }
}

impl Source for Cloudfront {
type Item = CloudfrontRow;
type Error = AwsError;

#[cfg(not(doc))]
type ParStream =
impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type ParStream =
DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
#[cfg(not(doc))]
type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(feature = "nightly"))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(feature = "nightly")]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
DistParStream::new(self.dist_stream())
Expand All @@ -78,57 +134,9 @@ impl Source for Cloudfront {
objects,
credentials,
} = self;
let ret = objects
objects
.into_dist_stream()
.flat_map(FnMut!(move |key: String| {
let (credentials, region, bucket) =
(credentials.clone(), region.clone(), bucket.clone());
async move {
let client = S3Client::new_with(
Ref(once_cell::sync::Lazy::force(&RUSOTO_DISPATCHER)),
credentials,
region,
);
let rows = retry(|| {
client.get_object(GetObjectRequest {
bucket: bucket.clone(),
key: key.clone(),
..GetObjectRequest::default()
})
})
.await
.map_err(AwsError::from)
.map(|res| {
let body = BufReader::new(TryStreamExt::into_async_read(res.body.unwrap()));
let mut body = GzipDecoder::new(body); // Content-Encoding isn't set, so decode manually
body.multiple_members(true);
BufReader::new(body)
.lines()
.filter(|x: &Result<String, io::Error>| {
future::ready(if let Ok(x) = x {
x.chars().find(|x| !x.is_whitespace()) != Some('#')
} else {
true
})
})
.then(|x: Result<String, io::Error>| async {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
})
});
ResultExpandIter::new(rows)
}
.flatten_stream()
}))
.map(FnMut!(
|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(self::identity)
));
#[cfg(doc)]
let ret = amadeus_core::util::ImplDistributedStream::new(ret);
ret
.flat_map(Closure::new(credentials, region, bucket))
}
}

Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.1")]
#![feature(type_alias_impl_trait)]
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand Down
5 changes: 4 additions & 1 deletion amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ edition = "2018"
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "26" }
maintenance = { status = "actively-developed" }

[features]
nightly = []

[dependencies]
amadeus-core = { version = "=0.3.1", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.1", path = "../amadeus-types" }
Expand All @@ -28,7 +31,7 @@ pin-project = "0.4"
reqwest = "0.10"
reqwest_resume = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_closure = { version = "0.2", default-features = false }
serde_closure = "0.3"
url = { version = "2.1", features = ["serde"] }

# dependency of reqwest/native-tls; ensure it's vendored to simplify cross-compilation
Expand Down
66 changes: 37 additions & 29 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.1")]
#![feature(type_alias_impl_trait)]
#![cfg_attr(feature = "nightly", feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
// missing_debug_implementations,
Expand All @@ -30,9 +30,9 @@ mod commoncrawl;
mod parser;

use async_compression::futures::bufread::GzipDecoder; // TODO: use stream or https://github.com/alexcrichton/flate2-rs/pull/214
use futures::{io::BufReader, AsyncBufReadExt, FutureExt, StreamExt, TryStreamExt};
use futures::{io::BufReader, AsyncBufReadExt, FutureExt, Stream, StreamExt, TryStreamExt};
use reqwest_resume::ClientExt;
use serde_closure::FnMut;
use serde_closure::FnMutNamed;
use std::{io, time};

use amadeus_core::{
Expand Down Expand Up @@ -80,30 +80,15 @@ impl CommonCrawl {
}
}

impl Source for CommonCrawl {
type Item = Webpage<'static>;
type Error = io::Error;
// Stream of parsed pages (or I/O errors) returned by `Closure` for a single URL.
// Without the `nightly` feature the stream is type-erased behind a pinned box so that the
// alias has a nameable concrete type; with `nightly` it is an opaque `impl Trait` alias.
#[cfg(not(feature = "nightly"))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<Webpage<'static>, io::Error>> + Send>>;
#[cfg(feature = "nightly")]
type Output = impl Stream<Item = Result<Webpage<'static>, io::Error>> + Send;

#[cfg(not(doc))]
type ParStream =
impl amadeus_core::par_stream::ParallelStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type ParStream =
DistParStream<amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>>;
#[cfg(not(doc))]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;
#[cfg(doc)]
type DistStream = amadeus_core::util::ImplDistributedStream<Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
DistParStream::new(self.dist_stream())
}
#[allow(clippy::let_and_return)]
fn dist_stream(self) -> Self::DistStream {
let ret = self
.urls
.into_dist_stream()
.flat_map(FnMut!(|url: String| async move {
FnMutNamed! {
pub type Closure<> = |self|url=> String| -> Output where {
#[allow(clippy::let_and_return)]
let ret = async move {
let body = reqwest_resume::get(url.parse().unwrap()).await.unwrap();
let body = body
.bytes_stream()
Expand All @@ -113,9 +98,32 @@ impl Source for CommonCrawl {
body.multiple_members(true);
WarcParser::new(body)
}
.flatten_stream()));
#[cfg(doc)]
let ret = amadeus_core::util::ImplDistributedStream::new(ret);
.flatten_stream();
#[cfg(not(feature = "nightly"))]
let ret = ret.boxed();
ret
}
}

// `Source` implementation for `CommonCrawl`: every URL in `self.urls` is flat-mapped through
// `Closure`, yielding `Result<Webpage<'static>, io::Error>` items.
impl Source for CommonCrawl {
    type Item = Webpage<'static>;
    type Error = io::Error;

    // The parallel stream is the distributed stream wrapped in `DistParStream`.
    type ParStream = DistParStream<Self::DistStream>;
    // Without `nightly` the stream type is written out in full; this works because `Closure`
    // is a nameable type rather than an anonymous closure.
    #[cfg(not(feature = "nightly"))]
    #[allow(clippy::type_complexity)]
    type DistStream = amadeus_core::par_stream::FlatMap<
        amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
        Closure,
    >;
    // With `nightly` the concrete type is hidden behind an opaque `impl Trait`.
    #[cfg(feature = "nightly")]
    type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

    // Wraps `dist_stream()` in a `DistParStream`.
    fn par_stream(self) -> Self::ParStream {
        DistParStream::new(self.dist_stream())
    }
    // Builds the distributed stream: one `Closure` invocation per URL, flattened.
    // The body is a single tail expression, so no `clippy::let_and_return` suppression is needed.
    fn dist_stream(self) -> Self::DistStream {
        self.urls.into_dist_stream().flat_map(Closure::new())
    }
}
4 changes: 2 additions & 2 deletions amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[features]
doc = ["serde_closure/nightly"] # this cleans up Fn*() in docs
# uses Fn*() sugar (better docs), and SIMD for streaming_algorithms
nightly = ["streaming_algorithms/nightly"]

[dependencies]
Expand All @@ -34,7 +34,7 @@ pin-project = "0.4"
rand = "0.7"
replace_with = "0.1"
serde = { version = "1.0", features = ["derive"] }
serde_closure = { version = "0.2", default-features = false }
serde_closure = "0.3"
streaming_algorithms = "0.2"
sum = { version = "0.1", features = ["futures", "serde"] }
walkdir = "2.2"
Expand Down
6 changes: 3 additions & 3 deletions amadeus-core/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub trait Partition: Clone + fmt::Debug + ProcessSend + 'static {
}
#[allow(clippy::len_without_is_empty)]
#[async_trait]
pub trait Page {
pub trait Page: Send {
type Error: Error + Clone + PartialEq + Into<io::Error> + ProcessSend + 'static;

fn len(&self) -> u64;
Expand All @@ -228,7 +228,7 @@ pub trait Page {
#[async_trait]
impl<T: ?Sized> Page for &T
where
T: Page,
T: Page + Sync,
{
type Error = T::Error;

Expand All @@ -248,7 +248,7 @@ where
#[async_trait]
impl<T: ?Sized> Page for Arc<T>
where
T: Page,
T: Page + Sync,
{
type Error = T::Error;

Expand Down
1 change: 1 addition & 0 deletions amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).
#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.1")]
#![cfg_attr(feature = "nightly", feature(unboxed_closures))]
#![recursion_limit = "25600"]
#![warn(
// missing_copy_implementations,
Expand Down
Loading

0 comments on commit 7785643

Please sign in to comment.