Skip to content

Commit 3fed8d4

Browse files
committed
[Backport 2.x] Refactor remote-routing-table service inline with remote state interfaces (opensearch-project#14668)
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 11fd0dd commit 3fed8d4

19 files changed

+793
-663
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919
- Add matchesPluginSystemIndexPattern to SystemIndexRegistry ([#14750](https://github.com/opensearch-project/OpenSearch/pull/14750))
2020
- Add Plugin interface for loading application based configuration templates (([#14659](https://github.com/opensearch-project/OpenSearch/issues/14659)))
2121
- Add prefix mode verification setting for repository verification (([#14790](https://github.com/opensearch-project/OpenSearch/pull/14790)))
22+
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
2223

2324
### Dependencies
2425
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+41-189
Original file line numberDiff line numberDiff line change
@@ -11,34 +11,26 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14-
import org.apache.lucene.store.IndexInput;
1514
import org.opensearch.action.LatchedActionListener;
16-
import org.opensearch.cluster.ClusterState;
1715
import org.opensearch.cluster.DiffableUtils;
1816
import org.opensearch.cluster.routing.IndexRoutingTable;
1917
import org.opensearch.cluster.routing.RoutingTable;
2018
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
2119
import org.opensearch.common.blobstore.BlobContainer;
20+
import org.opensearch.common.CheckedRunnable;
2221
import org.opensearch.common.blobstore.BlobPath;
23-
import org.opensearch.common.blobstore.stream.write.WritePriority;
24-
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
25-
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
26-
import org.opensearch.common.io.stream.BytesStreamOutput;
2722
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
28-
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
23+
import org.opensearch.common.remote.RemoteWritableEntityStore;
2924
import org.opensearch.common.settings.ClusterSettings;
30-
import org.opensearch.common.settings.Setting;
3125
import org.opensearch.common.settings.Settings;
3226
import org.opensearch.common.util.io.IOUtils;
3327
import org.opensearch.core.action.ActionListener;
34-
import org.opensearch.core.common.bytes.BytesReference;
35-
import org.opensearch.core.index.Index;
28+
import org.opensearch.core.compress.Compressor;
3629
import org.opensearch.gateway.remote.ClusterMetadataManifest;
3730
import org.opensearch.gateway.remote.RemoteStateTransferException;
31+
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
3832
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
39-
import org.opensearch.index.remote.RemoteStoreEnums;
40-
import org.opensearch.index.remote.RemoteStorePathStrategy;
41-
import org.opensearch.index.remote.RemoteStoreUtils;
33+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
4234
import org.opensearch.node.Node;
4335
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
4436
import org.opensearch.repositories.RepositoriesService;
@@ -51,12 +43,10 @@
5143
import java.util.List;
5244
import java.util.Map;
5345
import java.util.Optional;
54-
import java.util.concurrent.ExecutorService;
5546
import java.util.function.Function;
5647
import java.util.function.Supplier;
5748
import java.util.stream.Collectors;
5849

59-
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
6050
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
6151

6252
/**
@@ -66,64 +56,29 @@
6656
*/
6757
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {
6858

69-
/**
70-
* This setting is used to set the remote routing table store blob store path type strategy.
71-
*/
72-
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
73-
"cluster.remote_store.routing_table.path_type",
74-
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
75-
RemoteStoreEnums.PathType::parseString,
76-
Setting.Property.NodeScope,
77-
Setting.Property.Dynamic
78-
);
79-
80-
/**
81-
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
82-
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
83-
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
84-
*/
85-
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
86-
"cluster.remote_store.routing_table.path_hash_algo",
87-
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
88-
RemoteStoreEnums.PathHashAlgorithm::parseString,
89-
Setting.Property.NodeScope,
90-
Setting.Property.Dynamic
91-
);
92-
93-
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
94-
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
95-
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
96-
9759
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
9860
private final Settings settings;
9961
private final Supplier<RepositoriesService> repositoriesService;
62+
private Compressor compressor;
63+
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
64+
private final ClusterSettings clusterSettings;
10065
private BlobStoreRepository blobStoreRepository;
101-
private RemoteStoreEnums.PathType pathType;
102-
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
103-
private ThreadPool threadPool;
66+
private final ThreadPool threadPool;
67+
private final String clusterName;
10468

10569
public InternalRemoteRoutingTableService(
10670
Supplier<RepositoriesService> repositoriesService,
10771
Settings settings,
10872
ClusterSettings clusterSettings,
109-
ThreadPool threadpool
73+
ThreadPool threadpool,
74+
String clusterName
11075
) {
11176
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
11277
this.repositoriesService = repositoriesService;
11378
this.settings = settings;
114-
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
115-
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
116-
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
117-
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
11879
this.threadPool = threadpool;
119-
}
120-
121-
private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
122-
this.pathType = pathType;
123-
}
124-
125-
private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
126-
this.pathHashAlgo = pathHashAlgo;
80+
this.clusterName = clusterName;
81+
this.clusterSettings = clusterSettings;
12782
}
12883

12984
public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
@@ -150,43 +105,31 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
150105

151106
/**
152107
* Async action for writing one {@code IndexRoutingTable} to remote store
153-
* @param clusterState current cluster state
108+
* @param term current term
109+
* @param version current version
110+
* @param clusterUUID current cluster UUID
154111
* @param indexRouting indexRoutingTable to write to remote store
155112
* @param latchedActionListener listener for handling async action response
156-
* @param clusterBasePath base path for remote file
157113
*/
158114
@Override
159-
public void getIndexRoutingAsyncAction(
160-
ClusterState clusterState,
115+
public void getAsyncIndexRoutingWriteAction(
116+
String clusterUUID,
117+
long term,
118+
long version,
161119
IndexRoutingTable indexRouting,
162-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
163-
BlobPath clusterBasePath
120+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
164121
) {
165122

166-
BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
167-
BlobPath path = pathType.path(
168-
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
169-
pathHashAlgo
170-
);
171-
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);
172-
173-
final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());
123+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version);
174124

175125
ActionListener<Void> completionListener = ActionListener.wrap(
176-
resp -> latchedActionListener.onResponse(
177-
new ClusterMetadataManifest.UploadedIndexMetadata(
178-
indexRouting.getIndex().getName(),
179-
indexRouting.getIndex().getUUID(),
180-
path.buildAsString() + fileName,
181-
INDEX_ROUTING_METADATA_PREFIX
182-
)
183-
),
126+
resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()),
184127
ex -> latchedActionListener.onFailure(
185128
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
186129
)
187130
);
188131

189-
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
132+
remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
190133
}
191134

192135
/**
@@ -213,111 +156,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
213156
return new ArrayList<>(allUploadedIndicesRouting.values());
214157
}
215158

216-
private void uploadIndex(
217-
IndexRoutingTable indexRouting,
218-
String fileName,
219-
BlobContainer blobContainer,
220-
ActionListener<Void> completionListener
221-
) {
222-
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
223-
BytesReference bytesInput = null;
224-
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
225-
indexRoutingInput.writeTo(streamOutput);
226-
bytesInput = streamOutput.bytes();
227-
} catch (IOException e) {
228-
logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e);
229-
completionListener.onFailure(e);
230-
return;
231-
}
232-
233-
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
234-
try {
235-
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
236-
completionListener.onResponse(null);
237-
} catch (IOException e) {
238-
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
239-
completionListener.onFailure(e);
240-
}
241-
return;
242-
}
243-
244-
try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
245-
try (
246-
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
247-
fileName,
248-
fileName,
249-
input.length(),
250-
true,
251-
WritePriority.URGENT,
252-
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
253-
null,
254-
false
255-
)
256-
) {
257-
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
258-
remoteTransferContainer.createWriteContext(),
259-
completionListener
260-
);
261-
} catch (IOException e) {
262-
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
263-
completionListener.onFailure(e);
264-
}
265-
} catch (IOException e) {
266-
logger.error(
267-
"Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]",
268-
indexRouting,
269-
e
270-
);
271-
completionListener.onFailure(e);
272-
}
273-
}
274-
275159
@Override
276160
public void getAsyncIndexRoutingReadAction(
161+
String clusterUUID,
277162
String uploadedFilename,
278-
Index index,
279163
LatchedActionListener<IndexRoutingTable> latchedActionListener
280164
) {
281-
int idx = uploadedFilename.lastIndexOf("/");
282-
String blobFileName = uploadedFilename.substring(idx + 1);
283-
BlobContainer blobContainer = blobStoreRepository.blobStore()
284-
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));
285165

286-
readAsync(
287-
blobContainer,
288-
blobFileName,
289-
index,
290-
threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ),
291-
ActionListener.wrap(
292-
response -> latchedActionListener.onResponse(response.getIndexRoutingTable()),
293-
latchedActionListener::onFailure
294-
)
166+
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
167+
latchedActionListener::onResponse,
168+
latchedActionListener::onFailure
295169
);
296-
}
297170

298-
private void readAsync(
299-
BlobContainer blobContainer,
300-
String name,
301-
Index index,
302-
ExecutorService executorService,
303-
ActionListener<RemoteIndexRoutingTable> listener
304-
) {
305-
executorService.execute(() -> {
306-
try {
307-
listener.onResponse(read(blobContainer, name, index));
308-
} catch (Exception e) {
309-
listener.onFailure(e);
310-
}
311-
});
312-
}
171+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
313172

314-
private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
315-
try {
316-
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
317-
} catch (IOException | AssertionError e) {
318-
logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e);
319-
throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
320-
}
173+
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
321174
}
322175

323176
@Override
@@ -334,16 +187,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
334187
}).collect(Collectors.toList());
335188
}
336189

337-
private String getIndexRoutingFileName(long term, long version) {
338-
return String.join(
339-
DELIMITER,
340-
INDEX_ROUTING_FILE_PREFIX,
341-
RemoteStoreUtils.invertLong(term),
342-
RemoteStoreUtils.invertLong(version),
343-
RemoteStoreUtils.invertLong(System.currentTimeMillis())
344-
);
345-
}
346-
347190
@Override
348191
protected void doClose() throws IOException {
349192
if (blobStoreRepository != null) {
@@ -361,6 +204,16 @@ protected void doStart() {
361204
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
362205
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
363206
blobStoreRepository = (BlobStoreRepository) repository;
207+
compressor = blobStoreRepository.getCompressor();
208+
209+
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
210+
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
211+
blobStoreRepository,
212+
clusterName,
213+
threadPool,
214+
ThreadPool.Names.REMOTE_STATE_READ,
215+
clusterSettings
216+
);
364217
}
365218

366219
@Override
@@ -376,5 +229,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
376229
throw e;
377230
}
378231
}
379-
380232
}

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@
99
package org.opensearch.cluster.routing.remote;
1010

1111
import org.opensearch.action.LatchedActionListener;
12-
import org.opensearch.cluster.ClusterState;
1312
import org.opensearch.cluster.DiffableUtils;
1413
import org.opensearch.cluster.routing.IndexRoutingTable;
1514
import org.opensearch.cluster.routing.RoutingTable;
15+
import org.opensearch.common.CheckedRunnable;
1616
import org.opensearch.common.blobstore.BlobPath;
1717
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
18-
import org.opensearch.core.index.Index;
1918
import org.opensearch.gateway.remote.ClusterMetadataManifest;
2019

2120
import java.io.IOException;
@@ -41,11 +40,12 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
4140
}
4241

4342
@Override
44-
public void getIndexRoutingAsyncAction(
45-
ClusterState clusterState,
43+
public void getAsyncIndexRoutingWriteAction(
44+
String clusterUUID,
45+
long term,
46+
long version,
4647
IndexRoutingTable indexRouting,
47-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
48-
BlobPath clusterBasePath
48+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
4949
) {
5050
// noop
5151
}
@@ -62,8 +62,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
6262

6363
@Override
6464
public void getAsyncIndexRoutingReadAction(
65+
String clusterUUID,
6566
String uploadedFilename,
66-
Index index,
6767
LatchedActionListener<IndexRoutingTable> latchedActionListener
6868
) {
6969
// noop

0 commit comments

Comments
 (0)