Skip to content

Commit b272024

Browse files
authored
Merge pull request ipfs/go-bitswap#27 from ipfs/feat/speed-up-sessions
Speed up sessions — Round #1. This commit was moved from ipfs/go-bitswap@43c65d4.
2 parents c0e13aa + 0f8c138 commit b272024

9 files changed

+612
-75
lines changed

bitswap/bitswap.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"sync/atomic"
1010
"time"
1111

12+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
13+
1214
decision "github.com/ipfs/go-bitswap/decision"
1315
bsgetter "github.com/ipfs/go-bitswap/getter"
1416
bsmsg "github.com/ipfs/go-bitswap/message"
@@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
103105
}
104106

105107
wm := bswm.New(ctx)
106-
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
107-
return bssession.New(ctx, id, wm, pm)
108+
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
109+
return bssession.New(ctx, id, wm, pm, srs)
108110
}
109111
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
110112
return bsspm.New(ctx, id, network)
111113
}
114+
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
115+
return bssrs.New(ctx)
116+
}
112117

113118
bs := &Bitswap{
114119
blockstore: bstore,
@@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
121126
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
122127
wm: wm,
123128
pm: bspm.New(ctx, peerQueueFactory),
124-
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
129+
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
125130
counters: new(counters),
126131
dupMetric: dupHist,
127132
allMetric: allHist,
@@ -391,7 +396,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
391396
defer wg.Done()
392397

393398
bs.updateReceiveCounters(b)
394-
399+
bs.sm.UpdateReceiveCounters(b)
395400
log.Debugf("got block %s from %s", b, p)
396401

397402
// skip received blocks that are not in the wantlist

bitswap/session/session.go

+83-14
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,14 @@ import (
1212
logging "github.com/ipfs/go-log"
1313
loggables "github.com/libp2p/go-libp2p-loggables"
1414
peer "github.com/libp2p/go-libp2p-peer"
15+
16+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
1517
)
1618

17-
const activeWantsLimit = 16
19+
const (
20+
broadcastLiveWantsLimit = 4
21+
targetedLiveWantsLimit = 32
22+
)
1823

1924
// WantManager is an interface that can be used to request blocks
2025
// from given peers.
@@ -32,14 +37,23 @@ type PeerManager interface {
3237
RecordPeerResponse(peer.ID, cid.Cid)
3338
}
3439

40+
// RequestSplitter provides an interface for splitting
41+
// a request for Cids up among peers.
42+
type RequestSplitter interface {
43+
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
44+
RecordDuplicateBlock()
45+
RecordUniqueBlock()
46+
}
47+
3548
type interestReq struct {
3649
c cid.Cid
3750
resp chan bool
3851
}
3952

4053
type blkRecv struct {
41-
from peer.ID
42-
blk blocks.Block
54+
from peer.ID
55+
blk blocks.Block
56+
counterMessage bool
4357
}
4458

4559
// Session holds state for an individual bitswap transfer operation.
@@ -50,6 +64,7 @@ type Session struct {
5064
ctx context.Context
5165
wm WantManager
5266
pm PeerManager
67+
srs RequestSplitter
5368

5469
// channels
5570
incoming chan blkRecv
@@ -62,12 +77,12 @@ type Session struct {
6277
// do not touch outside run loop
6378
tofetch *cidQueue
6479
interest *lru.Cache
80+
pastWants *cidQueue
6581
liveWants map[cid.Cid]time.Time
6682
tick *time.Timer
6783
baseTickDelay time.Duration
6884
latTotal time.Duration
6985
fetchcnt int
70-
7186
// identifiers
7287
notif notifications.PubSub
7388
uuid logging.Loggable
@@ -76,18 +91,20 @@ type Session struct {
7691

7792
// New creates a new bitswap session whose lifetime is bounded by the
7893
// given context.
79-
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
94+
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
8095
s := &Session{
8196
liveWants: make(map[cid.Cid]time.Time),
8297
newReqs: make(chan []cid.Cid),
8398
cancelKeys: make(chan []cid.Cid),
8499
tofetch: newCidQueue(),
100+
pastWants: newCidQueue(),
85101
interestReqs: make(chan interestReq),
86102
latencyReqs: make(chan chan time.Duration),
87103
tickDelayReqs: make(chan time.Duration),
88104
ctx: ctx,
89105
wm: wm,
90106
pm: pm,
107+
srs: srs,
91108
incoming: make(chan blkRecv),
92109
notif: notifications.New(),
93110
uuid: loggables.Uuid("GetBlockRequest"),
@@ -106,14 +123,23 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
106123
// ReceiveBlockFrom receives an incoming block from the given peer.
107124
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
108125
select {
109-
case s.incoming <- blkRecv{from: from, blk: blk}:
126+
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
110127
case <-s.ctx.Done():
111128
}
112129
ks := []cid.Cid{blk.Cid()}
113130
s.wm.CancelWants(s.ctx, ks, nil, s.id)
114131

115132
}
116133

134+
// UpdateReceiveCounters updates receive counters for a block,
135+
// which may be a duplicate and adjusts the split factor based on that.
136+
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
137+
select {
138+
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
139+
case <-s.ctx.Done():
140+
}
141+
}
142+
117143
// InterestedIn returns true if this session is interested in the given Cid.
118144
func (s *Session) InterestedIn(c cid.Cid) bool {
119145
if s.interest.Contains(c) {
@@ -205,7 +231,11 @@ func (s *Session) run(ctx context.Context) {
205231
for {
206232
select {
207233
case blk := <-s.incoming:
208-
s.handleIncomingBlock(ctx, blk)
234+
if blk.counterMessage {
235+
s.updateReceiveCounters(ctx, blk)
236+
} else {
237+
s.handleIncomingBlock(ctx, blk)
238+
}
209239
case keys := <-s.newReqs:
210240
s.handleNewRequest(ctx, keys)
211241
case keys := <-s.cancelKeys:
@@ -241,8 +271,7 @@ func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
241271
for _, k := range keys {
242272
s.interest.Add(k, nil)
243273
}
244-
if len(s.liveWants) < activeWantsLimit {
245-
toadd := activeWantsLimit - len(s.liveWants)
274+
if toadd := s.wantBudget(); toadd > 0 {
246275
if toadd > len(keys) {
247276
toadd = len(keys)
248277
}
@@ -264,6 +293,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
264293
}
265294

266295
func (s *Session) handleTick(ctx context.Context) {
296+
267297
live := make([]cid.Cid, 0, len(s.liveWants))
268298
now := time.Now()
269299
for c := range s.liveWants {
@@ -303,6 +333,7 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
303333
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
304334
c := blk.Cid()
305335
if s.cidIsWanted(c) {
336+
s.srs.RecordUniqueBlock()
306337
tval, ok := s.liveWants[c]
307338
if ok {
308339
s.latTotal += time.Since(tval)
@@ -313,9 +344,26 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
313344
s.fetchcnt++
314345
s.notif.Publish(blk)
315346

316-
if next := s.tofetch.Pop(); next.Defined() {
317-
s.wantBlocks(ctx, []cid.Cid{next})
347+
toAdd := s.wantBudget()
348+
if toAdd > s.tofetch.Len() {
349+
toAdd = s.tofetch.Len()
350+
}
351+
if toAdd > 0 {
352+
var keys []cid.Cid
353+
for i := 0; i < toAdd; i++ {
354+
keys = append(keys, s.tofetch.Pop())
355+
}
356+
s.wantBlocks(ctx, keys)
318357
}
358+
359+
s.pastWants.Push(c)
360+
}
361+
}
362+
363+
func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
364+
ks := blk.blk.Cid()
365+
if s.pastWants.Has(ks) {
366+
s.srs.RecordDuplicateBlock()
319367
}
320368
}
321369

@@ -325,9 +373,16 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
325373
s.liveWants[c] = now
326374
}
327375
peers := s.pm.GetOptimizedPeers()
328-
// right now we're requesting each block from every peer, but soon, maybe not
329-
s.pm.RecordPeerRequests(peers, ks)
330-
s.wm.WantBlocks(ctx, ks, peers, s.id)
376+
if len(peers) > 0 {
377+
splitRequests := s.srs.SplitRequest(peers, ks)
378+
for _, splitRequest := range splitRequests {
379+
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
380+
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
381+
}
382+
} else {
383+
s.pm.RecordPeerRequests(nil, ks)
384+
s.wm.WantBlocks(ctx, ks, nil, s.id)
385+
}
331386
}
332387

333388
func (s *Session) averageLatency() time.Duration {
@@ -342,3 +397,17 @@ func (s *Session) resetTick() {
342397
s.tick.Reset(s.baseTickDelay + (3 * avLat))
343398
}
344399
}
400+
401+
func (s *Session) wantBudget() int {
402+
live := len(s.liveWants)
403+
var budget int
404+
if len(s.pm.GetOptimizedPeers()) > 0 {
405+
budget = targetedLiveWantsLimit - live
406+
} else {
407+
budget = broadcastLiveWantsLimit - live
408+
}
409+
if budget < 0 {
410+
budget = 0
411+
}
412+
return budget
413+
}

bitswap/session/session_test.go

+32-16
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ipfs/go-block-format"
1010

11+
bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
1112
"github.com/ipfs/go-bitswap/testutil"
1213
cid "github.com/ipfs/go-cid"
1314
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
@@ -55,17 +56,28 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
5556
fpm.lk.Unlock()
5657
}
5758

59+
type fakeRequestSplitter struct {
60+
}
61+
62+
func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
63+
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
64+
}
65+
66+
func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
67+
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}
68+
5869
func TestSessionGetBlocks(t *testing.T) {
5970
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
6071
defer cancel()
6172
wantReqs := make(chan wantReq, 1)
6273
cancelReqs := make(chan wantReq, 1)
6374
fwm := &fakeWantManager{wantReqs, cancelReqs}
6475
fpm := &fakePeerManager{}
76+
frs := &fakeRequestSplitter{}
6577
id := testutil.GenerateSessionID()
66-
session := New(ctx, id, fwm, fpm)
78+
session := New(ctx, id, fwm, fpm, frs)
6779
blockGenerator := blocksutil.NewBlockGenerator()
68-
blks := blockGenerator.Blocks(activeWantsLimit * 2)
80+
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
6981
var cids []cid.Cid
7082
for _, block := range blks {
7183
cids = append(cids, block.Cid())
@@ -79,15 +91,15 @@ func TestSessionGetBlocks(t *testing.T) {
7991
// check initial want request
8092
receivedWantReq := <-fwm.wantReqs
8193

82-
if len(receivedWantReq.cids) != activeWantsLimit {
94+
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
8395
t.Fatal("did not enqueue correct initial number of wants")
8496
}
8597
if receivedWantReq.peers != nil {
8698
t.Fatal("first want request should be a broadcast")
8799
}
88100

89101
// now receive the first set of blocks
90-
peers := testutil.GeneratePeers(activeWantsLimit)
102+
peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
91103
var newCancelReqs []wantReq
92104
var newBlockReqs []wantReq
93105
var receivedBlocks []blocks.Block
@@ -97,13 +109,16 @@ func TestSessionGetBlocks(t *testing.T) {
97109
receivedBlocks = append(receivedBlocks, receivedBlock)
98110
cancelBlock := <-cancelReqs
99111
newCancelReqs = append(newCancelReqs, cancelBlock)
100-
wantBlock := <-wantReqs
101-
newBlockReqs = append(newBlockReqs, wantBlock)
112+
select {
113+
case wantBlock := <-wantReqs:
114+
newBlockReqs = append(newBlockReqs, wantBlock)
115+
default:
116+
}
102117
}
103118

104119
// verify new peers were recorded
105120
fpm.lk.Lock()
106-
if len(fpm.peers) != activeWantsLimit {
121+
if len(fpm.peers) != broadcastLiveWantsLimit {
107122
t.Fatal("received blocks not recorded by the peer manager")
108123
}
109124
for _, p := range fpm.peers {
@@ -116,26 +131,26 @@ func TestSessionGetBlocks(t *testing.T) {
116131
// look at new interactions with want manager
117132

118133
// should have cancelled each received block
119-
if len(newCancelReqs) != activeWantsLimit {
134+
if len(newCancelReqs) != broadcastLiveWantsLimit {
120135
t.Fatal("did not cancel each block once it was received")
121136
}
122137
// new session reqs should be targeted
123-
totalEnqueued := 0
138+
var newCidsRequested []cid.Cid
124139
for _, w := range newBlockReqs {
125140
if len(w.peers) == 0 {
126141
t.Fatal("should not have broadcast again after initial broadcast")
127142
}
128-
totalEnqueued += len(w.cids)
143+
newCidsRequested = append(newCidsRequested, w.cids...)
129144
}
130145

131146
// full new round of cids should be requested
132-
if totalEnqueued != activeWantsLimit {
147+
if len(newCidsRequested) != broadcastLiveWantsLimit {
133148
t.Fatal("new blocks were not requested")
134149
}
135150

136151
// receive remaining blocks
137152
for i, p := range peers {
138-
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
153+
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
139154
receivedBlock := <-getBlocksCh
140155
receivedBlocks = append(receivedBlocks, receivedBlock)
141156
cancelBlock := <-cancelReqs
@@ -159,12 +174,13 @@ func TestSessionFindMorePeers(t *testing.T) {
159174
wantReqs := make(chan wantReq, 1)
160175
cancelReqs := make(chan wantReq, 1)
161176
fwm := &fakeWantManager{wantReqs, cancelReqs}
162-
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
177+
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
178+
frs := &fakeRequestSplitter{}
163179
id := testutil.GenerateSessionID()
164-
session := New(ctx, id, fwm, fpm)
180+
session := New(ctx, id, fwm, fpm, frs)
165181
session.SetBaseTickDelay(200 * time.Microsecond)
166182
blockGenerator := blocksutil.NewBlockGenerator()
167-
blks := blockGenerator.Blocks(activeWantsLimit * 2)
183+
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
168184
var cids []cid.Cid
169185
for _, block := range blks {
170186
cids = append(cids, block.Cid())
@@ -190,7 +206,7 @@ func TestSessionFindMorePeers(t *testing.T) {
190206

191207
// verify a broadcast was made
192208
receivedWantReq := <-wantReqs
193-
if len(receivedWantReq.cids) != activeWantsLimit {
209+
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
194210
t.Fatal("did not rebroadcast whole live list")
195211
}
196212
if receivedWantReq.peers != nil {

0 commit comments

Comments
 (0)