Skip to content

Commit bd2d3b3

Browse files
authored
Advanced visibility for SQLite (#3895)
1 parent 32a4686 commit bd2d3b3

File tree

7 files changed

+420
-207
lines changed

7 files changed

+420
-207
lines changed

common/persistence/sql/sqlplugin/sqlite/plugin.go

+2
Original file line numberDiff line numberDiff line change
@@ -204,5 +204,7 @@ func buildDSNAttr(cfg *config.SQL) (url.Values, error) {
204204
// assume pragma
205205
parameters.Add("_pragma", fmt.Sprintf("%s=%s", key, value))
206206
}
207+
// set time format
208+
parameters.Add("_time_format", "sqlite")
207209
return parameters, nil
208210
}

common/persistence/sql/sqlplugin/sqlite/typeconv.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (c *converter) ToSQLiteDateTime(t time.Time) time.Time {
4747
if t.IsZero() {
4848
return minSQLiteDateTime
4949
}
50-
return t.UTC()
50+
return t.UTC().Truncate(time.Microsecond)
5151
}
5252

5353
// FromSQLiteDateTime converts SQLite datetime and returns go time

common/persistence/sql/sqlplugin/sqlite/visibility.go

+106-205
Original file line numberDiff line numberDiff line change
@@ -29,248 +29,104 @@ package sqlite
2929
import (
3030
"context"
3131
"database/sql"
32-
"errors"
3332
"fmt"
33+
"strings"
3434

3535
"go.temporal.io/server/common/persistence/sql/sqlplugin"
3636
)
3737

38-
const (
39-
templateCreateWorkflowExecutionStarted = `INSERT INTO executions_visibility (` +
40-
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue) ` +
41-
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` +
42-
`ON CONFLICT (namespace_id, run_id) DO NOTHING`
38+
var (
39+
keywordListSeparator = "♡"
4340

44-
templateCreateWorkflowExecutionClosed = `REPLACE INTO executions_visibility (` +
45-
`namespace_id, workflow_id, run_id, start_time, execution_time, workflow_type_name, close_time, status, history_length, memo, encoding, task_queue) ` +
46-
`VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `
47-
48-
// RunID condition is needed for correct pagination
49-
templateConditions = ` AND namespace_id = ?
50-
AND start_time >= ?
51-
AND start_time <= ?
52-
AND ((run_id > ? and start_time = ?) OR (start_time < ?))
53-
ORDER BY start_time DESC, run_id
54-
LIMIT ?`
55-
56-
templateConditionsClosedWorkflows = ` AND namespace_id = ?
57-
AND close_time >= ?
58-
AND close_time <= ?
59-
AND ((run_id > ? and close_time = ?) OR (close_time < ?))
60-
ORDER BY close_time DESC, run_id
61-
LIMIT ?`
62-
63-
templateOpenFieldNames = `workflow_id, run_id, start_time, execution_time, workflow_type_name, status, memo, encoding, task_queue`
64-
templateOpenSelect = `SELECT ` + templateOpenFieldNames + ` FROM executions_visibility WHERE status = 1 `
65-
66-
templateClosedSelect = `SELECT ` + templateOpenFieldNames + `, close_time, history_length
67-
FROM executions_visibility WHERE status != 1 `
68-
69-
templateGetOpenWorkflowExecutions = templateOpenSelect + templateConditions
70-
71-
templateGetClosedWorkflowExecutions = templateClosedSelect + templateConditionsClosedWorkflows
72-
73-
templateGetOpenWorkflowExecutionsByType = templateOpenSelect + `AND workflow_type_name = ?` + templateConditions
74-
75-
templateGetClosedWorkflowExecutionsByType = templateClosedSelect + `AND workflow_type_name = ?` + templateConditionsClosedWorkflows
76-
77-
templateGetOpenWorkflowExecutionsByID = templateOpenSelect + `AND workflow_id = ?` + templateConditions
78-
79-
templateGetClosedWorkflowExecutionsByID = templateClosedSelect + `AND workflow_id = ?` + templateConditionsClosedWorkflows
80-
81-
templateGetClosedWorkflowExecutionsByStatus = templateClosedSelect + `AND status = ?` + templateConditionsClosedWorkflows
41+
templateInsertWorkflowExecution = fmt.Sprintf(
42+
`INSERT INTO executions_visibility (%s)
43+
VALUES (%s)
44+
ON CONFLICT (namespace_id, run_id) DO NOTHING`,
45+
strings.Join(sqlplugin.DbFields, ", "),
46+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
47+
)
8248

83-
templateGetClosedWorkflowExecution = `SELECT workflow_id, run_id, start_time, execution_time, memo, encoding, close_time, workflow_type_name, status, history_length, task_queue
84-
FROM executions_visibility
85-
WHERE namespace_id = ? AND status != 1
86-
AND run_id = ?`
49+
templateUpsertWorkflowExecution = fmt.Sprintf(
50+
`INSERT INTO executions_visibility (%s)
51+
VALUES (%s)
52+
%s`,
53+
strings.Join(sqlplugin.DbFields, ", "),
54+
sqlplugin.BuildNamedPlaceholder(sqlplugin.DbFields...),
55+
buildOnDuplicateKeyUpdate(sqlplugin.DbFields...),
56+
)
8757

88-
templateGetWorkflowExecution = `
89-
SELECT
90-
workflow_id,
91-
run_id,
92-
start_time,
93-
execution_time,
94-
memo,
95-
encoding,
96-
close_time,
97-
workflow_type_name,
98-
status,
99-
history_length,
100-
task_queue
101-
FROM executions_visibility
102-
WHERE namespace_id = ? AND run_id = ?`
58+
templateDeleteWorkflowExecution = `
59+
DELETE FROM executions_visibility
60+
WHERE namespace_id = :namespace_id AND run_id = :run_id`
10361

104-
templateDeleteWorkflowExecution = "DELETE FROM executions_visibility WHERE namespace_id = ? AND run_id = ?"
62+
templateGetWorkflowExecution = fmt.Sprintf(
63+
`SELECT %s FROM executions_visibility
64+
WHERE namespace_id = :namespace_id AND run_id = :run_id`,
65+
strings.Join(sqlplugin.DbFields, ", "),
66+
)
10567
)
10668

