Skip to content

Commit 7faf6f8

Browse files
authored
Handle namespace division filter in SQL queries (#3931)
1 parent de6ca9c commit 7faf6f8

File tree

2 files changed

+145
-55
lines changed

2 files changed

+145
-55
lines changed

common/persistence/visibility/store/sql/query_converter.go

+39-2
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,8 @@ type (
6464
request *manager.ListWorkflowExecutionsRequestV2
6565
saTypeMap searchattribute.NameTypeMap
6666
saMapper searchattribute.Mapper
67+
68+
seenNamespaceDivision bool
6769
}
6870
)
6971

@@ -121,6 +123,8 @@ func newQueryConverterInternal(
121123
request: request,
122124
saTypeMap: saTypeMap,
123125
saMapper: saMapper,
126+
127+
seenNamespaceDivision: false,
124128
}
125129
}
126130

@@ -180,8 +184,38 @@ func (c *QueryConverter) convertSelectStmt(sel *sqlparser.Select) error {
180184
return query.NewConverterError("%s: 'limit' clause", query.NotSupportedErrMessage)
181185
}
182186

183-
if sel.Where != nil {
184-
return c.convertWhereExpr(&sel.Where.Expr)
187+
if sel.Where == nil {
188+
sel.Where = &sqlparser.Where{
189+
Type: sqlparser.WhereStr,
190+
Expr: nil,
191+
}
192+
}
193+
194+
if sel.Where.Expr != nil {
195+
err := c.convertWhereExpr(&sel.Where.Expr)
196+
if err != nil {
197+
return err
198+
}
199+
}
200+
201+
// This logic comes from elasticsearch/visibility_store.go#convertQuery function.
202+
// If the query did not explicitly filter on TemporalNamespaceDivision,
203+
// then add "is null" query to it.
204+
if !c.seenNamespaceDivision {
205+
namespaceDivisionExpr := &sqlparser.IsExpr{
206+
Operator: sqlparser.IsNullStr,
207+
Expr: newColName(
208+
searchattribute.GetSqlDbColName(searchattribute.TemporalNamespaceDivision),
209+
),
210+
}
211+
if sel.Where.Expr == nil {
212+
sel.Where.Expr = namespaceDivisionExpr
213+
} else {
214+
sel.Where.Expr = &sqlparser.AndExpr{
215+
Left: sel.Where.Expr,
216+
Right: namespaceDivisionExpr,
217+
}
218+
}
185219
}
186220

187221
return nil
@@ -347,6 +381,9 @@ func (c *QueryConverter) convertColName(
347381
return "", "", err
348382
}
349383
}
384+
if saFieldName == searchattribute.TemporalNamespaceDivision {
385+
c.seenNamespaceDivision = true
386+
}
350387
var newExpr sqlparser.Expr = newColName(searchattribute.GetSqlDbColName(saFieldName))
351388
if saAlias == searchattribute.CloseTime {
352389
newExpr = c.getCoalesceCloseTimeExpr()

common/persistence/visibility/store/sql/visibility_store.go

+106-53
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package sql
2727
import (
2828
"context"
2929
"fmt"
30+
"strings"
3031
"time"
3132

3233
"go.temporal.io/api/common/v1"
@@ -199,13 +200,11 @@ func (s *VisibilityStore) ListOpenWorkflowExecutions(
199200
Namespace: request.Namespace,
200201
PageSize: request.PageSize,
201202
NextPageToken: request.NextPageToken,
202-
Query: fmt.Sprintf(
203-
"%s = %d AND %s BETWEEN '%s' AND '%s'",
204-
searchattribute.ExecutionStatus,
205-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
206-
searchattribute.StartTime,
207-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
208-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
203+
Query: s.buildQueryStringFromListRequest(
204+
request,
205+
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
206+
"",
207+
"",
209208
),
210209
},
211210
)
@@ -222,13 +221,11 @@ func (s *VisibilityStore) ListClosedWorkflowExecutions(
222221
Namespace: request.Namespace,
223222
PageSize: request.PageSize,
224223
NextPageToken: request.NextPageToken,
225-
Query: fmt.Sprintf(
226-
"%s != %d AND %s BETWEEN '%s' AND '%s'",
227-
searchattribute.ExecutionStatus,
228-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
229-
searchattribute.CloseTime,
230-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
231-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
224+
Query: s.buildQueryStringFromListRequest(
225+
request,
226+
enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED,
227+
"",
228+
"",
232229
),
233230
},
234231
)
@@ -245,15 +242,11 @@ func (s *VisibilityStore) ListOpenWorkflowExecutionsByType(
245242
Namespace: request.Namespace,
246243
PageSize: request.PageSize,
247244
NextPageToken: request.NextPageToken,
248-
Query: fmt.Sprintf(
249-
"%s = %d AND %s = '%s' AND %s BETWEEN '%s' AND '%s'",
250-
searchattribute.ExecutionStatus,
251-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
252-
searchattribute.WorkflowType,
245+
Query: s.buildQueryStringFromListRequest(
246+
request.ListWorkflowExecutionsRequest,
247+
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
248+
"",
253249
request.WorkflowTypeName,
254-
searchattribute.StartTime,
255-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
256-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
257250
),
258251
},
259252
)
@@ -270,15 +263,11 @@ func (s *VisibilityStore) ListClosedWorkflowExecutionsByType(
270263
Namespace: request.Namespace,
271264
PageSize: request.PageSize,
272265
NextPageToken: request.NextPageToken,
273-
Query: fmt.Sprintf(
274-
"%s != %d AND %s = '%s' AND %s BETWEEN '%s' AND '%s'",
275-
searchattribute.ExecutionStatus,
276-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
277-
searchattribute.WorkflowType,
266+
Query: s.buildQueryStringFromListRequest(
267+
request.ListWorkflowExecutionsRequest,
268+
enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED,
269+
"",
278270
request.WorkflowTypeName,
279-
searchattribute.CloseTime,
280-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
281-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
282271
),
283272
},
284273
)
@@ -295,15 +284,11 @@ func (s *VisibilityStore) ListOpenWorkflowExecutionsByWorkflowID(
295284
Namespace: request.Namespace,
296285
PageSize: request.PageSize,
297286
NextPageToken: request.NextPageToken,
298-
Query: fmt.Sprintf(
299-
"%s = %d AND %s = '%s' AND %s BETWEEN '%s' AND '%s'",
300-
searchattribute.ExecutionStatus,
301-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
302-
searchattribute.WorkflowID,
287+
Query: s.buildQueryStringFromListRequest(
288+
request.ListWorkflowExecutionsRequest,
289+
enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING,
303290
request.WorkflowID,
304-
searchattribute.StartTime,
305-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
306-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
291+
"",
307292
),
308293
},
309294
)
@@ -320,15 +305,11 @@ func (s *VisibilityStore) ListClosedWorkflowExecutionsByWorkflowID(
320305
Namespace: request.Namespace,
321306
PageSize: request.PageSize,
322307
NextPageToken: request.NextPageToken,
323-
Query: fmt.Sprintf(
324-
"%s != %d AND %s = '%s' AND %s BETWEEN '%s' AND '%s'",
325-
searchattribute.ExecutionStatus,
326-
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
327-
searchattribute.WorkflowID,
308+
Query: s.buildQueryStringFromListRequest(
309+
request.ListWorkflowExecutionsRequest,
310+
enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED,
328311
request.WorkflowID,
329-
searchattribute.CloseTime,
330-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
331-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
312+
"",
332313
),
333314
},
334315
)
@@ -345,13 +326,11 @@ func (s *VisibilityStore) ListClosedWorkflowExecutionsByStatus(
345326
Namespace: request.Namespace,
346327
PageSize: request.PageSize,
347328
NextPageToken: request.NextPageToken,
348-
Query: fmt.Sprintf(
349-
"%s = %d AND %s BETWEEN '%s' AND '%s'",
350-
searchattribute.ExecutionStatus,
351-
int32(request.Status),
352-
searchattribute.CloseTime,
353-
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
354-
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
329+
Query: s.buildQueryStringFromListRequest(
330+
request.ListWorkflowExecutionsRequest,
331+
request.Status,
332+
"",
333+
"",
355334
),
356335
},
357336
)
@@ -587,3 +566,77 @@ func (s *VisibilityStore) processRowSearchAttributes(
587566
}
588567
return searchAttributes, nil
589568
}
569+
570+
func (s *VisibilityStore) buildQueryStringFromListRequest(
571+
request *manager.ListWorkflowExecutionsRequest,
572+
executionStatus enumspb.WorkflowExecutionStatus,
573+
workflowID string,
574+
workflowTypeName string,
575+
) string {
576+
var queryTerms []string
577+
578+
switch executionStatus {
579+
case enumspb.WORKFLOW_EXECUTION_STATUS_UNSPECIFIED:
580+
queryTerms = append(
581+
queryTerms,
582+
fmt.Sprintf(
583+
"%s != %d",
584+
searchattribute.ExecutionStatus,
585+
int32(enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING),
586+
),
587+
)
588+
default:
589+
queryTerms = append(
590+
queryTerms,
591+
fmt.Sprintf("%s = %d", searchattribute.ExecutionStatus, int32(executionStatus)),
592+
)
593+
}
594+
595+
var timeAttr string
596+
if executionStatus == enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
597+
timeAttr = searchattribute.StartTime
598+
} else {
599+
timeAttr = searchattribute.CloseTime
600+
}
601+
queryTerms = append(
602+
queryTerms,
603+
fmt.Sprintf(
604+
"%s BETWEEN '%s' AND '%s'",
605+
timeAttr,
606+
request.EarliestStartTime.UTC().Format(time.RFC3339Nano),
607+
request.LatestStartTime.UTC().Format(time.RFC3339Nano),
608+
),
609+
)
610+
611+
if request.NamespaceDivision != "" {
612+
queryTerms = append(
613+
queryTerms,
614+
fmt.Sprintf(
615+
"%s = '%s'",
616+
searchattribute.TemporalNamespaceDivision,
617+
request.NamespaceDivision,
618+
),
619+
)
620+
} else {
621+
queryTerms = append(
622+
queryTerms,
623+
fmt.Sprintf("%s IS NULL", searchattribute.TemporalNamespaceDivision),
624+
)
625+
}
626+
627+
if workflowID != "" {
628+
queryTerms = append(
629+
queryTerms,
630+
fmt.Sprintf("%s = '%s'", searchattribute.WorkflowID, workflowID),
631+
)
632+
}
633+
634+
if workflowTypeName != "" {
635+
queryTerms = append(
636+
queryTerms,
637+
fmt.Sprintf("%s = '%s'", searchattribute.WorkflowType, workflowTypeName),
638+
)
639+
}
640+
641+
return strings.Join(queryTerms, " AND ")
642+
}

0 commit comments

Comments
 (0)