Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve compile times; rename Timestamp -> DateTime #7

Merged
merged 18 commits into from
Oct 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.1.2"
version = "0.1.3"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -14,7 +14,7 @@ parquet postgres aws s3 cloudfront elb json csv logs hadoop hdfs arrow common cr
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.1.2"
documentation = "https://docs.rs/amadeus/0.1.3"
readme = "README.md"
edition = "2018"

Expand All @@ -30,23 +30,25 @@ parquet = ["amadeus-parquet", "amadeus-derive/parquet"]
postgres = ["amadeus-postgres", "amadeus-derive/postgres"]
csv = ["amadeus-serde", "amadeus-derive/serde"]
json = ["amadeus-serde", "amadeus-derive/serde"]
doc = ["amadeus-aws/doc", "amadeus-commoncrawl/doc", "amadeus-parquet/doc", "amadeus-postgres/doc", "amadeus-serde/doc"]

[package.metadata.docs.rs]
features = ["constellation", "aws", "commoncrawl", "postgres", "csv", "json"]
features = ["doc", "constellation", "aws", "commoncrawl", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.1.2", path = "amadeus-core" }
amadeus-derive = { version = "=0.1.2", path = "amadeus-derive" }
amadeus-types = { version = "=0.1.2", path = "amadeus-types" }
amadeus-aws = { version = "=0.1.2", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.1.2", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.1.2", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.1.2", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.1.2", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.1.3", path = "amadeus-core" }
amadeus-derive = { version = "=0.1.3", path = "amadeus-derive" }
amadeus-types = { version = "=0.1.3", path = "amadeus-types" }
amadeus-aws = { version = "=0.1.3", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.1.3", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.1.3", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.1.3", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.1.3", path = "amadeus-serde", optional = true }
constellation-rs = { version = "0.1", default-features = false, optional = true }
futures-preview = "=0.3.0-alpha.18"
futures-preview = "=0.3.0-alpha.19"
pin-utils = "0.1.0-alpha.4"
serde = { version = "1.0", features = ["derive"] }
serde_closure = "0.1"
serde_closure = "0.2"
serde_traitobject = "0.1.6"

# pin; broken for some reason
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
</p>

<p align="center">
Harmonious distributed data analysis in Rust
Harmonious distributed data processing & analysis in Rust
</p>

<p align="center">
Expand All @@ -13,7 +13,7 @@
</p>

<p align="center">
<a href="https://docs.rs/amadeus/0.1.2">Docs</a>
<a href="https://docs.rs/amadeus/0.1.3">Docs</a>
</p>


Expand Down
19 changes: 13 additions & 6 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.1.2"
version = "0.1.3"
license = "Apache-2.0"
authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -10,26 +10,33 @@ Harmonious distributed data analysis in Rust.
"""
repository = "https://github.com/alecmocatta/amadeus"
homepage = "https://github.com/alecmocatta/amadeus"
documentation = "https://docs.rs/amadeus/0.1.2"
documentation = "https://docs.rs/amadeus/0.1.3"
readme = "README.md"
edition = "2018"

[badges]
azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests" }
maintenance = { status = "actively-developed" }

[features]
doc = []

[dependencies]
amadeus-core = { version = "=0.1.2", path = "../amadeus-core" }
chrono = "0.4"
amadeus-core = { version = "=0.1.3", path = "../amadeus-core" }
amadeus-types = { version = "=0.1.3", path = "../amadeus-types" }
chrono = { version = "0.4", default-features = false }
flate2 = "1.0"
futures-01 = { package = "futures", version = "0.1" }
futures-preview = { version = "=0.3.0-alpha.18", features = ["compat"] }
futures-preview = { version = "=0.3.0-alpha.19", features = ["compat"] }
http = "0.1"
once_cell = "1.0"
rusoto_core = "0.40"
rusoto_s3 = "0.40"
serde_closure = "0.1"
serde_closure = "0.2"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.1.7"
tokio-retry = "0.2"
url = { version = "2.1", features = ["serde"] }

# dependency of rusoto_core/hyper-tls/native-tls; ensure it's vendored to simplify cross-compilation
openssl = { version = "0.10", features = ["vendored"] }
132 changes: 60 additions & 72 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,21 @@
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use flate2::read::MultiGzDecoder;
use http::{Method, StatusCode};

use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::*;
use std::{
convert::identity, io::{self, BufRead, BufReader}, iter, net, time::Duration, vec
convert::identity, io::{self, BufRead, BufReader}, iter, time::Duration
};
use url::Url;

use amadeus_core::{
dist_iter::DistributedIterator, into_dist_iter::IntoDistributedIterator, util::ResultExpand, Source
};
use amadeus_types::{DateTime, IpAddr, Url};

use super::{block_on_01, list, retry, AwsError, AwsRegion};

/// Shorthand for a `serde_closure::FnMut` whose first type parameter is the
/// environment type `Env` and whose second is a higher-ranked function pointer
/// taking `&mut Env` plus the argument type `Args` and returning `Output`.
type Closure<Env, Args, Output> =
serde_closure::FnMut<Env, for<'r> fn(&'r mut Env, Args) -> Output>;

/// Fully spelled-out distributed-iterator type; the `type DistIter = CloudfrontInner;`
/// line in the `Source for Cloudfront` impl refers to it.
///
/// Reading outside-in: a `Map` over a `FlatMap` over an `IterIter` of `String`s.
/// The `FlatMap` closure has a `(String, AwsRegion)` environment, takes a single
/// `String`, and produces a `ResultExpand` around lines read through
/// `BufReader<MultiGzDecoder<..>>` that are filtered and then mapped to
/// `Result<CloudfrontRow, AwsError>`. The outer `Map` closure takes the nested
/// `Result<Result<CloudfrontRow, AwsError>, AwsError>` and returns a single
/// `Result<CloudfrontRow, AwsError>`.
type CloudfrontInner = amadeus_core::dist_iter::Map<
amadeus_core::dist_iter::FlatMap<
amadeus_core::into_dist_iter::IterIter<vec::IntoIter<String>>,
// Closure passed to `flat_map`: env `(String, AwsRegion)`, argument `(String,)`.
Closure<
(String, AwsRegion),
(String,),
ResultExpand<
iter::Map<
iter::Filter<
io::Lines<BufReader<MultiGzDecoder<Box<dyn io::Read + Send>>>>,
// Filter predicate: written out in full (not via `Closure`) because its
// argument is a borrowed `&'a Result<..>` and needs the extra lifetime.
serde_closure::FnMut<
(),
for<'r, 'a> fn(&'r mut (), (&'a Result<String, io::Error>,)) -> bool,
>,
>,
// Per-line mapping closure: `Result<String, io::Error>` in, row-or-error out.
Closure<(), (Result<String, io::Error>,), Result<CloudfrontRow, AwsError>>,
>,
AwsError,
>,
>,
>,
// Closure passed to the outer `map`: nested `Result` in, single `Result` out.
Closure<
(),
(Result<Result<CloudfrontRow, AwsError>, AwsError>,),
Result<CloudfrontRow, AwsError>,
>,
>;

pub struct Cloudfront {
region: AwsRegion,
bucket: String,
Expand All @@ -73,53 +42,71 @@ impl Source for Cloudfront {
type Item = CloudfrontRow;
type Error = AwsError;

// type DistIter = impl DistributedIterator<Item = Result<CloudfrontRow, AwsError>>; //, <Self as super::super::DistributedIterator>::Task: Serialize + for<'de> Deserialize<'de>
type DistIter = CloudfrontInner;
type Iter = iter::Empty<Result<CloudfrontRow, AwsError>>;
#[cfg(not(feature = "doc"))]
type DistIter = impl DistributedIterator<Item = Result<Self::Item, Self::Error>>;
#[cfg(feature = "doc")]
type DistIter = amadeus_core::util::ImplDistributedIterator<Result<Self::Item, Self::Error>>;
type Iter = iter::Empty<Result<Self::Item, Self::Error>>;

#[allow(clippy::let_and_return)]
fn dist_iter(self) -> Self::DistIter {
let Self {
bucket,
region,
objects,
} = self;
objects
let ret = objects
.into_dist_iter()
.flat_map(FnMut!([bucket, region] move |key:String| {
.flat_map(FnMut!(move |key: String| {
let client = S3Client::new(region.clone());
ResultExpand(
block_on_01(retry(||client
.get_object(GetObjectRequest {
bucket: bucket.clone(),
key: key.clone(),
..GetObjectRequest::default()
})
loop {
match self::block_on_01(self::retry(|| {
client.get_object(GetObjectRequest {
bucket: bucket.clone(),
key: key.clone(),
..GetObjectRequest::default()
})
})) {
Err(RusotoError::HttpDispatch(_)) => continue,
Err(RusotoError::Unknown(response))
if response.status.is_server_error() =>
{
continue
}
res => break res,
}
}
.map_err(AwsError::from)
.map(|res| {
let body = res.body.unwrap().into_blocking_read();
BufReader::new(MultiGzDecoder::new(
Box::new(body) as Box<dyn io::Read + Send>
))
.map_err(AwsError::from)
.map(|res| {
let body = res.body.unwrap().into_blocking_read();
BufReader::new(MultiGzDecoder::new(Box::new(body) as Box<dyn io::Read + Send>))
.lines()
.filter(FnMut!(|x:&Result<String,io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
}))
.map(FnMut!(|x:Result<String,io::Error>| {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
}))
}),
.lines()
.filter(|x: &Result<String, io::Error>| {
if let Ok(x) = x {
x.chars().filter(|x| !x.is_whitespace()).nth(0) != Some('#')
} else {
true
}
})
.map(|x: Result<String, io::Error>| {
if let Ok(x) = x {
Ok(CloudfrontRow::from_line(&x))
} else {
Err(AwsError::from(x.err().unwrap()))
}
})
}),
)
}))
.map(FnMut!(
|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(identity)
))
|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(self::identity)
));
#[cfg(feature = "doc")]
let ret = amadeus_core::util::ImplDistributedIterator::new(ret);
ret
}

fn iter(self) -> Self::Iter {
Expand All @@ -129,10 +116,10 @@ impl Source for Cloudfront {

#[derive(Clone, Eq, PartialEq, Serialize, Deserialize, Debug)]
pub struct CloudfrontRow {
pub time: DateTime<Utc>,
pub time: DateTime,
pub edge_location: String,
pub response_bytes: u64,
pub remote_ip: net::IpAddr,
pub remote_ip: IpAddr,
#[serde(with = "http_serde")]
pub method: Method,
pub host: String,
Expand All @@ -154,6 +141,7 @@ pub struct CloudfrontRow {
pub fle_encrypted_fields: Option<String>,
}
impl CloudfrontRow {
#[inline(always)]
fn from_line(line: &str) -> Self {
let mut values = line.split('\t');
let date = values.next().unwrap();
Expand Down Expand Up @@ -183,10 +171,10 @@ impl CloudfrontRow {
let fle_status = values.next().unwrap();
let fle_encrypted_fields = values.next().unwrap();
assert_eq!(values.next(), None);
let time = Utc.from_utc_datetime(&NaiveDateTime::new(
let time = DateTime::from_chrono(&Utc.from_utc_datetime(&NaiveDateTime::new(
NaiveDate::parse_from_str(&date, "%Y-%m-%d").unwrap(),
NaiveTime::parse_from_str(&time, "%H:%M:%S").unwrap(),
));
)));
let status = if sc_status != "000" {
Some(StatusCode::from_bytes(sc_status.as_bytes()).unwrap())
} else {
Expand Down
12 changes: 8 additions & 4 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,14 @@ impl Page for S3Page {
key: self.key.clone(),
range: Some(format!("bytes={}-{}", start, end)),
..GetObjectRequest::default()
}))
.await;
if let Err(RusotoError::HttpDispatch(_)) = res {
continue;
}));
let res = res.await;
match res {
Err(RusotoError::HttpDispatch(_)) => continue,
Err(RusotoError::Unknown(response)) if response.status.is_server_error() => {
continue
}
_ => (),
}
let mut read = res.unwrap().body.unwrap().into_async_read();
while len - cursor.position() > 0 {
Expand Down
8 changes: 4 additions & 4 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.2")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.1.3")]
#![feature(type_alias_impl_trait)]

mod cloudfront;
mod file;

use futures::future::FutureExt;

use once_cell::sync::Lazy;
use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectError, ListObjectsV2Error, ListObjectsV2Request, Object, S3Client, S3};

use std::{
cell::RefCell, error, fmt::{self, Display}, future::Future, io, iter, mem::transmute, ops::FnMut
};
use tokio::runtime::Runtime;

use amadeus_core::util::{IoError, ResultExpand};

#[doc(inline)]
pub use cloudfront::{Cloudfront, CloudfrontRow};
#[doc(inline)]
pub use file::{S3Directory, S3File};

#[doc(inline)]
pub use rusoto_core::Region as AwsRegion;

Expand Down
Loading