Skip to content

Commit ba1fa47

Browse files
committed
Remove taskIDBlock data race
1 parent b6d0d0e commit ba1fa47

File tree

4 files changed

+32
-18
lines changed

4 files changed

+32
-18
lines changed

service/matching/backlog_manager.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,14 @@ func (c *backlogManagerImpl) TotalApproximateBacklogCount() int64 {
223223
}
224224

225225
func (c *backlogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
226+
currentTaskIDBlock := c.taskWriter.currentTaskIDBlock()
226227
return []*taskqueuespb.InternalTaskQueueStatus{
227228
&taskqueuespb.InternalTaskQueueStatus{
228229
ReadLevel: c.taskAckManager.getReadLevel(),
229230
AckLevel: c.taskAckManager.getAckLevel(),
230231
TaskIdBlock: &taskqueuepb.TaskIdBlock{
231-
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
232-
StartId: c.taskWriter.taskIDBlock.start,
233-
EndId: c.taskWriter.taskIDBlock.end,
232+
StartId: currentTaskIDBlock.start,
233+
EndId: currentTaskIDBlock.end,
234234
},
235235
LoadedTasks: c.taskAckManager.getBacklogCountHint(),
236236
MaxReadLevel: c.db.GetMaxReadLevel(subqueueZero),

service/matching/pri_backlog_manager.go

+7-8
Original file line numberDiff line numberDiff line change
@@ -323,11 +323,7 @@ func (c *priBacklogManagerImpl) TotalApproximateBacklogCount() int64 {
323323
}
324324

325325
func (c *priBacklogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQueueStatus {
326-
// TODO(pri): this is a data race, it should only be read by taskWriterLoop
327-
idBlock := &taskqueuepb.TaskIdBlock{
328-
StartId: c.taskWriter.taskIDBlock.start,
329-
EndId: c.taskWriter.taskIDBlock.end,
330-
}
326+
currentTaskIDBlock := c.taskWriter.currentTaskIDBlock()
331327

332328
c.subqueueLock.Lock()
333329
defer c.subqueueLock.Unlock()
@@ -336,9 +332,12 @@ func (c *priBacklogManagerImpl) InternalStatus() []*taskqueuespb.InternalTaskQue
336332
for i, r := range c.subqueues {
337333
readLevel, ackLevel := r.getLevels()
338334
status[i] = &taskqueuespb.InternalTaskQueueStatus{
339-
ReadLevel: readLevel,
340-
AckLevel: ackLevel,
341-
TaskIdBlock: idBlock,
335+
ReadLevel: readLevel,
336+
AckLevel: ackLevel,
337+
TaskIdBlock: &taskqueuepb.TaskIdBlock{
338+
StartId: currentTaskIDBlock.start,
339+
EndId: currentTaskIDBlock.end,
340+
},
342341
LoadedTasks: int64(r.getLoadedTasks()),
343342
MaxReadLevel: c.db.GetMaxReadLevel(i),
344343
ApproximateBacklogCount: c.db.getApproximateBacklogCount(i),

service/matching/pri_task_writer.go

+18-7
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ package matching
2626

2727
import (
2828
"context"
29+
"sync/atomic"
2930
"time"
3031

3132
enumspb "go.temporal.io/api/enums/v1"
@@ -52,12 +53,13 @@ type (
5253

5354
// priTaskWriter writes tasks persistence split among subqueues
5455
priTaskWriter struct {
55-
backlogMgr *priBacklogManagerImpl
56-
config *taskQueueConfig
57-
db *taskQueueDB
58-
logger log.Logger
59-
appendCh chan *writeTaskRequest
60-
taskIDBlock taskIDBlock
56+
backlogMgr *priBacklogManagerImpl
57+
config *taskQueueConfig
58+
db *taskQueueDB
59+
logger log.Logger
60+
appendCh chan *writeTaskRequest
61+
taskIDBlock taskIDBlock
62+
lastTaskIDBlock taskIDBlock // copy of the last taskIDBlock for status reporting
6163
}
6264
)
6365

@@ -91,7 +93,6 @@ func (w *priTaskWriter) appendTask(
9193
subqueue int,
9294
taskInfo *persistencespb.TaskInfo,
9395
) error {
94-
9596
select {
9697
case <-w.backlogMgr.tqCtx.Done():
9798
return errShutdown
@@ -185,6 +186,9 @@ func (w *priTaskWriter) taskWriterLoop() {
185186

186187
var reqs []*writeTaskRequest
187188
for {
189+
atomic.StoreInt64(&w.lastTaskIDBlock.start, w.taskIDBlock.start)
190+
atomic.StoreInt64(&w.lastTaskIDBlock.end, w.taskIDBlock.end)
191+
188192
select {
189193
case request := <-w.appendCh:
190194
// read a batch of requests from the channel
@@ -249,3 +253,10 @@ func (w *priTaskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error
249253
}
250254
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
251255
}
256+
257+
func (w *priTaskWriter) currentTaskIDBlock() taskIDBlock {
258+
return taskIDBlock{
259+
start: atomic.LoadInt64(&w.taskIDBlock.start),
260+
end: atomic.LoadInt64(&w.taskIDBlock.end),
261+
}
262+
}

service/matching/task_writer.go

+4
Original file line numberDiff line numberDiff line change
@@ -248,3 +248,7 @@ func (w *taskWriter) allocTaskIDBlock(prevBlockEnd int64) (taskIDBlock, error) {
248248
}
249249
return rangeIDToTaskIDBlock(state.rangeID, w.config.RangeSize), nil
250250
}
251+
252+
func (w *taskWriter) currentTaskIDBlock() taskIDBlock {
253+
return w.taskIDBlock // TODO
254+
}

0 commit comments

Comments
 (0)