Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Speed up sessions Round #1 #27

Merged
merged 5 commits into from
Dec 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"sync/atomic"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"

decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand Down Expand Up @@ -103,12 +105,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
}

wm := bswm.New(ctx)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager) bssm.Session {
return bssession.New(ctx, id, wm, pm)
sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
return bsspm.New(ctx, id, network)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}

bs := &Bitswap{
blockstore: bstore,
Expand All @@ -121,7 +126,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: bspm.New(ctx, peerQueueFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory),
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory),
counters: new(counters),
dupMetric: dupHist,
allMetric: allHist,
Expand Down Expand Up @@ -391,7 +396,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
defer wg.Done()

bs.updateReceiveCounters(b)

bs.sm.UpdateReceiveCounters(b)
log.Debugf("got block %s from %s", b, p)

// skip received blocks that are not in the wantlist
Expand Down
97 changes: 83 additions & 14 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ import (
logging "github.com/ipfs/go-log"
loggables "github.com/libp2p/go-libp2p-loggables"
peer "github.com/libp2p/go-libp2p-peer"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
)

const activeWantsLimit = 16
const (
broadcastLiveWantsLimit = 4
targetedLiveWantsLimit = 32
)

// WantManager is an interface that can be used to request blocks
// from given peers.
Expand All @@ -32,14 +37,23 @@ type PeerManager interface {
RecordPeerResponse(peer.ID, cid.Cid)
}

// RequestSplitter provides an interface for splitting
// a request for Cids up among peers.
type RequestSplitter interface {
SplitRequest([]peer.ID, []cid.Cid) []*bssrs.PartialRequest
RecordDuplicateBlock()
RecordUniqueBlock()
}

type interestReq struct {
c cid.Cid
resp chan bool
}

type blkRecv struct {
from peer.ID
blk blocks.Block
from peer.ID
blk blocks.Block
counterMessage bool
}

