Skip to content

Commit aa76719

Browse files
authored
Expose replicateWorkflowState API in history (#3783)
* Expose replicateWorkflowState API in history
1 parent 04ce8e3 commit aa76719

File tree

11 files changed

+822
-432
lines changed

11 files changed

+822
-432
lines changed

api/historyservice/v1/request_response.pb.go

+566-349
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/historyservice/v1/service.pb.go

+122-81
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/historyservicemock/v1/service.pb.mock.go

+35
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/history/client_gen.go

+24
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/history/metric_client_gen.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/history/retryable_client_gen.go

+15
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/metrics/metric_defs.go

+2
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,8 @@ const (
495495
HistoryClientVerifyChildExecutionCompletionRecordedScope = "HistoryClientVerifyChildExecutionCompletionRecorded"
496496
// HistoryClientReplicateEventsV2Scope tracks RPC calls to history service
497497
HistoryClientReplicateEventsV2Scope = "HistoryClientReplicateEventsV2"
498+
// HistoryClientReplicateWorkflowStateScope tracks RPC calls to history service
499+
HistoryClientReplicateWorkflowStateScope = "HistoryClientReplicateWorkflowState"
498500
// HistoryClientSyncShardStatusScope tracks RPC calls to history service
499501
HistoryClientSyncShardStatusScope = "HistoryClientSyncShardStatus"
500502
// HistoryClientSyncActivityScope tracks RPC calls to history service

proto/internal/temporal/server/api/historyservice/v1/request_response.proto

+5-1
Original file line numberDiff line numberDiff line change
@@ -382,12 +382,16 @@ message ReplicateEventsV2Request {
382382
temporal.api.common.v1.DataBlob new_run_events = 5;
383383
}
384384

385+
message ReplicateEventsV2Response {
386+
}
387+
385388
message ReplicateWorkflowStateRequest {
386389
temporal.server.api.persistence.v1.WorkflowMutableState workflow_state = 1;
387390
string remote_cluster = 2;
391+
string namespace_id= 3;
388392
}
389393

390-
message ReplicateEventsV2Response {
394+
message ReplicateWorkflowStateResponse {
391395
}
392396

393397
message SyncShardStatusRequest {

proto/internal/temporal/server/api/historyservice/v1/service.proto

+6-1
Original file line numberDiff line numberDiff line change
@@ -192,9 +192,14 @@ service HistoryService {
192192
rpc DescribeWorkflowExecution (DescribeWorkflowExecutionRequest) returns (DescribeWorkflowExecutionResponse) {
193193
}
194194

195+
// ReplicateEventsV2 replicates workflow history events
195196
rpc ReplicateEventsV2 (ReplicateEventsV2Request) returns (ReplicateEventsV2Response) {
196197
}
197198

199+
// ReplicateWorkflowState replicates workflow state
200+
rpc ReplicateWorkflowState(ReplicateWorkflowStateRequest) returns (ReplicateWorkflowStateResponse) {
201+
}
202+
198203
// SyncShardStatus sync the status between shards.
199204
rpc SyncShardStatus (SyncShardStatusRequest) returns (SyncShardStatusResponse) {
200205
}
@@ -275,7 +280,7 @@ service HistoryService {
275280
}
276281

277282
// (-- api-linter: core::0134=disabled
278-
// aip.dev/not-precedent: This service does not follow the update method AIP --)
283+
// aip.dev/not-precedent: This service does not follow the update method API --)
279284
rpc UpdateWorkflow(UpdateWorkflowRequest) returns (UpdateWorkflowResponse) {
280285
}
281286
}

service/history/configs/quotas.go

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ var (
5555
"RemoveSignalMutableState": 0,
5656
"RemoveTask": 0,
5757
"ReplicateEventsV2": 0,
58+
"ReplicateWorkflowState": 0,
5859
"RequestCancelWorkflowExecution": 0,
5960
"ResetStickyTaskQueue": 0,
6061
"ResetWorkflowExecution": 0,

service/history/handler.go

+32
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,38 @@ func (h *Handler) ReplicateEventsV2(ctx context.Context, request *historyservice
13011301
return &historyservice.ReplicateEventsV2Response{}, nil
13021302
}
13031303

1304+
// ReplicateWorkflowState is called by processor to replicate workflow state for passive namespaces
1305+
func (h *Handler) ReplicateWorkflowState(
1306+
ctx context.Context,
1307+
request *historyservice.ReplicateWorkflowStateRequest,
1308+
) (_ *historyservice.ReplicateWorkflowStateResponse, retError error) {
1309+
defer log.CapturePanic(h.logger, &retError)
1310+
h.startWG.Wait()
1311+
1312+
if h.isStopped() {
1313+
return nil, errShuttingDown
1314+
}
1315+
1316+
shardContext, err := h.controller.GetShardByNamespaceWorkflow(
1317+
namespace.ID(request.GetWorkflowState().GetExecutionInfo().GetNamespaceId()),
1318+
request.GetWorkflowState().GetExecutionInfo().GetWorkflowId(),
1319+
)
1320+
if err != nil {
1321+
return nil, h.convertError(err)
1322+
}
1323+
1324+
engine, err := shardContext.GetEngine(ctx)
1325+
if err != nil {
1326+
return nil, h.convertError(err)
1327+
}
1328+
1329+
err = engine.ReplicateWorkflowState(ctx, request)
1330+
if err != nil {
1331+
return nil, err
1332+
}
1333+
return &historyservice.ReplicateWorkflowStateResponse{}, nil
1334+
}
1335+
13041336
// SyncShardStatus is called by processor to sync history shard information from another cluster
13051337
func (h *Handler) SyncShardStatus(ctx context.Context, request *historyservice.SyncShardStatusRequest) (_ *historyservice.SyncShardStatusResponse, retError error) {
13061338
defer log.CapturePanic(h.logger, &retError)

0 commit comments

Comments
 (0)