Skip to content

Commit 1454f84

Browse files
Fix errcheck in service/worker/ (#3731)
1 parent d896157 commit 1454f84

File tree

3 files changed

+73
-29
lines changed

3 files changed

+73
-29
lines changed

service/worker/pernamespaceworker.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,10 @@ func (wm *perNamespaceWorkerManager) Start(
151151
// this will call namespaceCallback with current namespaces
152152
wm.namespaceRegistry.RegisterStateChangeCallback(wm, wm.namespaceCallback)
153153

154-
wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
154+
err := wm.serviceResolver.AddListener(perNamespaceWorkerManagerListenerKey, wm.membershipChangedCh)
155+
if err != nil {
156+
wm.logger.Fatal("Unable to register membership listener", tag.Error(err))
157+
}
155158
go wm.membershipChangedListener()
156159

157160
wm.logger.Info("", tag.LifeCycleStarted)
@@ -169,7 +172,10 @@ func (wm *perNamespaceWorkerManager) Stop() {
169172
wm.logger.Info("", tag.LifeCycleStopping)
170173

171174
wm.namespaceRegistry.UnregisterStateChangeCallback(wm)
172-
wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
175+
err := wm.serviceResolver.RemoveListener(perNamespaceWorkerManagerListenerKey)
176+
if err != nil {
177+
wm.logger.Error("Unable to unregister membership listener", tag.Error(err))
178+
}
173179
close(wm.membershipChangedCh)
174180

175181
wm.lock.Lock()

service/worker/scheduler/workflow.go

+62-26
Original file line numberDiff line numberDiff line change
@@ -188,21 +188,34 @@ func (s *scheduler) run() error {
188188
s.logger.Warn("Time went backwards", "from", t1, "to", t2)
189189
t2 = t1
190190
}
191-
nextSleep := s.processTimeRange(
191+
nextSleep, err := s.processTimeRange(
192192
t1, t2,
193193
// resolve this to the schedule's policy as late as possible
194194
enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED,
195195
false,
196196
)
197+
if err != nil {
198+
return err
199+
}
197200
s.State.LastProcessedTime = timestamp.TimePtr(t2)
198201
// handle signals after processing time range that just elapsed
199202
scheduleChanged := s.processSignals()
200203
if scheduleChanged {
201204
// need to calculate sleep again
202-
nextSleep = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
205+
nextSleep, err = s.processTimeRange(t2, t2, enumspb.SCHEDULE_OVERLAP_POLICY_UNSPECIFIED, false)
206+
if err != nil {
207+
return err
208+
}
203209
}
204210
// try starting workflows in the buffer
205-
for s.processBuffer() {
211+
for {
212+
b, err := s.processBuffer()
213+
if err != nil {
214+
return err
215+
}
216+
if !b {
217+
break
218+
}
206219
}
207220
s.updateMemoAndSearchAttributes()
208221
// sleep returns on any of:
@@ -314,11 +327,11 @@ func (s *scheduler) processTimeRange(
314327
t1, t2 time.Time,
315328
overlapPolicy enumspb.ScheduleOverlapPolicy,
316329
manual bool,
317-
) time.Duration {
330+
) (time.Duration, error) {
318331
s.logger.Debug("processTimeRange", "t1", t1, "t2", t2, "overlapPolicy", overlapPolicy, "manual", manual)
319332

320333
if s.cspec == nil {
321-
return invalidDuration
334+
return invalidDuration, nil
322335
}
323336

324337
catchupWindow := s.getCatchupWindow()
@@ -327,14 +340,16 @@ func (s *scheduler) processTimeRange(
327340
// Run this logic in a SideEffect so that we can fix bugs there without breaking
328341
// existing schedule workflows.
329342
var next getNextTimeResult
330-
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
343+
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
331344
return s.cspec.getNextTime(t1)
332-
}).Get(&next)
345+
}).Get(&next); err != nil {
346+
return 0, err
347+
}
333348
t1 = next.Next
334349
if t1.IsZero() {
335-
return invalidDuration
350+
return invalidDuration, nil
336351
} else if t1.After(t2) {
337-
return t1.Sub(t2)
352+
return t1.Sub(t2), nil
338353
}
339354
if !manual && t2.Sub(t1) > catchupWindow {
340355
s.logger.Warn("Schedule missed catchup window", "now", t2, "time", t1)
@@ -675,7 +690,7 @@ func (s *scheduler) addStart(nominalTime, actualTime time.Time, overlapPolicy en
675690
}
676691

677692
// processBuffer should return true if there might be more work to do right now.
678-
func (s *scheduler) processBuffer() bool {
693+
func (s *scheduler) processBuffer() (bool, error) {
679694
s.logger.Debug("processBuffer", "buffer", len(s.State.BufferedStarts), "running", len(s.Info.RunningWorkflows), "needRefresh", s.State.NeedRefresh)
680695

681696
// TODO: consider doing this always and removing needRefresh? we only end up here without
@@ -692,7 +707,7 @@ func (s *scheduler) processBuffer() bool {
692707
req := s.Schedule.Action.GetStartWorkflow()
693708
if req == nil || len(s.State.BufferedStarts) == 0 {
694709
s.State.BufferedStarts = nil
695-
return false
710+
return false, nil
696711
}
697712

698713
isRunning := len(s.Info.RunningWorkflows) > 0
@@ -729,11 +744,15 @@ func (s *scheduler) processBuffer() bool {
729744
// Terminate or cancel if required (terminate overrides cancel if both are present)
730745
if action.needTerminate {
731746
for _, ex := range s.Info.RunningWorkflows {
732-
s.terminateWorkflow(ex)
747+
if err := s.terminateWorkflow(ex); err != nil {
748+
return false, err
749+
}
733750
}
734751
} else if action.needCancel {
735752
for _, ex := range s.Info.RunningWorkflows {
736-
s.cancelWorkflow(ex)
753+
if err := s.cancelWorkflow(ex); err != nil {
754+
return false, err
755+
}
737756
}
738757
}
739758

@@ -749,7 +768,7 @@ func (s *scheduler) processBuffer() bool {
749768
}
750769
}
751770

752-
return tryAgain
771+
return tryAgain, nil
753772
}
754773

755774
func (s *scheduler) recordAction(result *schedpb.ScheduleActionResult) {
@@ -788,6 +807,10 @@ func (s *scheduler) startWorkflow(
788807
}
789808
ctx := workflow.WithLocalActivityOptions(s.ctx, options)
790809

810+
requestID, err := s.newUUIDString()
811+
if err != nil {
812+
return nil, err
813+
}
791814
req := &schedspb.StartWorkflowRequest{
792815
Request: &workflowservice.StartWorkflowExecutionRequest{
793816
WorkflowId: workflowID,
@@ -798,7 +821,7 @@ func (s *scheduler) startWorkflow(
798821
WorkflowRunTimeout: newWorkflow.WorkflowRunTimeout,
799822
WorkflowTaskTimeout: newWorkflow.WorkflowTaskTimeout,
800823
Identity: s.identity(),
801-
RequestId: s.newUUIDString(),
824+
RequestId: requestID,
802825
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE,
803826
RetryPolicy: newWorkflow.RetryPolicy,
804827
Memo: newWorkflow.Memo,
@@ -809,7 +832,7 @@ func (s *scheduler) startWorkflow(
809832
ContinuedFailure: s.State.ContinuedFailure,
810833
}
811834
var res schedspb.StartWorkflowResponse
812-
err := workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
835+
err = workflow.ExecuteLocalActivity(ctx, s.a.StartWorkflow, req).Get(s.ctx, &res)
813836
if err != nil {
814837
return nil, err
815838
}
@@ -885,47 +908,60 @@ func (s *scheduler) startLongPollWatcher(ex *commonpb.WorkflowExecution) {
885908
s.watchingWorkflowId = ex.WorkflowId
886909
}
887910

888-
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) {
911+
func (s *scheduler) cancelWorkflow(ex *commonpb.WorkflowExecution) error {
889912
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
913+
requestID, err := s.newUUIDString()
914+
if err != nil {
915+
return err
916+
}
890917
areq := &schedspb.CancelWorkflowRequest{
891-
RequestId: s.newUUIDString(),
918+
RequestId: requestID,
892919
Identity: s.identity(),
893920
Execution: ex,
894921
Reason: "cancelled by schedule overlap policy",
895922
}
896-
err := workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
923+
err = workflow.ExecuteLocalActivity(ctx, s.a.CancelWorkflow, areq).Get(s.ctx, nil)
897924
if err != nil {
898925
s.logger.Error("cancel workflow failed", "workflow", ex.WorkflowId, "error", err)
926+
return err
899927
}
900928
// Note: the local activity has completed (or failed) here but the workflow might take time
901929
// to close since a cancel is only a request.
930+
return nil
902931
}
903932

904-
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) {
933+
func (s *scheduler) terminateWorkflow(ex *commonpb.WorkflowExecution) error {
905934
ctx := workflow.WithLocalActivityOptions(s.ctx, defaultLocalActivityOptions)
935+
requestID, err := s.newUUIDString()
936+
if err != nil {
937+
return err
938+
}
906939
areq := &schedspb.TerminateWorkflowRequest{
907-
RequestId: s.newUUIDString(),
940+
RequestId: requestID,
908941
Identity: s.identity(),
909942
Execution: ex,
910943
Reason: "terminated by schedule overlap policy",
911944
}
912-
err := workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
945+
err = workflow.ExecuteLocalActivity(ctx, s.a.TerminateWorkflow, areq).Get(s.ctx, nil)
913946
if err != nil {
914947
s.logger.Error("terminate workflow failed", "workflow", ex.WorkflowId, "error", err)
915948
}
949+
return err
916950
}
917951

918-
func (s *scheduler) newUUIDString() string {
952+
func (s *scheduler) newUUIDString() (string, error) {
919953
if len(s.uuidBatch) == 0 {
920-
workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
954+
if err := workflow.SideEffect(s.ctx, func(ctx workflow.Context) interface{} {
921955
out := make([]string, 10)
922956
for i := range out {
923957
out[i] = uuid.NewString()
924958
}
925959
return out
926-
}).Get(&s.uuidBatch)
960+
}).Get(&s.uuidBatch); err != nil {
961+
return "", err
962+
}
927963
}
928964
next := s.uuidBatch[0]
929965
s.uuidBatch = s.uuidBatch[1:]
930-
return next
966+
return next, nil
931967
}

service/worker/worker.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,9 @@ func (wm *workerManager) Start() {
9898
}
9999

100100
for _, w := range wm.workers {
101-
w.Start()
101+
if err := w.Start(); err != nil {
102+
wm.logger.Fatal("Unable to start worker", tag.Error(err))
103+
}
102104
}
103105

104106
wm.logger.Info("", tag.ComponentWorkerManager, tag.LifeCycleStarted)

0 commit comments

Comments
 (0)