Skip to content

Commit f4e4ea2

Browse files
authored
Remove isRecordValid check from ES visibility store (#3948)
1 parent 0b698c1 commit f4e4ea2

File tree

2 files changed

+17
-52
lines changed

2 files changed

+17
-52
lines changed

common/persistence/visibility/store/elasticsearch/visibility_store.go

+12-47
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutions(
300300
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutions failed", err)
301301
}
302302

303-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
304-
return !rec.StartTime.Before(request.EarliestStartTime) && !rec.StartTime.After(request.LatestStartTime)
305-
}
306-
307-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
303+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
308304
}
309305

310306
func (s *visibilityStore) ListClosedWorkflowExecutions(
@@ -325,11 +321,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutions(
325321
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutions failed", err)
326322
}
327323

328-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
329-
return !rec.CloseTime.Before(request.EarliestStartTime) && !rec.CloseTime.After(request.LatestStartTime)
330-
}
331-
332-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
324+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
333325
}
334326

335327
func (s *visibilityStore) ListOpenWorkflowExecutionsByType(
@@ -352,11 +344,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByType(
352344
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutionsByType failed", err)
353345
}
354346

355-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
356-
return !rec.StartTime.Before(request.EarliestStartTime) && !rec.StartTime.After(request.LatestStartTime)
357-
}
358-
359-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
347+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
360348
}
361349

362350
func (s *visibilityStore) ListClosedWorkflowExecutionsByType(
@@ -378,11 +366,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByType(
378366
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByType failed", err)
379367
}
380368

381-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
382-
return !rec.CloseTime.Before(request.EarliestStartTime) && !rec.CloseTime.After(request.LatestStartTime)
383-
}
384-
385-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
369+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
386370
}
387371

388372
func (s *visibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
@@ -405,11 +389,7 @@ func (s *visibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
405389
return nil, convertElasticsearchClientError("ListOpenWorkflowExecutionsByWorkflowID failed", err)
406390
}
407391

408-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
409-
return !rec.StartTime.Before(request.EarliestStartTime) && !rec.StartTime.After(request.LatestStartTime)
410-
}
411-
412-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
392+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
413393
}
414394

415395
func (s *visibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
@@ -431,11 +411,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
431411
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByWorkflowID failed", err)
432412
}
433413

434-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
435-
return !rec.CloseTime.Before(request.EarliestStartTime) && !rec.CloseTime.After(request.LatestStartTime)
436-
}
437-
438-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
414+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
439415
}
440416

441417
func (s *visibilityStore) ListClosedWorkflowExecutionsByStatus(
@@ -456,11 +432,7 @@ func (s *visibilityStore) ListClosedWorkflowExecutionsByStatus(
456432
return nil, convertElasticsearchClientError("ListClosedWorkflowExecutionsByStatus failed", err)
457433
}
458434

459-
isRecordValid := func(rec *store.InternalWorkflowExecutionInfo) bool {
460-
return !rec.CloseTime.Before(request.EarliestStartTime) && !rec.CloseTime.After(request.LatestStartTime)
461-
}
462-
463-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, isRecordValid)
435+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
464436
}
465437

466438
func (s *visibilityStore) ListWorkflowExecutions(
@@ -486,7 +458,7 @@ func (s *visibilityStore) ListWorkflowExecutions(
486458
return nil, convertElasticsearchClientError("ListWorkflowExecutions failed", err)
487459
}
488460

489-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, nil)
461+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
490462
}
491463

492464
func (s *visibilityStore) ScanWorkflowExecutions(
@@ -543,7 +515,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithPit(
543515
}
544516
}
545517

546-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, nil)
518+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
547519
}
548520

549521
func (s *visibilityStore) scanWorkflowExecutionsWithScroll(ctx context.Context, request *manager.ListWorkflowExecutionsRequestV2) (*store.InternalListWorkflowExecutionsResponse, error) {
@@ -584,7 +556,7 @@ func (s *visibilityStore) scanWorkflowExecutionsWithScroll(ctx context.Context,
584556
}
585557
}
586558

587-
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize, nil)
559+
return s.getListWorkflowExecutionsResponse(searchResult, request.Namespace, request.PageSize)
588560
}
589561

