Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] [Point in time] Backport point in time changes #4406

Closed
wants to merge 14 commits into from
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Adding create pit service layer changes
Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
bharath-techie committed Sep 5, 2022
commit ff7d47f4d799e750591eb269fdba5d4ef43f315f
Original file line number Diff line number Diff line change
@@ -32,15 +32,21 @@

package org.opensearch.search.searchafter;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.common.UUIDs;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.hamcrest.Matchers;
@@ -155,6 +161,58 @@ public void testsShouldFail() throws Exception {
}
}

/**
 * Verifies that search_after works inside a point-in-time (PIT) context and that the
 * PIT view is frozen: a document indexed after PIT creation shows up in a regular
 * search but not in searches that go through the PIT.
 */
public void testPitWithSearchAfter() throws Exception {
    assertAcked(client().admin().indices().prepareCreate("test").setMapping("field1", "type=long", "field2", "type=keyword").get());
    ensureGreen();
    indexRandom(
        true,
        client().prepareIndex("test").setId("0").setSource("field1", 0),
        client().prepareIndex("test").setId("1").setSource("field1", 100, "field2", "toto"),
        client().prepareIndex("test").setId("2").setSource("field1", 101),
        client().prepareIndex("test").setId("3").setSource("field1", 99)
    );

    CreatePitRequest pitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true);
    pitRequest.setIndices(new String[] { "test" });
    ActionFuture<CreatePitResponse> pitFuture = client().execute(CreatePitAction.INSTANCE, pitRequest);
    CreatePitResponse pitResponse = pitFuture.get();

    // search_after 99 -> the two docs with field1 in {100, 101}
    SearchResponse response = client().prepareSearch()
        .addSort("field1", SortOrder.ASC)
        .setQuery(matchAllQuery())
        .searchAfter(new Object[] { 99 })
        .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
        .get();
    assertEquals(2, response.getHits().getHits().length);

    // search_after 100 -> only the doc with field1 == 101
    response = client().prepareSearch()
        .addSort("field1", SortOrder.ASC)
        .setQuery(matchAllQuery())
        .searchAfter(new Object[] { 100 })
        .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
        .get();
    assertEquals(1, response.getHits().getHits().length);

    // search_after 0 -> the three docs with field1 > 0
    response = client().prepareSearch()
        .addSort("field1", SortOrder.ASC)
        .setQuery(matchAllQuery())
        .searchAfter(new Object[] { 0 })
        .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
        .get();
    assertEquals(3, response.getHits().getHits().length);

    // Index a new doc: the PIT search result must stay at 3 while a plain
    // (non-PIT) search sees the refreshed index with 4 matches.
    indexRandom(true, client().prepareIndex("test").setId("4").setSource("field1", 102));
    response = client().prepareSearch()
        .addSort("field1", SortOrder.ASC)
        .setQuery(matchAllQuery())
        .searchAfter(new Object[] { 0 })
        .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
        .get();
    assertEquals(3, response.getHits().getHits().length);
    response = client().prepareSearch().addSort("field1", SortOrder.ASC).setQuery(matchAllQuery()).searchAfter(new Object[] { 0 }).get();
    assertEquals(4, response.getHits().getHits().length);
    client().admin().indices().prepareDelete("test").get();
}

public void testWithNullStrings() throws InterruptedException {
assertAcked(client().admin().indices().prepareCreate("test").setMapping("field2", "type=keyword").get());
ensureGreen();
Original file line number Diff line number Diff line change
@@ -32,9 +32,13 @@

package org.opensearch.search.slice;

import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;

import org.opensearch.action.index.IndexRequestBuilder;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.SearchRequestBuilder;
import org.opensearch.action.search.SearchResponse;
@@ -46,6 +50,7 @@
import org.opensearch.search.Scroll;
import org.opensearch.search.SearchException;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortBuilders;
import org.opensearch.test.OpenSearchIntegTestCase;

@@ -86,7 +91,12 @@ private void setupIndex(int numDocs, int numberOfShards) throws IOException, Exe
client().admin()
.indices()
.prepareCreate("test")
.setSettings(Settings.builder().put("number_of_shards", numberOfShards).put("index.max_slices_per_scroll", 10000))
.setSettings(
Settings.builder()
.put("number_of_shards", numberOfShards)
.put("index.max_slices_per_scroll", 10000)
.put("index.max_slices_per_pit", 10000)
)
.setMapping(mapping)
);
ensureGreen();
@@ -129,6 +139,78 @@ public void testSearchSort() throws Exception {
}
}

/**
 * A slice may only be used inside a scroll or PIT context; a plain search request
 * carrying a slice must fail on every shard.
 */
public void testSearchSortWithoutPitOrScroll() throws Exception {
    int shardCount = randomIntBetween(1, 7);
    int docCount = randomIntBetween(100, 1000);
    setupIndex(docCount, shardCount);
    int pageSize = randomIntBetween(10, 100);
    SearchRequestBuilder searchRequest = client().prepareSearch("test")
        .setQuery(matchAllQuery())
        .setSize(pageSize)
        .addSort(SortBuilders.fieldSort("_doc"));
    SliceBuilder slice = new SliceBuilder("_id", 0, 4);
    SearchPhaseExecutionException ex = expectThrows(SearchPhaseExecutionException.class, () -> searchRequest.slice(slice).get());
    assertTrue(ex.getMessage().contains("all shards failed"));
}

/**
 * Creates a PIT over the index and verifies sliced, paged searches through it:
 * for each slice field, both a _doc sort and a numeric-field sort must partition
 * the index into disjoint slices that together cover every document.
 */
public void testSearchSortWithPIT() throws Exception {
    int shardCount = randomIntBetween(1, 7);
    int docCount = randomIntBetween(100, 1000);
    setupIndex(docCount, shardCount);
    int sliceCount = randomIntBetween(2, shardCount * 3);

    CreatePitRequest pitRequest = new CreatePitRequest(TimeValue.timeValueDays(1), true);
    pitRequest.setIndices(new String[] { "test" });
    ActionFuture<CreatePitResponse> pitFuture = client().execute(CreatePitAction.INSTANCE, pitRequest);
    CreatePitResponse pitResponse = pitFuture.get();

    for (String field : new String[] { "_id", "random_int", "static_int" }) {
        int pageSize = randomIntBetween(10, 100);

        // slice with a _doc sort
        SearchRequestBuilder searchRequest = client().prepareSearch("test")
            .setQuery(matchAllQuery())
            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
            .setSize(pageSize)
            .addSort(SortBuilders.fieldSort("_doc"));
        assertSearchSlicesWithPIT(searchRequest, field, sliceCount, docCount);

        // slice with a numeric-field sort
        searchRequest = client().prepareSearch("test")
            .setQuery(matchAllQuery())
            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()))
            .setSize(pageSize)
            .addSort(SortBuilders.fieldSort("random_int"));
        assertSearchSlicesWithPIT(searchRequest, field, sliceCount, docCount);
    }
    client().admin().indices().prepareDelete("test").get();
}

/**
 * Fetches every slice of {@code request} page by page (advancing {@code from}) and
 * asserts the slices partition the index: each slice yields exactly the hit count it
 * reports, no document id appears twice, and all {@code numDocs} documents are covered.
 *
 * @param request  search request already configured with a PIT
 * @param field    field the slice hash is computed on
 * @param numSlice number of slices the request is split into
 * @param numDocs  total documents expected across all slices
 */
private void assertSearchSlicesWithPIT(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
    int totalResults = 0;
    // A Set (not a List) so that add() returning false immediately flags a duplicate
    // hit id; the original List-based assertTrue(keys.add(...)) could never fail.
    HashSet<String> keys = new HashSet<>();
    for (int id = 0; id < numSlice; id++) {
        SliceBuilder sliceBuilder = new SliceBuilder(field, id, numSlice);
        SearchResponse searchResponse = request.slice(sliceBuilder).setFrom(0).get();
        totalResults += searchResponse.getHits().getHits().length;
        int expectedSliceResults = (int) searchResponse.getHits().getTotalHits().value;
        int numSliceResults = searchResponse.getHits().getHits().length;
        for (SearchHit hit : searchResponse.getHits().getHits()) {
            assertTrue("duplicate hit id " + hit.getId(), keys.add(hit.getId()));
        }
        // Page through the rest of this slice until it is exhausted.
        while (searchResponse.getHits().getHits().length > 0) {
            searchResponse = request.setFrom(numSliceResults).slice(sliceBuilder).get();
            totalResults += searchResponse.getHits().getHits().length;
            numSliceResults += searchResponse.getHits().getHits().length;
            for (SearchHit hit : searchResponse.getHits().getHits()) {
                assertTrue("duplicate hit id " + hit.getId(), keys.add(hit.getId()));
            }
        }
        assertThat(numSliceResults, equalTo(expectedSliceResults));
    }
    assertThat(totalResults, equalTo(numDocs));
    // keys is already duplicate-free, so its size doubles as the distinct-id count
    // (replaces the raw-typed `new HashSet(keys)` check).
    assertThat(keys.size(), equalTo(numDocs));
}

public void testWithPreferenceAndRoutings() throws Exception {
int numShards = 10;
int totalDocs = randomIntBetween(100, 1000);
@@ -217,7 +299,7 @@ public void testInvalidQuery() throws Exception {
);
Throwable rootCause = findRootCause(exc);
assertThat(rootCause.getClass(), equalTo(SearchException.class));
assertThat(rootCause.getMessage(), equalTo("`slice` cannot be used outside of a scroll context"));
assertThat(rootCause.getMessage(), equalTo("`slice` cannot be used outside of a scroll context or PIT context"));
}

private void assertSearchSlicesWithScroll(SearchRequestBuilder request, String field, int numSlice, int numDocs) {
Original file line number Diff line number Diff line change
@@ -234,10 +234,12 @@
import org.opensearch.action.main.MainAction;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
@@ -664,6 +666,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
// Remote Store
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);

return unmodifiableMap(actions.getRegistry());
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionType;

/**
 * Action type for creating PIT reader context
 */
public class CreatePitAction extends ActionType<CreatePitResponse> {
    // Singleton through which the action is registered and invoked.
    public static final CreatePitAction INSTANCE = new CreatePitAction();
    // Transport action name; follows the "indices:data/read/..." naming used by read operations.
    public static final String NAME = "indices:data/read/point_in_time/create";

