/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.indices;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.ValidationException;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.index.Index;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import static org.opensearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING;
/**
* This class contains the logic used to check the cluster-wide shard limit before shards are created and to ensure that the
* limit is updated correctly on setting updates, etc.
* <p>
* NOTE: This is the limit applied at *shard creation time*. If you are looking for the limit applied at *allocation* time, which is
* controlled by a different setting,
* see {@link org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider}.
*
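* <p>
* A minimal usage sketch (illustrative only; variable names are hypothetical, and in a running node this
* validator is constructed and wired up by the node's modules rather than by hand):
* <pre>
* ShardLimitValidator validator = new ShardLimitValidator(settings, clusterService, systemIndices);
* // Throws a ValidationException if creating the index would push the cluster over the shard limit.
* validator.validateShardLimit("my-index", indexSettings, clusterState);
* </pre>
*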
* @opensearch.internal
*/
public class ShardLimitValidator {
public static final String SETTING_MAX_SHARDS_PER_CLUSTER_KEY = "cluster.routing.allocation.total_shards_limit";
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_NODE = Setting.intSetting(
"cluster.max_shards_per_node",
1000,
1,
new MaxShardPerNodeLimitValidator(),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final Setting<Integer> SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER = Setting.intSetting(
SETTING_MAX_SHARDS_PER_CLUSTER_KEY,
-1,
-1,
new MaxShardPerClusterLimitValidator(),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
public static final Setting<Boolean> SETTING_CLUSTER_IGNORE_DOT_INDEXES = Setting.boolSetting(
"cluster.ignore_dot_indexes",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
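/*
Illustrative note: all three settings above are dynamic, so they are typically tuned at runtime
through the cluster settings API, for example:

  PUT _cluster/settings
  { "persistent": { "cluster.max_shards_per_node": 1500 } }

The validators attached to the two shard-limit settings reject combinations where the cluster-wide
limit (when not -1) would be smaller than the per-node limit; see doValidate below.
*/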
protected final AtomicInteger shardLimitPerNode = new AtomicInteger();
protected final AtomicInteger shardLimitPerCluster = new AtomicInteger();
private final SystemIndices systemIndices;
private volatile boolean ignoreDotIndexes;
public ShardLimitValidator(final Settings settings, ClusterService clusterService, SystemIndices systemIndices) {
this.shardLimitPerNode.set(SETTING_CLUSTER_MAX_SHARDS_PER_NODE.get(settings));
this.shardLimitPerCluster.set(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER.get(settings));
this.ignoreDotIndexes = SETTING_CLUSTER_IGNORE_DOT_INDEXES.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_NODE, this::setShardLimitPerNode);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, this::setShardLimitPerCluster);
clusterService.getClusterSettings().addSettingsUpdateConsumer(SETTING_CLUSTER_IGNORE_DOT_INDEXES, this::setIgnoreDotIndexes);
this.systemIndices = systemIndices;
}
private void setShardLimitPerNode(int newValue) {
this.shardLimitPerNode.set(newValue);
}
private void setShardLimitPerCluster(int newValue) {
this.shardLimitPerCluster.set(newValue);
}
/**
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_NODE} setting.
* @return the current value of the setting
*/
public int getShardLimitPerNode() {
return shardLimitPerNode.get();
}
/**
* Gets the currently configured value of the {@link ShardLimitValidator#SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER} setting.
* @return the current value of the setting.
*/
public int getShardLimitPerCluster() {
return shardLimitPerCluster.get();
}
private void setIgnoreDotIndexes(boolean newValue) {
this.ignoreDotIndexes = newValue;
}
/**
* Checks whether an index can be created without going over the cluster shard limit.
* The shard limit is validated only for non-system indices, since it is not a hard limit anyway.
* Additionally, if cluster.ignore_dot_indexes is set to true, any index whose name starts with '.'
* is not validated, with the exception of data-stream backing indices.
*
* @param indexName the name of the index being created
* @param settings the settings of the index to be created
* @param state the current cluster state
* @throws ValidationException if creating this index would put the cluster over the cluster shard limit
*/
public void validateShardLimit(final String indexName, final Settings settings, final ClusterState state) {
/*
The shard limit is validated only for non-system indices, since it is not a hard limit anyway.
If cluster.ignore_dot_indexes is set to true, indices whose names start with '.' are also
skipped (data-stream backing indices are still validated).
*/
if (shouldIndexBeIgnored(indexName)) {
return;
}
final int numberOfShards = INDEX_NUMBER_OF_SHARDS_SETTING.get(settings);
final int numberOfReplicas = IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings);
final int shardsToCreate = numberOfShards * (1 + numberOfReplicas);
final Optional<String> shardLimit = checkShardLimit(shardsToCreate, state);
if (shardLimit.isPresent()) {
final ValidationException e = new ValidationException();
e.addValidationError(shardLimit.get());
throw e;
}
}
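/*
Worked example (illustrative): creating an index with 5 primary shards and 1 replica requests
5 * (1 + 1) = 10 new shards, which checkShardLimit compares against the cluster-wide limit.
*/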
/**
* Validates whether a list of indices can be opened without going over the cluster shard limit. Only indices that are
* currently closed and about to be opened are counted; indices that are already open are ignored. The shard limit is
* validated only for non-system indices, and if the cluster.ignore_dot_indexes property is set to true, indices whose
* names start with '.' are also ignored, except for data-stream backing indices.
*
* @param currentState The current cluster state.
* @param indicesToOpen The indices which are to be opened.
* @throws ValidationException If this operation would take the cluster over the cluster-wide shard limit.
*/
public void validateShardLimit(ClusterState currentState, Index[] indicesToOpen) {
int shardsToOpen = Arrays.stream(indicesToOpen)
/*
The shard limit is validated only for non-system indices, since it is not a hard limit anyway.
If cluster.ignore_dot_indexes is set to true, indices whose names start with '.' are also
skipped; data-stream backing indices, however, are still validated.
*/
.filter(index -> !shouldIndexBeIgnored(index.getName()))
.filter(index -> currentState.metadata().index(index).getState().equals(IndexMetadata.State.CLOSE))
.mapToInt(index -> getTotalShardCount(currentState, index))
.sum();
Optional<String> error = checkShardLimit(shardsToOpen, currentState);
if (error.isPresent()) {
ValidationException ex = new ValidationException();
ex.addValidationError(error.get());
throw ex;
}
}
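/*
Worked example (illustrative): opening two closed indices, each with 2 primaries and 1 replica,
counts 2 * (2 * (1 + 1)) = 8 shards to open; indices that are already open contribute nothing.
*/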
private static int getTotalShardCount(ClusterState state, Index index) {
IndexMetadata indexMetadata = state.metadata().index(index);
return indexMetadata.getNumberOfShards() * (1 + indexMetadata.getNumberOfReplicas());
}
/**
* Returns true if the index should be ignored during validation.
* When cluster.ignore_dot_indexes is false, an index is ignored if it is a system index; when the
* setting is true, an index is ignored if its name starts with a dot and it is not a data-stream
* backing index.
*
* @param indexName The index which needs to be validated.
*/
private boolean shouldIndexBeIgnored(String indexName) {
if (this.ignoreDotIndexes) {
return validateDotIndex(indexName) && !isDataStreamIndex(indexName);
} else return systemIndices.validateSystemIndex(indexName);
}
/**
* Returns true if the index name starts with '.', false otherwise.
*
* @param indexName The index which needs to be validated.
*/
private boolean validateDotIndex(String indexName) {
return indexName.charAt(0) == '.';
}
/**
* Returns true if the index is a data-stream backing index, false otherwise.
*
* @param indexName The index which needs to be validated.
*/
private boolean isDataStreamIndex(String indexName) {
return indexName.startsWith(DataStream.BACKING_INDEX_PREFIX);
}
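/*
Illustrative example: with cluster.ignore_dot_indexes set to true, an index such as ".kibana" is
skipped by shouldIndexBeIgnored, while a data-stream backing index (a name starting with
DataStream.BACKING_INDEX_PREFIX) is still validated. With the setting left at false, only system
indices recognized by SystemIndices are skipped.
*/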
/**
* Checks to see if an operation can be performed without taking the cluster over the cluster-wide shard limit.
* Returns an error message if appropriate, or an empty {@link Optional} otherwise.
*
* @param newShards The number of shards to be added by this operation
* @param state The current cluster state
* @return If present, an error message to be given as the reason for failing
* an operation. If empty, a sign that the operation is valid.
*/
public Optional<String> checkShardLimit(int newShards, ClusterState state) {
return checkShardLimit(newShards, state, getShardLimitPerNode(), getShardLimitPerCluster());
}
// package-private for testing
static Optional<String> checkShardLimit(
int newShards,
ClusterState state,
int maxShardsPerNodeSetting,
int maxShardsPerClusterSetting
) {
int nodeCount = state.getNodes().getDataNodes().size();
// Only enforce the shard limit if we have at least one data node, so that we don't block
// index creation during cluster setup
if (nodeCount == 0 || newShards < 0) {
return Optional.empty();
}
int computedMaxShards = (int) Math.min(Integer.MAX_VALUE, (long) maxShardsPerNodeSetting * nodeCount);
int maxShardsInCluster = maxShardsPerClusterSetting;
if (maxShardsInCluster == -1) {
maxShardsInCluster = computedMaxShards;
} else {
maxShardsInCluster = Math.min(maxShardsInCluster, computedMaxShards);
}
long currentOpenShards = state.getMetadata().getTotalOpenIndexShards();
if ((currentOpenShards + newShards) > maxShardsInCluster) {
String errorMessage = "this action would add ["
+ newShards
+ "] total shards, but this cluster currently has ["
+ currentOpenShards
+ "]/["
+ maxShardsInCluster
+ "] maximum shards open";
return Optional.of(errorMessage);
}
return Optional.empty();
}
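/*
Worked example (illustrative): with 3 data nodes and cluster.max_shards_per_node = 1000, the computed
cap is 3000 shards. If cluster.routing.allocation.total_shards_limit is -1 it does not lower that cap,
so with 2995 shards already open an operation adding 10 shards fails with:
"this action would add [10] total shards, but this cluster currently has [2995]/[3000] maximum shards open".
*/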
/**
* Validates the maxShardPerCluster threshold.
*/
static final class MaxShardPerClusterLimitValidator implements Setting.Validator<Integer> {
@Override
public void validate(Integer value) {}
@Override
public void validate(Integer maxShardPerCluster, Map<Setting<?>, Object> settings) {
final int maxShardPerNode = (int) settings.get(SETTING_CLUSTER_MAX_SHARDS_PER_NODE);
doValidate(maxShardPerCluster, maxShardPerNode);
}
@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(SETTING_CLUSTER_MAX_SHARDS_PER_NODE);
return settings.iterator();
}
}
/**
* Validates the maxShardPerNode threshold.
*/
static final class MaxShardPerNodeLimitValidator implements Setting.Validator<Integer> {
@Override
public void validate(Integer value) {}
@Override
public void validate(Integer maxShardPerNode, Map<Setting<?>, Object> settings) {
final int maxShardPerCluster = (int) settings.get(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER);
doValidate(maxShardPerCluster, maxShardPerNode);
}
@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER);
return settings.iterator();
}
}
private static void doValidate(final int maxShardPerCluster, final int maxShardPerNode) {
if (maxShardPerCluster != -1 && maxShardPerCluster < maxShardPerNode) {
throw new IllegalArgumentException(
"MaxShardPerCluster " + maxShardPerCluster + " should be greater than or equal to MaxShardPerNode " + maxShardPerNode
);
}
}
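/*
Illustrative example of the cross-setting validation above: with cluster.max_shards_per_node at 1000,
an attempt to set cluster.routing.allocation.total_shards_limit to 500 is rejected, while -1
(no cluster-wide cap) or any value of 1000 or more is accepted.
*/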
}