Skip to content

Commit 0415bd6

Browse files
authored
Use individual windows rather than window sets in the combining table. (#34193)
Using window sets does not allow us to correctly compute timestamps for the resulting element when there is more than one window. This is particularly problematic for runners (such as Dataflow) that base watermarks (and window firings) on these timestamps by arranging for elements to be seen in timestamp-sorted order (e.g. by using the timestamp as a secondary key).
1 parent 1db2a5a commit 0415bd6

File tree

3 files changed

+55
-30
lines changed

3 files changed

+55
-30
lines changed

sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py

+21
Original file line numberDiff line numberDiff line change
@@ -1081,6 +1081,27 @@ def test_combine_per_key(self):
10811081
| beam.CombinePerKey(beam.combiners.MeanCombineFn()))
10821082
assert_that(res, equal_to([('a', 1.5), ('b', 3.0)]))
10831083

1084+
def test_windowed_combine_per_key(self):
1085+
with self.create_pipeline() as p:
1086+
input = (
1087+
p | beam.Create([12, 2, 1])
1088+
| beam.Map(lambda t: window.TimestampedValue(('k', t), t)))
1089+
1090+
fixed = input | 'Fixed' >> (
1091+
beam.WindowInto(beam.transforms.window.FixedWindows(10))
1092+
| beam.CombinePerKey(beam.combiners.MeanCombineFn()))
1093+
assert_that(fixed, equal_to([('k', 1.5), ('k', 12)]))
1094+
1095+
sliding = input | 'Sliding' >> (
1096+
beam.WindowInto(beam.transforms.window.SlidingWindows(20, 10))
1097+
| beam.CombinePerKey(beam.combiners.MeanCombineFn()))
1098+
assert_that(sliding, equal_to([('k', 1.5), ('k', 5.0), ('k', 12)]))
1099+
1100+
sessions = input | 'Sessions' >> (
1101+
beam.WindowInto(beam.transforms.window.Sessions(5))
1102+
| beam.CombinePerKey(beam.combiners.MeanCombineFn()))
1103+
assert_that(sessions, equal_to([('k', 1.5), ('k', 12)]))
1104+
10841105
def test_read(self):
10851106
# Can't use NamedTemporaryFile as a context
10861107
# due to https://bugs.python.org/issue14243

sdks/python/apache_beam/runners/worker/operations.pxd

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ cdef class PGBKCVOperation(Operation):
140140
cdef long max_keys
141141
cdef long key_count
142142

143+
cpdef add_key_value(self, wkey, value, timestamp)
143144
cpdef output_key(self, wkey, value, timestamp)
144145

145146

sdks/python/apache_beam/runners/worker/operations.py

+33-30
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
from typing import DefaultDict
3030
from typing import Dict
3131
from typing import FrozenSet
32-
from typing import Hashable
3332
from typing import Iterable
3433
from typing import Iterator
3534
from typing import List
@@ -1280,33 +1279,37 @@ def process(self, wkv):
12801279
# pylint: disable=unidiomatic-typecheck
12811280
# Optimization for the global window case.
12821281
if self.is_default_windowing:
1283-
wkey = key # type: Hashable
1282+
self.add_key_value(key, value, None)
12841283
else:
1285-
wkey = tuple(wkv.windows), key
1286-
entry = self.table.get(wkey, None)
1287-
if entry is None:
1288-
if self.key_count >= self.max_keys:
1289-
target = self.key_count * 9 // 10
1290-
old_wkeys = []
1291-
# TODO(robertwb): Use an LRU cache?
1292-
for old_wkey, old_wvalue in self.table.items():
1293-
old_wkeys.append(old_wkey) # Can't mutate while iterating.
1294-
self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
1295-
self.key_count -= 1
1296-
if self.key_count <= target:
1297-
break
1298-
for old_wkey in reversed(old_wkeys):
1299-
del self.table[old_wkey]
1300-
self.key_count += 1
1301-
# We save the accumulator as a one element list so we can efficiently
1302-
# mutate when new values are added without searching the cache again.
1303-
entry = self.table[wkey] = [self.combine_fn.create_accumulator(), None]
1304-
if not self.is_default_windowing:
1305-
# Conditional as the timestamp attribute is lazily initialized.
1306-
entry[1] = wkv.timestamp
1307-
entry[0] = self.combine_fn_add_input(entry[0], value)
1308-
if not self.is_default_windowing and self.timestamp_combiner:
1309-
entry[1] = self.timestamp_combiner.combine(entry[1], wkv.timestamp)
1284+
for window in wkv.windows:
1285+
self.add_key_value((window, key),
1286+
value,
1287+
wkv.timestamp if self.timestamp_combiner else None)
1288+
1289+
def add_key_value(self, wkey, value, timestamp):
1290+
entry = self.table.get(wkey, None)
1291+
if entry is None:
1292+
if self.key_count >= self.max_keys:
1293+
target = self.key_count * 9 // 10
1294+
old_wkeys = []
1295+
# TODO(robertwb): Use an LRU cache?
1296+
for old_wkey, old_wvalue in self.table.items():
1297+
old_wkeys.append(old_wkey) # Can't mutate while iterating.
1298+
self.output_key(old_wkey, old_wvalue[0], old_wvalue[1])
1299+
self.key_count -= 1
1300+
if self.key_count <= target:
1301+
break
1302+
for old_wkey in reversed(old_wkeys):
1303+
del self.table[old_wkey]
1304+
self.key_count += 1
1305+
# We save the accumulator as a one element list so we can efficiently
1306+
# mutate when new values are added without searching the cache again.
1307+
entry = self.table[wkey] = [
1308+
self.combine_fn.create_accumulator(), timestamp
1309+
]
1310+
entry[0] = self.combine_fn_add_input(entry[0], value)
1311+
if not self.is_default_windowing and self.timestamp_combiner:
1312+
entry[1] = self.timestamp_combiner.combine(entry[1], timestamp)
13101313

13111314
def finish(self):
13121315
# type: () -> None
@@ -1331,10 +1334,10 @@ def output_key(self, wkey, accumulator, timestamp):
13311334
if self.is_default_windowing:
13321335
self.output(_globally_windowed_value.with_value((wkey, value)))
13331336
else:
1334-
windows, key = wkey
1337+
window, key = wkey
13351338
if self.timestamp_combiner is None:
1336-
timestamp = windows[0].max_timestamp()
1337-
self.output(WindowedValue((key, value), timestamp, windows))
1339+
timestamp = window.max_timestamp()
1340+
self.output(WindowedValue((key, value), timestamp, (window, )))
13381341

13391342

13401343
class FlattenOperation(Operation):

0 commit comments

Comments
 (0)