Skip to content

Commit 3f29951

Browse files
committed
[Backport 2.16][Backport 2.x] Refactor remote-routing-table service inline with remote state interfaces (opensearch-project#14668)
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
1 parent 69bccb9 commit 3f29951

19 files changed

+788
-673
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2525
- Add SplitResponseProcessor to Search Pipelines (([#14800](https://github.com/opensearch-project/OpenSearch/issues/14800)))
2626
- Optimize TransportNodesAction to not send DiscoveryNodes for NodeStats, NodesInfo and ClusterStats call ([14749](https://github.com/opensearch-project/OpenSearch/pull/14749))
2727
- Reduce logging in DEBUG for MasterService:run ([#14795](https://github.com/opensearch-project/OpenSearch/pull/14795))
28+
- Refactor remote-routing-table service inline with remote state interfaces([#14668](https://github.com/opensearch-project/OpenSearch/pull/14668))
2829

2930
### Dependencies
3031
- Update to Apache Lucene 9.11.1 ([#14042](https://github.com/opensearch-project/OpenSearch/pull/14042), [#14576](https://github.com/opensearch-project/OpenSearch/pull/14576))

server/src/main/java/org/opensearch/cluster/routing/remote/InternalRemoteRoutingTableService.java

+40-191
Original file line numberDiff line numberDiff line change
@@ -11,34 +11,23 @@
1111
import org.apache.logging.log4j.LogManager;
1212
import org.apache.logging.log4j.Logger;
1313
import org.apache.logging.log4j.message.ParameterizedMessage;
14-
import org.apache.lucene.store.IndexInput;
1514
import org.opensearch.action.LatchedActionListener;
16-
import org.opensearch.cluster.ClusterState;
1715
import org.opensearch.cluster.DiffableUtils;
1816
import org.opensearch.cluster.routing.IndexRoutingTable;
1917
import org.opensearch.cluster.routing.RoutingTable;
20-
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
21-
import org.opensearch.common.blobstore.BlobContainer;
2218
import org.opensearch.common.blobstore.BlobPath;
23-
import org.opensearch.common.blobstore.stream.write.WritePriority;
24-
import org.opensearch.common.blobstore.transfer.RemoteTransferContainer;
25-
import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream;
26-
import org.opensearch.common.io.stream.BytesStreamOutput;
2719
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
28-
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
20+
import org.opensearch.common.remote.RemoteWritableEntityStore;
2921
import org.opensearch.common.settings.ClusterSettings;
30-
import org.opensearch.common.settings.Setting;
3122
import org.opensearch.common.settings.Settings;
3223
import org.opensearch.common.util.io.IOUtils;
3324
import org.opensearch.core.action.ActionListener;
34-
import org.opensearch.core.common.bytes.BytesReference;
35-
import org.opensearch.core.index.Index;
25+
import org.opensearch.core.compress.Compressor;
3626
import org.opensearch.gateway.remote.ClusterMetadataManifest;
3727
import org.opensearch.gateway.remote.RemoteStateTransferException;
28+
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
3829
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
39-
import org.opensearch.index.remote.RemoteStoreEnums;
40-
import org.opensearch.index.remote.RemoteStorePathStrategy;
41-
import org.opensearch.index.remote.RemoteStoreUtils;
30+
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
4231
import org.opensearch.node.Node;
4332
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
4433
import org.opensearch.repositories.RepositoriesService;
@@ -51,12 +40,10 @@
5140
import java.util.List;
5241
import java.util.Map;
5342
import java.util.Optional;
54-
import java.util.concurrent.ExecutorService;
5543
import java.util.function.Function;
5644
import java.util.function.Supplier;
5745
import java.util.stream.Collectors;
5846

59-
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
6047
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
6148

6249
/**
@@ -66,64 +53,29 @@
6653
*/
6754
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {
6855

69-
/**
70-
* This setting is used to set the remote routing table store blob store path type strategy.
71-
*/
72-
public static final Setting<RemoteStoreEnums.PathType> REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting<>(
73-
"cluster.remote_store.routing_table.path_type",
74-
RemoteStoreEnums.PathType.HASHED_PREFIX.toString(),
75-
RemoteStoreEnums.PathType::parseString,
76-
Setting.Property.NodeScope,
77-
Setting.Property.Dynamic
78-
);
79-
80-
/**
81-
* This setting is used to set the remote routing table store blob store path hash algorithm strategy.
82-
* This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
83-
* is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
84-
*/
85-
public static final Setting<RemoteStoreEnums.PathHashAlgorithm> REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting<>(
86-
"cluster.remote_store.routing_table.path_hash_algo",
87-
RemoteStoreEnums.PathHashAlgorithm.FNV_1A_BASE64.toString(),
88-
RemoteStoreEnums.PathHashAlgorithm::parseString,
89-
Setting.Property.NodeScope,
90-
Setting.Property.Dynamic
91-
);
92-
93-
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
94-
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
95-
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
96-
9756
private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
9857
private final Settings settings;
9958
private final Supplier<RepositoriesService> repositoriesService;
59+
private Compressor compressor;
60+
private RemoteWritableEntityStore<IndexRoutingTable, RemoteIndexRoutingTable> remoteIndexRoutingTableStore;
61+
private final ClusterSettings clusterSettings;
10062
private BlobStoreRepository blobStoreRepository;
101-
private RemoteStoreEnums.PathType pathType;
102-
private RemoteStoreEnums.PathHashAlgorithm pathHashAlgo;
103-
private ThreadPool threadPool;
63+
private final ThreadPool threadPool;
64+
private final String clusterName;
10465

10566
public InternalRemoteRoutingTableService(
10667
Supplier<RepositoriesService> repositoriesService,
10768
Settings settings,
10869
ClusterSettings clusterSettings,
109-
ThreadPool threadpool
70+
ThreadPool threadpool,
71+
String clusterName
11072
) {
11173
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
11274
this.repositoriesService = repositoriesService;
11375
this.settings = settings;
114-
this.pathType = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING);
115-
this.pathHashAlgo = clusterSettings.get(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING);
116-
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING, this::setPathTypeSetting);
117-
clusterSettings.addSettingsUpdateConsumer(REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING, this::setPathHashAlgoSetting);
11876
this.threadPool = threadpool;
119-
}
120-
121-
private void setPathTypeSetting(RemoteStoreEnums.PathType pathType) {
122-
this.pathType = pathType;
123-
}
124-
125-
private void setPathHashAlgoSetting(RemoteStoreEnums.PathHashAlgorithm pathHashAlgo) {
126-
this.pathHashAlgo = pathHashAlgo;
77+
this.clusterName = clusterName;
78+
this.clusterSettings = clusterSettings;
12779
}
12880

12981
public List<IndexRoutingTable> getIndicesRouting(RoutingTable routingTable) {
@@ -150,43 +102,31 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
150102

151103
/**
152104
* Async action for writing one {@code IndexRoutingTable} to remote store
153-
* @param clusterState current cluster state
105+
* @param term current term
106+
* @param version current version
107+
* @param clusterUUID current cluster UUID
154108
* @param indexRouting indexRoutingTable to write to remote store
155109
* @param latchedActionListener listener for handling async action response
156-
* @param clusterBasePath base path for remote file
157110
*/
158111
@Override
159-
public void getIndexRoutingAsyncAction(
160-
ClusterState clusterState,
112+
public void getAsyncIndexRoutingWriteAction(
113+
String clusterUUID,
114+
long term,
115+
long version,
161116
IndexRoutingTable indexRouting,
162-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
163-
BlobPath clusterBasePath
117+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
164118
) {
165119

166-
BlobPath indexRoutingPath = clusterBasePath.add(INDEX_ROUTING_PATH_TOKEN);
167-
BlobPath path = pathType.path(
168-
RemoteStorePathStrategy.BasePathInput.builder().basePath(indexRoutingPath).indexUUID(indexRouting.getIndex().getUUID()).build(),
169-
pathHashAlgo
170-
);
171-
final BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer(path);
172-
173-
final String fileName = getIndexRoutingFileName(clusterState.term(), clusterState.version());
120+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(indexRouting, clusterUUID, compressor, term, version);
174121

175122
ActionListener<Void> completionListener = ActionListener.wrap(
176-
resp -> latchedActionListener.onResponse(
177-
new ClusterMetadataManifest.UploadedIndexMetadata(
178-
indexRouting.getIndex().getName(),
179-
indexRouting.getIndex().getUUID(),
180-
path.buildAsString() + fileName,
181-
INDEX_ROUTING_METADATA_PREFIX
182-
)
183-
),
123+
resp -> latchedActionListener.onResponse(remoteIndexRoutingTable.getUploadedMetadata()),
184124
ex -> latchedActionListener.onFailure(
185125
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
186126
)
187127
);
188128

189-
uploadIndex(indexRouting, fileName, blobContainer, completionListener);
129+
remoteIndexRoutingTableStore.writeAsync(remoteIndexRoutingTable, completionListener);
190130
}
191131

192132
/**
@@ -213,111 +153,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
213153
return new ArrayList<>(allUploadedIndicesRouting.values());
214154
}
215155

216-
private void uploadIndex(
217-
IndexRoutingTable indexRouting,
218-
String fileName,
219-
BlobContainer blobContainer,
220-
ActionListener<Void> completionListener
221-
) {
222-
RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable(indexRouting);
223-
BytesReference bytesInput = null;
224-
try (BytesStreamOutput streamOutput = new BytesStreamOutput()) {
225-
indexRoutingInput.writeTo(streamOutput);
226-
bytesInput = streamOutput.bytes();
227-
} catch (IOException e) {
228-
logger.error("Failed to serialize IndexRoutingTable for [{}]: [{}]", indexRouting, e);
229-
completionListener.onFailure(e);
230-
return;
231-
}
232-
233-
if (blobContainer instanceof AsyncMultiStreamBlobContainer == false) {
234-
try {
235-
blobContainer.writeBlob(fileName, bytesInput.streamInput(), bytesInput.length(), true);
236-
completionListener.onResponse(null);
237-
} catch (IOException e) {
238-
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
239-
completionListener.onFailure(e);
240-
}
241-
return;
242-
}
243-
244-
try (IndexInput input = new ByteArrayIndexInput("indexrouting", BytesReference.toBytes(bytesInput))) {
245-
try (
246-
RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer(
247-
fileName,
248-
fileName,
249-
input.length(),
250-
true,
251-
WritePriority.URGENT,
252-
(size, position) -> new OffsetRangeIndexInputStream(input, size, position),
253-
null,
254-
false
255-
)
256-
) {
257-
((AsyncMultiStreamBlobContainer) blobContainer).asyncBlobUpload(
258-
remoteTransferContainer.createWriteContext(),
259-
completionListener
260-
);
261-
} catch (IOException e) {
262-
logger.error("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]", indexRouting, e);
263-
completionListener.onFailure(e);
264-
}
265-
} catch (IOException e) {
266-
logger.error(
267-
"Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]",
268-
indexRouting,
269-
e
270-
);
271-
completionListener.onFailure(e);
272-
}
273-
}
274-
275156
@Override
276157
public void getAsyncIndexRoutingReadAction(
158+
String clusterUUID,
277159
String uploadedFilename,
278-
Index index,
279160
LatchedActionListener<IndexRoutingTable> latchedActionListener
280161
) {
281-
int idx = uploadedFilename.lastIndexOf("/");
282-
String blobFileName = uploadedFilename.substring(idx + 1);
283-
BlobContainer blobContainer = blobStoreRepository.blobStore()
284-
.blobContainer(BlobPath.cleanPath().add(uploadedFilename.substring(0, idx)));
285162

286-
readAsync(
287-
blobContainer,
288-
blobFileName,
289-
index,
290-
threadPool.executor(ThreadPool.Names.REMOTE_STATE_READ),
291-
ActionListener.wrap(
292-
response -> latchedActionListener.onResponse(response.getIndexRoutingTable()),
293-
latchedActionListener::onFailure
294-
)
163+
ActionListener<IndexRoutingTable> actionListener = ActionListener.wrap(
164+
latchedActionListener::onResponse,
165+
latchedActionListener::onFailure
295166
);
296-
}
297167

298-
private void readAsync(
299-
BlobContainer blobContainer,
300-
String name,
301-
Index index,
302-
ExecutorService executorService,
303-
ActionListener<RemoteIndexRoutingTable> listener
304-
) {
305-
executorService.execute(() -> {
306-
try {
307-
listener.onResponse(read(blobContainer, name, index));
308-
} catch (Exception e) {
309-
listener.onFailure(e);
310-
}
311-
});
312-
}
168+
RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable(uploadedFilename, clusterUUID, compressor);
313169

314-
private RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
315-
try {
316-
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
317-
} catch (IOException | AssertionError e) {
318-
logger.error(() -> new ParameterizedMessage("RoutingTable read failed for path {}", path), e);
319-
throw new RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
320-
}
170+
remoteIndexRoutingTableStore.readAsync(remoteIndexRoutingTable, actionListener);
321171
}
322172

323173
@Override
@@ -334,16 +184,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
334184
}).collect(Collectors.toList());
335185
}
336186

337-
private String getIndexRoutingFileName(long term, long version) {
338-
return String.join(
339-
DELIMITER,
340-
INDEX_ROUTING_FILE_PREFIX,
341-
RemoteStoreUtils.invertLong(term),
342-
RemoteStoreUtils.invertLong(version),
343-
RemoteStoreUtils.invertLong(System.currentTimeMillis())
344-
);
345-
}
346-
347187
@Override
348188
protected void doClose() throws IOException {
349189
if (blobStoreRepository != null) {
@@ -361,6 +201,16 @@ protected void doStart() {
361201
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
362202
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
363203
blobStoreRepository = (BlobStoreRepository) repository;
204+
compressor = blobStoreRepository.getCompressor();
205+
206+
this.remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore<>(
207+
new BlobStoreTransferService(blobStoreRepository.blobStore(), threadPool),
208+
blobStoreRepository,
209+
clusterName,
210+
threadPool,
211+
ThreadPool.Names.REMOTE_STATE_READ,
212+
clusterSettings
213+
);
364214
}
365215

366216
@Override
@@ -376,5 +226,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
376226
throw e;
377227
}
378228
}
379-
380229
}

server/src/main/java/org/opensearch/cluster/routing/remote/NoopRemoteRoutingTableService.java

+6-8
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,10 @@
99
package org.opensearch.cluster.routing.remote;
1010

1111
import org.opensearch.action.LatchedActionListener;
12-
import org.opensearch.cluster.ClusterState;
1312
import org.opensearch.cluster.DiffableUtils;
1413
import org.opensearch.cluster.routing.IndexRoutingTable;
1514
import org.opensearch.cluster.routing.RoutingTable;
16-
import org.opensearch.common.blobstore.BlobPath;
1715
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
18-
import org.opensearch.core.index.Index;
1916
import org.opensearch.gateway.remote.ClusterMetadataManifest;
2017

2118
import java.io.IOException;
@@ -41,11 +38,12 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
4138
}
4239

4340
@Override
44-
public void getIndexRoutingAsyncAction(
45-
ClusterState clusterState,
41+
public void getAsyncIndexRoutingWriteAction(
42+
String clusterUUID,
43+
long term,
44+
long version,
4645
IndexRoutingTable indexRouting,
47-
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener,
48-
BlobPath clusterBasePath
46+
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
4947
) {
5048
// noop
5149
}
@@ -62,8 +60,8 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
6260

6361
@Override
6462
public void getAsyncIndexRoutingReadAction(
63+
String clusterUUID,
6564
String uploadedFilename,
66-
Index index,
6765
LatchedActionListener<IndexRoutingTable> latchedActionListener
6866
) {
6967
// noop

0 commit comments

Comments
 (0)