49
49
import org .apache .flink .table .runtime .typeutils .RowDataSerializer ;
50
50
51
51
import java .time .ZoneId ;
52
- import java .util .ArrayList ;
53
- import java .util .List ;
54
- import java .util .concurrent .atomic .AtomicReference ;
55
52
56
53
/**
57
54
* A {@link AsyncStateWindowJoinOperator} implemented by async state api.
@@ -196,7 +193,7 @@ public void onProcessingTime(InternalTimer<RowData, Long> timer) throws Exceptio
196
193
197
194
@ Override
198
195
public void onEventTime (InternalTimer <RowData , Long > timer ) throws Exception {
199
- asyncProcessWithKey ( timer . getKey (), () -> triggerJoin (timer .getNamespace () ));
196
+ triggerJoin (timer .getNamespace ());
200
197
}
201
198
202
199
/**
@@ -209,46 +206,13 @@ public void onEventTime(InternalTimer<RowData, Long> timer) throws Exception {
209
206
private void triggerJoin (long window ) {
210
207
StateFuture <StateIterator <RowData >> leftDataFuture = leftWindowState .asyncGet (window );
211
208
StateFuture <StateIterator <RowData >> rightDataFuture = rightWindowState .asyncGet (window );
212
-
213
- // join left records and right records
214
- AtomicReference <List <RowData >> leftDataRef = new AtomicReference <>();
215
- AtomicReference <List <RowData >> rightDataRef = new AtomicReference <>();
216
- leftDataFuture .thenCombine (
217
- rightDataFuture ,
218
- (leftDataIterator , rightDataIterator ) -> {
219
- StateFuture <Void > leftLoadToMemFuture ;
220
- if (leftDataIterator == null ) {
221
- leftDataRef .set (null );
222
- leftLoadToMemFuture = StateFutureUtils .completedVoidFuture ();
223
- } else {
224
- leftDataRef .set (new ArrayList <>());
225
- leftLoadToMemFuture =
226
- leftDataIterator .onNext (
227
- data -> {
228
- leftDataRef .get ().add (data );
229
- });
230
- }
231
-
232
- StateFuture <Void > rightLoadToMemFuture ;
233
- if (rightDataIterator == null ) {
234
- rightDataRef .set (null );
235
- rightLoadToMemFuture = StateFutureUtils .completedVoidFuture ();
236
- } else {
237
- rightDataRef .set (new ArrayList <>());
238
- rightLoadToMemFuture =
239
- rightDataIterator .onNext (
240
- data -> {
241
- rightDataRef .get ().add (data );
242
- });
243
- }
244
-
245
- return leftLoadToMemFuture .thenCombine (
246
- rightLoadToMemFuture ,
247
- (VOID1 , VOID2 ) -> {
248
- helper .joinAndClear (window , leftDataRef .get (), rightDataRef .get ());
249
- return null ;
250
- });
251
- });
209
+ StateFutureUtils .toIterable (leftDataFuture )
210
+ .thenCombine (
211
+ StateFutureUtils .toIterable (rightDataFuture ),
212
+ (leftDataIterator , rightDataIterator ) -> {
213
+ helper .joinAndClear (window , leftDataIterator , rightDataIterator );
214
+ return null ;
215
+ });
252
216
}
253
217
254
218
private class AsyncStateWindowJoinHelper extends WindowJoinHelper {
0 commit comments