// Session holds state for an individual bitswap transfer operation.
Expand All @@ -50,6 +64,7 @@ type Session struct {
ctx context.Context
wm WantManager
pm PeerManager
srs RequestSplitter

// channels
incoming chan blkRecv
Expand All @@ -62,12 +77,12 @@ type Session struct {
// do not touch outside run loop
tofetch *cidQueue
interest *lru.Cache
pastWants *cidQueue
liveWants map[cid.Cid]time.Time
tick *time.Timer
baseTickDelay time.Duration
latTotal time.Duration
fetchcnt int

// identifiers
notif notifications.PubSub
uuid logging.Loggable
Expand All @@ -76,18 +91,20 @@ type Session struct {

// New creates a new bitswap session whose lifetime is bounded by the
// given context.
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Session {
func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager, srs RequestSplitter) *Session {
s := &Session{
liveWants: make(map[cid.Cid]time.Time),
newReqs: make(chan []cid.Cid),
cancelKeys: make(chan []cid.Cid),
tofetch: newCidQueue(),
pastWants: newCidQueue(),
interestReqs: make(chan interestReq),
latencyReqs: make(chan chan time.Duration),
tickDelayReqs: make(chan time.Duration),
ctx: ctx,
wm: wm,
pm: pm,
srs: srs,
incoming: make(chan blkRecv),
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
Expand All @@ -106,14 +123,23 @@ func New(ctx context.Context, id uint64, wm WantManager, pm PeerManager) *Sessio
// ReceiveBlockFrom receives an incoming block from the given peer.
func (s *Session) ReceiveBlockFrom(from peer.ID, blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: from, blk: blk}:
case s.incoming <- blkRecv{from: from, blk: blk, counterMessage: false}:
case <-s.ctx.Done():
}
ks := []cid.Cid{blk.Cid()}
s.wm.CancelWants(s.ctx, ks, nil, s.id)

}

// UpdateReceiveCounters updates receive counters for a block,
// which may be a duplicate and adjusts the split factor based on that.
func (s *Session) UpdateReceiveCounters(blk blocks.Block) {
select {
case s.incoming <- blkRecv{from: "", blk: blk, counterMessage: true}:
case <-s.ctx.Done():
}
}

// InterestedIn returns true if this session is interested in the given Cid.
func (s *Session) InterestedIn(c cid.Cid) bool {
if s.interest.Contains(c) {
Expand Down Expand Up @@ -205,7 +231,11 @@ func (s *Session) run(ctx context.Context) {
for {
select {
case blk := <-s.incoming:
s.handleIncomingBlock(ctx, blk)
if blk.counterMessage {
s.updateReceiveCounters(ctx, blk)
} else {
s.handleIncomingBlock(ctx, blk)
}
case keys := <-s.newReqs:
s.handleNewRequest(ctx, keys)
case keys := <-s.cancelKeys:
Expand Down Expand Up @@ -241,8 +271,7 @@ func (s *Session) handleNewRequest(ctx context.Context, keys []cid.Cid) {
for _, k := range keys {
s.interest.Add(k, nil)
}
if len(s.liveWants) < activeWantsLimit {
toadd := activeWantsLimit - len(s.liveWants)
if toadd := s.wantBudget(); toadd > 0 {
if toadd > len(keys) {
toadd = len(keys)
}
Expand All @@ -264,6 +293,7 @@ func (s *Session) handleCancel(keys []cid.Cid) {
}

func (s *Session) handleTick(ctx context.Context) {

live := make([]cid.Cid, 0, len(s.liveWants))
now := time.Now()
for c := range s.liveWants {
Expand Down Expand Up @@ -303,6 +333,7 @@ func (s *Session) cidIsWanted(c cid.Cid) bool {
func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
c := blk.Cid()
if s.cidIsWanted(c) {
s.srs.RecordUniqueBlock()
tval, ok := s.liveWants[c]
if ok {
s.latTotal += time.Since(tval)
Expand All @@ -313,9 +344,26 @@ func (s *Session) receiveBlock(ctx context.Context, blk blocks.Block) {
s.fetchcnt++
s.notif.Publish(blk)

if next := s.tofetch.Pop(); next.Defined() {
s.wantBlocks(ctx, []cid.Cid{next})
toAdd := s.wantBudget()
if toAdd > s.tofetch.Len() {
toAdd = s.tofetch.Len()
}
if toAdd > 0 {
var keys []cid.Cid
for i := 0; i < toAdd; i++ {
keys = append(keys, s.tofetch.Pop())
}
s.wantBlocks(ctx, keys)
}

s.pastWants.Push(c)
}
}

func (s *Session) updateReceiveCounters(ctx context.Context, blk blkRecv) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move RecordUniqueBlock to receiveBlock

ks := blk.blk.Cid()
if s.pastWants.Has(ks) {
s.srs.RecordDuplicateBlock()
}
}

Expand All @@ -325,9 +373,16 @@ func (s *Session) wantBlocks(ctx context.Context, ks []cid.Cid) {
s.liveWants[c] = now
}
peers := s.pm.GetOptimizedPeers()
// right now we're requesting each block from every peer, but soon, maybe not
s.pm.RecordPeerRequests(peers, ks)
s.wm.WantBlocks(ctx, ks, peers, s.id)
if len(peers) > 0 {
splitRequests := s.srs.SplitRequest(peers, ks)
for _, splitRequest := range splitRequests {
s.pm.RecordPeerRequests(splitRequest.Peers, splitRequest.Keys)
s.wm.WantBlocks(ctx, splitRequest.Keys, splitRequest.Peers, s.id)
}
} else {
s.pm.RecordPeerRequests(nil, ks)
s.wm.WantBlocks(ctx, ks, nil, s.id)
}
}

func (s *Session) averageLatency() time.Duration {
Expand All @@ -342,3 +397,17 @@ func (s *Session) resetTick() {
s.tick.Reset(s.baseTickDelay + (3 * avLat))
}
}

func (s *Session) wantBudget() int {
live := len(s.liveWants)
var budget int
if len(s.pm.GetOptimizedPeers()) > 0 {
budget = targetedLiveWantsLimit - live
} else {
budget = broadcastLiveWantsLimit - live
}
if budget < 0 {
budget = 0
}
return budget
}
48 changes: 32 additions & 16 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/ipfs/go-block-format"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
"github.com/ipfs/go-bitswap/testutil"
cid "github.com/ipfs/go-cid"
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
Expand Down Expand Up @@ -55,17 +56,28 @@ func (fpm *fakePeerManager) RecordPeerResponse(p peer.ID, c cid.Cid) {
fpm.lk.Unlock()
}

type fakeRequestSplitter struct {
}

func (frs *fakeRequestSplitter) SplitRequest(peers []peer.ID, keys []cid.Cid) []*bssrs.PartialRequest {
return []*bssrs.PartialRequest{&bssrs.PartialRequest{Peers: peers, Keys: keys}}
}

func (frs *fakeRequestSplitter) RecordDuplicateBlock() {}
func (frs *fakeRequestSplitter) RecordUniqueBlock() {}

func TestSessionGetBlocks(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
Expand All @@ -79,15 +91,15 @@ func TestSessionGetBlocks(t *testing.T) {
// check initial want request
receivedWantReq := <-fwm.wantReqs

if len(receivedWantReq.cids) != activeWantsLimit {
if len(receivedWantReq.cids) != broadcastLiveWantsLimit {
t.Fatal("did not enqueue correct initial number of wants")
}
if receivedWantReq.peers != nil {
t.Fatal("first want request should be a broadcast")
}

// now receive the first set of blocks
peers := testutil.GeneratePeers(activeWantsLimit)
peers := testutil.GeneratePeers(broadcastLiveWantsLimit)
var newCancelReqs []wantReq
var newBlockReqs []wantReq
var receivedBlocks []blocks.Block
Expand All @@ -97,13 +109,16 @@ func TestSessionGetBlocks(t *testing.T) {
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
newCancelReqs = append(newCancelReqs, cancelBlock)
wantBlock := <-wantReqs
newBlockReqs = append(newBlockReqs, wantBlock)
select {
case wantBlock := <-wantReqs:
newBlockReqs = append(newBlockReqs, wantBlock)
default:
}
}

// verify new peers were recorded
fpm.lk.Lock()
if len(fpm.peers) != activeWantsLimit {
if len(fpm.peers) != broadcastLiveWantsLimit {
t.Fatal("received blocks not recorded by the peer manager")
}
for _, p := range fpm.peers {
Expand All @@ -116,26 +131,26 @@ func TestSessionGetBlocks(t *testing.T) {
// look at new interactions with want manager

// should have cancelled each received block
if len(newCancelReqs) != activeWantsLimit {
if len(newCancelReqs) != broadcastLiveWantsLimit {
t.Fatal("did not cancel each block once it was received")
}
// new session reqs should be targeted
totalEnqueued := 0
var newCidsRequested []cid.Cid
for _, w := range newBlockReqs {
if len(w.peers) == 0 {
t.Fatal("should not have broadcast again after initial broadcast")
}
totalEnqueued += len(w.cids)
newCidsRequested = append(newCidsRequested, w.cids...)
}

// full new round of cids should be requested
if totalEnqueued != activeWantsLimit {
if len(newCidsRequested) != broadcastLiveWantsLimit {
t.Fatal("new blocks were not requested")
}

// receive remaining blocks
for i, p := range peers {
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newBlockReqs[i].cids[0])])
session.ReceiveBlockFrom(p, blks[testutil.IndexOf(blks, newCidsRequested[i])])
receivedBlock := <-getBlocksCh
receivedBlocks = append(receivedBlocks, receivedBlock)
cancelBlock := <-cancelReqs
Expand All @@ -159,12 +174,13 @@ func TestSessionFindMorePeers(t *testing.T) {
wantReqs := make(chan wantReq, 1)
cancelReqs := make(chan wantReq, 1)
fwm := &fakeWantManager{wantReqs, cancelReqs}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{})}
fpm := &fakePeerManager{findMorePeersRequested: make(chan struct{}, 1)}
frs := &fakeRequestSplitter{}
id := testutil.GenerateSessionID()
session := New(ctx, id, fwm, fpm)
session := New(ctx, id, fwm, fpm, frs)
session.SetBaseTickDelay(200 * time.Microsecond)
blockGenerator := blocksutil.NewBlockGenerator()
blks := blockGenerator.Blocks(activeWantsLimit * 2)
blks := blockGenerator.Blocks(broadcastLiveWantsLimit * 2)
var cids []cid.Cid
for _, block := range blks {
cids = append(cids, block.Cid())
Expand All @@ -190,7 +206,7 @@ func TestSessionFindMorePeers(t *testing.T) {

// verify a broadcast was made
receivedWantReq := <-wantReqs
if len(receivedWantReq.cids) != activeWantsLimit {
if len(receivedWantReq.cids) < broadcastLiveWantsLimit {
t.Fatal("did not rebroadcast whole live list")
}
if receivedWantReq.peers != nil {
Expand Down
Loading