Skip to content
This repository was archived by the owner on Feb 1, 2023. It is now read-only.

Bitswap Refactor #1: Session Manager & Extract Want Manager #28

Merged
merged 1 commit into from
Dec 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 13 additions & 26 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package bitswap
import (
"context"
"errors"
"math"
"sync"
"sync/atomic"
"time"
Expand All @@ -14,6 +13,8 @@ import (
bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
notifications "github.com/ipfs/go-bitswap/notifications"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -42,8 +43,6 @@ const (
providerRequestTimeout = time.Second * 10
provideTimeout = time.Second * 15
sizeBatchRequestChan = 32
// kMaxPriority is the max priority as defined by the bitswap protocol
kMaxPriority = math.MaxInt32
)

var (
Expand Down Expand Up @@ -101,7 +100,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
process: px,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: NewWantManager(ctx, network),
wm: bswm.New(ctx, network),
sm: bssm.New(),
counters: new(counters),

dupMetric: dupHist,
Expand All @@ -128,7 +128,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
type Bitswap struct {
// the peermanager manages sending messages to peers in a way that
// wont block bitswap operation
wm *WantManager
wm *bswm.WantManager

// the engine is the bit of logic that decides who to send which blocks to
engine *decision.Engine
Expand Down Expand Up @@ -163,12 +163,8 @@ type Bitswap struct {
dupMetric metrics.Histogram
allMetric metrics.Histogram

// Sessions
sessions []*Session
sessLk sync.Mutex

sessID uint64
sessIDLk sync.Mutex
// the sessionmanager manages tracking sessions
sm *bssm.SessionManager
}

type counters struct {
Expand Down Expand Up @@ -229,7 +225,7 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
log.Event(ctx, "Bitswap.GetBlockRequest.Start", k)
}

mses := bs.getNextSessionID()
mses := bs.sm.GetNextSessionID()

bs.wm.WantBlocks(ctx, keys, nil, mses)

Expand Down Expand Up @@ -294,13 +290,6 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
return out, nil
}

func (bs *Bitswap) getNextSessionID() uint64 {
bs.sessIDLk.Lock()
defer bs.sessIDLk.Unlock()
bs.sessID++
return bs.sessID
}

// CancelWant removes a given key from the wantlist
func (bs *Bitswap) CancelWants(cids []cid.Cid, ses uint64) {
if len(cids) == 0 {
Expand Down Expand Up @@ -359,15 +348,13 @@ func (bs *Bitswap) receiveBlockFrom(blk blocks.Block, from peer.ID) error {

// SessionsForBlock returns a slice of all sessions that may be interested in the given cid
func (bs *Bitswap) SessionsForBlock(c cid.Cid) []*Session {
bs.sessLk.Lock()
defer bs.sessLk.Unlock()

var out []*Session
for _, s := range bs.sessions {
bs.sm.IterateSessions(func(session exchange.Fetcher) {
s := session.(*Session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having this cast is a bit funky. I get that this is to avoid a dependency issue but I wonder if there's a better way. Would it make sense to move sessions into their own package? That way, SessionManager could operate over concrete sessions.

Unfortunately, I don't think that'll work as Session appears to need access to bitswap internals. An alternative is to create a Session interface that exposes an InterestedIn method.

I'd be fine punting these questions till later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey!

Let's punt till later since two PR's forward Sessions end up in their own package, and this cast is gone :)

if s.interestedIn(c) {
out = append(out, s)
}
}
})
return out
}

Expand Down Expand Up @@ -398,7 +385,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
log.Debugf("got block %s from %s", b, p)

// skip received blocks that are not in the wantlist
if _, contains := bs.wm.wl.Contains(b.Cid()); !contains {
if !bs.wm.IsWanted(b.Cid()) {
return
}

Expand Down Expand Up @@ -461,7 +448,7 @@ func (bs *Bitswap) Close() error {
}

func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.wl.Entries()
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
Expand Down
208 changes: 208 additions & 0 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package messagequeue

import (
"context"
"sync"
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
bsnet "github.com/ipfs/go-bitswap/network"
wantlist "github.com/ipfs/go-bitswap/wantlist"
logging "github.com/ipfs/go-log"
peer "github.com/libp2p/go-libp2p-peer"
)

var log = logging.Logger("bitswap")

type MessageQueue struct {
p peer.ID

outlk sync.Mutex
out bsmsg.BitSwapMessage
network bsnet.BitSwapNetwork
wl *wantlist.ThreadSafe

sender bsnet.MessageSender

refcnt int

work chan struct{}
done chan struct{}
}

func New(p peer.ID, network bsnet.BitSwapNetwork) *MessageQueue {
return &MessageQueue{
done: make(chan struct{}),
work: make(chan struct{}, 1),
wl: wantlist.NewThreadSafe(),
network: network,
p: p,
refcnt: 1,
}
}

func (mq *MessageQueue) RefIncrement() {
mq.refcnt++
}

func (mq *MessageQueue) RefDecrement() bool {
mq.refcnt--
return mq.refcnt > 0
}

func (mq *MessageQueue) AddMessage(entries []*bsmsg.Entry, ses uint64) {
var work bool
mq.outlk.Lock()
defer func() {
mq.outlk.Unlock()
if !work {
return
}
select {
case mq.work <- struct{}{}:
default:
}
}()

// if we have no message held allocate a new one
if mq.out == nil {
mq.out = bsmsg.New(false)
}

// TODO: add a msg.Combine(...) method
// otherwise, combine the one we are holding with the
// one passed in
for _, e := range entries {
if e.Cancel {
if mq.wl.Remove(e.Cid, ses) {
work = true
mq.out.Cancel(e.Cid)
}
} else {
if mq.wl.Add(e.Cid, e.Priority, ses) {
work = true
mq.out.AddEntry(e.Cid, e.Priority)
}
}
}
}

func (mq *MessageQueue) Startup(ctx context.Context, initialEntries []*wantlist.Entry) {

// new peer, we will want to give them our full wantlist
fullwantlist := bsmsg.New(true)
for _, e := range initialEntries {
for k := range e.SesTrk {
mq.wl.AddEntry(e, k)
}
fullwantlist.AddEntry(e.Cid, e.Priority)
}
mq.out = fullwantlist
mq.work <- struct{}{}

go mq.runQueue(ctx)
}

func (mq *MessageQueue) Shutdown() {
close(mq.done)
}
func (mq *MessageQueue) runQueue(ctx context.Context) {
for {
select {
case <-mq.work: // there is work to be done
mq.doWork(ctx)
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-ctx.Done():
if mq.sender != nil {
mq.sender.Reset()
}
return
}
}
}

func (mq *MessageQueue) doWork(ctx context.Context) {
// grab outgoing message
mq.outlk.Lock()
wlm := mq.out
if wlm == nil || wlm.Empty() {
mq.outlk.Unlock()
return
}
mq.out = nil
mq.outlk.Unlock()

// NB: only open a stream if we actually have data to send
if mq.sender == nil {
err := mq.openSender(ctx)
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
return
}
}

// send wantlist updates
for { // try to send this message until we fail.
err := mq.sender.SendMsg(ctx, wlm)
if err == nil {
return
}

log.Infof("bitswap send error: %s", err)
mq.sender.Reset()
mq.sender = nil

select {
case <-mq.done:
return
case <-ctx.Done():
return
case <-time.After(time.Millisecond * 100):
// wait 100ms in case disconnect notifications are still propogating
log.Warning("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.openSender(ctx)
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
// TODO(why): what do we do now?
// I think the *right* answer is to probably put the message we're
// trying to send back, and then return to waiting for new work or
// a disconnect.
return
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
}
*/
}
}

func (mq *MessageQueue) openSender(ctx context.Context) error {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := mq.network.ConnectTo(conctx, mq.p)
if err != nil {
return err
}

nsender, err := mq.network.NewMessageSender(ctx, mq.p)
if err != nil {
return err
}

mq.sender = nsender
return nil
}
17 changes: 3 additions & 14 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,15 @@ func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
notif: notifications.New(),
uuid: loggables.Uuid("GetBlockRequest"),
baseTickDelay: time.Millisecond * 500,
id: bs.getNextSessionID(),
id: bs.sm.GetNextSessionID(),
}

s.tag = fmt.Sprint("bs-ses-", s.id)

cache, _ := lru.New(2048)
s.interest = cache

bs.sessLk.Lock()
bs.sessions = append(bs.sessions, s)
bs.sessLk.Unlock()

bs.sm.AddSession(s)
go s.run(ctx)

return s
Expand All @@ -92,15 +89,7 @@ func (bs *Bitswap) removeSession(s *Session) {
}
bs.CancelWants(live, s.id)

bs.sessLk.Lock()
defer bs.sessLk.Unlock()
for i := 0; i < len(bs.sessions); i++ {
if bs.sessions[i] == s {
bs.sessions[i] = bs.sessions[len(bs.sessions)-1]
bs.sessions = bs.sessions[:len(bs.sessions)-1]
return
}
}
bs.sm.RemoveSession(s)
}

type blkRecv struct {
Expand Down
Loading