Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding statistical functions #102

Merged
merged 31 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e2e7d72
adding stats functions
kevloui Jul 31, 2020
8d9befa
adding stats functions
kevloui Jul 31, 2020
a73eb8c
minor fixups
alecmocatta Jul 31, 2020
9a6965c
adding stats functions
kevloui Aug 1, 2020
c9f6989
cargo fmt
alecmocatta Aug 1, 2020
b601901
get FolderSync::done to run only in the final reduce
alecmocatta Aug 1, 2020
6cc0772
adding stats functions
kevloui Aug 1, 2020
cef89c3
adding stats functions
kevloui Aug 1, 2020
c2f0e8f
fixes
alecmocatta Aug 1, 2020
26c9d5e
adding stats functions
kevloui Aug 3, 2020
80beab2
adding stats functions
kevloui Aug 3, 2020
bea8da8
fix
alecmocatta Aug 4, 2020
f3c9bdb
adding stats functions
kevloui Aug 5, 2020
776ad3a
adding stats functions
kevloui Aug 5, 2020
1e0a237
Stddev function improvements and fixed conversions of u64 to f64
kevloui Aug 6, 2020
ad35272
Adds u64_to_f64 and f64_to_u64 to src/util.rs to make usage cleaner. …
kevloui Aug 6, 2020
ee7552f
Fixes local.rs utils.rs function usage warnings
kevloui Aug 7, 2020
a69cbc4
Fixes stddev.rs calculation. Prevents calculation for < 2 values
kevloui Aug 7, 2020
34d1827
Fixes stddev.rs calculation. Prevents calculation for < 2 values
kevloui Aug 7, 2020
88a46d7
Fixes stddev.rs. Final sqrt calculation now happens in `done` for per…
kevloui Aug 7, 2020
000bc86
Resolves conflicts in `sum.rs`
kevloui Aug 7, 2020
9148498
Resolves conflicts in `sum.rs`
kevloui Aug 7, 2020
0c3fcf9
Resolves conflicts in `sum.rs`
kevloui Aug 7, 2020
9b1fa20
Resolves conflicts in `sum.rs`
kevloui Aug 7, 2020
4e70c92
Resolves conflicts in `sum.rs`
kevloui Aug 7, 2020
d562e2a
Cleans up stddev.rs by removing pre_variance
kevloui Aug 7, 2020
ca79a93
Cleans up stddev.rs by removing pre_variance
kevloui Aug 7, 2020
c1f362b
Cleans up stddev.rs
kevloui Aug 7, 2020
28ed8bf
Cleans up stddev.rs done function
kevloui Aug 7, 2020
0f60e1f
Update sum.rs
kevloui Aug 7, 2020
f0e0e53
tweaks
alecmocatta Aug 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 2 additions & 22 deletions amadeus-core/src/file/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use {
};

use super::{Directory, File, Page, Partition};
#[cfg(target_arch = "wasm32")]
use crate::util::{f64_to_u64, u64_to_f64};
use crate::util::{IoError, ResultExpand};

#[async_trait(?Send)]
Expand Down Expand Up @@ -359,28 +361,6 @@ impl LocalFile {
}
}

#[cfg(target_arch = "wasm32")]
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss
)]
/// Converts `x` to `f64`, panicking unless the conversion is exact.
///
/// # Panics
///
/// Panics when `x` has no exact `f64` representation, i.e. when the converted
/// value does not cast back to the same `u64`.
fn u64_to_f64(x: u64) -> f64 {
    let y = x as f64;
    // `f64 as u64` saturates: an input that rounds up to 2^64 (`u64::MAX`)
    // casts back to `u64::MAX` and would slip through the round-trip check
    // below, so reject anything that reached 2^64 explicitly.
    assert!(
        y < 18_446_744_073_709_551_616.0,
        "{} is not exactly representable as f64",
        x
    );
    assert_eq!(x, y as u64);
    y
}
#[cfg(target_arch = "wasm32")]
#[allow(
clippy::cast_possible_truncation,
clippy::cast_sign_loss,
clippy::cast_precision_loss,
clippy::float_cmp
)]
/// Converts `x` to `u64`, panicking unless the conversion is exact.
///
/// # Panics
///
/// Panics when `x` is NaN, infinite, negative, fractional, or at least 2^64,
/// i.e. whenever no `u64` is numerically equal to `x`.
fn f64_to_u64(x: f64) -> u64 {
    // `f64 as u64` saturates: 2^64 becomes `u64::MAX`, which rounds back up to
    // 2^64 as an `f64` and would slip through the round-trip check below.
    // NaN fails this comparison as well.
    assert!(
        x < 18_446_744_073_709_551_616.0,
        "{} is out of range for u64",
        x
    );
    let y = x as u64;
    assert_eq!(x, y as f64);
    y
}