590562
func (s *visibilityStore) CountWorkflowExecutions(
@@ -777,7 +749,6 @@ func (s *visibilityStore) getListWorkflowExecutionsResponse(
777749
searchResult *elastic.SearchResult,
778750
namespace namespace.Name,
779751
pageSize int,
780-
isRecordValid func(rec *store.InternalWorkflowExecutionInfo) bool,
781752
) (*store.InternalListWorkflowExecutionsResponse, error) {
782753

783754
if searchResult.Hits == nil || len(searchResult.Hits.Hits) == 0 {
@@ -798,14 +769,8 @@ func (s *visibilityStore) getListWorkflowExecutionsResponse(
798769
if err != nil {
799770
return nil, err
800771
}
801-
// ES6 uses "date" data type not "date_nanos". It truncates dates using milliseconds and might return extra rows.
802-
// For example: 2021-06-12T00:21:43.159739259Z fits 2021-06-12T00:21:43.158Z...2021-06-12T00:21:43.159Z range lte/gte query.
803-
// Therefore, these records need to be filtered out on the client side to support nanos precision.
804-
// After ES6 deprecation isRecordValid can be removed.
805-
if isRecordValid == nil || isRecordValid(workflowExecutionInfo) {
806-
response.Executions = append(response.Executions, workflowExecutionInfo)
807-
lastHitSort = hit.Sort
808-
}
772+
response.Executions = append(response.Executions, workflowExecutionInfo)
773+
lastHitSort = hit.Sort
809774
}
810775

811776
if len(searchResult.Hits.Hits) == pageSize && lastHitSort != nil { // this means the response is not the last page

common/persistence/visibility/store/elasticsearch/visibility_store_read_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -745,7 +745,7 @@ func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
745745
Hits: &elastic.SearchHits{
746746
TotalHits: &elastic.TotalHits{},
747747
}}
748-
resp, err := s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 1, nil)
748+
resp, err := s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 1)
749749
s.NoError(err)
750750
s.Equal(0, len(resp.NextPageToken))
751751
s.Equal(0, len(resp.Executions))
@@ -768,14 +768,14 @@ func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
768768
}
769769
searchResult.Hits.Hits = []*elastic.SearchHit{searchHit}
770770
searchResult.Hits.TotalHits.Value = 1
771-
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 1, nil)
771+
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 1)
772772
s.NoError(err)
773773
serializedToken, _ := s.visibilityStore.serializePageToken(&visibilityPageToken{SearchAfter: []interface{}{1547596872371234567, "e481009e-14b3-45ae-91af-dce6e2a88365"}})
774774
s.Equal(serializedToken, resp.NextPageToken)
775775
s.Equal(1, len(resp.Executions))
776776

777777
// test for last page hits
778-
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 2, nil)
778+
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, 2)
779779
s.NoError(err)
780780
s.Equal(0, len(resp.NextPageToken))
781781
s.Equal(1, len(resp.Executions))
@@ -786,7 +786,7 @@ func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
786786
searchResult.Hits.Hits = append(searchResult.Hits.Hits, searchHit)
787787
}
788788
numOfHits := len(searchResult.Hits.Hits)
789-
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, numOfHits, nil)
789+
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, numOfHits)
790790
s.NoError(err)
791791
s.Equal(numOfHits, len(resp.Executions))
792792
nextPageToken, err := s.visibilityStore.deserializePageToken(resp.NextPageToken)
@@ -796,7 +796,7 @@ func (s *ESVisibilitySuite) TestGetListWorkflowExecutionsResponse() {
796796
s.Equal(int64(1547596872371234567), resultSortValue)
797797
s.Equal("e481009e-14b3-45ae-91af-dce6e2a88365", nextPageToken.SearchAfter[1])
798798
// for last page
799-
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, numOfHits+1, nil)
799+
resp, err = s.visibilityStore.getListWorkflowExecutionsResponse(searchResult, testNamespace, numOfHits+1)
800800
s.NoError(err)
801801
s.Equal(0, len(resp.NextPageToken))
802802
s.Equal(numOfHits, len(resp.Executions))

0 commit comments

Comments
 (0)