    // Private: callers always go through INSTANCE.
    private CreatePitAction() {
        super(NAME, CreatePitResponse::new);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import static org.opensearch.common.unit.TimeValue.timeValueSeconds;

/**
 * Controller for creating PIT reader context
 * Phase 1 of create PIT request : Create PIT reader contexts in the associated shards with a temporary keep alive
 * Phase 2 of create PIT : Update PIT reader context with PIT ID and keep alive from request and
 * fail user request if any of the updates in this phase are failed - we clean up PITs in case of such failures.
 * This two phase approach is used to save PIT ID as part of context which is later used for other use cases like list PIT etc.
 */
public class CreatePitController {
    private final SearchTransportService searchTransportService;
    private final ClusterService clusterService;
    private final TransportSearchAction transportSearchAction;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private static final Logger logger = LogManager.getLogger(CreatePitController.class);
    /**
     * Temporary keep alive applied to the phase-1 reader contexts; it only needs to
     * outlive phase 2, which replaces it with the keep alive from the user's request.
     */
    public static final Setting<TimeValue> PIT_INIT_KEEP_ALIVE = Setting.positiveTimeSetting(
        "point_in_time.init.keep_alive",
        timeValueSeconds(30),
        Setting.Property.NodeScope
    );

    @Inject
    public CreatePitController(
        SearchTransportService searchTransportService,
        ClusterService clusterService,
        TransportSearchAction transportSearchAction,
        NamedWriteableRegistry namedWriteableRegistry
    ) {
        this.searchTransportService = searchTransportService;
        this.clusterService = clusterService;
        this.transportSearchAction = transportSearchAction;
        this.namedWriteableRegistry = namedWriteableRegistry;
    }

    /**
     * This method creates PIT reader context
     *
     * @param request             user's create PIT request (indices, keep alive, routing, preference)
     * @param task                parent task from which the child {@link SearchTask} is derived
     * @param createPitListener   step listener completed when phase 1 (context creation) finishes
     * @param updatePitIdListener listener notified with the final outcome after phase 2
     *                            (PIT id / keep alive update), or with any failure from either phase
     */
    public void executeCreatePit(
        CreatePitRequest request,
        Task task,
        StepListener<SearchResponse> createPitListener,
        ActionListener<CreatePitResponse> updatePitIdListener
    ) {
        // The create request is carried by an ordinary SearchRequest so it can reuse
        // the search routing/shard-resolution machinery.
        SearchRequest searchRequest = new SearchRequest(request.getIndices());
        searchRequest.preference(request.getPreference());
        searchRequest.routing(request.getRouting());
        searchRequest.indicesOptions(request.getIndicesOptions());
        searchRequest.allowPartialSearchResults(request.shouldAllowPartialPitCreation());
        SearchTask searchTask = searchRequest.createTask(
            task.getId(),
            task.getType(),
            task.getAction(),
            task.getParentTaskId(),
            Collections.emptyMap()
        );
        /**
         * Phase 1 of create PIT
         */
        executeCreatePit(searchTask, searchRequest, createPitListener);

        /**
         * Phase 2 of create PIT where we update pit id in pit contexts
         */
        createPitListener.whenComplete(
            searchResponse -> { executeUpdatePitId(request, searchRequest, searchResponse, updatePitIdListener); },
            updatePitIdListener::onFailure
        );
    }

    /**
     * Creates PIT reader context with temporary keep alive
     *
     * @param task              the search task driving the per-shard requests
     * @param searchRequest     request resolving the target shards
     * @param createPitListener completed with the search response once every shard context exists
     */
    void executeCreatePit(Task task, SearchRequest searchRequest, StepListener<SearchResponse> createPitListener) {
        logger.debug(
            () -> new ParameterizedMessage("Executing creation of PIT context for indices [{}]", Arrays.toString(searchRequest.indices()))
        );
        // Single-phase search: each shard only opens a reader context, no query is run.
        transportSearchAction.executeRequest(
            task,
            searchRequest,
            TransportCreatePitAction.CREATE_PIT_ACTION,
            true,
            new TransportSearchAction.SinglePhaseSearchAction() {
                @Override
                public void executeOnShardTarget(
                    SearchTask searchTask,
                    SearchShardTarget target,
                    Transport.Connection connection,
                    ActionListener<SearchPhaseResult> searchPhaseResultActionListener
                ) {
                    searchTransportService.createPitContext(
                        connection,
                        new TransportCreatePitAction.CreateReaderContextRequest(
                            target.getShardId(),
                            PIT_INIT_KEEP_ALIVE.get(clusterService.getSettings())
                        ),
                        searchTask,
                        ActionListener.wrap(r -> searchPhaseResultActionListener.onResponse(r), searchPhaseResultActionListener::onFailure)
                    );
                }
            },
            createPitListener
        );
    }

    /**
     * Updates PIT ID, keep alive and createdTime of PIT reader context
     *
     * @param request             original create PIT request (supplies the real keep alive)
     * @param searchRequest       phase-1 search request (supplies the absolute start time)
     * @param searchResponse      phase-1 response carrying the encoded PIT id
     * @param updatePitIdListener notified once every shard context has been updated, or on failure
     */
    void executeUpdatePitId(
        CreatePitRequest request,
        SearchRequest searchRequest,
        SearchResponse searchResponse,
        ActionListener<CreatePitResponse> updatePitIdListener
    ) {
        logger.debug(
            () -> new ParameterizedMessage(
                "Updating PIT context with PIT ID [{}], creation time and keep alive",
                searchResponse.pointInTimeId()
            )
        );
        /**
         * store the create time ( same create time for all PIT contexts across shards ) to be used
         * for list PIT api
         */
        final long relativeStartNanos = System.nanoTime();
        final TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
            searchRequest.getOrCreateAbsoluteStartMillis(),
            relativeStartNanos,
            System::nanoTime
        );
        final long creationTime = timeProvider.getAbsoluteStartMillis();
        CreatePitResponse createPITResponse = new CreatePitResponse(
            searchResponse.pointInTimeId(),
            creationTime,
            searchResponse.getTotalShards(),
            searchResponse.getSuccessfulShards(),
            searchResponse.getSkippedShards(),
            searchResponse.getFailedShards(),
            searchResponse.getShardFailures()
        );
        // Decode the PIT id back into per-shard context ids so each owning node can be updated.
        SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, createPITResponse.getId());
        final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(contextId);
        lookupListener.whenComplete(nodelookup -> {
            final ActionListener<UpdatePitContextResponse> groupedActionListener = getGroupedListener(
                updatePitIdListener,
                createPITResponse,
                contextId.shards().size(),
                contextId.shards().values()
            );
            for (Map.Entry<ShardId, SearchContextIdForNode> entry : contextId.shards().entrySet()) {
                DiscoveryNode node = nodelookup.apply(entry.getValue().getClusterAlias(), entry.getValue().getNode());
                try {
                    final Transport.Connection connection = searchTransportService.getConnection(entry.getValue().getClusterAlias(), node);
                    searchTransportService.updatePitContext(
                        connection,
                        new UpdatePitContextRequest(
                            entry.getValue().getSearchContextId(),
                            createPITResponse.getId(),
                            request.getKeepAlive().millis(),
                            creationTime
                        ),
                        groupedActionListener
                    );
                } catch (Exception e) {
                    // A failed connection lookup/update for one shard fails the grouped
                    // listener and therefore the whole create PIT request.
                    logger.error(
                        () -> new ParameterizedMessage(
                            "Create pit update phase failed for PIT ID [{}] on node [{}]",
                            searchResponse.pointInTimeId(),
                            node
                        ),
                        e
                    );
                    groupedActionListener.onFailure(
                        new OpenSearchException(
                            "Create pit update phase for PIT ID [" + searchResponse.pointInTimeId() + "] failed on node[" + node + "]",
                            e
                        )
                    );
                }
            }
        }, updatePitIdListener::onFailure);
    }

    /**
     * Builds a listener resolving (clusterAlias, nodeId) -> DiscoveryNode, including any
     * remote clusters referenced by the decoded PIT contexts.
     */
    private StepListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(SearchContextId contextId) {
        ClusterState state = clusterService.state();
        final Set<String> clusters = contextId.shards()
            .values()
            .stream()
            .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
            .map(SearchContextIdForNode::getClusterAlias)
            .collect(Collectors.toSet());
        // NOTE(review): unchecked cast — SearchUtils.getConnectionLookupListener is expected
        // to return a StepListener here; confirm its declared return type.
        return (StepListener<BiFunction<String, String, DiscoveryNode>>) SearchUtils.getConnectionLookupListener(
            searchTransportService.getRemoteClusterService(),
            state,
            clusters
        );
    }

