Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support mapping alias for maintainer-activity indices #124

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;


@Slf4j
public class GithubEventsLambda implements RequestHandler<Map<String, String>, Void> {
Expand Down Expand Up @@ -96,7 +98,7 @@ public Void handleRequest(Map<String, String> input, Context context) {
}
}
String indexName = "github-user-activity-events-" + collectionCurrentDate.format(DateTimeFormatter.ofPattern("MM-yyyy"));
openSearchUtil.createIndexIfNotExists(indexName);
openSearchUtil.createIndexIfNotExists(indexName, Optional.empty());
openSearchUtil.bulkIndex(indexName, finalEventData);
collectionCurrentDate = collectionCurrentDate.plusDays(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void generateGeneralMetrics(List<String> repositories) {
return Stream.of(metricsData);
}))
.collect(Collectors.toMap(MetricsData::getId, metricsData -> metricsData.getJson(metricsData, objectMapper)));
openSearchUtil.createIndexIfNotExists("opensearch_general_metrics");
openSearchUtil.createIndexIfNotExists("opensearch_general_metrics", Optional.empty());
openSearchUtil.bulkIndex("opensearch_general_metrics", metricFinalData);
}

Expand Down Expand Up @@ -157,7 +157,7 @@ public void generateLabelMetrics(List<String> repositories) {
});
}))
.collect(Collectors.toMap(LabelData::getId, labelData -> labelData.getJson(labelData, objectMapper)));
openSearchUtil.createIndexIfNotExists("opensearch_label_metrics");
openSearchUtil.createIndexIfNotExists("opensearch_label_metrics", Optional.empty());
openSearchUtil.bulkIndex("opensearch_label_metrics", metricFinalData);
}

Expand Down Expand Up @@ -207,7 +207,7 @@ public void generateReleaseMetrics() {
}))
.collect(Collectors.toMap(ReleaseMetricsData::getId,
releaseMetricsData -> releaseMetricsData.getJson(releaseMetricsData, objectMapper)));
openSearchUtil.createIndexIfNotExists("opensearch_release_metrics");
openSearchUtil.createIndexIfNotExists("opensearch_release_metrics", Optional.empty());
openSearchUtil.bulkIndex("opensearch_release_metrics", metricFinalData);
}

Expand Down Expand Up @@ -245,7 +245,7 @@ public void generateCodeCovMetrics() {
.collect(Collectors.toMap(CodeCovResult::getId,
codeCovResult -> codeCovResult.getJson(codeCovResult, objectMapper)));
String codeCovIndexName = "opensearch-codecov-metrics-" + currentDate.format(DateTimeFormatter.ofPattern("MM-yyyy"));
openSearchUtil.createIndexIfNotExists(codeCovIndexName);
openSearchUtil.createIndexIfNotExists(codeCovIndexName, Optional.empty());
openSearchUtil.bulkIndex(codeCovIndexName, metricFinalData);
}

Expand Down Expand Up @@ -351,7 +351,7 @@ public void generateMaintainerMetrics(List<String> repositories) {
})
.collect(Collectors.toMap(MaintainerData::getId, maintainerData -> maintainerData.getJson(maintainerData, objectMapper)));
String indexName = "maintainer-inactivity-" + currentDate.format(DateTimeFormatter.ofPattern("MM-yyyy"));
openSearchUtil.createIndexIfNotExists(indexName);
openSearchUtil.createIndexIfNotExists(indexName, Optional.of("maintainer-inactivity"));
openSearchUtil.bulkIndex(indexName, metricFinalData);
}
}
24 changes: 21 additions & 3 deletions src/main/java/org/opensearchmetrics/util/OpenSearchUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentType;
Expand All @@ -31,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
Expand All @@ -49,7 +52,7 @@ public OpenSearchUtil(RestHighLevelClient client) {
this.client = client;
}