#[cfg(target_arch = "wasm32")]
impl LocalFile {
fn read_at<'a>(
Expand Down
16 changes: 16 additions & 0 deletions amadeus-core/src/par_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ macro_rules! pipe {
$assert_sink(Sum::new(self))
}

/// Wraps this pipe in a [`Mean`] sink.
///
/// Only available when the pipe's `Output` is `f64`. The sink is built with
/// `Mean::new(self)` and passed through the macro's `$assert_sink` helper.
// NOTE(review): presumably the resulting sink yields the arithmetic mean of the
// pipe's items; `Mean` is defined outside this view — confirm.
#[inline]
fn mean(self) -> Mean<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(Mean::new(self))
}

/// Wraps this pipe in a [`StdDev`] sink.
///
/// Only available when the pipe's `Output` is `f64`. The sink is built with
/// `StdDev::new(self)` and passed through the macro's `$assert_sink` helper.
// NOTE(review): presumably the resulting sink yields the standard deviation of
// the pipe's items; `StdDev` is defined outside this view — confirm (including
// whether it is the sample or population form).
#[inline]
fn stddev(self) -> StdDev<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(StdDev::new(self))
}

#[inline]
fn combine<F>(self, f: F) -> Combine<Self, F>
where
Expand Down
4 changes: 3 additions & 1 deletion amadeus-core/src/par_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ mod fork;
mod group_by;
mod histogram;
mod max;
mod mean;
mod pipe;
mod sample;
mod stddev;
mod sum;
mod tuple;

use super::par_pipe::*;
use crate::{pipe::Sink, pool::ProcessSend};

pub use self::{
all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, fork::*, group_by::*, histogram::*, max::*, pipe::*, sample::*, sum::*, tuple::*
all::*, any::*, collect::*, combine::*, combiner::*, count::*, fold::*, folder::*, for_each::*, fork::*, group_by::*, histogram::*, max::*, mean::*, pipe::*, sample::*, stddev::*, sum::*, tuple::*
};

