Skip to content

Commit c649eb0

Browse files
ashking94Vishalks
authored andcommitted
[Remote Store] Change behaviour in replica recovery for remote translog enabled indices (opensearch-project#4318)
Signed-off-by: Ashish Singh <ssashish@amazon.com> Signed-off-by: Vishal Sarda <vsarda@amazon.com>
1 parent 21038b5 commit c649eb0

File tree

7 files changed

+369
-95
lines changed

7 files changed

+369
-95
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
4040
- Add index specific setting for remote repository ([#4253](https://github.com/opensearch-project/OpenSearch/pull/4253))
4141
- [Segment Replication] Update replicas to commit SegmentInfos instead of relying on SIS files from primary shards. ([#4402](https://github.com/opensearch-project/OpenSearch/pull/4402))
4242
- [CCR] Add getHistoryOperationsFromTranslog method to fetch the history snapshot from translogs ([#3948](https://github.com/opensearch-project/OpenSearch/pull/3948))
43+
- [Remote Store] Change behaviour in replica recovery for remote translog enabled indices ([#4318](https://github.com/opensearch-project/OpenSearch/pull/4318))
4344

4445
### Deprecated
4546

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+57-9
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,8 @@
163163
import org.opensearch.indices.recovery.RecoveryListener;
164164
import org.opensearch.indices.recovery.RecoveryState;
165165
import org.opensearch.indices.recovery.RecoveryTarget;
166-
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
167166
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
167+
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
168168
import org.opensearch.repositories.RepositoriesService;
169169
import org.opensearch.repositories.Repository;
170170
import org.opensearch.rest.RestStatus;
@@ -203,6 +203,7 @@
203203
import java.util.stream.StreamSupport;
204204

205205
import static org.opensearch.index.seqno.RetentionLeaseActions.RETAIN_ALL;
206+
import static org.opensearch.index.seqno.SequenceNumbers.MAX_SEQ_NO;
206207
import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
207208

208209
/**
@@ -1703,13 +1704,8 @@ public void prepareForIndexRecovery() {
17031704
* @return a sequence number that an operation-based peer recovery can start with.
17041705
* This is the first operation after the local checkpoint of the safe commit if exists.
17051706
*/
1706-
public long recoverLocallyUpToGlobalCheckpoint() {
1707-
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
1708-
if (state != IndexShardState.RECOVERING) {
1709-
throw new IndexShardNotRecoveringException(shardId, state);
1710-
}
1711-
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
1712-
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
1707+
private long recoverLocallyUpToGlobalCheckpoint() {
1708+
validateLocalRecoveryState();
17131709
final Optional<SequenceNumbers.CommitInfo> safeCommit;
17141710
final long globalCheckpoint;
17151711
try {
@@ -1792,6 +1788,54 @@ public long recoverLocallyUpToGlobalCheckpoint() {
17921788
}
17931789
}
17941790

1791+
public long recoverLocallyAndFetchStartSeqNo(boolean localTranslog) {
1792+
if (localTranslog) {
1793+
return recoverLocallyUpToGlobalCheckpoint();
1794+
} else {
1795+
return recoverLocallyUptoLastCommit();
1796+
}
1797+
}
1798+
1799+
/**
1800+
* The method figures out the sequence number basis the last commit.
1801+
*
1802+
* @return the starting sequence number from which the recovery should start.
1803+
*/
1804+
private long recoverLocallyUptoLastCommit() {
1805+
assert isRemoteTranslogEnabled() : "Remote translog store is not enabled";
1806+
long seqNo;
1807+
validateLocalRecoveryState();
1808+
1809+
try {
1810+
seqNo = Long.parseLong(store.readLastCommittedSegmentsInfo().getUserData().get(MAX_SEQ_NO));
1811+
} catch (org.apache.lucene.index.IndexNotFoundException e) {
1812+
logger.error("skip local recovery as no index commit found", e);
1813+
return UNASSIGNED_SEQ_NO;
1814+
} catch (Exception e) {
1815+
logger.error("skip local recovery as failed to find the safe commit", e);
1816+
return UNASSIGNED_SEQ_NO;
1817+
}
1818+
1819+
try {
1820+
maybeCheckIndex();
1821+
recoveryState.setStage(RecoveryState.Stage.TRANSLOG);
1822+
recoveryState.getTranslog().totalLocal(0);
1823+
} catch (Exception e) {
1824+
logger.error("check index failed during fetch seqNo", e);
1825+
return UNASSIGNED_SEQ_NO;
1826+
}
1827+
return seqNo;
1828+
}
1829+
1830+
private void validateLocalRecoveryState() {
1831+
assert Thread.holdsLock(mutex) == false : "recover locally under mutex";
1832+
if (state != IndexShardState.RECOVERING) {
1833+
throw new IndexShardNotRecoveringException(shardId, state);
1834+
}
1835+
recoveryState.validateCurrentStage(RecoveryState.Stage.INDEX);
1836+
assert routingEntry().recoverySource().getType() == RecoverySource.Type.PEER : "not a peer recovery [" + routingEntry() + "]";
1837+
}
1838+
17951839
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
17961840
getEngine().translogManager().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
17971841
}
@@ -1998,7 +2042,7 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
19982042
private boolean assertSequenceNumbersInCommit() throws IOException {
19992043
final Map<String, String> userData = SegmentInfos.readLatestCommit(store.directory()).getUserData();
20002044
assert userData.containsKey(SequenceNumbers.LOCAL_CHECKPOINT_KEY) : "commit point doesn't contains a local checkpoint";
2001-
assert userData.containsKey(SequenceNumbers.MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
2045+
assert userData.containsKey(MAX_SEQ_NO) : "commit point doesn't contains a maximum sequence number";
20022046
assert userData.containsKey(Engine.HISTORY_UUID_KEY) : "commit point doesn't contains a history uuid";
20032047
assert userData.get(Engine.HISTORY_UUID_KEY).equals(getHistoryUUID()) : "commit point history uuid ["
20042048
+ userData.get(Engine.HISTORY_UUID_KEY)
@@ -3275,6 +3319,10 @@ private boolean isRemoteStoreEnabled() {
32753319
return (remoteStore != null && shardRouting.primary());
32763320
}
32773321

3322+
public boolean isRemoteTranslogEnabled() {
3323+
return indexSettings() != null && indexSettings().isRemoteTranslogStoreEnabled();
3324+
}
3325+
32783326
/**
32793327
* Acquire a primary operation permit whenever the shard is ready for indexing. If a permit is directly available, the provided
32803328
* ActionListener will be called on the calling thread. During relocation hand-off, permit acquisition can be delayed. The provided

server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java

+52-25
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@
3636
import org.apache.logging.log4j.Logger;
3737
import org.apache.logging.log4j.message.ParameterizedMessage;
3838
import org.apache.lucene.store.AlreadyClosedException;
39+
import org.opensearch.ExceptionsHelper;
3940
import org.opensearch.LegacyESVersion;
4041
import org.opensearch.OpenSearchException;
4142
import org.opensearch.OpenSearchTimeoutException;
42-
import org.opensearch.ExceptionsHelper;
4343
import org.opensearch.action.ActionListener;
4444
import org.opensearch.action.ActionRunnable;
4545
import org.opensearch.cluster.ClusterState;
@@ -219,6 +219,12 @@ protected void reestablishRecovery(final StartRecoveryRequest request, final Str
219219
threadPool.scheduleUnlessShuttingDown(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryId, request));
220220
}
221221

222+
/**
223+
* Initiates recovery of the replica. TODO - Need to revisit it with PRRL and later. @see
224+
* <a href="https://github.com/opensearch-project/OpenSearch/issues/4502">github issue</a> on it.
225+
* @param recoveryId recovery id
226+
* @param preExistingRequest start recovery request
227+
*/
222228
private void doRecovery(final long recoveryId, final StartRecoveryRequest preExistingRequest) {
223229
final String actionName;
224230
final TransportRequest requestToSend;
@@ -238,10 +244,17 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
238244
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
239245
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
240246
indexShard.prepareForIndexRecovery();
241-
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
247+
boolean remoteTranslogEnabled = recoveryTarget.state().getPrimary() == false && indexShard.isRemoteTranslogEnabled();
248+
final long startingSeqNo = indexShard.recoverLocallyAndFetchStartSeqNo(!remoteTranslogEnabled);
242249
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
243250
: "unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
244-
startRequest = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
251+
startRequest = getStartRecoveryRequest(
252+
logger,
253+
clusterService.localNode(),
254+
recoveryTarget,
255+
startingSeqNo,
256+
!remoteTranslogEnabled
257+
);
245258
requestToSend = startRequest;
246259
actionName = PeerRecoverySourceService.Actions.START_RECOVERY;
247260
} catch (final Exception e) {
@@ -270,44 +283,58 @@ private void doRecovery(final long recoveryId, final StartRecoveryRequest preExi
270283
);
271284
}
272285

286+
public static StartRecoveryRequest getStartRecoveryRequest(
287+
Logger logger,
288+
DiscoveryNode localNode,
289+
RecoveryTarget recoveryTarget,
290+
long startingSeqNo
291+
) {
292+
return getStartRecoveryRequest(logger, localNode, recoveryTarget, startingSeqNo, true);
293+
}
294+
273295
/**
274296
* Prepare the start recovery request.
275297
*
276-
* @param logger the logger
277-
* @param localNode the local node of the recovery target
278-
* @param recoveryTarget the target of the recovery
279-
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
280-
* This is the first operation after the local checkpoint of the safe commit if exists.
298+
* @param logger the logger
299+
* @param localNode the local node of the recovery target
300+
* @param recoveryTarget the target of the recovery
301+
* @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
302+
* This is the first operation after the local checkpoint of the safe commit if exists.
303+
* @param verifyTranslog should the recovery request validate translog consistency with snapshot store metadata.
281304
* @return a start recovery request
282305
*/
283306
public static StartRecoveryRequest getStartRecoveryRequest(
284307
Logger logger,
285308
DiscoveryNode localNode,
286309
RecoveryTarget recoveryTarget,
287-
long startingSeqNo
310+
long startingSeqNo,
311+
boolean verifyTranslog
288312
) {
289313
final StartRecoveryRequest request;
290314
logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
291315

292316
Store.MetadataSnapshot metadataSnapshot;
293317
try {
294318
metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata();
295-
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene index.
296-
try {
297-
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
298-
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
299-
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
300-
} catch (IOException | TranslogCorruptedException e) {
301-
logger.warn(
302-
new ParameterizedMessage(
303-
"error while reading global checkpoint from translog, "
304-
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
305-
startingSeqNo
306-
),
307-
e
308-
);
309-
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
310-
startingSeqNo = UNASSIGNED_SEQ_NO;
319+
if (verifyTranslog) {
320+
// Make sure that the current translog is consistent with the Lucene index; otherwise, we have to throw away the Lucene
321+
// index.
322+
try {
323+
final String expectedTranslogUUID = metadataSnapshot.getCommitUserData().get(Translog.TRANSLOG_UUID_KEY);
324+
final long globalCheckpoint = Translog.readGlobalCheckpoint(recoveryTarget.translogLocation(), expectedTranslogUUID);
325+
assert globalCheckpoint + 1 >= startingSeqNo : "invalid startingSeqNo " + startingSeqNo + " >= " + globalCheckpoint;
326+
} catch (IOException | TranslogCorruptedException e) {
327+
logger.warn(
328+
new ParameterizedMessage(
329+
"error while reading global checkpoint from translog, "
330+
+ "resetting the starting sequence number from {} to unassigned and recovering as if there are none",
331+
startingSeqNo
332+
),
333+
e
334+
);
335+
metadataSnapshot = Store.MetadataSnapshot.EMPTY;
336+
startingSeqNo = UNASSIGNED_SEQ_NO;
337+
}
311338
}
312339
} catch (final org.apache.lucene.index.IndexNotFoundException e) {
313340
// happens on an empty folder. no need to log

0 commit comments

Comments
 (0)