Skip to content

Commit 9d6255c

Browse files
authoredAug 23, 2022
Add changes to Point in time segments API service layer (#4105)
* pit segments service layer changes Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Addressing comment Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Addressing comment Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Addressing comment Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * addressing review comments Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * addressing comment Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Addressing comments Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * addressing comments Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Addressing comments Signed-off-by: Bharathwaj G <bharath78910@gmail.com> * Adding '_all' as option to get all segments Signed-off-by: Bharathwaj G <bharath78910@gmail.com> Signed-off-by: Bharathwaj G <bharath78910@gmail.com>
1 parent 5433056 commit 9d6255c

File tree

13 files changed

+464
-6
lines changed

13 files changed

+464
-6
lines changed
 

‎server/src/main/java/org/opensearch/action/ActionModule.java

+3
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,9 @@
165165
import org.opensearch.action.admin.indices.rollover.RolloverAction;
166166
import org.opensearch.action.admin.indices.rollover.TransportRolloverAction;
167167
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
168+
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
168169
import org.opensearch.action.admin.indices.segments.TransportIndicesSegmentsAction;
170+
import org.opensearch.action.admin.indices.segments.TransportPitSegmentsAction;
169171
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
170172
import org.opensearch.action.admin.indices.settings.get.TransportGetSettingsAction;
171173
import org.opensearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
@@ -671,6 +673,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
671673
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
672674
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
673675
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
676+
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
674677

675678
// Remote Store
676679
actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.segments;
10+
11+
import org.opensearch.action.ActionType;
12+
13+
/**
14+
* Action for retrieving segment information for PITs
15+
*/
16+
public class PitSegmentsAction extends ActionType<IndicesSegmentResponse> {
17+
18+
public static final PitSegmentsAction INSTANCE = new PitSegmentsAction();
19+
public static final String NAME = "indices:monitor/point_in_time/segments";
20+
21+
private PitSegmentsAction() {
22+
super(NAME, IndicesSegmentResponse::new);
23+
}
24+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.action.admin.indices.segments;
10+
11+
import org.opensearch.action.ActionRequestValidationException;
12+
import org.opensearch.action.support.broadcast.BroadcastRequest;
13+
import org.opensearch.common.Strings;
14+
import org.opensearch.common.io.stream.StreamInput;
15+
import org.opensearch.common.io.stream.StreamOutput;
16+
17+
import java.io.IOException;
18+
import java.util.ArrayList;
19+
import java.util.Arrays;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
import static org.opensearch.action.ValidateActions.addValidationError;
24+
25+
/**
26+
* Transport request for retrieving PITs segment information
27+
*/
28+
public class PitSegmentsRequest extends BroadcastRequest<PitSegmentsRequest> {
29+
private boolean verbose = false;
30+
private final List<String> pitIds = new ArrayList<>();
31+
32+
public PitSegmentsRequest() {
33+
this(Strings.EMPTY_ARRAY);
34+
}
35+
36+
public PitSegmentsRequest(StreamInput in) throws IOException {
37+
super(in);
38+
pitIds.addAll(Arrays.asList(in.readStringArray()));
39+
verbose = in.readBoolean();
40+
}
41+
42+
public PitSegmentsRequest(String... pitIds) {
43+
super(pitIds);
44+
this.pitIds.addAll(Arrays.asList(pitIds));
45+
}
46+
47+
/**
48+
* <code>true</code> if detailed information about each segment should be returned,
49+
* <code>false</code> otherwise.
50+
*/
51+
public boolean isVerbose() {
52+
return verbose;
53+
}
54+
55+
/**
56+
* Sets the <code>verbose</code> option.
57+
* @see #isVerbose()
58+
*/
59+
public void setVerbose(boolean v) {
60+
verbose = v;
61+
}
62+
63+
@Override
64+
public void writeTo(StreamOutput out) throws IOException {
65+
super.writeTo(out);
66+
out.writeStringArrayNullable((pitIds == null) ? null : pitIds.toArray(new String[pitIds.size()]));
67+
out.writeBoolean(verbose);
68+
}
69+
70+
public List<String> getPitIds() {
71+
return Collections.unmodifiableList(pitIds);
72+
}
73+
74+
public void clearAndSetPitIds(List<String> pitIds) {
75+
this.pitIds.clear();
76+
this.pitIds.addAll(pitIds);
77+
}
78+
79+
@Override
80+
public ActionRequestValidationException validate() {
81+
ActionRequestValidationException validationException = null;
82+
if (pitIds == null || pitIds.isEmpty()) {
83+
validationException = addValidationError("no pit ids specified", validationException);
84+
}
85+
return validationException;
86+
}
87+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
package org.opensearch.action.admin.indices.segments;
9+
10+
import org.opensearch.action.ActionListener;
11+
import org.opensearch.action.search.ListPitInfo;
12+
import org.opensearch.action.search.PitService;
13+
import org.opensearch.action.search.SearchContextId;
14+
import org.opensearch.action.search.SearchContextIdForNode;
15+
import org.opensearch.action.support.ActionFilters;
16+
import org.opensearch.action.support.DefaultShardOperationFailedException;
17+
import org.opensearch.action.support.broadcast.node.TransportBroadcastByNodeAction;
18+
import org.opensearch.cluster.ClusterState;
19+
import org.opensearch.cluster.block.ClusterBlockException;
20+
import org.opensearch.cluster.block.ClusterBlockLevel;
21+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
22+
import org.opensearch.cluster.routing.AllocationId;
23+
import org.opensearch.cluster.routing.PlainShardsIterator;
24+
import org.opensearch.cluster.routing.RecoverySource;
25+
import org.opensearch.cluster.routing.ShardRouting;
26+
import org.opensearch.cluster.routing.ShardRoutingState;
27+
import org.opensearch.cluster.routing.ShardsIterator;
28+
import org.opensearch.cluster.routing.UnassignedInfo;
29+
import org.opensearch.cluster.service.ClusterService;
30+
import org.opensearch.common.Strings;
31+
import org.opensearch.common.inject.Inject;
32+
import org.opensearch.common.io.stream.NamedWriteableRegistry;
33+
import org.opensearch.common.io.stream.StreamInput;
34+
import org.opensearch.common.io.stream.StreamOutput;
35+
import org.opensearch.common.xcontent.XContentBuilder;
36+
import org.opensearch.index.shard.ShardId;
37+
import org.opensearch.indices.IndicesService;
38+
import org.opensearch.search.SearchService;
39+
import org.opensearch.search.internal.PitReaderContext;
40+
import org.opensearch.tasks.Task;
41+
import org.opensearch.threadpool.ThreadPool;
42+
import org.opensearch.transport.TransportService;
43+
44+
import java.io.IOException;
45+
import java.util.ArrayList;
46+
import java.util.Collections;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.stream.Collectors;
50+
51+
import static org.opensearch.action.search.SearchContextId.decode;
52+
53+
/**
54+
* Transport action for retrieving segment information of PITs
55+
*/
56+
public class TransportPitSegmentsAction extends TransportBroadcastByNodeAction<PitSegmentsRequest, IndicesSegmentResponse, ShardSegments> {
57+
private final ClusterService clusterService;
58+
private final IndicesService indicesService;
59+
private final SearchService searchService;
60+
private final NamedWriteableRegistry namedWriteableRegistry;
61+
private final TransportService transportService;
62+
private final PitService pitService;
63+
64+
@Inject
65+
public TransportPitSegmentsAction(
66+
ClusterService clusterService,
67+
TransportService transportService,
68+
IndicesService indicesService,
69+
ActionFilters actionFilters,
70+
IndexNameExpressionResolver indexNameExpressionResolver,
71+
SearchService searchService,
72+
NamedWriteableRegistry namedWriteableRegistry,
73+
PitService pitService
74+
) {
75+
super(
76+
PitSegmentsAction.NAME,
77+
clusterService,
78+
transportService,
79+
actionFilters,
80+
indexNameExpressionResolver,
81+
PitSegmentsRequest::new,
82+
ThreadPool.Names.MANAGEMENT
83+
);
84+
this.clusterService = clusterService;
85+
this.indicesService = indicesService;
86+
this.searchService = searchService;
87+
this.namedWriteableRegistry = namedWriteableRegistry;
88+
this.transportService = transportService;
89+
this.pitService = pitService;
90+
}
91+
92+
/**
93+
* Execute PIT segments flow for all PITs or request PIT IDs
94+
*/
95+
@Override
96+
protected void doExecute(Task task, PitSegmentsRequest request, ActionListener<IndicesSegmentResponse> listener) {
97+
List<String> pitIds = request.getPitIds();
98+
if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) {
99+
pitService.getAllPits(ActionListener.wrap(response -> {
100+
request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList()));
101+
super.doExecute(task, request, listener);
102+
}, listener::onFailure));
103+
} else {
104+
super.doExecute(task, request, listener);
105+
}
106+
}
107+
108+
/**
109+
* This adds list of shards on which we need to retrieve pit segments details
110+
* @param clusterState the cluster state
111+
* @param request the underlying request
112+
* @param concreteIndices the concrete indices on which to execute the operation
113+
*/
114+
@Override
115+
protected ShardsIterator shards(ClusterState clusterState, PitSegmentsRequest request, String[] concreteIndices) {
116+
final ArrayList<ShardRouting> iterators = new ArrayList<>();
117+
for (String pitId : request.getPitIds()) {
118+
SearchContextId searchContext = decode(namedWriteableRegistry, pitId);
119+
for (Map.Entry<ShardId, SearchContextIdForNode> entry : searchContext.shards().entrySet()) {
120+
final SearchContextIdForNode perNode = entry.getValue();
121+
// check if node is part of local cluster
122+
if (Strings.isEmpty(perNode.getClusterAlias())) {
123+
final ShardId shardId = entry.getKey();
124+
iterators.add(
125+
new PitAwareShardRouting(
126+
pitId,
127+
shardId,
128+
perNode.getNode(),
129+
null,
130+
true,
131+
ShardRoutingState.STARTED,
132+
null,
133+
null,
134+
null,
135+
-1L
136+
)
137+
);
138+
}
139+
}
140+
}
141+
return new PlainShardsIterator(iterators);
142+
}
143+
144+
@Override
145+
protected ClusterBlockException checkGlobalBlock(ClusterState state, PitSegmentsRequest request) {
146+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
147+
}
148+
149+
@Override
150+
protected ClusterBlockException checkRequestBlock(ClusterState state, PitSegmentsRequest countRequest, String[] concreteIndices) {
151+
return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
152+
}
153+
154+
@Override
155+
protected ShardSegments readShardResult(StreamInput in) throws IOException {
156+
return new ShardSegments(in);
157+
}
158+
159+
@Override
160+
protected IndicesSegmentResponse newResponse(
161+
PitSegmentsRequest request,
162+
int totalShards,
163+
int successfulShards,
164+
int failedShards,
165+
List<ShardSegments> results,
166+
List<DefaultShardOperationFailedException> shardFailures,
167+
ClusterState clusterState
168+
) {
169+
return new IndicesSegmentResponse(
170+
results.toArray(new ShardSegments[results.size()]),
171+
totalShards,
172+
successfulShards,
173+
failedShards,
174+
shardFailures
175+
);
176+
}
177+
178+
@Override
179+
protected PitSegmentsRequest readRequestFrom(StreamInput in) throws IOException {
180+
return new PitSegmentsRequest(in);
181+
}
182+
183+
@Override
184+
public List<ShardRouting> getShardRoutingsFromInputStream(StreamInput in) throws IOException {
185+
return in.readList(PitAwareShardRouting::new);
186+
}
187+
188+
/**
189+
* This retrieves segment details of PIT context
190+
* @param request the node-level request
191+
* @param shardRouting the shard on which to execute the operation
192+
*/
193+
@Override
194+
protected ShardSegments shardOperation(PitSegmentsRequest request, ShardRouting shardRouting) {
195+
assert shardRouting instanceof PitAwareShardRouting;
196+
PitAwareShardRouting pitAwareShardRouting = (PitAwareShardRouting) shardRouting;
197+
SearchContextIdForNode searchContextIdForNode = decode(namedWriteableRegistry, pitAwareShardRouting.getPitId()).shards()
198+
.get(shardRouting.shardId());
199+
PitReaderContext pitReaderContext = searchService.getPitReaderContext(searchContextIdForNode.getSearchContextId());
200+
if (pitReaderContext == null) {
201+
return new ShardSegments(shardRouting, Collections.emptyList());
202+
}
203+
return new ShardSegments(pitReaderContext.getShardRouting(), pitReaderContext.getSegments());
204+
}
205+
206+
/**
207+
* This holds PIT id which is used to perform broadcast operation in PIT shards to retrieve segments information
208+
*/
209+
public class PitAwareShardRouting extends ShardRouting {
210+
211+
private final String pitId;
212+
213+
public PitAwareShardRouting(StreamInput in) throws IOException {
214+
super(in);
215+
this.pitId = in.readString();
216+
}
217+
218+
public PitAwareShardRouting(
219+
String pitId,
220+
ShardId shardId,
221+
String currentNodeId,
222+
String relocatingNodeId,
223+
boolean primary,
224+
ShardRoutingState state,
225+
RecoverySource recoverySource,
226+
UnassignedInfo unassignedInfo,
227+
AllocationId allocationId,
228+
long expectedShardSize
229+
) {
230+
super(
231+
shardId,
232+
currentNodeId,
233+
relocatingNodeId,
234+
primary,
235+
state,
236+
recoverySource,
237+
unassignedInfo,
238+
allocationId,
239+
expectedShardSize
240+
);
241+
this.pitId = pitId;
242+
}
243+
244+
public String getPitId() {
245+
return pitId;
246+
}
247+
248+
@Override
249+
public void writeTo(StreamOutput out) throws IOException {
250+
super.writeTo(out);
251+
out.writeString(pitId);
252+
}
253+
254+
@Override
255+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
256+
super.toXContent(builder, params);
257+
builder.field("pit_id", pitId);
258+
return builder.endObject();
259+
}
260+
}
261+
}