107-
var errCloseParams = errors.New("missing one of {closeTime, historyLength} params")
69+
func buildOnDuplicateKeyUpdate(fields ...string) string {
70+
items := make([]string, len(fields))
71+
for i, field := range fields {
72+
items[i] = fmt.Sprintf("%s = excluded.%s", field, field)
73+
}
74+
return fmt.Sprintf(
75+
"ON CONFLICT (namespace_id, run_id) DO UPDATE SET %s",
76+
strings.Join(items, ", "),
77+
)
78+
}
10879

10980
// InsertIntoVisibility inserts a row into visibility table. If an row already exist,
11081
// its left as such and no update will be made
11182
func (mdb *db) InsertIntoVisibility(
11283
ctx context.Context,
11384
row *sqlplugin.VisibilityRow,
11485
) (sql.Result, error) {
115-
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
116-
return mdb.conn.ExecContext(ctx,
117-
templateCreateWorkflowExecutionStarted,
118-
row.NamespaceID,
119-
row.WorkflowID,
120-
row.RunID,
121-
row.StartTime,
122-
row.ExecutionTime,
123-
row.WorkflowTypeName,
124-
row.Status,
125-
row.Memo,
126-
row.Encoding,
127-
row.TaskQueue,
128-
)
86+
finalRow := mdb.prepareRowForDB(row)
87+
return mdb.conn.NamedExecContext(ctx, templateInsertWorkflowExecution, finalRow)
12988
}
13089

13190
// ReplaceIntoVisibility replaces an existing row if it exist or creates a new row in visibility table
13291
func (mdb *db) ReplaceIntoVisibility(
13392
ctx context.Context,
13493
row *sqlplugin.VisibilityRow,
13594
) (sql.Result, error) {
136-
switch {
137-
case row.CloseTime != nil && row.HistoryLength != nil:
138-
row.StartTime = mdb.converter.ToSQLiteDateTime(row.StartTime)
139-
closeTime := mdb.converter.ToSQLiteDateTime(*row.CloseTime)
140-
return mdb.conn.ExecContext(ctx,
141-
templateCreateWorkflowExecutionClosed,
142-
row.NamespaceID,
143-
row.WorkflowID,
144-
row.RunID,
145-
row.StartTime,
146-
row.ExecutionTime,
147-
row.WorkflowTypeName,
148-
closeTime,
149-
row.Status,
150-
*row.HistoryLength,
151-
row.Memo,
152-
row.Encoding,
153-
row.TaskQueue,
154-
)
155-
default:
156-
return nil, errCloseParams
157-
}
95+
finalRow := mdb.prepareRowForDB(row)
96+
return mdb.conn.NamedExecContext(ctx, templateUpsertWorkflowExecution, finalRow)
15897
}
15998

