Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit d009e53

Browse files
committedDec 6, 2024·
Refactor
Signed-off-by: Pranshu Shukla <pranshushukla06@gmail.com>
1 parent ed9f5e7 commit d009e53

File tree

5 files changed

+116
-62
lines changed

5 files changed

+116
-62
lines changed
 

‎server/src/internalClusterTest/java/org/opensearch/discovery/DiscoveryDisruptionIT.java

+38-10
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@
4444
import org.opensearch.common.Randomness;
4545
import org.opensearch.common.settings.Settings;
4646
import org.opensearch.repositories.RepositoriesService;
47+
import org.opensearch.repositories.Repository;
48+
import org.opensearch.repositories.RepositoryMissingException;
4749
import org.opensearch.repositories.fs.ReloadableFsRepository;
4850
import org.opensearch.test.OpenSearchIntegTestCase;
4951
import org.opensearch.test.disruption.NetworkDisruption;
@@ -57,6 +59,7 @@
5759
import java.util.Arrays;
5860
import java.util.HashSet;
5961
import java.util.List;
62+
import java.util.Objects;
6063
import java.util.Set;
6164
import java.util.concurrent.CountDownLatch;
6265
import java.util.stream.Collectors;
@@ -261,7 +264,9 @@ public void testNodeNotReachableFromClusterManager() throws Exception {
261264
}
262265

263266
/**
264-
* Test Repositories Configured Node Join Commit failures.
267+
* Tests the scenario where-in a cluster-state containing new repository meta-data as part of a node-join from a
268+
* repository-configured node fails on a commit stag and has a master switch. This would lead to master nodes
269+
* doing another round of node-joins with the new cluster-state as the previous attempt had a successful publish.
265270
*/
266271
public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitFails() throws Exception {
267272
final String remoteStateRepoName = "remote-state-repo";
@@ -288,20 +293,32 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF
288293
clusterManagerNode
289294
);
290295
logger.info("Blocking Cluster Manager Commit Request on all nodes");
296+
// This is to allow the new node to have commit failures on the nodes in the send path itself. This will lead to the
297+
// nodes have a successful publish operation but failed commit operation. This will come into play once the new node joins
291298
nonClusterManagerNodes.forEach(node -> {
292299
TransportService targetTransportService = internalCluster().getInstance(TransportService.class, node);
293-
clusterManagerTransportService.addOpenSearchFailureException(
294-
targetTransportService,
295-
new FailedToCommitClusterStateException("Blocking Commit"),
296-
PublicationTransportHandler.COMMIT_STATE_ACTION_NAME
297-
);
300+
clusterManagerTransportService.addSendBehavior(targetTransportService, (connection, requestId, action, request, options) -> {
301+
if (action.equals(PublicationTransportHandler.COMMIT_STATE_ACTION_NAME)) {
302+
logger.info("--> preventing {} request", PublicationTransportHandler.COMMIT_STATE_ACTION_NAME);
303+
throw new FailedToCommitClusterStateException("Blocking Commit");
304+
}
305+
connection.sendRequest(requestId, action, request, options);
306+
});
298307
});
299308

300309
logger.info("Starting Node with remote publication settings");
310+
// Start a node with remote-publication repositories configured. This will lead to the active cluster-manager create
311+
// a new cluster-state event with the new node-join along with new repositories setup in the cluster meta-data.
301312
internalCluster().startDataOnlyNodes(1, remotePublicationSettings, Boolean.TRUE);
302313

303314
logger.info("Stopping current Cluster Manager");
315+
// We stop the current cluster-manager whose outbound paths were blocked. This is to force a new election onto nodes
316+
// we had the new cluster-state published but not commited.
304317
internalCluster().stopCurrentClusterManagerNode();
318+
319+
// We expect that the repositories validations are skipped in this case and node-joins succeeds as expected. The
320+
// repositories validations are skipped because even though the cluster-state is updated in the persisted registry,
321+
// the repository service will not be updated as the commit attempt failed.
305322
ensureStableCluster(6);
306323

307324
String randomNode = nonClusterManagerNodes.get(Randomness.get().nextInt(nonClusterManagerNodes.size()));
@@ -330,11 +347,22 @@ public void testElectClusterManagerRemotePublicationConfigurationNodeJoinCommitF
330347

331348
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, randomNode);
332349