‎server/src/main/java/org/opensearch/action/search/CreatePitResponse.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
* Create point in time response with point in time id and shard success / failures
2929
*/
3030
public class CreatePitResponse extends ActionResponse implements StatusToXContentObject {
31-
private static final ParseField ID = new ParseField("id");
31+
private static final ParseField ID = new ParseField("pit_id");
3232
private static final ParseField CREATION_TIME = new ParseField("creation_time");
3333

3434
// point in time id

‎server/src/main/java/org/opensearch/action/support/broadcast/node/TransportBroadcastByNodeAction.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,13 @@ private void onShardOperation(
532532
}
533533
}
534534

535+
/**
536+
* This method reads ShardRouting from input stream
537+
*/
538+
public List<ShardRouting> getShardRoutingsFromInputStream(StreamInput in) throws IOException {
539+
return in.readList(ShardRouting::new);
540+
}
541+
535542
/**
536543
* A node request
537544
*
@@ -547,7 +554,7 @@ public class NodeRequest extends TransportRequest implements IndicesRequest {
547554
public NodeRequest(StreamInput in) throws IOException {
548555
super(in);
549556
indicesLevelRequest = readRequestFrom(in);
550-
shards = in.readList(ShardRouting::new);
557+
shards = getShardRoutingsFromInputStream(in);
551558
nodeId = in.readString();
552559
}
553560

‎server/src/main/java/org/opensearch/client/Client.java

+7
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434

3535
import org.opensearch.action.ActionFuture;
3636
import org.opensearch.action.ActionListener;
37+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
38+
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
3739
import org.opensearch.action.bulk.BulkRequest;
3840
import org.opensearch.action.bulk.BulkRequestBuilder;
3941
import org.opensearch.action.bulk.BulkResponse;
@@ -339,6 +341,11 @@ public interface Client extends OpenSearchClient, Releasable {
339341
*/
340342
void deletePits(DeletePitRequest deletePITRequest, ActionListener<DeletePitResponse> listener);
341343

