Skip to content

Commit cd961f3

Browse files
authored
Use RemoteSegmentStoreDirectory instead of RemoteDirectory (#4240)
* Use RemoteSegmentStoreDirectory instead of RemoteDirectory Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 7ea6e88 commit cd961f3

18 files changed

+628
-217
lines changed

CHANGELOG.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
66
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
77

88
### Changed
9-
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
9+
- Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308))
10+
- Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240))
1011

1112
### Deprecated
1213

server/src/main/java/org/opensearch/index/IndexModule.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.opensearch.index.shard.SearchOperationListener;
7171
import org.opensearch.index.similarity.SimilarityService;
7272
import org.opensearch.index.store.FsDirectoryFactory;
73-
import org.opensearch.index.store.RemoteDirectoryFactory;
7473
import org.opensearch.indices.IndicesQueryCache;
7574
import org.opensearch.indices.breaker.CircuitBreakerService;
7675
import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -487,7 +486,7 @@ public IndexService newIndexService(
487486
NamedWriteableRegistry namedWriteableRegistry,
488487
BooleanSupplier idFieldDataEnabled,
489488
ValuesSourceRegistry valuesSourceRegistry,
490-
RemoteDirectoryFactory remoteDirectoryFactory
489+
IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory
491490
) throws IOException {
492491
final IndexEventListener eventListener = freeze();
493492
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper

server/src/main/java/org/opensearch/index/shard/IndexShard.java

+1-4
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@
4848
import org.apache.lucene.search.Sort;
4949
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
5050
import org.apache.lucene.store.AlreadyClosedException;
51-
import org.apache.lucene.store.Directory;
52-
import org.apache.lucene.store.FilterDirectory;
5351
import org.apache.lucene.util.SetOnce;
5452
import org.apache.lucene.util.ThreadInterruptedException;
5553
import org.opensearch.Assertions;
@@ -3228,8 +3226,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro
32283226
final List<ReferenceManager.RefreshListener> internalRefreshListener = new ArrayList<>();
32293227
internalRefreshListener.add(new RefreshMetricUpdater(refreshMetric));
32303228
if (isRemoteStoreEnabled()) {
3231-
Directory remoteDirectory = ((FilterDirectory) ((FilterDirectory) remoteStore.directory()).getDelegate()).getDelegate();
3232-
internalRefreshListener.add(new RemoteStoreRefreshListener(store.directory(), remoteDirectory));
3229+
internalRefreshListener.add(new RemoteStoreRefreshListener(this));
32333230
}
32343231
if (this.checkpointPublisher != null && indexSettings.isSegRepEnabled() && shardRouting.primary()) {
32353232
internalRefreshListener.add(new CheckpointRefreshListener(this, this.checkpointPublisher));

server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java

+134-42
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,54 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14+
import org.apache.lucene.codecs.CodecUtil;
15+
import org.apache.lucene.index.IndexFileNames;
16+
import org.apache.lucene.index.SegmentInfos;
1417
import org.apache.lucene.search.ReferenceManager;
1518
import org.apache.lucene.store.Directory;
19+
import org.apache.lucene.store.FilterDirectory;
1620
import org.apache.lucene.store.IOContext;
21+
import org.apache.lucene.store.IndexInput;
22+
import org.opensearch.common.concurrent.GatedCloseable;
23+
import org.opensearch.index.engine.EngineException;
24+
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
1725

1826
import java.io.IOException;
19-
import java.nio.file.NoSuchFileException;
20-
import java.util.Arrays;
21-
import java.util.HashSet;
27+
import java.util.Collection;
28+
import java.util.Comparator;
29+
import java.util.HashMap;
30+
import java.util.List;
31+
import java.util.Map;
32+
import java.util.Optional;
2233
import java.util.Set;
34+
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.stream.Collectors;
2336

2437
/**
2538
* RefreshListener implementation to upload newly created segment files to the remote store
39+
*
40+
* @opensearch.internal
2641
*/
27-
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
42+
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener {
43+
// Visible for testing
44+
static final Set<String> EXCLUDE_FILES = Set.of("write.lock");
45+
// Visible for testing
46+
static final int LAST_N_METADATA_FILES_TO_KEEP = 10;
2847

48+
private final IndexShard indexShard;
2949
private final Directory storeDirectory;
30-
private final Directory remoteDirectory;
31-
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398)
32-
private final Set<String> filesUploadedToRemoteStore;
50+
private final RemoteSegmentStoreDirectory remoteDirectory;
51+
private final Map<String, String> localSegmentChecksumMap;
52+
private long primaryTerm;
3353
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class);
3454

35-
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException {
36-
this.storeDirectory = storeDirectory;
37-
this.remoteDirectory = remoteDirectory;
38-
// ToDo: Handle failures in reading list of files (GitHub #3397)
39-
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll()));
55+
public RemoteStoreRefreshListener(IndexShard indexShard) {
56+
this.indexShard = indexShard;
57+
this.storeDirectory = indexShard.store().directory();
58+
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory())
59+
.getDelegate()).getDelegate();
60+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
61+
localSegmentChecksumMap = new HashMap<>();
4062
}
4163

