Skip to content

Commit b9bba94

Browse files
authored
Fix GetWorkflowExecution in PostgreSQL (#3816)
1 parent 3abd50d commit b9bba94

File tree

4 files changed

+90
-24
lines changed

4 files changed

+90
-24
lines changed

common/persistence/sql/sqlplugin/mysql/visibility.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,7 @@ func (mdb *db) SelectFromVisibility(
273273
return nil, err
274274
}
275275
for i := range rows {
276-
rows[i].StartTime = mdb.converter.FromMySQLDateTime(rows[i].StartTime)
277-
rows[i].ExecutionTime = mdb.converter.FromMySQLDateTime(rows[i].ExecutionTime)
278-
if rows[i].CloseTime != nil {
279-
closeTime := mdb.converter.FromMySQLDateTime(*rows[i].CloseTime)
280-
rows[i].CloseTime = &closeTime
281-
}
276+
mdb.processRowFromDB(&rows[i])
282277
}
283278
return rows, nil
284279
}
@@ -298,5 +293,15 @@ func (mdb *db) GetFromVisibility(
298293
if err != nil {
299294
return nil, err
300295
}
296+
mdb.processRowFromDB(&row)
301297
return &row, nil
302298
}
299+
300+
func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
301+
row.StartTime = mdb.converter.FromMySQLDateTime(row.StartTime)
302+
row.ExecutionTime = mdb.converter.FromMySQLDateTime(row.ExecutionTime)
303+
if row.CloseTime != nil {
304+
closeTime := mdb.converter.FromMySQLDateTime(*row.CloseTime)
305+
row.CloseTime = &closeTime
306+
}
307+
}

common/persistence/sql/sqlplugin/postgresql/visibility.go

+16-11
Original file line numberDiff line numberDiff line change
@@ -290,26 +290,18 @@ func (pdb *db) SelectFromVisibility(
290290
return nil, err
291291
}
292292
for i := range rows {
293-
rows[i].StartTime = pdb.converter.FromPostgreSQLDateTime(rows[i].StartTime)
294-
rows[i].ExecutionTime = pdb.converter.FromPostgreSQLDateTime(rows[i].ExecutionTime)
295-
if rows[i].CloseTime != nil {
296-
closeTime := pdb.converter.FromPostgreSQLDateTime(*rows[i].CloseTime)
297-
rows[i].CloseTime = &closeTime
298-
}
299-
// need to trim the run ID, or otherwise the returned value will
300-
// come with lots of trailing spaces, probably due to the CHAR(64) type
301-
rows[i].RunID = strings.TrimSpace(rows[i].RunID)
293+
pdb.processRowFromDB(&rows[i])
302294
}
303295
return rows, nil
304296
}
305297