344+
/**
345+
* Get information of segments of one or more PITs
346+
*/
347+
void pitSegments(PitSegmentsRequest pitSegmentsRequest, ActionListener<IndicesSegmentResponse> listener);
348+
342349
/**
343350
* Performs multiple search requests.
344351
*/

‎server/src/main/java/org/opensearch/client/support/AbstractClient.java

+7
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,8 @@
240240
import org.opensearch.action.admin.indices.segments.IndicesSegmentsAction;
241241
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
242242
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
243+
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
244+
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
243245
import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
244246
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequest;
245247
import org.opensearch.action.admin.indices.settings.get.GetSettingsRequestBuilder;
@@ -593,6 +595,11 @@ public void deletePits(final DeletePitRequest deletePITRequest, final ActionList
593595
execute(DeletePitAction.INSTANCE, deletePITRequest, listener);
594596
}
595597

598+
@Override
599+
public void pitSegments(final PitSegmentsRequest request, final ActionListener<IndicesSegmentResponse> listener) {
600+
execute(PitSegmentsAction.INSTANCE, request, listener);
601+
}
602+
596603
@Override
597604
public ActionFuture<MultiSearchResponse> multiSearch(MultiSearchRequest request) {
598605
return execute(MultiSearchAction.INSTANCE, request);

‎server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
*
5555
* @opensearch.internal
5656
*/
57-
public final class ShardRouting implements Writeable, ToXContentObject {
57+
public class ShardRouting implements Writeable, ToXContentObject {
5858

5959
/**
6060
* Used if shard size is not available
@@ -78,7 +78,7 @@ public final class ShardRouting implements Writeable, ToXContentObject {
7878
* A constructor to internally create shard routing instances, note, the internal flag should only be set to true
7979
* by either this class or tests. Visible for testing.
8080
*/
81-
ShardRouting(
81+
protected ShardRouting(
8282
ShardId shardId,
8383
String currentNodeId,
8484
String relocatingNodeId,

‎server/src/main/java/org/opensearch/search/internal/PitReaderContext.java

+24
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@
99
package org.opensearch.search.internal;
1010

1111
import org.apache.lucene.util.SetOnce;
12+
import org.opensearch.cluster.routing.ShardRouting;
1213
import org.opensearch.common.lease.Releasable;
1314
import org.opensearch.common.lease.Releasables;
1415
import org.opensearch.index.IndexService;
1516
import org.opensearch.index.engine.Engine;
17+
import org.opensearch.index.engine.Segment;
1618
import org.opensearch.index.shard.IndexShard;
1719

20+
import java.util.Collections;
21+
import java.util.List;
22+
1823
/**
1924
* PIT reader context containing PIT specific information such as pit id, create time etc.
2025
*/
@@ -24,6 +29,15 @@ public class PitReaderContext extends ReaderContext {
2429
private final SetOnce<String> pitId = new SetOnce<>();
2530
// Creation time of PIT contexts which helps users to differentiate between multiple PIT reader contexts
2631
private final SetOnce<Long> creationTime = new SetOnce<>();
32+
/**
33+
* Shard routing at the time of creation of PIT Reader Context
34+
*/
35+
private final ShardRouting shardRouting;
36+
37+
/**
38+
* Encapsulates segments constituting the shard at the time of creation of PIT Reader Context.
39+
*/
40+
private final List<Segment> segments;
2741

2842
public PitReaderContext(
2943
ShardSearchContextId id,
@@ -34,6 +48,8 @@ public PitReaderContext(
3448
boolean singleSession
3549
) {
3650
super(id, indexService, indexShard, searcherSupplier, keepAliveInMillis, singleSession);
51+
shardRouting = indexShard.routingEntry();
52+
segments = indexShard.segments(true);
3753
}
3854

3955
public String getPitId() {
@@ -67,4 +83,12 @@ public long getCreationTime() {
6783
public void setCreationTime(final long creationTime) {
6884
this.creationTime.set(creationTime);
6985
}
86+
87+
public ShardRouting getShardRouting() {
88+
return shardRouting;
89+
}
90+
91+
public List<Segment> getSegments() {
92+
return Collections.unmodifiableList(segments);
93+
}
7094
}

‎server/src/test/java/org/opensearch/action/search/PitTestsUtil.java

+22-1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
import org.opensearch.action.ActionFuture;
1515
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
1616
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
17+
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
18+
import org.opensearch.action.admin.indices.segments.PitSegmentsAction;
19+
import org.opensearch.action.admin.indices.segments.PitSegmentsRequest;
1720
import org.opensearch.client.Client;
1821
import org.opensearch.cluster.node.DiscoveryNode;
1922
import org.opensearch.common.util.concurrent.AtomicArray;
@@ -33,6 +36,8 @@
3336
import java.util.Map;
3437
import java.util.concurrent.ExecutionException;
3538

39+
import static org.junit.Assert.assertEquals;
40+
import static org.junit.Assert.assertTrue;
3641
import static org.opensearch.test.OpenSearchTestCase.between;
3742
import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength;
3843
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
@@ -107,7 +112,7 @@ public static void assertUsingGetAllPits(Client client, String id, long creation
107112
GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr);
108113
ActionFuture<GetAllPitNodesResponse> execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest);
109114
GetAllPitNodesResponse getPitResponse = execute1.get();
110-
Assert.assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id));
115+
assertTrue(getPitResponse.getPitInfos().get(0).getPitId().contains(id));
111116
Assert.assertEquals(getPitResponse.getPitInfos().get(0).getCreationTime(), creationTime);
112117
}
113118

@@ -128,4 +133,20 @@ public static void assertGetAllPitsEmpty(Client client) throws ExecutionExceptio
128133
GetAllPitNodesResponse getPitResponse = execute1.get();
129134
Assert.assertEquals(0, getPitResponse.getPitInfos().size());
130135
}
136+
137+
public static void assertSegments(boolean isEmpty, String index, long expectedShardSize, Client client) {
138+
PitSegmentsRequest pitSegmentsRequest = new PitSegmentsRequest("_all");
139+
IndicesSegmentResponse indicesSegmentResponse = client.execute(PitSegmentsAction.INSTANCE, pitSegmentsRequest).actionGet();
140+
assertTrue(indicesSegmentResponse.getShardFailures() == null || indicesSegmentResponse.getShardFailures().length == 0);
141+
assertEquals(indicesSegmentResponse.getIndices().isEmpty(), isEmpty);
142+
if (!isEmpty) {
143+
assertTrue(indicesSegmentResponse.getIndices().get(index) != null);
144+
assertTrue(indicesSegmentResponse.getIndices().get(index).getIndex().equalsIgnoreCase(index));
145+
assertEquals(expectedShardSize, indicesSegmentResponse.getIndices().get(index).getShards().size());
146+
}
147+
}
148+
149+
public static void assertSegments(boolean isEmpty, Client client) {
150+
assertSegments(isEmpty, "index", 2, client);
151+
}
131152
}

‎server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.concurrent.ExecutionException;
3434

3535
import static org.hamcrest.CoreMatchers.equalTo;
36+
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
3637
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
3738
import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
3839
import static org.opensearch.index.query.QueryBuilders.matchAllQuery;
@@ -68,6 +69,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti
6869
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
6970
CreatePitResponse pitResponse = execute.get();
7071
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
72+
assertSegments(false, client());
7173
client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
7274
SearchResponse searchResponse = client().prepareSearch("index")
7375
.setSize(2)
@@ -80,6 +82,7 @@ public void testCreatePITSuccess() throws ExecutionException, InterruptedExcepti
8082
validatePitStats("index", 1, 0, 0);
8183
validatePitStats("index", 1, 0, 1);
8284
service.doClose(); // this kills the keep-alive reaper we have to reset the node after this test
85+
assertSegments(true, client());
8386
validatePitStats("index", 0, 1, 0);
8487
validatePitStats("index", 0, 1, 1);
8588
}
@@ -96,12 +99,14 @@ public void testCreatePITWithMultipleIndicesSuccess() throws ExecutionException,
9699
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
97100
CreatePitResponse response = execute.get();
98101
PitTestsUtil.assertUsingGetAllPits(client(), response.getId(), response.getCreationTime());
102+
assertSegments(false, client());
99103
assertEquals(4, response.getSuccessfulShards());
100104
assertEquals(4, service.getActiveContexts());
101105

102106
validatePitStats("index", 1, 0, 0);
103107
validatePitStats("index1", 1, 0, 0);
104108
service.doClose();
109+
assertSegments(true, client());
105110
validatePitStats("index", 0, 1, 0);
106111
validatePitStats("index1", 0, 1, 0);
107112
}
@@ -115,6 +120,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I
115120
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
116121
CreatePitResponse pitResponse = execute.get();
117122
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
123+
assertSegments(false, client());
118124
client().prepareIndex("index").setId("2").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get();
119125
SearchResponse searchResponse = client().prepareSearch("index")
120126
.setSize(2)
@@ -127,6 +133,7 @@ public void testCreatePITWithShardReplicasSuccess() throws ExecutionException, I
127133
validatePitStats("index", 1, 0, 0);
128134
validatePitStats("index", 1, 0, 1);
129135
service.doClose();
136+
assertSegments(true, client());
130137
validatePitStats("index", 0, 1, 0);
131138
validatePitStats("index", 0, 1, 1);
132139
}
@@ -144,6 +151,7 @@ public void testCreatePITWithNonExistentIndex() {
144151

145152
assertTrue(ex.getMessage().contains("no such index [index1]"));
146153
assertEquals(0, service.getActiveContexts());
154+
assertSegments(true, client());
147155
service.doClose();
148156
}
149157

