Skip to content

Commit 98545f9

Browse files
authored
Remove unused parameters from task executable (#3747)
1 parent d970aa4 commit 98545f9

11 files changed

+40
-141
lines changed

service/history/archival_queue_task_executor_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
503503
executable := queues.NewExecutable(
504504
queues.DefaultReaderId,
505505
task,
506-
nil,
507506
executor,
508507
nil,
509508
nil,
@@ -513,7 +512,6 @@ func TestArchivalQueueTaskExecutor(t *testing.T) {
513512
nil,
514513
metrics.NoopMetricsHandler,
515514
nil,
516-
nil,
517515
)
518516
err := executable.Execute()
519517
if len(p.ExpectedErrorSubstrings) > 0 {

service/history/queues/executable.go

+24-57
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,6 @@ type (
6666
// active/standby queue processing logic
6767
Execute(context.Context, Executable) (tags []metrics.Tag, isActive bool, err error)
6868
}
69-
70-
// TaskFilter determines if the given task should be executed
71-
// TODO: remove after merging active/standby queue processor
72-
// task should always be executed as active or verified as standby
73-
TaskFilter func(task tasks.Task) bool
7469
)
7570

7671
var (
@@ -114,31 +109,25 @@ type (
114109
timeSource clock.TimeSource
115110
namespaceRegistry namespace.Registry
116111

117-
readerID int32
118-
loadTime time.Time
119-
scheduledTime time.Time
120-
userLatency time.Duration
121-
lastActiveness bool
122-
resourceExhaustedCount int
123-
logger log.Logger
124-
metricsHandler metrics.Handler
125-
taggedMetricsHandler metrics.Handler
126-
criticalRetryAttempt dynamicconfig.IntPropertyFn
127-
namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn
128-
filter TaskFilter
129-
shouldProcess bool
112+
readerID int32
113+
loadTime time.Time
114+
scheduledTime time.Time
115+
userLatency time.Duration
116+
lastActiveness bool
117+
resourceExhaustedCount int
118+
logger log.Logger
119+
metricsHandler metrics.Handler
120+
taggedMetricsHandler metrics.Handler
121+
criticalRetryAttempt dynamicconfig.IntPropertyFn
130122
}
131123
)
132124

133-
// TODO: Remove filter, queueType, and namespaceCacheRefreshInterval
134-
// parameters after deprecating old queue processing logic.
135-
// CriticalRetryAttempt probably should also be removed as it's only
125+
// TODO: CriticalRetryAttempt probably should be removed as it's only
136126
// used for emiting logs and metrics when # of attempts is high, and
137127
// doesn't have to be a dynamic config.
138128
func NewExecutable(
139129
readerID int32,
140130
task tasks.Task,
141-
filter TaskFilter,
142131
executor Executor,
143132
scheduler Scheduler,
144133
rescheduler Rescheduler,
@@ -148,7 +137,6 @@ func NewExecutable(
148137
logger log.Logger,
149138
metricsHandler metrics.Handler,
150139
criticalRetryAttempt dynamicconfig.IntPropertyFn,
151-
namespaceCacheRefreshInterval dynamicconfig.DurationPropertyFn,
152140
) Executable {
153141
executable := &executableImpl{
154142
Task: task,
@@ -168,11 +156,9 @@ func NewExecutable(
168156
return tasks.Tags(task)
169157
},
170158
),
171-
metricsHandler: metricsHandler,
172-
taggedMetricsHandler: metricsHandler,
173-
criticalRetryAttempt: criticalRetryAttempt,
174-
filter: filter,
175-
namespaceCacheRefreshInterval: namespaceCacheRefreshInterval,
159+
metricsHandler: metricsHandler,
160+
taggedMetricsHandler: metricsHandler,
161+
criticalRetryAttempt: criticalRetryAttempt,
176162
}
177163
executable.updatePriority()
178164
return executable
@@ -183,15 +169,6 @@ func (e *executableImpl) Execute() error {
183169
return nil
184170
}
185171

186-
// this filter should also contain the logic for overriding
187-
// results from task allocator (force executing some standby task types)
188-
e.shouldProcess = true
189-
if e.filter != nil {
190-
if e.shouldProcess = e.filter(e.Task); !e.shouldProcess {
191-
return nil
192-
}
193-
}
194-
195172
ctx := metrics.AddMetricsContext(context.Background())
196173
namespace, _ := e.namespaceRegistry.GetNamespaceName(namespace.ID(e.GetNamespaceID()))
197174

@@ -291,17 +268,9 @@ func (e *executableImpl) HandleErr(err error) (retErr error) {
291268
}
292269

293270
if _, ok := err.(*serviceerror.NamespaceNotActive); ok {
294-
// TODO remove this error check special case after multi-cursor is enabled by default,
295-
// since the new task life cycle will not give up until task processed / verified
296-
// Currently, only run this check if filter is not nil which means we are running the old
297-
// active/passive queue logic.
298-
if e.filter != nil && e.timeSource.Now().Sub(e.loadTime) > 2*e.namespaceCacheRefreshInterval() {
299-
e.taggedMetricsHandler.Counter(metrics.TaskNotActiveCounter.GetMetricName()).Record(1)
300-
return nil
301-
}
302-
303271
// error is expected when there's namespace failover,
304272
// so don't count it into task failures.
273+
e.taggedMetricsHandler.Counter(metrics.TaskNotActiveCounter.GetMetricName()).Record(1)
305274
return err
306275
}
307276

@@ -362,19 +331,17 @@ func (e *executableImpl) Ack() {
362331

363332
e.state = ctasks.TaskStateAcked
364333

365-
if e.shouldProcess {
366-
e.taggedMetricsHandler.Timer(metrics.TaskLoadLatency.GetMetricName()).Record(
367-
e.loadTime.Sub(e.GetVisibilityTime()),
368-
metrics.QueueReaderIDTag(e.readerID),
369-
)
370-
e.taggedMetricsHandler.Histogram(metrics.TaskAttempt.GetMetricName(), metrics.TaskAttempt.GetMetricUnit()).Record(int64(e.attempt))
334+
e.taggedMetricsHandler.Timer(metrics.TaskLoadLatency.GetMetricName()).Record(
335+
e.loadTime.Sub(e.GetVisibilityTime()),
336+
metrics.QueueReaderIDTag(e.readerID),
337+
)
338+
e.taggedMetricsHandler.Histogram(metrics.TaskAttempt.GetMetricName(), metrics.TaskAttempt.GetMetricUnit()).Record(int64(e.attempt))
371339

372-
priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.lowestPriority.String()))
373-
priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(time.Since(e.loadTime))
340+
priorityTaggedProvider := e.taggedMetricsHandler.WithTags(metrics.TaskPriorityTag(e.lowestPriority.String()))
341+
priorityTaggedProvider.Timer(metrics.TaskLatency.GetMetricName()).Record(time.Since(e.loadTime))
374342

375-
readerIDTaggedProvider := priorityTaggedProvider.WithTags(metrics.QueueReaderIDTag(e.readerID))
376-
readerIDTaggedProvider.Timer(metrics.TaskQueueLatency.GetMetricName()).Record(time.Since(e.GetVisibilityTime()))
377-
}
343+
readerIDTaggedProvider := priorityTaggedProvider.WithTags(metrics.QueueReaderIDTag(e.readerID))
344+
readerIDTaggedProvider.Timer(metrics.TaskQueueLatency.GetMetricName()).Record(time.Since(e.GetVisibilityTime()))
378345
}
379346

380347
func (e *executableImpl) Nack(err error) {

service/history/queues/executable_test.go

+14-68
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,6 @@ import (
4848
"go.temporal.io/server/service/history/tests"
4949
)
5050

51-
const namespaceCacheRefreshInterval = 10 * time.Second
52-
5351
type (
5452
executableSuite struct {
5553
suite.Suite
@@ -88,18 +86,8 @@ func (s *executableSuite) TearDownSuite() {
8886
s.controller.Finish()
8987
}
9088

91-
func (s *executableSuite) TestExecute_TaskFiltered() {
92-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
93-
return false
94-
})
95-
96-
s.NoError(executable.Execute())
97-
}
98-
9989
func (s *executableSuite) TestExecute_TaskExecuted() {
100-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
101-
return true
102-
})
90+
executable := s.newTestExecutable()
10391

10492
s.mockExecutor.EXPECT().Execute(gomock.Any(), executable).Return(nil, true, errors.New("some random error"))
10593
s.Error(executable.Execute())
@@ -109,9 +97,7 @@ func (s *executableSuite) TestExecute_TaskExecuted() {
10997
}
11098

11199
func (s *executableSuite) TestExecute_UserLatency() {
112-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
113-
return true
114-
})
100+
executable := s.newTestExecutable()
115101

