Skip to content

Commit 7f3b0e1

Browse files
committed
bitswap/network: add new metrics to compare bsnet and httpnet
1 parent 3bc8ebd commit 7f3b0e1

File tree

5 files changed

+93
-21
lines changed

5 files changed

+93
-21
lines changed

bitswap/message/message.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -392,26 +392,30 @@ func BlockPresenceSize(c cid.Cid) int {
392392
}
393393

394394
// FromNet generates a new BitswapMessage from incoming data on an io.Reader.
395-
func FromNet(r io.Reader) (BitSwapMessage, error) {
395+
func FromNet(r io.Reader) (BitSwapMessage, int, error) {
396396
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
397397
return FromMsgReader(reader)
398398
}
399399

400400
// FromPBReader generates a new Bitswap message from a gogo-protobuf reader
401-
func FromMsgReader(r msgio.Reader) (BitSwapMessage, error) {
401+
func FromMsgReader(r msgio.Reader) (BitSwapMessage, int, error) {
402402
msg, err := r.ReadMsg()
403403
if err != nil {
404-
return nil, err
404+
return nil, 0, err
405405
}
406406

407407
var pb pb.Message
408408
err = pb.Unmarshal(msg)
409409
r.ReleaseMsg(msg)
410410
if err != nil {
411-
return nil, err
411+
return nil, 0, err
412412
}
413413

414-
return newMessageFromProto(pb)
414+
m, err := newMessageFromProto(pb)
415+
if err != nil {
416+
return nil, 0, err
417+
}
418+
return m, len(msg), nil
415419
}
416420

417421
func (m *impl) ToProtoV0() *pb.Message {

bitswap/message/message_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) {
110110
t.Fatal(err)
111111
}
112112

113-
copied, err := FromNet(buf)
113+
copied, _, err := FromNet(buf)
114114
if err != nil {
115115
t.Fatal(err)
116116
}
@@ -143,7 +143,7 @@ func TestToAndFromNetMessage(t *testing.T) {
143143
t.Fatal(err)
144144
}
145145

146-
m2, err := FromNet(buf)
146+
m2, _, err := FromNet(buf)
147147
if err != nil {
148148
t.Fatal(err)
149149
}

bitswap/network/bsnet/ipfs_impl.go

+16-1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func NewFromIpfsHost(host host.Host, opts ...NetOpt) iface.BitSwapNetwork {
4545
protocolBitswap: s.ProtocolPrefix + ProtocolBitswap,
4646

4747
supportedProtocols: s.SupportedProtocols,
48+
49+
metrics: newMetrics(),
4850
}
4951

5052
return &bitswapNetwork
@@ -80,6 +82,8 @@ type impl struct {
8082

8183
// inbound messages from the network are forwarded to the receiver
8284
receivers []iface.Receiver
85+
86+
metrics *metrics
8387
}
8488

8589
// interfaceWrapper is concrete type that wraps an interface. Necessary because
@@ -166,6 +170,15 @@ func (s *streamMessageSender) SupportsHave() bool {
166170

167171
// Send a message to the peer, attempting multiple times
168172
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
173+
if n := len(msg.Wantlist()); n > 0 {
174+
s.bsnet.metrics.WantlistsTotal.Inc()
175+
s.bsnet.metrics.WantlistsItemsTotal.Add(float64(n))
176+
now := time.Now()
177+
defer func() {
178+
s.bsnet.metrics.WantlistsSeconds.Add(float64(time.Since(now)) / float64(time.Second))
179+
}()
180+
}
181+
169182
return s.multiAttempt(ctx, func() error {
170183
return s.send(ctx, msg)
171184
})
@@ -417,7 +430,7 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
417430

418431
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
419432
for {
420-
received, err := bsmsg.FromMsgReader(reader)
433+
received, size, err := bsmsg.FromMsgReader(reader)
421434
if err != nil {
422435
if err != io.EOF {
423436
_ = s.Reset()
@@ -429,6 +442,8 @@ func (bsnet *impl) handleNewStream(s network.Stream) {
429442
return
430443
}
431444

445+
bsnet.metrics.ResponseSizes.Observe(float64(size))
446+
bsnet.metrics.ResponseTotalBytes.Add(float64(size))
432447
p := s.Conn().RemotePeer()
433448
ctx := context.Background()
434449
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())

bitswap/network/httpnet/metrics.go

+40-10
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,38 @@ func requestsFailure(ctx context.Context) imetrics.Counter {
2222
return imetrics.NewCtx(ctx, "requests_failure", "Failed (no response, dial error etc) requests count").Counter()
2323
}
2424

25+
func requestSentBytes(ctx context.Context) imetrics.Counter {
26+
return imetrics.NewCtx(ctx, "request_sent_bytes", "Total bytes sent on requests").Counter()
27+
}
28+
29+
func requestTime(ctx context.Context) imetrics.Histogram {
30+
return imetrics.NewCtx(ctx, "request_duration_seconds", "Histogram of request durations").Histogram(durationHistogramBuckets)
31+
}
32+
2533
func requestsBodyFailure(ctx context.Context) imetrics.Counter {
2634
return imetrics.NewCtx(ctx, "requests_body_failure", "Failure count when reading response body").Counter()
2735
}
2836

37+
func responseSizes(ctx context.Context) imetrics.Histogram {
38+
return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets)
39+
}
40+
41+
func responseTotalBytes(ctx context.Context) imetrics.Counter {
42+
return imetrics.NewCtx(ctx, "response_total_bytes", "Accumulated response bytes").Counter()
43+
}
44+
45+
func wantlistsTotal(ctx context.Context) imetrics.Counter {
46+
return imetrics.NewCtx(ctx, "wantlists_total", "Total number of wantlists sent").Counter()
47+
}
48+
49+
func wantlistsItemsTotal(ctx context.Context) imetrics.Counter {
50+
return imetrics.NewCtx(ctx, "wantlists_items_total", "Total number of elements in sent wantlists").Counter()
51+
}
52+
53+
func wantlistsSeconds(ctx context.Context) imetrics.Counter {
54+
return imetrics.NewCtx(ctx, "wantlists_seconds", "Number of seconds spent sending wantlists").Counter()
55+
}
56+
2957
func statusNotFound(ctx context.Context) imetrics.Counter {
3058
return imetrics.NewCtx(ctx, "status_404", "Request count with NotFound status").Counter()
3159
}
@@ -62,18 +90,16 @@ func statusOthers(ctx context.Context) imetrics.Counter {
6290
return imetrics.NewCtx(ctx, "status_others", "Request count with other status codes").Counter()
6391
}
6492

65-
func requestTime(ctx context.Context) imetrics.Histogram {
66-
return imetrics.NewCtx(ctx, "request_duration_seconds", "Histogram of request durations").Histogram(durationHistogramBuckets)
67-
}
68-
69-
func responseSize(ctx context.Context) imetrics.Histogram {
70-
return imetrics.NewCtx(ctx, "response_bytes", "Histogram of http response sizes").Histogram(blockSizesHistogramBuckets)
71-
}
72-
7393
type metrics struct {
7494
RequestsInFlight imetrics.Gauge
7595
RequestsTotal imetrics.Counter
7696
RequestsFailure imetrics.Counter
97+
RequestsSentBytes imetrics.Counter
98+
WantlistsTotal imetrics.Counter
99+
WantlistsItemsTotal imetrics.Counter
100+
WantlistsSeconds imetrics.Counter
101+
ResponseSizes imetrics.Histogram
102+
ResponseTotalBytes imetrics.Counter
77103
RequestsBodyFailure imetrics.Counter
78104
StatusNotFound imetrics.Counter
79105
StatusGone imetrics.Counter
@@ -85,7 +111,6 @@ type metrics struct {
85111
StatusInternalServerError imetrics.Counter
86112
StatusOthers imetrics.Counter
87113
RequestTime imetrics.Histogram
88-
ResponseSize imetrics.Histogram
89114
}
90115

91116
func newMetrics() *metrics {
@@ -94,8 +119,14 @@ func newMetrics() *metrics {
94119
return &metrics{
95120
RequestsInFlight: requestsInFlight(ctx),
96121
RequestsTotal: requestsTotal(ctx),
122+
RequestsSentBytes: requestSentBytes(ctx),
97123
RequestsFailure: requestsFailure(ctx),
98124
RequestsBodyFailure: requestsBodyFailure(ctx),
125+
WantlistsTotal: wantlistsTotal(ctx),
126+
WantlistsItemsTotal: wantlistsItemsTotal(ctx),
127+
WantlistsSeconds: wantlistsSeconds(ctx),
128+
ResponseSizes: responseSizes(ctx),
129+
ResponseTotalBytes: responseTotalBytes(ctx),
99130
StatusNotFound: statusNotFound(ctx),
100131
StatusGone: statusGone(ctx),
101132
StatusForbidden: statusForbidden(ctx),
@@ -106,7 +137,6 @@ func newMetrics() *metrics {
106137
StatusInternalServerError: statusInternalServerError(ctx),
107138
StatusOthers: statusOthers(ctx),
108139
RequestTime: requestTime(ctx),
109-
ResponseSize: responseSize(ctx),
110140
}
111141
}
112142

bitswap/network/httpnet/msg_sender.go

+26-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package httpnet
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
67
"fmt"
@@ -231,6 +232,12 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
231232
return serr
232233
}
233234

235+
// Record request size
236+
var buf bytes.Buffer
237+
req.Write(&buf)
238+
sender.ht.metrics.RequestsSentBytes.Add(float64((&buf).Len()))
239+
240+
// Handle responses
234241
limReader := &io.LimitedReader{
235242
R: resp.Body,
236243
N: sender.ht.maxBlockSize,
@@ -248,10 +255,18 @@ func (sender *httpMsgSender) tryURL(ctx context.Context, u *senderURL, entry bsm
248255
Err: err,
249256
}
250257
}
251-
reqDuration := time.Since(reqStart)
252-
sender.ht.metrics.ResponseSize.Observe(float64(len(body)))
258+
259+
// Calculate full response size with headers and everything.
260+
// So this is comparable to bitswap message response sizes.
261+
resp.Body = nil
262+
var respBuf bytes.Buffer
263+
resp.Write(&respBuf)
264+
respLen := (&respBuf).Len() + len(body)
265+
266+
sender.ht.metrics.ResponseSizes.Observe(float64(respLen))
267+
sender.ht.metrics.ResponseTotalBytes.Add(float64(respLen))
253268
sender.ht.metrics.RequestsInFlight.Dec()
254-
sender.ht.metrics.RequestTime.Observe(float64(reqDuration) / float64(time.Second))
269+
sender.ht.metrics.RequestTime.Observe(float64(time.Since(reqStart)) / float64(time.Second))
255270
sender.ht.metrics.updateStatusCounter(resp.StatusCode)
256271

257272
sender.ht.connEvtMgr.OnMessage(sender.peer)
@@ -353,6 +368,14 @@ func (sender *httpMsgSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessa
353368
return nil
354369
}
355370

371+
// Keep metrics of wantlists sent and how long it took
372+
sender.ht.metrics.WantlistsTotal.Inc()
373+
sender.ht.metrics.WantlistsItemsTotal.Add(float64(len(wantlist)))
374+
now := time.Now()
375+
defer func() {
376+
sender.ht.metrics.WantlistsSeconds.Add(float64(time.Since(now)) / float64(time.Second))
377+
}()
378+
356379
go func() {
357380
select {
358381
case <-sender.closing:

0 commit comments

Comments
 (0)