Skip to content

Commit eb88af4

Browse files
Add random delay to ArchiveExecutionTask (#3565)
1 parent 72aa9bd commit eb88af4

File tree

6 files changed

+183
-91
lines changed

6 files changed

+183
-91
lines changed

common/backoff/jitter.go

+3
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,9 @@ func JitInt64(input int64, coefficient float64) int64 {
4343
if input == 0 {
4444
return 0
4545
}
46+
if coefficient == 0 {
47+
return input
48+
}
4649

4750
base := int64(float64(input) * (1 - coefficient))
4851
addon := rand.Int63n(2 * (input - base))

common/dynamicconfig/constants.go

+2
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,8 @@ const (
550550
// ArchivalProcessorPollBackoffInterval is the poll backoff interval if task redispatcher's size exceeds limit for
551551
// archivalQueueProcessor
552552
ArchivalProcessorPollBackoffInterval = "history.archivalProcessorPollBackoffInterval"
553+
// ArchivalProcessorArchiveDelay is the delay before archivalQueueProcessor starts to process archival tasks
554+
ArchivalProcessorArchiveDelay = "history.archivalProcessorArchiveDelay"
553555

554556
// ReplicatorTaskBatchSize is batch size for ReplicatorProcessor
555557
ReplicatorTaskBatchSize = "history.replicatorTaskBatchSize"

service/history/configs/config.go

+2
Original file line number | Diff line number | Diff line change
@@ -303,6 +303,7 @@ type Config struct {
303303
ArchivalProcessorMaxPollIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
304304
ArchivalProcessorUpdateAckInterval dynamicconfig.DurationPropertyFn
305305
ArchivalProcessorUpdateAckIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
306+
ArchivalProcessorArchiveDelay dynamicconfig.DurationPropertyFn
306307
}
307308

308309
const (
@@ -541,6 +542,7 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int32, isAdvancedVis
541542
ArchivalProcessorUpdateAckIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.
542543
ArchivalProcessorUpdateAckIntervalJitterCoefficient, 0.15),
543544
ArchivalProcessorPollBackoffInterval: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorPollBackoffInterval, 5*time.Second),
545+
ArchivalProcessorArchiveDelay: dc.GetDurationProperty(dynamicconfig.ArchivalProcessorArchiveDelay, 5*time.Minute),
544546
}
545547

546548
return cfg

service/history/tasks/category.go

+1 -1
Original file line number | Diff line number | Diff line change
@@ -94,7 +94,7 @@ var (
9494

9595
CategoryArchival = Category{
9696
id: CategoryIDArchival,
97-
cType: CategoryTypeImmediate,
97+
cType: CategoryTypeScheduled,
9898
name: CategoryNameArchival,
9999
}
100100
)

service/history/workflow/task_generator.go

+12 -3
Original file line number | Diff line number | Diff line change
@@ -142,6 +142,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
142142
return nil
143143
}
144144

145+
var archivalDelayJitterCoefficient = 1.0
146+
145147
func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
146148
closeEvent *historypb.HistoryEvent,
147149
deleteAfterClose bool,
@@ -198,10 +200,17 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
198200
},
199201
)
200202
if r.config.DurableArchivalEnabled() {
203+
delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2
204+
if delay > retention {
205+
delay = retention
206+
}
207+
208+
archiveTime := closeEvent.GetEventTime().Add(delay)
201209
closeTasks = append(closeTasks, &tasks.ArchiveExecutionTask{
202-
// TaskID and VisibilityTimestamp are set by the shard
203-
WorkflowKey: r.mutableState.GetWorkflowKey(),
204-
Version: currentVersion,
210+
// TaskID is set by the shard
211+
WorkflowKey: r.mutableState.GetWorkflowKey(),
212+
VisibilityTimestamp: archiveTime,
213+
Version: currentVersion,
205214
})
206215
} else {
207216
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())

service/history/workflow/task_generator_test.go

+163 -87
Original file line number | Diff line number | Diff line change
@@ -59,66 +59,130 @@ import (
5959

6060
"go.temporal.io/server/api/persistence/v1"
6161
"go.temporal.io/server/common/definition"
62+
"go.temporal.io/server/common/log"
6263
"go.temporal.io/server/common/namespace"
6364
"go.temporal.io/server/common/primitives/timestamp"
6465
"go.temporal.io/server/service/history/configs"
6566
"go.temporal.io/server/service/history/tasks"
6667
"go.temporal.io/server/service/history/tests"
6768
)
6869