#[must_use]
Expand Down
10 changes: 7 additions & 3 deletions amadeus-core/src/par_sink/combine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ where
F: FnMut<(A, A), Output = A>,
Item: Into<Option<A>>,
{
type Done = Option<A>;
type State = Option<A>;
type Done = Self::State;

fn zero(&mut self) -> Self::Done {
fn zero(&mut self) -> Self::State {
None
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
fn push(&mut self, state: &mut Self::State, item: Item) {
if let Some(item) = item.into() {
*state = Some(if let Some(state) = state.take() {
self.0.call_mut((state, item))
Expand All @@ -34,6 +35,9 @@ where
});
}
}
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}

#[derive(new)]
Expand Down
20 changes: 12 additions & 8 deletions amadeus-core/src/par_sink/combiner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ mod macros {
($combiner:ty, $self:ident, $init:expr) => {
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
type Pipe = P;
type ReduceA = FolderSyncReducer<P::Output, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>;
type ReduceA = FolderSyncReducer<P::Output, $combiner, crate::par_sink::Inter>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner, crate::par_sink::Final>;

fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
let init = $init;
Expand All @@ -26,9 +26,9 @@ mod macros {
($combiner:ty, $self:ident, $init:expr) => {
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
type Pipe = P;
type ReduceA = FolderSyncReducer<P::Output, $combiner>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $combiner>;
type ReduceA = FolderSyncReducer<P::Output, $combiner, crate::par_sink::Inter>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $combiner, crate::par_sink::Inter>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $combiner, crate::par_sink::Final>;

fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
let init = $init;
Expand Down Expand Up @@ -57,12 +57,13 @@ where
C: CombinerSync<Done = B>,
Item: Into<Option<B>>,
{
type Done = Option<B>;
type State = Option<B>;
type Done = Self::State;

fn zero(&mut self) -> Self::Done {
fn zero(&mut self) -> Self::State {
None
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
fn push(&mut self, state: &mut Self::State, item: Item) {
if let Some(item) = item.into() {
*state = Some(if let Some(state) = state.take() {
self.combine(state, item)
Expand All @@ -71,4 +72,7 @@ where
});
}
}
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}
11 changes: 8 additions & 3 deletions amadeus-core/src/par_sink/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,19 @@ impl_par_dist! {
pub struct CountFolder;

impl<Item> FolderSync<Item> for CountFolder {
type Done = usize;
type State = usize;
type Done = Self::State;

#[inline(always)]
fn zero(&mut self) -> Self::Done {
fn zero(&mut self) -> Self::State {
0
}
#[inline(always)]
fn push(&mut self, state: &mut Self::Done, _item: Item) {
fn push(&mut self, state: &mut Self::State, _item: Item) {
*state += 1;
}
#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}
21 changes: 15 additions & 6 deletions amadeus-core/src/par_sink/fold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,28 +50,37 @@ where
ID: FnMut<(), Output = B>,
F: FnMut<(B, Either<Item, B>), Output = B>,
{
type Done = B;
type State = B;
type Done = Self::State;

fn zero(&mut self) -> Self::Done {
fn zero(&mut self) -> Self::State {
self.identity.call_mut(())
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
fn push(&mut self, state: &mut Self::State, item: Item) {
replace_with_or_abort(state, |state| self.op.call_mut((state, Either::Left(item))))
}
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}
impl<A, ID, F, Item> FolderSync<Item> for FoldFolder<A, ID, F, Item, StepB>
where
ID: FnMut<(), Output = Item>,
F: FnMut<(Item, Either<A, Item>), Output = Item>,
{
type Done = Item;
type State = Item;
type Done = Self::State;

fn zero(&mut self) -> Self::Done {
fn zero(&mut self) -> Self::State {
self.identity.call_mut(())
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
fn push(&mut self, state: &mut Self::State, item: Item) {
replace_with_or_abort(state, |state| {
self.op.call_mut((state, Either::Right(item)))
})
}
#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}
88 changes: 71 additions & 17 deletions amadeus-core/src/par_sink/folder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ mod macros {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done;
type Pipe = P;
type ReduceA = FolderSyncReducer<P::Output, $folder_a>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b>;
type ReduceA = FolderSyncReducer<P::Output, $folder_a, crate::par_sink::Inter>;
type ReduceC = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, crate::par_sink::Final>;

fn reducers($self) -> (P, Self::ReduceA, Self::ReduceC) {
let init_a = $init_a;
Expand All @@ -37,9 +37,9 @@ mod macros {
($folder_a:ty, $folder_b:ty, $self:ident, $init_a:expr, $init_b:expr) => {
type Done = <Self::ReduceC as $crate::par_sink::Reducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done>>::Done;
type Pipe = P;
type ReduceA = FolderSyncReducer<P::Output, $folder_a>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b>;
type ReduceA = FolderSyncReducer<P::Output, $folder_a, crate::par_sink::Inter>;
type ReduceB = FolderSyncReducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done, $folder_b, crate::par_sink::Inter>;
type ReduceC = FolderSyncReducer<<Self::ReduceB as $crate::par_sink::Reducer<<Self::ReduceA as $crate::par_sink::Reducer<P::Output>>::Done>>::Done, $folder_b, crate::par_sink::Final>;

fn reducers($self) -> (P, Self::ReduceA, Self::ReduceB, Self::ReduceC) {
let init_a = $init_a;
Expand All @@ -60,29 +60,64 @@ mod macros {
pub(crate) use macros::{folder_dist_sink, folder_par_sink};

/// A synchronous fold over `Item`s, split into an accumulator (`State`) and a
/// final result (`Done`).
///
/// An accumulator is created with `zero`, updated in place by `push`, and
/// turned into the result by `done`.
pub trait FolderSync<Item> {
/// Accumulator type returned by `zero` and mutated by `push`.
type State;
/// Result type produced from a `State` by `done`.
type Done;

// NOTE(review): the next two signatures (using `Self::Done`) duplicate the two
// after them (using `Self::State`); they look like the pre-change lines of the
// diff this text was taken from — confirm which pair is current.
fn zero(&mut self) -> Self::Done;
fn push(&mut self, state: &mut Self::Done, item: Item);
/// Creates a fresh accumulator.
fn zero(&mut self) -> Self::State;
/// Folds `item` into the accumulator `state`.
fn push(&mut self, state: &mut Self::State, item: Item);
/// Consumes the accumulator and produces the final result.
fn done(&mut self, state: Self::State) -> Self::Done;
}

/// Marker type for an intermediate reduce stage: `FolderSyncReducer<_, F, Inter>`
/// yields the folder's raw `F::State` as its `Done`.
pub struct Inter;
/// Marker type for the final reduce stage: `FolderSyncReducer<_, F, Final>` yields
/// `F::Done`, and its async sink passes the accumulated state through
/// `FolderSync::done`.
pub struct Final;

#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone(bound = "F: Clone"))]
#[serde(
bound(serialize = "F: Serialize"),
bound(deserialize = "F: Deserialize<'de>")
)]
pub struct FolderSyncReducer<Item, F> {
pub struct FolderSyncReducer<Item, F, Final> {
folder: F,
marker: PhantomData<fn() -> Item>,
marker: PhantomData<fn() -> (Item, Final)>,
}

/// Intermediate-stage reducer: folds items with `F` and hands back the raw
/// accumulator (`F::State`) without calling `FolderSync::done`.
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Inter>
where
    F: FolderSync<Item>,
{
    type Done = F::State;
    type Async = FolderSyncReducerAsync<Item, F, F::State, Inter>;

    /// Builds the async sink, seeded with a fresh accumulator from `zero`.
    fn into_async(mut self) -> Self::Async {
        // The accumulator has to be created before `folder` is moved into the sink.
        let initial = self.folder.zero();
        FolderSyncReducerAsync {
            state: Some(initial),
            folder: self.folder,
            marker: PhantomData,
        }
    }
}
/// `ReducerProcessSend` for the intermediate stage: the value produced is the
/// folder's raw `F::State`, which therefore must be `ProcessSend + 'static`.
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Inter>
where
F: FolderSync<Item>,
F::State: ProcessSend + 'static,
{
type Done = F::State;
}
/// `ReducerSend` for the intermediate stage: the value produced is the folder's
/// raw `F::State`, which therefore must be `Send + 'static`.
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Inter>
where
F: FolderSync<Item>,
F::State: Send + 'static,
{
type Done = F::State;
}

impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F>
impl<Item, F> Reducer<Item> for FolderSyncReducer<Item, F, Final>
where
F: FolderSync<Item>,
{
type Done = F::Done;
type Async = FolderSyncReducerAsync<Item, F, F::Done>;
type Async = FolderSyncReducerAsync<Item, F, F::State, Final>;

fn into_async(mut self) -> Self::Async {
FolderSyncReducerAsync {
Expand All @@ -92,14 +127,14 @@ where
}
}
}
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F>
impl<Item, F> ReducerProcessSend<Item> for FolderSyncReducer<Item, F, Final>
where
F: FolderSync<Item>,
F::Done: ProcessSend + 'static,
{
type Done = F::Done;
}
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F>
impl<Item, F> ReducerSend<Item> for FolderSyncReducer<Item, F, Final>
where
F: FolderSync<Item>,
F::Done: Send + 'static,
Expand All @@ -108,16 +143,16 @@ where
}

#[pin_project]
pub struct FolderSyncReducerAsync<Item, F, S> {
pub struct FolderSyncReducerAsync<Item, F, S, Final> {
state: Option<S>,
folder: F,
marker: PhantomData<fn() -> Item>,
marker: PhantomData<fn() -> (Item, Final)>,
}
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::Done>
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Inter>
where
F: FolderSync<Item>,
{
type Done = F::Done;
type Done = F::State;

#[inline(always)]
fn poll_forward(
Expand All @@ -132,3 +167,22 @@ where
Poll::Ready(self_.state.take().unwrap())
}
}
/// Final-stage sink: folds every item of the stream into the accumulator and,
/// once the stream ends, converts it with `FolderSync::done`.
impl<Item, F> Sink<Item> for FolderSyncReducerAsync<Item, F, F::State, Final>
where
    F: FolderSync<Item>,
{
    type Done = F::Done;

    /// Drains `stream` into the accumulator; returns `Poll::Pending` whenever the
    /// stream does, and the finished `F::Done` once it is exhausted.
    ///
    /// # Panics
    ///
    /// Panics if the state has already been taken by an earlier completed call.
    #[inline(always)]
    fn poll_forward(
        self: Pin<&mut Self>, cx: &mut Context, mut stream: Pin<&mut impl Stream<Item = Item>>,
    ) -> Poll<Self::Done> {
        let this = self.project();
        let acc = this.state.as_mut().unwrap();
        loop {
            match ready!(stream.as_mut().poll_next(cx)) {
                Some(item) => this.folder.push(acc, item),
                None => break,
            }
        }
        // Stream exhausted: move the accumulator out and finalise it.
        Poll::Ready(this.folder.done(this.state.take().unwrap()))
    }
}
Loading