Skip to content

Commit 7e1160b

Browse files
committed
httpnet: an Exchange network layer for http-retrieval
This and subsequent commits introduce an httpnet module at what is known as the "bitswap network layer". The bitswap network layer connects bitswap-peers, sends bitswap messages and receives responses. Bitswap messages are basically a wantlist, a list of CIDs that should be sent if available. httpnet does the same, except instead of sending the bitswap message over bitswap, it triggers http requests for the requested blocks. httpnet is a drop-in addon so that we can request blocks over http, and not only via bitswap. As httpnet is a network, it benefits from all existing wantlist management logic. Any http/2 endpoint should benefit from streamlined requests on a single http connection. A router-network ensures that messages are correctly handled by bitswap or by http requests depending on what the peers are advertising. HTTP requests are given priority in the presence of both. Here are some of the httpnet features: * Peers are marked as Connected when they are able to handle http requets. * Peers are marked as Disconnected when http requests fail repeatedly (MaxRetries). * Server errors trigger backoffs preventing more requests to happen to the same url for a period (Retry-After header or configuration value) * We support several urls per peer, meaning a peer can provide alternative http endpoints which are tried based on number of failures or existing cooldowns. * We translate HAVE requests to HTTP-HEAD requests and BLOCK requests to HTTP-GETs * We support cancellations: ongoing or soon to happen requests for a CID can be cancelled using a "cancel" entry in the wantlist. * We record latency information for peers by pinging regularly. * We discriminate between different errors so that we know whether to move to the next block in a wantlist, or to retry with a different url, or to completely abort. * Options to configure user-agent, max retries etc. are supported.
1 parent 6397847 commit 7e1160b

30 files changed

+2335
-106
lines changed

bitswap/benchmarks_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import (
1313
"time"
1414

1515
"github.com/ipfs/boxo/bitswap"
16-
bsnet "github.com/ipfs/boxo/bitswap/network"
16+
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
1717
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
1818
tn "github.com/ipfs/boxo/bitswap/testnet"
1919
mockrouting "github.com/ipfs/boxo/routing/mock"

bitswap/client/client.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Package bitswap implements the IPFS exchange interface with the BitSwap
1+
// Package client implements the IPFS exchange interface with the BitSwap
22
// bilateral exchange protocol.
33
package client
44

@@ -191,7 +191,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
191191

192192
sim := bssim.New()
193193
bpm := bsbpm.New()
194-
pm := bspm.New(ctx, peerQueueFactory, network.Self())
194+
pm := bspm.New(ctx, peerQueueFactory)
195195

196196
if bs.providerFinder != nil && bs.defaultProviderQueryManager {
197197
// network can do dialing.
@@ -232,7 +232,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
232232
return bssession.New(sessctx, sessmgr, id, spm, sessionProvFinder, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
233233
}
234234
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
235-
return bsspm.New(id, network.ConnectionManager())
235+
return bsspm.New(id, network)
236236
}
237237
notif := notifications.New()
238238
sm = bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())

bitswap/client/internal/peermanager/peermanager.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,17 @@ type PeerManager struct {
4545
psLk sync.RWMutex
4646
sessions map[uint64]Session
4747
peerSessions map[peer.ID]map[uint64]struct{}
48-
49-
self peer.ID
5048
}
5149

