Skip to content

Commit c24c83f

Browse files
yux0dnr
and authored
Use generics in jitter (#3717)
* Use generics in jitter Co-authored-by: David Reiss <dnr@dnr.im>
1 parent efb3bc1 commit c24c83f

16 files changed

+68
-67
lines changed

common/backoff/jitter.go

+9-22
Original file line numberDiff line numberDiff line change
@@ -26,39 +26,26 @@ package backoff
2626

2727
import (
2828
"math/rand"
29-
"time"
3029
)
3130

32-
// JitDuration return random duration from (1-coefficient)*duration to (1+coefficient)*duration, inclusive, exclusive
33-
func JitDuration(duration time.Duration, coefficient float64) time.Duration {
34-
validateCoefficient(coefficient)
31+
const fullCoefficient float64 = 1
3532

36-
return time.Duration(JitInt64(duration.Nanoseconds(), coefficient))
33+
// FullJitter return random number from 0 to input, inclusive, exclusive
34+
func FullJitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T) T {
35+
return Jitter(input, fullCoefficient) / 2
3736
}
3837

39-
// JitInt64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
40-
func JitInt64(input int64, coefficient float64) int64 {
38+
// Jitter return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
39+
func Jitter[T ~int64 | ~int | ~int32 | ~float64 | ~float32](input T, coefficient float64) T {
4140
validateCoefficient(coefficient)
4241

43-
if input == 0 {
44-
return 0
45-
}
4642
if coefficient == 0 {
4743
return input
4844
}
4945

50-
base := int64(float64(input) * (1 - coefficient))
51-
addon := rand.Int63n(2 * (input - base))
52-
return base + addon
53-
}
54-
55-
// JitFloat64 return random number from (1-coefficient)*input to (1+coefficient)*input, inclusive, exclusive
56-
func JitFloat64(input float64, coefficient float64) float64 {
57-
validateCoefficient(coefficient)
58-
59-
base := input * (1 - coefficient)
60-
addon := rand.Float64() * 2 * (input - base)
61-
return base + addon
46+
base := float64(input) * (1 - coefficient)
47+
addon := rand.Float64() * 2 * (float64(input) - base)
48+
return T(base + addon)
6249
}
6350

6451
func validateCoefficient(coefficient float64) {

common/backoff/jitter_test.go

+31-10
Original file line numberDiff line numberDiff line change
@@ -46,47 +46,68 @@ func TestJitterSuite(t *testing.T) {
4646
func (s *jitterSuite) SetupSuite() {
4747
}
4848

49-
func (s *jitterSuite) TestJitInt64() {
49+
func (s *jitterSuite) TestJitter_Int64() {
5050
input := int64(1048576)
5151
coefficient := float64(0.25)
5252
lowerBound := int64(float64(input) * (1 - coefficient))
5353
upperBound := int64(float64(input) * (1 + coefficient))
54+
fullJitterUpperBound := int64(float64(input) * 2)
5455

5556
for i := 0; i < 1048576; i++ {
56-
result := JitInt64(input, coefficient)
57+
result := Jitter(input, coefficient)
5758
s.True(result >= lowerBound)
5859
s.True(result < upperBound)
60+
61+
result = FullJitter(input)
62+
s.True(result >= 0)
63+
s.True(result < fullJitterUpperBound)
5964
}
6065
}
6166

62-
func (s *jitterSuite) TestJitFloat64() {
67+
func (s *jitterSuite) TestJitter_Float64() {
6368
input := float64(1048576.1048576)
6469
coefficient := float64(0.16)
6570
lowerBound := float64(input) * (1 - coefficient)
6671
upperBound := float64(input) * (1 + coefficient)
72+
fullJitterUpperBound := float64(input) * 2
6773

6874
for i := 0; i < 1048576; i++ {
69-
result := JitFloat64(input, coefficient)
75+
result := Jitter(input, coefficient)
7076
s.True(result >= lowerBound)
7177
s.True(result < upperBound)
78+
79+
result = FullJitter(input)
80+
s.True(result >= 0)
81+
s.True(result < fullJitterUpperBound)
7282
}
7383
}
7484

75-
func (s *jitterSuite) TestJitDuration() {
85+
func (s *jitterSuite) TestJitter_Duration() {
7686
input := time.Duration(1099511627776)
7787
coefficient := float64(0.1)
7888
lowerBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 - coefficient)))
7989
upperBound := time.Duration(int64(float64(input.Nanoseconds()) * (1 + coefficient)))
90+
fullJitterUpperBound := time.Duration(int64(float64(input.Nanoseconds()) * 2))
8091

8192
for i := 0; i < 1048576; i++ {
82-
result := JitDuration(input, coefficient)
93+
result := Jitter(input, coefficient)
8394
s.True(result >= lowerBound)
8495
s.True(result < upperBound)
96+
97+
result = FullJitter(input)
98+
s.True(result >= 0)
99+
s.True(result < fullJitterUpperBound)
85100
}
86101
}
87102

88-
func (s *jitterSuite) TestJit_InputZeroValue() {
89-
s.Zero(JitDuration(0, rand.Float64()))
90-
s.Zero(JitInt64(0, rand.Float64()))
91-
s.Zero(JitFloat64(0, rand.Float64()))
103+
func (s *jitterSuite) TestJitter_InputZeroValue() {
104+
s.Zero(Jitter(time.Duration(0), rand.Float64()))
105+
s.Zero(Jitter(int64(0), rand.Float64()))
106+
s.Zero(Jitter(float64(0), rand.Float64()))
107+
}
108+
109+
func (s *jitterSuite) TestJitter_CoeffientZeroValue() {
110+
s.Equal(time.Duration(1), Jitter(time.Duration(1), 0))
111+
s.Equal(int64(1), Jitter(int64(1), 0))
112+
s.Equal(float64(1), Jitter(float64(1), 0))
92113
}

common/tasks/fifo_scheduler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (f *FIFOScheduler[T]) TrySubmit(task T) bool {
145145
func (f *FIFOScheduler[T]) workerMonitor() {
146146
defer f.shutdownWG.Done()
147147

148-
timer := time.NewTimer(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
148+
timer := time.NewTimer(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
149149
defer timer.Stop()
150150

151151
for {
@@ -154,7 +154,7 @@ func (f *FIFOScheduler[T]) workerMonitor() {
154154
f.stopWorkers(len(f.workerShutdownCh))
155155
return
156156
case <-timer.C:
157-
timer.Reset(backoff.JitDuration(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
157+
timer.Reset(backoff.Jitter(defaultMonitorTickerDuration, defaultMonitorTickerJitter))
158158

159159
targetWorkerNum := f.options.WorkerCount()
160160
currentWorkerNum := len(f.workerShutdownCh)

common/tasks/interleaved_weighted_round_robin.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ func (s *InterleavedWeightedRoundRobinScheduler[T, K]) channels() iwrrChannels[T
312312

313313
func (s *InterleavedWeightedRoundRobinScheduler[T, K]) setupDispatchTimer() {
314314
throttleDuration := iwrrMinDispatchThrottleDuration +
315-
backoff.JitDuration(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration, 1)/2
315+
backoff.FullJitter(s.options.MaxDispatchThrottleDuration-iwrrMinDispatchThrottleDuration)
316316

317317
s.dispatchTimerLock.Lock()
318318
defer s.dispatchTimerLock.Unlock()

service/history/queues/queue_base.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ func (p *queueBase) Start() {
257257
p.rescheduler.Start()
258258
p.readerGroup.Start()
259259

260-
p.checkpointTimer = time.NewTimer(backoff.JitDuration(
260+
p.checkpointTimer = time.NewTimer(backoff.Jitter(
261261
p.options.CheckpointInterval(),
262262
p.options.CheckpointIntervalJitterCoefficient(),
263263
))
@@ -426,7 +426,7 @@ func (p *queueBase) resetCheckpointTimer(checkPointErr error) {
426426
}
427427

428428
p.checkpointRetrier.Reset()
429-
p.checkpointTimer.Reset(backoff.JitDuration(
429+
p.checkpointTimer.Reset(backoff.Jitter(
430430
p.options.CheckpointInterval(),
431431
p.options.CheckpointIntervalJitterCoefficient(),
432432
))

service/history/queues/queue_immediate.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func (p *immediateQueue) NotifyNewTasks(tasks []tasks.Task) {
148148
func (p *immediateQueue) processEventLoop() {
149149
defer p.shutdownWG.Done()
150150

151-
pollTimer := time.NewTimer(backoff.JitDuration(
151+
pollTimer := time.NewTimer(backoff.Jitter(
152152
p.options.MaxPollInterval(),
153153
p.options.MaxPollIntervalJitterCoefficient(),
154154
))
@@ -177,7 +177,7 @@ func (p *immediateQueue) processPollTimer(pollTimer *time.Timer) {
177177
p.logger.Error("Unable to process new range", tag.Error(err))
178178
}
179179

180-
pollTimer.Reset(backoff.JitDuration(
180+
pollTimer.Reset(backoff.Jitter(
181181
p.options.MaxPollInterval(),
182182
p.options.MaxPollIntervalJitterCoefficient(),
183183
))

service/history/queues/queue_scheduled.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ func (p *scheduledQueue) processNewRange() {
228228
// in which case no look ahead is needed.
229229
// Notification will be sent when shard is reacquired, but
230230
// still set a max poll timer here as a catch all case.
231-
p.timerGate.Update(p.timeSource.Now().Add(backoff.JitDuration(
231+
p.timerGate.Update(p.timeSource.Now().Add(backoff.Jitter(
232232
p.options.MaxPollInterval(),
233233
p.options.MaxPollIntervalJitterCoefficient(),
234234
)))
@@ -253,7 +253,7 @@ func (p *scheduledQueue) lookAheadTask() {
253253
}
254254

255255
lookAheadMinTime := p.nonReadableScope.Range.InclusiveMin.FireTime
256-
lookAheadMaxTime := lookAheadMinTime.Add(backoff.JitDuration(
256+
lookAheadMaxTime := lookAheadMinTime.Add(backoff.Jitter(
257257
p.options.MaxPollInterval(),
258258
p.options.MaxPollIntervalJitterCoefficient(),
259259
))

service/history/queues/rescheduler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ func (r *reschedulerImpl) Len() int {
198198
func (r *reschedulerImpl) rescheduleLoop() {
199199
defer r.shutdownWG.Done()
200200

201-
cleanupTimer := time.NewTimer(backoff.JitDuration(
201+
cleanupTimer := time.NewTimer(backoff.Jitter(
202202
reschedulerPQCleanupDuration,
203203
reschedulerPQCleanupJitterCoefficient,
204204
))
@@ -213,7 +213,7 @@ func (r *reschedulerImpl) rescheduleLoop() {
213213
r.reschedule()
214214
case <-cleanupTimer.C:
215215
r.cleanupPQ()
216-
cleanupTimer.Reset(backoff.JitDuration(
216+
cleanupTimer.Reset(backoff.Jitter(
217217
reschedulerPQCleanupDuration,
218218
reschedulerPQCleanupJitterCoefficient,
219219
))

service/history/replication/ack_manager.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -368,7 +368,7 @@ func (p *ackMgrImpl) taskIDsRange(
368368

369369
now := p.shard.GetTimeSource().Now()
370370
if p.sanityCheckTime.IsZero() || p.sanityCheckTime.Before(now) {
371-
p.sanityCheckTime = now.Add(backoff.JitDuration(
371+
p.sanityCheckTime = now.Add(backoff.Jitter(
372372
p.config.ReplicatorProcessorMaxPollInterval(),
373373
p.config.ReplicatorProcessorMaxPollIntervalJitterCoefficient(),
374374
))

service/history/replication/task_fetcher.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"sync/atomic"
3232
"time"
3333

34+
"golang.org/x/exp/maps"
35+
3436
"go.temporal.io/server/api/adminservice/v1"
3537
replicationspb "go.temporal.io/server/api/replication/v1"
3638
"go.temporal.io/server/client"
@@ -43,7 +45,6 @@ import (
4345
"go.temporal.io/server/common/quotas"
4446
"go.temporal.io/server/common/rpc"
4547
"go.temporal.io/server/service/history/configs"
46-
"golang.org/x/exp/maps"
4748
)
4849

4950
const (
@@ -356,7 +357,7 @@ func (f *replicationTaskFetcherWorker) Stop() {
356357

357358
// fetchTasks collects getReplicationTasks request from shards and send out aggregated request to source frontend.
358359
func (f *replicationTaskFetcherWorker) fetchTasks() {
359-
timer := time.NewTimer(backoff.JitDuration(
360+
timer := time.NewTimer(backoff.Jitter(
360361
f.config.ReplicationTaskFetcherAggregationInterval(),
361362
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
362363
))
@@ -371,12 +372,12 @@ func (f *replicationTaskFetcherWorker) fetchTasks() {
371372
// When timer fires, we collect all the requests we have so far and attempt to send them to remote.
372373
err := f.getMessages()
373374
if err != nil {
374-
timer.Reset(backoff.JitDuration(
375+
timer.Reset(backoff.Jitter(
375376
f.config.ReplicationTaskFetcherErrorRetryWait(),
376377
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
377378
))
378379
} else {
379-
timer.Reset(backoff.JitDuration(
380+
timer.Reset(backoff.Jitter(
380381
f.config.ReplicationTaskFetcherAggregationInterval(),
381382
f.config.ReplicationTaskFetcherTimerJitterCoefficient(),
382383
))

service/history/replication/task_processor.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func (p *taskProcessorImpl) Stop() {
191191
}
192192

193193
func (p *taskProcessorImpl) eventLoop() {
194-
syncShardTimer := time.NewTimer(backoff.JitDuration(
194+
syncShardTimer := time.NewTimer(backoff.Jitter(
195195
p.config.ShardSyncMinInterval(),
196196
p.config.ShardSyncTimerJitterCoefficient(),
197197
))
@@ -212,7 +212,7 @@ func (p *taskProcessorImpl) eventLoop() {
212212
1,
213213
metrics.OperationTag(metrics.HistorySyncShardStatusScope))
214214
}
215-
syncShardTimer.Reset(backoff.JitDuration(
215+
syncShardTimer.Reset(backoff.Jitter(
216216
p.config.ShardSyncMinInterval(),
217217
p.config.ShardSyncTimerJitterCoefficient(),
218218
))

service/history/replication/task_processor_manager.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ func (r *taskProcessorManagerImpl) handleClusterMetadataUpdate(
213213

214214
func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
215215
shardID := r.shard.GetShardID()
216-
cleanupTimer := time.NewTimer(backoff.JitDuration(
216+
cleanupTimer := time.NewTimer(backoff.Jitter(
217217
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
218218
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
219219
))
@@ -228,7 +228,7 @@ func (r *taskProcessorManagerImpl) completeReplicationTaskLoop() {
228228
metrics.OperationTag(metrics.ReplicationTaskCleanupScope),
229229
)
230230
}
231-
cleanupTimer.Reset(backoff.JitDuration(
231+
cleanupTimer.Reset(backoff.Jitter(
232232
r.config.ReplicationTaskProcessorCleanupInterval(shardID),
233233
r.config.ReplicationTaskProcessorCleanupJitterCoefficient(shardID),
234234
))

service/history/workflow/task_generator.go

+3-6
Original file line numberDiff line numberDiff line change
@@ -148,10 +148,6 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
148148
return nil
149149
}
150150

151-
// archivalDelayJitterCoefficient is a variable because we need to override it to 0 in unit tests to make them
152-
// deterministic.
153-
var archivalDelayJitterCoefficient = 1.0
154-
155151
func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
156152
closeEvent *historypb.HistoryEvent,
157153
deleteAfterClose bool,
@@ -199,7 +195,8 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
199195
}
200196
// We schedule the archival task for a random time in the near future to avoid sending a surge of tasks
201197
// to the archival system at the same time
202-
delay := backoff.JitDuration(r.config.ArchivalProcessorArchiveDelay(), archivalDelayJitterCoefficient) / 2
198+
199+
delay := backoff.FullJitter(r.config.ArchivalProcessorArchiveDelay())
203200
if delay > retention {
204201
delay = retention
205202
}
@@ -262,7 +259,7 @@ func (r *TaskGeneratorImpl) GenerateDeleteHistoryEventTask(closeTime time.Time,
262259
return err
263260
}
264261

265-
retentionJitterDuration := backoff.JitDuration(r.config.RetentionTimerJitterDuration(), 1) / 2
262+
retentionJitterDuration := backoff.FullJitter(r.config.RetentionTimerJitterDuration())
266263
deleteTime := closeTime.Add(retention).Add(retentionJitterDuration)
267264
r.mutableState.AddTasks(&tasks.DeleteHistoryEventTask{
268265
// TaskID is set by shard

service/history/workflow/task_generator_test.go

+2-7
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ type testParams struct {
8787
}
8888

8989
func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
90-
// we need to set the jitter coefficient to 0 to remove the randomness in the test
91-
archivalDelayJitterCoefficient = 0.0
9290
for _, c := range []testConfig{
9391
{
9492
Name: "delete after retention",
@@ -258,11 +256,8 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
258256
assert.Equal(t, archiveExecutionTask.NamespaceID, namespaceEntry.ID().String())
259257
assert.Equal(t, archiveExecutionTask.WorkflowID, tests.WorkflowID)
260258
assert.Equal(t, archiveExecutionTask.RunID, tests.RunID)
261-
assert.Equal(
262-
t,
263-
p.ExpectedArchiveExecutionTaskVisibilityTimestamp,
264-
archiveExecutionTask.VisibilityTimestamp,
265-
)
259+
assert.True(t, p.ExpectedArchiveExecutionTaskVisibilityTimestamp.Equal(archiveExecutionTask.VisibilityTimestamp) ||
260+
p.ExpectedArchiveExecutionTaskVisibilityTimestamp.After(archiveExecutionTask.VisibilityTimestamp))
266261
} else {
267262
assert.Nil(t, archiveExecutionTask)
268263
}

service/worker/replicator/namespace_replication_message_processor.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ func (p *namespaceReplicationMessageProcessor) Stop() {
225225
}
226226

227227
func getWaitDuration() time.Duration {
228-
return backoff.JitDuration(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
228+
return backoff.Jitter(time.Duration(pollIntervalSecs)*time.Second, pollTimerJitterCoefficient)
229229
}
230230

231231
func isTransientRetryableError(err error) bool {

service/worker/scanner/executions/task.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func newTask(
106106

107107
// Run runs the task
108108
func (t *task) Run() executor.TaskStatus {
109-
time.Sleep(backoff.JitDuration(
109+
time.Sleep(backoff.Jitter(
110110
taskStartupDelayRatio*time.Duration(t.scavenger.numHistoryShards),
111111
taskStartupDelayRandomizationRatio,
112112
))

0 commit comments

Comments (0)