    /**
     * Wraps {@code updatePitIdListener} in a grouped listener that fires once all
     * {@code size} per-shard updates have responded.
     *
     * NOTE(review): the {@code contexts} parameter is currently unused — presumably
     * reserved for cleaning up contexts on partial failure; confirm against later commits.
     */
    private ActionListener<UpdatePitContextResponse> getGroupedListener(
        ActionListener<CreatePitResponse> updatePitIdListener,
        CreatePitResponse createPITResponse,
        int size,
        Collection<SearchContextIdForNode> contexts
    ) {
        return new GroupedActionListener<>(new ActionListener<>() {
            @Override
            public void onResponse(final Collection<UpdatePitContextResponse> responses) {
                updatePitIdListener.onResponse(createPITResponse);
            }

            @Override
            public void onFailure(final Exception e) {
                updatePitIdListener.onFailure(e);
            }
        }, size);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionRequest;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

import static org.opensearch.action.ValidateActions.addValidationError;

/**
 * A request to create a point in time (PIT) reader context against one or more indices.
 */
public class CreatePitRequest extends ActionRequest implements IndicesRequest.Replaceable, ToXContent {

    // keep alive for pit reader context
    private TimeValue keepAlive;

    // Whether the PIT may still be created when some shards fail. Nullable: it is
    // optional on the wire (readOptionalBoolean) and treated as true when unset.
    private Boolean allowPartialPitCreation;
    @Nullable
    private String routing = null;
    @Nullable
    private String preference = null;
    private String[] indices = Strings.EMPTY_ARRAY;
    private IndicesOptions indicesOptions = SearchRequest.DEFAULT_INDICES_OPTIONS;

    public CreatePitRequest(TimeValue keepAlive, Boolean allowPartialPitCreation, String... indices) {
        this.keepAlive = keepAlive;
        this.allowPartialPitCreation = allowPartialPitCreation;
        this.indices = indices;
    }

    /**
     * Deserializes a request; field order must match {@link #writeTo}.
     */
    public CreatePitRequest(StreamInput in) throws IOException {
        super(in);
        indices = in.readStringArray();
        indicesOptions = IndicesOptions.readIndicesOptions(in);
        routing = in.readOptionalString();
        preference = in.readOptionalString();
        keepAlive = in.readTimeValue();
        allowPartialPitCreation = in.readOptionalBoolean();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeStringArray(indices);
        indicesOptions.writeIndicesOptions(out);
        out.writeOptionalString(routing);
        out.writeOptionalString(preference);
        out.writeTimeValue(keepAlive);
        out.writeOptionalBoolean(allowPartialPitCreation);
    }

    public String getRouting() {
        return routing;
    }

    public String getPreference() {
        return preference;
    }

    public String[] getIndices() {
        return indices;
    }

    public IndicesOptions getIndicesOptions() {
        return indicesOptions;
    }

    public TimeValue getKeepAlive() {
        return keepAlive;
    }

    /**
     * Sets if this request should allow partial results.
     */
    public void allowPartialPitCreation(Boolean allowPartialPitCreation) {
        this.allowPartialPitCreation = allowPartialPitCreation;
    }

    /**
     * Whether partial PIT creation is allowed; defaults to {@code true} when not
     * explicitly set (the field is optional on the wire, so guard against the NPE
     * that unboxing a null Boolean would throw).
     */
    public boolean shouldAllowPartialPitCreation() {
        return allowPartialPitCreation == null || allowPartialPitCreation;
    }

    public void setRouting(String routing) {
        this.routing = routing;
    }

    public void setPreference(String preference) {
        this.preference = preference;
    }

    public void setIndices(String[] indices) {
        // Reject nulls the same way indices(String...) does, so both setters behave consistently.
        validateIndices(indices);
        this.indices = indices;
    }

    public void setIndicesOptions(IndicesOptions indicesOptions) {
        this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null");
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (keepAlive == null) {
            validationException = addValidationError("keep alive not specified", validationException);
        }
        return validationException;
    }

    @Override
    public String[] indices() {
        return indices;
    }

    @Override
    public IndicesOptions indicesOptions() {
        return indicesOptions;
    }

    public CreatePitRequest indicesOptions(IndicesOptions indicesOptions) {
        this.indicesOptions = Objects.requireNonNull(indicesOptions, "indicesOptions must not be null");
        return this;
    }

    public void setKeepAlive(TimeValue keepAlive) {
        this.keepAlive = keepAlive;
    }

    /**
     * Human-readable description used for task tracking.
     */
    public final String buildDescription() {
        StringBuilder sb = new StringBuilder();
        sb.append("indices[");
        Strings.arrayToDelimitedString(indices, ",", sb);
        sb.append("], ");
        sb.append("pointintime[").append(keepAlive).append("], ");
        sb.append("allowPartialPitCreation[").append(allowPartialPitCreation).append("], ");
        return sb.toString();
    }

    @Override
    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
        return new Task(id, type, action, this.buildDescription(), parentTaskId, headers);
    }

    // Rejects a null array and any null element.
    private void validateIndices(String... indices) {
        Objects.requireNonNull(indices, "indices must not be null");
        for (String index : indices) {
            Objects.requireNonNull(index, "index must not be null");
        }
    }

    @Override
    public CreatePitRequest indices(String... indices) {
        validateIndices(indices);
        this.indices = indices;
        return this;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.field("keep_alive", keepAlive);
        builder.field("allow_partial_pit_creation", allowPartialPitCreation);
        if (indices != null) {
            builder.startArray("indices");
            for (String index : indices) {
                builder.value(index);
            }
            builder.endArray();
        }
        if (indicesOptions != null) {
            indicesOptions.toXContent(builder, params);
        }
        return builder;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionResponse;
import org.opensearch.common.ParseField;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.StatusToXContentObject;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.rest.RestStatus;
import org.opensearch.rest.action.RestActions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;

/**
 * Create point in time response with point in time id and shard success / failures
 */
public class CreatePitResponse extends ActionResponse implements StatusToXContentObject {
    private static final ParseField ID = new ParseField("id");
    private static final ParseField CREATION_TIME = new ParseField("creation_time");

    // point in time id
    private final String id;
    private final int totalShards;
    private final int successfulShards;
    // NOTE(review): round-tripped on the wire, but getFailedShards() derives its value
    // from shardFailures.length rather than this field — confirm the two cannot diverge.
    private final int failedShards;
    private final int skippedShards;
    private final ShardSearchFailure[] shardFailures;
    // creation time of the PIT contexts (same value across all shards)
    private final long creationTime;

    /**
     * Deserializes a response; field order must match {@link #writeTo}.
     */
    public CreatePitResponse(StreamInput in) throws IOException {
        super(in);
        id = in.readString();
        totalShards = in.readVInt();
        successfulShards = in.readVInt();
        failedShards = in.readVInt();
        skippedShards = in.readVInt();
        creationTime = in.readLong();
        int size = in.readVInt();
        if (size == 0) {
            // reuse the shared empty array instead of allocating one per response
            shardFailures = ShardSearchFailure.EMPTY_ARRAY;
        } else {
            shardFailures = new ShardSearchFailure[size];
            for (int i = 0; i < shardFailures.length; i++) {
                shardFailures[i] = ShardSearchFailure.readShardSearchFailure(in);
            }
        }
    }

    public CreatePitResponse(
        String id,
        long creationTime,
        int totalShards,
        int successfulShards,
        int skippedShards,
        int failedShards,
        ShardSearchFailure[] shardFailures
    ) {
        this.id = id;
        this.creationTime = creationTime;
        this.totalShards = totalShards;
        this.successfulShards = successfulShards;
        this.skippedShards = skippedShards;
        this.failedShards = failedShards;
        this.shardFailures = shardFailures;
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject();
        builder.field(ID.getPreferredName(), id);
        // shard header ("_shards": total/successful/skipped/failed + failures array)
        RestActions.buildBroadcastShardsHeader(
            builder,
            params,
            getTotalShards(),
            getSuccessfulShards(),
            getSkippedShards(),
            getFailedShards(),
            getShardFailures()
        );
        builder.field(CREATION_TIME.getPreferredName(), creationTime);
        builder.endObject();
        return builder;
    }

    /**
     * Parse the create PIT response body into a new {@link CreatePitResponse} object
     */
    public static CreatePitResponse fromXContent(XContentParser parser) throws IOException {
        ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
        parser.nextToken();
        return innerFromXContent(parser);
    }

    /**
     * Parses the fields of an already-opened response object; the parser must be
     * positioned on the first field name.
     */
    public static CreatePitResponse innerFromXContent(XContentParser parser) throws IOException {
        ensureExpectedToken(XContentParser.Token.FIELD_NAME, parser.currentToken(), parser);
        String currentFieldName = parser.currentName();
        int successfulShards = -1;
        int totalShards = -1;
        int skippedShards = 0;
        int failedShards = 0;
        String id = null;
        long creationTime = 0;
        List<ShardSearchFailure> failures = new ArrayList<>();
        for (XContentParser.Token token = parser.nextToken(); token != XContentParser.Token.END_OBJECT; token = parser.nextToken()) {
            if (token == XContentParser.Token.FIELD_NAME) {
                currentFieldName = parser.currentName();
            } else if (token.isValue()) {
                // top-level scalar fields: "creation_time" and "id"
                if (CREATION_TIME.match(currentFieldName, parser.getDeprecationHandler())) {
                    creationTime = parser.longValue();
                } else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
                    id = parser.text();
                } else {
                    parser.skipChildren();
                }
            } else if (token == XContentParser.Token.START_OBJECT) {
                // the "_shards" sub-object
                if (RestActions._SHARDS_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                        if (token == XContentParser.Token.FIELD_NAME) {
                            currentFieldName = parser.currentName();
                        } else if (token.isValue()) {
                            if (RestActions.FAILED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                                // consumed and passed through to the constructor
                                failedShards = parser.intValue(); // we don't need it but need to consume it
                            } else if (RestActions.SUCCESSFUL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                                successfulShards = parser.intValue();
                            } else if (RestActions.TOTAL_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                                totalShards = parser.intValue();
                            } else if (RestActions.SKIPPED_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                                skippedShards = parser.intValue();
                            } else {
                                parser.skipChildren();
                            }
                        } else if (token == XContentParser.Token.START_ARRAY) {
                            if (RestActions.FAILURES_FIELD.match(currentFieldName, parser.getDeprecationHandler())) {
                                while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                                    failures.add(ShardSearchFailure.fromXContent(parser));
                                }
                            } else {
                                parser.skipChildren();
                            }
                        } else {
                            parser.skipChildren();
                        }
                    }
                } else {
                    // unknown object: skip it whole
                    parser.skipChildren();
                }
            }
        }

        return new CreatePitResponse(
            id,
            creationTime,
            totalShards,
            successfulShards,
            skippedShards,
            failedShards,
            failures.toArray(ShardSearchFailure.EMPTY_ARRAY)
        );
    }

    public long getCreationTime() {
        return creationTime;
    }

    /**
     * The failed number of shards the search was executed on.
     */
    public int getFailedShards() {
        // NOTE(review): derived from the failures array, not the failedShards field — see field comment.
        return shardFailures.length;
    }

    /**
     * The failures that occurred during the search.
     */
    public ShardSearchFailure[] getShardFailures() {
        return this.shardFailures;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(id);
        out.writeVInt(totalShards);
        out.writeVInt(successfulShards);
        out.writeVInt(failedShards);
        out.writeVInt(skippedShards);
        out.writeLong(creationTime);
        out.writeVInt(shardFailures.length);
        for (ShardSearchFailure shardSearchFailure : shardFailures) {
            shardSearchFailure.writeTo(out);
        }
    }

    public String getId() {
        return id;
    }

    /**
     * The total number of shards the create pit operation was executed on.
     */
    public int getTotalShards() {
        return totalShards;
    }

    /**
     * The successful number of shards the create pit operation was executed on.
     */
    public int getSuccessfulShards() {
        return successfulShards;
    }

    public int getSkippedShards() {
        return skippedShards;
    }

