Skip to content

Commit 18ce835

Browse files
authoredAug 15, 2022
adds cancellation of the source future support (#3146)
1 parent bc43e68 commit 18ce835

File tree

2 files changed

+32
-7
lines changed

2 files changed

+32
-7
lines changed
 

‎reactor-core/src/main/java/reactor/core/publisher/MonoCompletionStage.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.CancellationException;
2121
import java.util.concurrent.CompletionException;
2222
import java.util.concurrent.CompletionStage;
23+
import java.util.concurrent.Future;
2324

2425
import reactor.core.CoreSubscriber;
2526
import reactor.core.Exceptions;
@@ -47,7 +48,15 @@ final class MonoCompletionStage<T> extends Mono<T>
4748
@Override
4849
public void subscribe(CoreSubscriber<? super T> actual) {
4950
Operators.MonoSubscriber<T, T>
50-
sds = new Operators.MonoSubscriber<>(actual);
51+
sds = new Operators.MonoSubscriber<T, T>(actual) {
52+
@Override
53+
public void cancel() {
54+
super.cancel();
55+
if (future instanceof Future) {
56+
((Future<?>) future).cancel(true);
57+
}
58+
}
59+
};
5160

5261
actual.onSubscribe(sds);
5362

‎reactor-core/src/test/java/reactor/core/publisher/MonoCompletionStageTest.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2015-2021 VMware Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,6 +33,22 @@
3333
public class
3434
MonoCompletionStageTest {
3535

36+
//https://github.com/reactor/reactor-core/issues/3138
37+
@Test
38+
public void propagateCancellationToCompletionFuture() {
39+
CompletableFuture<Integer> future = new CompletableFuture<>();
40+
41+
Mono<Integer> mono = Mono
42+
.fromFuture(future);
43+
44+
StepVerifier.create(mono)
45+
.expectSubscription()
46+
.thenCancel()
47+
.verify();
48+
49+
assertThat(future).isCancelled();
50+
}
51+
3652
@Test
3753
public void cancelThenFutureFails() {
3854
CompletableFuture<Integer> future = new CompletableFuture<>();
@@ -45,13 +61,13 @@ public void cancelThenFutureFails() {
4561
StepVerifier.create(mono)
4662
.expectSubscription()
4763
.then(() -> {
48-
subRef.get().cancel();
49-
future.completeExceptionally(new IllegalStateException("boom"));
50-
future.complete(1);
64+
subRef.get().cancel();
65+
future.completeExceptionally(new IllegalStateException("boom"));
66+
future.complete(1);
5167
})
5268
.thenCancel()//already cancelled but need to get to verification
5369
.verifyThenAssertThat()
54-
.hasDroppedErrorWithMessage("boom");
70+
.hasNotDroppedErrors();
5571
}
5672

5773
@Test

0 commit comments

Comments
 (0)
Please sign in to comment.