19
19
package org .apache .flink .runtime .jobmaster ;
20
20
21
21
import org .apache .flink .api .common .JobStatus ;
22
+ import org .apache .flink .client .program .ClusterClient ;
22
23
import org .apache .flink .client .program .MiniClusterClient ;
24
+ import org .apache .flink .configuration .Configuration ;
25
+ import org .apache .flink .configuration .JobManagerOptions ;
23
26
import org .apache .flink .core .execution .SavepointFormatType ;
24
27
import org .apache .flink .runtime .checkpoint .CheckpointException ;
25
28
import org .apache .flink .runtime .checkpoint .CheckpointMetaData ;
36
39
import org .apache .flink .runtime .jobgraph .tasks .AbstractInvokable ;
37
40
import org .apache .flink .runtime .jobgraph .tasks .CheckpointCoordinatorConfiguration ;
38
41
import org .apache .flink .runtime .jobgraph .tasks .JobCheckpointingSettings ;
39
- import org .apache .flink .test .util .AbstractTestBaseJUnit4 ;
40
- import org .apache .flink .testutils .junit .FailsInGHAContainerWithRootUser ;
42
+ import org .apache .flink .runtime .testutils .MiniClusterResourceConfiguration ;
43
+ import org .apache .flink .test .junit5 .InjectClusterClient ;
44
+ import org .apache .flink .test .junit5 .MiniClusterExtension ;
41
45
import org .apache .flink .util .ExceptionUtils ;
42
46
43
- import org .junit .Assume ;
44
- import org .junit .Rule ;
45
- import org .junit .Test ;
46
- import org .junit .experimental .categories .Category ;
47
- import org .junit .rules .TemporaryFolder ;
47
+ import org .junit .jupiter .api .Assertions ;
48
+ import org .junit .jupiter .api .Assumptions ;
49
+ import org .junit .jupiter .api .Tag ;
50
+ import org .junit .jupiter .api .Test ;
51
+ import org .junit .jupiter .api .extension .RegisterExtension ;
52
+ import org .junit .jupiter .api .io .TempDir ;
48
53
54
+ import java .io .File ;
49
55
import java .io .IOException ;
50
56
import java .nio .file .Files ;
51
57
import java .nio .file .Path ;
61
67
import java .util .stream .Collectors ;
62
68
import java .util .stream .Stream ;
63
69
70
+ import static org .apache .flink .configuration .JobManagerOptions .SCHEDULER ;
64
71
import static org .hamcrest .MatcherAssert .assertThat ;
65
72
import static org .hamcrest .Matchers .equalTo ;
66
73
import static org .hamcrest .Matchers .hasItem ;
67
74
import static org .hamcrest .Matchers .isOneOf ;
68
- import static org .junit .Assert .assertTrue ;
69
75
70
76
/**
71
77
* Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean,
72
78
* Duration)}.
73
79
*
74
80
* @see org.apache.flink.runtime.jobmaster.JobMaster
75
81
*/
76
- public class JobMasterTriggerSavepointITCase extends AbstractTestBaseJUnit4 {
82
+ public class JobMasterTriggerSavepointITCase {
77
83
78
84
private static CountDownLatch invokeLatch ;
79
85
80
86
private static volatile CountDownLatch triggerCheckpointLatch ;
81
87
82
- @ Rule public TemporaryFolder temporaryFolder = new TemporaryFolder ();
88
+ @ TempDir protected File temporaryFolder ;
89
+
90
+ @ RegisterExtension
91
+ public static MiniClusterExtension miniClusterResource =
92
+ new MiniClusterExtension (
93
+ new MiniClusterResourceConfiguration .Builder ()
94
+ .setConfiguration (getConfiguration ())
95
+ .setNumberTaskManagers (1 )
96
+ .setNumberSlotsPerTaskManager (4 )
97
+ .build ());
98
+
99
+ private static Configuration getConfiguration () {
100
+ Configuration configuration = new Configuration ();
101
+ configuration .set (SCHEDULER , JobManagerOptions .SchedulerType .Adaptive );
102
+ return configuration ;
103
+ }
83
104
84
105
private Path savepointDirectory ;
85
- private MiniClusterClient clusterClient ;
86
106
private JobGraph jobGraph ;
87
107
88
- private void setUpWithCheckpointInterval (long checkpointInterval ) throws Exception {
108
+ private void setUpWithCheckpointInterval (
109
+ long checkpointInterval , ClusterClient <?> clusterClient ) throws Exception {
89
110
invokeLatch = new CountDownLatch (1 );
90
111
triggerCheckpointLatch = new CountDownLatch (1 );
91
- savepointDirectory = temporaryFolder .newFolder ().toPath ();
92
-
93
- Assume .assumeTrue (
94
- "ClusterClient is not an instance of MiniClusterClient" ,
95
- MINI_CLUSTER_RESOURCE .getClusterClient () instanceof MiniClusterClient );
112
+ savepointDirectory = temporaryFolder .toPath ();
96
113
97
- clusterClient = (MiniClusterClient ) MINI_CLUSTER_RESOURCE .getClusterClient ();
114
+ Assumptions .assumeTrue (
115
+ clusterClient instanceof MiniClusterClient ,
116
+ "ClusterClient is not an instance of MiniClusterClient" );
98
117
99
118
final JobVertex vertex = new JobVertex ("testVertex" );
100
119
vertex .setInvokableClass (NoOpBlockingInvokable .class );
@@ -121,15 +140,16 @@ private void setUpWithCheckpointInterval(long checkpointInterval) throws Excepti
121
140
.build ();
122
141
123
142
clusterClient .submitJob (jobGraph ).get ();
124
- assertTrue (invokeLatch .await (60 , TimeUnit .SECONDS ));
125
- waitForJob ();
143
+ Assertions . assertTrue (invokeLatch .await (60 , TimeUnit .SECONDS ));
144
+ waitForJob (clusterClient );
126
145
}
127
146
128
147
@ Test
129
- public void testStopJobAfterSavepoint () throws Exception {
130
- setUpWithCheckpointInterval (10L );
148
+ public void testStopJobAfterSavepoint (@ InjectClusterClient ClusterClient <?> clusterClient )
149
+ throws Exception {
150
+ setUpWithCheckpointInterval (10L , clusterClient );
131
151
132
- final String savepointLocation = cancelWithSavepoint ();
152
+ final String savepointLocation = cancelWithSavepoint (clusterClient );
133
153
final JobStatus jobStatus = clusterClient .getJobStatus (jobGraph .getJobID ()).get ();
134
154
135
155
assertThat (jobStatus , isOneOf (JobStatus .CANCELED , JobStatus .CANCELLING ));
@@ -142,11 +162,12 @@ public void testStopJobAfterSavepoint() throws Exception {
142
162
}
143
163
144
164
@ Test
145
- public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing () throws Exception {
165
+ public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing (
166
+ @ InjectClusterClient ClusterClient <?> clusterClient ) throws Exception {
146
167
// set checkpointInterval to Long.MAX_VALUE, which means deactivated checkpointing
147
- setUpWithCheckpointInterval (Long .MAX_VALUE );
168
+ setUpWithCheckpointInterval (Long .MAX_VALUE , clusterClient );
148
169
149
- final String savepointLocation = cancelWithSavepoint ();
170
+ final String savepointLocation = cancelWithSavepoint (clusterClient );
150
171
final JobStatus jobStatus =
151
172
clusterClient .getJobStatus (jobGraph .getJobID ()).get (60 , TimeUnit .SECONDS );
152
173
@@ -160,18 +181,19 @@ public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() thro
160
181
}
161
182
162
183
@ Test
163
- @ Category (FailsInGHAContainerWithRootUser .class )
164
- public void testDoNotCancelJobIfSavepointFails () throws Exception {
165
- setUpWithCheckpointInterval (10L );
184
+ @ Tag ("org.apache.flink.testutils.junit.FailsInGHAContainerWithRootUser" )
185
+ public void testDoNotCancelJobIfSavepointFails (
186
+ @ InjectClusterClient ClusterClient <?> clusterClient ) throws Exception {
187
+ setUpWithCheckpointInterval (10L , clusterClient );
166
188
167
189
try {
168
190
Files .setPosixFilePermissions (savepointDirectory , Collections .emptySet ());
169
191
} catch (IOException e ) {
170
- Assume . assumeNoException ( e );
192
+ Assumptions . assumeTrue ( e == null );
171
193
}
172
194
173
195
try {
174
- cancelWithSavepoint ();
196
+ cancelWithSavepoint (clusterClient );
175
197
} catch (Exception e ) {
176
198
assertThat (
177
199
ExceptionUtils .findThrowable (e , CheckpointException .class ).isPresent (),
@@ -192,8 +214,9 @@ public void testDoNotCancelJobIfSavepointFails() throws Exception {
192
214
* with a meaningful exception message.
193
215
*/
194
216
@ Test
195
- public void testCancelWithSavepointWithoutConfiguredSavepointDirectory () throws Exception {
196
- setUpWithCheckpointInterval (10L );
217
+ public void testCancelWithSavepointWithoutConfiguredSavepointDirectory (
218
+ @ InjectClusterClient ClusterClient <?> clusterClient ) throws Exception {
219
+ setUpWithCheckpointInterval (10L , clusterClient );
197
220
198
221
try {
199
222
clusterClient
@@ -206,7 +229,7 @@ public void testCancelWithSavepointWithoutConfiguredSavepointDirectory() throws
206
229
}
207
230
}
208
231
209
- private void waitForJob () throws Exception {
232
+ private void waitForJob (ClusterClient <?> clusterClient ) throws Exception {
210
233
for (int i = 0 ; i < 60 ; i ++) {
211
234
try {
212
235
final JobStatus jobStatus =
@@ -275,7 +298,7 @@ public Future<Void> notifyCheckpointAbortAsync(
275
298
}
276
299
}
277
300
278
- private String cancelWithSavepoint () throws Exception {
301
+ private String cancelWithSavepoint (ClusterClient <?> clusterClient ) throws Exception {
279
302
return clusterClient
280
303
.cancelWithSavepoint (
281
304
jobGraph .getJobID (),
0 commit comments