Skip to content

Commit 909ca96

Browse files
authored
Add initial memo to scheduler workflows (#3839)
1 parent 7e89af4 commit 909ca96

File tree

3 files changed

+130
-2
lines changed

3 files changed

+130
-2
lines changed

service/frontend/workflow_handler.go

+24
Original file line numberDiff line numberDiff line change
@@ -2982,6 +2982,8 @@ func (wh *WorkflowHandler) CreateSchedule(ctx context.Context, request *workflow
29822982
if err != nil {
29832983
return nil, err
29842984
}
2985+
// Add initial memo for list schedules
2986+
wh.addInitialScheduleMemo(request, input)
29852987
// Add namespace division
29862988
searchattribute.AddSearchAttribute(&request.SearchAttributes, searchattribute.TemporalNamespaceDivision, payload.EncodeString(scheduler.NamespaceDivision))
29872989
// Create StartWorkflowExecutionRequest
@@ -4837,6 +4839,28 @@ func (wh *WorkflowHandler) cleanScheduleMemo(memo *commonpb.Memo) *commonpb.Memo
48374839
return memo
48384840
}
48394841

4842+
// This mutates request (but idempotent so safe for retries)
4843+
func (wh *WorkflowHandler) addInitialScheduleMemo(request *workflowservice.CreateScheduleRequest, args *schedspb.StartScheduleArgs) {
4844+
info := scheduler.GetListInfoFromStartArgs(args)
4845+
infoBytes, err := info.Marshal()
4846+
if err != nil {
4847+
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
4848+
return
4849+
}
4850+
p, err := sdk.PreferProtoDataConverter.ToPayload(infoBytes)
4851+
if err != nil {
4852+
wh.logger.Error("encoding initial schedule memo failed", tag.Error(err))
4853+
return
4854+
}
4855+
if request.Memo == nil {
4856+
request.Memo = &commonpb.Memo{}
4857+
}
4858+
if request.Memo.Fields == nil {
4859+
request.Memo.Fields = make(map[string]*commonpb.Payload)
4860+
}
4861+
request.Memo.Fields[scheduler.MemoFieldInfo] = p
4862+
}
4863+
48404864
func getBatchOperationState(workflowState enumspb.WorkflowExecutionStatus) enumspb.BatchOperationState {
48414865
var operationState enumspb.BatchOperationState
48424866
switch workflowState {

service/worker/scheduler/workflow.go

+19-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,9 @@ func (s *scheduler) ensureFields() {
254254
func (s *scheduler) compileSpec() {
255255
cspec, err := NewCompiledSpec(s.Schedule.Spec)
256256
if err != nil {
257-
s.logger.Error("Invalid schedule", "error", err)
257+
if s.logger != nil {
258+
s.logger.Error("Invalid schedule", "error", err)
259+
}
258260
s.Info.InvalidScheduleError = err.Error()
259261
s.cspec = nil
260262
} else {
@@ -518,6 +520,8 @@ func (s *scheduler) processSignals() bool {
518520
}
519521

520522
func (s *scheduler) getFutureActionTimes(n int) []*time.Time {
523+
// Note that `s` may be a fake scheduler used to compute list info at creation time.
524+
521525
if s.cspec == nil {
522526
return nil
523527
}
@@ -573,6 +577,9 @@ func (s *scheduler) incSeqNo() {
573577
}
574578

575579
func (s *scheduler) getListInfo() *schedpb.ScheduleListInfo {
580+
// Note that `s` may be a fake scheduler used to compute list info at creation time, before
581+
// the first workflow task. This function and anything it calls should not use s.ctx.
582+
576583
// make shallow copy
577584
spec := *s.Schedule.Spec
578585
// clear fields that are too large/not useful for the list view
@@ -956,3 +963,14 @@ func panicIfErr(err error) {
956963
panic(err)
957964
}
958965
}
966+
967+
func GetListInfoFromStartArgs(args *schedspb.StartScheduleArgs) *schedpb.ScheduleListInfo {
968+
// note that this does not take into account InitialPatch
969+
fakeScheduler := &scheduler{
970+
StartScheduleArgs: *args,
971+
tweakables: currentTweakablePolicies,
972+
}
973+
fakeScheduler.ensureFields()
974+
fakeScheduler.compileSpec()
975+
return fakeScheduler.getListInfo()
976+
}

tests/schedule_test.go

+87-1
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ func (s *scheduleIntegrationSuite) TestBasics() {
260260
Namespace: s.namespace,
261261
MaximumPageSize: 5,
262262
})
263-
if err != nil || len(listResp.Schedules) != 1 || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
263+
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid || len(listResp.Schedules[0].GetInfo().GetRecentActions()) < 2 {
264264
return false
265265
}
266266
s.NoError(err)
@@ -467,6 +467,14 @@ func (s *scheduleIntegrationSuite) TestInput() {
467467
_, err = s.engine.CreateSchedule(NewContext(), req)
468468
s.NoError(err)
469469
s.Eventually(func() bool { return atomic.LoadInt32(&runs) == 1 }, 5*time.Second, 200*time.Millisecond)
470+
471+
// cleanup
472+
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
473+
Namespace: s.namespace,
474+
ScheduleId: sid,
475+
Identity: "test",
476+
})
477+
s.NoError(err)
470478
}
471479

472480
func (s *scheduleIntegrationSuite) TestRefresh() {
@@ -547,6 +555,84 @@ func (s *scheduleIntegrationSuite) TestRefresh() {
547555
// scheduler has done some stuff
548556
events3 := s.getHistory(s.namespace, &commonpb.WorkflowExecution{WorkflowId: scheduler.WorkflowIDPrefix + sid})
549557
s.Greater(len(events3), len(events2))
558+
559+
// cleanup
560+
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
561+
Namespace: s.namespace,
562+
ScheduleId: sid,
563+
Identity: "test",
564+
})
565+
s.NoError(err)
566+
}
567+
568+
func (s *scheduleIntegrationSuite) TestListBeforeRun() {
569+
sid := "sched-test-list-before-run"
570+
wid := "sched-test-list-before-run-wf"
571+
wt := "sched-test-list-before-run-wt"
572+
573+
// disable per-ns worker so that the schedule workflow never runs
574+
dc := s.testCluster.host.dcClient
575+
dc.OverrideValue(dynamicconfig.WorkerPerNamespaceWorkerCount, 0)
576+
s.testCluster.host.workerService.RefreshPerNSWorkerManager()
577+
time.Sleep(2 * time.Second)
578+
579+
schedule := &schedulepb.Schedule{
580+
Spec: &schedulepb.ScheduleSpec{
581+
Interval: []*schedulepb.IntervalSpec{
582+
{Interval: timestamp.DurationPtr(3 * time.Second)},
583+
},
584+
},
585+
Action: &schedulepb.ScheduleAction{
586+
Action: &schedulepb.ScheduleAction_StartWorkflow{
587+
StartWorkflow: &workflowpb.NewWorkflowExecutionInfo{
588+
WorkflowId: wid,
589+
WorkflowType: &commonpb.WorkflowType{Name: wt},
590+
TaskQueue: &taskqueuepb.TaskQueue{Name: s.taskQueue},
591+
},
592+
},
593+
},
594+
}
595+
req := &workflowservice.CreateScheduleRequest{
596+
Namespace: s.namespace,
597+
ScheduleId: sid,
598+
Schedule: schedule,
599+
Identity: "test",
600+
RequestId: uuid.New(),
601+
}
602+
603+
_, err := s.engine.CreateSchedule(NewContext(), req)
604+
s.NoError(err)
605+
606+
s.Eventually(func() bool { // wait for visibility
607+
listResp, err := s.engine.ListSchedules(NewContext(), &workflowservice.ListSchedulesRequest{
608+
Namespace: s.namespace,
609+
MaximumPageSize: 5,
610+
})
611+
if err != nil || len(listResp.Schedules) != 1 || listResp.Schedules[0].ScheduleId != sid {
612+
return false
613+
}
614+
s.NoError(err)
615+
entry := listResp.Schedules[0]
616+
s.Equal(sid, entry.ScheduleId)
617+
s.NotNil(entry.Info)
618+
s.Equal(schedule.Spec, entry.Info.Spec)
619+
s.Equal(wt, entry.Info.WorkflowType.Name)
620+
s.False(entry.Info.Paused)
621+
s.Greater(len(entry.Info.FutureActionTimes), 1)
622+
return true
623+
}, 10*time.Second, 1*time.Second)
624+
625+
// cleanup
626+
_, err = s.engine.DeleteSchedule(NewContext(), &workflowservice.DeleteScheduleRequest{
627+
Namespace: s.namespace,
628+
ScheduleId: sid,
629+
Identity: "test",
630+
})
631+
s.NoError(err)
632+
633+
dc.RemoveOverride(dynamicconfig.WorkerPerNamespaceWorkerCount)
634+
s.testCluster.host.workerService.RefreshPerNSWorkerManager()
635+
time.Sleep(2 * time.Second)
550636
}
551637

552638
func (s *scheduleIntegrationSuite) TestRateLimit() {

0 commit comments

Comments
 (0)