5250
// New creates a new PeerManager, given a context and a peerQueueFactory.
53-
func New(ctx context.Context, createPeerQueue PeerQueueFactory, self peer.ID) *PeerManager {
51+
func New(ctx context.Context, createPeerQueue PeerQueueFactory) *PeerManager {
5452
wantGauge := metrics.NewCtx(ctx, "wantlist_total", "Number of items in wantlist.").Gauge()
5553
wantBlockGauge := metrics.NewCtx(ctx, "want_blocks_total", "Number of want-blocks in wantlist.").Gauge()
5654
return &PeerManager{
5755
peerQueues: make(map[peer.ID]PeerQueue),
5856
pwm: newPeerWantManager(wantGauge, wantBlockGauge),
5957
createPeerQueue: createPeerQueue,
6058
ctx: ctx,
61-
self: self,
6259

6360
sessions: make(map[uint64]Session),
6461
peerSessions: make(map[peer.ID]map[uint64]struct{}),

bitswap/client/internal/peermanager/peermanager_test.go

+13-14
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ func TestAddingAndRemovingPeers(t *testing.T) {
8585
peerQueueFactory := makePeerQueueFactory(msgs)
8686

8787
tp := random.Peers(6)
88-
self, peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4], tp[5]
89-
peerManager := New(ctx, peerQueueFactory, self)
88+
peer1, peer2, peer3, peer4, peer5 := tp[0], tp[1], tp[2], tp[3], tp[4]
89+
peerManager := New(ctx, peerQueueFactory)
9090

9191
peerManager.Connected(peer1)
9292
peerManager.Connected(peer2)
@@ -128,8 +128,8 @@ func TestBroadcastOnConnect(t *testing.T) {
128128
msgs := make(chan msg, 16)
129129
peerQueueFactory := makePeerQueueFactory(msgs)
130130
tp := random.Peers(2)
131-
self, peer1 := tp[0], tp[1]
132-
peerManager := New(ctx, peerQueueFactory, self)
131+
peer1 := tp[0]
132+
peerManager := New(ctx, peerQueueFactory)
133133

134134
cids := random.Cids(2)
135135
peerManager.BroadcastWantHaves(ctx, cids)
@@ -149,8 +149,8 @@ func TestBroadcastWantHaves(t *testing.T) {
149149
msgs := make(chan msg, 16)
150150
peerQueueFactory := makePeerQueueFactory(msgs)
151151
tp := random.Peers(3)
152-
self, peer1, peer2 := tp[0], tp[1], tp[2]
153-
peerManager := New(ctx, peerQueueFactory, self)
152+
peer1, peer2 := tp[0], tp[1]
153+
peerManager := New(ctx, peerQueueFactory)
154154

155155
cids := random.Cids(3)
156156

@@ -190,8 +190,8 @@ func TestSendWants(t *testing.T) {
190190
msgs := make(chan msg, 16)
191191
peerQueueFactory := makePeerQueueFactory(msgs)
192192
tp := random.Peers(2)
193-
self, peer1 := tp[0], tp[1]
194-
peerManager := New(ctx, peerQueueFactory, self)
193+
peer1 := tp[0]
194+
peerManager := New(ctx, peerQueueFactory)
195195
cids := random.Cids(4)
196196

197197
peerManager.Connected(peer1)
@@ -224,8 +224,8 @@ func TestSendCancels(t *testing.T) {
224224
msgs := make(chan msg, 16)
225225
peerQueueFactory := makePeerQueueFactory(msgs)
226226
tp := random.Peers(3)
227-
self, peer1, peer2 := tp[0], tp[1], tp[2]
228-
peerManager := New(ctx, peerQueueFactory, self)
227+
peer1, peer2 := tp[0], tp[1]
228+
peerManager := New(ctx, peerQueueFactory)
229229
cids := random.Cids(4)
230230

231231
// Connect to peer1 and peer2
@@ -285,8 +285,8 @@ func TestSessionRegistration(t *testing.T) {
285285
peerQueueFactory := makePeerQueueFactory(msgs)
286286

287287
tp := random.Peers(3)
288-
self, p1, p2 := tp[0], tp[1], tp[2]
289-
peerManager := New(ctx, peerQueueFactory, self)
288+
p1, p2 := tp[0], tp[1]
289+
peerManager := New(ctx, peerQueueFactory)
290290

291291
id := uint64(1)
292292
s := newSess(id)
@@ -344,9 +344,8 @@ func BenchmarkPeerManager(b *testing.B) {
344344
return &benchPeerQueue{}
345345
}
346346

347-
self := random.Peers(1)[0]
348347
peers := random.Peers(500)
349-
peerManager := New(ctx, peerQueueFactory, self)
348+
peerManager := New(ctx, peerQueueFactory)
350349

351350
// Create a bunch of connections
352351
connected := 0

bitswap/network/bsnet/bsnet.go

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package bsnet
2+
3+
import "github.com/ipfs/boxo/bitswap/network/bsnet/internal"
4+
5+
var (
6+
// ProtocolBitswapNoVers is equivalent to the legacy bitswap protocol
7+
ProtocolBitswapNoVers = internal.ProtocolBitswapNoVers
8+
// ProtocolBitswapOneZero is the prefix for the legacy bitswap protocol
9+
ProtocolBitswapOneZero = internal.ProtocolBitswapOneZero
10+
// ProtocolBitswapOneOne is the prefix for version 1.1.0
11+
ProtocolBitswapOneOne = internal.ProtocolBitswapOneOne
12+
// ProtocolBitswap is the current version of the bitswap protocol: 1.2.0
13+
ProtocolBitswap = internal.ProtocolBitswap
14+
)
File renamed without changes.

bitswap/network/ipfs_impl.go bitswap/network/bsnet/ipfs_impl.go

+42-18
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package network
1+
package bsnet
22

33
import (
44
"context"
@@ -9,10 +9,10 @@ import (
99
"time"
1010

1111
bsmsg "github.com/ipfs/boxo/bitswap/message"
12-
"github.com/ipfs/boxo/bitswap/network/internal"
12+
iface "github.com/ipfs/boxo/bitswap/network"
13+
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
1314

1415
logging "github.com/ipfs/go-log/v2"
15-
"github.com/libp2p/go-libp2p/core/connmgr"
1616
"github.com/libp2p/go-libp2p/core/host"
1717
"github.com/libp2p/go-libp2p/core/network"
1818
"github.com/libp2p/go-libp2p/core/peer"
@@ -23,7 +23,7 @@ import (
2323
"github.com/multiformats/go-multistream"
2424
)
2525

26-
var log = logging.Logger("bitswap/network")
26+
var log = logging.Logger("bitswap/bsnet")
2727

2828
var (
2929
maxSendTimeout = 2 * time.Minute
@@ -33,7 +33,7 @@ var (
3333
)
3434

3535
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
36-
func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork {
36+
func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
3737
s := processSettings(opts...)
3838

3939
bitswapNetwork := impl{
@@ -66,10 +66,10 @@ func processSettings(opts ...NetOpt) Settings {
6666
type impl struct {
6767
// NOTE: Stats must be at the top of the heap allocation to ensure 64bit
6868
// alignment.
69-
stats Stats
69+
stats iface.Stats
7070

7171
host host.Host
72-
connectEvtMgr *connectEventManager
72+
connectEvtMgr *iface.ConnectEventManager
7373

7474
protocolBitswapNoVers protocol.ID
7575
protocolBitswapOneZero protocol.ID
@@ -79,7 +79,7 @@ type impl struct {
7979
supportedProtocols []protocol.ID
8080

8181
// inbound messages from the network are forwarded to the receiver
82-
receivers []Receiver
82+
receivers []iface.Receiver
8383
}
8484

8585
// interfaceWrapper is concrete type that wraps an interface. Necessary because
@@ -109,7 +109,7 @@ type streamMessageSender struct {
109109
to peer.ID
110110
stream atomicInterface[network.Stream]
111111
bsnet *impl
112-
opts *MessageSenderOpts
112+
opts *iface.MessageSenderOpts
113113
}
114114

115115
type HasContext interface {
@@ -317,7 +317,7 @@ func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.
317317
return nil
318318
}
319319

320-
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *MessageSenderOpts) (MessageSender, error) {
320+
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *iface.MessageSenderOpts) (iface.MessageSender, error) {
321321
opts = setDefaultOpts(opts)
322322

323323
sender := &streamMessageSender{
@@ -337,7 +337,7 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID, opts *Messag
337337
return sender, nil
338338
}
339339

340-
func setDefaultOpts(opts *MessageSenderOpts) *MessageSenderOpts {
340+
func setDefaultOpts(opts *iface.MessageSenderOpts) *iface.MessageSenderOpts {
341341
copy := *opts
342342
if opts.MaxRetries == 0 {
343343
copy.MaxRetries = 3
@@ -385,14 +385,14 @@ func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stre
385385
return bsnet.host.NewStream(ctx, p, bsnet.supportedProtocols...)
386386
}
387387

388-
func (bsnet *impl) Start(r ...Receiver) {
388+
func (bsnet *impl) Start(r ...iface.Receiver) {
389389
bsnet.receivers = r
390390
{
391-
connectionListeners := make([]ConnectionListener, len(r))
391+
connectionListeners := make([]iface.ConnectionListener, len(r))
392392
for i, v := range r {
393393
connectionListeners[i] = v
394394
}
395-
bsnet.connectEvtMgr = newConnectEventManager(connectionListeners...)
395+
bsnet.connectEvtMgr = iface.NewConnectEventManager(connectionListeners...)
396396
}
397397
for _, proto := range bsnet.supportedProtocols {
398398
bsnet.host.SetStreamHandler(proto, bsnet.handleNewStream)
@@ -451,12 +451,36 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
451451
}
452452
}
453453

454-
func (bsnet *impl) ConnectionManager() connmgr.ConnManager {
455-
return bsnet.host.ConnManager()
454+
func (bsnet *impl) TagPeer(p peer.ID, tag string, w int) {
455+
if bsnet.host == nil {
456+
return
457+
}
458+
bsnet.host.ConnManager().TagPeer(p, tag, w)
459+
}
460+
461+
func (bsnet *impl) UntagPeer(p peer.ID, tag string) {
462+
if bsnet.host == nil {
463+
return
464+
}
465+
bsnet.host.ConnManager().UntagPeer(p, tag)
466+
}
467+
468+
func (bsnet *impl) Protect(p peer.ID, tag string) {
469+
if bsnet.host == nil {
470+
return
471+
}
472+
bsnet.host.ConnManager().Protect(p, tag)
473+
}
474+
475+
func (bsnet *impl) Unprotect(p peer.ID, tag string) bool {
476+
if bsnet.host == nil {
477+
return false
478+
}
479+
return bsnet.host.ConnManager().Unprotect(p, tag)
456480
}
457481

458-
func (bsnet *impl) Stats() Stats {
459-
return Stats{
482+
func (bsnet *impl) Stats() iface.Stats {
483+
return iface.Stats{
460484
MessagesRecvd: atomic.LoadUint64(&bsnet.stats.MessagesRecvd),
461485
MessagesSent: atomic.LoadUint64(&bsnet.stats.MessagesSent),
462486
}

bitswap/network/ipfs_impl_test.go bitswap/network/bsnet/ipfs_impl_test.go

+14-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package network_test
1+
package bsnet_test
22

33
import (
44
"context"
@@ -10,13 +10,14 @@ import (
1010

1111
bsmsg "github.com/ipfs/boxo/bitswap/message"
1212
pb "github.com/ipfs/boxo/bitswap/message/pb"
13-
bsnet "github.com/ipfs/boxo/bitswap/network"
14-
"github.com/ipfs/boxo/bitswap/network/internal"
13+
network "github.com/ipfs/boxo/bitswap/network"
14+
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
15+
"github.com/ipfs/boxo/bitswap/network/bsnet/internal"
1516
tn "github.com/ipfs/boxo/bitswap/testnet"
1617
"github.com/ipfs/go-test/random"
1718
tnet "github.com/libp2p/go-libp2p-testing/net"
1819
"github.com/libp2p/go-libp2p/core/host"
19-
"github.com/libp2p/go-libp2p/core/network"
20+
p2pnet "github.com/libp2p/go-libp2p/core/network"
2021
"github.com/libp2p/go-libp2p/core/peer"
2122
"github.com/libp2p/go-libp2p/core/protocol"
2223
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
@@ -30,7 +31,7 @@ type receiver struct {
3031
connectionEvent chan bool
3132
lastMessage bsmsg.BitSwapMessage
3233
lastSender peer.ID
33-
listener network.Notifiee
34+
listener p2pnet.Notifiee
3435
}
3536

3637
func newReceiver() *receiver {
@@ -71,7 +72,7 @@ func (r *receiver) PeerDisconnected(p peer.ID) {
7172
var errMockNetErr = errors.New("network err")
7273

7374
type ErrStream struct {
74-
network.Stream
75+
p2pnet.Stream
7576
lk sync.Mutex
7677
err error
7778
timingOut bool
@@ -120,7 +121,7 @@ func (eh *ErrHost) Connect(ctx context.Context, pi peer.AddrInfo) error {
120121
return eh.Host.Connect(ctx, pi)
121122
}
122123

123-
func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (network.Stream, error) {
124+
func (eh *ErrHost) NewStream(ctx context.Context, p peer.ID, pids ...protocol.ID) (p2pnet.Stream, error) {
124125
eh.lk.Lock()
125126
defer eh.lk.Unlock()
126127

@@ -268,7 +269,7 @@ func TestMessageSendAndReceive(t *testing.T) {
268269
}
269270
}
270271

271-
func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, bsnet.BitSwapNetwork, *ErrHost, bsnet.BitSwapNetwork, bsmsg.BitSwapMessage) {
272+
func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *receiver, p2 tnet.Identity, r2 *receiver) (*ErrHost, network.BitSwapNetwork, *ErrHost, network.BitSwapNetwork, bsmsg.BitSwapMessage) {
272273
// create network
273274
mn := mocknet.New()
274275
defer mn.Close()
@@ -337,7 +338,7 @@ func TestMessageResendAfterError(t *testing.T) {
337338
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
338339

339340
testSendErrorBackoff := 100 * time.Millisecond
340-
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
341+
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
341342
MaxRetries: 3,
342343
SendTimeout: 100 * time.Millisecond,
343344
SendErrorBackoff: testSendErrorBackoff,
@@ -382,7 +383,7 @@ func TestMessageSendTimeout(t *testing.T) {
382383

383384
eh, bsnet1, _, _, msg := prepareNetwork(t, ctx, p1, r1, p2, r2)
384385

385-
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
386+
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
386387
MaxRetries: 3,
387388
SendTimeout: 100 * time.Millisecond,
388389
SendErrorBackoff: 100 * time.Millisecond,
@@ -424,7 +425,7 @@ func TestMessageSendNotSupportedResponse(t *testing.T) {
424425
eh, bsnet1, _, _, _ := prepareNetwork(t, ctx, p1, r1, p2, r2)
425426

426427
eh.setError(multistream.ErrNotSupported[protocol.ID]{})
427-
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{
428+
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{
428429
MaxRetries: 3,
429430
SendTimeout: 100 * time.Millisecond,
430431
SendErrorBackoff: 100 * time.Millisecond,
@@ -482,7 +483,7 @@ func TestSupportsHave(t *testing.T) {
482483
t.Fatal(err)
483484
}
484485

485-
senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
486+
senderCurrent, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{})
486487
if err != nil {
487488
t.Fatal(err)
488489
}
@@ -532,7 +533,7 @@ func testNetworkCounters(t *testing.T, n1 int, n2 int) {
532533
}
533534

534535
if n2 > 0 {
535-
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &bsnet.MessageSenderOpts{})
536+
ms, err := bsnet1.NewMessageSender(ctx, p2.ID(), &network.MessageSenderOpts{})
536537
if err != nil {
537538
t.Fatal(err)
538539
}

0 commit comments

Comments
 (0)