Skip to content

Commit a22db81

Browse files
authored
Remove deprecated code from queue implementation (#3770)
1 parent adf7c54 commit a22db81

15 files changed

+79
-111
lines changed

common/dynamicconfig/constants.go

-8
Original file line numberDiff line numberDiff line change
@@ -411,8 +411,6 @@ const (
411411
TimerProcessorMaxPollInterval = "history.timerProcessorMaxPollInterval"
412412
// TimerProcessorMaxPollIntervalJitterCoefficient is the max poll interval jitter coefficient
413413
TimerProcessorMaxPollIntervalJitterCoefficient = "history.timerProcessorMaxPollIntervalJitterCoefficient"
414-
// TimerProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for timer processor
415-
TimerProcessorMaxReschedulerSize = "history.timerProcessorMaxReschedulerSize"
416414
// TimerProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for timer processor
417415
TimerProcessorPollBackoffInterval = "history.timerProcessorPollBackoffInterval"
418416
// TimerProcessorMaxTimeShift is the max shift timer processor can have
@@ -452,8 +450,6 @@ const (
452450
TransferProcessorUpdateAckIntervalJitterCoefficient = "history.transferProcessorUpdateAckIntervalJitterCoefficient"
453451
// TransferProcessorCompleteTransferInterval is complete timer interval for transferQueueProcessor
454452
TransferProcessorCompleteTransferInterval = "history.transferProcessorCompleteTransferInterval"
455-
// TransferProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for transferQueueProcessor
456-
TransferProcessorMaxReschedulerSize = "history.transferProcessorMaxReschedulerSize"
457453
// TransferProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for transferQueueProcessor
458454
TransferProcessorPollBackoffInterval = "history.transferProcessorPollBackoffInterval"
459455
// TransferProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
@@ -485,8 +481,6 @@ const (
485481
VisibilityProcessorUpdateAckIntervalJitterCoefficient = "history.visibilityProcessorUpdateAckIntervalJitterCoefficient"
486482
// VisibilityProcessorCompleteTaskInterval is complete timer interval for visibilityQueueProcessor
487483
VisibilityProcessorCompleteTaskInterval = "history.visibilityProcessorCompleteTaskInterval"
488-
// VisibilityProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for visibilityQueueProcessor
489-
VisibilityProcessorMaxReschedulerSize = "history.visibilityProcessorMaxReschedulerSize"
490484
// VisibilityProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for visibilityQueueProcessor
491485
VisibilityProcessorPollBackoffInterval = "history.visibilityProcessorPollBackoffInterval"
492486
// VisibilityProcessorVisibilityArchivalTimeLimit is the upper time limit for archiving visibility records
@@ -545,8 +539,6 @@ const (
545539
ReplicatorProcessorUpdateAckInterval = "history.replicatorProcessorUpdateAckInterval"
546540
// ReplicatorProcessorUpdateAckIntervalJitterCoefficient is the update interval jitter coefficient
547541
ReplicatorProcessorUpdateAckIntervalJitterCoefficient = "history.replicatorProcessorUpdateAckIntervalJitterCoefficient"
548-
// ReplicatorProcessorMaxReschedulerSize is the threshold of the number of tasks in the redispatch queue for ReplicatorProcessor
549-
ReplicatorProcessorMaxReschedulerSize = "history.replicatorProcessorMaxReschedulerSize"
550542
// ReplicatorProcessorEnablePriorityTaskProcessor indicates whether priority task processor should be used for ReplicatorProcessor
551543
ReplicatorProcessorEnablePriorityTaskProcessor = "history.replicatorProcessorEnablePriorityTaskProcessor"
552544
// MaximumBufferedEventsBatch is max number of buffer event in mutable state

service/history/archival_queue_factory.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -112,11 +112,6 @@ func newQueueFactoryBase(params ArchivalQueueFactoryParams, hostScheduler queues
112112
return QueueFactoryBase{
113113
HostScheduler: hostScheduler,
114114
HostPriorityAssigner: queues.NewPriorityAssigner(),
115-
HostRateLimiter: NewQueueHostRateLimiter(
116-
params.Config.ArchivalProcessorMaxPollHostRPS,
117-
params.Config.PersistenceMaxQPS,
118-
archivalQueuePersistenceMaxRPSRatio,
119-
),
120115
HostReaderRateLimiter: queues.NewReaderPriorityRateLimiter(
121116
NewHostRateLimiterRateFn(
122117
params.Config.ArchivalProcessorMaxPollHostRPS,
@@ -152,10 +147,20 @@ func (f *archivalQueueFactory) newArchivalTaskExecutor(shard shard.Context, work
152147
// newScheduledQueue creates a new scheduled queue for the given shard with archival-specific configurations.
153148
func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor queues.Executor) queues.Queue {
154149
logger := log.With(shard.GetLogger(), tag.ComponentArchivalQueue)
150+
metricsHandler := f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope))
151+
152+
rescheduler := queues.NewRescheduler(
153+
f.HostScheduler,
154+
shard.GetTimeSource(),
155+
logger,
156+
metricsHandler,
157+
)
158+
155159
return queues.NewScheduledQueue(
156160
shard,
157161
tasks.CategoryArchival,
158162
f.HostScheduler,
163+
rescheduler,
159164
f.HostPriorityAssigner,
160165
executor,
161166
&queues.Options{
@@ -179,6 +184,6 @@ func (f *archivalQueueFactory) newScheduledQueue(shard shard.Context, executor q
179184
},
180185
f.HostReaderRateLimiter,
181186
logger,
182-
f.MetricsHandler.WithTags(metrics.OperationTag(metrics.OperationArchivalQueueProcessorScope)),
187+
metricsHandler,
183188
)
184189
}

service/history/configs/config.go

-8
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ type Config struct {
110110
TimerProcessorMaxPollHostRPS dynamicconfig.IntPropertyFn
111111
TimerProcessorMaxPollInterval dynamicconfig.DurationPropertyFn
112112
TimerProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
113-
TimerProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
114113
TimerProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
115114
TimerProcessorMaxTimeShift dynamicconfig.DurationPropertyFn
116115
TimerProcessorHistoryArchivalSizeLimit dynamicconfig.IntPropertyFn
@@ -132,7 +131,6 @@ type Config struct {
132131
TransferProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
133132
TransferProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
134133
TransferProcessorCompleteTransferInterval dynamicconfig.DurationPropertyFn
135-
TransferProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
136134
TransferProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
137135
TransferProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
138136
TransferProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
@@ -147,7 +145,6 @@ type Config struct {
147145
ReplicatorProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
148146
ReplicatorProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
149147
ReplicatorProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
150-
ReplicatorProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
151148
ReplicatorProcessorEnablePriorityTaskProcessor dynamicconfig.BoolPropertyFn
152149
ReplicatorProcessorFetchTasksBatchSize dynamicconfig.IntPropertyFn
153150
ReplicatorProcessorMaxSkipTaskCount dynamicconfig.IntPropertyFn
@@ -263,7 +260,6 @@ type Config struct {
263260
VisibilityProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
264261
VisibilityProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
265262
VisibilityProcessorCompleteTaskInterval dynamicconfig.DurationPropertyFn
266-
VisibilityProcessorMaxReschedulerSize dynamicconfig.IntPropertyFn
267263
VisibilityProcessorPollBackoffInterval dynamicconfig.DurationPropertyFn
268264
VisibilityProcessorVisibilityArchivalTimeLimit dynamicconfig.DurationPropertyFn
269265
VisibilityProcessorEnsureCloseBeforeDelete dynamicconfig.BoolPropertyFn
@@ -366,7 +362,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
366362
TimerProcessorMaxPollHostRPS: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxPollHostRPS, 0),
367363
TimerProcessorMaxPollInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxPollInterval, 5*time.Minute),
368364
TimerProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TimerProcessorMaxPollIntervalJitterCoefficient, 0.15),
369-
TimerProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TimerProcessorMaxReschedulerSize, 10000),
370365
TimerProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TimerProcessorPollBackoffInterval, 5*time.Second),
371366
TimerProcessorMaxTimeShift: dc.GetDurationProperty(dynamicconfig.TimerProcessorMaxTimeShift, 1*time.Second),
372367
TimerProcessorHistoryArchivalSizeLimit: dc.GetIntProperty(dynamicconfig.TimerProcessorHistoryArchivalSizeLimit, 500*1024),
@@ -386,7 +381,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
386381
TransferProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorUpdateAckInterval, 30*time.Second),
387382
TransferProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TransferProcessorUpdateAckIntervalJitterCoefficient, 0.15),
388383
TransferProcessorCompleteTransferInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorCompleteTransferInterval, 60*time.Second),
389-
TransferProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.TransferProcessorMaxReschedulerSize, 10000),
390384
TransferProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.TransferProcessorPollBackoffInterval, 5*time.Second),
391385
TransferProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.TransferProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
392386
TransferProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.TransferProcessorEnsureCloseBeforeDelete, true),
@@ -399,7 +393,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
399393
ReplicatorProcessorMaxPollIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorMaxPollIntervalJitterCoefficient, 0.15),
400394
ReplicatorProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.ReplicatorProcessorUpdateAckInterval, 5*time.Second),
401395
ReplicatorProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.ReplicatorProcessorUpdateAckIntervalJitterCoefficient, 0.15),
402-
ReplicatorProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.ReplicatorProcessorMaxReschedulerSize, 10000),
403396
ReplicatorProcessorEnablePriorityTaskProcessor: dc.GetBoolProperty(dynamicconfig.ReplicatorProcessorEnablePriorityTaskProcessor, false),
404397
ReplicatorProcessorFetchTasksBatchSize: dc.GetIntProperty(dynamicconfig.ReplicatorTaskBatchSize, 25),
405398
ReplicatorProcessorMaxSkipTaskCount: dc.GetIntProperty(dynamicconfig.ReplicatorMaxSkipTaskCount, 250),
@@ -485,7 +478,6 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
485478
VisibilityProcessorUpdateAckInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorUpdateAckInterval, 30*time.Second),
486479
VisibilityProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.VisibilityProcessorUpdateAckIntervalJitterCoefficient, 0.15),
487480
VisibilityProcessorCompleteTaskInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorCompleteTaskInterval, 60*time.Second),
488-
VisibilityProcessorMaxReschedulerSize: dc.GetIntProperty(dynamicconfig.VisibilityProcessorMaxReschedulerSize, 10000),
489481
VisibilityProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorPollBackoffInterval, 5*time.Second),
490482
VisibilityProcessorVisibilityArchivalTimeLimit: dc.GetDurationProperty(dynamicconfig.VisibilityProcessorVisibilityArchivalTimeLimit, 200*time.Millisecond),
491483
VisibilityProcessorEnsureCloseBeforeDelete: dc.GetBoolProperty(dynamicconfig.VisibilityProcessorEnsureCloseBeforeDelete, false),