16099
// DeleteFromVisibility deletes a row from visibility table if it exist
161100
func (mdb *db) DeleteFromVisibility(
162101
ctx context.Context,
163102
filter sqlplugin.VisibilityDeleteFilter,
164103
) (sql.Result, error) {
165-
return mdb.conn.ExecContext(ctx,
166-
templateDeleteWorkflowExecution,
167-
filter.NamespaceID,
168-
filter.RunID,
169-
)
104+
return mdb.conn.NamedExecContext(ctx, templateDeleteWorkflowExecution, filter)
170105
}
171106

172107
// SelectFromVisibility reads one or more rows from visibility table
173108
func (mdb *db) SelectFromVisibility(
174109
ctx context.Context,
175110
filter sqlplugin.VisibilitySelectFilter,
176111
) ([]sqlplugin.VisibilityRow, error) {
177-
var err error
178-
var rows []sqlplugin.VisibilityRow
179-
if filter.MinTime != nil {
180-
*filter.MinTime = mdb.converter.ToSQLiteDateTime(*filter.MinTime)
181-
}
182-
if filter.MaxTime != nil {
183-
*filter.MaxTime = mdb.converter.ToSQLiteDateTime(*filter.MaxTime)
184-
}
185-
// If filter.Status == 0 (UNSPECIFIED) then only closed workflows will be returned (all excluding 1 (RUNNING)).
186-
switch {
187-
case filter.MinTime == nil && filter.RunID != nil && filter.Status != 1:
188-
var row sqlplugin.VisibilityRow
189-
err = mdb.conn.GetContext(ctx,
190-
&row,
191-
templateGetClosedWorkflowExecution,
192-
filter.NamespaceID,
193-
*filter.RunID,
194-
)
195-
if err == nil {
196-
rows = append(rows, row)
112+
if len(filter.Query) == 0 {
113+
// backward compatibility for existing tests
114+
err := sqlplugin.GenerateSelectQuery(&filter, mdb.converter.ToSQLiteDateTime)
115+
if err != nil {
116+
return nil, err
197117
}
198-
case filter.MinTime != nil && filter.MaxTime != nil &&
199-
filter.WorkflowID != nil && filter.RunID != nil && filter.PageSize != nil:
200-
qry := templateGetOpenWorkflowExecutionsByID
201-
if filter.Status != 1 {
202-
qry = templateGetClosedWorkflowExecutionsByID
203-
}
204-
err = mdb.conn.SelectContext(ctx,
205-
&rows,
206-
qry,
207-
*filter.WorkflowID,
208-
filter.NamespaceID,
209-
*filter.MinTime,
210-
*filter.MaxTime,
211-
*filter.RunID,
212-
*filter.MaxTime,
213-
*filter.MaxTime,
214-
*filter.PageSize,
215-
)
216-
case filter.MinTime != nil && filter.MaxTime != nil &&
217-
filter.WorkflowTypeName != nil && filter.RunID != nil && filter.PageSize != nil:
218-
qry := templateGetOpenWorkflowExecutionsByType
219-
if filter.Status != 1 {
220-
qry = templateGetClosedWorkflowExecutionsByType
221-
}
222-
err = mdb.conn.SelectContext(ctx,
223-
&rows,
224-
qry,
225-
*filter.WorkflowTypeName,
226-
filter.NamespaceID,
227-
*filter.MinTime,
228-
*filter.MaxTime,
229-
*filter.RunID,
230-
*filter.MaxTime,
231-
*filter.MaxTime,
232-
*filter.PageSize,
233-
)
234-
case filter.MinTime != nil && filter.MaxTime != nil &&
235-
filter.RunID != nil && filter.PageSize != nil &&
236-
filter.Status != 0 && filter.Status != 1: // 0 is UNSPECIFIED, 1 is RUNNING
237-
err = mdb.conn.SelectContext(ctx,
238-
&rows,
239-
templateGetClosedWorkflowExecutionsByStatus,
240-
filter.Status,
241-
filter.NamespaceID,
242-
*filter.MinTime,
243-
*filter.MaxTime,
244-
*filter.RunID,
245-
*filter.MaxTime,
246-
*filter.MaxTime,
247-
*filter.PageSize,
248-
)
249-
case filter.MinTime != nil && filter.MaxTime != nil &&
250-
filter.RunID != nil && filter.PageSize != nil:
251-
qry := templateGetOpenWorkflowExecutions
252-
if filter.Status != 1 {
253-
qry = templateGetClosedWorkflowExecutions
254-
}
255-
err = mdb.conn.SelectContext(ctx,
256-
&rows,
257-
qry,
258-
filter.NamespaceID,
259-
*filter.MinTime,
260-
*filter.MaxTime,
261-
*filter.RunID,
262-
*filter.MaxTime,
263-
*filter.MaxTime,
264-
*filter.PageSize,
265-
)
266-
default:
267-
return nil, fmt.Errorf("invalid query filter")
268118
}
119+
120+
var rows []sqlplugin.VisibilityRow
121+
err := mdb.conn.SelectContext(ctx, &rows, filter.Query, filter.QueryArgs...)
269122
if err != nil {
270123
return nil, err
271124
}
272125
for i := range rows {
273-
mdb.processRowFromDB(&rows[i])
126+
err = mdb.processRowFromDB(&rows[i])
127+
if err != nil {
128+
return nil, err
129+
}
274130
}
275131
return rows, nil
276132
}
@@ -281,24 +137,69 @@ func (mdb *db) GetFromVisibility(
281137
filter sqlplugin.VisibilityGetFilter,
282138
) (*sqlplugin.VisibilityRow, error) {
283139
var row sqlplugin.VisibilityRow
284-
err := mdb.conn.GetContext(ctx,
285-
&row,
286-
templateGetWorkflowExecution,
287-
filter.NamespaceID,
288-
filter.RunID,
289-
)
140+
stmt, err := mdb.conn.PrepareNamedContext(ctx, templateGetWorkflowExecution)
141+
if err != nil {
142+
return nil, err
143+
}
144+
err = stmt.GetContext(ctx, &row, filter)
145+
if err != nil {
146+
return nil, err
147+
}
148+
err = mdb.processRowFromDB(&row)
290149
if err != nil {
291150
return nil, err
292151
}
293-
mdb.processRowFromDB(&row)
294152
return &row, nil
295153
}
296154

297-
func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) {
155+
func (mdb *db) prepareRowForDB(row *sqlplugin.VisibilityRow) *sqlplugin.VisibilityRow {
156+
if row == nil {
157+
return nil
158+
}
159+
finalRow := *row
160+
finalRow.StartTime = mdb.converter.ToSQLiteDateTime(finalRow.StartTime)
161+
finalRow.ExecutionTime = mdb.converter.ToSQLiteDateTime(finalRow.ExecutionTime)
162+
if finalRow.CloseTime != nil {
163+
*finalRow.CloseTime = mdb.converter.ToSQLiteDateTime(*finalRow.CloseTime)
164+
}
165+
if finalRow.SearchAttributes != nil {
166+
finalSearchAttributes := sqlplugin.VisibilitySearchAttributes{}
167+
for name, value := range *finalRow.SearchAttributes {
168+
switch v := value.(type) {
169+
case []string:
170+
finalSearchAttributes[name] = strings.Join(v, keywordListSeparator)
171+
default:
172+
finalSearchAttributes[name] = v
173+
}
174+
}
175+
finalRow.SearchAttributes = &finalSearchAttributes
176+
}
177+
return &finalRow
178+
}
179+
180+
func (mdb *db) processRowFromDB(row *sqlplugin.VisibilityRow) error {
181+
if row == nil {
182+
return nil
183+
}
298184
row.StartTime = mdb.converter.FromSQLiteDateTime(row.StartTime)
299185
row.ExecutionTime = mdb.converter.FromSQLiteDateTime(row.ExecutionTime)
300186
if row.CloseTime != nil {
301187
closeTime := mdb.converter.FromSQLiteDateTime(*row.CloseTime)
302188
row.CloseTime = &closeTime
303189
}
190+
if row.SearchAttributes != nil {
191+
for saName, saValue := range *row.SearchAttributes {
192+
switch typedSaValue := saValue.(type) {
193+
case string:
194+
if strings.Index(typedSaValue, keywordListSeparator) >= 0 {
195+
// If the string contains the keywordListSeparator, then we need to split it
196+
// into a list of keywords.
197+
(*row.SearchAttributes)[saName] = strings.Split(typedSaValue, keywordListSeparator)
198+
}
199+
default:
200+
// no-op
201+
}
202+
}
203+
}
204+
return nil
304205
}

0 commit comments

Comments
 (0)