
Commit 67f23e2

Merge branch 'main' into cancel_ongoing_request
2 parents 45428ad + b206e98

8 files changed, +113 -16 lines changed

CHANGELOG.md (+2)

@@ -35,9 +35,11 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 - Add timeout on Mockito.verify to reduce flakyness in testReplicationOnDone test([#4314](https://github.com/opensearch-project/OpenSearch/pull/4314))
 - Commit workflow for dependabot changelog helper ([#4331](https://github.com/opensearch-project/OpenSearch/pull/4331))
 - Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
+- [Segment Replication] Bump segment infos counter before commit during replica promotion ([#4365](https://github.com/opensearch-project/OpenSearch/pull/4365))
 - Bugs for dependabot changelog verifier workflow ([#4364](https://github.com/opensearch-project/OpenSearch/pull/4364))
 - Fix flaky random test `NRTReplicationEngineTests.testUpdateSegments` ([#4352](https://github.com/opensearch-project/OpenSearch/pull/4352))
 - [Segment Replication] Extend FileChunkWriter to allow cancel on transport client ([#4386](https://github.com/opensearch-project/OpenSearch/pull/4386))
+- [Segment Replication] Add check to cancel ongoing replication with old primary on onNewCheckpoint on replica ([#4363](https://github.com/opensearch-project/OpenSearch/pull/4363))

 ### Security
 - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341))

server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java (+9)

@@ -54,6 +54,8 @@ public class NRTReplicationEngine extends Engine {
     private final LocalCheckpointTracker localCheckpointTracker;
     private final WriteOnlyTranslogManager translogManager;

+    private static final int SI_COUNTER_INCREMENT = 10;
+
     public NRTReplicationEngine(EngineConfig engineConfig) {
         super(engineConfig);
         store.incRef();
@@ -142,6 +144,13 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
     public void commitSegmentInfos() throws IOException {
         // TODO: This method should wait for replication events to finalize.
         final SegmentInfos latestSegmentInfos = getLatestSegmentInfos();
+        /*
+        This is a workaround solution which decreases the chances of conflict on replica nodes when same file is copied
+        from two different primaries during failover. Increasing counter helps in avoiding this conflict as counter is
+        used to generate new segment file names. The ideal solution is to identify the counter from previous primary.
+        */
+        latestSegmentInfos.counter = latestSegmentInfos.counter + SI_COUNTER_INCREMENT;
+        latestSegmentInfos.changed();
         store.commitSegmentInfos(latestSegmentInfos, localCheckpointTracker.getMaxSeqNo(), localCheckpointTracker.getProcessedCheckpoint());
         translogManager.syncTranslog();
     }
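
The in-code comment above explains the intent of the counter bump; the toy model below makes it concrete. This is only an illustrative sketch: SimpleSegmentNamer and its base-36 naming are assumptions standing in for Lucene's internal segment naming (the commit itself only bumps SegmentInfos.counter), but it shows why jumping the counter ahead on promotion keeps a promoted replica's new segment names out of the range the old primary may still be writing.

// Hypothetical model, not OpenSearch or Lucene code: segment names are derived from a
// monotonically increasing counter, so two writers sharing a counter value hand out the same names.
public class CounterBumpSketch {
    private static final int SI_COUNTER_INCREMENT = 10; // same value as the constant in the diff

    static final class SimpleSegmentNamer {
        private long counter;

        SimpleSegmentNamer(long start) {
            this.counter = start;
        }

        String nextSegmentName() {
            return "_" + Long.toString(counter++, 36);
        }
    }

    public static void main(String[] args) {
        // An old primary and a just-promoted replica both resume from counter 42.
        SimpleSegmentNamer oldPrimary = new SimpleSegmentNamer(42);
        SimpleSegmentNamer promotedNoBump = new SimpleSegmentNamer(42);
        // Without a bump the next names collide, so a replica that copies files from both
        // primaries during failover can receive conflicting versions of the "same" file.
        System.out.println(oldPrimary.nextSegmentName() + " == " + promotedNoBump.nextSegmentName());

        // With the workaround the promoted side jumps ahead before its first commit, so its
        // names no longer overlap with anything the old primary could still produce.
        SimpleSegmentNamer promotedWithBump = new SimpleSegmentNamer(42 + SI_COUNTER_INCREMENT);
        System.out.println(promotedWithBump.nextSegmentName());
    }
}

As the original comment notes, a fixed bump of 10 only lowers the odds of a clash; the ideal fix would recover the previous primary's actual counter.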

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java (+4)

@@ -56,6 +56,10 @@ public class SegmentReplicationTarget extends ReplicationTarget {
     private final SegmentReplicationState state;
     protected final MultiFileWriter multiFileWriter;

+    public ReplicationCheckpoint getCheckpoint() {
+        return this.checkpoint;
+    }
+
     public SegmentReplicationTarget(
         ReplicationCheckpoint checkpoint,
         IndexShard indexShard,

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java (+19 -10)

@@ -18,6 +18,7 @@
 import org.opensearch.common.Nullable;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.util.CancellableThreads;
+import org.opensearch.common.util.concurrent.ConcurrentCollections;
 import org.opensearch.index.shard.IndexEventListener;
 import org.opensearch.index.shard.IndexShard;
 import org.opensearch.index.shard.ShardId;
@@ -34,7 +35,6 @@
 import org.opensearch.transport.TransportRequestHandler;
 import org.opensearch.transport.TransportService;

-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;

@@ -54,7 +54,7 @@ public class SegmentReplicationTargetService implements IndexEventListener {

     private final SegmentReplicationSourceFactory sourceFactory;

-    private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = new HashMap<>();
+    private final Map<ShardId, ReplicationCheckpoint> latestReceivedCheckpoint = ConcurrentCollections.newConcurrentMap();

     // Empty Implementation, only required while Segment Replication is under feature flag.
     public static final SegmentReplicationTargetService NO_OP = new SegmentReplicationTargetService() {
@@ -151,14 +151,23 @@ public synchronized void onNewCheckpoint(final ReplicationCheckpoint receivedChe
         } else {
             latestReceivedCheckpoint.put(replicaShard.shardId(), receivedCheckpoint);
         }
-        if (onGoingReplications.isShardReplicating(replicaShard.shardId())) {
-            logger.trace(
-                () -> new ParameterizedMessage(
-                    "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
-                    replicaShard.getLatestReplicationCheckpoint()
-                )
-            );
-            return;
+        SegmentReplicationTarget ongoingReplicationTarget = onGoingReplications.getOngoingReplicationTarget(replicaShard.shardId());
+        if (ongoingReplicationTarget != null) {
+            if (ongoingReplicationTarget.getCheckpoint().getPrimaryTerm() < receivedCheckpoint.getPrimaryTerm()) {
+                logger.trace(
+                    "Cancelling ongoing replication from old primary with primary term {}",
+                    ongoingReplicationTarget.getCheckpoint().getPrimaryTerm()
+                );
+                onGoingReplications.cancel(ongoingReplicationTarget.getId(), "Cancelling stuck target after new primary");
+            } else {
+                logger.trace(
+                    () -> new ParameterizedMessage(
+                        "Ignoring new replication checkpoint - shard is currently replicating to checkpoint {}",
+                        replicaShard.getLatestReplicationCheckpoint()
+                    )
+                );
+                return;
+            }
         }
         final Thread thread = Thread.currentThread();
         if (replicaShard.shouldProcessCheckpoint(receivedCheckpoint)) {
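
The new branch in onNewCheckpoint reduces to one comparison: if a replication is already in flight and its checkpoint carries a lower primary term than the checkpoint that just arrived, the in-flight copy was started against a demoted primary and is cancelled; otherwise the new checkpoint is ignored as before. The sketch below restates that decision with simplified stand-ins (Checkpoint, Target) rather than the real ReplicationCheckpoint and SegmentReplicationTarget types, so treat it as illustrative only.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-ins for the OpenSearch types; only the primary-term comparison is modeled.
public class NewPrimaryCancellationSketch {
    static final class Checkpoint {
        final long primaryTerm;

        Checkpoint(long primaryTerm) {
            this.primaryTerm = primaryTerm;
        }
    }

    static final class Target {
        final long id;
        final Checkpoint checkpoint;

        Target(long id, Checkpoint checkpoint) {
            this.id = id;
            this.checkpoint = checkpoint;
        }
    }

    // At most one ongoing target per shard, mirroring the invariant asserted in ReplicationCollection.
    private final Map<String, Target> ongoing = new ConcurrentHashMap<>();

    /** Returns true when the shard should go on to process the received checkpoint. */
    boolean onNewCheckpoint(String shardId, Checkpoint received) {
        Target current = ongoing.get(shardId);
        if (current != null) {
            if (current.checkpoint.primaryTerm < received.primaryTerm) {
                // The in-flight copy was started against an old primary: cancel it so the shard
                // can sync from the new primary instead of waiting on a stuck target.
                ongoing.remove(shardId);
                System.out.println("cancelled target " + current.id + " (stale primary term " + current.checkpoint.primaryTerm + ")");
            } else {
                // Same primary term: keep the in-flight replication and ignore this checkpoint.
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        NewPrimaryCancellationSketch service = new NewPrimaryCancellationSketch();
        service.ongoing.put("[index][0]", new Target(1L, new Checkpoint(1)));
        System.out.println(service.onNewCheckpoint("[index][0]", new Checkpoint(1))); // false: ignored, same primary
        service.ongoing.put("[index][0]", new Target(2L, new Checkpoint(1)));
        System.out.println(service.onNewCheckpoint("[index][0]", new Checkpoint(2))); // true: stale target cancelled
    }
}

Note also the companion change in the same file: latestReceivedCheckpoint switches from HashMap to ConcurrentCollections.newConcurrentMap(), presumably to make the checkpoint bookkeeping safe for concurrent access.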

server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java (+11 -5)

@@ -49,6 +49,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;

 /**
  * This class holds a collection of all on going replication events on the current node (i.e., the node is the target node
@@ -236,13 +237,18 @@ public boolean cancelForShard(ShardId shardId, String reason) {
     }

     /**
-     * check if a shard is currently replicating
+     * Get target for shard
      *
-     * @param shardId shardId for which to check if replicating
-     * @return true if shard is currently replicating
+     * @param shardId shardId
+     * @return ReplicationTarget for input shardId
      */
-    public boolean isShardReplicating(ShardId shardId) {
-        return onGoingTargetEvents.values().stream().anyMatch(t -> t.indexShard.shardId().equals(shardId));
+    public T getOngoingReplicationTarget(ShardId shardId) {
+        final List<T> replicationTargetList = onGoingTargetEvents.values()
+            .stream()
+            .filter(t -> t.indexShard.shardId().equals(shardId))
+            .collect(Collectors.toList());
+        assert replicationTargetList.size() <= 1 : "More than one on-going replication targets";
+        return replicationTargetList.size() > 0 ? replicationTargetList.get(0) : null;
     }

     /**
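
getOngoingReplicationTarget replaces the boolean isShardReplicating check with a lookup that returns the single matching target (or null), so the caller can inspect the target's checkpoint and cancel it by id; the assertion encodes the expectation that a shard never has more than one replication event in flight. The generic helper below shows the same "unique match or null" stream pattern in isolation; its names are hypothetical and not part of the OpenSearch API.

import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Generic illustration of the lookup style used by getOngoingReplicationTarget:
// collect every match, assert there is at most one, then return it or null.
public final class UniqueMatchLookup {

    public static <T> T uniqueMatchOrNull(Collection<T> values, Predicate<T> predicate, String invariant) {
        final List<T> matches = values.stream().filter(predicate).collect(Collectors.toList());
        assert matches.size() <= 1 : invariant;
        return matches.isEmpty() ? null : matches.get(0);
    }

    public static void main(String[] args) {
        List<String> events = List.of("shard-0:recovery", "shard-1:replication");
        // Exactly one event for shard-1 is returned; no event for shard-2 yields null.
        System.out.println(uniqueMatchOrNull(events, e -> e.startsWith("shard-1"), "More than one on-going replication targets"));
        System.out.println(uniqueMatchOrNull(events, e -> e.startsWith("shard-2"), "More than one on-going replication targets"));
    }
}

As in the production code, the size check only fires when assertions are enabled (java -ea), which is why the new ReplicationCollectionTests case further down expects an AssertionError rather than a runtime exception.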

server/src/test/java/org/opensearch/index/engine/NRTReplicationEngineTests.java (+2)

@@ -252,6 +252,8 @@ public void testCommitSegmentInfos() throws Exception {
         // ensure getLatestSegmentInfos returns an updated infos ref with correct userdata.
         final SegmentInfos latestSegmentInfos = nrtEngine.getLatestSegmentInfos();
         assertEquals(previousInfos.getGeneration(), latestSegmentInfos.getLastGeneration());
+        assertEquals(previousInfos.getVersion(), latestSegmentInfos.getVersion());
+        assertEquals(previousInfos.counter, latestSegmentInfos.counter);
         Map<String, String> userData = latestSegmentInfos.getUserData();
         assertEquals(processedCheckpoint, localCheckpointTracker.getProcessedCheckpoint());
         assertEquals(maxSeqNo, Long.parseLong(userData.get(MAX_SEQ_NO)));

server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java (+48 -1)

@@ -49,6 +49,8 @@ public class SegmentReplicationTargetServiceTests extends IndexShardTestCase {
     private ReplicationCheckpoint initialCheckpoint;
     private ReplicationCheckpoint aheadCheckpoint;

+    private ReplicationCheckpoint newPrimaryCheckpoint;
+
     @Override
     public void setUp() throws Exception {
         super.setUp();
@@ -74,6 +76,13 @@ public void setUp() throws Exception {
             initialCheckpoint.getSeqNo(),
             initialCheckpoint.getSegmentInfosVersion() + 1
         );
+        newPrimaryCheckpoint = new ReplicationCheckpoint(
+            initialCheckpoint.getShardId(),
+            initialCheckpoint.getPrimaryTerm() + 1,
+            initialCheckpoint.getSegmentsGen(),
+            initialCheckpoint.getSeqNo(),
+            initialCheckpoint.getSegmentInfosVersion() + 1
+        );
     }

     @Override
@@ -160,7 +169,7 @@ public void testShardAlreadyReplicating() throws InterruptedException {
         // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
         SegmentReplicationTargetService serviceSpy = spy(sut);
         final SegmentReplicationTarget target = new SegmentReplicationTarget(
-            checkpoint,
+            initialCheckpoint,
             replicaShard,
             replicationSource,
             mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
@@ -185,9 +194,47 @@ public void testShardAlreadyReplicating() throws InterruptedException {

         // wait for the new checkpoint to arrive, before the listener completes.
         latch.await(30, TimeUnit.SECONDS);
+        verify(targetSpy, times(0)).cancel(any());
         verify(serviceSpy, times(0)).startReplication(eq(aheadCheckpoint), eq(replicaShard), any());
     }

+    public void testOnNewCheckpointFromNewPrimaryCancelOngoingReplication() throws IOException, InterruptedException {
+        // Create a spy of Target Service so that we can verify invocation of startReplication call with specific checkpoint on it.
+        SegmentReplicationTargetService serviceSpy = spy(sut);
+        // Create a Mockito spy of target to stub response of few method calls.
+        final SegmentReplicationTarget targetSpy = spy(
+            new SegmentReplicationTarget(
+                initialCheckpoint,
+                replicaShard,
+                replicationSource,
+                mock(SegmentReplicationTargetService.SegmentReplicationListener.class)
+            )
+        );
+
+        CountDownLatch latch = new CountDownLatch(1);
+        // Mocking response when startReplication is called on targetSpy we send a new checkpoint to serviceSpy and later reduce countdown
+        // of latch.
+        doAnswer(invocation -> {
+            final ActionListener<Void> listener = invocation.getArgument(0);
+            // a new checkpoint arrives before we've completed.
+            serviceSpy.onNewCheckpoint(newPrimaryCheckpoint, replicaShard);
+            listener.onResponse(null);
+            latch.countDown();
+            return null;
+        }).when(targetSpy).startReplication(any());
+        doNothing().when(targetSpy).onDone();
+
+        // start replication. This adds the target to on-ongoing replication collection
+        serviceSpy.startReplication(targetSpy);
+
+        // wait for the new checkpoint to arrive, before the listener completes.
+        latch.await(5, TimeUnit.SECONDS);
+        doNothing().when(targetSpy).startReplication(any());
+        verify(targetSpy, times(1)).cancel("Cancelling stuck target after new primary");
+        verify(serviceSpy, times(1)).startReplication(eq(newPrimaryCheckpoint), eq(replicaShard), any());
+        closeShards(replicaShard);
+    }
+
     public void testNewCheckpointBehindCurrentCheckpoint() {
         SegmentReplicationTargetService spy = spy(sut);
         spy.onNewCheckpoint(checkpoint, replicaShard);
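
testOnNewCheckpointFromNewPrimaryCancelOngoingReplication leans on a common Mockito idiom: stub startReplication on a spy with doAnswer so the test can inject a competing checkpoint mid-flight, release a CountDownLatch, and then verify the cancellation. The stripped-down sketch below shows just that idiom against a made-up Replicator interface (an assumption for illustration, not the OpenSearch type) and assumes mockito-core is on the classpath.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DoAnswerLatchSketch {

    /** Hypothetical collaborator standing in for the real replication target. */
    interface Replicator {
        void startReplication(Consumer<Void> listener);

        void cancel(String reason);
    }

    public static void main(String[] args) throws InterruptedException {
        Replicator replicator = mock(Replicator.class);
        CountDownLatch latch = new CountDownLatch(1);

        // When startReplication runs, simulate "a newer checkpoint arrives before we finish":
        // cancel the in-flight work, complete the listener, and release the latch.
        doAnswer(invocation -> {
            Consumer<Void> listener = invocation.getArgument(0);
            replicator.cancel("Cancelling stuck target after new primary");
            listener.accept(null);
            latch.countDown();
            return null;
        }).when(replicator).startReplication(any());

        replicator.startReplication(v -> System.out.println("replication listener completed"));

        // Wait for the stubbed answer to run, then assert the cancellation happened exactly once.
        latch.await(5, TimeUnit.SECONDS);
        verify(replicator, times(1)).cancel("Cancelling stuck target after new primary");
    }
}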

server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java (+18)

@@ -105,7 +105,25 @@ public void onFailure(ReplicationState state, OpenSearchException e, boolean sen
                 collection.cancel(recoveryId, "meh");
             }
         }
+    }

+    public void testMultiReplicationsForSingleShard() throws Exception {
+        try (ReplicationGroup shards = createGroup(0)) {
+            final ReplicationCollection<RecoveryTarget> collection = new ReplicationCollection<>(logger, threadPool);
+            final IndexShard shard1 = shards.addReplica();
+            final IndexShard shard2 = shards.addReplica();
+            final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard1);
+            final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shard2);
+            try {
+                collection.getOngoingReplicationTarget(shard1.shardId());
+            } catch (AssertionError e) {
+                assertEquals(e.getMessage(), "More than one on-going replication targets");
+            } finally {
+                collection.cancel(recoveryId, "meh");
+                collection.cancel(recoveryId2, "meh");
+            }
+            closeShards(shard1, shard2);
+        }
     }

     public void testRecoveryCancellation() throws Exception {