service/history/historyEngine.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -353,18 +353,8 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {
353353
e.shard.GetNamespaceRegistry().RegisterNamespaceChangeCallback(
354354
e,
355355
0, /* always want callback so UpdateHandoverNamespaces() can be called after shard reload */
356-
func() {
357-
for _, queueProcessor := range e.queueProcessors {
358-
queueProcessor.LockTaskProcessing()
359-
}
360-
},
356+
func() {},
361357
func(prevNamespaces []*namespace.Namespace, nextNamespaces []*namespace.Namespace) {
362-
defer func() {
363-
for _, queueProcessor := range e.queueProcessors {
364-
queueProcessor.UnlockTaskProcessing()
365-
}
366-
}()
367-
368358
if len(nextNamespaces) == 0 {
369359
return
370360
}

service/history/queueFactoryBase.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,8 @@ type (
7878
}
7979

8080
QueueFactoryBase struct {
81-
HostScheduler queues.Scheduler
82-
HostPriorityAssigner queues.PriorityAssigner
83-
HostRateLimiter quotas.RateLimiter
84-
85-
// used by multi-cursor queue reader
81+
HostScheduler queues.Scheduler
82+
HostPriorityAssigner queues.PriorityAssigner
8683
HostReaderRateLimiter quotas.RequestRateLimiter
8784
}
8885

service/history/queues/queue.go

-2
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,5 @@ type (
3737
Category() tasks.Category
3838
NotifyNewTasks(tasks []tasks.Task)
3939
FailoverNamespace(namespaceIDs map[string]struct{})
40-
LockTaskProcessing()
41-
UnlockTaskProcessing()
4240
}
4341
)

service/history/queues/queue_base.go

+1-14
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ func newQueueBase(
125125
category tasks.Category,
126126
paginationFnProvider PaginationFnProvider,
127127
scheduler Scheduler,
128+
rescheduler Rescheduler,
128129
priorityAssigner PriorityAssigner,
129130
executor Executor,
130131
options *Options,
@@ -150,12 +151,6 @@ func newQueueBase(
150151
}
151152

152153
timeSource := shard.GetTimeSource()
153-
rescheduler := NewRescheduler(
154-
scheduler,
155-
timeSource,
156-
logger,
157-
metricsHandler,
158-
)
159154

160155
monitor := newMonitor(category.Type(), &options.MonitorOptions)
161156
mitigator := newMitigator(monitor, logger, metricsHandler, options.MaxReaderCount)
@@ -286,14 +281,6 @@ func (p *queueBase) FailoverNamespace(
286281
p.rescheduler.Reschedule(namespaceIDs)
287282
}
288283

289-
func (p *queueBase) LockTaskProcessing() {
290-
// no-op
291-
}
292-
293-
func (p *queueBase) UnlockTaskProcessing() {
294-
// no-op
295-
}
296-
297284
func (p *queueBase) processNewRange() error {
298285
var newMaxKey tasks.Key
299286
switch categoryType := p.category.Type(); categoryType {

service/history/queues/queue_base_test.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ func (s *queueBaseSuite) TestNewProcessBase_NoPreviousState() {
140140
tasks.CategoryTransfer,
141141
nil,
142142
s.mockScheduler,
143+
s.mockRescheduler,
143144
NewNoopPriorityAssigner(),
144145
nil,
145146
s.options,
@@ -224,6 +225,7 @@ func (s *queueBaseSuite) TestNewProcessBase_WithPreviousState() {
224225
tasks.CategoryTransfer,
225226
nil,
226227
s.mockScheduler,
228+
s.mockRescheduler,
227229
NewNoopPriorityAssigner(),
228230
nil,
229231
s.options,
@@ -284,14 +286,14 @@ func (s *queueBaseSuite) TestStartStop() {
284286
tasks.CategoryTransfer,
285287
paginationFnProvider,
286288
s.mockScheduler,
289+
s.mockRescheduler,
287290
NewNoopPriorityAssigner(),
288291
nil,
289292
s.options,
290293
s.rateLimiter,
291294
s.logger,
292295
s.metricsHandler,
293296
)
294-
base.rescheduler = s.mockRescheduler // replace with mock to verify Start/Stop
295297

296298
s.mockRescheduler.EXPECT().Start().Times(1)
297299
base.Start()
@@ -335,6 +337,7 @@ func (s *queueBaseSuite) TestProcessNewRange() {
335337
tasks.CategoryTimer,
336338
nil,
337339
s.mockScheduler,
340+
s.mockRescheduler,
338341
NewNoopPriorityAssigner(),
339342
nil,
340343
s.options,
@@ -392,6 +395,7 @@ func (s *queueBaseSuite) TestCheckPoint_WithPendingTasks() {
392395
tasks.CategoryTimer,
393396
nil,
394397
s.mockScheduler,
398+
s.mockRescheduler,
395399
NewNoopPriorityAssigner(),
396400
nil,
397401
s.options,
@@ -465,6 +469,7 @@ func (s *queueBaseSuite) TestCheckPoint_NoPendingTasks() {
465469
tasks.CategoryTimer,
466470
nil,
467471
s.mockScheduler,
472+
s.mockRescheduler,
468473
NewNoopPriorityAssigner(),
469474
nil,
470475
s.options,
@@ -553,6 +558,7 @@ func (s *queueBaseSuite) TestCheckPoint_MoveSlices() {
553558
tasks.CategoryTimer,
554559
nil,
555560
s.mockScheduler,
561+
s.mockRescheduler,
556562
NewNoopPriorityAssigner(),
557563
nil,
558564
s.options,

service/history/queues/queue_immediate.go

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func NewImmediateQueue(
5454
shard shard.Context,
5555
category tasks.Category,
5656
scheduler Scheduler,
57+
rescheduler Rescheduler,
5758
priorityAssigner PriorityAssigner,
5859
executor Executor,
5960
options *Options,
@@ -90,6 +91,7 @@ func NewImmediateQueue(
9091
category,
9192
paginationFnProvider,
9293
scheduler,
94+
rescheduler,
9395
priorityAssigner,
9496
executor,
9597
options,

service/history/queues/queue_mock.go

-24
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/history/queues/queue_scheduled.go

+2
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ func NewScheduledQueue(
6666
shard shard.Context,
6767
category tasks.Category,
6868
scheduler Scheduler,
69+
rescheduler Rescheduler,
6970
priorityAssigner PriorityAssigner,
7071
executor Executor,
7172
options *Options,
@@ -111,6 +112,7 @@ func NewScheduledQueue(
111112
category,
112113
paginationFnProvider,
113114
scheduler,
115+
rescheduler,
114116
priorityAssigner,
115117
executor,
116118
options,

service/history/queues/queue_scheduled_test.go

+23-14
Original file line numberDiff line numberDiff line change
@@ -84,23 +84,32 @@ func (s *scheduledQueueSuite) SetupTest() {
8484
s.mockExecutionManager = s.mockShard.Resource.ExecutionMgr
8585
s.mockShard.Resource.ClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
8686

87+
scheduler := NewPriorityScheduler(
88+
PrioritySchedulerOptions{
89+
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
90+
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
91+
},
92+
NewSchedulerRateLimiter(
93+
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
94+
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
95+
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
96+
s.mockShard.GetConfig().PersistenceMaxQPS,
97+
),
98+
s.mockShard.GetTimeSource(),
99+
log.NewTestLogger(),
100+
)
101+
rescheduler := NewRescheduler(
102+
scheduler,
103+
s.mockShard.GetTimeSource(),
104+
log.NewTestLogger(),
105+
metrics.NoopMetricsHandler,
106+
)
107+
87108
s.scheduledQueue = NewScheduledQueue(
88109
s.mockShard,
89110
tasks.CategoryTimer,
90-
NewPriorityScheduler(
91-
PrioritySchedulerOptions{
92-
WorkerCount: dynamicconfig.GetIntPropertyFn(10),
93-
EnableRateLimiter: dynamicconfig.GetBoolPropertyFn(true),
94-
},
95-
NewSchedulerRateLimiter(
96-
s.mockShard.GetConfig().TaskSchedulerNamespaceMaxQPS,
97-
s.mockShard.GetConfig().TaskSchedulerMaxQPS,
98-
s.mockShard.GetConfig().PersistenceNamespaceMaxQPS,
99-
s.mockShard.GetConfig().PersistenceMaxQPS,
100-
),
101-
s.mockShard.GetTimeSource(),
102-
log.NewTestLogger(),
103-
),
111+
scheduler,
112+
rescheduler,
104113
nil,
105114
nil,
106115
testQueueOptions,

0 commit comments

Comments
 (0)