@@ -164,6 +172,7 @@ public void testCreatePITOnCloseIndex() throws ExecutionException, InterruptedEx
164172
SearchService service = getInstanceFromNode(SearchService.class);
165173
assertEquals(0, service.getActiveContexts());
166174
PitTestsUtil.assertGetAllPitsEmpty(client());
175+
assertSegments(true, client());
167176
service.doClose();
168177
}
169178

@@ -187,6 +196,7 @@ public void testPitSearchOnDeletedIndex() throws ExecutionException, Interrupted
187196
SearchService service = getInstanceFromNode(SearchService.class);
188197
PitTestsUtil.assertGetAllPitsEmpty(client());
189198
assertEquals(0, service.getActiveContexts());
199+
assertSegments(true, client());
190200
service.doClose();
191201
}
192202

@@ -212,6 +222,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
212222
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
213223
CreatePitResponse pitResponse = execute.get();
214224
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
225+
assertSegments(false, client());
215226
SearchService service = getInstanceFromNode(SearchService.class);
216227
assertEquals(2, service.getActiveContexts());
217228
validatePitStats("index", 1, 0, 0);
@@ -227,7 +238,7 @@ public void testPitSearchOnCloseIndex() throws ExecutionException, InterruptedEx
227238
assertTrue(ex.shardFailures()[0].reason().contains("SearchContextMissingException"));
228239
assertEquals(0, service.getActiveContexts());
229240
PitTestsUtil.assertGetAllPitsEmpty(client());
230-
241+
assertSegments(true, client());
231242
// PIT reader contexts are lost after close, verifying it with open index api
232243
client().admin().indices().prepareOpen("index").get();
233244
ex = expectThrows(SearchPhaseExecutionException.class, () -> {
@@ -491,6 +502,7 @@ public void testPitAfterUpdateIndex() throws Exception {
491502
assertEquals(0, service.getActiveContexts());
492503
validatePitStats("test", 0, 1, 0);
493504
PitTestsUtil.assertGetAllPitsEmpty(client());
505+
assertSegments(true, client());
494506
}
495507
}
496508

@@ -503,6 +515,7 @@ public void testConcurrentSearches() throws Exception {
503515
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
504516
CreatePitResponse pitResponse = execute.get();
505517
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
518+
assertSegments(false, client());
506519
Thread[] threads = new Thread[5];
507520
CountDownLatch latch = new CountDownLatch(threads.length);
508521

@@ -538,6 +551,7 @@ public void testConcurrentSearches() throws Exception {
538551
validatePitStats("index", 0, 1, 0);
539552
validatePitStats("index", 0, 1, 1);
540553
PitTestsUtil.assertGetAllPitsEmpty(client());
554+
assertSegments(true, client());
541555
}
542556

543557
public void validatePitStats(String index, long expectedPitCurrent, long expectedPitCount, int shardId) throws ExecutionException,

‎server/src/test/java/org/opensearch/search/PitMultiNodeTests.java

+3
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.stream.Collectors;
5252

5353
import static org.hamcrest.Matchers.containsString;
54+
import static org.opensearch.action.search.PitTestsUtil.assertSegments;
5455
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
5556
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
5657

@@ -85,6 +86,7 @@ public void testPit() throws Exception {
8586
assertEquals(2, searchResponse.getTotalShards());
8687
validatePitStats("index", 2, 2);
8788
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
89+
assertSegments(false, client());
8890
}
8991

9092
public void testCreatePitWhileNodeDropWithAllowPartialCreationFalse() throws Exception {
@@ -112,6 +114,7 @@ public Settings onNodeStopped(String nodeName) throws Exception {
112114
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
113115
CreatePitResponse pitResponse = execute.get();
114116
PitTestsUtil.assertUsingGetAllPits(client(), pitResponse.getId(), pitResponse.getCreationTime());
117+
assertSegments(false, "index", 1, client());
115118
assertEquals(1, pitResponse.getSuccessfulShards());
116119
assertEquals(2, pitResponse.getTotalShards());
117120
SearchResponse searchResponse = client().prepareSearch("index")

0 commit comments

Comments
 (0)
Please sign in to comment.