Skip to content

Commit

Permalink
Merge pull request #1 from alecmocatta/faster
Browse files Browse the repository at this point in the history
Improve performance including SIMD-acceleration
  • Loading branch information
alecmocatta authored Sep 25, 2018
2 parents 0f63752 + 283183c commit c5da5a6
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 104 deletions.
1 change: 0 additions & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ merge_imports = true
use_field_init_shorthand = true
use_try_shorthand = true
# wrap_comments = true
ignore = ["src/distinct/consts.rs"]
8 changes: 2 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ authors = ["Alec Mocatta <alec@mocatta.net>"]
categories = ["data-structures","algorithms","science"]
keywords = ["streaming-algorithm","probabilistic","sketch","data-structure","hyperloglog"]
description = """
Performant implementations of various streaming algorithms, including Count–min sketch; Top k; HyperLogLog; Reservoir sampling.
SIMD-accelerated implementations of various streaming algorithms, including Count–min sketch, Top k, HyperLogLog, and Reservoir sampling.
"""
repository = "https://github.com/alecmocatta/streaming_algorithms"
homepage = "https://github.com/alecmocatta/streaming_algorithms"
Expand All @@ -26,8 +26,4 @@ twox-hash = "1.1"
serde_derive = "1.0"
serde = "1.0"
rand = "0.5"
bytecount = "0.3"

[features]
avx-accel = ["bytecount/avx-accel"]
simd-accel = ["bytecount/simd-accel"]
packed_simd = { version = "0.3", features = ["into_bits"] }
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

[Docs](https://docs.rs/streaming_algorithms/0.1.0)

Performant implementations of various [streaming algorithms](https://en.wikipedia.org/wiki/Streaming_algorithm).
SIMD-accelerated implementations of various [streaming algorithms](https://en.wikipedia.org/wiki/Streaming_algorithm).

This library is a work in progress. PRs are very welcome! Currently implemented algorithms include:

Expand All @@ -17,7 +17,13 @@ This library is a work in progress. PRs are very welcome! Currently implemented
* HyperLogLog
* Reservoir sampling

A goal of this library is to enable composition of these algorithms; for example Top k + HyperLogLog to enable roughly `SELECT key FROM table GROUP BY key ORDER BY COUNT(DISTINCT value) DESC LIMIT k`.
A goal of this library is to enable composition of these algorithms; for example Top k + HyperLogLog to enable an approximate version of `SELECT key FROM table GROUP BY key ORDER BY COUNT(DISTINCT value) DESC LIMIT k`.

Run your application with `RUSTFLAGS="-C target-cpu=native"` to benefit from SIMD acceleration, like so:

```bash
RUSTFLAGS="-C target-cpu=native" cargo run --release
```

See [this gist](https://gist.github.com/debasishg/8172796) for a good list of further algorithms to be implemented. Other resources are [Probabilistic data structures – Wikipedia](https://en.wikipedia.org/wiki/Category:Probabilistic_data_structures), [DataSketches – A similar Java library originating at Yahoo](https://datasketches.github.io/), and [Algebird – A similar Java library originating at Twitter](https://github.com/twitter/algebird).

Expand Down
82 changes: 56 additions & 26 deletions src/count_min.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::{de::Deserialize, ser::Serialize};
use std::{
borrow::Borrow, cmp::max, fmt, hash::{Hash, Hasher}, marker::PhantomData, ops
};
use traits::{Intersect, New, UnionAssign};
use traits::{Intersect, IntersectPlusUnionIsPlus, New, UnionAssign};
use twox_hash::XxHash;

/// An implementation of a [count-min sketch](https://en.wikipedia.org/wiki/Count–min_sketch) data structure with *conservative updating* for increased accuracy.
Expand All @@ -39,7 +39,7 @@ use twox_hash::XxHash;
))]
pub struct CountMinSketch<K: ?Sized, C: New> {
counters: Vec<Vec<C>>,
// offsets: Vec<usize>,
offsets: Vec<usize>, // to avoid malloc/free each push
mask: usize,
k_num: usize,
config: <C as New>::Config,
Expand All @@ -58,8 +58,10 @@ where
let counters: Vec<Vec<C>> = (0..k_num)
.map(|_| (0..width).map(|_| C::new(&config)).collect())
.collect();
let offsets = vec![0; k_num];
Self {
counters,
offsets,
mask: Self::mask(width),
k_num,
config,
Expand All @@ -74,19 +76,40 @@ where
K: Borrow<Q>,
C: for<'a> ops::AddAssign<&'a V>,
{
let offsets = self.offsets(key).take(self.k_num).collect::<Vec<_>>();
let mut lowest = C::intersect(
offsets
.iter()
.cloned()
.enumerate()
.map(|(k_i, offset)| &self.counters[k_i][offset]),
).unwrap();
lowest += value;
for (k_i, offset) in offsets.into_iter().enumerate() {
self.counters[k_i][offset].union_assign(&lowest);
if !<C as IntersectPlusUnionIsPlus>::VAL {
let offsets = self.offsets(key);
self.offsets
.iter_mut()
.zip(offsets)
.for_each(|(offset, offset_new)| {
*offset = offset_new;
});
let mut lowest = C::intersect(
self.offsets
.iter()
.enumerate()
.map(|(k_i, &offset)| &self.counters[k_i][offset]),
).unwrap();
lowest += value;
self.counters
.iter_mut()
.zip(self.offsets.iter())
.for_each(|(counters, &offset)| {
counters[offset].union_assign(&lowest);
});
lowest
} else {
let offsets = self.offsets(key);
C::intersect(
self.counters
.iter_mut()
.zip(offsets)
.map(|(counters, offset)| {
counters[offset] += value;
&counters[offset]
}),
).unwrap()
}
lowest
}

/// Union the aggregated value for `key` with `value`.
Expand All @@ -95,9 +118,13 @@ where
Q: Hash,
K: Borrow<Q>,
{
for (k_i, offset) in self.offsets(key).take(self.k_num).enumerate() {
self.counters[k_i][offset].union_assign(value);
}
let offsets = self.offsets(key);
self.counters
.iter_mut()
.zip(offsets)
.for_each(|(counters, offset)| {
counters[offset].union_assign(value);
})
}

/// Retrieve an estimate of the aggregated value for `key`.
Expand All @@ -107,10 +134,10 @@ where
K: Borrow<Q>,
{
C::intersect(
self.offsets(key)
.take(self.k_num)
.enumerate()
.map(|(k_i, offset)| &self.counters[k_i][offset]),
self.counters
.iter()
.zip(self.offsets(key))
.map(|(counters, offset)| &counters[offset]),
).unwrap()
}

Expand All @@ -124,11 +151,13 @@ where

/// Clears the `CountMinSketch` data structure, as if it was new.
pub fn clear(&mut self) {
for k_i in 0..self.k_num {
for counter in &mut self.counters[k_i] {
*counter = C::new(&self.config);
}
}
let config = &self.config;
self.counters
.iter_mut()
.flat_map(|x| x.iter_mut())
.for_each(|counter| {
*counter = C::new(config);
})
}

fn optimal_width(tolerance: f64) -> usize {
Expand Down Expand Up @@ -193,6 +222,7 @@ impl<K: ?Sized, C: New + Clone> Clone for CountMinSketch<K, C> {
fn clone(&self) -> Self {
Self {
counters: self.counters.clone(),
offsets: vec![0; self.offsets.len()],
mask: self.mask,
k_num: self.k_num,
config: self.config.clone(),
Expand Down
Loading

0 comments on commit c5da5a6

Please sign in to comment.