    @Override
    public RestStatus status() {
        // HTTP status reflects how many shards succeeded out of the total
        return RestStatus.status(successfulShards, totalShards, shardFailures);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;

import java.io.IOException;

/**
 * Couples a PIT id with the id of a search context on a particular node.
 *
 * @opensearch.internal
 */
public class PitSearchContextIdForNode implements Writeable {

    private final String pitId;
    private final SearchContextIdForNode searchContextIdForNode;

    public PitSearchContextIdForNode(String pitId, SearchContextIdForNode searchContextIdForNode) {
        this.pitId = pitId;
        this.searchContextIdForNode = searchContextIdForNode;
    }

    /**
     * Deserializes from the transport stream; read order must match {@link #writeTo}.
     */
    PitSearchContextIdForNode(StreamInput in) throws IOException {
        this.pitId = in.readString();
        this.searchContextIdForNode = new SearchContextIdForNode(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(pitId);
        searchContextIdForNode.writeTo(out);
    }

    /** The point in time id this context belongs to. */
    public String getPitId() {
        return pitId;
    }

    /** The per-node search context id. */
    public SearchContextIdForNode getSearchContextIdForNode() {
        return searchContextIdForNode;
    }

    /** Added so that log statements and assertion messages show useful content. */
    @Override
    public String toString() {
        return "PitSearchContextIdForNode{"
            + "pitId='"
            + pitId
            + '\''
            + ", searchContextIdForNode="
            + searchContextIdForNode
            + '}';
    }
}
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ public static SearchContextId decode(NamedWriteableRegistry namedWriteableRegist
}
return new SearchContextId(Collections.unmodifiableMap(shards), Collections.unmodifiableMap(aliasFilters));
} catch (IOException e) {
throw new IllegalArgumentException(e);
throw new IllegalArgumentException("invalid id: [" + id + "]", e);
}
}

Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
*
* @opensearch.internal
*/
public final class SearchContextIdForNode implements Writeable {
final class SearchContextIdForNode implements Writeable {
private final String node;
private final ShardSearchContextId searchContextId;
private final String clusterAlias;
Original file line number Diff line number Diff line change
@@ -95,6 +95,8 @@ public class SearchTransportService {
public static final String FETCH_ID_SCROLL_ACTION_NAME = "indices:data/read/search[phase/fetch/id/scroll]";
public static final String FETCH_ID_ACTION_NAME = "indices:data/read/search[phase/fetch/id]";
public static final String QUERY_CAN_MATCH_NAME = "indices:data/read/search[can_match]";
public static final String CREATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[create_context]";
public static final String UPDATE_READER_CONTEXT_ACTION_NAME = "indices:data/read/search[update_context]";

private final TransportService transportService;
private final BiFunction<Transport.Connection, SearchActionListener, ActionListener> responseWrapper;
@@ -142,6 +144,36 @@ public void sendFreeContext(
);
}

public void updatePitContext(
Transport.Connection connection,
UpdatePitContextRequest request,
ActionListener<UpdatePitContextResponse> actionListener
) {
transportService.sendRequest(
connection,
UPDATE_READER_CONTEXT_ACTION_NAME,
request,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(actionListener, UpdatePitContextResponse::new)
);
}

public void createPitContext(
Transport.Connection connection,
TransportCreatePitAction.CreateReaderContextRequest request,
SearchTask task,
ActionListener<TransportCreatePitAction.CreateReaderContextResponse> actionListener
) {
transportService.sendChildRequest(
connection,
CREATE_READER_CONTEXT_ACTION_NAME,
request,
task,
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new)
);
}

public void sendCanMatch(
Transport.Connection connection,
final ShardSearchRequest request,
@@ -562,6 +594,48 @@ public static void registerRequestHandler(TransportService transportService, Sea
}
);
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, SearchService.CanMatchResponse::new);
transportService.registerRequestHandler(
CREATE_READER_CONTEXT_ACTION_NAME,
ThreadPool.Names.SAME,
TransportCreatePitAction.CreateReaderContextRequest::new,
(request, channel, task) -> {
ChannelActionListener<
TransportCreatePitAction.CreateReaderContextResponse,
TransportCreatePitAction.CreateReaderContextRequest> listener = new ChannelActionListener<>(
channel,
CREATE_READER_CONTEXT_ACTION_NAME,
request
);
searchService.createPitReaderContext(
request.getShardId(),
request.getKeepAlive(),
ActionListener.wrap(
r -> listener.onResponse(new TransportCreatePitAction.CreateReaderContextResponse(r)),
listener::onFailure
)
);
}
);
TransportActionProxy.registerProxyAction(
transportService,
CREATE_READER_CONTEXT_ACTION_NAME,
TransportCreatePitAction.CreateReaderContextResponse::new
);

transportService.registerRequestHandler(
UPDATE_READER_CONTEXT_ACTION_NAME,
ThreadPool.Names.SAME,
UpdatePitContextRequest::new,
(request, channel, task) -> {
ChannelActionListener<UpdatePitContextResponse, UpdatePitContextRequest> listener = new ChannelActionListener<>(
channel,
UPDATE_READER_CONTEXT_ACTION_NAME,
request
);
searchService.updatePitIdAndKeepAlive(request, listener);
}
);
TransportActionProxy.registerProxyAction(transportService, UPDATE_READER_CONTEXT_ACTION_NAME, UpdatePitContextResponse::new);
}

/**
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.transport.RemoteClusterService;

import java.util.Set;
import java.util.function.BiFunction;

/**
 * Helper class for common search functions
 */
public class SearchUtils {

    // NOTE(review): a class with only static members would conventionally hide its
    // constructor (private) and be final; kept public here to avoid an API change.
    public SearchUtils() {}