333-
if (repositoriesService.isRepositoryPresent(remoteStateRepoName)) {
334-
isRemoteStateRepoConfigured = Boolean.TRUE;
350+
try {
351+
Repository remoteStateRepo = repositoriesService.repository(remoteStateRepoName);
352+
if (Objects.nonNull(remoteStateRepo)) {
353+
isRemoteStateRepoConfigured = Boolean.TRUE;
354+
}
355+
} catch (RepositoryMissingException e) {
356+
isRemoteStateRepoConfigured = Boolean.FALSE;
335357
}
336-
if (repositoriesService.isRepositoryPresent(remoteRoutingTableRepoName)) {
337-
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
358+
359+
try {
360+
Repository routingTableRepo = repositoriesService.repository(remoteRoutingTableRepoName);
361+
if (Objects.nonNull(routingTableRepo)) {
362+
isRemoteRoutingTableRepoConfigured = Boolean.TRUE;
363+
}
364+
} catch (RepositoryMissingException e) {
365+
isRemoteRoutingTableRepoConfigured = Boolean.FALSE;
338366
}
339367

340368
Assert.assertTrue("RemoteState Repo is not set in RepositoryService", isRemoteStateRepoConfigured);

‎server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeService.java

+11-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.repositories.RepositoriesService;
2222
import org.opensearch.repositories.Repository;
2323
import org.opensearch.repositories.RepositoryException;
24+
import org.opensearch.repositories.RepositoryMissingException;
2425
import org.opensearch.threadpool.ThreadPool;
2526

2627
import java.util.ArrayList;
@@ -183,13 +184,17 @@ public RepositoriesMetadata updateRepositoriesMetadata(DiscoveryNode joiningNode
183184
boolean repositoryAlreadyPresent = false;
184185
for (RepositoryMetadata existingRepositoryMetadata : existingRepositories.repositories()) {
185186
if (newRepositoryMetadata.name().equals(existingRepositoryMetadata.name())) {
186-
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded but
187-
// the commit operation failed, the cluster-state may have the repository metadata which is not applied into the
188-
// repository service. This may lead to assertion failures down the line.
189-
if (!repositoriesService.get().isRepositoryPresent(newRepositoryMetadata.name())) {
187+
try {
188+
// This is to handle cases where-in the during a previous node-join attempt if the publish operation succeeded
189+
// but
190+
// the commit operation failed, the cluster-state may have the repository metadata which is not applied into the
191+
// repository service. This may lead to assertion failures down the line.
192+
String repositoryName = newRepositoryMetadata.name();
193+
repositoriesService.get().repository(repositoryName);
194+
} catch (RepositoryMissingException e) {
190195
logger.warn(
191-
"remote repository [{}] in cluster-state but repository-service but not present "
192-
+ "in repository-service, skipping checks",
196+
"Skipping repositories metadata checks: Remote repository [{}] is in the cluster state but not present "
197+
+ "in the repository service.",
193198
newRepositoryMetadata.name()
194199
);
195200
break;

‎server/src/main/java/org/opensearch/repositories/RepositoriesService.java

-4
Original file line numberDiff line numberDiff line change
@@ -579,10 +579,6 @@ public Repository repository(String repositoryName) {
579579
throw new RepositoryMissingException(repositoryName);
580580
}
581581

582-
public Boolean isRepositoryPresent(final String repositoryName) {
583-
return Objects.nonNull(repositories.get(repositoryName));
584-
}
585-
586582
public List<RepositoryStatsSnapshot> repositoriesStats() {
587583
List<RepositoryStatsSnapshot> activeRepoStats = getRepositoryStatsForActiveRepositories();
588584
return activeRepoStats;

‎server/src/test/java/org/opensearch/cluster/coordination/JoinTaskExecutorTests.java

+67
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.opensearch.common.util.FeatureFlags;
5656
import org.opensearch.node.remotestore.RemoteStoreNodeService;
5757
import org.opensearch.repositories.RepositoriesService;
58+
import org.opensearch.repositories.RepositoryMissingException;
5859
import org.opensearch.repositories.blobstore.BlobStoreRepository;
5960
import org.opensearch.test.OpenSearchTestCase;
6061
import org.opensearch.test.VersionUtils;
@@ -1378,6 +1379,72 @@ public void testJoinRemoteStoreClusterWithRemotePublicationNodeInMixedMode() {
13781379
JoinTaskExecutor.ensureNodesCompatibility(joiningNode, currentState.getNodes(), currentState.metadata());
13791380
}
13801381

1382+
public void testUpdatesClusterStateWithRepositoryMetadataNotInSync() throws Exception {
1383+
Map<String, String> newNodeAttributes = new HashMap<>();
1384+
newNodeAttributes.putAll(remoteStateNodeAttributes(CLUSTER_STATE_REPO));
1385+
newNodeAttributes.putAll(remoteRoutingTableAttributes(ROUTING_TABLE_REPO));
1386+
1387+
final AllocationService allocationService = mock(AllocationService.class);
1388+
when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
1389+
final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
1390+
RepositoriesService repositoriesService = mock(RepositoriesService.class);
1391+
when(repositoriesService.repository(any())).thenThrow(RepositoryMissingException.class);
1392+
final RemoteStoreNodeService remoteStoreNodeService = new RemoteStoreNodeService(new SetOnce<>(repositoriesService)::get, null);
1393+
1394+
final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(
1395+
Settings.EMPTY,
1396+
allocationService,
1397+
logger,
1398+
rerouteService,
1399+
remoteStoreNodeService
1400+
);
1401+
1402+
final DiscoveryNode clusterManagerNode = new DiscoveryNode(
1403+
UUIDs.base64UUID(),
1404+
buildNewFakeTransportAddress(),
1405+
newNodeAttributes,
1406+
DiscoveryNodeRole.BUILT_IN_ROLES,
1407+
Version.CURRENT
1408+
);
1409+
1410+
final RepositoryMetadata clusterStateRepo = buildRepositoryMetadata(clusterManagerNode, CLUSTER_STATE_REPO);
1411+
final RepositoryMetadata routingTableRepo = buildRepositoryMetadata(clusterManagerNode, ROUTING_TABLE_REPO);
1412+
List<RepositoryMetadata> repositoriesMetadata = new ArrayList<>() {
1413+
{
1414+
add(clusterStateRepo);
1415+
add(routingTableRepo);
1416+
}
1417+
};
1418+
1419+
final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
1420+
.nodes(
1421+
DiscoveryNodes.builder()
1422+
.add(clusterManagerNode)
1423+
.localNodeId(clusterManagerNode.getId())
1424+
.clusterManagerNodeId(clusterManagerNode.getId())
1425+
)
1426+
.metadata(Metadata.builder().putCustom(RepositoriesMetadata.TYPE, new RepositoriesMetadata(repositoriesMetadata)))
1427+
.build();
1428+
1429+
final DiscoveryNode joiningNode = new DiscoveryNode(
1430+
UUIDs.base64UUID(),
1431+
buildNewFakeTransportAddress(),
1432+
newNodeAttributes,
1433+
DiscoveryNodeRole.BUILT_IN_ROLES,
1434+
Version.CURRENT
1435+
);
1436+
1437+
final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result = joinTaskExecutor.execute(
1438+
clusterState,
1439+
List.of(new JoinTaskExecutor.Task(joiningNode, "test"))
1440+
);
1441+
assertThat(result.executionResults.entrySet(), hasSize(1));
1442+
final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
1443+
assertTrue(taskResult.isSuccess());
1444+
validatePublicationRepositoryMetadata(result.resultingState, clusterManagerNode);
1445+
1446+
}
1447+
13811448
private void validateRepositoryMetadata(ClusterState updatedState, DiscoveryNode existingNode, int expectedRepositories)
13821449
throws Exception {
13831450

‎test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java

-42
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434

3535
import org.apache.logging.log4j.LogManager;
3636
import org.apache.logging.log4j.Logger;
37-
import org.opensearch.OpenSearchException;
3837
import org.opensearch.Version;
3938
import org.opensearch.cluster.ClusterModule;
4039
import org.opensearch.cluster.node.DiscoveryNode;
@@ -377,47 +376,6 @@ public void addFailToSendNoConnectRule(TransportAddress transportAddress, final
377376
});
378377
}
379378

380-
/**
381-
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
382-
*/
383-
public void addOpenSearchFailureException(
384-
TransportService transportService,
385-
final OpenSearchException exception,
386-
final String... blockedActions
387-
) {
388-
addOpenSearchFailureException(transportService, exception, new HashSet<>(Arrays.asList(blockedActions)));
389-
}
390-
391-
/**
392-
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
393-
*/
394-
public void addOpenSearchFailureException(
395-
TransportService transportService,
396-
OpenSearchException exception,
397-
final Set<String> blockedActions
398-
) {
399-
for (TransportAddress transportAddress : extractTransportAddresses(transportService)) {
400-
addOpenSearchFailureException(transportAddress, exception, blockedActions);
401-
}
402-
}
403-
404-
/**
405-
* Adds a rule that will cause matching operations to throw provided OpenSearch Exceptions
406-
*/
407-
public void addOpenSearchFailureException(
408-
TransportAddress transportAddress,
409-
OpenSearchException exception,
410-
final Set<String> blockedActions
411-
) {
412-
transport().addSendBehavior(transportAddress, (connection, requestId, action, request, options) -> {
413-
if (blockedActions.contains(action)) {
414-
logger.info("--> preventing {} request", action);
415-
throw exception;
416-
}
417-
connection.sendRequest(requestId, action, request, options);
418-
});
419-
}
420-
421379
/**
422380
* Adds a rule that will cause ignores each send request, simulating an unresponsive node
423381
* and failing to connect once the rule was added.

0 commit comments

Comments
 (0)
Please sign in to comment.