Skip to content

Commit a7f69e6

Browse files
dreamer-89 authored and pranikum committed
[Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test (opensearch-project#4414)
* [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test Signed-off-by: Suraj Singh <surajrider@gmail.com> * Add changelog entry Signed-off-by: Suraj Singh <surajrider@gmail.com> * Update changelog entry Signed-off-by: Suraj Singh <surajrider@gmail.com> Signed-off-by: Suraj Singh <surajrider@gmail.com>
1 parent 55338b1 commit a7f69e6

File tree

4 files changed

+15
-9
lines changed

4 files changed

+15
-9
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
4242
- Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352))
4343
- [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
4444
- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))
45+
- [Segment Replication] Update flaky testOnNewCheckpointFromNewPrimaryCancelOngoingReplication unit test ([#4414](https://github.com/opensearch-project/OpenSearch/pull/4414))
4546
- Fixed the `_cat/shards/10_basic.yml` test cases fix.
4647

4748
### Security

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ public void startReplication(ActionListener<Void> listener) {
160160
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
161161
final StepListener<Void> finalizeListener = new StepListener<>();
162162

163+
cancellableThreads.checkForCancel();
163164
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
164165
// Get list of files to copy from this checkpoint.
165-
cancellableThreads.checkForCancel();
166166
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
167167
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
168168

server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.cluster.metadata.IndexMetadata;
1616
import org.opensearch.common.settings.ClusterSettings;
1717
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.common.util.CancellableThreads;
1819
import org.opensearch.index.engine.NRTReplicationEngineFactory;
1920
import org.opensearch.index.shard.IndexShard;
2021
import org.opensearch.index.shard.IndexShardTestCase;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.TimeUnit;
3031

3132
import static org.mockito.ArgumentMatchers.any;
33+
import static org.mockito.Mockito.doReturn;
3234
import static org.mockito.Mockito.mock;
3335
import static org.mockito.Mockito.when;
3436
import static org.mockito.Mockito.doAnswer;
@@ -37,6 +39,7 @@
3739
import static org.mockito.Mockito.times;
3840
import static org.mockito.Mockito.spy;
3941
import static org.mockito.Mockito.eq;
42+
import static org.opensearch.indices.replication.SegmentReplicationState.Stage.CANCELLED;
4043

4144
public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
4245

@@ -215,24 +218,25 @@ public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws I
215218
// Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown
216219
// of latch.
217220
doAnswer(invocation -> {
218-
final ActionListener<Void> listener = invocation.getArgument(0);
221+
// short circuit loop on new checkpoint request
222+
doReturn(null).when(serviceSpy).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
219223
// a new checkpoint arrives before we've completed.
220224
serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
221-
listener.onResponse(null);
222-
latch.countDown();
225+
try {
226+
invocation.callRealMethod();
227+
} catch (CancellableThreads.ExecutionCancelledException e) {
228+
latch.countDown();
229+
}
223230
return null;
224231
}).when(targetSpy).startReplication(any());
225-
doNothing().when(targetSpy).onDone();
226232

227233
// start replication. This adds the target to the ongoing replication collection
228234
serviceSpy.startReplication(targetSpy);
229-
235+
latch.await();
230236
// wait for the new checkpoint to arrive, before the listener completes.
231-
latch.await(5, TimeUnit.SECONDS);
232-
doNothing().when(targetSpy).startReplication(any());
237+
assertEquals(CANCELLED, targetSpy.state().getStage());
233238
verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
234239
verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
235-
closeShards(replicaShard);
236240
}
237241

238242
public void testNewCheckpointBehindCurrentCheckpoint() {

test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java

+1
Original file line numberDiff line numberDiff line change
@@ -1207,6 +1207,7 @@ public void getCheckpointMetadata(
12071207
copyState.getPendingDeleteFiles()
12081208
)
12091209
);
1210+
copyState.decRef();
12101211
} catch (IOException e) {
12111212
logger.error("Unexpected error computing CopyState", e);
12121213
Assert.fail("Failed to compute copyState");

0 commit comments

Comments
 (0)