Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Commit 0a309a1

Browse files
committed
fix(decision): cleanup request queues
Make sure when request queues are idle that they are removed fix #112
1 parent d1f829b commit 0a309a1

File tree

2 files changed

+50
-1
lines changed

2 files changed

+50
-1
lines changed

decision/peer_request_queue.go

+18-1
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,17 @@ func (tl *prq) Pop() *peerRequestTask {
136136
break // and return |out|
137137
}
138138

139-
tl.pQueue.Push(partner)
139+
if partner.IsIdle() {
140+
for target, testPartner := range tl.partners {
141+
if testPartner == partner {
142+
delete(tl.partners, target)
143+
delete(tl.frozen, target)
144+
break
145+
}
146+
}
147+
} else {
148+
tl.pQueue.Push(partner)
149+
}
140150
return out
141151
}
142152

@@ -323,6 +333,7 @@ func (p *activePartner) StartTask(k cid.Cid) {
323333
// TaskDone signals that a task was completed for this partner.
324334
func (p *activePartner) TaskDone(k cid.Cid) {
325335
p.activelk.Lock()
336+
326337
p.activeBlocks.Remove(k)
327338
p.active--
328339
if p.active < 0 {
@@ -331,6 +342,12 @@ func (p *activePartner) TaskDone(k cid.Cid) {
331342
p.activelk.Unlock()
332343
}
333344

345+
func (p *activePartner) IsIdle() bool {
346+
p.activelk.Lock()
347+
defer p.activelk.Unlock()
348+
return p.requests == 0 && p.active == 0
349+
}
350+
334351
// Index implements pq.Elem.
335352
func (p *activePartner) Index() int {
336353
return p.index

decision/peer_request_queue_test.go

+32
Original file line numberDiff line numberDiff line change
@@ -128,3 +128,35 @@ func TestPeerRepeats(t *testing.T) {
128128
}
129129
}
130130
}
131+
132+
func TestCleaningUpQueues(t *testing.T) {
133+
partner := testutil.RandPeerIDFatal(t)
134+
var entries []wantlist.Entry
135+
for i := 0; i < 5; i++ {
136+
entries = append(entries, wantlist.Entry{Cid: cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))})
137+
}
138+
139+
prq := newPRQ()
140+
141+
// push a block, pop a block, complete everything, should be removed
142+
prq.Push(partner, entries...)
143+
task := prq.Pop()
144+
task.Done(task.Entries)
145+
task = prq.Pop()
146+
147+
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
148+
t.Fatal("Partner should have been removed because it's idle")
149+
}
150+
151+
// push a block, remove each of its entries, should be removed
152+
prq.Push(partner, entries...)
153+
for _, entry := range entries {
154+
prq.Remove(entry.Cid, partner)
155+
}
156+
task = prq.Pop()
157+
158+
if task != nil || len(prq.partners) > 0 || prq.pQueue.Len() > 0 {
159+
t.Fatal("Partner should have been removed because it's idle")
160+
}
161+
162+
}

0 commit comments

Comments
 (0)