
Commit ee95084

Merge pull request ipfs/go-bitswap#119 from ipfs/feat/use-peer-task-queue-package
Use shared peer task queue with Graphsync. This commit was moved from ipfs/go-bitswap@a32fa8a.
2 parents 58e0800 + 99b56c3 commit ee95084

File tree: 4 files changed, +22 -564 lines


bitswap/decision/bench_test.go (-30): this file was deleted.

bitswap/decision/engine.go (+22, -16):
@@ -8,6 +8,9 @@ import (
 
     bsmsg "github.com/ipfs/go-bitswap/message"
     wl "github.com/ipfs/go-bitswap/wantlist"
+    cid "github.com/ipfs/go-cid"
+    "github.com/ipfs/go-peertaskqueue"
+    "github.com/ipfs/go-peertaskqueue/peertask"
 
     blocks "github.com/ipfs/go-block-format"
     bstore "github.com/ipfs/go-ipfs-blockstore"
@@ -73,7 +76,7 @@ type Engine struct {
     // peerRequestQueue is a priority queue of requests received from peers.
     // Requests are popped from the queue, packaged up, and placed in the
     // outbox.
-    peerRequestQueue *prq
+    peerRequestQueue *peertaskqueue.PeerTaskQueue
 
     // FIXME it's a bit odd for the client and the worker to both share memory
     // (both modify the peerRequestQueue) and also to communicate over the
@@ -100,7 +103,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
     e := &Engine{
        ledgerMap:        make(map[peer.ID]*ledger),
        bs:               bs,
-       peerRequestQueue: newPRQ(),
+       peerRequestQueue: peertaskqueue.New(),
        outbox:           make(chan (<-chan *Envelope), outboxChanBuffer),
        workSignal:       make(chan struct{}, 1),
        ticker:           time.NewTicker(time.Millisecond * 100),
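For the struct and constructor changes above, the engine-local prq is replaced wholesale by the external queue. A minimal sketch of the swap, assuming nothing beyond peertaskqueue.New() and the field type shown in the diff; the trimmed-down struct is illustrative, not the real Engine:

// Sketch: the engine now embeds the shared PeerTaskQueue instead of its
// own prq. Only peertaskqueue.New() and the field type come from the
// diff; this miniature struct is illustrative.
package main

import (
    "fmt"

    "github.com/ipfs/go-peertaskqueue"
)

type miniEngine struct {
    // was: peerRequestQueue *prq (the engine-local priority queue)
    peerRequestQueue *peertaskqueue.PeerTaskQueue
}

func newMiniEngine() *miniEngine {
    return &miniEngine{peerRequestQueue: peertaskqueue.New()}
}

func main() {
    e := newMiniEngine()
    fmt.Printf("queue ready: %T\n", e.peerRequestQueue)
}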
@@ -159,23 +162,23 @@ func (e *Engine) taskWorker(ctx context.Context) {
 // context is cancelled before the next Envelope can be created.
 func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
     for {
-        nextTask := e.peerRequestQueue.Pop()
+        nextTask := e.peerRequestQueue.PopBlock()
         for nextTask == nil {
             select {
             case <-ctx.Done():
                 return nil, ctx.Err()
             case <-e.workSignal:
-                nextTask = e.peerRequestQueue.Pop()
+                nextTask = e.peerRequestQueue.PopBlock()
             case <-e.ticker.C:
-                e.peerRequestQueue.thawRound()
-                nextTask = e.peerRequestQueue.Pop()
+                e.peerRequestQueue.ThawRound()
+                nextTask = e.peerRequestQueue.PopBlock()
             }
         }
 
         // with a task in hand, we're ready to prepare the envelope...
         msg := bsmsg.New(true)
-        for _, entry := range nextTask.Entries {
-            block, err := e.bs.Get(entry.Cid)
+        for _, entry := range nextTask.Tasks {
+            block, err := e.bs.Get(entry.Identifier.(cid.Cid))
             if err != nil {
                 log.Errorf("tried to execute a task and errored fetching block: %s", err)
                 continue
@@ -186,15 +189,15 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
         if msg.Empty() {
             // If we don't have the block, don't hold that against the peer
             // make sure to update that the task has been 'completed'
-            nextTask.Done(nextTask.Entries)
+            nextTask.Done(nextTask.Tasks)
             continue
         }
 
         return &Envelope{
             Peer:    nextTask.Target,
             Message: msg,
             Sent: func() {
-                nextTask.Done(nextTask.Entries)
+                nextTask.Done(nextTask.Tasks)
                 select {
                 case e.workSignal <- struct{}{}:
                     // work completing may mean that our queue will provide new
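Putting the nextEnvelope changes together: the worker now pops a per-peer block of tasks, serves what it can, and marks the whole block done. A condensed, self-contained sketch of that flow, using only the calls visible in the diff (PopBlock, ThawRound, Target, Tasks, Identifier, Done); the fetch function is a hypothetical stand-in for the engine's blockstore Get:

// Sketch of the migrated consumer path. PopBlock, ThawRound, Target,
// Tasks, Identifier and Done are the calls used in the diff above;
// fetch is a hypothetical stand-in for e.bs.Get.
package main

import (
    "errors"
    "fmt"

    cid "github.com/ipfs/go-cid"
    "github.com/ipfs/go-peertaskqueue"
)

func serveNext(q *peertaskqueue.PeerTaskQueue, fetch func(cid.Cid) ([]byte, error)) bool {
    next := q.PopBlock()
    if next == nil {
        // Nothing sendable right now. The real engine waits on workSignal,
        // or calls ThawRound() on a ticker before popping again.
        q.ThawRound()
        return false
    }
    served := 0
    for _, t := range next.Tasks {
        c := t.Identifier.(cid.Cid) // bitswap stores CIDs as identifiers
        if _, err := fetch(c); err != nil {
            fmt.Printf("could not fetch %s for %s: %s\n", c, next.Target, err)
            continue
        }
        served++
    }
    // Mark the whole task block complete, mirroring
    // nextTask.Done(nextTask.Tasks) in both branches of the diff.
    next.Done(next.Tasks)
    return served > 0
}

func main() {
    q := peertaskqueue.New()
    notFound := func(cid.Cid) ([]byte, error) { return nil, errors.New("not found") }
    fmt.Println("served anything:", serveNext(q, notFound)) // false: queue is empty
}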
@@ -246,7 +249,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
     }
 
     var msgSize int
-    var activeEntries []wl.Entry
+    var activeEntries []peertask.Task
     for _, entry := range m.Wantlist() {
         if entry.Cancel {
             log.Debugf("%s cancel %s", p, entry.Cid)
@@ -265,17 +268,17 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
                 // we have the block
                 newWorkExists = true
                 if msgSize+blockSize > maxMessageSize {
-                    e.peerRequestQueue.Push(p, activeEntries...)
-                    activeEntries = []wl.Entry{}
+                    e.peerRequestQueue.PushBlock(p, activeEntries...)
+                    activeEntries = []peertask.Task{}
                     msgSize = 0
                 }
-                activeEntries = append(activeEntries, entry.Entry)
+                activeEntries = append(activeEntries, peertask.Task{Identifier: entry.Cid, Priority: entry.Priority})
                 msgSize += blockSize
             }
         }
     }
     if len(activeEntries) > 0 {
-        e.peerRequestQueue.Push(p, activeEntries...)
+        e.peerRequestQueue.PushBlock(p, activeEntries...)
     }
     for _, block := range m.Blocks() {
         log.Debugf("got block %s %d bytes", block, len(block.RawData()))
@@ -289,7 +292,10 @@ func (e *Engine) addBlock(block blocks.Block) {
     for _, l := range e.ledgerMap {
         l.lk.Lock()
         if entry, ok := l.WantListContains(block.Cid()); ok {
-            e.peerRequestQueue.Push(l.Partner, entry)
+            e.peerRequestQueue.PushBlock(l.Partner, peertask.Task{
+                Identifier: entry.Cid,
+                Priority:   entry.Priority,
+            })
             work = true
         }
         l.lk.Unlock()

0 commit comments
