Skip to content

Commit 7e3daf1

Browse files
authored
Add lower priority for replication tasks (#3870)
* add lower priority for replication * test * linting * RequestPriorityFn
1 parent fa429a5 commit 7e3daf1

File tree

7 files changed

+24
-12
lines changed

7 files changed

+24
-12
lines changed

common/headers/caller_info.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,22 @@ import (
3131
)
3232

3333
const (
34-
CallerTypeAPI = "api"
35-
CallerTypeBackground = "background"
34+
CallerTypeAPI = "api"
35+
CallerTypeBackground = "background"
36+
CallerTypePreemptable = "preemptable"
3637

3738
CallerNameSystem = "system"
3839
)
3940

4041
var (
41-
SystemBackgroundCallerInfo CallerInfo = CallerInfo{
42+
SystemBackgroundCallerInfo = CallerInfo{
4243
CallerName: CallerNameSystem,
4344
CallerType: CallerTypeBackground,
4445
}
46+
SystemPreemptableCallerInfo = CallerInfo{
47+
CallerName: CallerNameSystem,
48+
CallerType: CallerTypePreemptable,
49+
}
4550
)
4651

4752
type (

common/headers/caller_info_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ func (s *callerInfoSuite) TestSetCallerType() {
8383
ctx = SetCallerType(ctx, CallerTypeAPI)
8484
info = GetCallerInfo(ctx)
8585
s.Equal(CallerTypeAPI, info.CallerType)
86+
87+
ctx = SetCallerType(ctx, CallerTypePreemptable)
88+
info = GetCallerInfo(ctx)
89+
s.Equal(CallerTypePreemptable, info.CallerType)
8690
}
8791

8892
func (s *callerInfoSuite) TestSetCallOrigin() {

common/persistence/client/quotas.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ import (
3333

3434
var (
3535
CallerTypeDefaultPriority = map[string]int{
36-
headers.CallerTypeAPI: 1,
37-
headers.CallerTypeBackground: 3,
36+
headers.CallerTypeAPI: 1,
37+
headers.CallerTypeBackground: 3,
38+
headers.CallerTypePreemptable: 4,
3839
}
3940

4041
APITypeCallOriginPriorityOverride = map[string]int{
@@ -69,7 +70,7 @@ var (
6970
p.ConstructHistoryTaskAPI("GetHistoryTasks", tasks.CategoryVisibility): 2,
7071
}
7172

72-
RequestPrioritiesOrdered = []int{0, 1, 2, 3}
73+
RequestPrioritiesOrdered = []int{0, 1, 2, 3, 4}
7374
)
7475

7576
func NewPriorityRateLimiter(
@@ -150,6 +151,8 @@ func RequestPriorityFn(req quotas.Request) int {
150151
return priority
151152
}
152153
return CallerTypeDefaultPriority[req.CallerType]
154+
case headers.CallerTypePreemptable:
155+
return CallerTypeDefaultPriority[req.CallerType]
153156
default:
154157
// default requests to high priority to be consistent with existing behavior
155158
return RequestPrioritiesOrdered[0]

service/history/replication/task_fetcher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,7 @@ func (f *replicationTaskFetcherWorker) getMessages() error {
420420

421421
ctx, cancel := rpc.NewContextWithTimeoutAndVersionHeaders(fetchTaskRequestTimeout)
422422
defer cancel()
423-
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
423+
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)
424424

425425
request := &adminservice.GetReplicationMessagesRequest{
426426
Tokens: tokens,

service/history/replication/task_processor.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (p *taskProcessorImpl) applyReplicationTask(
274274
) error {
275275
ctx := headers.SetCallerInfo(
276276
context.Background(),
277-
headers.SystemBackgroundCallerInfo,
277+
headers.SystemPreemptableCallerInfo,
278278
)
279279

280280
err := p.handleReplicationTask(ctx, replicationTask)
@@ -314,7 +314,7 @@ func (p *taskProcessorImpl) handleSyncShardStatus(
314314
metrics.OperationTag(metrics.HistorySyncShardStatusScope))
315315
ctx, cancel := context.WithTimeout(context.Background(), replicationTimeout)
316316
defer cancel()
317-
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
317+
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)
318318

319319
return p.historyEngine.SyncShardStatus(ctx, &historyservice.SyncShardStatusRequest{
320320
SourceCluster: p.sourceCluster,

service/history/replication/task_processor_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func (r *taskProcessorManagerImpl) cleanupReplicationTasks() error {
273273
)
274274

275275
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
276-
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
276+
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)
277277
defer cancel()
278278

279279
err := r.shard.GetExecutionManager().RangeCompleteHistoryTasks(

service/worker/replicator/namespace_replication_message_processor.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ func (p *namespaceReplicationMessageProcessor) getAndHandleNamespaceReplicationT
147147
}
148148

149149
ctx, cancel := rpc.NewContextWithTimeoutAndVersionHeaders(fetchTaskRequestTimeout)
150-
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
150+
ctx = headers.SetCallerInfo(ctx, headers.SystemPreemptableCallerInfo)
151151
request := &adminservice.GetNamespaceReplicationMessagesRequest{
152152
ClusterName: p.currentCluster,
153153
LastRetrievedMessageId: p.lastRetrievedMessageID,
@@ -164,7 +164,7 @@ func (p *namespaceReplicationMessageProcessor) getAndHandleNamespaceReplicationT
164164
p.logger.Debug("Successfully fetched namespace replication tasks", tag.Counter(len(response.Messages.ReplicationTasks)))
165165

166166
// TODO: specify a timeout for processing namespace replication tasks
167-
taskCtx := headers.SetCallerInfo(context.TODO(), headers.SystemBackgroundCallerInfo)
167+
taskCtx := headers.SetCallerInfo(context.TODO(), headers.SystemPreemptableCallerInfo)
168168
for taskIndex := range response.Messages.ReplicationTasks {
169169
task := response.Messages.ReplicationTasks[taskIndex]
170170
err := backoff.ThrottleRetry(func() error {

0 commit comments

Comments
 (0)