Skip to content

Commit 1acba95

Browse files
authored
Search Replica Allocation and Recovery (opensearch-project#17457)
* Restrict Search Replicas to Allocate only to Search dedicated node
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* fixed the javadoc
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* fixed tests
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Treat Regular and Search Replicas Separately to Prevent Allocation Blocking
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Updated tests and some refactor
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Fixed SearchReplica recovery scenario for same node and new node
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Updated the logic for SearchReplica recovery scenario for new node
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Fixed nits after self review
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Modified the search replica allocation based on node attribute
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* fixed PR comments
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Revert "Fixed SearchReplica recovery scenario for same node and new node"
  This reverts commit de1e719.
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Separated the recovery flow method for search replica
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Revert "fixed PR comments"
  This reverts commit 8fe8dcf.
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Added unit tests in IndexShardTests
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* updated method name and minor refactor
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Removed search replica recovery logic from internalRecoverFromStore method
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Added integ test to cover search node restart scenario
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Applied search node role in tests and removed searchonly attribute
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Fixed failing test
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Removed unwanted comment
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
* Address PR comments
  Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
---------
Signed-off-by: Vinay Krishna Pudyodu <vinkrish.neo@gmail.com>
1 parent 83edf75 commit 1acba95

File tree

15 files changed

+821
-239
lines changed

15 files changed

+821
-239
lines changed

server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaFilteringAllocationIT.java → server/src/internalClusterTest/java/org/opensearch/cluster/allocation/SearchReplicaAllocationIT.java (renamed)

+35-48
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,21 @@
2121
import java.util.stream.Collectors;
2222

2323
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_REPLICATION_TYPE;
24-
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
2524

2625
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
27-
public class SearchReplicaFilteringAllocationIT extends RemoteStoreBaseIntegTestCase {
26+
public class SearchReplicaAllocationIT extends RemoteStoreBaseIntegTestCase {
2827

2928
@Override
3029
protected Settings featureFlagSettings() {
3130
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, Boolean.TRUE).build();
3231
}
3332

34-
public void testSearchReplicaDedicatedIncludes() {
35-
List<String> nodesIds = internalCluster().startNodes(3);
36-
final String node_0 = nodesIds.get(0);
37-
final String node_1 = nodesIds.get(1);
38-
final String node_2 = nodesIds.get(2);
39-
assertEquals(3, cluster().size());
33+
public void testSearchReplicaAllocatedToDedicatedSearchNode() {
34+
internalCluster().startClusterManagerOnlyNode();
35+
String primaryNode = internalCluster().startDataOnlyNode();
36+
internalCluster().startSearchOnlyNode();
4037

41-
client().admin()
42-
.cluster()
43-
.prepareUpdateSettings()
44-
.setTransientSettings(
45-
Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1 + "," + node_0)
46-
)
47-
.execute()
48-
.actionGet();
38+
assertEquals(3, cluster().size());
4939

5040
createIndex(
5141
"test",
@@ -57,42 +47,16 @@ public void testSearchReplicaDedicatedIncludes() {
5747
.build()
5848
);
5949
ensureGreen("test");
60-
// ensure primary is not on node 0 or 1,
50+
// ensure primary is not on searchNode
6151
IndexShardRoutingTable routingTable = getRoutingTable();
62-
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
63-
64-
String existingSearchReplicaNode = getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId());
65-
String emptyAllowedNode = existingSearchReplicaNode.equals(node_0) ? node_1 : node_0;
66-
67-
// set the included nodes to the other open node, search replica should relocate to that node.
68-
client().admin()
69-
.cluster()
70-
.prepareUpdateSettings()
71-
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", emptyAllowedNode))
72-
.execute()
73-
.actionGet();
74-
ensureGreen("test");
75-
76-
routingTable = getRoutingTable();
77-
assertEquals(node_2, getNodeName(routingTable.primaryShard().currentNodeId()));
78-
assertEquals(emptyAllowedNode, getNodeName(routingTable.searchOnlyReplicas().get(0).currentNodeId()));
52+
assertEquals(primaryNode, getNodeName(routingTable.primaryShard().currentNodeId()));
7953
}
8054

8155
public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
82-
List<String> nodesIds = internalCluster().startNodes(3);
83-
final String node_0 = nodesIds.get(0);
84-
final String node_1 = nodesIds.get(1);
85-
final String node_2 = nodesIds.get(2);
56+
internalCluster().startNodes(2);
57+
final String node_1 = internalCluster().startSearchOnlyNode();
8658
assertEquals(3, cluster().size());
8759

88-
// set filter on 1 node and set search replica count to 2 - should leave 1 unassigned
89-
client().admin()
90-
.cluster()
91-
.prepareUpdateSettings()
92-
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", node_1))
93-
.execute()
94-
.actionGet();
95-
9660
logger.info("--> creating an index with no replicas");
9761
createIndex(
9862
"test",
@@ -115,9 +79,32 @@ public void testSearchReplicaDedicatedIncludes_DoNotAssignToOtherNodes() {
11579
assertEquals(1, routingTable.searchOnlyReplicas().stream().filter(ShardRouting::unassigned).count());
11680
}
11781

82+
public void testSearchReplicaDedicatedIncludes_WhenNotSetDoNotAssign() {
83+
internalCluster().startNodes(2);
84+
assertEquals(2, cluster().size());
85+
86+
createIndex(
87+
"test",
88+
Settings.builder()
89+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
90+
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
91+
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
92+
.put(SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
93+
.build()
94+
);
95+
ensureYellowAndNoInitializingShards("test");
96+
IndexShardRoutingTable routingTable = getRoutingTable();
97+
assertNull(routingTable.searchOnlyReplicas().get(0).currentNodeId());
98+
99+
// Add a search node
100+
final String searchNode = internalCluster().startSearchOnlyNode();
101+
102+
ensureGreen("test");
103+
assertEquals(searchNode, getNodeName(getRoutingTable().searchOnlyReplicas().get(0).currentNodeId()));
104+
}
105+
118106
private IndexShardRoutingTable getRoutingTable() {
119-
IndexShardRoutingTable routingTable = getClusterState().routingTable().index("test").getShards().get(0);
120-
return routingTable;
107+
return getClusterState().routingTable().index("test").getShards().get(0);
121108
}
122109

123110
private String getNodeName(String id) {

server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaReplicationAndRecoveryIT.java

+37-44
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@
3131
import org.junit.After;
3232

3333
import java.nio.file.Path;
34-
import java.util.List;
3534
import java.util.Set;
3635
import java.util.concurrent.ExecutionException;
3736

3837
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
3938
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS;
4039
import static org.opensearch.cluster.routing.RecoverySource.Type.EMPTY_STORE;
4140
import static org.opensearch.cluster.routing.RecoverySource.Type.EXISTING_STORE;
42-
import static org.opensearch.cluster.routing.allocation.decider.SearchReplicaAllocationDecider.SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING;
4341

4442
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
4543
public class SearchReplicaReplicationAndRecoveryIT extends SegmentReplicationBaseIT {
@@ -84,20 +82,22 @@ public void testReplication() throws Exception {
8482
final String primary = internalCluster().startDataOnlyNode();
8583
createIndex(INDEX_NAME);
8684
ensureYellowAndNoInitializingShards(INDEX_NAME);
87-
final String replica = internalCluster().startDataOnlyNode();
85+
final String searchNode = internalCluster().startSearchOnlyNode();
86+
8887
ensureGreen(INDEX_NAME);
8988

9089
final int docCount = 10;
9190
for (int i = 0; i < docCount; i++) {
9291
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
9392
}
9493
refresh(INDEX_NAME);
95-
waitForSearchableDocs(docCount, primary, replica);
94+
waitForSearchableDocs(docCount, primary, searchNode);
9695
}
9796

9897
public void testSegmentReplicationStatsResponseWithSearchReplica() throws Exception {
9998
internalCluster().startClusterManagerOnlyNode();
100-
final List<String> nodes = internalCluster().startDataOnlyNodes(2);
99+
final String searchNode = internalCluster().startSearchOnlyNode();
100+
final String primary = internalCluster().startDataOnlyNode();
101101
createIndex(
102102
INDEX_NAME,
103103
Settings.builder()
@@ -107,14 +107,15 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
107107
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
108108
.build()
109109
);
110+
110111
ensureGreen(INDEX_NAME);
111112

112113
final int docCount = 5;
113114
for (int i = 0; i < docCount; i++) {
114115
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
115116
}
116117
refresh(INDEX_NAME);
117-
waitForSearchableDocs(docCount, nodes);
118+
waitForSearchableDocs(docCount, primary, searchNode);
118119

119120
SegmentReplicationStatsResponse segmentReplicationStatsResponse = dataNodeClient().admin()
120121
.indices()
@@ -142,46 +143,35 @@ public void testSegmentReplicationStatsResponseWithSearchReplica() throws Except
142143
public void testSearchReplicaRecovery() throws Exception {
143144
internalCluster().startClusterManagerOnlyNode();
144145
final String primary = internalCluster().startDataOnlyNode();
145-
final String replica = internalCluster().startDataOnlyNode();
146-
147-
// ensure search replicas are only allocated to "replica" node.
148-
client().admin()
149-
.cluster()
150-
.prepareUpdateSettings()
151-
.setTransientSettings(Settings.builder().put(SEARCH_REPLICA_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "_name", replica))
152-
.execute()
153-
.actionGet();
146+
final String searchNode = internalCluster().startSearchOnlyNode();
154147

155148
createIndex(INDEX_NAME);
156149
ensureGreen(INDEX_NAME);
157-
assertRecoverySourceType(replica, EMPTY_STORE);
150+
assertRecoverySourceType(searchNode, EMPTY_STORE);
158151

159152
final int docCount = 10;
160153
for (int i = 0; i < docCount; i++) {
161154
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
162155
}
163156
refresh(INDEX_NAME);
164157
flush(INDEX_NAME);
165-
waitForSearchableDocs(10, primary, replica);
158+
waitForSearchableDocs(10, primary, searchNode);
166159

167160
// Node stats should show remote download stats as nonzero, use this as a precondition to compare
168161
// post restart.
169-
assertDownloadStats(replica, true);
170-
NodesStatsResponse nodesStatsResponse;
171-
NodeStats nodeStats;
162+
assertDownloadStats(searchNode, true);
172163

173-
internalCluster().restartNode(replica);
164+
internalCluster().restartNode(searchNode);
174165
ensureGreen(INDEX_NAME);
175-
assertDocCounts(10, replica);
166+
assertDocCounts(10, searchNode);
176167

177-
// assert existing store recovery
178-
assertRecoverySourceType(replica, EXISTING_STORE);
179-
assertDownloadStats(replica, false);
168+
assertRecoverySourceType(searchNode, EXISTING_STORE);
169+
assertDownloadStats(searchNode, false);
180170
}
181171

182172
public void testRecoveryAfterDocsIndexed() throws Exception {
183173
internalCluster().startClusterManagerOnlyNode();
184-
final String primary = internalCluster().startDataOnlyNode();
174+
internalCluster().startDataOnlyNode();
185175
createIndex(INDEX_NAME);
186176
ensureYellowAndNoInitializingShards(INDEX_NAME);
187177
final int docCount = 10;
@@ -190,13 +180,14 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
190180
}
191181
refresh(INDEX_NAME);
192182

193-
final String replica = internalCluster().startDataOnlyNode();
183+
final String searchNode = internalCluster().startSearchOnlyNode();
184+
194185
ensureGreen(INDEX_NAME);
195-
assertDocCounts(10, replica);
186+
assertDocCounts(10, searchNode);
196187

197-
assertRecoverySourceType(replica, EMPTY_STORE);
188+
assertRecoverySourceType(searchNode, EMPTY_STORE);
198189
// replica should have downloaded from remote
199-
assertDownloadStats(replica, true);
190+
assertDownloadStats(searchNode, true);
200191

201192
client().admin()
202193
.indices()
@@ -212,14 +203,14 @@ public void testRecoveryAfterDocsIndexed() throws Exception {
212203
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1))
213204
.get();
214205
ensureGreen(INDEX_NAME);
215-
assertDocCounts(10, replica);
206+
assertDocCounts(10, searchNode);
216207

217-
internalCluster().restartNode(replica);
208+
internalCluster().restartNode(searchNode);
218209

219210
ensureGreen(INDEX_NAME);
220-
assertDocCounts(10, replica);
221-
assertRecoverySourceType(replica, EXISTING_STORE);
222-
assertDownloadStats(replica, false);
211+
assertDocCounts(10, searchNode);
212+
assertRecoverySourceType(searchNode, EXISTING_STORE);
213+
assertDownloadStats(searchNode, false);
223214
}
224215

225216
private static void assertRecoverySourceType(String replica, RecoverySource.Type recoveryType) throws InterruptedException,
@@ -257,29 +248,30 @@ public void testStopPrimary_RestoreOnNewNode() throws Exception {
257248
refresh(INDEX_NAME);
258249
assertDocCounts(docCount, primary);
259250

260-
final String replica = internalCluster().startDataOnlyNode();
251+
final String searchNode = internalCluster().startSearchOnlyNode();
252+
261253
ensureGreen(INDEX_NAME);
262-
assertDocCounts(docCount, replica);
254+
assertDocCounts(docCount, searchNode);
263255
// stop the primary
264256
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(primary));
265257

266258
assertBusy(() -> {
267259
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
268260
assertEquals(ClusterHealthStatus.RED, clusterHealthResponse.getStatus());
269261
});
270-
assertDocCounts(docCount, replica);
262+
assertDocCounts(docCount, searchNode);
271263

272264
String restoredPrimary = internalCluster().startDataOnlyNode();
273265

274266
client().admin().cluster().restoreRemoteStore(new RestoreRemoteStoreRequest().indices(INDEX_NAME), PlainActionFuture.newFuture());
275267
ensureGreen(INDEX_NAME);
276-
assertDocCounts(docCount, replica, restoredPrimary);
268+
assertDocCounts(docCount, searchNode, restoredPrimary);
277269

278270
for (int i = docCount; i < docCount * 2; i++) {
279271
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
280272
}
281273
refresh(INDEX_NAME);
282-
assertBusy(() -> assertDocCounts(20, replica, restoredPrimary));
274+
assertBusy(() -> assertDocCounts(20, searchNode, restoredPrimary));
283275
}
284276

285277
public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
@@ -293,9 +285,10 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
293285
}
294286
refresh(INDEX_NAME);
295287

296-
final String replica = internalCluster().startDataOnlyNode();
288+
final String searchNode = internalCluster().startSearchOnlyNode();
289+
297290
ensureGreen(INDEX_NAME);
298-
assertDocCounts(10, replica);
291+
assertDocCounts(10, searchNode);
299292

300293
client().admin()
301294
.indices()
@@ -314,12 +307,12 @@ public void testFailoverToNewPrimaryWithPollingReplication() throws Exception {
314307
});
315308
ClusterHealthResponse clusterHealthResponse = clusterAdmin().prepareHealth(INDEX_NAME).get();
316309
assertEquals(ClusterHealthStatus.YELLOW, clusterHealthResponse.getStatus());
317-
assertDocCounts(10, replica);
310+
assertDocCounts(10, searchNode);
318311

319312
for (int i = docCount; i < docCount * 2; i++) {
320313
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
321314
}
322315
refresh(INDEX_NAME);
323-
assertBusy(() -> assertDocCounts(20, replica, writer_replica));
316+
assertBusy(() -> assertDocCounts(20, searchNode, writer_replica));
324317
}
325318
}