116102
expectedUserLatency := int64(133)
117103
updateContext := func(ctx context.Context, taskInfo interface{}) {
@@ -124,79 +110,49 @@ func (s *executableSuite) TestExecute_UserLatency() {
124110
}
125111

126112
func (s *executableSuite) TestHandleErr_EntityNotExists() {
127-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
128-
return true
129-
})
113+
executable := s.newTestExecutable()
130114

131115
s.NoError(executable.HandleErr(serviceerror.NewNotFound("")))
132116
}
133117

134118
func (s *executableSuite) TestHandleErr_ErrTaskRetry() {
135-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
136-
return true
137-
})
119+
executable := s.newTestExecutable()
138120

139121
s.Equal(consts.ErrTaskRetry, executable.HandleErr(consts.ErrTaskRetry))
140122
}
141123

142124
func (s *executableSuite) TestHandleErr_ErrDeleteOpenExecution() {
143-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
144-
return true
145-
})
125+
executable := s.newTestExecutable()
146126

147127
s.Equal(consts.ErrDependencyTaskNotCompleted, executable.HandleErr(consts.ErrDependencyTaskNotCompleted))
148128
}
149129

150130
func (s *executableSuite) TestHandleErr_ErrTaskDiscarded() {
151-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
152-
return true
153-
})
131+
executable := s.newTestExecutable()
154132

