Skip to content

Commit

Permalink
switch to indexmap
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 27, 2020
1 parent 59937f0 commit 3c04f51
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 53 deletions.
1 change: 1 addition & 0 deletions amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ derive-new = "0.5"
educe = "0.4"
either = { version = "1.5", features = ["serde"] }
futures = "0.3"
indexmap = { version = "1.5", features = ["serde-1"] }
itertools = "0.9"
owned_chars = "0.3"
pin-project = "0.4"
Expand Down
76 changes: 26 additions & 50 deletions amadeus-core/src/par_sink/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
use derive_new::new;
use educe::Educe;
use futures::{pin_mut, ready, stream, Stream, StreamExt};
use indexmap::IndexMap;
use pin_project::pin_project;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap, hash::Hash, marker::PhantomData, mem, pin::Pin, task::{Context, Poll}
hash::Hash, marker::PhantomData, mem, pin::Pin, task::{Context, Poll}
};
use sum::Sum2;

Expand All @@ -33,7 +34,7 @@ where
B::ReduceC: Clone,
B::Done: Send + 'static,
{
type Done = HashMap<T, B::Done>;
type Done = IndexMap<T, B::Done>;
type Pipe = A;
type ReduceA = GroupByReducerA<<B::Pipe as ParallelPipe<U>>::Task, B::ReduceA, T, U>;
type ReduceC = GroupByReducerB<
Expand Down Expand Up @@ -62,7 +63,7 @@ where
B::ReduceC: Clone,
B::Done: ProcessSend + 'static,
{
type Done = HashMap<T, B::Done>;
type Done = IndexMap<T, B::Done>;
type Pipe = A;
type ReduceA = GroupByReducerA<<B::Pipe as DistributedPipe<U>>::Task, B::ReduceA, T, U>;
type ReduceB = GroupByReducerB<
Expand Down Expand Up @@ -103,7 +104,7 @@ where
R: Reducer<P::Output> + Clone,
T: Eq + Hash,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
type Async = GroupByReducerAAsync<P::Async, R, T, U>;

fn into_async(self) -> Self::Async {
Expand All @@ -117,7 +118,7 @@ where
T: Eq + Hash + ProcessSend + 'static,
R::Done: ProcessSend + 'static,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
}
impl<P, R, T, U> ReducerSend<(T, U)> for GroupByReducerA<P, R, T, U>
where
Expand All @@ -126,7 +127,7 @@ where
T: Eq + Hash + Send + 'static,
R::Done: Send + 'static,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
}

#[pin_project]
Expand All @@ -142,8 +143,7 @@ where
#[new(default)]
pending: Option<Sum2<(T, Option<U>, Option<Pin<Box<R::Async>>>), Vec<Option<R::Done>>>>,
#[new(default)]
map: HashMap<T, Pin<Box<R::Async>>>,
marker: PhantomData<fn() -> (R, U)>,
map: IndexMap<T, Pin<Box<R::Async>>>,
}

impl<P, R, T, U> Sink<(T, U)> for GroupByReducerAAsync<P, R, T, U>
Expand All @@ -152,7 +152,7 @@ where
R: Reducer<P::Output> + Clone,
T: Eq + Hash,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;

#[inline(always)]
fn poll_forward(
Expand Down Expand Up @@ -196,7 +196,7 @@ where
.pipe(self_.pipe.as_mut());
pin_mut!(stream);
let map = &mut *self_.map;
let r_ = r.as_mut().unwrap_or_else(|| map.get_mut(&k).unwrap());
let r_ = r.as_mut().unwrap_or_else(|| map.get_mut(k).unwrap());
if r_.as_mut().poll_forward(cx, stream).is_ready() {
let _ = u.take();
}
Expand Down Expand Up @@ -228,9 +228,8 @@ where
if !done_ {
return Poll::Pending;
}
let ret = self_
.map
.drain()
let ret = mem::take(self_.map)
.into_iter()
.zip(done.iter_mut())
.map(|((k, _), v)| (k, v.take().unwrap()))
.collect();
Expand All @@ -249,33 +248,33 @@ where
)]
pub struct GroupByReducerB<R, T, U>(R, PhantomData<fn() -> (T, U)>);

impl<R, T, U> Reducer<HashMap<T, U>> for GroupByReducerB<R, T, U>
impl<R, T, U> Reducer<IndexMap<T, U>> for GroupByReducerB<R, T, U>
where
R: Reducer<U> + Clone,
T: Eq + Hash,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
type Async = GroupByReducerBAsync<R, T, U>;

