Skip to content

Commit

Permalink
adding stats functions
Browse files Browse the repository at this point in the history
  • Loading branch information
kevloui committed Aug 2, 2020
1 parent c2f0e8f commit fb9eae7
Show file tree
Hide file tree
Showing 104 changed files with 784 additions and 293 deletions.
9 changes: 7 additions & 2 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@ use rusoto_s3::{GetObjectRequest, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use serde_closure::FnMutNamed;
use std::{
convert::identity, io::{self}, time::Duration
convert::identity,
io::{self},
time::Duration,
};

use amadeus_core::{
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::{DistParStream, ResultExpandIter}, Source
into_par_stream::IntoDistributedStream,
par_stream::DistributedStream,
util::{DistParStream, ResultExpandIter},
Source,
};
use amadeus_types::{Data, DateTime, IpAddr, Url};

Expand Down
6 changes: 4 additions & 2 deletions amadeus-aws/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use rusoto_core::RusotoError;
use rusoto_s3::{GetObjectRequest, HeadObjectRequest, S3Client, S3};
use serde::{Deserialize, Serialize};
use std::{
convert::{TryFrom, TryInto}, sync::Arc
convert::{TryFrom, TryInto},
sync::Arc,
};
use tokio::io::AsyncReadExt;

use amadeus_core::{
file::{Directory, File, Page, Partition, PathBuf}, util::IoError
file::{Directory, File, Page, Partition, PathBuf},
util::IoError,
};

use super::{retry, AwsCredentials, AwsError, AwsRegion, Ref, RUSOTO_DISPATCHER};
Expand Down
12 changes: 10 additions & 2 deletions amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,21 @@ use async_trait::async_trait;
use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
use once_cell::sync::Lazy;
use rusoto_core::{
credential::StaticProvider, request::{DispatchSignedRequest, DispatchSignedRequestFuture, HttpClient}, signature::SignedRequest, RusotoError
credential::StaticProvider,
request::{DispatchSignedRequest, DispatchSignedRequestFuture, HttpClient},
signature::SignedRequest,
RusotoError,
};
use rusoto_credential::{CredentialsError, DefaultCredentialsProvider, ProvideAwsCredentials};
use rusoto_s3::{GetObjectError, ListObjectsV2Error, ListObjectsV2Request, Object, S3Client, S3};
use serde::{Deserialize, Serialize};
use std::{
error, fmt::{self, Display}, future::Future, io, ops::FnMut, time::Duration
error,
fmt::{self, Display},
future::Future,
io,
ops::FnMut,
time::Duration,
};

use amadeus_core::util::{IoError, ResultExpand};
Expand Down
7 changes: 6 additions & 1 deletion amadeus-commoncrawl/src/commoncrawl.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use futures::{pin_mut, ready, AsyncRead, AsyncReadExt, Stream};
use pin_project::pin_project;
use std::{
borrow::Cow, future::Future, io::{self, Read}, iter, pin::Pin, task::{Context, Poll}
borrow::Cow,
future::Future,
io::{self, Read},
iter,
pin::Pin,
task::{Context, Poll},
};
use url::Url;

Expand Down
3 changes: 2 additions & 1 deletion amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ use serde_closure::FnMutNamed;
use std::{io, time};

use amadeus_core::{
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::DistParStream, Source
into_par_stream::IntoDistributedStream, par_stream::DistributedStream, util::DistParStream,
Source,
};
use amadeus_types::Webpage;

Expand Down
9 changes: 8 additions & 1 deletion amadeus-core/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@ use async_trait::async_trait;
use futures::{future::LocalBoxFuture, ready};
use pin_project::pin_project;
use std::{
convert::TryFrom, error::Error, ffi, fmt, future::Future, io, pin::Pin, sync::Arc, task::{Context, Poll}
convert::TryFrom,
error::Error,
ffi, fmt,
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use widestring::U16String;

Expand Down
17 changes: 14 additions & 3 deletions amadeus-core/src/file/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
use async_trait::async_trait;
use futures::{future, future::LocalBoxFuture, stream, FutureExt, StreamExt, TryStreamExt};
use std::{
ffi::{OsStr, OsString}, fs, future::Future, io, path::{Path, PathBuf}, sync::Arc
ffi::{OsStr, OsString},
fs,
future::Future,
io,
path::{Path, PathBuf},
sync::Arc,
};
use walkdir::WalkDir;

Expand All @@ -13,11 +18,17 @@ use std::os::unix::fs::FileExt;
use std::os::windows::fs::FileExt;
#[cfg(target_arch = "wasm32")]
use {
futures::lock::Mutex, js_sys::{ArrayBuffer, Uint8Array}, std::convert::TryFrom, wasm_bindgen::{JsCast, JsValue}, wasm_bindgen_futures::JsFuture, web_sys::{Blob, Response}
futures::lock::Mutex,
js_sys::{ArrayBuffer, Uint8Array},
std::convert::TryFrom,
wasm_bindgen::{JsCast, JsValue},
wasm_bindgen_futures::JsFuture,
web_sys::{Blob, Response},
};
#[cfg(not(target_arch = "wasm32"))]
use {
std::io::{Seek, SeekFrom}, tokio::task::spawn_blocking
std::io::{Seek, SeekFrom},
tokio::task::spawn_blocking,
};

use super::{Directory, File, Page, Partition};
Expand Down
7 changes: 5 additions & 2 deletions amadeus-core/src/into_par_stream/collections.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use owned_chars::{OwnedChars as IntoChars, OwnedCharsExt};
use std::{
collections::{
binary_heap, btree_map, btree_set, hash_map, hash_set, linked_list, vec_deque, BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque
}, hash::{BuildHasher, Hash}, iter, option, result, slice, str, vec
binary_heap, btree_map, btree_set, hash_map, hash_set, linked_list, vec_deque, BTreeMap,
BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque,
},
hash::{BuildHasher, Hash},
iter, option, result, slice, str, vec,
};

use super::{IntoDistributedStream, IntoParallelStream, IterDistStream, IterParStream};
Expand Down
6 changes: 4 additions & 2 deletions amadeus-core/src/into_par_stream/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use futures::Stream;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
ops::{Range, RangeFrom, RangeInclusive}, pin::Pin, task::{Context, Poll}
ops::{Range, RangeFrom, RangeInclusive},
pin::Pin,
task::{Context, Poll},
};

