Skip to content

Commit a04857d

Browse files
committed
Remove taskIDBlock data race
1 parent b6d0d0e commit a04857d

File tree

4 files changed

+33
-18
lines changed

4 files changed

+33
-18
lines changed

service/matching/backlog_manager.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,14 @@ func (c *backlogManagerImpl) TotalApproximateBacklogCount() int64 {
223223
}
224224

225225
func (c *backlogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
226+
currentTaskIDBlock := c.taskWriter.currentTaskIDBlock()
226227
return []*taskqueuespb.InternalTaskQueueStatus{
227228
&taskqueuespb.InternalTaskQueueStatus{
228229
ReadLevel: c.taskAckManager.getReadLevel(),
229230
AckLevel: c.taskAckManager.getAckLevel(),
230231
TaskIdBlock: &taskqueuepb.TaskIdBlock{
231-
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
232-
StartId: c.taskWriter.taskIDBlock.start,
233-
EndId: c.taskWriter.taskIDBlock.end,
232+
StartId: currentTaskIDBlock.start,
233+
EndId: currentTaskIDBlock.end,
234234
},
235235
LoadedTasks: c.taskAckManager.getBacklogCountHint(),
236236
MaxReadLevel: c.db.GetMaxReadLevel(subqueueZero),

service/matching/pri_backlog_manager.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,7 @@ func (c *priBacklogManagerImpl) TotalApproximateBacklogCount() int64 {
323323
}
324324

325325
func (c *priBacklogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
326-
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
327-
idBlock := &taskqueuepb.TaskIdBlock{
328-
StartId: c.taskWriter.taskIDBlock.start,
329-
EndId: c.taskWriter.taskIDBlock.end,
330-
}
326+
currentTaskIDBlock := c.taskWriter.currentTaskIDBlock()
331327

332328
c.subqueueLock.Lock()
333329
defer c.subqueueLock.Unlock()
@@ -336,9 +332,12 @@ func (c *priBacklogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQue
336332
for i, r := range c.subqueues {
337333
readLevel, ackLevel := r.getLevels()
338334
status[i] = &taskqueuespb.InternalTaskQueueStatus{
339-
ReadLevel: readLevel,
340-
AckLevel: ackLevel,
341-
TaskIdBlock: idBlock,
335+
ReadLevel: readLevel,
336+
AckLevel: ackLevel,
337+
TaskIdBlock: &taskqueuepb.TaskIdBlock{
338+
StartId: currentTaskIDBlock.start,
339+
EndId: currentTaskIDBlock.end,
340+
},
342341
LoadedTasks: int64(r.getLoadedTasks()),
343342
MaxReadLevel: c.db.GetMaxReadLevel(i),
344343
ApproximateBacklogCount: c.db.getApproximateBacklogCount(i),

service/matching/pri_task_writer.go

+19-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package matching
2626

2727
import (
2828
"context"
29+
"sync/atomic"
2930
"time"
3031

3132
enumspb "go.temporal.io/api/enums/v1"
@@ -52,12 +53,13 @@ type (
5253

5354
// priTaskWriter writes tasks persistence split among subqueues
5455
priTaskWriter struct {
55-
backlogMgr *priBacklogManagerImpl
56-
config *taskQueueConfig
57-
db *taskQueueDB
58-
logger log.Logger
59-
appendCh chan *writeTaskRequest
60-
taskIDBlock taskIDBlock
56+
backlogMgr *priBacklogManagerImpl
57+
config *taskQueueConfig
58+
db *taskQueueDB
59+
logger log.Logger
60+
appendCh chan *writeTaskRequest
61+
taskIDBlock taskIDBlock
62+
lastTaskIDBlock taskIDBlock // copy of the last taskIDBlock for status reporting
6163
}
6264
)
6365

@@ -91,7 +93,6 @@ func (w *priTaskWriter) appendTask(
9193
subqueue int,
9294
taskInfo *persistencespb.TaskInfo,
9395
) error {
94-
9596
select {
9697
case <-w.backlogMgr.tqCtx.Done():
9798
return errShutdown
@@ -174,6 +175,7 @@ func (w *priTaskWriter) initState() error {
174175
return err
175176
}
176177
w.taskIDBlock = rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize)
178+
w.lastTaskIDBlock = w.lastTaskIDBlock
177179
w.backlogMgr.initState(state, nil)
178180
return nil
179181
}
@@ -185,6 +187,9 @@ func (w *priTaskWriter) taskWriterLoop() {
185187

186188
var reqs []*writeTaskRequest
187189
for {
190+
atomic.StoreInt64(&w.lastTaskIDBlock.start, w.taskIDBlock.start)
191+
atomic.StoreInt64(&w.lastTaskIDBlock.end, w.taskIDBlock.end)
192+
188193
select {
189194
case request := <-w.appendCh:
190195
// read a batch of requests from the channel
@@ -249,3 +254,10 @@ func (w *priTaskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error
249254
}
250255
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
251256
}
257+
258+
func (w *priTaskWriter) currentTaskIDBlock() taskIDBlock {
259+
return taskIDBlock{
260+
start: atomic.LoadInt64(&w.taskIDBlock.start),
261+
end: atomic.LoadInt64(&w.taskIDBlock.end),
262+
}
263+
}

service/matching/task_writer.go

+4
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,7 @@ func (w *taskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error) {
248248
}
249249
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
250250
}
251+
252+
func (w *taskWriter) currentTaskIDBlock() taskIDBlock {
253+
return w.taskIDBlock // TODO
254+
}

0 commit comments

Comments
 (0)