Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Aug 7, 2020
1 parent 0f60e1f commit f0e0e53
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 57 deletions.
32 changes: 16 additions & 16 deletions amadeus-core/src/par_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,22 @@ macro_rules! pipe {
$assert_sink(Sum::new(self))
}

#[inline]
fn mean(self) -> Mean<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(Mean::new(self))
}

#[inline]
fn stddev(self) -> StdDev<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(StdDev::new(self))
}

#[inline]
fn combine<F>(self, f: F) -> Combine<Self, F>
where
Expand Down Expand Up @@ -244,14 +260,6 @@ macro_rules! pipe {
$assert_sink(MaxByKey::new(self, f))
}

#[inline]
fn mean(self) -> Mean<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(Mean::new(self))
}

#[inline]
fn min(self) -> Min<Self>
where
Expand Down Expand Up @@ -318,14 +326,6 @@ macro_rules! pipe {
$assert_sink(SampleUnstable::new(self, samples))
}

#[inline]
fn stddev(self) -> StdDev<Self>
where
Self: $pipe<Input, Output = f64> + Sized,
{
$assert_sink(StdDev::new(self))
}

#[inline]
fn all<F>(self, f: F) -> All<Self, F>
where
Expand Down
3 changes: 1 addition & 2 deletions amadeus-core/src/par_sink/mean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ pub struct Mean<P> {
}

impl_par_dist! {
impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P>
{
impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
folder_par_sink!(
MeanFolder<StepA>,
MeanFolder<StepB>,
Expand Down
6 changes: 5 additions & 1 deletion amadeus-core/src/par_sink/sample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,18 @@ impl<Item, F> FolderSync<Item> for SortFolder<F>
where
F: traits::Fn(&Item, &Item) -> Ordering + Clone,
{
type Done = SASort<Item, F>;
type State = SASort<Item, F>;
type Done = Self::State;

fn zero(&mut self) -> Self::Done {
SASort::new(self.f.clone(), self.n)
}
fn push(&mut self, state: &mut Self::Done, item: Item) {
state.push(item)
}
fn done(&mut self, state: Self::State) -> Self::Done {
state
}
}

#[derive(new)]
Expand Down
31 changes: 15 additions & 16 deletions amadeus-core/src/par_sink/stddev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct StdDev<P> {
}

impl_par_dist! {
impl<P: ParallelPipe<Item, Output = f64>, Item, > ParallelSink<Item> for StdDev<P> {
impl<P: ParallelPipe<Item, Output = f64>, Item, > ParallelSink<Item> for StdDev<P> {
folder_par_sink!(
SDFolder<StepA>,
SDFolder<StepB>,
Expand All @@ -27,7 +27,6 @@ impl_par_dist! {
#[derive(Educe, Serialize, Deserialize, new)]
#[educe(Clone)]
#[serde(bound = "")]

pub struct SDFolder<Step> {
marker: PhantomData<fn() -> Step>,
}
Expand All @@ -40,7 +39,7 @@ pub struct SDState {
#[new(default)]
count: u64,
#[new(default)]
sum: f64,
mean: f64,
#[new(default)]
variance: f64,
}
Expand All @@ -56,16 +55,13 @@ impl FolderSync<f64> for SDFolder<StepA> {

#[inline(always)]
fn push(&mut self, state: &mut Self::State, item: f64) {
// Taken from https://docs.rs/streaming-stats/0.2.3/src/stats/online.rs.html#64-103
let q_prev = state.variance * u64_to_f64(state.count);
let mean_prev = state.mean;
state.count += 1;
state.sum += item;
if state.count > 1 {
let diff = u64_to_f64(state.count) * item - state.sum;
state.variance +=
diff * diff / (u64_to_f64(state.count) * (u64_to_f64(state.count) - 1.0));
state.variance /= u64_to_f64(state.count) - 1.0;
} else {
state.variance = f64::NAN;
}
let count = u64_to_f64(state.count);
state.mean += (item - state.mean) / count;
state.variance = (q_prev + (item - mean_prev) * (item - state.mean)) / count;
}

#[inline(always)]
Expand All @@ -85,11 +81,14 @@ impl FolderSync<SDState> for SDFolder<StepB> {

#[inline(always)]
fn push(&mut self, state: &mut Self::State, item: SDState) {
state.variance = ((u64_to_f64(state.count) - 1.0) * state.variance
+ (u64_to_f64(item.count) - 1.0) * item.variance)
/ ((u64_to_f64(state.count) + u64_to_f64(item.count)) - 2.0);
state.sum += item.sum;
let (s1, s2) = (u64_to_f64(state.count), u64_to_f64(item.count));
let meandiffsq = (state.mean - item.mean) * (state.mean - item.mean);
let mean = ((s1 * state.mean) + (s2 * item.mean)) / (s1 + s2);
let var = (((s1 * state.variance) + (s2 * item.variance)) / (s1 + s2))
+ ((s1 * s2 * meandiffsq) / ((s1 + s2) * (s1 + s2)));
state.count += item.count;
state.mean = mean;
state.variance = var;
}

#[inline(always)]
Expand Down
36 changes: 24 additions & 12 deletions amadeus-core/src/par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,30 @@ macro_rules! stream {
.await
}

#[inline]
async fn mean<P>(self, pool: &P) -> f64
where
P: $pool,
Self::Item: 'static,
Self::Task: 'static,
Self: $stream<Item = f64> + Sized,
{
self.pipe(pool, $pipe::<Self::Item>::mean(Identity))
.await
}

#[inline]
async fn stddev<P>(self, pool: &P) -> f64
where
P: $pool,
Self::Item: 'static,
Self::Task: 'static,
Self: $stream<Item = f64> + Sized,
{
self.pipe(pool, $pipe::<Self::Item>::stddev(Identity))
.await
}

#[inline]
async fn combine<P, F>(self, pool: &P, f: F) -> Option<Self::Item>
where
Expand Down Expand Up @@ -267,18 +291,6 @@ macro_rules! stream {
.await
}

#[inline]
async fn mean<P>(self, pool: &P) -> f64
where
P: $pool,
Self::Item: 'static,
Self::Task: 'static,
Self: $stream<Item = f64> + Sized,
{
self.pipe(pool, $pipe::<Self::Item>::mean(Identity))
.await
}

#[inline]
async fn min<P>(self, pool: &P) -> Option<Self::Item>
where
Expand Down
20 changes: 10 additions & 10 deletions amadeus-core/src/par_stream/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ mod workaround {
Sum::new(self)
}

#[inline]
pub fn mean(self) -> Mean<Self> {
Mean::new(self)
}

#[inline]
pub fn stddev(self) -> StdDev<Self> {
StdDev::new(self)
}

#[inline]
pub fn combine<F>(self, f: F) -> Combine<Self, F>
where
Expand Down Expand Up @@ -162,11 +172,6 @@ mod workaround {
MaxByKey::new(self, f)
}

#[inline]
pub fn mean(self) -> Mean<Self> {
Mean::new(self)
}

#[inline]
pub fn min(self) -> Min<Self> {
Min::new(self)
Expand Down Expand Up @@ -207,11 +212,6 @@ mod workaround {
SampleUnstable::new(self, samples)
}

#[inline]
pub fn stddev(self) -> StdDev<Self> {
StdDev::new(self)
}

#[inline]
pub fn all<F>(self, f: F) -> All<Self, F>
where
Expand Down

0 comments on commit f0e0e53

Please sign in to comment.