    /**
     * Get connection lookup listener for list of clusters passed.
     *
     * <p>If {@code clusters} is empty, the listener is completed immediately with a
     * lookup that resolves node ids against the local cluster {@code state}; otherwise
     * the remote cluster service collects the nodes asynchronously first.
     */
    public static ActionListener<BiFunction<String, String, DiscoveryNode>> getConnectionLookupListener(
        RemoteClusterService remoteClusterService,
        ClusterState state,
        Set<String> clusters
    ) {
        final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();

        if (clusters.isEmpty()) {
            // Local-only: resolve node ids from the provided cluster state.
            lookupListener.onResponse((cluster, nodeId) -> state.getNodes().get(nodeId));
        } else {
            // Cross-cluster: gather the remote nodes before the listener completes.
            remoteClusterService.collectNodes(clusters, lookupListener);
        }
        return lookupListener;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;

import java.io.IOException;
import java.util.Arrays;

/**
 * Transport action for creating a PIT (point in time) reader context.
 * The actual create/update phases are delegated to {@link CreatePitController}.
 */
public class TransportCreatePitAction extends HandledTransportAction<CreatePitRequest, CreatePitResponse> {

    public static final String CREATE_PIT_ACTION = "create_pit";

    // NOTE(review): only createPitController is used directly in this class; the
    // remaining injected services are retained to keep the @Inject signature stable.
    private final TransportService transportService;
    private final SearchTransportService searchTransportService;
    private final ClusterService clusterService;
    private final TransportSearchAction transportSearchAction;
    private final NamedWriteableRegistry namedWriteableRegistry;
    private final CreatePitController createPitController;

    @Inject
    public TransportCreatePitAction(
        TransportService transportService,
        ActionFilters actionFilters,
        SearchTransportService searchTransportService,
        ClusterService clusterService,
        TransportSearchAction transportSearchAction,
        NamedWriteableRegistry namedWriteableRegistry,
        CreatePitController createPitController
    ) {
        super(CreatePitAction.NAME, transportService, actionFilters, CreatePitRequest::new);
        this.transportService = transportService;
        this.searchTransportService = searchTransportService;
        this.clusterService = clusterService;
        this.transportSearchAction = transportSearchAction;
        this.namedWriteableRegistry = namedWriteableRegistry;
        this.createPitController = createPitController;
    }

    /**
     * Hands the request to {@link CreatePitController#executeCreatePit}; the
     * {@code updatePitIdListener} completes the overall request.
     */
    @Override
    protected void doExecute(Task task, CreatePitRequest request, ActionListener<CreatePitResponse> listener) {
        final StepListener<SearchResponse> createPitListener = new StepListener<>();
        final ActionListener<CreatePitResponse> updatePitIdListener = ActionListener.wrap(listener::onResponse, e -> {
            // Bug fix: pass the exception to logger.error as the throwable so the
            // failure cause is actually recorded (it was previously dropped).
            logger.error(
                () -> new ParameterizedMessage(
                    "PIT creation failed while updating PIT ID for indices [{}]",
                    Arrays.toString(request.indices())
                ),
                e
            );
            listener.onFailure(e);
        });
        createPitController.executeCreatePit(request, task, createPitListener, updatePitIdListener);
    }

    /**
     * Request to create pit reader context with keep alive
     */
    public static class CreateReaderContextRequest extends TransportRequest {
        private final ShardId shardId;
        private final TimeValue keepAlive;

        public CreateReaderContextRequest(ShardId shardId, TimeValue keepAlive) {
            this.shardId = shardId;
            this.keepAlive = keepAlive;
        }

        /** Deserializes in the same order as {@link #writeTo}. */
        public CreateReaderContextRequest(StreamInput in) throws IOException {
            super(in);
            this.shardId = new ShardId(in);
            this.keepAlive = in.readTimeValue();
        }

        /** The shard on which the reader context should be opened. */
        public ShardId getShardId() {
            return shardId;
        }

        /** How long the reader context must be kept alive. */
        public TimeValue getKeepAlive() {
            return keepAlive;
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            shardId.writeTo(out);
            out.writeTimeValue(keepAlive);
        }
    }

    /**
     * Create pit reader context response which holds the contextId
     */
    public static class CreateReaderContextResponse extends SearchPhaseResult {
        public CreateReaderContextResponse(ShardSearchContextId shardSearchContextId) {
            this.contextId = shardSearchContextId;
        }

        /** Deserializes in the same order as {@link #writeTo}. */
        public CreateReaderContextResponse(StreamInput in) throws IOException {
            super(in);
            contextId = new ShardSearchContextId(in);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            contextId.writeTo(out);
        }
    }

}
Original file line number Diff line number Diff line change
@@ -65,6 +65,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.common.util.concurrent.CountDown;
import org.opensearch.index.Index;
import org.opensearch.index.query.Rewriteable;
@@ -297,6 +298,81 @@ void executeOnShardTarget(
);
}

public void executeRequest(
Task task,
SearchRequest searchRequest,
String actionName,
boolean includeSearchContext,
SinglePhaseSearchAction phaseSearchAction,
ActionListener<SearchResponse> listener
) {
executeRequest(task, searchRequest, new SearchAsyncActionProvider() {
@Override
public AbstractSearchAsyncAction<? extends SearchPhaseResult> asyncSearchAction(
SearchTask task,
SearchRequest searchRequest,
Executor executor,
GroupShardsIterator<SearchShardIterator> shardsIts,
SearchTimeProvider timeProvider,
BiFunction<String, String, Transport.Connection> connectionLookup,
ClusterState clusterState,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
Map<String, Set<String>> indexRoutings,
ActionListener<SearchResponse> listener,
boolean preFilter,
ThreadPool threadPool,
SearchResponse.Clusters clusters
) {
return new AbstractSearchAsyncAction<SearchPhaseResult>(
actionName,
logger,
searchTransportService,
connectionLookup,
aliasFilter,
concreteIndexBoosts,
indexRoutings,
executor,
searchRequest,
listener,
shardsIts,
timeProvider,
clusterState,
task,
new ArraySearchPhaseResults<>(shardsIts.size()),
searchRequest.getMaxConcurrentShardRequests(),
clusters
) {
@Override
protected void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
SearchActionListener<SearchPhaseResult> listener
) {
final Transport.Connection connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
phaseSearchAction.executeOnShardTarget(task, shard, connection, listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchPhaseResult> results, SearchPhaseContext context) {
return new SearchPhase(getName()) {
@Override
public void run() {
final AtomicArray<SearchPhaseResult> atomicArray = results.getAtomicArray();
sendSearchResponse(InternalSearchResponse.empty(), atomicArray);
}
};
}

@Override
boolean buildPointInTimeFromSearchResults() {
return includeSearchContext;
}
};
}
}, listener);
}

private void executeRequest(
Task task,
SearchRequest searchRequest,
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.transport.TransportRequest;

import java.io.IOException;

/**
 * Transport request that updates an existing PIT reader context with the
 * final pit id, its keep alive, and the creation timestamp.
 */
public class UpdatePitContextRequest extends TransportRequest {

    private final ShardSearchContextId searchContextId;
    private final String pitId;
    private final long keepAlive;
    private final long creationTime;

    public UpdatePitContextRequest(ShardSearchContextId searchContextId, String pitId, long keepAlive, long creationTime) {
        this.searchContextId = searchContextId;
        this.pitId = pitId;
        this.keepAlive = keepAlive;
        this.creationTime = creationTime;
    }

    /** Deserializes in exactly the order used by {@link #writeTo}. */
    UpdatePitContextRequest(StreamInput in) throws IOException {
        super(in);
        this.pitId = in.readString();
        this.keepAlive = in.readLong();
        this.creationTime = in.readLong();
        this.searchContextId = new ShardSearchContextId(in);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(pitId);
        out.writeLong(keepAlive);
        out.writeLong(creationTime);
        searchContextId.writeTo(out);
    }

    /** Id of the shard-level search context to update. */
    public ShardSearchContextId getSearchContextId() {
        return searchContextId;
    }

    /** The pit id to store on the reader context. */
    public String getPitId() {
        return pitId;
    }

    /** Keep alive value to apply to the context. */
    public long getKeepAlive() {
        return keepAlive;
    }

    /** Creation time to record on the context. */
    public long getCreationTime() {
        return creationTime;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.transport.TransportResponse;

import java.io.IOException;

/**
 * Transport response for a PIT context update; carries the pit id together
 * with the creation time and keep alive that were applied.
 */
public class UpdatePitContextResponse extends TransportResponse {

    private final String pitId;
    private final long creationTime;
    private final long keepAlive;

    public UpdatePitContextResponse(String pitId, long creationTime, long keepAlive) {
        this.pitId = pitId;
        this.creationTime = creationTime;
        this.keepAlive = keepAlive;
    }

    /** Deserializes in exactly the order used by {@link #writeTo}. */
    UpdatePitContextResponse(StreamInput in) throws IOException {
        super(in);
        this.pitId = in.readString();
        this.creationTime = in.readLong();
        this.keepAlive = in.readLong();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeString(pitId);
        out.writeLong(creationTime);
        out.writeLong(keepAlive);
    }

    /** The pit id of the updated context. */
    public String getPitId() {
        return pitId;
    }

    /** Creation time recorded on the context. */
    public long getCreationTime() {
        return creationTime;
    }

    /** Keep alive recorded on the context. */
    public long getKeepAlive() {
        return keepAlive;
    }
}
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/client/Client.java
Original file line number Diff line number Diff line change
@@ -58,6 +58,8 @@
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollRequestBuilder;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchRequestBuilder;
import org.opensearch.action.search.MultiSearchResponse;
@@ -325,6 +327,11 @@ public interface Client extends OpenSearchClient, Releasable {
*/
SearchScrollRequestBuilder prepareSearchScroll(String scrollId);

/**
* Create point in time for one or more indices
*/
void createPit(CreatePitRequest createPITRequest, ActionListener<CreatePitResponse> listener);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bharath-techie from the API perspective, this is a breaking change. Since we backport to 2.x (no breaking changes), we should probably mark these methods as default.


/**
* Performs multiple search requests.
*/
Original file line number Diff line number Diff line change
@@ -327,6 +327,9 @@
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollRequestBuilder;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchRequestBuilder;
@@ -577,6 +580,11 @@ public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
return new SearchScrollRequestBuilder(this, SearchScrollAction.INSTANCE, scrollId);
}

@Override
public void createPit(final CreatePitRequest createPITRequest, final ActionListener<CreatePitResponse> listener) {
execute(CreatePitAction.INSTANCE, createPITRequest, listener);
}

@Override
public ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
return execute(MultiSearchAction.INSTANCE, request);
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance;
import org.opensearch.action.search.CreatePitController;
import org.opensearch.cluster.routing.allocation.decider.NodeLoadAwareAllocationDecider;
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexSettings;
@@ -472,6 +473,9 @@ public void apply(Settings value, Settings current, Settings previous) {
MultiBucketConsumerService.MAX_BUCKET_SETTING,
SearchService.LOW_LEVEL_CANCELLATION_SETTING,
SearchService.MAX_OPEN_SCROLL_CONTEXT,
SearchService.MAX_OPEN_PIT_CONTEXT,
SearchService.MAX_PIT_KEEPALIVE_SETTING,
CreatePitController.PIT_INIT_KEEP_ALIVE,
Node.WRITE_PORTS_FILE_SETTING,
Node.NODE_NAME_SETTING,
Node.NODE_ATTRIBUTES,
Original file line number Diff line number Diff line change
@@ -149,6 +149,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_CHECK_ON_STARTUP,
IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD,
IndexSettings.MAX_SLICES_PER_SCROLL,
IndexSettings.MAX_SLICES_PER_PIT,
IndexSettings.MAX_REGEX_LENGTH_SETTING,
ShardsLimitAllocationDecider.INDEX_TOTAL_SHARDS_PER_NODE_SETTING,
IndexSettings.INDEX_GC_DELETES_SETTING,
29 changes: 28 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
@@ -457,6 +457,17 @@ public final class IndexSettings {
Property.IndexScope
);

/**
* The maximum number of slices allowed in a search request with PIT
*/
public static final Setting<Integer> MAX_SLICES_PER_PIT = Setting.intSetting(
"index.max_slices_per_pit",
1024,
1,
Property.Dynamic,
Property.IndexScope
);

/**
* The maximum length of regex string allowed in a regexp query.
*/
@@ -617,7 +628,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
* The maximum number of slices allowed in a scroll request.
*/
private volatile int maxSlicesPerScroll;

/**
* The maximum number of slices allowed in a PIT request.
*/
private volatile int maxSlicesPerPit;
/**
* The maximum length of regex string allowed in a regexp query.
*/
@@ -736,6 +750,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
maxShingleDiff = scopedSettings.get(MAX_SHINGLE_DIFF_SETTING);
maxRefreshListeners = scopedSettings.get(MAX_REFRESH_LISTENERS_PER_SHARD);
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
maxSlicesPerPit = scopedSettings.get(MAX_SLICES_PER_PIT);
maxAnalyzedOffset = scopedSettings.get(MAX_ANALYZED_OFFSET_SETTING);
maxTermsCount = scopedSettings.get(MAX_TERMS_COUNT_SETTING);
maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING);
@@ -809,6 +824,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(MAX_ANALYZED_OFFSET_SETTING, this::setHighlightMaxAnalyzedOffset);
scopedSettings.addSettingsUpdateConsumer(MAX_TERMS_COUNT_SETTING, this::setMaxTermsCount);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_PIT, this::setMaxSlicesPerPit);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_FIELD_SETTING, this::setDefaultFields);
scopedSettings.addSettingsUpdateConsumer(INDEX_SEARCH_IDLE_AFTER, this::setSearchIdleAfter);
scopedSettings.addSettingsUpdateConsumer(MAX_REGEX_LENGTH_SETTING, this::setMaxRegexLength);
@@ -1281,6 +1297,17 @@ private void setMaxSlicesPerScroll(int value) {
this.maxSlicesPerScroll = value;
}

/**
* The maximum number of slices allowed in a PIT request.
*/
public int getMaxSlicesPerPit() {
return maxSlicesPerPit;
}

private void setMaxSlicesPerPit(int value) {
this.maxSlicesPerPit = value;
}

/**
* The maximum length of regex string allowed in a regexp query.
*/
Original file line number Diff line number Diff line change
@@ -131,6 +131,19 @@ default void onFreeScrollContext(ReaderContext readerContext) {}
*/
default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {}

/**
* Executed when a new Point-In-Time {@link ReaderContext} was created
* @param readerContext the created reader context
*/
default void onNewPitContext(ReaderContext readerContext) {}

/**
* Executed when a Point-In-Time search {@link SearchContext} is freed.
* This happens on deletion of a Point-In-Time or when its keep-alive expires.
* @param readerContext the freed search context
*/
default void onFreePitContext(ReaderContext readerContext) {}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
@@ -265,5 +278,36 @@ public void validateReaderContext(ReaderContext readerContext, TransportRequest
}
ExceptionsHelper.reThrowIfNotNull(exception);
}

/**
* Executed when a new Point-In-Time {@link ReaderContext} was created
* @param readerContext the created reader context
*/
@Override
public void onNewPitContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewPitContext(readerContext);
} catch (Exception e) {
logger.warn("onNewPitContext listener failed", e);
}
}
}

/**
* Executed when a Point-In-Time search {@link SearchContext} is freed.
* This happens on deletion of a Point-In-Time or when its keep-alive expires.
* @param readerContext the freed search context
*/
@Override
public void onFreePitContext(ReaderContext readerContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreePitContext(readerContext);
} catch (Exception e) {
logger.warn("onFreePitContext listener failed", e);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -49,6 +49,7 @@
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
@@ -75,6 +76,7 @@
import org.opensearch.search.fetch.subphase.ScriptFieldsContext;
import org.opensearch.search.fetch.subphase.highlight.SearchHighlightContext;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.internal.ReaderContext;
import org.opensearch.search.internal.ScrollContext;
import org.opensearch.search.internal.SearchContext;
@@ -287,7 +289,7 @@ public void preProcess(boolean rewrite) {
}
}