155133
s.NoError(executable.HandleErr(consts.ErrTaskDiscarded))
156134
}
157135

158136
func (s *executableSuite) TestHandleErr_ErrTaskVersionMismatch() {
159-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
160-
return true
161-
})
137+
executable := s.newTestExecutable()
162138

163139
s.NoError(executable.HandleErr(consts.ErrTaskVersionMismatch))
164140
}
165141

166142
func (s *executableSuite) TestHandleErr_NamespaceNotActiveError() {
167-
now := time.Now().UTC()
168143
err := serviceerror.NewNamespaceNotActive("", "", "")
169144

170-
s.timeSource.Update(now.Add(-namespaceCacheRefreshInterval * time.Duration(3)))
171-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
172-
return true
173-
})
174-
s.timeSource.Update(now)
175-
s.NoError(executable.HandleErr(err))
176-
177-
s.timeSource.Update(now.Add(-namespaceCacheRefreshInterval * time.Duration(3)))
178-
executable = s.newTestExecutable(nil)
179-
s.timeSource.Update(now)
180-
s.Equal(err, executable.HandleErr(err))
181-
182-
executable = s.newTestExecutable(func(_ tasks.Task) bool {
183-
return true
184-
})
185-
s.Equal(err, executable.HandleErr(err))
145+
s.Equal(err, s.newTestExecutable().HandleErr(err))
186146
}
187147

188148
func (s *executableSuite) TestHandleErr_RandomErr() {
189-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
190-
return true
191-
})
149+
executable := s.newTestExecutable()
192150

193151
s.Error(executable.HandleErr(errors.New("random error")))
194152
}
195153

196154
func (s *executableSuite) TestTaskAck() {
197-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
198-
return true
199-
})
155+
executable := s.newTestExecutable()
200156

201157
s.Equal(ctasks.TaskStatePending, executable.State())
202158

@@ -205,9 +161,7 @@ func (s *executableSuite) TestTaskAck() {
205161
}
206162

207163
func (s *executableSuite) TestTaskNack_Resubmit() {
208-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
209-
return true
210-
})
164+
executable := s.newTestExecutable()
211165

212166
s.mockScheduler.EXPECT().TrySubmit(executable).Return(true)
213167