4264
@Override
@@ -46,42 +68,112 @@ public void beforeRefresh() throws IOException {
4668

4769
/**
4870
* Upload new segment files created as part of the last refresh to the remote segment store.
49-
* The method also deletes segment files from remote store which are not part of local filesystem.
71+
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded.
5072
* @param didRefresh true if the refresh opened a new reference
51-
* @throws IOException in case of I/O error in reading list of local files
5273
*/
5374
@Override
54-
public void afterRefresh(boolean didRefresh) throws IOException {
55-
if (didRefresh) {
56-
Set<String> localFiles = Set.of(storeDirectory.listAll());
57-
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> {
58-
try {
59-
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
60-
filesUploadedToRemoteStore.add(file);
61-
} catch (NoSuchFileException e) {
62-
logger.info(
63-
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file),
64-
e
65-
);
66-
} catch (IOException e) {
67-
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
68-
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
69-
}
70-
});
75+
public void afterRefresh(boolean didRefresh) {
76+
synchronized (this) {
77+
try {
78+
if (indexShard.shardRouting.primary()) {
79+
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) {
80+
this.primaryTerm = indexShard.getOperationPrimaryTerm();
81+
this.remoteDirectory.init();
82+
}
83+
try {
84+
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory);
85+
if (!remoteDirectory.containsFile(
86+
lastCommittedLocalSegmentFileName,
87+
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName)
88+
)) {
89+
deleteStaleCommits();
90+
}
91+
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) {
92+
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get();
93+
Collection<String> refreshedLocalFiles = segmentInfos.files(true);
94+
95+
List<String> segmentInfosFiles = refreshedLocalFiles.stream()
96+
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS))
97+
.collect(Collectors.toList());
98+
Optional<String> latestSegmentInfos = segmentInfosFiles.stream()
99+
.max(Comparator.comparingLong(IndexFileNames::parseGeneration));
71100

72-
Set<String> remoteFilesToBeDeleted = new HashSet<>();
73-
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142)
74-
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> {
75-
try {
76-
remoteDirectory.deleteFile(file);
77-
remoteFilesToBeDeleted.add(file);
78-
} catch (IOException e) {
79-
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
80-
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e);
101+
if (latestSegmentInfos.isPresent()) {
102+
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true));
103+
segmentInfosFiles.stream()
104+
.filter(file -> !file.equals(latestSegmentInfos.get()))
105+
.forEach(refreshedLocalFiles::remove);
106+
107+
boolean uploadStatus = uploadNewSegments(refreshedLocalFiles);
108+
if (uploadStatus) {
109+
remoteDirectory.uploadMetadata(
110+
refreshedLocalFiles,
111+
storeDirectory,
112+
indexShard.getOperationPrimaryTerm(),
113+
segmentInfos.getGeneration()
114+
);
115+
localSegmentChecksumMap.keySet()
116+
.stream()
117+
.filter(file -> !refreshedLocalFiles.contains(file))
118+
.collect(Collectors.toSet())
119+
.forEach(localSegmentChecksumMap::remove);
120+
}
121+
}
122+
} catch (EngineException e) {
123+
logger.warn("Exception while reading SegmentInfosSnapshot", e);
124+
}
125+
} catch (IOException e) {
126+
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried
127+
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration.
128+
logger.warn("Exception while uploading new segments to the remote segment store", e);
129+
}
81130
}
82-
});
131+
} catch (Throwable t) {
132+
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t);
133+
}
134+
}
135+
}
136+
137+
// Visible for testing
138+
boolean uploadNewSegments(Collection<String> localFiles) throws IOException {
139+
AtomicBoolean uploadSuccess = new AtomicBoolean(true);
140+
localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> {
141+
try {
142+
return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file));
143+
} catch (IOException e) {
144+
logger.info(
145+
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file",
146+
file
147+
);
148+
return true;
149+
}
150+
}).forEach(file -> {
151+
try {
152+
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT);
153+
} catch (IOException e) {
154+
uploadSuccess.set(false);
155+
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397)
156+
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e);
157+
}
158+
});
159+
return uploadSuccess.get();
160+
}
161+
162+
private String getChecksumOfLocalFile(String file) throws IOException {
163+
if (!localSegmentChecksumMap.containsKey(file)) {
164+
try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.DEFAULT)) {
165+
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput));
166+
localSegmentChecksumMap.put(file, checksum);
167+
}
168+
}
169+
return localSegmentChecksumMap.get(file);
170+
}
83171