public void createIndexIfNotExists(String index) {
public void createIndexIfNotExists(String index, Optional<String> aliasName) {
GetIndexRequest getIndexRequest = new GetIndexRequest(index);
try {
if (!client.indices().exists(getIndexRequest, RequestOptions.DEFAULT)) {
Expand All @@ -66,16 +69,31 @@ public void createIndexIfNotExists(String index) {
throw new RuntimeException(e);
}
System.out.println("Create index " + createIndexResponse.index() + ", acknowledged = " + createIndexResponse.isAcknowledged() + ", shard acknowledged = " + createIndexResponse.isShardsAcknowledged());
//Adds alias if requested to the index after the index is sucessfully created
if(aliasName.isPresent()) {
IndicesAliasesRequest indicesAliasesRequest = new IndicesAliasesRequest()
.addAliasAction(
IndicesAliasesRequest.AliasActions.add()
.index(index)
.alias(aliasName.get())
);
try {
client.indices().updateAliases(indicesAliasesRequest, RequestOptions.DEFAULT);
System.out.println("Alias is added to the index " + index);
} catch (IOException e) {
throw new RuntimeException(e);
}
}

} else {
System.out.println("Index " + index + " already exists, skip creating index.");
}

} catch (IOException e) {
throw new RuntimeException(e);
}
}



/**
* Bulk index json data into an OpenSearch index.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;


import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -73,8 +75,8 @@ public void testHandleRequestYesterday() {
// Assert
String indexNameYesterday = "github-user-activity-events-" + yesterday.format(DateTimeFormatter.ofPattern("MM-yyyy"));
String indexNameToday = "github-user-activity-events-" + today.format(DateTimeFormatter.ofPattern("MM-yyyy"));
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameYesterday);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameToday);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameYesterday, Optional.empty());
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameToday, Optional.empty());
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameYesterday), any(Map.class));
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameToday), any(Map.class));
}
Expand All @@ -101,11 +103,11 @@ public void testHandleRequestMonthAgo() {

// Assert
String indexNameLastMonth = "github-user-activity-events-" + lastMonth.format(DateTimeFormatter.ofPattern("MM-yyyy"));
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameLastMonth);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameLastMonth, Optional.empty());
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameLastMonth), any(Map.class));

String indexNameThisMonth = "github-user-activity-events-" + today.format(DateTimeFormatter.ofPattern("MM-yyyy"));
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameThisMonth);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameThisMonth, Optional.empty());
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameThisMonth), any(Map.class));
}

Expand All @@ -131,8 +133,8 @@ public void testHandleRequestDefault() {
// Assert
String indexNameYesterday = "github-user-activity-events-" + yesterday.format(DateTimeFormatter.ofPattern("MM-yyyy"));
String indexNameToday = "github-user-activity-events-" + today.format(DateTimeFormatter.ofPattern("MM-yyyy"));
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameYesterday);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameToday);
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameYesterday, Optional.empty());
verify(openSearchUtil, atLeastOnce()).createIndexIfNotExists(indexNameToday, Optional.empty());
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameYesterday), any(Map.class));
verify(openSearchUtil, atLeastOnce()).bulkIndex(eq(indexNameToday), any(Map.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ void testGenerateGeneralMetrics() throws IOException {
when(untriagedIssues.performSearch(any(), any())).thenReturn(10L);
when(objectMapper.writeValueAsString(any())).thenReturn("json");
metricsCalculation.generateGeneralMetrics(repositories);
verify(openSearchUtil).createIndexIfNotExists("opensearch_general_metrics");
verify(openSearchUtil).createIndexIfNotExists("opensearch_general_metrics", Optional.empty());
verify(openSearchUtil).bulkIndex(eq("opensearch_general_metrics"), any(Map.class));
}

Expand All @@ -136,7 +136,7 @@ void testGenerateLabelMetrics() throws IOException {
when(labelMetrics.getLabelInfo(any(), any())).thenReturn(labelInfo);
when(objectMapper.writeValueAsString(any())).thenReturn("json");
metricsCalculation.generateLabelMetrics(repositories);
verify(openSearchUtil).createIndexIfNotExists("opensearch_label_metrics");
verify(openSearchUtil).createIndexIfNotExists("opensearch_label_metrics", Optional.empty());
verify(openSearchUtil).bulkIndex(eq("opensearch_label_metrics"), any(Map.class));
}

Expand All @@ -157,9 +157,9 @@ void testGenerateReleaseMetrics() {
when(releaseMetrics.getReleaseOwners(ReleaseInputs.VERSION_2_13_0.getVersion(), "repo1")).thenReturn(new String[]{"owner1", "owner2"});
when(releaseMetrics.getReleaseIssue(ReleaseInputs.VERSION_2_13_0.getVersion(), "repo1")).thenReturn("release-123");
metricsCalculation.generateReleaseMetrics();
verify(openSearchUtil).createIndexIfNotExists("opensearch_release_metrics");
verify(openSearchUtil).createIndexIfNotExists("opensearch_release_metrics", Optional.empty());
verify(openSearchUtil).bulkIndex(eq("opensearch_release_metrics"), ArgumentMatchers.anyMap());
verify(openSearchUtil, times(1)).createIndexIfNotExists("opensearch_release_metrics");
verify(openSearchUtil, times(1)).createIndexIfNotExists("opensearch_release_metrics", Optional.empty());
}

@Test
Expand Down Expand Up @@ -187,7 +187,7 @@ void testGenerateCodeCovMetrics() {
throw new RuntimeException(e);
}
metricsCalculation.generateCodeCovMetrics();
verify(openSearchUtil).createIndexIfNotExists(matches("opensearch-codecov-metrics-\\d{2}-\\d{4}"));
verify(openSearchUtil).createIndexIfNotExists(matches("opensearch-codecov-metrics-\\d{2}-\\d{4}"), eq(Optional.empty()));
verify(openSearchUtil).bulkIndex(matches("opensearch-codecov-metrics-\\d{2}-\\d{4}"), argThat(map -> !map.isEmpty()));
verify(releaseMetrics).getCodeCoverage("main", "repo1");
verify(releaseMetrics).getReleaseRepos("2.18.0");
Expand Down Expand Up @@ -221,7 +221,7 @@ void testGenerateMaintainerMetrics() throws IOException{
when(maintainerMetrics.calculateInactivity(50L, slopeAndIntercept, lowerBound, latestEventData)).thenReturn(false);
when(objectMapper.writeValueAsString(any())).thenReturn("json");
metricsCalculation.generateMaintainerMetrics(repositories);
verify(openSearchUtil).createIndexIfNotExists(matches("maintainer-inactivity-\\d{2}-\\d{4}"));
verify(openSearchUtil).createIndexIfNotExists(matches("maintainer-inactivity-\\d{2}-\\d{4}"), eq(Optional.of("maintainer-inactivity")));
verify(openSearchUtil).bulkIndex(matches("maintainer-inactivity-\\d{2}-\\d{4}"), argThat(map -> !map.isEmpty()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,19 @@
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;


import java.io.IOException;
import java.util.Map;
import java.util.Optional;


import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;


public class OpenSearchUtilTest {

Expand All @@ -48,25 +55,45 @@ void setUp() {
void WHEN_index_exists_THEN_doNothing() throws IOException {
when(client.indices()).thenReturn(indicesClient);
when(indicesClient.exists(any(GetIndexRequest.class), any())).thenReturn(true);
openSearchUtil.createIndexIfNotExists("some_index");
openSearchUtil.createIndexIfNotExists("some_index", Optional.empty());

verify(indicesClient, times(0)).create(any(CreateIndexRequest.class), any(RequestOptions.class));
}

@Test
void WHEN_index_not_exist_THEN_create_index() throws IOException {
void WHEN_index_not_exist_THEN_create_index_along_with_alias() throws IOException {

when(client.indices()).thenReturn(indicesClient);
when(indicesClient.exists(any(GetIndexRequest.class), any(RequestOptions.class))).thenReturn(false);
when(indicesClient.create(any(CreateIndexRequest.class), any(RequestOptions.class)))
.thenReturn(new CreateIndexResponse(true, true, "some_index"));
openSearchUtil.createIndexIfNotExists("some_index");
openSearchUtil.createIndexIfNotExists("some_index", Optional.of("maintainer-activity"));

verify(indicesClient).exists(any(GetIndexRequest.class), any(RequestOptions.class));
verify(indicesClient).create(any(CreateIndexRequest.class), any(RequestOptions.class));
verify(indicesClient).updateAliases(any(IndicesAliasesRequest.class), any(RequestOptions.class));
verifyNoMoreInteractions(indicesClient);
}

@Test
void WHEN_index_not_exist_THEN_create_index_along_with_alias_exception() throws IOException {
when(client.indices()).thenReturn(indicesClient);
when(indicesClient.exists(any(GetIndexRequest.class), any(RequestOptions.class))).thenReturn(false);
when(indicesClient.create(any(CreateIndexRequest.class), any(RequestOptions.class)))
.thenReturn(new CreateIndexResponse(true, true, "some_index"));

doThrow(new IOException("Error adding alias to index")).when(indicesClient).updateAliases(any(IndicesAliasesRequest.class), any(RequestOptions.class));

try {
openSearchUtil.createIndexIfNotExists("some_index", Optional.of("maintainer-activity"));
fail("Expected a RuntimeException to be thrown");
} catch (RuntimeException e) {
// Exception caught as expected
System.out.println("Caught exception message: " + e.getMessage());
assertTrue(e.getMessage().contains("Error adding alias to index"));
}
}

@Test
void GIVEN_index_AND_data_THEN_bulkIndex() throws Exception {
BulkResponse mockResponse = Mockito.mock(BulkResponse.class);
Expand Down