Skip to content

Commit 5b182a7

Browse files
authored
Capture panic in replication task processing (#3799)
1 parent 6319a28 commit 5b182a7

7 files changed

+62
-26
lines changed

service/history/replication/dlq_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ func (r *dlqHandlerImpl) MergeMessages(
209209
}
210210

211211
for _, task := range replicationTasks {
212-
if _, err := taskExecutor.Execute(
212+
if err := taskExecutor.Execute(
213213
ctx,
214214
task,
215215
true,

service/history/replication/dlq_handler_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ func (s *dlqHandlerSuite) TestMergeMessages() {
292292
Return(&adminservice.GetDLQReplicationMessagesResponse{
293293
ReplicationTasks: []*replicationspb.ReplicationTask{remoteTask},
294294
}, nil)
295-
s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return("", nil)
295+
s.taskExecutor.EXPECT().Execute(gomock.Any(), remoteTask, true).Return(nil)
296296
s.executionManager.EXPECT().RangeDeleteReplicationTaskFromDLQ(gomock.Any(), &persistence.RangeDeleteReplicationTaskFromDLQRequest{
297297
RangeCompleteHistoryTasksRequest: persistence.RangeCompleteHistoryTasksRequest{
298298
ShardID: s.mockShard.GetShardID(),

service/history/replication/task_executor.go

+3-10
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ import (
5151

5252
type (
5353
TaskExecutor interface {
54-
Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) (string, error)
54+
Execute(ctx context.Context, replicationTask *replicationspb.ReplicationTask, forceApply bool) error
5555
}
5656

5757
TaskExecutorParams struct {
@@ -107,32 +107,25 @@ func (e *taskExecutorImpl) Execute(
107107
ctx context.Context,
108108
replicationTask *replicationspb.ReplicationTask,
109109
forceApply bool,
110-
) (string, error) {
110+
) error {
111111
var err error
112-
var operation string
113112
switch replicationTask.GetTaskType() {
114113
case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK:
115114
// Shard status will be sent as part of the Replication message without kafka
116-
operation = metrics.SyncShardTaskScope
117115
case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
118-
operation = metrics.SyncActivityTaskScope
119116
err = e.handleActivityTask(ctx, replicationTask, forceApply)
120117
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK:
121118
// Without kafka we should not have size limits so we don't necessarily need this in the new replication scheme.
122-
operation = metrics.HistoryMetadataReplicationTaskScope
123119
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
124-
operation = metrics.HistoryReplicationTaskScope
125120
err = e.handleHistoryReplicationTask(ctx, replicationTask, forceApply)
126121
case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK:
127-
operation = metrics.SyncWorkflowStateTaskScope
128122
err = e.handleSyncWorkflowStateTask(ctx, replicationTask, forceApply)
129123
default:
130124
e.logger.Error("Unknown task type.")
131-
operation = metrics.ReplicatorScope
132125
err = ErrUnknownReplicationTask
133126
}
134127

135-
return operation, err
128+
return err
136129
}
137130

138131
func (e *taskExecutorImpl) handleActivityTask(

service/history/replication/task_executor_mock.go

+3-4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/replication/task_executor_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask() {
232232
}
233233

234234
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
235-
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
235+
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
236236
s.NoError(err)
237237
}
238238

@@ -297,7 +297,7 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncActivityReplicationTask_Rese
297297
int64(456),
298298
)
299299
s.mockEngine.EXPECT().SyncActivity(gomock.Any(), request).Return(nil)
300-
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
300+
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
301301
s.NoError(err)
302302
}
303303

@@ -330,7 +330,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask() {
330330
}
331331

332332
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
333-
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
333+
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
334334
s.NoError(err)
335335
}
336336

@@ -385,7 +385,7 @@ func (s *taskExecutorSuite) TestProcess_HistoryReplicationTask_Resend() {
385385
int64(456),
386386
)
387387
s.mockEngine.EXPECT().ReplicateEventsV2(gomock.Any(), request).Return(nil)
388-
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
388+
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
389389
s.NoError(err)
390390
}
391391

@@ -406,6 +406,6 @@ func (s *taskExecutorSuite) TestProcessTaskOnce_SyncWorkflowStateTask() {
406406

407407
s.mockEngine.EXPECT().ReplicateWorkflowState(gomock.Any(), gomock.Any()).Return(nil)
408408

409-
_, err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
409+
err := s.replicationTaskExecutor.Execute(context.Background(), task, true)
410410
s.NoError(err)
411411
}

service/history/replication/task_processor.go

+35-3
Original file line numberDiff line numberDiff line change
@@ -324,14 +324,27 @@ func (p *taskProcessorImpl) handleSyncShardStatus(
324324
func (p *taskProcessorImpl) handleReplicationTask(
325325
ctx context.Context,
326326
replicationTask *replicationspb.ReplicationTask,
327-
) error {
327+
) (retErr error) {
328328
_ = p.rateLimiter.Wait(ctx)
329329

330+
operationTagValue := p.getOperationTagValue(replicationTask)
331+
330332
operation := func() error {
331-
operation, err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false)
332-
p.emitTaskMetrics(operation, err)
333+
err := p.replicationTaskExecutor.Execute(ctx, replicationTask, false)
334+
p.emitTaskMetrics(operationTagValue, err)
333335
return err
334336
}
337+
338+
var panicErr error
339+
defer func() {
340+
if panicErr != nil {
341+
retErr = panicErr
342+
p.emitTaskMetrics(operationTagValue, panicErr)
343+
}
344+
}()
345+
346+
defer log.CapturePanic(p.logger, &panicErr)
347+
335348
return backoff.ThrottleRetry(operation, p.taskRetryPolicy, p.isRetryableError)
336349
}
337350

@@ -526,6 +539,25 @@ func (p *taskProcessorImpl) emitTaskMetrics(operation string, err error) {
526539
metricsScope.Counter(metrics.ReplicationTasksFailed.GetMetricName()).Record(1)
527540
}
528541

542+
func (p *taskProcessorImpl) getOperationTagValue(
543+
replicationTask *replicationspb.ReplicationTask,
544+
) string {
545+
switch replicationTask.GetTaskType() {
546+
case enumsspb.REPLICATION_TASK_TYPE_SYNC_SHARD_STATUS_TASK:
547+
return metrics.SyncShardTaskScope
548+
case enumsspb.REPLICATION_TASK_TYPE_SYNC_ACTIVITY_TASK:
549+
return metrics.SyncActivityTaskScope
550+
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_METADATA_TASK:
551+
return metrics.HistoryMetadataReplicationTaskScope
552+
case enumsspb.REPLICATION_TASK_TYPE_HISTORY_V2_TASK:
553+
return metrics.HistoryReplicationTaskScope
554+
case enumsspb.REPLICATION_TASK_TYPE_SYNC_WORKFLOW_STATE_TASK:
555+
return metrics.SyncWorkflowStateTaskScope
556+
default:
557+
return metrics.ReplicatorScope
558+
}
559+
}
560+
529561
func (p *taskProcessorImpl) isStopped() bool {
530562
return atomic.LoadInt32(&p.status) == common.DaemonStatusStopped
531563
}

service/history/replication/task_processor_test.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_SyncActivity() {
204204
VisibilityTime: &now,
205205
}
206206

207-
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil)
207+
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil)
208208
err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task)
209209
s.NoError(err)
210210
}
@@ -243,11 +243,23 @@ func (s *taskProcessorSuite) TestHandleReplicationTask_History() {
243243
VisibilityTime: &now,
244244
}
245245

246-
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return("", nil)
246+
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).Return(nil)
247247
err = s.replicationTaskProcessor.handleReplicationTask(context.Background(), task)
248248
s.NoError(err)
249249
}
250250

251+
func (s *taskProcessorSuite) TestHandleReplicationTask_Panic() {
252+
task := &replicationspb.ReplicationTask{}
253+
254+
s.mockReplicationTaskExecutor.EXPECT().Execute(gomock.Any(), task, false).DoAndReturn(
255+
func(_ context.Context, _ *replicationspb.ReplicationTask, _ bool) error {
256+
panic("test replication task panic")
257+
},
258+
)
259+
err := s.replicationTaskProcessor.handleReplicationTask(context.Background(), task)
260+
s.Error(err)
261+
}
262+
251263
func (s *taskProcessorSuite) TestHandleReplicationDLQTask_SyncActivity() {
252264
namespaceID := uuid.NewRandom().String()
253265
workflowID := uuid.New()

0 commit comments

Comments
 (0)