
Commit 7e89af4

Add explicit per-namespace rate limit to schedules (#3838)
1 parent 5c23836 commit 7e89af4

12 files changed (+316 / -123 lines)

api/schedule/v1/message.pb.go

+123 / -75 (generated file; diff not rendered)

common/dynamicconfig/constants.go

+2
@@ -720,4 +720,6 @@ const (
 	// WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes
 	// (shared between all workers in the process, cannot be changed after startup)
 	WorkerStickyCacheSize = "worker.stickyCacheSize"
+	// SchedulerNamespaceStartWorkflowRPS is the per-namespace limit for starting workflows by schedules
+	SchedulerNamespaceStartWorkflowRPS = "worker.schedulerNamespaceStartWorkflowRPS"
 )

proto/internal/temporal/server/api/schedule/v1/message.proto

+1
@@ -104,6 +104,7 @@ message StartWorkflowRequest {
     reserved 3;
     temporal.api.common.v1.Payloads last_completion_result = 4;
     temporal.api.failure.v1.Failure continued_failure = 5;
+    bool completed_rate_limit_sleep = 6;
 }

 message StartWorkflowResponse {
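Since the generated api/schedule/v1/message.pb.go diff is not rendered above, the Go side of this proto change can only be sketched: the regenerated StartWorkflowRequest struct presumably gains a field along the following lines. This is an assumption about protoc-gen-go output, not the actual generated code.

// Sketch of the regenerated field, for orientation only; the real message.pb.go
// also carries full wire-format metadata and a generated getter method.
type StartWorkflowRequest struct {
	// ...existing generated fields...

	// CompletedRateLimitSleep reports that the scheduler workflow already performed
	// the rate-limit sleep requested by a previous StartWorkflow attempt.
	CompletedRateLimitSleep bool `protobuf:"varint,6,opt,name=completed_rate_limit_sleep,json=completedRateLimitSleep,proto3" json:"completed_rate_limit_sleep,omitempty"`
}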

service/worker/batcher/fx.go

+1 / -1

@@ -94,7 +94,7 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
 	}
 }

-func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace) {
+func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace, _ workercommon.RegistrationDetails) {
 	worker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
 	worker.RegisterActivity(s.activities(ns.Name(), ns.ID()))
 }

service/worker/common/interface.go

+11 / -1

@@ -54,7 +54,7 @@ type (
 	PerNSWorkerComponent interface {
 		// Register registers Workflow and Activity types provided by this worker component.
 		// The namespace that this worker is running in is also provided.
-		Register(sdkworker.Worker, *namespace.Namespace)
+		Register(sdkworker.Worker, *namespace.Namespace, RegistrationDetails)
 		// DedicatedWorkerOptions returns a PerNSDedicatedWorkerOptions for this worker component.
 		DedicatedWorkerOptions(*namespace.Namespace) *PerNSDedicatedWorkerOptions
 	}
@@ -63,4 +63,14 @@ type (
 		// Set this to false to disable this worker for this namespace
 		Enabled bool
 	}
+
+	RegistrationDetails struct {
+		// TotalWorkers is the number of requested per-namespace workers for this namespace.
+		TotalWorkers int
+		// Multiplicity is the number of those workers that this particular sdkworker.Worker
+		// represents. It may be more than one if the requested number is more than the total
+		// number of worker nodes or if consistent hashing decided to place more than one on
+		// the same node.
+		Multiplicity int
+	}
 )

service/worker/common/interface_mock.go

+4 / -4 (generated file; diff not rendered)

service/worker/pernamespaceworker.go

+14 / -9

@@ -246,24 +246,24 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
 	delete(wm.workers, ns.ID())
 }

-func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, error) {
-	workerCount := wm.config.PerNamespaceWorkerCount(ns.Name().String())
-	// This can result in fewer than the intended number of workers if numWorkers > 1, because
+func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
+	totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String())
+	// This can result in fewer than the intended number of workers if totalWorkers > 1, because
 	// multiple lookups might land on the same node. To compensate, we increase the number of
 	// pollers in that case, but it would be better to try to spread them across our nodes.
 	// TODO: implement this properly using LookupN in serviceResolver
 	multiplicity := 0
-	for i := 0; i < workerCount; i++ {
+	for i := 0; i < totalWorkers; i++ {
 		key := fmt.Sprintf("%s/%d", ns.ID().String(), i)
 		target, err := wm.serviceResolver.Lookup(key)
 		if err != nil {
-			return 0, err
+			return 0, 0, err
 		}
 		if target.Identity() == wm.self.Identity() {
 			multiplicity++
 		}
 	}
-	return multiplicity, nil
+	return multiplicity, totalWorkers, nil
 }

 func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
@@ -379,7 +379,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 	}

 	// check if we are responsible for this namespace at all
-	multiplicity, err := w.wm.getWorkerMultiplicity(ns)
+	multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
 	if err != nil {
 		w.logger.Error("Failed to look up hosts", tag.Error(err))
 		// TODO: add metric also
@@ -410,7 +410,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 	// create new one. note that even before startWorker returns, the worker may have started
 	// and already called the fatal error handler. we need to set w.client+worker+componentSet
 	// before releasing the lock to keep our state consistent.
-	client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions)
+	client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
 	if err != nil {
 		// TODO: add metric also
 		return err
@@ -426,6 +426,7 @@ func (w *perNamespaceWorker) startWorker(
 	ns *namespace.Namespace,
 	components []workercommon.PerNSWorkerComponent,
 	multiplicity int,
+	totalWorkers int,
 	dcOptions sdkWorkerOptions,
 ) (sdkclient.Client, sdkworker.Worker, error) {
 	nsName := ns.Name().String()
@@ -456,8 +457,12 @@ func (w *perNamespaceWorker) startWorker(

 	// this should not block because the client already has server capabilities
 	worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
+	details := workercommon.RegistrationDetails{
+		TotalWorkers: totalWorkers,
+		Multiplicity: multiplicity,
+	}
 	for _, cmp := range components {
-		cmp.Register(worker, ns)
+		cmp.Register(worker, ns, details)
 	}

 	// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
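The comment in getWorkerMultiplicity explains why Multiplicity can exceed one: each of the TotalWorkers slots is looked up independently on the membership ring, and several lookups can land on the same node. The following toy program illustrates that counting under simple FNV hashing; it is a sketch only, not the real serviceResolver, and all names in it are hypothetical.

// Toy illustration of per-namespace worker slots hashing onto nodes.
package main

import (
	"fmt"
	"hash/fnv"
)

// lookup assigns a key to one of the nodes by hashing, standing in for
// the membership ring's Lookup.
func lookup(key string, nodes []string) string {
	h := fnv.New32a()
	h.Write([]byte(key))
	return nodes[h.Sum32()%uint32(len(nodes))]
}

func main() {
	nodes := []string{"worker-a", "worker-b"} // two worker nodes in the ring
	totalWorkers := 3                         // PerNamespaceWorkerCount for this namespace

	counts := map[string]int{}
	for i := 0; i < totalWorkers; i++ {
		key := fmt.Sprintf("%s/%d", "some-namespace-id", i)
		counts[lookup(key, nodes)]++
	}
	// With 3 slots and only 2 nodes, one node necessarily ends up with multiplicity 2
	// and the other with 1 (or even 3 and 0 if every lookup collides), which matches
	// the TotalWorkers: 3, Multiplicity: 2 expectation in TestMultiplicity below.
	fmt.Println(counts)
}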

service/worker/pernamespaceworker_test.go

+12 / -12

@@ -182,7 +182,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
 	wkr1 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr1.EXPECT().Start()

 	s.manager.namespaceCallback(ns, false)
@@ -213,7 +213,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
 		s.Equal(4, options.MaxConcurrentWorkflowTaskPollers)
 		s.Equal(4, options.MaxConcurrentActivityTaskPollers)
 	}).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 3, Multiplicity: 2})
 	wkr1.EXPECT().Start()

 	s.manager.namespaceCallback(ns, false)
@@ -258,7 +258,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
 		s.Equal(0.0, options.WorkerLocalActivitiesPerSecond)
 		s.Equal(0*time.Millisecond, options.StickyScheduleToStartTimeout)
 	}).Return(wkr)
-	s.cmp1.EXPECT().Register(wkr, gomock.Any()).AnyTimes()
+	s.cmp1.EXPECT().Register(wkr, gomock.Any(), gomock.Any()).AnyTimes()
 	wkr.EXPECT().Start().AnyTimes()

 	s.manager.namespaceCallback(ns1, false)
@@ -302,9 +302,9 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)

-	s.cmp1.EXPECT().Register(wkr1, ns1)
-	s.cmp1.EXPECT().Register(wkr2, ns2)
-	s.cmp2.EXPECT().Register(wkr1, ns1)
+	s.cmp1.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
+	s.cmp1.EXPECT().Register(wkr2, ns2, workercommon.RegistrationDetails{TotalWorkers: 2, Multiplicity: 2})
+	s.cmp2.EXPECT().Register(wkr1, ns1, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

 	wkr1.EXPECT().Start()
 	wkr2.EXPECT().Start()
@@ -334,7 +334,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
 	wkr1 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr1.EXPECT().Start()

 	s.manager.namespaceCallback(ns, false)
@@ -354,7 +354,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
 	wkr2 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
-	s.cmp1.EXPECT().Register(wkr2, ns)
+	s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr2.EXPECT().Start()

 	s.manager.namespaceCallback(nsRestored, false)
@@ -389,7 +389,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
 	wkr1 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr1.EXPECT().Start()

 	s.manager.membershipChangedCh <- nil
@@ -422,7 +422,7 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
 	wkr1 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr1.EXPECT().Start()

 	s.manager.namespaceCallback(ns, false)
@@ -449,7 +449,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
 	wkr1 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr1)
-	s.cmp1.EXPECT().Register(wkr1, ns)
+	s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})

 	// first try fails to start
 	wkr1.EXPECT().Start().Return(errors.New("start worker error"))
@@ -460,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
 	wkr2 := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Return(wkr2)
-	s.cmp1.EXPECT().Register(wkr2, ns)
+	s.cmp1.EXPECT().Register(wkr2, ns, workercommon.RegistrationDetails{TotalWorkers: 1, Multiplicity: 1})
 	wkr2.EXPECT().Start()

 	s.manager.namespaceCallback(ns, false)

service/worker/scheduler/activities.go

+24
@@ -44,28 +44,52 @@ import (
 	"go.temporal.io/server/common"
 	"go.temporal.io/server/common/namespace"
 	"go.temporal.io/server/common/primitives/timestamp"
+	"go.temporal.io/server/common/quotas"
 )

 type (
 	activities struct {
 		activityDeps
 		namespace   namespace.Name
 		namespaceID namespace.ID
+		// Rate limiter for start workflow requests. Note that the scope is all schedules in
+		// this namespace on this worker.
+		startWorkflowRateLimiter quotas.RateLimiter
 	}

 	errFollow string
+
+	rateLimitedDetails struct {
+		Delay time.Duration
+	}
 )

 var (
 	errTryAgain   = errors.New("try again")
 	errWrongChain = errors.New("found running workflow with wrong FirstExecutionRunId")
 	errNoEvents   = errors.New("GetEvents didn't return any events")
 	errNoAttrs    = errors.New("last event did not have correct attrs")
+	errBlocked    = errors.New("rate limiter doesn't allow any progress")
 )

 func (e errFollow) Error() string { return string(e) }

 func (a *activities) StartWorkflow(ctx context.Context, req *schedspb.StartWorkflowRequest) (*schedspb.StartWorkflowResponse, error) {
+	if !req.CompletedRateLimitSleep {
+		reservation := a.startWorkflowRateLimiter.Reserve()
+		if !reservation.OK() {
+			return nil, translateError(errBlocked, "StartWorkflowExecution")
+		}
+		delay := reservation.Delay()
+		if delay > 1*time.Second {
+			// for a long sleep, ask the workflow to do it in workflow logic
+			return nil, temporal.NewNonRetryableApplicationError(
+				rateLimitedErrorType, rateLimitedErrorType, nil, rateLimitedDetails{Delay: delay})
+		}
+		// short sleep can be done in-line
+		time.Sleep(delay)
+	}
+
 	req.Request.Namespace = a.namespace.String()

 	request := common.CreateHistoryStartWorkflowRequest(
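When the computed delay exceeds one second, the activity returns a non-retryable error of type rateLimitedErrorType carrying rateLimitedDetails, so the long sleep can happen durably in workflow logic instead of blocking the activity. The workflow-side handling is not among the files rendered on this page; the following is only a hedged sketch of how it could look (handleRateLimitedError is a hypothetical name, not from this commit).

// Hedged sketch, not taken from this commit: one way the scheduler workflow could
// honor the rate-limited error returned by the StartWorkflow activity.
package scheduler

import (
	"errors"

	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/workflow"

	schedspb "go.temporal.io/server/api/schedule/v1"
)

func handleRateLimitedError(ctx workflow.Context, req *schedspb.StartWorkflowRequest, err error) error {
	var appErr *temporal.ApplicationError
	if !errors.As(err, &appErr) || appErr.Type() != rateLimitedErrorType {
		return err // not a rate-limit error; propagate
	}
	var details rateLimitedDetails
	if detErr := appErr.Details(&details); detErr != nil {
		return err
	}
	// Do the long sleep durably in workflow logic, then record that it happened so
	// the next activity attempt skips its own limiter check.
	if sleepErr := workflow.Sleep(ctx, details.Delay); sleepErr != nil {
		return sleepErr
	}
	req.CompletedRateLimitSleep = true
	return nil
}

On the retried attempt the activity sees CompletedRateLimitSleep set and bypasses the reserve/sleep block, as shown in the hunk above.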

service/worker/scheduler/fx.go

+16 / -8

@@ -36,6 +36,7 @@ import (
 	"go.temporal.io/server/common/log"
 	"go.temporal.io/server/common/metrics"
 	"go.temporal.io/server/common/namespace"
+	"go.temporal.io/server/common/quotas"
 	workercommon "go.temporal.io/server/service/worker/common"
 )

@@ -46,8 +47,9 @@ const (

 type (
 	workerComponent struct {
-		activityDeps activityDeps
-		enabledForNs dynamicconfig.BoolPropertyFnWithNamespaceFilter
+		activityDeps             activityDeps
+		enabledForNs             dynamicconfig.BoolPropertyFnWithNamespaceFilter
+		globalNSStartWorkflowRPS dynamicconfig.FloatPropertyFnWithNamespaceFilter
 	}

 	activityDeps struct {
@@ -77,6 +79,8 @@ func NewResult(
 			activityDeps: params,
 			enabledForNs: dcCollection.GetBoolPropertyFnWithNamespaceFilter(
 				dynamicconfig.WorkerEnableScheduler, true),
+			globalNSStartWorkflowRPS: dcCollection.GetFloatPropertyFilteredByNamespace(
+				dynamicconfig.SchedulerNamespaceStartWorkflowRPS, 30.0),
 		},
 	}
 }
@@ -87,15 +91,19 @@ func (s *workerComponent) DedicatedWorkerOptions(ns *namespace.Namespace) *worke
 	}
 }

-func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace) {
+func (s *workerComponent) Register(worker sdkworker.Worker, ns *namespace.Namespace, details workercommon.RegistrationDetails) {
 	worker.RegisterWorkflowWithOptions(SchedulerWorkflow, workflow.RegisterOptions{Name: WorkflowType})
-	worker.RegisterActivity(s.activities(ns.Name(), ns.ID()))
+	worker.RegisterActivity(s.activities(ns.Name(), ns.ID(), details))
 }

-func (s *workerComponent) activities(name namespace.Name, id namespace.ID) *activities {
+func (s *workerComponent) activities(name namespace.Name, id namespace.ID, details workercommon.RegistrationDetails) *activities {
+	localRPS := func() float64 {
+		return float64(details.Multiplicity) * s.globalNSStartWorkflowRPS(name.String()) / float64(details.TotalWorkers)
+	}
 	return &activities{
-		activityDeps: s.activityDeps,
-		namespace:    name,
-		namespaceID:  id,
+		activityDeps:             s.activityDeps,
+		namespace:                name,
+		namespaceID:              id,
+		startWorkflowRateLimiter: quotas.NewDefaultOutgoingRateLimiter(localRPS),
 	}
 }
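localRPS splits the per-namespace rate (worker.schedulerNamespaceStartWorkflowRPS, default 30) evenly across the requested workers and weights it by how many of them this node hosts, so the limiters on all nodes add up to the namespace-wide limit. A minimal check of that arithmetic, using the TestMultiplicity numbers above (3 requested workers split 2 and 1 across two nodes); localShare is an illustrative helper, not code from this commit.

package main

import "fmt"

// localShare mirrors the localRPS computation above: the namespace-wide rate is
// divided evenly across the requested workers and weighted by how many of them
// this particular sdkworker.Worker represents.
func localShare(globalRPS float64, multiplicity, totalWorkers int) float64 {
	return float64(multiplicity) * globalRPS / float64(totalWorkers)
}

func main() {
	const globalRPS = 30.0 // default for worker.schedulerNamespaceStartWorkflowRPS
	fmt.Println(localShare(globalRPS, 2, 3)) // node hosting 2 of 3 workers: 20
	fmt.Println(localShare(globalRPS, 1, 3)) // node hosting the remaining worker: 10
	// 20 + 10 == 30, so the per-namespace limit is preserved across the cluster.
}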
