33
33
import org .opensearch .index .engine .Segment ;
34
34
import org .opensearch .index .shard .IndexShard ;
35
35
import org .opensearch .indices .IndicesService ;
36
+ import org .opensearch .indices .recovery .FileChunkRequest ;
36
37
import org .opensearch .indices .replication .common .ReplicationType ;
38
+ import org .opensearch .plugins .Plugin ;
37
39
import org .opensearch .test .BackgroundIndexer ;
38
40
import org .opensearch .test .InternalTestCluster ;
39
41
import org .opensearch .test .OpenSearchIntegTestCase ;
42
+ import org .opensearch .test .transport .MockTransportService ;
43
+ import org .opensearch .transport .TransportService ;
40
44
41
45
import java .io .IOException ;
46
+ import java .util .Collection ;
42
47
import java .util .Arrays ;
43
48
import java .util .List ;
44
49
import java .util .Map ;
45
- import java .util .Optional ;
46
50
import java .util .Set ;
51
+ import java .util .Optional ;
52
+ import java .util .concurrent .CountDownLatch ;
47
53
import java .util .concurrent .TimeUnit ;
48
54
import java .util .function .Function ;
49
55
import java .util .stream .Collectors ;
@@ -65,6 +71,11 @@ public static void assumeFeatureFlag() {
65
71
assumeTrue ("Segment replication Feature flag is enabled" , Boolean .parseBoolean (System .getProperty (FeatureFlags .REPLICATION_TYPE )));
66
72
}
67
73
74
+ @ Override
75
+ protected Collection <Class <? extends Plugin >> nodePlugins () {
76
+ return Arrays .asList (MockTransportService .TestPlugin .class );
77
+ }
78
+
68
79
@ Override
69
80
public Settings indexSettings () {
70
81
return Settings .builder ()
@@ -318,6 +329,65 @@ public void testReplicationAfterForceMerge() throws Exception {
318
329
}
319
330
}
320
331
332
+ public void testCancellation () throws Exception {
333
+ final String primaryNode = internalCluster ().startNode ();
334
+ createIndex (INDEX_NAME , Settings .builder ().put (indexSettings ()).put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 1 ).build ());
335
+ ensureYellow (INDEX_NAME );
336
+
337
+ final String replicaNode = internalCluster ().startNode ();
338
+
339
+ final SegmentReplicationSourceService segmentReplicationSourceService = internalCluster ().getInstance (
340
+ SegmentReplicationSourceService .class ,
341
+ primaryNode
342
+ );
343
+ final IndexShard primaryShard = getIndexShard (primaryNode );
344
+
345
+ CountDownLatch latch = new CountDownLatch (1 );
346
+
347
+ MockTransportService mockTransportService = ((MockTransportService ) internalCluster ().getInstance (
348
+ TransportService .class ,
349
+ primaryNode
350
+ ));
351
+ mockTransportService .addSendBehavior (
352
+ internalCluster ().getInstance (TransportService .class , replicaNode ),
353
+ (connection , requestId , action , request , options ) -> {
354
+ if (action .equals (SegmentReplicationTargetService .Actions .FILE_CHUNK )) {
355
+ FileChunkRequest req = (FileChunkRequest ) request ;
356
+ logger .debug ("file chunk [{}] lastChunk: {}" , req , req .lastChunk ());
357
+ if (req .name ().endsWith ("cfs" ) && req .lastChunk ()) {
358
+ try {
359
+ latch .await ();
360
+ } catch (InterruptedException e ) {
361
+ throw new RuntimeException (e );
362
+ }
363
+ }
364
+ }
365
+ connection .sendRequest (requestId , action , request , options );
366
+ }
367
+ );
368
+
369
+ final int docCount = scaledRandomIntBetween (0 , 200 );
370
+ try (
371
+ BackgroundIndexer indexer = new BackgroundIndexer (
372
+ INDEX_NAME ,
373
+ "_doc" ,
374
+ client (),
375
+ -1 ,
376
+ RandomizedTest .scaledRandomIntBetween (2 , 5 ),
377
+ false ,
378
+ random ()
379
+ )
380
+ ) {
381
+ indexer .start (docCount );
382
+ waitForDocs (docCount , indexer );
383
+
384
+ flush (INDEX_NAME );
385
+ }
386
+ segmentReplicationSourceService .beforeIndexShardClosed (primaryShard .shardId (), primaryShard , indexSettings ());
387
+ latch .countDown ();
388
+ assertDocCounts (docCount , primaryNode );
389
+ }
390
+
321
391
public void testStartReplicaAfterPrimaryIndexesDocs () throws Exception {
322
392
final String primaryNode = internalCluster ().startNode ();
323
393
createIndex (INDEX_NAME , Settings .builder ().put (indexSettings ()).put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 ).build ());
0 commit comments