@@ -216,9 +170,7 @@ func (s *executableSuite) TestTaskNack_Resubmit() {
216170
}
217171

218172
func (s *executableSuite) TestTaskNack_Reschedule() {
219-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
220-
return true
221-
})
173+
executable := s.newTestExecutable()
222174

223175
s.mockRescheduler.EXPECT().Add(executable, gomock.AssignableToTypeOf(time.Now())).MinTimes(1)
224176

@@ -233,9 +185,7 @@ func (s *executableSuite) TestTaskNack_Reschedule() {
233185
}
234186

235187
func (s *executableSuite) TestTaskCancellation() {
236-
executable := s.newTestExecutable(func(_ tasks.Task) bool {
237-
return true
238-
})
188+
executable := s.newTestExecutable()
239189

240190
executable.Cancel()
241191

@@ -252,9 +202,7 @@ func (s *executableSuite) TestTaskCancellation() {
252202
s.False(executable.IsRetryableError(errors.New("some random error")))
253203
}
254204

255-
func (s *executableSuite) newTestExecutable(
256-
filter TaskFilter,
257-
) Executable {
205+
func (s *executableSuite) newTestExecutable() Executable {
258206
return NewExecutable(
259207
DefaultReaderId,
260208
tasks.NewFakeTask(
@@ -266,7 +214,6 @@ func (s *executableSuite) newTestExecutable(
266214
tasks.CategoryTransfer,
267215
s.timeSource.Now(),
268216
),
269-
filter,
270217
s.mockExecutor,
271218
s.mockScheduler,
272219
s.mockRescheduler,
@@ -276,6 +223,5 @@ func (s *executableSuite) newTestExecutable(
276223
log.NewTestLogger(),
277224
metrics.NoopMetricsHandler,
278225
dynamicconfig.GetIntPropertyFn(100),
279-
dynamicconfig.GetDurationPropertyFn(namespaceCacheRefreshInterval),
280226
)
281227
}

service/history/queues/queue_base.go

-2
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ func newQueueBase(
164164
return NewExecutable(
165165
readerID,
166166
t,
167-
nil,
168167
executor,
169168
scheduler,
170169
rescheduler,
@@ -174,7 +173,6 @@ func newQueueBase(
174173
logger,
175174
metricsHandler,
176175
options.TaskMaxRetryCount,
177-
shard.GetConfig().NamespaceCacheRefreshInterval,
178176
)
179177
}
180178

service/history/queues/reader_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (s *readerSuite) SetupTest() {
7777
s.metricsHandler = metrics.NoopMetricsHandler
7878

7979
s.executableInitializer = func(readerID int32, t tasks.Task) Executable {
80-
return NewExecutable(readerID, t, nil, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler, nil, nil)
80+
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler, nil)
8181
}
8282
s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{
8383
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),

service/history/queues/slice_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (s *sliceSuite) SetupTest() {
6969
s.controller = gomock.NewController(s.T())
7070

7171
s.executableInitializer = func(readerID int32, t tasks.Task) Executable {
72-
return NewExecutable(readerID, t, nil, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler, nil, nil)
72+
return NewExecutable(readerID, t, nil, nil, nil, NewNoopPriorityAssigner(), clock.NewRealTimeSource(), nil, nil, metrics.NoopMetricsHandler, nil)
7373
}
7474
s.monitor = newMonitor(tasks.CategoryTypeScheduled, &MonitorOptions{
7575
PendingTasksCriticalCount: dynamicconfig.GetIntPropertyFn(1000),

service/history/timerQueueActiveTaskExecutor_test.go

-2
Original file line numberDiff line numberDiff line change
@@ -1474,7 +1474,6 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable(
14741474
return queues.NewExecutable(
14751475
queues.DefaultReaderId,
14761476
task,
1477-
nil,
14781477
s.timerQueueActiveTaskExecutor,
14791478
nil,
14801479
nil,
@@ -1484,6 +1483,5 @@ func (s *timerQueueActiveTaskExecutorSuite) newTaskExecutable(
14841483
nil,
14851484
metrics.NoopMetricsHandler,
14861485
nil,
1487-
nil,
14881486
)
14891487
}

0 commit comments

Comments
 (0)