Commit 60b5e16

Upsert support (#263)
This PR introduces a prototype "upsert" arrangement operator. Instead of ingesting standard update triples, it receives keyed optional values, where the current value for a key is the most recently received value. This is substantially less general than differential dataflow allows, but lots of people use this format. The reason to do this here is that if the plan is to arrange the results anyhow, then we can re-use the arrangement and extract the "prior" values associated with keys, and undo the damage done at nominal incremental overhead.
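To make the semantics concrete, here is a small hand-worked trace (an illustration, not part of the commit): each upsert becomes a retraction of the key's prior value, recovered from the arrangement, plus an insertion of the new one.

// Hypothetical upsert stream for one key, and the collection updates it implies.
//
//   upsert received             collection updates produced
//   ("k", Some("a"), t1)   =>   (("k", "a"), t1, +1)
//   ("k", Some("b"), t2)   =>   (("k", "a"), t2, -1), (("k", "b"), t2, +1)
//   ("k", None,      t3)   =>   (("k", "b"), t3, -1)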
1 parent 9fe777a commit 60b5e16

File tree

2 files changed: +338, −0 lines

src/operators/arrange/mod.rs

+2
@@ -66,6 +66,8 @@ pub mod writer;
 pub mod agent;
 pub mod arrangement;
 
+pub mod upsert;
+
 pub use self::writer::TraceWriter;
 pub use self::agent::{TraceAgent, ShutdownButton};
src/operators/arrange/upsert.rs

+336
@@ -0,0 +1,336 @@
//! Support for forming collections from streams of upserts.
//!
//! Upserts are sequences of keyed optional values, and they define a collection of
//! pairs of keys and each key's most recent value, if one is present. Each element in
//! the sequence effectively overwrites the previous value at its key, if present, and
//! if the value is not present it uninstalls the key.
//!
//! Upserts are non-trivial because they do not themselves describe the deletions that
//! the `Collection` update stream must present. However, if one creates an `Arrangement`
//! then this state provides sufficient information. The arrangement will continue to
//! exist even if dropped until the input or dataflow shuts down, as the upsert operator
//! itself needs access to its accumulated state.
//!
//! # Notes
//!
//! Upserts currently only work with totally ordered timestamps.
//!
//! In the case of ties in timestamps (concurrent updates to the same key), the operator
//! chooses the *greatest* value according to `Option<Val>` ordering, which prefers a
//! value to `None` and otherwise chooses the greatest value (informally, as if the
//! updates were applied in order of value).
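//!
//! For example, concurrent upserts `(key, None)` and `(key, Some(5))` at the same
//! timestamp resolve to `(key, Some(5))`, as if the `None` had been applied first and
//! then overwritten.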
//!
//! If the same value is repeated, no change will occur in the output. That may make this
//! operator effective at determining the difference between collections of keyed values,
//! but note that it will not notice the absence of keys in a collection.
//!
//! To effect "filtering" in a way that reduces the arrangement footprint, apply a map to
//! the input stream, mapping values that fail the predicate to `None` values, like so:
//!
//! ```ignore
//! // Dropped values should be retained as "uninstall" upserts.
//! upserts.map(|(key, opt_val)| (key, opt_val.filter(predicate)))
//! ```
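//!
//! Simply dropping filtered records would be incorrect: a previously installed value
//! must still be retracted once its replacement fails the predicate, and only an
//! explicit `None` upsert accomplishes that.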
//!
//! # Example
//!
//! ```rust
//! extern crate timely;
//! extern crate differential_dataflow;
//!
//! fn main() {
//!
//!     // Define a new timely dataflow computation.
//!     timely::execute_from_args(std::env::args().skip(1), move |worker| {
//!
//!         type Key = String;
//!         type Val = String;
//!
//!         let mut input = timely::dataflow::InputHandle::new();
//!         let mut probe = timely::dataflow::ProbeHandle::new();
//!
//!         // Create a dataflow demonstrating upserts.
//!         //
//!         // Upserts are a sequence of records (key, option<val>) where the intended
//!         // value associated with a key is the most recent value, and if that is a
//!         // `None` then the key is removed (until a new value shows up).
//!         //
//!         // The challenge with upserts is that the value to *retract* isn't supplied
//!         // as part of the input stream. We have to determine what it should be!
//!
//!         worker.dataflow(|scope| {
//!
//!             use timely::dataflow::operators::Input;
//!             use differential_dataflow::trace::implementations::ord::OrdValSpine;
//!             use differential_dataflow::operators::arrange::upsert;
//!
//!             let stream = scope.input_from(&mut input);
//!             let arranged = upsert::arrange_from_upsert::<_, OrdValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//!             arranged
//!                 .as_collection(|k,v| (k.clone(), v.clone()))
//!                 .inspect(|x| println!("Observed: {:?}", x))
//!                 .probe_with(&mut probe);
//!         });
//!
//!         // Introduce the key, with a specific value.
//!         input.send(("frank".to_string(), Some("mcsherry".to_string()), 3));
//!         input.advance_to(4);
//!         while probe.less_than(input.time()) { worker.step(); }
//!
//!         // Change the value to a different value.
//!         input.send(("frank".to_string(), Some("zappa".to_string()), 4));
//!         input.advance_to(5);
//!         while probe.less_than(input.time()) { worker.step(); }
//!
//!         // Remove the key and its value.
//!         input.send(("frank".to_string(), None, 5));
//!         input.advance_to(9);
//!         while probe.less_than(input.time()) { worker.step(); }
//!
//!         // Introduce a new, totally different value.
//!         input.send(("frank".to_string(), Some("oz".to_string()), 9));
//!         input.advance_to(10);
//!         while probe.less_than(input.time()) { worker.step(); }
//!
//!         // Repeat the value, which should produce no output.
//!         input.send(("frank".to_string(), Some("oz".to_string()), 11));
//!         input.advance_to(12);
//!         while probe.less_than(input.time()) { worker.step(); }
//!
//!         // Remove the key and value.
//!         input.send(("frank".to_string(), None, 15));
//!         input.close();
//!
//!     }).unwrap();
//! }
//! ```
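//!
//! As a sanity check (reasoning from the semantics above, not captured output): the
//! upsert at time `4` should both retract the old value and install the new one, so the
//! inspected stream should include `(("frank", "mcsherry"), 4, -1)` and
//! `(("frank", "zappa"), 4, 1)`, while the repeated value at time `11` should produce
//! no output at all.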

use std::collections::{BinaryHeap, HashMap};

use timely::order::{PartialOrder, TotalOrder};
use timely::dataflow::{Scope, Stream};
use timely::dataflow::operators::generic::Operator;
use timely::dataflow::channels::pact::Exchange;
use timely::progress::Timestamp;
use timely::progress::Antichain;
use timely::dataflow::operators::Capability;

use timely_sort::Unsigned;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
use trace::Builder;

use operators::arrange::arrangement::Arranged;

use super::TraceAgent;

/// Arrange data from a stream of keyed upserts.
///
/// The input should be a stream of timestamped pairs of `Key` and `Option<Val>`.
/// The contents of the collection are defined key-by-key, where each optional
/// value in sequence either replaces or removes the existing value, should it
/// exist.
///
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Tr>(
    stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
    name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
    G: Scope,
    G::Timestamp: Lattice+Ord+TotalOrder+ExchangeData,
    Tr::Key: ExchangeData+Hashable+std::hash::Hash,
    Tr::Val: ExchangeData,
    Tr: Trace+TraceReader<Time=G::Timestamp, R=isize>+'static,
    Tr::Batch: Batch<Tr::Key, Tr::Val, G::Timestamp, isize>,
    Tr::Cursor: Cursor<Tr::Key, Tr::Val, G::Timestamp, isize>,
{
    let mut reader: Option<TraceAgent<Tr>> = None;

    // Fabricate a data-parallel operator using the `unary_frontier` pattern.
    let stream = {

        let reader = &mut reader;

        let exchange = Exchange::new(move |update: &(Tr::Key,Option<Tr::Val>,G::Timestamp)| (update.0).hashed().as_u64());

        stream.unary_frontier(exchange, name, move |_capability, info| {

            // Acquire a logger for arrange events.
            let logger = {
                let scope = stream.scope();
                let register = scope.log_register();
                register.get::<::logging::DifferentialEvent>("differential/arrange")
            };

            // Establish compaction effort to apply even without updates.
            let (activator, effort) =
            if let Ok(text) = ::std::env::var("DIFFERENTIAL_EAGER_MERGE") {
                let effort = text.parse::<isize>().expect("DIFFERENTIAL_EAGER_MERGE must be set to an integer");
                (Some(stream.scope().activator_for(&info.address[..])), Some(effort))
            }
            else {
                (None, None)
            };

            // Tracks the lower envelope of times in `priority_queue`.
            let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
            let mut buffer = Vec::new();
            // Form the trace we will both use internally and publish.
            let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
            let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
            // Capture the reader outside the builder scope.
            *reader = Some(reader_local.clone());

            // Tracks the input frontier, used to populate the lower bound of new batches.
            let mut input_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

            // For stashing input upserts, ordered increasing by time (`BinaryHeap` is a max-heap).
            let mut priority_queue = BinaryHeap::<std::cmp::Reverse<(G::Timestamp, Tr::Key, Option<Tr::Val>)>>::new();

            move |input, output| {

                // Stash capabilities and associated data (ordered by time).
                input.for_each(|cap, data| {
                    capabilities.insert(cap.retain());
                    data.swap(&mut buffer);
                    for (key, val, time) in buffer.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });

                // Test to see if strict progress has occurred, which happens whenever any element
                // of the old frontier is not greater than or equal to the new frontier. It is only
                // in this case that we have any data processing to do.
                let progress = input_frontier.elements().iter().any(|t2| !input.frontier().less_equal(t2));
                if progress {

                    // If there is at least one capability not in advance of the input frontier ...
                    if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

                        let mut upper = Antichain::new();   // re-used allocation for sealing batches.

                        // For each capability not in advance of the input frontier ...
                        for (index, capability) in capabilities.elements().iter().enumerate() {

                            if !input.frontier().less_equal(capability.time()) {

                                // Assemble the upper bound on times we can commit with this capability.
                                // We must respect the input frontier, and *subsequent* capabilities, as
                                // we are pretending to retire the capability changes one by one.
                                upper.clear();
                                for time in input.frontier().frontier().iter() {
                                    upper.insert(time.clone());
                                }
                                for other_capability in &capabilities.elements()[(index + 1) ..] {
                                    upper.insert(other_capability.time().clone());
                                }

                                // Extract upserts available to process as of this `upper`.
                                let mut to_process = HashMap::new();
                                while priority_queue.peek().map(|std::cmp::Reverse((t,_k,_v))| !upper.less_equal(t)).unwrap_or(false) {
                                    let std::cmp::Reverse((time, key, val)) = priority_queue.pop().expect("Priority queue just ensured non-empty");
                                    to_process.entry(key).or_insert(Vec::new()).push((time, std::cmp::Reverse(val)));
                                }

                                // Put (key, list) into key order, to match cursor enumeration.
                                let mut to_process = to_process.into_iter().collect::<Vec<_>>();
                                to_process.sort();

                                // Prepare a cursor to the existing arrangement, and a batch builder for
                                // new stuff that we add.
                                let (mut trace_cursor, trace_storage) = reader_local.cursor();
                                let mut builder = <Tr::Batch as Batch<Tr::Key,Tr::Val,G::Timestamp,Tr::R>>::Builder::new();
                                for (key, mut list) in to_process.drain(..) {

                                    // The prior value associated with the key.
                                    let mut prev_value: Option<Tr::Val> = None;

                                    // Attempt to find the key in the trace.
                                    trace_cursor.seek_key(&trace_storage, &key);
                                    if trace_cursor.get_key(&trace_storage) == Some(&key) {
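                                        // Each value's diffs accumulate to 1 if it is currently installed
                                        // and to 0 if it was installed and later retracted. Because every
                                        // upsert retracts its predecessor, at most one value can
                                        // accumulate to 1 here.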
                                        // Determine the prior value associated with the key.
                                        while let Some(val) = trace_cursor.get_val(&trace_storage) {
                                            let mut count = 0;
                                            trace_cursor.map_times(&trace_storage, |_time, diff| count += *diff);
                                            assert!(count == 0 || count == 1);
                                            if count == 1 {
                                                assert!(prev_value.is_none());
                                                prev_value = Some(val.clone());
                                            }
                                            trace_cursor.step_val(&trace_storage);
                                        }
                                        trace_cursor.step_key(&trace_storage);
                                    }

                                    // Sort the list of upserts to `key` by their time, suppress multiple updates.
                                    list.sort();
                                    list.dedup_by(|(t1,_), (t2,_)| t1 == t2);
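                                    // Note: entries with equal times sort with the greatest value first
                                    // (values are wrapped in `Reverse`), and `dedup_by` keeps the first
                                    // of each run of equal times, so ties resolve to the greatest value,
                                    // as documented above.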
                                    // Process distinct times.
                                    for (time, std::cmp::Reverse(next)) in list {
                                        if prev_value != next {
                                            if let Some(prev) = prev_value {
                                                builder.push((key.clone(), prev, time.clone(), -1));
                                            }
                                            if let Some(next) = next.as_ref() {
                                                builder.push((key.clone(), next.clone(), time.clone(), 1));
                                            }
                                            prev_value = next;
                                        }
                                    }
                                }
                                let batch = builder.done(input_frontier.elements(), upper.elements(), &[G::Timestamp::minimum()]);
                                input_frontier.clone_from(&upper);

                                // Communicate `batch` to the arrangement and the stream.
                                writer.insert(batch.clone(), Some(capability.time().clone()));
                                output.session(&capabilities.elements()[index]).give(batch);
                            }
                        }

                        // Having extracted and sent batches between each capability and the input frontier,
                        // we should downgrade all capabilities to match the lower envelope of the times that
                        // remain in `priority_queue`. This may involve discarding capabilities, which is fine
                        // as any new updates arrive in messages with new capabilities.

                        let mut new_capabilities = Antichain::new();
                        if let Some(std::cmp::Reverse((time, _, _))) = priority_queue.peek() {
                            if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.insert(capability.delayed(time));
                            }
                            else {
                                panic!("failed to find capability");
                            }
                        }

                        capabilities = new_capabilities;
                    }
                    else {
                        // Announce progress updates, even without data.
                        writer.seal(&input.frontier().frontier()[..]);
                    }

                    // Update our view of the input frontier.
                    input_frontier.clear();
                    input_frontier.extend(input.frontier().frontier().iter().cloned());

                    // Downgrade capabilities for `reader_local`.
                    reader_local.advance_by(input_frontier.elements());
                    reader_local.distinguish_since(input_frontier.elements());
                }

                if let Some(mut fuel) = effort.clone() {
                    writer.exert(&mut fuel);
                }
            }
        })
    };

    Arranged { stream: stream, trace: reader.unwrap() }
}