70+
type testConfig struct {
71+
Name string
72+
ConfigFn func(config *testParams)
73+
}
74+
75+
type testParams struct {
76+
DurableArchivalEnabled bool
77+
DeleteAfterClose bool
78+
CloseEventTime time.Time
79+
Retention time.Duration
80+
Logger *log.MockLogger
81+
ArchivalProcessorArchiveDelay time.Duration
82+
83+
ExpectCloseExecutionVisibilityTask bool
84+
ExpectArchiveExecutionTask bool
85+
ExpectDeleteHistoryEventTask bool
86+
ExpectedArchiveExecutionTaskVisibilityTimestamp time.Time
87+
}
88+
6989
func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
70-
for _, c := range []struct {
71-
Name string
72-
DurableArchivalEnabled bool
73-
DeleteAfterClose bool
74-
ExpectCloseExecutionVisibilityTask bool
75-
ExpectArchiveExecutionTask bool
76-
ExpectDeleteHistoryEventTask bool
77-
}{
90+
// we need to set the jitter coefficient to 0 to remove the randomness in the test
91+
archivalDelayJitterCoefficient = 0.0
92+
for _, c := range []testConfig{
93+
{
94+
Name: "delete after retention",
95+
ConfigFn: func(p *testParams) {
96+
p.ExpectCloseExecutionVisibilityTask = true
97+
p.ExpectDeleteHistoryEventTask = true
98+
},
99+
},
78100
{
79-
Name: "Delete after retention",
80-
DurableArchivalEnabled: false,
81-
DeleteAfterClose: false,
101+
Name: "use archival queue",
102+
ConfigFn: func(p *testParams) {
103+
p.DurableArchivalEnabled = true
82104

83-
ExpectCloseExecutionVisibilityTask: true,
84-
ExpectDeleteHistoryEventTask: true,
85-
ExpectArchiveExecutionTask: false,
105+
p.ExpectCloseExecutionVisibilityTask = true
106+
p.ExpectArchiveExecutionTask = true
107+
},
86108
},
87109
{
88-
Name: "Use archival queue",
89-
DurableArchivalEnabled: true,
90-
DeleteAfterClose: false,
110+
Name: "delete after close",
111+
ConfigFn: func(p *testParams) {
112+
p.DurableArchivalEnabled = true
91113

92-
ExpectCloseExecutionVisibilityTask: true,
93-
ExpectDeleteHistoryEventTask: false,
94-
ExpectArchiveExecutionTask: true,
114+
p.ExpectCloseExecutionVisibilityTask = true
115+
p.ExpectArchiveExecutionTask = true
116+
},
95117
},
96118
{
97-
Name: "DeleteAfterClose",
98-
DurableArchivalEnabled: false,
99-
DeleteAfterClose: true,
119+
Name: "delete after close ignores durable execution flag",
120+
ConfigFn: func(p *testParams) {
121+
p.DurableArchivalEnabled = true
122+
p.DeleteAfterClose = true
123+
},
124+
},
125+
{
126+
Name: "delay is zero",
127+
ConfigFn: func(p *testParams) {
128+
p.DurableArchivalEnabled = true
129+
p.CloseEventTime = time.Unix(0, 0)
130+
p.Retention = 24 * time.Hour
131+
p.ArchivalProcessorArchiveDelay = 0
100132

101-
ExpectCloseExecutionVisibilityTask: false,
102-
ExpectDeleteHistoryEventTask: false,
103-
ExpectArchiveExecutionTask: false,
133+
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0)
134+
p.ExpectCloseExecutionVisibilityTask = true
135+
p.ExpectArchiveExecutionTask = true
136+
},
104137
},
105138
{
106-
Name: "DeleteAfterClose ignores durable execution flag",
107-
DurableArchivalEnabled: true,
108-
DeleteAfterClose: true,
139+
Name: "delay exceeds retention",
140+
ConfigFn: func(p *testParams) {
141+
p.DurableArchivalEnabled = true
142+
p.CloseEventTime = time.Unix(0, 0)
143+
p.Retention = 24 * time.Hour
144+
p.ArchivalProcessorArchiveDelay = 48*time.Hour + time.Second
109145

110-
ExpectCloseExecutionVisibilityTask: false,
111-
ExpectDeleteHistoryEventTask: false,
112-
ExpectArchiveExecutionTask: false,
146+
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(24 * time.Hour)
147+
p.ExpectCloseExecutionVisibilityTask = true
148+
p.ExpectArchiveExecutionTask = true
149+
},
150+
},
151+
{
152+
Name: "delay is less than retention",
153+
ConfigFn: func(p *testParams) {
154+
p.DurableArchivalEnabled = true
155+
p.CloseEventTime = time.Unix(0, 0)
156+
p.Retention = 24 * time.Hour
157+
p.ArchivalProcessorArchiveDelay = 12 * time.Hour
158+
159+
p.ExpectedArchiveExecutionTaskVisibilityTimestamp = time.Unix(0, 0).Add(6 * time.Hour)
160+
p.ExpectCloseExecutionVisibilityTask = true
161+
p.ExpectArchiveExecutionTask = true
162+
},
113163
},
114164
} {
115165
c := c
116166
t.Run(c.Name, func(t *testing.T) {
117-
t.Parallel()
167+
// t.Parallel()
168+
now := time.Unix(0, 0).UTC()
118169
ctrl := gomock.NewController(t)
170+
mockLogger := log.NewMockLogger(ctrl)
171+
p := testParams{
172+
DurableArchivalEnabled: false,
173+
DeleteAfterClose: false,
174+
CloseEventTime: now,
175+
Retention: time.Hour * 24 * 7,
176+
Logger: mockLogger,
177+
178+
ExpectCloseExecutionVisibilityTask: false,
179+
ExpectArchiveExecutionTask: false,
180+
ExpectDeleteHistoryEventTask: false,
181+
ExpectedArchiveExecutionTaskVisibilityTimestamp: now,
182+
}
183+
c.ConfigFn(&p)
119184
namespaceRegistry := namespace.NewMockRegistry(ctrl)
120-
retention := 24 * time.Hour
121-
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&retention))
185+
namespaceEntry := tests.GlobalNamespaceEntry.Clone(namespace.WithRetention(&p.Retention))
122186
namespaceRegistry.EXPECT().GetNamespaceID(gomock.Any()).Return(namespaceEntry.ID(), nil).AnyTimes()
123187
namespaceRegistry.EXPECT().GetNamespaceByID(namespaceEntry.ID()).Return(namespaceEntry, nil).AnyTimes()
124188

@@ -131,71 +195,83 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
131195
namespaceEntry.ID().String(), tests.WorkflowID, tests.RunID,
132196
)).AnyTimes()
133197
mutableState.EXPECT().GetCurrentBranchToken().Return(nil, nil)
134-
taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, &configs.Config{
198+
retentionTimerDelay := time.Second
199+
cfg := &configs.Config{
135200
DurableArchivalEnabled: func() bool {
136-
return c.DurableArchivalEnabled
201+
return p.DurableArchivalEnabled
137202
},
138203
RetentionTimerJitterDuration: func() time.Duration {
139-
return time.Second
204+
return retentionTimerDelay
140205
},
141-
})
142-
206+
ArchivalProcessorArchiveDelay: func() time.Duration {
207+
return p.ArchivalProcessorArchiveDelay
208+
},
209+
}
143210
closeTime := time.Unix(0, 0)
144-
211+
var allTasks []tasks.Task
145212
mutableState.EXPECT().AddTasks(gomock.Any()).Do(func(ts ...tasks.Task) {
146-
var (
147-
closeExecutionTask *tasks.CloseExecutionTask
148-
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
149-
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
150-
archiveExecutionTask *tasks.ArchiveExecutionTask
151-
)
152-
for _, task := range ts {
153-
switch t := task.(type) {
154-
case *tasks.CloseExecutionTask:
155-
closeExecutionTask = t
156-
case *tasks.DeleteHistoryEventTask:
157-
deleteHistoryEventTask = t
158-
case *tasks.CloseExecutionVisibilityTask:
159-
closeExecutionVisibilityTask = t
160-
case *tasks.ArchiveExecutionTask:
161-
archiveExecutionTask = t
162-
}
163-
}
164-
require.NotNil(t, closeExecutionTask)
165-
assert.Equal(t, c.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)
166-
167-
if c.ExpectCloseExecutionVisibilityTask {
168-
assert.NotNil(t, closeExecutionVisibilityTask)
169-
} else {
170-
assert.Nil(t, closeExecutionVisibilityTask)
171-
}
172-
if c.ExpectArchiveExecutionTask {
173-
require.NotNil(t, archiveExecutionTask)
174-
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
175-
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
176-
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
177-
} else {
178-
assert.Nil(t, archiveExecutionTask)
179-
}
180-
if c.ExpectDeleteHistoryEventTask {
181-
require.NotNil(t, deleteHistoryEventTask)
182-
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
183-
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
184-
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
185-
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.After(closeTime.Add(retention)))
186-
assert.True(t, deleteHistoryEventTask.VisibilityTimestamp.Before(closeTime.Add(retention).Add(time.Second*2)))
187-
} else {
188-
assert.Nil(t, deleteHistoryEventTask)
189-
}
213+
allTasks = append(allTasks, ts...)
190214
})
191215

