Skip to content

Commit 1e7a3f3

Browse files
committed
bitswap/httpnet: ping via http request. fix tests.
1 parent b28294c commit 1e7a3f3

File tree

7 files changed

+82
-84
lines changed

7 files changed

+82
-84
lines changed

bitswap/network/http_multiaddr_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -107,15 +107,15 @@ func TestExtractHTTPAddress(t *testing.T) {
107107
return
108108
}
109109

110-
got, sni, err := ExtractHTTPAddress(ma)
110+
got, err := ExtractHTTPAddress(ma)
111111
if (err != nil) != tt.expectErr {
112-
t.Errorf("got: %s", got)
112+
t.Errorf("got: %s", got.URL)
113113
t.Errorf("ExtractHTTPAddress() error = %v, wantErr %v", err, tt.expectErr)
114114
return
115115
}
116116

117-
if tt.want != nil && (got == nil || got.String() != tt.want.String() || tt.sni != sni) {
118-
t.Errorf("ExtractHTTPAddress() = %v (%s), want %v (%s)", got, sni, tt.want, tt.sni)
117+
if tt.want != nil && (got.URL == nil || got.URL.String() != tt.want.String() || tt.sni != got.SNI) {
118+
t.Errorf("ExtractHTTPAddress() = %v (%s), want %v (%s)", got.URL, got.SNI, tt.want, tt.sni)
119119
}
120120
})
121121
}
@@ -178,7 +178,7 @@ func TestExtractHTTPAddressesFromPeer(t *testing.T) {
178178

179179
// Compare URLs
180180
for i := range got {
181-
if got[i].String() != tt.want[i].String() {
181+
if got[i].URL.String() != tt.want[i].String() {
182182
t.Errorf("ExtractHTTPAddressesFromPeer() URL at index %d = %v, want %v", i, got[i], tt.want[i])
183183
}
184184
}

bitswap/network/httpnet/httpnet.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ var (
4949

5050
var pingCid = "bafkqaaa" // identity CID
5151

52+
const http2proto = "HTTP/2.0"
53+
5254
// Option allows to configure the Network.
5355
type Option func(net *Network)
5456

@@ -167,7 +169,6 @@ type Network struct {
167169
func New(host host.Host, opts ...Option) network.BitSwapNetwork {
168170
htnet := &Network{
169171
host: host,
170-
pinger: newPinger(host),
171172
userAgent: defaultUserAgent(),
172173
maxBlockSize: DefaultMaxBlockSize,
173174
dialTimeout: DefaultDialTimeout,
@@ -236,10 +237,13 @@ func New(host host.Host, opts ...Option) network.BitSwapNetwork {
236237
}
237238

238239
c := &http.Client{
239-
Transport: newTransport(t),
240+
Transport: t,
240241
}
241242
htnet.client = c
242243

244+
pinger := newPinger(host, htnet.client, pingCid, htnet.userAgent)
245+
htnet.pinger = pinger
246+
243247
return htnet
244248
}
245249

@@ -353,7 +357,7 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
353357
// on success.
354358
var workingAddrs []multiaddr.Multiaddr
355359
for i, u := range urls {
356-
req, err := ht.buildRequest(ctx, u, "GET", pingCid)
360+
req, err := buildRequest(ctx, u, "GET", pingCid, ht.userAgent)
357361
if err != nil {
358362
log.Debug(err)
359363
return err
@@ -370,7 +374,7 @@ func (ht *Network) Connect(ctx context.Context, p peer.AddrInfo) error {
370374
continue
371375
}
372376

373-
if resp.Proto != "HTTP/2.0" {
377+
if resp.Proto != http2proto {
374378
log.Warnf("%s://%q is not using HTTP/2 (%s)", req.URL.Scheme, req.URL.Host, resp.Proto)
375379
}
376380

@@ -452,7 +456,7 @@ func (ht *Network) Stats() network.Stats {
452456
}
453457

454458
// buildRequests sets up common settings for making a requests.
455-
func (ht *Network) buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string) (*http.Request, error) {
459+
func buildRequest(ctx context.Context, u network.ParsedURL, method string, cid string, userAgent string) (*http.Request, error) {
456460
// copy url
457461
sendURL, _ := url.Parse(u.URL.String())
458462
sendURL.RawQuery = "format=raw"
@@ -470,7 +474,7 @@ func (ht *Network) buildRequest(ctx context.Context, u network.ParsedURL, method
470474

471475
headers := make(http.Header)
472476
headers.Add("Accept", "application/vnd.ipld.raw")
473-
headers.Add("User-Agent", ht.userAgent)
477+
headers.Add("User-Agent", userAgent)
474478
if u.SNI != "" {
475479
headers.Add("Host", u.SNI)
476480
}

bitswap/network/httpnet/httpnet_test.go

+26-10
Original file line numberDiff line numberDiff line change
@@ -296,49 +296,65 @@ func TestBestURL(t *testing.T) {
296296
now := time.Now()
297297
surls := []*senderURL{
298298
{
299-
url: urls[0],
299+
ParsedURL: network.ParsedURL{
300+
URL: urls[0],
301+
},
300302
cooldown: now.Add(time.Second),
301303
clientErrors: 0,
302304
serverErrors: 2,
303305
},
304306
{
305-
url: urls[1],
307+
ParsedURL: network.ParsedURL{
308+
URL: urls[1],
309+
},
306310
cooldown: now.Add(time.Second),
307311
clientErrors: 0,
308312
serverErrors: 1,
309313
},
310314
{
311-
url: urls[2],
315+
ParsedURL: network.ParsedURL{
316+
URL: urls[2],
317+
},
312318
cooldown: time.Time{},
313319
clientErrors: 0,
314320
serverErrors: 3,
315321
},
316322
{
317-
url: urls[3],
323+
ParsedURL: network.ParsedURL{
324+
URL: urls[3],
325+
},
318326
cooldown: time.Time{},
319327
clientErrors: 0,
320328
serverErrors: 2,
321329
},
322330
{
323-
url: urls[4],
331+
ParsedURL: network.ParsedURL{
332+
URL: urls[4],
333+
},
324334
cooldown: time.Time{},
325335
clientErrors: 0,
326336
serverErrors: 1,
327337
},
328338
{
329-
url: urls[5],
339+
ParsedURL: network.ParsedURL{
340+
URL: urls[5],
341+
},
330342
cooldown: time.Time{},
331343
clientErrors: 0,
332344
serverErrors: 20,
333345
},
334346
{
335-
url: urls[6],
347+
ParsedURL: network.ParsedURL{
348+
URL: urls[6],
349+
},
336350
cooldown: time.Time{},
337351
clientErrors: 2,
338352
serverErrors: 0,
339353
},
340354
{
341-
url: urls[7],
355+
ParsedURL: network.ParsedURL{
356+
URL: urls[7],
357+
},
342358
cooldown: now.Add(2 * time.Second),
343359
clientErrors: 0,
344360
serverErrors: 0,
@@ -360,8 +376,8 @@ func TestBestURL(t *testing.T) {
360376
}
361377

362378
for i, u := range ms.urls {
363-
if u.url.String() != expected[i] {
364-
t.Error("wrong url order", i, u.url)
379+
if u.URL.String() != expected[i] {
380+
t.Error("wrong url order", i, u.URL)
365381
}
366382
}
367383

bitswap/network/httpnet/msg_sender.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
196196

197197
ctx, cancel := context.WithTimeout(ctx, sender.opts.SendTimeout)
198198
defer cancel()
199-
req, err := sender.ht.buildRequest(ctx, u.ParsedURL, method, entry.Cid.String())
199+
req, err := buildRequest(ctx, u.ParsedURL, method, entry.Cid.String(), sender.ht.userAgent)
200200
if err != nil {
201201
return &senderError{
202202
Type: typeFatal,
@@ -386,11 +386,9 @@ func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessa
386386
if entry.Cancel {
387387
entryCtxs[i] = ctx
388388
entryCancels[i] = nop
389+
} else {
390+
entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
389391
}
390-
// The TTL here is just for auto-cleaning the request context
391-
// from the request tracker. It is set in a way that ensure that the request
392-
// has run
393-
entryCtxs[i], entryCancels[i] = sender.ht.requestTracker.requestContext(ctx, entry.Cid)
394392
}
395393

396394
WANTLIST_LOOP:

bitswap/network/httpnet/pinger.go

+26-25
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,25 @@ package httpnet
22

33
import (
44
"context"
5-
"net"
6-
"net/url"
5+
"fmt"
6+
"net/http"
77
"sync"
88
"time"
99

1010
"github.com/ipfs/boxo/bitswap/network"
1111
"github.com/libp2p/go-libp2p/core/host"
1212
"github.com/libp2p/go-libp2p/core/peer"
1313
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
14-
probing "github.com/prometheus-community/pro-bing"
1514
"go.uber.org/multierr"
1615
)
1716

1817
// pinger pings connected hosts on regular intervals
1918
// and tracks their latency.
2019
type pinger struct {
21-
host host.Host
20+
host host.Host
21+
pingCid string
22+
userAgent string
23+
client *http.Client
2224

2325
latenciesLock sync.RWMutex
2426
latencies map[peer.ID]time.Duration
@@ -27,9 +29,11 @@ type pinger struct {
2729
pings map[peer.ID]context.CancelFunc
2830
}
2931

30-
func newPinger(h host.Host) *pinger {
32+
func newPinger(h host.Host, client *http.Client, pingCid, userAgent string) *pinger {
3133
return &pinger{
3234
host: h,
35+
pingCid: pingCid,
36+
userAgent: userAgent,
3337
latencies: make(map[peer.ID]time.Duration),
3438
pings: make(map[peer.ID]context.CancelFunc),
3539
}
@@ -49,36 +53,33 @@ func (pngr *pinger) ping(ctx context.Context, p peer.ID) ping.Result {
4953

5054
results := make(chan ping.Result, len(urls))
5155
for _, u := range urls {
52-
go func(u *url.URL) {
53-
// Remove port from url.
54-
host, _, err := net.SplitHostPort(u.Host)
56+
go func(u network.ParsedURL) {
57+
req, err := buildRequest(ctx, u, "GET", pngr.pingCid, pngr.userAgent)
5558
if err != nil {
56-
results <- ping.Result{
57-
Error: err,
58-
}
59+
log.Debug(err)
60+
results <- ping.Result{Error: err}
61+
return
5962
}
6063

61-
pinger, err := probing.NewPinger(host)
64+
log.Debugf("ping request to %s", req.URL)
65+
start := time.Now()
66+
resp, err := pngr.client.Do(req)
6267
if err != nil {
63-
log.Debug("pinger error ", err)
64-
results <- ping.Result{
65-
Error: err,
66-
}
68+
results <- ping.Result{Error: err}
69+
return
6770
}
68-
pinger.Count = 1
6971

70-
err = pinger.RunWithContext(ctx)
71-
if err != nil {
72-
log.Debug("ping error ", err)
73-
results <- ping.Result{
74-
Error: err,
75-
}
72+
if resp.StatusCode >= 300 { // non-success
73+
err := fmt.Errorf("ping request to %q returned %d", req.URL, resp.StatusCode)
74+
log.Error(err)
75+
results <- ping.Result{Error: err}
76+
return
7677
}
7778

7879
results <- ping.Result{
79-
RTT: pinger.Statistics().AvgRtt,
80+
RTT: time.Since(start),
8081
}
81-
}(u.URL)
82+
}(u)
8283
}
8384

8485
var result ping.Result

bitswap/network/httpnet/transport.go

-28
This file was deleted.

bitswap/network/router.go

+12-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"github.com/libp2p/go-libp2p/core/peer"
99
"github.com/libp2p/go-libp2p/core/peerstore"
1010
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
11-
"go.uber.org/multierr"
1211
)
1312

1413
type router struct {
@@ -112,10 +111,18 @@ func (rt *router) Connect(ctx context.Context, p peer.AddrInfo) error {
112111
}
113112

114113
func (rt *router) DisconnectFrom(ctx context.Context, p peer.ID) error {
115-
return multierr.Combine(
116-
rt.HTTP.DisconnectFrom(ctx, p),
117-
rt.Bitswap.DisconnectFrom(ctx, p),
118-
)
114+
// DisconnectFrom is only called from bitswap.Server, on failures
115+
// receiving a bitswap message. Normally, if HTTP is prioritized, we
116+
// should not have requested anything over bitswap, so this should not
117+
// happen.
118+
//
119+
// Still, follow prioritization rule.
120+
pi := rt.Peerstore.PeerInfo(p)
121+
htaddrs, _ := SplitHTTPAddrs(pi)
122+
if len(htaddrs.Addrs) > 0 {
123+
return rt.HTTP.DisconnectFrom(ctx, p)
124+
}
125+
return rt.Bitswap.DisconnectFrom(ctx, p)
119126
}
120127

121128
func (rt *router) Stats() Stats {

0 commit comments

Comments
 (0)