Skip to content

Commit 278f545

Browse files
authored
Add dynamic config for sdk worker options (#3806)
1 parent a06da89 commit 278f545

File tree

14 files changed

+185
-103
lines changed

14 files changed

+185
-103
lines changed

common/dynamicconfig/constants.go

+5
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,11 @@ const (
713713
WorkerParentCloseMaxConcurrentWorkflowTaskPollers = "worker.ParentCloseMaxConcurrentWorkflowTaskPollers"
714714
// WorkerPerNamespaceWorkerCount controls number of per-ns (scheduler, batcher, etc.) workers to run per namespace
715715
WorkerPerNamespaceWorkerCount = "worker.perNamespaceWorkerCount"
716+
// WorkerPerNamespaceWorkerOptions are SDK worker options for per-namespace workers
717+
WorkerPerNamespaceWorkerOptions = "worker.perNamespaceWorkerOptions"
716718
// WorkerEnableScheduler controls whether to start the worker for scheduled workflows
717719
WorkerEnableScheduler = "worker.enableScheduler"
720+
// WorkerStickyCacheSize controls the sticky cache size for SDK workers on worker nodes
721+
// (shared between all workers in the process, cannot be changed after startup)
722+
WorkerStickyCacheSize = "worker.stickyCacheSize"
718723
)

common/resource/fx.go

+2-5
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,6 @@ var DefaultOptions = fx.Options(
132132
fx.Provide(ArchiverProviderProvider),
133133
fx.Provide(ThrottledLoggerProvider),
134134
fx.Provide(SdkClientFactoryProvider),
135-
fx.Provide(SdkWorkerFactoryProvider),
136135
fx.Provide(DCRedirectionPolicyProvider),
137136
)
138137

@@ -394,6 +393,7 @@ func SdkClientFactoryProvider(
394393
metricsHandler metrics.Handler,
395394
logger log.SnTaggedLogger,
396395
resolver membership.GRPCResolver,
396+
dc *dynamicconfig.Collection,
397397
) (sdk.ClientFactory, error) {
398398
frontendURL, frontendTLSConfig, err := getFrontendConnectionDetails(cfg, tlsConfigProvider, resolver)
399399
if err != nil {
@@ -404,13 +404,10 @@ func SdkClientFactoryProvider(
404404
frontendTLSConfig,
405405
metricsHandler,
406406
logger,
407+
dc.GetIntProperty(dynamicconfig.WorkerStickyCacheSize, 0),
407408
), nil
408409
}
409410

410-
func SdkWorkerFactoryProvider() sdk.WorkerFactory {
411-
return sdk.NewWorkerFactory()
412-
}
413-
414411
func DCRedirectionPolicyProvider(cfg *config.Config) config.DCRedirectionPolicy {
415412
return cfg.DCRedirectionPolicy
416413
}

common/sdk/factory.go

+16-17
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636

3737
"go.temporal.io/server/common"
3838
"go.temporal.io/server/common/backoff"
39+
"go.temporal.io/server/common/dynamicconfig"
3940
"go.temporal.io/server/common/log"
4041
"go.temporal.io/server/common/log/tag"
4142
"go.temporal.io/server/common/metrics"
@@ -48,10 +49,7 @@ type (
4849
// MetricsHandler, or Logger (they will be overwritten)
4950
NewClient(options sdkclient.Options) sdkclient.Client
5051
GetSystemClient() sdkclient.Client
51-
}
52-
53-
WorkerFactory interface {
54-
New(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
52+
NewWorker(client sdkclient.Client, taskQueue string, options sdkworker.Options) sdkworker.Worker
5553
}
5654

5755
clientFactory struct {
@@ -61,29 +59,29 @@ type (
6159
logger log.Logger
6260
sdklogger sdklog.Logger
6361
systemSdkClient sdkclient.Client
62+
stickyCacheSize dynamicconfig.IntPropertyFn
6463
once sync.Once
6564
}
66-
67-
workerFactory struct{}
6865
)
6966

7067
var (
7168
_ ClientFactory = (*clientFactory)(nil)
72-
_ WorkerFactory = (*workerFactory)(nil)
7369
)
7470

7571
func NewClientFactory(
7672
hostPort string,
7773
tlsConfig *tls.Config,
7874
metricsHandler metrics.Handler,
7975
logger log.Logger,
76+
stickyCacheSize dynamicconfig.IntPropertyFn,
8077
) *clientFactory {
8178
return &clientFactory{
82-
hostPort: hostPort,
83-
tlsConfig: tlsConfig,
84-
metricsHandler: NewMetricsHandler(metricsHandler),
85-
logger: logger,
86-
sdklogger: log.NewSdkLogger(logger),
79+
hostPort: hostPort,
80+
tlsConfig: tlsConfig,
81+
metricsHandler: NewMetricsHandler(metricsHandler),
82+
logger: logger,
83+
sdklogger: log.NewSdkLogger(logger),
84+
stickyCacheSize: stickyCacheSize,
8785
}
8886
}
8987

@@ -122,15 +120,16 @@ func (f *clientFactory) GetSystemClient() sdkclient.Client {
122120
if err != nil {
123121
f.logger.Fatal("error creating sdk client", tag.Error(err))
124122
}
123+
124+
if size := f.stickyCacheSize(); size > 0 {
125+
f.logger.Info("setting sticky workflow cache size", tag.NewInt("size", size))
126+
sdkworker.SetStickyWorkflowCacheSize(size)
127+
}
125128
})
126129
return f.systemSdkClient
127130
}
128131

129-
func NewWorkerFactory() *workerFactory {
130-
return &workerFactory{}
131-
}
132-
133-
func (f *workerFactory) New(
132+
func (f *clientFactory) NewWorker(
134133
client sdkclient.Client,
135134
taskQueue string,
136135
options sdkworker.Options,

common/sdk/factory_mock.go

+6-29
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

service/worker/archiver/client_worker.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func NewClientWorker(container *BootstrapContainer) ClientWorker {
120120
BackgroundActivityContext: actCtx,
121121
}
122122
clientWorker := &clientWorker{
123-
worker: worker.New(sdkClient, workflowTaskQueue, wo),
123+
worker: container.SdkClientFactory.NewWorker(sdkClient, workflowTaskQueue, wo),
124124
namespaceRegistry: container.NamespaceCache,
125125
}
126126

service/worker/batcher/batcher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func (s *Batcher) Start() error {
8181
BackgroundActivityContext: ctx,
8282
}
8383
sdkClient := s.sdkClientFactory.GetSystemClient()
84-
batchWorker := worker.New(sdkClient, taskQueueName, workerOpts)
84+
batchWorker := s.sdkClientFactory.NewWorker(sdkClient, taskQueueName, workerOpts)
8585
batchWorker.RegisterWorkflowWithOptions(BatchWorkflow, workflow.RegisterOptions{Name: BatchWFTypeName})
8686
batchWorker.RegisterActivity(&activities{
8787
activityDeps: activityDeps{

service/worker/parentclosepolicy/processor.go

+5-7
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,8 @@ type (
5050
NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn
5151
}
5252

53-
// BootstrapParams contains the set of params needed to bootstrap
54-
// the sub-system
53+
// BootstrapParams contains the set of params needed to bootstrap the sub-system
5554
BootstrapParams struct {
56-
// SdkSystemClient is an instance of temporal service client
5755
SdkClientFactory sdk.ClientFactory
5856
// MetricsHandler is an instance of metrics object for emitting stats
5957
MetricsHandler metrics.Handler
@@ -69,7 +67,7 @@ type (
6967

7068
// Processor is the background sub-system that executes workflows for ParentClosePolicy
7169
Processor struct {
72-
svcClientFactory sdk.ClientFactory
70+
sdkClientFactory sdk.ClientFactory
7371
clientBean client.Bean
7472
metricsHandler metrics.Handler
7573
cfg Config
@@ -81,7 +79,7 @@ type (
8179
// New returns a new instance as daemon
8280
func New(params *BootstrapParams) *Processor {
8381
return &Processor{
84-
svcClientFactory: params.SdkClientFactory,
82+
sdkClientFactory: params.SdkClientFactory,
8583
metricsHandler: params.MetricsHandler.WithTags(metrics.OperationTag(metrics.ParentClosePolicyProcessorScope)),
8684
cfg: params.Config,
8785
logger: log.With(params.Logger, tag.ComponentBatcher),
@@ -92,8 +90,8 @@ func New(params *BootstrapParams) *Processor {
9290

9391
// Start starts the parent close policy processor worker
9492
func (s *Processor) Start() error {
95-
svcClient := s.svcClientFactory.GetSystemClient()
96-
processorWorker := worker.New(svcClient, processorTaskQueueName, getWorkerOptions(s))
93+
svcClient := s.sdkClientFactory.GetSystemClient()
94+
processorWorker := s.sdkClientFactory.NewWorker(svcClient, processorTaskQueueName, getWorkerOptions(s))
9795
processorWorker.RegisterWorkflowWithOptions(ProcessorWorkflow, workflow.RegisterOptions{Name: processorWFTypeName})
9896
processorWorker.RegisterActivityWithOptions(ProcessorActivity, activity.RegisterOptions{Name: processorActivityName})
9997

service/worker/pernamespaceworker.go

+57-13
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package worker
2626

2727
import (
2828
"context"
29+
"encoding/json"
2930
"errors"
3031
"fmt"
3132
"os"
@@ -49,8 +50,10 @@ import (
4950
"go.temporal.io/server/common/membership"
5051
"go.temporal.io/server/common/namespace"
5152
"go.temporal.io/server/common/primitives"
53+
"go.temporal.io/server/common/primitives/timestamp"
5254
"go.temporal.io/server/common/resource"
5355
"go.temporal.io/server/common/sdk"
56+
"go.temporal.io/server/common/util"
5457
workercommon "go.temporal.io/server/service/worker/common"
5558
)
5659

@@ -63,7 +66,6 @@ type (
6366
fx.In
6467
Logger log.Logger
6568
SdkClientFactory sdk.ClientFactory
66-
SdkWorkerFactory sdk.WorkerFactory
6769
NamespaceRegistry namespace.Registry
6870
HostName resource.HostName
6971
Config *Config
@@ -77,7 +79,6 @@ type (
7779
// from init params or Start
7880
logger log.Logger
7981
sdkClientFactory sdk.ClientFactory
80-
sdkWorkerFactory sdk.WorkerFactory
8182
namespaceRegistry namespace.Registry
8283
self *membership.HostInfo
8384
hostName resource.HostName
@@ -105,6 +106,19 @@ type (
105106
client sdkclient.Client
106107
worker sdkworker.Worker
107108
}
109+
110+
sdkWorkerOptions struct {
111+
// Copy of relevant fields from sdkworker.Options
112+
MaxConcurrentActivityExecutionSize int
113+
WorkerActivitiesPerSecond float64
114+
MaxConcurrentLocalActivityExecutionSize int
115+
WorkerLocalActivitiesPerSecond float64
116+
MaxConcurrentActivityTaskPollers int
117+
MaxConcurrentWorkflowTaskExecutionSize int
118+
MaxConcurrentWorkflowTaskPollers int
119+
StickyScheduleToStartTimeout string // parse into time.Duration
120+
StickyScheduleToStartTimeoutDuration time.Duration
121+
}
108122
)
109123

110124
var (
@@ -115,7 +129,6 @@ func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *p
115129
return &perNamespaceWorkerManager{
116130
logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager),
117131
sdkClientFactory: params.SdkClientFactory,
118-
sdkWorkerFactory: params.SdkWorkerFactory,
119132
namespaceRegistry: params.NamespaceRegistry,
120133
hostName: params.HostName,
121134
config: params.Config,
@@ -249,6 +262,22 @@ func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespa
249262
return multiplicity, nil
250263
}
251264

265+
func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
266+
optionsMap := wm.config.PerNamespaceWorkerOptions(ns.Name().String())
267+
var options sdkWorkerOptions
268+
b, err := json.Marshal(optionsMap)
269+
if err != nil {
270+
return options
271+
}
272+
_ = json.Unmarshal(b, &options) // ignore errors, just use the zero value anyway
273+
if len(options.StickyScheduleToStartTimeout) > 0 {
274+
if options.StickyScheduleToStartTimeoutDuration, err = timestamp.ParseDuration(options.StickyScheduleToStartTimeout); err != nil {
275+
wm.logger.Warn("invalid StickyScheduleToStartTimeout", tag.Error(err))
276+
}
277+
}
278+
return options
279+
}
280+
252281
// called on namespace state change callback
253282
func (w *perNamespaceWorker) refreshWithNewNamespace(ns *namespace.Namespace, deleted bool) {
254283
w.lock.Lock()
@@ -357,7 +386,11 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
357386
return errNoWorkerNeeded
358387
}
359388
// ensure this changes if multiplicity changes
360-
componentSet += fmt.Sprintf("%d", multiplicity)
389+
componentSet += fmt.Sprintf(",%d", multiplicity)
390+
391+
// get sdk worker options
392+
dcOptions := w.wm.getWorkerOptions(ns)
393+
componentSet += fmt.Sprintf(",%+v", dcOptions)
361394

362395
// we do need a worker, but maybe we have one already
363396
w.lock.Lock()
@@ -373,7 +406,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
373406
// create new one. note that even before startWorker returns, the worker may have started
374407
// and already called the fatal error handler. we need to set w.client+worker+componentSet
375408
// before releasing the lock to keep our state consistent.
376-
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity)
409+
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, dcOptions)
377410
if err != nil {
378411
// TODO: add metric also
379412
return err
@@ -389,6 +422,7 @@ func (w *perNamespaceWorker) startWorker(
389422
ns *namespace.Namespace,
390423
components []workercommon.PerNSWorkerComponent,
391424
multiplicity int,
425+
dcOptions sdkWorkerOptions,
392426
) (sdkclient.Client, sdkworker.Worker, error) {
393427
nsName := ns.Name().String()
394428
// this should not block because it uses an existing grpc connection
@@ -398,28 +432,38 @@ func (w *perNamespaceWorker) startWorker(
398432
})
399433

400434
var sdkoptions sdkworker.Options
435+
436+
// copy from dynamic config
437+
sdkoptions.MaxConcurrentActivityExecutionSize = dcOptions.MaxConcurrentActivityExecutionSize
438+
sdkoptions.WorkerActivitiesPerSecond = dcOptions.WorkerActivitiesPerSecond
439+
sdkoptions.MaxConcurrentLocalActivityExecutionSize = dcOptions.MaxConcurrentLocalActivityExecutionSize
440+
sdkoptions.WorkerLocalActivitiesPerSecond = dcOptions.WorkerLocalActivitiesPerSecond
441+
sdkoptions.MaxConcurrentActivityTaskPollers = util.Max(2, dcOptions.MaxConcurrentActivityTaskPollers)
442+
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize = dcOptions.MaxConcurrentWorkflowTaskExecutionSize
443+
sdkoptions.MaxConcurrentWorkflowTaskPollers = util.Max(2, dcOptions.MaxConcurrentWorkflowTaskPollers)
444+
sdkoptions.StickyScheduleToStartTimeout = dcOptions.StickyScheduleToStartTimeoutDuration
445+
401446
sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
402447
sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
403-
// sdk default is 2, we increase it if we're supposed to run with more multiplicity.
404-
// other defaults are already large enough.
405-
sdkoptions.MaxConcurrentWorkflowTaskPollers = 2 * multiplicity
406-
sdkoptions.MaxConcurrentActivityTaskPollers = 2 * multiplicity
448+
// increase these if we're supposed to run with more multiplicity
449+
sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
450+
sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
407451
sdkoptions.OnFatalError = w.onFatalError
408452

409453
// this should not block because the client already has server capabilities
410-
sdkworker := w.wm.sdkWorkerFactory.New(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
454+
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
411455
for _, cmp := range components {
412-
cmp.Register(sdkworker, ns)
456+
cmp.Register(worker, ns)
413457
}
414458

415459
// this blocks by calling DescribeNamespace a few times (with a 10s timeout)
416-
err := sdkworker.Start()
460+
err := worker.Start()
417461
if err != nil {
418462
client.Close()
419463
return nil, nil, err
420464
}
421465

422-
return client, sdkworker, nil
466+
return client, worker, nil
423467
}
424468

425469
func (w *perNamespaceWorker) onFatalError(err error) {

0 commit comments

Comments
 (0)