if (sliceBuilder != null) {
if (sliceBuilder != null && scrollContext() != null) {
int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerScroll();
int numSlices = sliceBuilder.getMax();
if (numSlices > sliceLimit) {
@@ -304,6 +306,22 @@ public void preProcess(boolean rewrite) {
}
}

if (sliceBuilder != null && readerContext != null && readerContext instanceof PitReaderContext) {
int sliceLimit = indexService.getIndexSettings().getMaxSlicesPerPit();
int numSlices = sliceBuilder.getMax();
if (numSlices > sliceLimit) {
throw new OpenSearchRejectedExecutionException(
"The number of slices ["
+ numSlices
+ "] is too large. It must "
+ "be less than ["
+ sliceLimit
+ "]. This limit can be set by changing the ["
+ IndexSettings.MAX_SLICES_PER_PIT.getKey()
+ "] index level setting."
);
}
}
// initialize the filtering alias based on the provided filters
try {
final QueryBuilder queryBuilder = request.getAliasFilter().getQueryBuilder();
160 changes: 153 additions & 7 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
@@ -44,6 +44,8 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.UpdatePitContextRequest;
import org.opensearch.action.search.UpdatePitContextResponse;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
@@ -111,6 +113,7 @@
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.InternalScrollSearchRequest;
import org.opensearch.search.internal.LegacyReaderContext;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.internal.ReaderContext;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.internal.ShardSearchContextId;
@@ -166,6 +169,15 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope,
Property.Dynamic
);
/**
* This setting will help validate the max keep alive that can be set during creation or extension for a PIT reader context
*/
public static final Setting<TimeValue> MAX_PIT_KEEPALIVE_SETTING = Setting.positiveTimeSetting(
"point_in_time.max_keep_alive",
timeValueHours(24),
Property.NodeScope,
Property.Dynamic
);
public static final Setting<TimeValue> MAX_KEEPALIVE_SETTING = Setting.positiveTimeSetting(
"search.max_keep_alive",
timeValueHours(24),
@@ -218,6 +230,19 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

/**
* This setting defines the maximum number of active PIT reader contexts per node, since each PIT context
* has a resource cost attached to it. The default is lower than the scroll limit because users are
* encouraged to share PIT ids across searches.
*/
public static final Setting<Integer> MAX_OPEN_PIT_CONTEXT = Setting.intSetting(
"search.max_open_pit_context",
300,
0,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

@@ -243,6 +268,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile long maxKeepAlive;

private volatile long maxPitKeepAlive;

private volatile TimeValue defaultSearchTimeout;

private volatile boolean defaultAllowPartialSearchResults;
@@ -251,6 +278,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv

private volatile int maxOpenScrollContext;

private volatile int maxOpenPitContext;

private final Cancellable keepAliveReaper;

private final AtomicLong idGenerator = new AtomicLong();
@@ -260,6 +289,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
private final MultiBucketConsumerService multiBucketConsumerService;

private final AtomicInteger openScrollContexts = new AtomicInteger();
private final AtomicInteger openPitContexts = new AtomicInteger();
private final String sessionId = UUIDs.randomBase64UUID();
private final Executor indexSearcherExecutor;

@@ -293,6 +323,14 @@ public SearchService(

TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
setPitKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_PIT_KEEPALIVE_SETTING.get(settings));
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(
DEFAULT_KEEPALIVE_SETTING,
MAX_PIT_KEEPALIVE_SETTING,
this::setPitKeepAlives,
this::validatePitKeepAlives
);

clusterService.getClusterSettings()
.addSettingsUpdateConsumer(DEFAULT_KEEPALIVE_SETTING, MAX_KEEPALIVE_SETTING, this::setKeepAlives, this::validateKeepAlives);
@@ -309,6 +347,9 @@ public SearchService(
maxOpenScrollContext = MAX_OPEN_SCROLL_CONTEXT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_SCROLL_CONTEXT, this::setMaxOpenScrollContext);

maxOpenPitContext = MAX_OPEN_PIT_CONTEXT.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_OPEN_PIT_CONTEXT, this::setMaxOpenPitContext);

lowLevelCancellation = LOW_LEVEL_CANCELLATION_SETTING.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(LOW_LEVEL_CANCELLATION_SETTING, this::setLowLevelCancellation);
}
@@ -331,12 +372,38 @@ private void validateKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAli
}
}

/**
* Default keep alive search setting should be less than max PIT keep alive
*/
private void validatePitKeepAlives(TimeValue defaultKeepAlive, TimeValue maxPitKeepAlive) {
if (defaultKeepAlive.millis() > maxPitKeepAlive.millis()) {
throw new IllegalArgumentException(
"Default keep alive setting for request ["
+ DEFAULT_KEEPALIVE_SETTING.getKey()
+ "]"
+ " should be smaller than max keep alive for PIT ["
+ MAX_PIT_KEEPALIVE_SETTING.getKey()
+ "], "
+ "was ("
+ defaultKeepAlive
+ " > "
+ maxPitKeepAlive
+ ")"
);
}
}

/**
 * Applies updated default/max search keep alive values after validating that
 * the default does not exceed the max. Wired as a settings-update consumer.
 */
private void setKeepAlives(TimeValue defaultKeepAlive, TimeValue maxKeepAlive) {
    validateKeepAlives(defaultKeepAlive, maxKeepAlive);
    this.defaultKeepAlive = defaultKeepAlive.millis();
    this.maxKeepAlive = maxKeepAlive.millis();
}

/**
 * Applies an updated max PIT keep alive after validating it against the default
 * search keep alive. Note: {@code defaultKeepAlive} is used for validation only;
 * only {@code maxPitKeepAlive} is stored here (the default itself is managed by
 * {@code setKeepAlives}).
 */
private void setPitKeepAlives(TimeValue defaultKeepAlive, TimeValue maxPitKeepAlive) {
    validatePitKeepAlives(defaultKeepAlive, maxPitKeepAlive);
    this.maxPitKeepAlive = maxPitKeepAlive.millis();
}

/** Settings-update consumer: stores the new default search timeout. */
private void setDefaultSearchTimeout(TimeValue defaultSearchTimeout) {
    this.defaultSearchTimeout = defaultSearchTimeout;
}
@@ -353,6 +420,10 @@ private void setMaxOpenScrollContext(int maxOpenScrollContext) {
this.maxOpenScrollContext = maxOpenScrollContext;
}

/**
 * Settings-update consumer for {@code MAX_OPEN_PIT_CONTEXT}: caps how many PIT
 * reader contexts may be open concurrently (enforced in createPitReaderContext).
 */
private void setMaxOpenPitContext(int maxOpenPitContext) {
    this.maxOpenPitContext = maxOpenPitContext;
}

/** Settings-update consumer: toggles low-level search task cancellation. */
private void setLowLevelCancellation(Boolean lowLevelCancellation) {
    this.lowLevelCancellation = lowLevelCancellation;
}
@@ -793,22 +864,40 @@ final ReaderContext createAndPutReaderContext(
* Opens the reader context for given shardId. The newly opened reader context will be keep
* until the {@code keepAlive} elapsed unless it is manually released.
*/
public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListener<ShardSearchContextId> listener) {
checkKeepAliveLimit(keepAlive.millis());
public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionListener<ShardSearchContextId> listener) {
checkPitKeepAliveLimit(keepAlive.millis());
final IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
final IndexShard shard = indexService.getShard(shardId.id());
final SearchOperationListener searchOperationListener = shard.getSearchOperationListener();
shard.awaitShardSearchActive(ignored -> {
Engine.SearcherSupplier searcherSupplier = null;
ReaderContext readerContext = null;
try {
if (openPitContexts.incrementAndGet() > maxOpenPitContext) {
throw new OpenSearchRejectedExecutionException(
"Trying to create too many Point In Time contexts. Must be less than or equal to: ["
+ maxOpenPitContext
+ "]. "
+ "This limit can be set by changing the ["
+ MAX_OPEN_PIT_CONTEXT.getKey()
+ "] setting."
);
}
searcherSupplier = shard.acquireSearcherSupplier();
final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet());
readerContext = new ReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false);
readerContext = new PitReaderContext(id, indexService, shard, searcherSupplier, keepAlive.millis(), false);
final ReaderContext finalReaderContext = readerContext;
searcherSupplier = null; // transfer ownership to reader context

searchOperationListener.onNewReaderContext(readerContext);
readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext));
searchOperationListener.onNewPitContext(finalReaderContext);

readerContext.addOnClose(() -> {
openPitContexts.decrementAndGet();
searchOperationListener.onFreeReaderContext(finalReaderContext);
searchOperationListener.onFreePitContext(finalReaderContext);
});
// add the newly created pit reader context to active readers
putReaderContext(readerContext);
readerContext = null;
listener.onResponse(finalReaderContext.id());
@@ -819,6 +908,40 @@ public void openReaderContext(ShardId shardId, TimeValue keepAlive, ActionListen
});
}

/**
 * Updates an existing PIT reader context with its encoded PIT id, a (possibly
 * extended) keep alive, and the creation timestamp.
 *
 * Failure modes differ deliberately: an over-limit keep alive or an unknown
 * context id throws synchronously (the covering unit tests use expectThrows),
 * while a failure during the update itself frees the context and is reported
 * through {@code listener.onFailure}.
 *
 * @param request  carries the target context id, encoded PIT id, keep alive and creation time
 * @param listener receives the update response, or the failure from the update step
 */
public void updatePitIdAndKeepAlive(UpdatePitContextRequest request, ActionListener<UpdatePitContextResponse> listener) {
    checkPitKeepAliveLimit(request.getKeepAlive());
    PitReaderContext readerContext = getPitReaderContext(request.getSearchContextId());
    if (readerContext == null) {
        throw new SearchContextMissingException(request.getSearchContextId());
    }
    Releasable updatePit = null;
    try {
        // Pins the reader (incRef) while mutating it; the Releasable undoes the pin.
        updatePit = readerContext.updatePitIdAndKeepAlive(request.getKeepAlive(), request.getPitId(), request.getCreationTime());
        listener.onResponse(new UpdatePitContextResponse(request.getPitId(), request.getCreationTime(), request.getKeepAlive()));
    } catch (Exception e) {
        // A partially-updated PIT context is not usable — release it rather than leak it.
        freeReaderContext(readerContext.id());
        listener.onFailure(e);
    } finally {
        if (updatePit != null) {
            updatePit.close();
        }
    }
}

/**
 * Looks up the PIT reader context registered under the given context id.
 *
 * @return the matching {@link PitReaderContext}, or {@code null} when the id is
 *         unknown or refers to a non-PIT reader context
 */
public PitReaderContext getPitReaderContext(ShardSearchContextId id) {
    final ReaderContext active = activeReaders.get(id.getId());
    return active instanceof PitReaderContext ? (PitReaderContext) active : null;
}