server/src/internalClusterTest/java/org/opensearch/indices/replication/SearchReplicaRestoreIT.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,17 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRep_RestoreOnSegRepWithSea
7272
Settings.builder().put(SETTING_NUMBER_OF_SEARCH_REPLICAS, 1).build()
7373
);
7474
ensureYellowAndNoInitializingShards(RESTORED_INDEX_NAME);
75-
internalCluster().startDataOnlyNode();
75+
76+
internalCluster().startSearchOnlyNode();
77+
7678
ensureGreen(RESTORED_INDEX_NAME);
7779
assertEquals(1, getNumberOfSearchReplicas(RESTORED_INDEX_NAME));
7880

7981
SearchResponse resp = client().prepareSearch(RESTORED_INDEX_NAME).setQuery(QueryBuilders.matchAllQuery()).get();
8082
assertHitCount(resp, DOC_COUNT);
8183
}
8284

83-
public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRep() throws Exception {
85+
public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_RestoreOnDocRep() {
8486
bootstrapIndexWithSearchReplicas();
8587
createRepoAndSnapshot(REPOSITORY_NAME, FS_REPOSITORY_TYPE, SNAPSHOT_NAME, INDEX_NAME);
8688

@@ -98,7 +100,7 @@ public void testSearchReplicaRestore_WhenSnapshotOnSegRepWithSearchReplica_Resto
98100
}
99101

100102
private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType) throws InterruptedException {
101-
startCluster(2);
103+
internalCluster().startNodes(2);
102104

103105
Settings settings = Settings.builder()
104106
.put(super.indexSettings())
@@ -114,8 +116,9 @@ private void bootstrapIndexWithOutSearchReplicas(ReplicationType replicationType
114116
ensureGreen(INDEX_NAME);
115117
}
116118

117-
private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
118-
startCluster(3);
119+
private void bootstrapIndexWithSearchReplicas() {
120+
internalCluster().startNodes(2);
121+
internalCluster().startSearchOnlyNode();
119122

120123
Settings settings = Settings.builder()
121124
.put(super.indexSettings())
@@ -126,18 +129,14 @@ private void bootstrapIndexWithSearchReplicas() throws InterruptedException {
126129
.build();
127130

128131
createIndex(INDEX_NAME, settings);
132+
129133
ensureGreen(INDEX_NAME);
130134
for (int i = 0; i < DOC_COUNT; i++) {
131135
client().prepareIndex(INDEX_NAME).setId(String.valueOf(i)).setSource("foo", "bar").get();
132136
}
133137
flushAndRefresh(INDEX_NAME);
134138
}
135139

136-
private void startCluster(int numOfNodes) {
137-
internalCluster().startClusterManagerOnlyNode();
138-
internalCluster().startDataOnlyNodes(numOfNodes);
139-
}
140-
141140
private void createRepoAndSnapshot(String repositoryName, String repositoryType, String snapshotName, String indexName) {
142141
createRepository(repositoryName, repositoryType, randomRepoPath().toAbsolutePath());
143142
createSnapshot(repositoryName, snapshotName, List.of(indexName));

0 commit comments

Comments
 (0)