Skip to content

Commit 1a9712c

Browse files
committed
Segment Replication - Implement segment replication event cancellation. (opensearch-project#4225)
* Segment Replication. Fix Cancellation of replication events. This PR updates segment replication paths to correctly cancel replication events on the primary and replica. In the source service, any ongoing event for a primary that is sending to a replica that shuts down or is promoted as a new primary is cancelled. In the target service, any ongoing event for a replica that is promoted as a new primary or is fetching from a primary that shuts down is likewise cancelled. It wires up SegmentReplicationSourceService as an IndexEventListener so that it can respond to events and cancel any ongoing transfer state. This change also includes some test cleanup for segment replication to rely on actual components over mocks. Signed-off-by: Marc Handalian <handalm@amazon.com> Fix to not start/stop SegmentReplicationSourceService as a lifecycle component with feature flag off. Signed-off-by: Marc Handalian <handalm@amazon.com> Update logic to properly mark SegmentReplicationTarget as cancelled when the cancel is initiated by the primary. Signed-off-by: Marc Handalian <handalm@amazon.com> Minor updates from self review. Signed-off-by: Marc Handalian <handalm@amazon.com> * Add missing changelog entry. Signed-off-by: Marc Handalian <handalm@amazon.com> Signed-off-by: Marc Handalian <handalm@amazon.com> (cherry picked from commit 19d1a2b)
1 parent 03273b5 commit 1a9712c

19 files changed

+739
-196
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
4141
### Fixed
4242
- PR reference to checkout code for changelog verifier ([#4296](https://github.com/opensearch-project/OpenSearch/pull/4296))
4343
- Restore using the class ClusterInfoRequest and ClusterInfoRequestBuilder from package 'org.opensearch.action.support.master.info' for subclasses ([#4324](https://github.com/opensearch-project/OpenSearch/pull/4324))
44+
- Fixed cancellation of segment replication events ([#4225](https://github.com/opensearch-project/OpenSearch/pull/4225))
4445

4546
### Security
4647

server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java

+5
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.opensearch.indices.recovery.PeerRecoveryTargetService;
8282
import org.opensearch.indices.recovery.RecoveryListener;
8383
import org.opensearch.indices.recovery.RecoveryState;
84+
import org.opensearch.indices.replication.SegmentReplicationSourceService;
8485
import org.opensearch.indices.replication.SegmentReplicationTargetService;
8586
import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher;
8687
import org.opensearch.indices.replication.common.ReplicationState;
@@ -152,6 +153,7 @@ public IndicesClusterStateService(
152153
final ThreadPool threadPool,
153154
final PeerRecoveryTargetService recoveryTargetService,
154155
final SegmentReplicationTargetService segmentReplicationTargetService,
156+
final SegmentReplicationSourceService segmentReplicationSourceService,
155157
final ShardStateAction shardStateAction,
156158
final NodeMappingRefreshAction nodeMappingRefreshAction,
157159
final RepositoriesService repositoriesService,
@@ -170,6 +172,7 @@ public IndicesClusterStateService(
170172
threadPool,
171173
checkpointPublisher,
172174
segmentReplicationTargetService,
175+
segmentReplicationSourceService,
173176
recoveryTargetService,
174177
shardStateAction,
175178
nodeMappingRefreshAction,
@@ -191,6 +194,7 @@ public IndicesClusterStateService(
191194
final ThreadPool threadPool,
192195
final SegmentReplicationCheckpointPublisher checkpointPublisher,
193196
final SegmentReplicationTargetService segmentReplicationTargetService,
197+
final SegmentReplicationSourceService segmentReplicationSourceService,
194198
final PeerRecoveryTargetService recoveryTargetService,
195199
final ShardStateAction shardStateAction,
196200
final NodeMappingRefreshAction nodeMappingRefreshAction,
@@ -211,6 +215,7 @@ public IndicesClusterStateService(
211215
// if segrep feature flag is not enabled, don't wire the target or source service as an IndexEventListener.
212216
if (FeatureFlags.isEnabled(FeatureFlags.REPLICATION_TYPE)) {
213217
indexEventListeners.add(segmentReplicationTargetService);
218+
indexEventListeners.add(segmentReplicationSourceService);
214219
}
215220
this.builtInIndexListener = Collections.unmodifiableList(indexEventListeners);
216221
this.indicesService = indicesService;

server/src/main/java/org/opensearch/indices/replication/OngoingSegmentReplications.java

+15-7
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
* @opensearch.internal
3838
*/
3939
class OngoingSegmentReplications {
40-
4140
private final RecoverySettings recoverySettings;
4241
private final IndicesService indicesService;
4342
private final Map<ReplicationCheckpoint, CopyState> copyStateMap;
@@ -161,14 +160,27 @@ synchronized void cancel(IndexShard shard, String reason) {
161160
cancelHandlers(handler -> handler.getCopyState().getShard().shardId().equals(shard.shardId()), reason);
162161
}
163162

163+
/**
164+
* Cancel all Replication events for the given allocation ID, intended to be called when a primary is shutting down.
165+
*
166+
* @param allocationId {@link String} - Allocation ID.
167+
* @param reason {@link String} - Reason for the cancel
168+
*/
169+
synchronized void cancel(String allocationId, String reason) {
170+
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
171+
if (handler != null) {
172+
handler.cancel(reason);
173+
removeCopyState(handler.getCopyState());
174+
}
175+
}
176+
164177
/**
165178
* Cancel any ongoing replications for a given {@link DiscoveryNode}
166179
*
167180
* @param node {@link DiscoveryNode} node for which to cancel replication events.
168181
*/
169182
void cancelReplication(DiscoveryNode node) {
170183
cancelHandlers(handler -> handler.getTargetNode().equals(node), "Node left");
171-
172184
}
173185

174186
/**
@@ -243,11 +255,7 @@ private void cancelHandlers(Predicate<? super SegmentReplicationSourceHandler> p
243255
.map(SegmentReplicationSourceHandler::getAllocationId)
244256
.collect(Collectors.toList());
245257
for (String allocationId : allocationIds) {
246-
final SegmentReplicationSourceHandler handler = allocationIdToHandlers.remove(allocationId);
247-
if (handler != null) {
248-
handler.cancel(reason);
249-
removeCopyState(handler.getCopyState());
250-
}
258+
cancel(allocationId, reason);
251259
}
252260
}
253261
}

server/src/main/java/org/opensearch/indices/replication/PrimaryShardReplicationSource.java

+6
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,10 @@ public void getSegmentFiles(
8787
);
8888
transportClient.executeRetryableAction(GET_SEGMENT_FILES, request, responseListener, reader);
8989
}
90+
91+
@Override
92+
public void cancel() {
93+
transportClient.cancel();
94+
}
95+
9096
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSource.java

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
package org.opensearch.indices.replication;
1010

1111
import org.opensearch.action.ActionListener;
12+
import org.opensearch.common.util.CancellableThreads.ExecutionCancelledException;
1213
import org.opensearch.index.store.Store;
1314
import org.opensearch.index.store.StoreFileMetadata;
1415
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;
@@ -47,4 +48,9 @@ void getSegmentFiles(
4748
Store store,
4849
ActionListener<GetSegmentFilesResponse> listener
4950
);
51+
52+
/**
53+
* Cancel any ongoing requests. Implementations should resolve any ongoing listeners via onFailure with an {@link ExecutionCancelledException}.
54+
*/
55+
default void cancel() {}
5056
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceHandler.java

+11
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,16 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
113113
final Closeable releaseResources = () -> IOUtils.close(resources);
114114
try {
115115
timer.start();
116+
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
117+
final RuntimeException e = new CancellableThreads.ExecutionCancelledException(
118+
"replication was canceled reason [" + reason + "]"
119+
);
120+
if (beforeCancelEx != null) {
121+
e.addSuppressed(beforeCancelEx);
122+
}
123+
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
124+
throw e;
125+
});
116126
final Consumer<Exception> onFailure = e -> {
117127
assert Transports.assertNotTransportThread(SegmentReplicationSourceHandler.this + "[onFailure]");
118128
IOUtils.closeWhileHandlingException(releaseResources, () -> future.onFailure(e));
@@ -153,6 +163,7 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene
153163
final MultiChunkTransfer<StoreFileMetadata, SegmentFileTransferHandler.FileChunk> transfer = segmentFileTransferHandler
154164
.createTransfer(shard.store(), storeFileMetadata, () -> 0, sendFileStep);
155165
resources.add(transfer);
166+
cancellableThreads.checkForCancel();
156167
transfer.start();
157168

158169
sendFileStep.whenComplete(r -> {

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

+43-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.cluster.ClusterChangedEvent;
1616
import org.opensearch.cluster.ClusterStateListener;
1717
import org.opensearch.cluster.node.DiscoveryNode;
18+
import org.opensearch.cluster.routing.ShardRouting;
1819
import org.opensearch.cluster.service.ClusterService;
1920
import org.opensearch.common.Nullable;
2021
import org.opensearch.common.component.AbstractLifecycleComponent;
@@ -42,7 +43,25 @@
4243
*
4344
* @opensearch.internal
4445
*/
45-
public final class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
46+
public class SegmentReplicationSourceService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
47+
48+
// Empty Implementation, only required while Segment Replication is under feature flag.
49+
public static final SegmentReplicationSourceService NO_OP = new SegmentReplicationSourceService() {
50+
@Override
51+
public void clusterChanged(ClusterChangedEvent event) {
52+
// NoOp;
53+
}
54+
55+
@Override
56+
public void beforeIndexShardClosed(ShardId shardId, IndexShard indexShard, Settings indexSettings) {
57+
// NoOp;
58+
}
59+
60+
@Override
61+
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
62+
// NoOp;
63+
}
64+
};
4665

4766
private static final Logger logger = LogManager.getLogger(SegmentReplicationSourceService.class);
4867
private final RecoverySettings recoverySettings;
@@ -62,6 +81,14 @@ public static class Actions {
6281

6382
private final OngoingSegmentReplications ongoingSegmentReplications;
6483

84+
// Used only for empty implementation.
85+
private SegmentReplicationSourceService() {
86+
recoverySettings = null;
87+
ongoingSegmentReplications = null;
88+
transportService = null;
89+
indicesService = null;
90+
}
91+
6592
public SegmentReplicationSourceService(
6693
IndicesService indicesService,
6794
TransportService transportService,
@@ -163,10 +190,25 @@ protected void doClose() throws IOException {
163190

164191
}
165192

193+
/**
194+
*
195+
* Cancels any replications on this node to a replica shard that is about to be closed.
196+
*/
166197
@Override
167198
public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) {
168199
if (indexShard != null) {
169200
ongoingSegmentReplications.cancel(indexShard, "shard is closed");
170201
}
171202
}
203+
204+
/**
205+
* Cancels any replications on this node to a replica that has been promoted as primary.
206+
*/
207+
@Override
208+
public void shardRoutingChanged(IndexShard indexShard, @Nullable ShardRouting oldRouting, ShardRouting newRouting) {
209+
if (indexShard != null && oldRouting.primary() == false && newRouting.primary()) {
210+
ongoingSegmentReplications.cancel(indexShard.routingEntry().allocationId().getId(), "Relocating primary shard.");
211+
}
212+
}
213+
172214
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationState.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ public enum Stage {
3535
GET_CHECKPOINT_INFO((byte) 3),
3636
FILE_DIFF((byte) 4),
3737
GET_FILES((byte) 5),
38-
FINALIZE_REPLICATION((byte) 6);
38+
FINALIZE_REPLICATION((byte) 6),
39+
CANCELLED((byte) 7);
3940

4041
private static final Stage[] STAGES = new Stage[Stage.values().length];
4142

@@ -118,6 +119,10 @@ protected void validateAndSetStage(Stage expected, Stage next) {
118119
"can't move replication to stage [" + next + "]. current stage: [" + stage + "] (expected [" + expected + "])"
119120
);
120121
}
122+
stopTimersAndSetStage(next);
123+
}
124+
125+
private void stopTimersAndSetStage(Stage next) {
121126
// save the timing data for the current step
122127
stageTimer.stop();
123128
timingData.add(new Tuple<>(stage.name(), stageTimer.time()));
@@ -155,6 +160,14 @@ public void setStage(Stage stage) {
155160
overallTimer.stop();
156161
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
157162
break;
163+
case CANCELLED:
164+
if (this.stage == Stage.DONE) {
165+
throw new IllegalStateException("can't move replication to Cancelled state from Done.");
166+
}
167+
stopTimersAndSetStage(Stage.CANCELLED);
168+
overallTimer.stop();
169+
timingData.add(new Tuple<>("OVERALL", overallTimer.time()));
170+
break;
158171
default:
159172
throw new IllegalArgumentException("unknown SegmentReplicationState.Stage [" + stage + "]");
160173
}

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java

+29-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import org.apache.lucene.store.ByteBuffersDataInput;
1818
import org.apache.lucene.store.ByteBuffersIndexInput;
1919
import org.apache.lucene.store.ChecksumIndexInput;
20+
import org.opensearch.ExceptionsHelper;
2021
import org.opensearch.OpenSearchException;
2122
import org.opensearch.action.ActionListener;
2223
import org.opensearch.action.StepListener;
@@ -103,7 +104,15 @@ public String description() {
103104

104105
@Override
105106
public void notifyListener(OpenSearchException e, boolean sendShardFailure) {
106-
listener.onFailure(state(), e, sendShardFailure);
107+
// Cancellations are still passed to our SegmentReplicationListener as failures; if we have failed because of cancellation,
108+
// update the stage.
109+
final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class);
110+
if (cancelledException != null) {
111+
state.setStage(SegmentReplicationState.Stage.CANCELLED);
112+
listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure);
113+
} else {
114+
listener.onFailure(state(), e, sendShardFailure);
115+
}
107116
}
108117

109118
@Override
@@ -134,13 +143,22 @@ public void writeFileChunk(
134143
* @param listener {@link ActionListener} listener.
135144
*/
136145
public void startReplication(ActionListener<Void> listener) {
146+
cancellableThreads.setOnCancel((reason, beforeCancelEx) -> {
147+
// This method only executes when cancellation is triggered by this node and caught by a call to checkForCancel,
148+
// SegmentReplicationSource does not share CancellableThreads.
149+
final CancellableThreads.ExecutionCancelledException executionCancelledException =
150+
new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]");
151+
notifyListener(executionCancelledException, false);
152+
throw executionCancelledException;
153+
});
137154
state.setStage(SegmentReplicationState.Stage.REPLICATING);
138155
final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
139156
final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
140157
final StepListener<Void> finalizeListener = new StepListener<>();
141158

142159
logger.trace("[shardId {}] Replica starting replication [id {}]", shardId().getId(), getId());
143160
// Get list of files to copy from this checkpoint.
161+
cancellableThreads.checkForCancel();
144162
state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
145163
source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
146164

@@ -154,6 +172,7 @@ public void startReplication(ActionListener<Void> listener) {
154172

155173
private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSegmentFilesResponse> getFilesListener)
156174
throws IOException {
175+
cancellableThreads.checkForCancel();
157176
state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
158177
final Store.MetadataSnapshot snapshot = checkpointInfo.getSnapshot();
159178
Store.MetadataSnapshot localMetadata = getMetadataSnapshot();
@@ -188,12 +207,14 @@ private void getFiles(CheckpointInfoResponse checkpointInfo, StepListener<GetSeg
188207
}
189208
// always send a req even if not fetching files so the primary can clear the copyState for this shard.
190209
state.setStage(SegmentReplicationState.Stage.GET_FILES);
210+
cancellableThreads.checkForCancel();
191211
source.getSegmentFiles(getId(), checkpointInfo.getCheckpoint(), filesToFetch, store, getFilesListener);
192212
}
193213

194214
private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse, ActionListener<Void> listener) {
195-
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
196215
ActionListener.completeWith(listener, () -> {
216+
cancellableThreads.checkForCancel();
217+
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
197218
multiFileWriter.renameAllTempFiles();
198219
final Store store = store();
199220
store.incRef();
@@ -261,4 +282,10 @@ Store.MetadataSnapshot getMetadataSnapshot() throws IOException {
261282
}
262283
return store.getMetadata(indexShard.getSegmentInfosSnapshot().get());
263284
}
285+
286+
@Override
287+
protected void onCancel(String reason) {
288+
cancellableThreads.cancel(reason);
289+
source.cancel();
290+
}
264291
}

0 commit comments

Comments
 (0)