use super::{
DistributedStream, IntoDistributedStream, IntoParallelStream, ParallelStream, StreamTask
DistributedStream, IntoDistributedStream, IntoParallelStream, ParallelStream, StreamTask,
};
use crate::pool::ProcessSend;

Expand Down
9 changes: 7 additions & 2 deletions amadeus-core/src/into_par_stream/slice.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use futures::Stream;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::{
convert::Infallible, iter, pin::Pin, slice, task::{Context, Poll}
convert::Infallible,
iter,
pin::Pin,
slice,
task::{Context, Poll},
};

use super::{
DistributedStream, IntoDistributedStream, IntoParallelStream, IterDistStream, IterParStream, ParallelStream, StreamTask
DistributedStream, IntoDistributedStream, IntoParallelStream, IterDistStream, IterParStream,
ParallelStream, StreamTask,
};
use crate::pool::ProcessSend;

Expand Down
6 changes: 5 additions & 1 deletion amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,11 @@ macro_rules! impl_par_dist {
mod impl_par_dist {
#[allow(unused_imports)]
pub(crate) use crate::{
combiner_dist_sink as combiner_par_sink, folder_dist_sink as folder_par_sink, par_pipe::DistributedPipe as ParallelPipe, par_sink::{DistributedSink as ParallelSink, FromDistributedStream as FromParallelStream}, par_stream::DistributedStream as ParallelStream, pool::ProcessSend as Send
combiner_dist_sink as combiner_par_sink, folder_dist_sink as folder_par_sink,
par_pipe::DistributedPipe as ParallelPipe,
par_sink::{DistributedSink as ParallelSink, FromDistributedStream as FromParallelStream},
par_stream::DistributedStream as ParallelStream,
pool::ProcessSend as Send,
};
}

Expand Down
3 changes: 2 additions & 1 deletion amadeus-core/src/par_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use super::par_pipe::*;
use crate::{pipe::Sink, pool::ProcessSend};

pub use self::{
all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, fork::*, group_by::*, histogram::*, max::*, pipe::*, sample::*, stats::*, sum::*, tuple::*
all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*,
fork::*, group_by::*, histogram::*, max::*, pipe::*, sample::*, stats::*, sum::*, tuple::*,
};

#[must_use]
Expand Down
7 changes: 5 additions & 2 deletions amadeus-core/src/par_sink/all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde_closure::traits::FnMut;
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend,
ReducerSend,
};
use crate::{pipe::Sink, pool::ProcessSend};