fn into_async(self) -> Self::Async {
GroupByReducerBAsync::new(self.0)
}
}
impl<R, T, U> ReducerProcessSend<HashMap<T, U>> for GroupByReducerB<R, T, U>
impl<R, T, U> ReducerProcessSend<IndexMap<T, U>> for GroupByReducerB<R, T, U>
where
R: Reducer<U> + Clone,
T: Eq + Hash + ProcessSend + 'static,
R::Done: ProcessSend + 'static,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
}
impl<R, T, U> ReducerSend<HashMap<T, U>> for GroupByReducerB<R, T, U>
impl<R, T, U> ReducerSend<IndexMap<T, U>> for GroupByReducerB<R, T, U>
where
R: Reducer<U> + Clone,
T: Eq + Hash + Send + 'static,
R::Done: Send + 'static,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;
}

#[pin_project]
Expand All @@ -286,23 +285,22 @@ where
{
f: R,
#[new(default)]
pending: Option<Sum2<HashMap<T, (U, Option<Pin<Box<R::Async>>>)>, Vec<Option<R::Done>>>>,
pending: Option<Sum2<IndexMap<T, (U, Option<Pin<Box<R::Async>>>)>, Vec<Option<R::Done>>>>,
#[new(default)]
map: HashMap<T, Pin<Box<R::Async>>>,
marker: PhantomData<fn() -> (T, U)>,
map: IndexMap<T, Pin<Box<R::Async>>>,
}

impl<R, T, U> Sink<HashMap<T, U>> for GroupByReducerBAsync<R, T, U>
impl<R, T, U> Sink<IndexMap<T, U>> for GroupByReducerBAsync<R, T, U>
where
R: Reducer<U> + Clone,
T: Eq + Hash,
{
type Done = HashMap<T, R::Done>;
type Done = IndexMap<T, R::Done>;

#[inline(always)]
fn poll_forward(
self: Pin<&mut Self>, cx: &mut Context,
mut stream: Pin<&mut impl Stream<Item = HashMap<T, U>>>,
mut stream: Pin<&mut impl Stream<Item = IndexMap<T, U>>>,
) -> Poll<Self::Done> {
let self_ = self.project();
loop {
Expand All @@ -329,7 +327,7 @@ where
}
match self_.pending.as_mut().unwrap() {
Sum2::A(pending) => {
while let Some((k, (v, mut r))) = pop(pending) {
while let Some((k, (v, mut r))) = pending.pop() {
let mut v = Some(v);
let waker = cx.waker();
let stream = stream::poll_fn(|cx| {
Expand Down Expand Up @@ -381,9 +379,8 @@ where
if !done_ {
return Poll::Pending;
}
let ret = self_
.map
.drain()
let ret = mem::take(self_.map)
.into_iter()
.zip(done.iter_mut())
.map(|((k, _), v)| (k, v.take().unwrap()))
.collect();
Expand All @@ -393,24 +390,3 @@ where
}
}
}

// https://github.com/rust-lang/rfcs/issues/1800#issuecomment-653757340
#[allow(clippy::unnecessary_filter_map)]
fn pop<K, V>(map: &mut HashMap<K, V>) -> Option<(K, V)>
where
K: Eq + Hash,
{
let mut first = None;
*map = mem::take(map)
.into_iter()
.filter_map(|el| {
if first.is_none() {
first = Some(el);
None
} else {
Some(el)
}
})
.collect();
first
}
7 changes: 4 additions & 3 deletions amadeus-core/src/par_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ mod update;
use async_trait::async_trait;
use either::Either;
use futures::{future, pin_mut, stream, stream::StreamExt as _, Stream};
use indexmap::IndexMap;
use serde_closure::{traits, FnOnce};
use std::{
cmp::Ordering, collections::HashMap, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}, vec
cmp::Ordering, hash::Hash, iter, ops, pin::Pin, task::{Context, Poll}, vec
};

use super::{par_pipe::*, par_sink::*};
Expand Down Expand Up @@ -454,7 +455,7 @@ stream!(ParallelStream ParallelPipe ParallelSink FromParallelStream IntoParallel
.await
}

async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> HashMap<A, S::Done>
async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
where
P: ThreadPool,
A: Eq + Hash + Send + 'static,
Expand Down Expand Up @@ -680,7 +681,7 @@ stream!(DistributedStream DistributedPipe DistributedSink FromDistributedStream
.await
}

async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> HashMap<A, S::Done>
async fn group_by<P, S, A, B>(self, pool: &P, sink: S) -> IndexMap<A, S::Done>
where
P: ProcessPool,
A: Eq + Hash + ProcessSend + 'static,
Expand Down

0 comments on commit 3c04f51

Please sign in to comment.