84-
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove);
172+
private void deleteStaleCommits() {
173+
try {
174+
remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP);
175+
} catch (IOException e) {
176+
logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e);
85177
}
86178
}
87179
}

server/src/main/java/org/opensearch/index/shard/StoreRecovery.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,12 @@ private void recoverFromRemoteStore(IndexShard indexShard) throws IndexShardReco
449449
}
450450
indexShard.preRecovery();
451451
indexShard.prepareForIndexRecovery();
452-
final Directory remoteDirectory = remoteStore.directory();
452+
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
453+
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
454+
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
455+
: "Store.directory is not enclosing an instance of FilterDirectory";
456+
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
457+
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
453458
final Store store = indexShard.store();
454459
final Directory storeDirectory = store.directory();
455460
store.incRef();

server/src/main/java/org/opensearch/index/store/RemoteIndexInput.java

+24-11
Original file line numberDiff line numberDiff line change
@@ -27,27 +27,37 @@ public class RemoteIndexInput extends IndexInput {
2727

2828
private final InputStream inputStream;
2929
private final long size;
30+
private long filePointer;
3031

3132
public RemoteIndexInput(String name, InputStream inputStream, long size) {
3233
super(name);
3334
this.inputStream = inputStream;
3435
this.size = size;
36+
this.filePointer = 0;
3537
}
3638

3739
@Override
3840
public byte readByte() throws IOException {
3941
byte[] buffer = new byte[1];
40-
inputStream.read(buffer);
42+
int numberOfBytesRead = inputStream.read(buffer);
43+
if (numberOfBytesRead != -1) {
44+
filePointer += numberOfBytesRead;
45+
}
4146
return buffer[0];
4247
}
4348

4449
@Override
4550
public void readBytes(byte[] b, int offset, int len) throws IOException {
4651
int bytesRead = inputStream.read(b, offset, len);
47-
while (bytesRead > 0 && bytesRead < len) {
48-
len -= bytesRead;
49-
offset += bytesRead;
50-
bytesRead = inputStream.read(b, offset, len);
52+
if (bytesRead == len) {
53+
filePointer += bytesRead;
54+
} else {
55+
while (bytesRead > 0 && bytesRead < len) {
56+
filePointer += bytesRead;
57+
len -= bytesRead;
58+
offset += bytesRead;
59+
bytesRead = inputStream.read(b, offset, len);
60+
}
5161
}
5262
}
5363

@@ -61,22 +71,25 @@ public long length() {
6171
return size;
6272
}
6373

64-
@Override
65-
public void seek(long pos) throws IOException {
66-
inputStream.skip(pos);
67-
}
68-
6974
/**
7075
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
7176
* This method is not implemented as it is not used for the file transfer to/from the remote store.
7277
*
7378
* @throws UnsupportedOperationException always
7479
*/
7580
@Override
76-
public long getFilePointer() {
81+
public void seek(long pos) throws IOException {
7782
throw new UnsupportedOperationException();
7883
}
7984

85+
/**
86+
* Returns the current position in this file in terms of number of bytes read so far.
87+
*/
88+
@Override
89+
public long getFilePointer() {
90+
return filePointer;
91+
}
92+
8093
/**
8194
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified.
8295
* This method is not implemented as it is not used for the file transfer to/from the remote store.

0 commit comments

Comments
 (0)