Skip to content

Commit e0081fd

Browse files
MichaelSnowden and dnr
authored
Fix a rare deadlock in scanner.Stop (#3818)
* Fix a rare deadlock in scanner.Stop
* Update service/worker/scanner/scanner.go
Co-authored-by: David Reiss <david@temporal.io>
Co-authored-by: David Reiss <david@temporal.io>
1 parent eb4b574 commit e0081fd

File tree

2 files changed

+112
-37
lines changed

2 files changed

+112
-37
lines changed

service/worker/scanner/scanner.go

+22-15
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737

3838
"go.temporal.io/server/api/adminservice/v1"
3939
"go.temporal.io/server/api/historyservice/v1"
40+
"go.temporal.io/server/common"
4041
"go.temporal.io/server/common/config"
4142
"go.temporal.io/server/common/headers"
4243
"go.temporal.io/server/common/log"
@@ -102,8 +103,9 @@ type (
102103
// of database tables to clean up resources, monitor anomalies
103104
// and emit stats for analytics
104105
Scanner struct {
105-
context scannerContext
106-
wg sync.WaitGroup
106+
context scannerContext
107+
wg sync.WaitGroup
108+
lifecycleCancel context.CancelFunc
107109
}
108110
)
109111

@@ -144,6 +146,7 @@ func New(
144146
func (s *Scanner) Start() error {
145147
ctx := context.WithValue(context.Background(), scannerContextKey, s.context)
146148
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
149+
ctx, s.lifecycleCancel = context.WithCancel(ctx)
147150

148151
workerOpts := worker.Options{
149152
MaxConcurrentActivityExecutionSize: s.context.cfg.MaxConcurrentActivityExecutionSize(),
@@ -157,19 +160,19 @@ func (s *Scanner) Start() error {
157160
var workerTaskQueueNames []string
158161
if s.context.cfg.ExecutionsScannerEnabled() {
159162
s.wg.Add(1)
160-
go s.startWorkflowWithRetry(executionsScannerWFStartOptions, executionsScannerWFTypeName)
163+
go s.startWorkflowWithRetry(ctx, executionsScannerWFStartOptions, executionsScannerWFTypeName)
161164
workerTaskQueueNames = append(workerTaskQueueNames, executionsScannerTaskQueueName)
162165
}
163166

164167
if s.context.cfg.Persistence.DefaultStoreType() == config.StoreTypeSQL && s.context.cfg.TaskQueueScannerEnabled() {
165168
s.wg.Add(1)
166-
go s.startWorkflowWithRetry(tlScannerWFStartOptions, tqScannerWFTypeName)
169+
go s.startWorkflowWithRetry(ctx, tlScannerWFStartOptions, tqScannerWFTypeName)
167170
workerTaskQueueNames = append(workerTaskQueueNames, tqScannerTaskQueueName)
168171
}
169172

170173
if s.context.cfg.HistoryScannerEnabled() {
171174
s.wg.Add(1)
172-
go s.startWorkflowWithRetry(historyScannerWFStartOptions, historyScannerWFTypeName)
175+
go s.startWorkflowWithRetry(ctx, historyScannerWFStartOptions, historyScannerWFTypeName)
173176
workerTaskQueueNames = append(workerTaskQueueNames, historyScannerTaskQueueName)
174177
}
175178

@@ -192,37 +195,41 @@ func (s *Scanner) Start() error {
192195
}
193196

194197
func (s *Scanner) Stop() {
198+
s.lifecycleCancel()
195199
s.wg.Wait()
196200
}
197201

198-
func (s *Scanner) startWorkflowWithRetry(
199-
options sdkclient.StartWorkflowOptions,
200-
workflowType string,
201-
workflowArgs ...interface{},
202-
) {
202+
func (s *Scanner) startWorkflowWithRetry(ctx context.Context, options sdkclient.StartWorkflowOptions, workflowType string, workflowArgs ...interface{}) {
203203
defer s.wg.Done()
204204

205205
policy := backoff.NewExponentialRetryPolicy(time.Second).
206206
WithMaximumInterval(time.Minute).
207207
WithExpirationInterval(backoff.NoInterval)
208-
err := backoff.ThrottleRetry(func() error {
209-
return s.startWorkflow(s.context.sdkClientFactory.GetSystemClient(), options, workflowType, workflowArgs...)
208+
err := backoff.ThrottleRetryContext(ctx, func(ctx context.Context) error {
209+
return s.startWorkflow(
210+
ctx,
211+
s.context.sdkClientFactory.GetSystemClient(),
212+
options,
213+
workflowType,
214+
workflowArgs...,
215+
)
210216
}, policy, func(err error) bool {
211217
return true
212218
})
213-
if err != nil {
219+
// if the scanner shuts down before the workflow is started, then the error will be context canceled
220+
if err != nil && !common.IsContextCanceledErr(err) {
214221
s.context.logger.Fatal("unable to start scanner", tag.WorkflowType(workflowType), tag.Error(err))
215222
}
216223
}
217224

218225
func (s *Scanner) startWorkflow(
226+
ctx context.Context,
219227
client sdkclient.Client,
220228
options sdkclient.StartWorkflowOptions,
221229
workflowType string,
222230
workflowArgs ...interface{},
223231
) error {
224-
225-
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
232+
ctx, cancel := context.WithTimeout(ctx, 5*time.Minute)
226233
_, err := client.ExecuteWorkflow(ctx, options, workflowType, workflowArgs...)
227234
cancel()
228235
if err != nil {

service/worker/scanner/scanner_test.go

+90-22
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,18 @@
2525
package scanner
2626

2727
import (
28+
"context"
29+
"sync"
2830
"testing"
2931

3032
"github.com/golang/mock/gomock"
3133
"github.com/stretchr/testify/suite"
34+
"go.temporal.io/sdk/client"
3235

3336
"go.temporal.io/server/api/adminservicemock/v1"
3437
"go.temporal.io/server/api/historyservicemock/v1"
3538
"go.temporal.io/server/common/config"
39+
"go.temporal.io/server/common/dynamicconfig"
3640
"go.temporal.io/server/common/log"
3741
"go.temporal.io/server/common/metrics"
3842
"go.temporal.io/server/common/namespace"
@@ -161,27 +165,13 @@ func (s *scannerTestSuite) TestScannerEnabled() {
161165
scanner := New(
162166
log.NewNoopLogger(),
163167
&Config{
164-
MaxConcurrentActivityExecutionSize: func() int {
165-
return 1
166-
},
167-
MaxConcurrentWorkflowTaskExecutionSize: func() int {
168-
return 1
169-
},
170-
MaxConcurrentActivityTaskPollers: func() int {
171-
return 1
172-
},
173-
MaxConcurrentWorkflowTaskPollers: func() int {
174-
return 1
175-
},
176-
ExecutionsScannerEnabled: func() bool {
177-
return c.ExecutionsScannerEnabled
178-
},
179-
HistoryScannerEnabled: func() bool {
180-
return c.HistoryScannerEnabled
181-
},
182-
TaskQueueScannerEnabled: func() bool {
183-
return c.TaskQueueScannerEnabled
184-
},
168+
MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1),
169+
MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1),
170+
MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1),
171+
MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1),
172+
HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.HistoryScannerEnabled),
173+
ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.ExecutionsScannerEnabled),
174+
TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(c.TaskQueueScannerEnabled),
185175
Persistence: &config.Persistence{
186176
DefaultStore: c.DefaultStore,
187177
DataStores: map[string]config.DataStore{
@@ -201,18 +191,96 @@ func (s *scannerTestSuite) TestScannerEnabled() {
201191
mockNamespaceRegistry,
202192
mockWorkerFactory,
203193
)
194+
var wg sync.WaitGroup
204195
for _, sc := range c.ExpectedScanners {
196+
wg.Add(1)
205197
worker := mocksdk.NewMockWorker(ctrl)
206198
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
207199
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
208200
worker.EXPECT().Start()
209201
mockWorkerFactory.EXPECT().New(gomock.Any(), sc.TaskQueueName, gomock.Any()).Return(worker)
210202
mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
211-
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName, gomock.Any())
203+
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), sc.WFTypeName,
204+
gomock.Any()).Do(func(
205+
_ context.Context,
206+
_ client.StartWorkflowOptions,
207+
_ string,
208+
_ ...interface{},
209+
) {
210+
wg.Done()
211+
})
212212
}
213213
err := scanner.Start()
214214
s.NoError(err)
215+
wg.Wait()
215216
scanner.Stop()
216217
})
217218
}
218219
}
220+
221+
// TestScannerShutdown tests that the scanner can be shut down even when it hasn't finished starting.
222+
// This fixes a rare issue that can occur when Stop() is called quickly after Start(). When Start() is called, the
223+
// scanner starts a new goroutine for each scanner type. In that goroutine, an sdk client is created which dials the
224+
// frontend service. If the test driver calls Stop() on the server, then the server stops the frontend service and the
225+
// history service. In some cases, the frontend service stops before the sdk client has finished connecting to it.
226+
// This causes the startWorkflow() call to fail with an error. However, startWorkflowWithRetry retries the call for
227+
// a whole minute, which causes the test to take a long time to fail. So, instead we immediately cancel all async
228+
// requests when Stop() is called.
229+
func (s *scannerTestSuite) TestScannerShutdown() {
230+
ctrl := gomock.NewController(s.T())
231+
232+
logger := log.NewTestLogger()
233+
mockSdkClientFactory := sdk.NewMockClientFactory(ctrl)
234+
mockSdkClient := mocksdk.NewMockClient(ctrl)
235+
mockNamespaceRegistry := namespace.NewMockRegistry(ctrl)
236+
mockAdminClient := adminservicemock.NewMockAdminServiceClient(ctrl)
237+
mockWorkerFactory := sdk.NewMockWorkerFactory(ctrl)
238+
worker := mocksdk.NewMockWorker(ctrl)
239+
scanner := New(
240+
logger,
241+
&Config{
242+
MaxConcurrentActivityExecutionSize: dynamicconfig.GetIntPropertyFn(1),
243+
MaxConcurrentWorkflowTaskExecutionSize: dynamicconfig.GetIntPropertyFn(1),
244+
MaxConcurrentActivityTaskPollers: dynamicconfig.GetIntPropertyFn(1),
245+
MaxConcurrentWorkflowTaskPollers: dynamicconfig.GetIntPropertyFn(1),
246+
HistoryScannerEnabled: dynamicconfig.GetBoolPropertyFn(true),
247+
ExecutionsScannerEnabled: dynamicconfig.GetBoolPropertyFn(false),
248+
TaskQueueScannerEnabled: dynamicconfig.GetBoolPropertyFn(false),
249+
Persistence: &config.Persistence{
250+
DefaultStore: config.StoreTypeNoSQL,
251+
DataStores: map[string]config.DataStore{
252+
config.StoreTypeNoSQL: {},
253+
},
254+
},
255+
},
256+
mockSdkClientFactory,
257+
metrics.NoopMetricsHandler,
258+
p.NewMockExecutionManager(ctrl),
259+
p.NewMockTaskManager(ctrl),
260+
historyservicemock.NewMockHistoryServiceClient(ctrl),
261+
mockAdminClient,
262+
mockNamespaceRegistry,
263+
mockWorkerFactory,
264+
)
265+
mockSdkClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
266+
worker.EXPECT().RegisterActivityWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
267+
worker.EXPECT().RegisterWorkflowWithOptions(gomock.Any(), gomock.Any()).AnyTimes()
268+
worker.EXPECT().Start()
269+
mockWorkerFactory.EXPECT().New(gomock.Any(), gomock.Any(), gomock.Any()).Return(worker)
270+
var wg sync.WaitGroup
271+
wg.Add(1)
272+
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(func(
273+
ctx context.Context,
274+
_ client.StartWorkflowOptions,
275+
_ string,
276+
_ ...interface{},
277+
) (client.WorkflowRun, error) {
278+
wg.Done()
279+
<-ctx.Done()
280+
return nil, ctx.Err()
281+
})
282+
err := scanner.Start()
283+
s.NoError(err)
284+
wg.Wait()
285+
scanner.Stop()
286+
}

0 commit comments

Comments
 (0)