Skip to content

Commit 932f17d

Browse files
authored
[FLINK-37458][datastream] Forbid enableAsyncState() for synchronous operators (#26283)
1 parent 6099697 commit 932f17d

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
2828
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
2929
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
30+
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;
3031

3132
import org.apache.flink.shaded.guava33.com.google.common.collect.Lists;
3233

@@ -195,6 +196,11 @@ public boolean isInternalSorterSupported() {
195196

196197
@Override
197198
public void enableAsyncState() {
198-
// nothing to do.
199+
OneInputStreamOperator<IN, OUT> operator =
200+
(OneInputStreamOperator<IN, OUT>)
201+
((SimpleOperatorFactory<OUT>) operatorFactory).getOperator();
202+
if (!(operator instanceof AsyncStateProcessingOperator)) {
203+
super.enableAsyncState();
204+
}
199205
}
200206
}

flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/JobGraphGeneratorTestBase.java

+29
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
2525
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
2626
import org.apache.flink.api.common.functions.FilterFunction;
27+
import org.apache.flink.api.common.functions.FlatMapFunction;
2728
import org.apache.flink.api.common.functions.MapFunction;
2829
import org.apache.flink.api.common.functions.ReduceFunction;
2930
import org.apache.flink.api.common.io.InputFormat;
@@ -119,6 +120,7 @@
119120
import org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask;
120121
import org.apache.flink.streaming.util.TestAnyModeReadingStreamOperator;
121122
import org.apache.flink.util.AbstractID;
123+
import org.apache.flink.util.Collector;
122124
import org.apache.flink.util.SerializedValue;
123125

124126
import org.apache.flink.shaded.guava33.com.google.common.collect.Iterables;
@@ -155,6 +157,7 @@
155157
import static org.apache.flink.util.Preconditions.checkNotNull;
156158
import static org.assertj.core.api.Assertions.assertThat;
157159
import static org.assertj.core.api.Assertions.assertThatThrownBy;
160+
import static org.assertj.core.api.Assertions.fail;
158161

159162
/**
160163
* Tests for {@link StreamingJobGraphGenerator} and {@link AdaptiveGraphManager}.
@@ -2163,6 +2166,32 @@ void testOutputFormatSupportConcurrentExecutionAttempts() {
21632166
new TestingOutputFormatSupportConcurrentExecutionAttempts<>(), true);
21642167
}
21652168

2169+
@Test
2170+
void testEnableAsyncStateForSyncOperatorThrowException() throws Exception {
2171+
final StreamExecutionEnvironment env =
2172+
StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
2173+
try {
2174+
env.fromData(1, 2, 3, 4, 5)
2175+
.keyBy(k -> k)
2176+
.flatMap(
2177+
new FlatMapFunction<Integer, Integer>() {
2178+
@Override
2179+
public void flatMap(Integer value, Collector<Integer> out)
2180+
throws Exception {
2181+
out.collect(value);
2182+
}
2183+
})
2184+
.enableAsyncState()
2185+
.print();
2186+
fail("Enabling async state for synchronous operators is forbidden.");
2187+
} catch (UnsupportedOperationException e) {
2188+
assertThat(e.getMessage())
2189+
.isEqualTo(
2190+
"The transformation does not support "
2191+
+ "async state, or you are enabling the async state without a keyed context (not behind a keyBy()).");
2192+
}
2193+
}
2194+
21662195
private void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
21672196
OutputFormat<Integer> outputFormat, boolean isSupported) {
21682197
final StreamExecutionEnvironment env =

0 commit comments

Comments
 (0)