306298
// GetFromVisibility reads one row from visibility table
307-
func (mdb *db) GetFromVisibility(
299+
func (pdb *db) GetFromVisibility(
308300
ctx context.Context,
309301
filter sqlplugin.VisibilityGetFilter,
310302
) (*sqlplugin.VisibilityRow, error) {
311303
var row sqlplugin.VisibilityRow
312-
err := mdb.conn.GetContext(ctx,
304+
err := pdb.conn.GetContext(ctx,
313305
&row,
314306
templateGetWorkflowExecution,
315307
filter.NamespaceID,
@@ -318,5 +310,18 @@ func (mdb *db) GetFromVisibility(
318310
if err != nil {
319311
return nil, err
320312
}
313+
pdb.processRowFromDB(&row)
321314
return &row, nil
322315
}
316+
317+
func (pdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
318+
row.StartTime = pdb.converter.FromPostgreSQLDateTime(row.StartTime)
319+
row.ExecutionTime = pdb.converter.FromPostgreSQLDateTime(row.ExecutionTime)
320+
if row.CloseTime != nil {
321+
closeTime := pdb.converter.FromPostgreSQLDateTime(*row.CloseTime)
322+
row.CloseTime = &closeTime
323+
}
324+
// need to trim the run ID, or otherwise the returned value will
325+
// come with lots of trailing spaces, probably due to the CHAR(64) type
326+
row.RunID = strings.TrimSpace(row.RunID)
327+
}

common/persistence/sql/sqlplugin/sqlite/visibility.go

+11-6
Original file line numberDiff line numberDiff line change
@@ -270,12 +270,7 @@ func (mdb *db) SelectFromVisibility(
270270
return nil, err
271271
}
272272
for i := range rows {
273-
rows[i].StartTime = mdb.converter.FromSQLiteDateTime(rows[i].StartTime)
274-
rows[i].ExecutionTime = mdb.converter.FromSQLiteDateTime(rows[i].ExecutionTime)
275-
if rows[i].CloseTime != nil {
276-
closeTime := mdb.converter.FromSQLiteDateTime(*rows[i].CloseTime)
277-
rows[i].CloseTime = &closeTime
278-
}
273+
mdb.processRowFromDB(&rows[i])
279274
}
280275
return rows, nil
281276
}
@@ -295,5 +290,15 @@ func (mdb *db) GetFromVisibility(
295290
if err != nil {
296291
return nil, err
297292
}
293+
mdb.processRowFromDB(&row)
298294
return &row, nil
299295
}
296+
297+
func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
298+
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
299+
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
300+
if row.CloseTime != nil {
301+
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
302+
row.CloseTime = &closeTime
303+
}
304+
}

common/persistence/tests/visibility_persistence_suite_test.go

+52-1
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ func (s *VisibilityPersistenceSuite) TestDeleteWorkflow() {
613613

614614
// TestUpsertWorkflowExecution test
615615
func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
616+
temporalChangeVersionPayload, _ := payload.Encode([]string{"dummy"})
616617
tests := []struct {
617618
request *manager.UpsertWorkflowExecutionRequest
618619
expected error
@@ -630,7 +631,7 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
630631
Memo: nil,
631632
SearchAttributes: &commonpb.SearchAttributes{
632633
IndexedFields: map[string]*commonpb.Payload{
633-
searchattribute.TemporalChangeVersion: payload.EncodeBytes([]byte("dummy")),
634+
searchattribute.TemporalChangeVersion: temporalChangeVersionPayload,
634635
},
635636
},
636637
Status: enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
@@ -665,6 +666,56 @@ func (s *VisibilityPersistenceSuite) TestUpsertWorkflowExecution() {
665666
}
666667
}
667668

669+
// TestGetWorkflowExecution test
670+
func (s *VisibilityPersistenceSuite) TestGetWorkflowExecution() {
671+
testNamespaceUUID := namespace.ID(uuid.New())
672+
closeTime := time.Now().UTC()
673+
startTime := closeTime.Add(-5 * time.Second)
674+
675+
var startRequests []*manager.RecordWorkflowExecutionStartedRequest
676+
for i := 0; i < 5; i++ {
677+
startRequests = append(
678+
startRequests,
679+
s.createOpenWorkflowRecord(
680+
testNamespaceUUID,
681+
"visibility-workflow-test",
682+
"visibility-workflow",
683+
startTime,
684+
"test-queue",
685+
),
686+
)
687+
}
688+
for _, req := range startRequests {
689+
resp, err := s.VisibilityMgr.GetWorkflowExecution(
690+
s.ctx,
691+
&manager.GetWorkflowExecutionRequest{
692+
NamespaceID: testNamespaceUUID,
693+
RunID: req.Execution.RunId,
694+
StartTime: &startTime,
695+
},
696+
)
697+
s.NoError(err)
698+
s.assertOpenExecutionEquals(req, resp.Execution)
699+
}
700+
701+
var closeRequests []*manager.RecordWorkflowExecutionClosedRequest
702+
for _, startReq := range startRequests {
703+
closeRequests = append(closeRequests, s.createClosedWorkflowRecord(startReq, closeTime))
704+
}
705+
for _, req := range closeRequests {
706+
resp, err := s.VisibilityMgr.GetWorkflowExecution(
707+
s.ctx,
708+
&manager.GetWorkflowExecutionRequest{
709+
NamespaceID: testNamespaceUUID,
710+
RunID: req.Execution.RunId,
711+
CloseTime: &closeTime,
712+
},
713+
)
714+
s.NoError(err)
715+
s.assertClosedExecutionEquals(req, resp.Execution)
716+
}
717+
}
718+
668719
// TestAdvancedVisibilityPagination test
669720
func (s *VisibilityPersistenceSuite) TestAdvancedVisibilityPagination() {
670721
testNamespaceUUID := namespace.ID(uuid.New())

0 commit comments

Comments
 (0)