216+
taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg)
192217
err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{
193218
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
194219
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
195220
},
196-
EventTime: timestamp.TimePtr(closeTime),
197-
}, c.DeleteAfterClose)
221+
EventTime: timestamp.TimePtr(p.CloseEventTime),
222+
}, p.DeleteAfterClose)
198223
require.NoError(t, err)
224+
225+
var (
226+
closeExecutionTask *tasks.CloseExecutionTask
227+
deleteHistoryEventTask *tasks.DeleteHistoryEventTask
228+
closeExecutionVisibilityTask *tasks.CloseExecutionVisibilityTask
229+
archiveExecutionTask *tasks.ArchiveExecutionTask
230+
)
231+
for _, task := range allTasks {
232+
switch t := task.(type) {
233+
case *tasks.CloseExecutionTask:
234+
closeExecutionTask = t
235+
case *tasks.DeleteHistoryEventTask:
236+
deleteHistoryEventTask = t
237+
case *tasks.CloseExecutionVisibilityTask:
238+
closeExecutionVisibilityTask = t
239+
case *tasks.ArchiveExecutionTask:
240+
archiveExecutionTask = t
241+
}
242+
}
243+
require.NotNil(t, closeExecutionTask)
244+
assert.Equal(t, p.DeleteAfterClose, closeExecutionTask.DeleteAfterClose)
245+
246+
if p.ExpectCloseExecutionVisibilityTask {
247+
assert.NotNil(t, closeExecutionVisibilityTask)
248+
} else {
249+
assert.Nil(t, closeExecutionVisibilityTask)
250+
}
251+
if p.ExpectArchiveExecutionTask {
252+
require.NotNil(t, archiveExecutionTask)
253+
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
254+
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
255+
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
256+
assert.Equal(
257+
t,
258+
p.ExpectedArchiveExecutionTaskVisibilityTimestamp,
259+
archiveExecutionTask.VisibilityTimestamp,
260+
)
261+
} else {
262+
assert.Nil(t, archiveExecutionTask)
263+
}
264+
if p.ExpectDeleteHistoryEventTask {
265+
require.NotNil(t, deleteHistoryEventTask)
266+
assert.Equal(t, deleteHistoryEventTask.NamespaceID, namespaceEntry.ID().String())
267+
assert.Equal(t, deleteHistoryEventTask.WorkflowID, tests.WorkflowID)
268+
assert.Equal(t, deleteHistoryEventTask.RunID, tests.RunID)
269+
assert.GreaterOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp, closeTime.Add(p.Retention))
270+
assert.LessOrEqual(t, deleteHistoryEventTask.VisibilityTimestamp,
271+
closeTime.Add(p.Retention).Add(retentionTimerDelay*2))
272+
} else {
273+
assert.Nil(t, deleteHistoryEventTask)
274+
}
199275
})
200276
}
201277
}

0 commit comments

Comments
 (0)