final SearchContext createContext(
ReaderContext readerContext,
ShardSearchRequest request,
@@ -944,7 +1067,11 @@ private long getKeepAlive(ShardSearchRequest request) {
if (request.scroll() != null) {
return getScrollKeepAlive(request.scroll());
} else if (request.keepAlive() != null) {
checkKeepAliveLimit(request.keepAlive().millis());
if (getReaderContext(request.readerId()) instanceof PitReaderContext) {
checkPitKeepAliveLimit(request.keepAlive().millis());
} else {
checkKeepAliveLimit(request.keepAlive().millis());
}
return request.keepAlive().getMillis();
} else {
return request.readerId() == null ? defaultKeepAlive : -1;
@@ -975,6 +1102,25 @@ private void checkKeepAliveLimit(long keepAlive) {
}
}

/**
 * Rejects a requested PIT keep alive that exceeds the cluster-level maximum.
 *
 * @param keepAlive requested keep alive, in milliseconds
 * @throws IllegalArgumentException when the request exceeds the PIT maximum
 */
private void checkPitKeepAliveLimit(long keepAlive) {
    // Within the limit: nothing to do.
    if (keepAlive <= maxPitKeepAlive) {
        return;
    }
    throw new IllegalArgumentException(
        "Keep alive for request ("
            + TimeValue.timeValueMillis(keepAlive)
            + ") is too large. "
            + "It must be less than ("
            + TimeValue.timeValueMillis(maxPitKeepAlive)
            + "). "
            + "This limit can be set by changing the ["
            + MAX_PIT_KEEPALIVE_SETTING.getKey()
            + "] cluster level setting."
    );
}

private <T> ActionListener<T> wrapFailureListener(ActionListener<T> listener, ReaderContext context, Releasable releasable) {
return new ActionListener<T>() {
@Override
@@ -1165,8 +1311,8 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
}

if (source.slice() != null) {
if (context.scrollContext() == null) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context");
if (context.scrollContext() == null && !(context.readerContext() instanceof PitReaderContext)) {
throw new SearchException(shardTarget, "`slice` cannot be used outside of a scroll context or PIT context");
}
context.sliceBuilder(source.slice());
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.internal;

import org.apache.lucene.util.SetOnce;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.index.IndexService;
import org.opensearch.index.engine.Engine;
import org.opensearch.index.shard.IndexShard;

/**
* PIT reader context containing PIT specific information such as pit id, create time etc.
*/
/**
 * Reader context specialized for point in time (PIT) searches. On top of the
 * base {@link ReaderContext} it carries the encoded PIT id (used e.g. by the
 * list-PIT API) and the creation timestamp that lets callers distinguish
 * multiple PIT contexts.
 */
public class PitReaderContext extends ReaderContext {

    // Encoded PIT id; SetOnce — a second set() would fail, so callers assign it exactly once.
    private final SetOnce<String> pitId = new SetOnce<>();
    // Creation time of this PIT context; SetOnce for the same single-assignment reason.
    private final SetOnce<Long> creationTime = new SetOnce<>();

    public PitReaderContext(
        ShardSearchContextId id,
        IndexService indexService,
        IndexShard indexShard,
        Engine.SearcherSupplier searcherSupplier,
        long keepAliveInMillis,
        boolean singleSession
    ) {
        super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession);
    }

    public String getPitId() {
        return this.pitId.get();
    }

    public void setPitId(final String pitId) {
        this.pitId.set(pitId);
    }

    public long getCreationTime() {
        return this.creationTime.get();
    }

    public void setCreationTime(final long creationTime) {
        this.creationTime.set(creationTime);
    }

    /**
     * Atomically pins this reader (ref count increment), records the PIT id and
     * creation time, and extends the keep alive. The returned {@link Releasable}
     * signals that the caller is done: it refreshes the last-access time and
     * drops the pin, and is safe to close at most once.
     */
    public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId, long createTime) {
        getRefCounted().incRef();
        tryUpdateKeepAlive(keepAliveInMillis);
        setPitId(pitId);
        setCreationTime(createTime);
        return Releasables.releaseOnce(() -> {
            updateLastAccessTime();
            getRefCounted().decRef();
        });
    }
}
Original file line number Diff line number Diff line change
@@ -105,7 +105,15 @@ public void validate(TransportRequest request) {
indexShard.getSearchOperationListener().validateReaderContext(this, request);
}

private long nowInMillis() {
/**
 * Exposes the context's ref counter so subclasses (e.g. PIT reader contexts)
 * can pin the reader while mutating it.
 */
protected AbstractRefCounted getRefCounted() {
    return refCounted;
}

/**
 * Moves the last-access timestamp forward to "now". Uses max() so the value is
 * monotonically increasing even under concurrent updates.
 */
protected void updateLastAccessTime() {
    this.lastAccessTime.updateAndGet(curr -> Math.max(curr, nowInMillis()));
}

/** Relative (not wall-clock) time in millis from the shard's thread pool, used for keep-alive accounting. */
protected long nowInMillis() {
    return indexShard.getThreadPool().relativeTimeInMillis();
}

@@ -140,7 +148,10 @@ public Engine.Searcher acquireSearcher(String source) {
return searcherSupplier.acquireSearcher(source);
}

private void tryUpdateKeepAlive(long keepAlive) {
/**
 * Extends the keep alive if the given value is greater than the current one;
 * a smaller value is silently ignored (keep alive never shrinks).
 */
public void tryUpdateKeepAlive(long keepAlive) {
    // updateAndGet with max() makes the increase atomic under concurrent callers.
    this.keepAlive.updateAndGet(curr -> Math.max(curr, keepAlive));
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.Version;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.query.IdsQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.TermQueryBuilder;
import org.opensearch.index.shard.ShardId;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.ShardSearchContextId;

import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.OpenSearchTestCase.between;
import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;

/**
 * Shared helpers for point in time (PIT) tests: random query construction and
 * building an encoded PIT id over synthetic shard results.
 */
public class PitTestsUtil {
    private PitTestsUtil() {}

    /** Picks one of three query shapes; the randomBoolean() call order is deliberate. */
    public static QueryBuilder randomQueryBuilder() {
        if (randomBoolean()) {
            return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10));
        } else if (randomBoolean()) {
            return new MatchAllQueryBuilder();
        } else {
            return new IdsQueryBuilder().addIds(randomAlphaOfLength(10));
        }
    }

    /** Builds an encoded PIT id spanning three synthetic shard results on three nodes. */
    public static String getPitId() {
        AtomicArray<SearchPhaseResult> results = new AtomicArray<>(3);
        SearchAsyncActionTests.TestSearchPhaseResult first = new SearchAsyncActionTests.TestSearchPhaseResult(
            new ShardSearchContextId("a", 1),
            null
        );
        first.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null));
        SearchAsyncActionTests.TestSearchPhaseResult second = new SearchAsyncActionTests.TestSearchPhaseResult(
            new ShardSearchContextId("b", 12),
            null
        );
        second.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null));
        SearchAsyncActionTests.TestSearchPhaseResult third = new SearchAsyncActionTests.TestSearchPhaseResult(
            new ShardSearchContextId("c", 42),
            null
        );
        third.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null));
        results.setOnce(0, first);
        results.setOnce(1, second);
        results.setOnce(2, third);

        final Version version = Version.CURRENT;
        // Randomly attach an alias filter (or none) per index UUID, mirroring production variety.
        final Map<String, AliasFilter> filtersByIndexUuid = new HashMap<>();
        for (SearchPhaseResult result : results.asList()) {
            final AliasFilter aliasFilter;
            if (randomBoolean()) {
                aliasFilter = new AliasFilter(randomQueryBuilder());
            } else if (randomBoolean()) {
                aliasFilter = new AliasFilter(randomQueryBuilder(), "alias-" + between(1, 10));
            } else {
                aliasFilter = AliasFilter.EMPTY;
            }
            if (randomBoolean()) {
                filtersByIndexUuid.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter);
            }
        }
        return SearchContextId.encode(results.asList(), filtersByIndexUuid, version);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.junit.After;
import org.junit.Before;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.concurrent.ExecutionException;

import static org.hamcrest.Matchers.containsString;
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
 * Multi node integration tests for PIT creation and search operation with PIT ID.
 */
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2)
public class CreatePitMultiNodeTests extends OpenSearchIntegTestCase {