Expand Down
7 changes: 5 additions & 2 deletions amadeus-core/src/par_sink/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde_closure::traits::FnMut;
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend,
ReducerSend,
};
use crate::{pipe::Sink, pool::ProcessSend};

Expand Down
10 changes: 8 additions & 2 deletions amadeus-core/src/par_sink/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ use futures::{pin_mut, ready, Stream, StreamExt};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque}, hash::{BuildHasher, Hash}, iter, marker::PhantomData, pin::Pin, task::{Context, Poll}
collections::{BTreeMap, BTreeSet, BinaryHeap, HashMap, HashSet, LinkedList, VecDeque},
hash::{BuildHasher, Hash},
iter,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend, ReducerSend
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, Reducer, ReducerProcessSend,
ReducerSend,
};
use crate::{pipe::Sink, pool::ProcessSend};

Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use derive_new::new;
use serde::{Deserialize, Serialize};

use super::{
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder,
};

#[derive(new)]
Expand Down
5 changes: 4 additions & 1 deletion amadeus-core/src/par_sink/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use futures::{ready, Stream};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use super::{Reducer, ReducerProcessSend, ReducerSend};
Expand Down
7 changes: 5 additions & 2 deletions amadeus-core/src/par_sink/for_each.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use serde_closure::traits::FnMut;
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PushReducer, Reducer, ReducerProcessSend, ReducerSend
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PushReducer, Reducer,
ReducerProcessSend, ReducerSend,
};
use crate::{pipe::Sink, pool::ProcessSend};

Expand Down
10 changes: 7 additions & 3 deletions amadeus-core/src/par_sink/fork.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@ use futures::{pin_mut, ready, stream, Stream, StreamExt as _};
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
marker::PhantomData, pin::Pin, task::{Context, Poll}
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use sum::Sum2;

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, ReduceA2, ReduceC2
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, ReduceA2, ReduceC2,
};
use crate::{
par_stream::{ParallelStream, StreamTask}, pipe::Pipe, util::transmute
par_stream::{ParallelStream, StreamTask},
pipe::Pipe,
util::transmute,
};

#[pin_project]
Expand Down
12 changes: 9 additions & 3 deletions amadeus-core/src/par_sink/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@ use indexmap::IndexMap;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
hash::Hash, marker::PhantomData, mem, pin::Pin, task::{Context, Poll}
hash::Hash,
marker::PhantomData,
mem,
pin::Pin,
task::{Context, Poll},
};
use sum::Sum2;

use super::{
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, Reducer, ReducerProcessSend, ReducerSend
DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask, Reducer,
ReducerProcessSend, ReducerSend,
};
use crate::{
pipe::{Pipe, Sink, StreamExt as _}, pool::ProcessSend
pipe::{Pipe, Sink, StreamExt as _},
pool::ProcessSend,
};

#[derive(new)]
Expand Down
6 changes: 4 additions & 2 deletions amadeus-core/src/par_sink/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ use derive_new::new;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
pin::Pin, task::{Context, Poll}
pin::Pin,
task::{Context, Poll},
};

use super::{DistributedPipe, DistributedSink, ParallelPipe, ParallelSink, PipeTask};
use crate::{
par_stream::{ParallelStream, StreamTask}, pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe}
par_stream::{ParallelStream, StreamTask},
pipe::{Pipe as _, PipePipe, StreamExt, StreamPipe},
};

#[pin_project]
Expand Down
3 changes: 2 additions & 1 deletion amadeus-core/src/par_sink/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use std::hash::Hash;
use streaming_algorithms::{HyperLogLogMagnitude, SampleUnstable as SASampleUnstable, Top};

use super::{
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder, SumZeroFolder
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink, SumFolder,
SumZeroFolder,
};

#[derive(new)]
Expand Down
Loading

0 comments on commit fb9eae7

Please sign in to comment.