11
11
import org .apache .logging .log4j .LogManager ;
12
12
import org .apache .logging .log4j .Logger ;
13
13
import org .apache .logging .log4j .message .ParameterizedMessage ;
14
- import org .apache .lucene .store .IndexInput ;
15
14
import org .opensearch .action .LatchedActionListener ;
16
- import org .opensearch .cluster .ClusterState ;
17
15
import org .opensearch .cluster .DiffableUtils ;
18
16
import org .opensearch .cluster .routing .IndexRoutingTable ;
19
17
import org .opensearch .cluster .routing .RoutingTable ;
20
- import org .opensearch .common .blobstore .AsyncMultiStreamBlobContainer ;
21
- import org .opensearch .common .blobstore .BlobContainer ;
22
18
import org .opensearch .common .blobstore .BlobPath ;
23
- import org .opensearch .common .blobstore .stream .write .WritePriority ;
24
- import org .opensearch .common .blobstore .transfer .RemoteTransferContainer ;
25
- import org .opensearch .common .blobstore .transfer .stream .OffsetRangeIndexInputStream ;
26
- import org .opensearch .common .io .stream .BytesStreamOutput ;
27
19
import org .opensearch .common .lifecycle .AbstractLifecycleComponent ;
28
- import org .opensearch .common .lucene . store . ByteArrayIndexInput ;
20
+ import org .opensearch .common .remote . RemoteWritableEntityStore ;
29
21
import org .opensearch .common .settings .ClusterSettings ;
30
- import org .opensearch .common .settings .Setting ;
31
22
import org .opensearch .common .settings .Settings ;
32
23
import org .opensearch .common .util .io .IOUtils ;
33
24
import org .opensearch .core .action .ActionListener ;
34
- import org .opensearch .core .common .bytes .BytesReference ;
35
- import org .opensearch .core .index .Index ;
25
+ import org .opensearch .core .compress .Compressor ;
36
26
import org .opensearch .gateway .remote .ClusterMetadataManifest ;
37
27
import org .opensearch .gateway .remote .RemoteStateTransferException ;
28
+ import org .opensearch .gateway .remote .model .RemoteRoutingTableBlobStore ;
38
29
import org .opensearch .gateway .remote .routingtable .RemoteIndexRoutingTable ;
39
- import org .opensearch .index .remote .RemoteStoreEnums ;
40
- import org .opensearch .index .remote .RemoteStorePathStrategy ;
41
- import org .opensearch .index .remote .RemoteStoreUtils ;
30
+ import org .opensearch .index .translog .transfer .BlobStoreTransferService ;
42
31
import org .opensearch .node .Node ;
43
32
import org .opensearch .node .remotestore .RemoteStoreNodeAttribute ;
44
33
import org .opensearch .repositories .RepositoriesService ;
51
40
import java .util .List ;
52
41
import java .util .Map ;
53
42
import java .util .Optional ;
54
- import java .util .concurrent .ExecutorService ;
55
43
import java .util .function .Function ;
56
44
import java .util .function .Supplier ;
57
45
import java .util .stream .Collectors ;
58
46
59
- import static org .opensearch .gateway .remote .RemoteClusterStateUtils .DELIMITER ;
60
47
import static org .opensearch .node .remotestore .RemoteStoreNodeAttribute .isRemoteRoutingTableEnabled ;
61
48
62
49
/**
66
53
*/
67
54
public class InternalRemoteRoutingTableService extends AbstractLifecycleComponent implements RemoteRoutingTableService {
68
55
69
- /**
70
- * This setting is used to set the remote routing table store blob store path type strategy.
71
- */
72
- public static final Setting <RemoteStoreEnums .PathType > REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING = new Setting <>(
73
- "cluster.remote_store.routing_table.path_type" ,
74
- RemoteStoreEnums .PathType .HASHED_PREFIX .toString (),
75
- RemoteStoreEnums .PathType ::parseString ,
76
- Setting .Property .NodeScope ,
77
- Setting .Property .Dynamic
78
- );
79
-
80
- /**
81
- * This setting is used to set the remote routing table store blob store path hash algorithm strategy.
82
- * This setting will come to effect if the {@link #REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING}
83
- * is either {@code HASHED_PREFIX} or {@code HASHED_INFIX}.
84
- */
85
- public static final Setting <RemoteStoreEnums .PathHashAlgorithm > REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING = new Setting <>(
86
- "cluster.remote_store.routing_table.path_hash_algo" ,
87
- RemoteStoreEnums .PathHashAlgorithm .FNV_1A_BASE64 .toString (),
88
- RemoteStoreEnums .PathHashAlgorithm ::parseString ,
89
- Setting .Property .NodeScope ,
90
- Setting .Property .Dynamic
91
- );
92
-
93
- public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing" ;
94
- public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing" ;
95
- public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--" ;
96
-
97
56
private static final Logger logger = LogManager .getLogger (InternalRemoteRoutingTableService .class );
98
57
private final Settings settings ;
99
58
private final Supplier <RepositoriesService > repositoriesService ;
59
+ private Compressor compressor ;
60
+ private RemoteWritableEntityStore <IndexRoutingTable , RemoteIndexRoutingTable > remoteIndexRoutingTableStore ;
61
+ private final ClusterSettings clusterSettings ;
100
62
private BlobStoreRepository blobStoreRepository ;
101
- private RemoteStoreEnums .PathType pathType ;
102
- private RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ;
103
- private ThreadPool threadPool ;
63
+ private final ThreadPool threadPool ;
64
+ private final String clusterName ;
104
65
105
66
public InternalRemoteRoutingTableService (
106
67
Supplier <RepositoriesService > repositoriesService ,
107
68
Settings settings ,
108
69
ClusterSettings clusterSettings ,
109
- ThreadPool threadpool
70
+ ThreadPool threadpool ,
71
+ String clusterName
110
72
) {
111
73
assert isRemoteRoutingTableEnabled (settings ) : "Remote routing table is not enabled" ;
112
74
this .repositoriesService = repositoriesService ;
113
75
this .settings = settings ;
114
- this .pathType = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING );
115
- this .pathHashAlgo = clusterSettings .get (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING );
116
- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_TYPE_SETTING , this ::setPathTypeSetting );
117
- clusterSettings .addSettingsUpdateConsumer (REMOTE_ROUTING_TABLE_PATH_HASH_ALGO_SETTING , this ::setPathHashAlgoSetting );
118
76
this .threadPool = threadpool ;
119
- }
120
-
121
- private void setPathTypeSetting (RemoteStoreEnums .PathType pathType ) {
122
- this .pathType = pathType ;
123
- }
124
-
125
- private void setPathHashAlgoSetting (RemoteStoreEnums .PathHashAlgorithm pathHashAlgo ) {
126
- this .pathHashAlgo = pathHashAlgo ;
77
+ this .clusterName = clusterName ;
78
+ this .clusterSettings = clusterSettings ;
127
79
}
128
80
129
81
public List <IndexRoutingTable > getIndicesRouting (RoutingTable routingTable ) {
@@ -150,43 +102,31 @@ public DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRouting
150
102
151
103
/**
152
104
* Async action for writing one {@code IndexRoutingTable} to remote store
153
- * @param clusterState current cluster state
105
+ * @param term current term
106
+ * @param version current version
107
+ * @param clusterUUID current cluster UUID
154
108
* @param indexRouting indexRoutingTable to write to remote store
155
109
* @param latchedActionListener listener for handling async action response
156
- * @param clusterBasePath base path for remote file
157
110
*/
158
111
@ Override
159
- public void getIndexRoutingAsyncAction (
160
- ClusterState clusterState ,
112
+ public void getAsyncIndexRoutingWriteAction (
113
+ String clusterUUID ,
114
+ long term ,
115
+ long version ,
161
116
IndexRoutingTable indexRouting ,
162
- LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener ,
163
- BlobPath clusterBasePath
117
+ LatchedActionListener <ClusterMetadataManifest .UploadedMetadata > latchedActionListener
164
118
) {
165
119
166
- BlobPath indexRoutingPath = clusterBasePath .add (INDEX_ROUTING_PATH_TOKEN );
167
- BlobPath path = pathType .path (
168
- RemoteStorePathStrategy .BasePathInput .builder ().basePath (indexRoutingPath ).indexUUID (indexRouting .getIndex ().getUUID ()).build (),
169
- pathHashAlgo
170
- );
171
- final BlobContainer blobContainer = blobStoreRepository .blobStore ().blobContainer (path );
172
-
173
- final String fileName = getIndexRoutingFileName (clusterState .term (), clusterState .version ());
120
+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (indexRouting , clusterUUID , compressor , term , version );
174
121
175
122
ActionListener <Void > completionListener = ActionListener .wrap (
176
- resp -> latchedActionListener .onResponse (
177
- new ClusterMetadataManifest .UploadedIndexMetadata (
178
- indexRouting .getIndex ().getName (),
179
- indexRouting .getIndex ().getUUID (),
180
- path .buildAsString () + fileName ,
181
- INDEX_ROUTING_METADATA_PREFIX
182
- )
183
- ),
123
+ resp -> latchedActionListener .onResponse (remoteIndexRoutingTable .getUploadedMetadata ()),
184
124
ex -> latchedActionListener .onFailure (
185
125
new RemoteStateTransferException ("Exception in writing index to remote store: " + indexRouting .getIndex ().toString (), ex )
186
126
)
187
127
);
188
128
189
- uploadIndex ( indexRouting , fileName , blobContainer , completionListener );
129
+ remoteIndexRoutingTableStore . writeAsync ( remoteIndexRoutingTable , completionListener );
190
130
}
191
131
192
132
/**
@@ -213,111 +153,21 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getAllUploadedIndices
213
153
return new ArrayList <>(allUploadedIndicesRouting .values ());
214
154
}
215
155
216
- private void uploadIndex (
217
- IndexRoutingTable indexRouting ,
218
- String fileName ,
219
- BlobContainer blobContainer ,
220
- ActionListener <Void > completionListener
221
- ) {
222
- RemoteIndexRoutingTable indexRoutingInput = new RemoteIndexRoutingTable (indexRouting );
223
- BytesReference bytesInput = null ;
224
- try (BytesStreamOutput streamOutput = new BytesStreamOutput ()) {
225
- indexRoutingInput .writeTo (streamOutput );
226
- bytesInput = streamOutput .bytes ();
227
- } catch (IOException e ) {
228
- logger .error ("Failed to serialize IndexRoutingTable for [{}]: [{}]" , indexRouting , e );
229
- completionListener .onFailure (e );
230
- return ;
231
- }
232
-
233
- if (blobContainer instanceof AsyncMultiStreamBlobContainer == false ) {
234
- try {
235
- blobContainer .writeBlob (fileName , bytesInput .streamInput (), bytesInput .length (), true );
236
- completionListener .onResponse (null );
237
- } catch (IOException e ) {
238
- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
239
- completionListener .onFailure (e );
240
- }
241
- return ;
242
- }
243
-
244
- try (IndexInput input = new ByteArrayIndexInput ("indexrouting" , BytesReference .toBytes (bytesInput ))) {
245
- try (
246
- RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer (
247
- fileName ,
248
- fileName ,
249
- input .length (),
250
- true ,
251
- WritePriority .URGENT ,
252
- (size , position ) -> new OffsetRangeIndexInputStream (input , size , position ),
253
- null ,
254
- false
255
- )
256
- ) {
257
- ((AsyncMultiStreamBlobContainer ) blobContainer ).asyncBlobUpload (
258
- remoteTransferContainer .createWriteContext (),
259
- completionListener
260
- );
261
- } catch (IOException e ) {
262
- logger .error ("Failed to write IndexRoutingTable to remote store for indexRouting [{}]: [{}]" , indexRouting , e );
263
- completionListener .onFailure (e );
264
- }
265
- } catch (IOException e ) {
266
- logger .error (
267
- "Failed to create transfer object for IndexRoutingTable for remote store upload for indexRouting [{}]: [{}]" ,
268
- indexRouting ,
269
- e
270
- );
271
- completionListener .onFailure (e );
272
- }
273
- }
274
-
275
156
@ Override
276
157
public void getAsyncIndexRoutingReadAction (
158
+ String clusterUUID ,
277
159
String uploadedFilename ,
278
- Index index ,
279
160
LatchedActionListener <IndexRoutingTable > latchedActionListener
280
161
) {
281
- int idx = uploadedFilename .lastIndexOf ("/" );
282
- String blobFileName = uploadedFilename .substring (idx + 1 );
283
- BlobContainer blobContainer = blobStoreRepository .blobStore ()
284
- .blobContainer (BlobPath .cleanPath ().add (uploadedFilename .substring (0 , idx )));
285
162
286
- readAsync (
287
- blobContainer ,
288
- blobFileName ,
289
- index ,
290
- threadPool .executor (ThreadPool .Names .REMOTE_STATE_READ ),
291
- ActionListener .wrap (
292
- response -> latchedActionListener .onResponse (response .getIndexRoutingTable ()),
293
- latchedActionListener ::onFailure
294
- )
163
+ ActionListener <IndexRoutingTable > actionListener = ActionListener .wrap (
164
+ latchedActionListener ::onResponse ,
165
+ latchedActionListener ::onFailure
295
166
);
296
- }
297
167
298
- private void readAsync (
299
- BlobContainer blobContainer ,
300
- String name ,
301
- Index index ,
302
- ExecutorService executorService ,
303
- ActionListener <RemoteIndexRoutingTable > listener
304
- ) {
305
- executorService .execute (() -> {
306
- try {
307
- listener .onResponse (read (blobContainer , name , index ));
308
- } catch (Exception e ) {
309
- listener .onFailure (e );
310
- }
311
- });
312
- }
168
+ RemoteIndexRoutingTable remoteIndexRoutingTable = new RemoteIndexRoutingTable (uploadedFilename , clusterUUID , compressor );
313
169
314
- private RemoteIndexRoutingTable read (BlobContainer blobContainer , String path , Index index ) {
315
- try {
316
- return new RemoteIndexRoutingTable (blobContainer .readBlob (path ), index );
317
- } catch (IOException | AssertionError e ) {
318
- logger .error (() -> new ParameterizedMessage ("RoutingTable read failed for path {}" , path ), e );
319
- throw new RemoteStateTransferException ("Failed to read RemoteRoutingTable from Manifest with error " , e );
320
- }
170
+ remoteIndexRoutingTableStore .readAsync (remoteIndexRoutingTable , actionListener );
321
171
}
322
172
323
173
@ Override
@@ -334,16 +184,6 @@ public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutin
334
184
}).collect (Collectors .toList ());
335
185
}
336
186
337
- private String getIndexRoutingFileName (long term , long version ) {
338
- return String .join (
339
- DELIMITER ,
340
- INDEX_ROUTING_FILE_PREFIX ,
341
- RemoteStoreUtils .invertLong (term ),
342
- RemoteStoreUtils .invertLong (version ),
343
- RemoteStoreUtils .invertLong (System .currentTimeMillis ())
344
- );
345
- }
346
-
347
187
@ Override
348
188
protected void doClose () throws IOException {
349
189
if (blobStoreRepository != null ) {
@@ -361,6 +201,16 @@ protected void doStart() {
361
201
final Repository repository = repositoriesService .get ().repository (remoteStoreRepo );
362
202
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository" ;
363
203
blobStoreRepository = (BlobStoreRepository ) repository ;
204
+ compressor = blobStoreRepository .getCompressor ();
205
+
206
+ this .remoteIndexRoutingTableStore = new RemoteRoutingTableBlobStore <>(
207
+ new BlobStoreTransferService (blobStoreRepository .blobStore (), threadPool ),
208
+ blobStoreRepository ,
209
+ clusterName ,
210
+ threadPool ,
211
+ ThreadPool .Names .REMOTE_STATE_READ ,
212
+ clusterSettings
213
+ );
364
214
}
365
215
366
216
@ Override
@@ -376,5 +226,4 @@ public void deleteStaleIndexRoutingPaths(List<String> stalePaths) throws IOExcep
376
226
throw e ;
377
227
}
378
228
}
379
-
380
229
}
0 commit comments