    // Two primaries, no replicas: a single node restart reliably takes down exactly one shard.
    @Before
    public void setupIndex() throws ExecutionException, InterruptedException {
        createIndex("index", Settings.builder().put("index.number_of_shards", 2).put("index.number_of_replicas", 0).build());
        client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).execute().get();
        ensureGreen();
    }

    @After
    public void clearIndex() {
        client().admin().indices().prepareDelete("index").get();
    }

    // Happy path: create a PIT and search with it; both shards should respond.
    public void testPit() throws Exception {
        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
        request.setIndices(new String[] { "index" });
        ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
        CreatePitResponse pitResponse = execute.get();
        SearchResponse searchResponse = client().prepareSearch("index")
            .setSize(2)
            .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
            .get();
        assertEquals(2, searchResponse.getSuccessfulShards());
        assertEquals(2, searchResponse.getTotalShards());
    }

    // allowPartialPitCreation=false: creation must fail while one data node is down.
    public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception {
        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), false);
        request.setIndices(new String[] { "index" });
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
                ExecutionException ex = expectThrows(ExecutionException.class, execute::get);
                assertTrue(ex.getMessage().contains("Failed to execute phase [create_pit]"));
                assertTrue(ex.getMessage().contains("Partial shards failure"));
                return super.onNodeStopped(nodeName);
            }
        });
    }

    // allowPartialPitCreation=true: creation succeeds on the surviving shard and is searchable.
    public void testCreatePitWhileNodeDropWithAllowPartialCreationTrue() throws Exception {
        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
        request.setIndices(new String[] { "index" });
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
                CreatePitResponse pitResponse = execute.get();
                assertEquals(1, pitResponse.getSuccessfulShards());
                assertEquals(2, pitResponse.getTotalShards());
                SearchResponse searchResponse = client().prepareSearch("index")
                    .setSize(2)
                    .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
                    .get();
                assertEquals(1, searchResponse.getSuccessfulShards());
                assertEquals(1, searchResponse.getTotalShards());
                return super.onNodeStopped(nodeName);
            }
        });
    }

    // PIT created before the restart: searching during the outage reports one failed shard.
    public void testPitSearchWithNodeDrop() throws Exception {
        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
        request.setIndices(new String[] { "index" });
        ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
        CreatePitResponse pitResponse = execute.get();
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                SearchResponse searchResponse = client().prepareSearch()
                    .setSize(2)
                    .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
                    .get();
                assertEquals(1, searchResponse.getSuccessfulShards());
                assertEquals(1, searchResponse.getFailedShards());
                assertEquals(0, searchResponse.getSkippedShards());
                assertEquals(2, searchResponse.getTotalShards());
                return super.onNodeStopped(nodeName);
            }
        });
    }

    // With allowPartialSearchResults=false the PIT search must fail outright during the outage.
    public void testPitSearchWithNodeDropWithPartialSearchResultsFalse() throws Exception {
        CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
        request.setIndices(new String[] { "index" });
        ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
        CreatePitResponse pitResponse = execute.get();
        internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
            @Override
            public Settings onNodeStopped(String nodeName) throws Exception {
                ActionFuture<SearchResponse> execute = client().prepareSearch()
                    .setSize(2)
                    .setPointInTime(new PointInTimeBuilder(pitResponse.getId()).setKeepAlive(TimeValue.timeValueDays(1)))
                    .setAllowPartialSearchResults(false)
                    .execute();
                ExecutionException ex = expectThrows(ExecutionException.class, execute::get);
                assertTrue(ex.getMessage().contains("Partial shards failure"));
                return super.onNodeStopped(nodeName);
            }
        });
    }

    // Exercises the validation that search.default_keep_alive must stay below
    // point_in_time.max_keep_alive, in both update directions, then resets settings.
    public void testPitInvalidDefaultKeepAlive() {
        IllegalArgumentException exc = expectThrows(
            IllegalArgumentException.class,
            () -> client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "1m").put("search.default_keep_alive", "2m"))
                .get()
        );
        assertThat(exc.getMessage(), containsString("was (2m > 1m)"));
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "5m").put("point_in_time.max_keep_alive", "5m"))
                .get()
        );
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "2m"))
                .get()
        );
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "2m"))
                .get()
        );
        // Raising the default above the current max must be rejected.
        exc = expectThrows(
            IllegalArgumentException.class,
            () -> client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "3m"))
                .get()
        );
        assertThat(exc.getMessage(), containsString("was (3m > 2m)"));
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("search.default_keep_alive", "1m"))
                .get()
        );
        // Lowering the max below the current default must also be rejected.
        exc = expectThrows(
            IllegalArgumentException.class,
            () -> client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().put("point_in_time.max_keep_alive", "30s"))
                .get()
        );
        assertThat(exc.getMessage(), containsString("was (1m > 30s)"));
        // Clean up so later tests start from default settings.
        assertAcked(
            client().admin()
                .cluster()
                .prepareUpdateSettings()
                .setPersistentSettings(Settings.builder().putNull("*"))
                .setTransientSettings(Settings.builder().putNull("*"))
        );
    }
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -52,6 +52,7 @@
import org.opensearch.common.util.BigArrays;
import org.opensearch.common.util.MockBigArrays;
import org.opensearch.common.util.MockPageCacheRecycler;
import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.IndexCache;
@@ -67,6 +68,7 @@
import org.opensearch.indices.breaker.NoneCircuitBreakerService;
import org.opensearch.search.internal.AliasFilter;
import org.opensearch.search.internal.LegacyReaderContext;
import org.opensearch.search.internal.PitReaderContext;
import org.opensearch.search.internal.ReaderContext;
import org.opensearch.search.internal.ShardSearchContextId;
import org.opensearch.search.internal.ShardSearchRequest;
@@ -134,10 +136,12 @@ public void testPreProcess() throws Exception {
int maxResultWindow = randomIntBetween(50, 100);
int maxRescoreWindow = randomIntBetween(50, 100);
int maxSlicesPerScroll = randomIntBetween(50, 100);
int maxSlicesPerPit = randomIntBetween(50, 100);
Settings settings = Settings.builder()
.put("index.max_result_window", maxResultWindow)
.put("index.max_slices_per_scroll", maxSlicesPerScroll)
.put("index.max_rescore_window", maxRescoreWindow)
.put("index.max_slices_per_pit", maxSlicesPerPit)
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 2)
@@ -300,13 +304,13 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
);

readerContext.close();
readerContext = new ReaderContext(
readerContext = new LegacyReaderContext(
newContextId(),
indexService,
indexShard,
searcherSupplier.get(),
randomNonNegativeLong(),
false
shardSearchRequest,
randomNonNegativeLong()
);
// rescore is null but sliceBuilder is not null
DefaultSearchContext context2 = new DefaultSearchContext(
@@ -404,6 +408,52 @@ protected Engine.Searcher acquireSearcherInternal(String source) {
assertTrue(query1 instanceof MatchNoDocsQuery || query2 instanceof MatchNoDocsQuery);

readerContext.close();

ReaderContext pitReaderContext = new PitReaderContext(
newContextId(),
indexService,
indexShard,
searcherSupplier.get(),
1000,
true
);
DefaultSearchContext context5 = new DefaultSearchContext(
pitReaderContext,
shardSearchRequest,
target,
null,
bigArrays,
null,
timeout,
null,
false,
Version.CURRENT,
false,
executor
);
int numSlicesForPit = maxSlicesPerPit + randomIntBetween(1, 100);
when(sliceBuilder.getMax()).thenReturn(numSlicesForPit);
context5.sliceBuilder(sliceBuilder);

OpenSearchRejectedExecutionException exception1 = expectThrows(
OpenSearchRejectedExecutionException.class,
() -> context5.preProcess(false)
);
assertThat(
exception1.getMessage(),
equalTo(
"The number of slices ["
+ numSlicesForPit
+ "] is too large. It must "
+ "be less than ["
+ maxSlicesPerPit
+ "]. This limit can be set by changing the ["
+ IndexSettings.MAX_SLICES_PER_PIT.getKey()
+ "] index level setting."
)
);
pitReaderContext.close();

threadPool.shutdown();
}
}
Original file line number Diff line number Diff line change
@@ -46,6 +46,8 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.UpdatePitContextRequest;
import org.opensearch.action.search.UpdatePitContextResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.WriteRequest;
@@ -1406,7 +1408,7 @@ public void testOpenReaderContext() {
createIndex("index");
SearchService searchService = getInstanceFromNode(SearchService.class);
PlainActionFuture<ShardSearchContextId> future = new PlainActionFuture<>();
searchService.openReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future);
searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future);
future.actionGet();
assertThat(searchService.getActiveContexts(), equalTo(1));
assertTrue(searchService.freeReaderContext(future.actionGet()));
@@ -1422,4 +1424,100 @@ private ReaderContext createReaderContext(IndexService indexService, IndexShard
false
);
}

// A keep alive above the PIT maximum (default 24h; 25h requested) must be rejected
// before any context is registered, so active contexts stay at zero.
public void testPitContextMaxKeepAlive() {
    createIndex("index");
    SearchService searchService = getInstanceFromNode(SearchService.class);
    PlainActionFuture<ShardSearchContextId> future = new PlainActionFuture<>();

    IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
        searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueHours(25), future);
        future.actionGet();
    });
    assertEquals(
        "Keep alive for request (1d) is too large. "
            + "It must be less than ("
            + SearchService.MAX_PIT_KEEPALIVE_SETTING.get(Settings.EMPTY)
            + "). "
            + "This limit can be set by changing the ["
            + SearchService.MAX_PIT_KEEPALIVE_SETTING.getKey()
            + "] cluster level setting.",
        ex.getMessage()
    );
    assertThat(searchService.getActiveContexts(), equalTo(0));
}

// Creates a PIT reader context, updates its id/keep-alive/creation time, and
// verifies the response echoes the request while the context stays active.
public void testUpdatePitId() {
    createIndex("index");
    SearchService searchService = getInstanceFromNode(SearchService.class);
    PlainActionFuture<ShardSearchContextId> future = new PlainActionFuture<>();
    searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future);
    ShardSearchContextId id = future.actionGet();
    PlainActionFuture<UpdatePitContextResponse> updateFuture = new PlainActionFuture<>();
    UpdatePitContextRequest updateRequest = new UpdatePitContextRequest(
        id,
        "pitId",
        TimeValue.timeValueMinutes(between(1, 10)).millis(),
        System.currentTimeMillis()
    );
    searchService.updatePitIdAndKeepAlive(updateRequest, updateFuture);
    UpdatePitContextResponse updateResponse = updateFuture.actionGet();
    // assertEquals gives a diagnostic failure message (unlike assertTrue on a boolean),
    // and an exact match is the right check for an id that was set verbatim.
    // The original asserted the pit id twice and compared primitives with '=='.
    assertEquals("pitId", updateResponse.getPitId());
    assertEquals(updateRequest.getCreationTime(), updateResponse.getCreationTime());
    assertEquals(updateRequest.getKeepAlive(), updateResponse.getKeepAlive());
    assertThat(searchService.getActiveContexts(), equalTo(1));
    assertTrue(searchService.freeReaderContext(future.actionGet()));
}

// Updating an existing PIT context with an over-limit keep alive (25h > 24h default
// max) must throw; the original context survives and is freed at the end.
public void testUpdatePitIdMaxKeepAlive() {
    createIndex("index");
    SearchService searchService = getInstanceFromNode(SearchService.class);
    PlainActionFuture<ShardSearchContextId> future = new PlainActionFuture<>();
    searchService.createPitReaderContext(new ShardId(resolveIndex("index"), 0), TimeValue.timeValueMinutes(between(1, 10)), future);
    ShardSearchContextId id = future.actionGet();

    UpdatePitContextRequest updateRequest = new UpdatePitContextRequest(
        id,
        "pitId",
        TimeValue.timeValueHours(25).millis(),
        System.currentTimeMillis()
    );
    IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> {
        PlainActionFuture<UpdatePitContextResponse> updateFuture = new PlainActionFuture<>();
        searchService.updatePitIdAndKeepAlive(updateRequest, updateFuture);
    });

    assertEquals(
        "Keep alive for request (1d) is too large. "
            + "It must be less than ("
            + SearchService.MAX_PIT_KEEPALIVE_SETTING.get(Settings.EMPTY)
            + "). "
            + "This limit can be set by changing the ["
            + SearchService.MAX_PIT_KEEPALIVE_SETTING.getKey()
            + "] cluster level setting.",
        ex.getMessage()
    );
    // The keep-alive check fails before the context is touched, so it is still active.
    assertThat(searchService.getActiveContexts(), equalTo(1));
    assertTrue(searchService.freeReaderContext(future.actionGet()));
}

// Updating a context id that was never registered must fail with
// SearchContextMissingException and leave no active contexts behind.
public void testUpdatePitIdWithInvalidReaderId() {
    SearchService searchService = getInstanceFromNode(SearchService.class);
    ShardSearchContextId unknownContextId = new ShardSearchContextId("session", 9);

    UpdatePitContextRequest request = new UpdatePitContextRequest(
        unknownContextId,
        "pitId",
        TimeValue.timeValueHours(23).millis(),
        System.currentTimeMillis()
    );
    SearchContextMissingException missing = expectThrows(SearchContextMissingException.class, () -> {
        PlainActionFuture<UpdatePitContextResponse> responseFuture = new PlainActionFuture<>();
        searchService.updatePitIdAndKeepAlive(request, responseFuture);
    });

    assertEquals("No search context found for id [" + unknownContextId.getId() + "]", missing.getMessage());
    assertThat(searchService.getActiveContexts(), equalTo(0));
}
}