Skip to content

Commit 41bf668

Browse files
Merge pull request #1294 from natewalck/more-opencensus-metrics
More opencensus metrics
2 parents 2cca6ab + 65cd133 commit 41bf668

File tree

7 files changed

+166
-19
lines changed

7 files changed

+166
-19
lines changed

chain/store/store.go

+11-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/filecoin-project/lotus/build"
1414
"github.com/filecoin-project/lotus/chain/state"
1515
"github.com/filecoin-project/lotus/chain/vm"
16+
"github.com/filecoin-project/lotus/metrics"
17+
"go.opencensus.io/stats"
1618
"go.opencensus.io/trace"
1719
"go.uber.org/multierr"
1820

@@ -100,7 +102,15 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSy
100102
return nil
101103
}
102104

103-
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
105+
hcmetric := func(rev, app []*types.TipSet) error {
106+
ctx := context.Background()
107+
for _, r := range app {
108+
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
109+
}
110+
return nil
111+
}
112+
113+
cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric)
104114

105115
return cs
106116
}

chain/sub/incoming.go

+28-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sub
22

33
import (
44
"context"
5+
"fmt"
56
"time"
67

78
lru "github.com/hashicorp/golang-lru"
@@ -10,11 +11,14 @@ import (
1011
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
1112
peer "github.com/libp2p/go-libp2p-peer"
1213
pubsub "github.com/libp2p/go-libp2p-pubsub"
14+
"go.opencensus.io/stats"
15+
"go.opencensus.io/tag"
1316

1417
"github.com/filecoin-project/lotus/build"
1518
"github.com/filecoin-project/lotus/chain"
1619
"github.com/filecoin-project/lotus/chain/messagepool"
1720
"github.com/filecoin-project/lotus/chain/types"
21+
"github.com/filecoin-project/lotus/metrics"
1822
)
1923

2024
var log = logging.Logger("sub")
@@ -107,15 +111,25 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
107111
}
108112

109113
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
114+
stats.Record(ctx, metrics.BlockReceived.M(1))
115+
ctx, _ = tag.New(
116+
ctx,
117+
tag.Insert(metrics.PeerID, pid.String()),
118+
tag.Insert(metrics.ReceivedFrom, msg.ReceivedFrom.String()),
119+
)
110120
blk, err := types.DecodeBlockMsg(msg.GetData())
111121
if err != nil {
112122
log.Error("got invalid block over pubsub: ", err)
123+
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid"))
124+
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
113125
bv.flagPeer(pid)
114126
return false
115127
}
116128

117129
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
118130
log.Warnf("received block with too many messages over pubsub")
131+
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
132+
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
119133
bv.flagPeer(pid)
120134
return false
121135
}
@@ -127,6 +141,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
127141
}
128142

129143
msg.ValidatorData = blk
144+
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
130145
return true
131146
}
132147

@@ -162,17 +177,29 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
162177
}
163178

164179
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
180+
stats.Record(ctx, metrics.MessageReceived.M(1))
181+
ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String()))
165182
m, err := types.DecodeSignedMessage(msg.Message.GetData())
166183
if err != nil {
167184
log.Warnf("failed to decode incoming message: %s", err)
185+
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
186+
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
168187
return false
169188
}
170189

171190
if err := mv.mpool.Add(m); err != nil {
172191
log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
192+
ctx, _ = tag.New(
193+
ctx,
194+
tag.Insert(metrics.MessageFrom, m.Message.From.String()),
195+
tag.Insert(metrics.MessageTo, m.Message.To.String()),
196+
tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)),
197+
tag.Insert(metrics.FailureType, "add"),
198+
)
199+
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
173200
return false
174201
}
175-
202+
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
176203
return true
177204
}
178205

chain/sync.go

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/libp2p/go-libp2p-core/peer"
2424
cbg "github.com/whyrusleeping/cbor-gen"
2525
"github.com/whyrusleeping/pubsub"
26+
"go.opencensus.io/stats"
2627
"go.opencensus.io/trace"
2728
"golang.org/x/xerrors"
2829

@@ -38,6 +39,7 @@ import (
3839
"github.com/filecoin-project/lotus/chain/store"
3940
"github.com/filecoin-project/lotus/chain/types"
4041
"github.com/filecoin-project/lotus/lib/sigs"
42+
"github.com/filecoin-project/lotus/metrics"
4143
)
4244

4345
var log = logging.Logger("chain")
@@ -1038,6 +1040,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
10381040
return xerrors.Errorf("message processing failed: %w", err)
10391041
}
10401042

1043+
stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height())))
10411044
ss.SetHeight(fts.TipSet().Height())
10421045

10431046
return nil

cmd/lotus/daemon.go

+6-17
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/filecoin-project/lotus/chain/stmgr"
2525
"github.com/filecoin-project/lotus/chain/store"
2626
"github.com/filecoin-project/lotus/chain/vm"
27+
"github.com/filecoin-project/lotus/metrics"
2728
"github.com/filecoin-project/lotus/node"
2829
"github.com/filecoin-project/lotus/node/modules"
2930
"github.com/filecoin-project/lotus/node/modules/testing"
@@ -36,12 +37,6 @@ const (
3637
preSealedSectorsFlag = "genesis-presealed-sectors"
3738
)
3839

39-
var (
40-
lotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
41-
version, _ = tag.NewKey("version")
42-
commit, _ = tag.NewKey("commit")
43-
)
44-
4540
// DaemonCmd is the `go-lotus daemon` command
4641
var DaemonCmd = &cli.Command{
4742
Name: "daemon",
@@ -99,7 +94,7 @@ var DaemonCmd = &cli.Command{
9994
defer pprof.StopCPUProfile()
10095
}
10196

102-
ctx, _ := tag.New(context.Background(), tag.Insert(version, build.BuildVersion), tag.Insert(commit, build.CurrentCommit))
97+
ctx, _ := tag.New(context.Background(), tag.Insert(metrics.Version, build.BuildVersion), tag.Insert(metrics.Commit, build.CurrentCommit))
10398
{
10499
dir, err := homedir.Expand(cctx.String("repo"))
105100
if err != nil {
@@ -180,21 +175,15 @@ var DaemonCmd = &cli.Command{
180175
return xerrors.Errorf("initializing node: %w", err)
181176
}
182177

183-
// We are using this metric to tag info about lotus even though
184-
// it doesn't contain any actual metrics
178+
// Register all metric views
185179
if err = view.Register(
186-
&view.View{
187-
Name: "info",
188-
Description: "Lotus node information",
189-
Measure: lotusInfo,
190-
Aggregation: view.LastValue(),
191-
TagKeys: []tag.Key{version, commit},
192-
},
180+
metrics.DefaultViews...,
193181
); err != nil {
194182
log.Fatalf("Cannot register the view: %v", err)
195183
}
184+
196185
// Set the metric to one so it is published to the exporter
197-
stats.Record(ctx, lotusInfo.M(1))
186+
stats.Record(ctx, metrics.LotusInfo.M(1))
198187

199188
endpoint, err := r.APIEndpoint()
200189
if err != nil {

lib/jsonrpc/handler.go

+13
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ import (
99
"io"
1010
"reflect"
1111

12+
"github.com/filecoin-project/lotus/metrics"
13+
"go.opencensus.io/stats"
14+
"go.opencensus.io/tag"
1215
"go.opencensus.io/trace"
1316
"go.opencensus.io/trace/propagation"
1417
"golang.org/x/xerrors"
@@ -151,18 +154,22 @@ func (handlers) getSpan(ctx context.Context, req request) (context.Context, *tra
151154
}
152155

153156
func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer)), rpcError rpcErrFunc, done func(keepCtx bool), chOut chanOut) {
157+
// Not sure if we need to sanitize the incoming req.Method or not.
154158
ctx, span := h.getSpan(ctx, req)
159+
ctx, _ = tag.New(ctx, tag.Insert(metrics.RPCMethod, req.Method))
155160
defer span.End()
156161

157162
handler, ok := h[req.Method]
158163
if !ok {
159164
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not found", req.Method))
165+
stats.Record(ctx, metrics.RPCInvalidMethod.M(1))
160166
done(false)
161167
return
162168
}
163169

164170
if len(req.Params) != handler.nParams {
165171
rpcError(w, &req, rpcInvalidParams, fmt.Errorf("wrong param count"))
172+
stats.Record(ctx, metrics.RPCRequestError.M(1))
166173
done(false)
167174
return
168175
}
@@ -172,6 +179,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
172179

173180
if chOut == nil && outCh {
174181
rpcError(w, &req, rpcMethodNotFound, fmt.Errorf("method '%s' not supported in this mode (no out channel support)", req.Method))
182+
stats.Record(ctx, metrics.RPCRequestError.M(1))
175183
return
176184
}
177185

@@ -185,6 +193,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
185193
rp := reflect.New(handler.paramReceivers[i])
186194
if err := json.NewDecoder(bytes.NewReader(req.Params[i].data)).Decode(rp.Interface()); err != nil {
187195
rpcError(w, &req, rpcParseError, xerrors.Errorf("unmarshaling params for '%s': %w", handler.handlerFunc, err))
196+
stats.Record(ctx, metrics.RPCRequestError.M(1))
188197
return
189198
}
190199

@@ -196,6 +205,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
196205
callResult, err := doCall(req.Method, handler.handlerFunc, callParams)
197206
if err != nil {
198207
rpcError(w, &req, 0, xerrors.Errorf("fatal error calling '%s': %w", req.Method, err))
208+
stats.Record(ctx, metrics.RPCRequestError.M(1))
199209
return
200210
}
201211
if req.ID == nil {
@@ -213,6 +223,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
213223
err := callResult[handler.errOut].Interface()
214224
if err != nil {
215225
log.Warnf("error in RPC call to '%s': %+v", req.Method, err)
226+
stats.Record(ctx, metrics.RPCResponseError.M(1))
216227
resp.Error = &respError{
217228
Code: 1,
218229
Message: err.(error).Error(),
@@ -234,6 +245,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
234245
}
235246

236247
log.Warnf("failed to setup channel in RPC call to '%s': %+v", req.Method, err)
248+
stats.Record(ctx, metrics.RPCResponseError.M(1))
237249
resp.Error = &respError{
238250
Code: 1,
239251
Message: err.(error).Error(),
@@ -243,6 +255,7 @@ func (h handlers) handle(ctx context.Context, req request, w func(func(io.Writer
243255
w(func(w io.Writer) {
244256
if err := json.NewEncoder(w).Encode(resp); err != nil {
245257
log.Error(err)
258+
stats.Record(ctx, metrics.RPCResponseError.M(1))
246259
return
247260
}
248261
})

metrics/metrics.go

+102
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package metrics
2+
3+
import (
4+
"go.opencensus.io/stats"
5+
"go.opencensus.io/stats/view"
6+
"go.opencensus.io/tag"
7+
)
8+
9+
// Global Tags
10+
var (
11+
Version, _ = tag.NewKey("version")
12+
Commit, _ = tag.NewKey("commit")
13+
RPCMethod, _ = tag.NewKey("method")
14+
PeerID, _ = tag.NewKey("peer_id")
15+
FailureType, _ = tag.NewKey("failure_type")
16+
MessageFrom, _ = tag.NewKey("message_from")
17+
MessageTo, _ = tag.NewKey("message_to")
18+
MessageNonce, _ = tag.NewKey("message_nonce")
19+
ReceivedFrom, _ = tag.NewKey("received_from")
20+
)
21+
22+
// Measures
23+
var (
24+
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
25+
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
26+
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
27+
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
28+
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
29+
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
30+
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
31+
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
32+
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
33+
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
34+
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
35+
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
36+
RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless)
37+
)
38+
39+
// DefaultViews is an array of Consensus views for metric gathering purposes
40+
var DefaultViews = []*view.View{
41+
&view.View{
42+
Name: "info",
43+
Description: "Lotus node information",
44+
Measure: LotusInfo,
45+
Aggregation: view.LastValue(),
46+
TagKeys: []tag.Key{Version, Commit},
47+
},
48+
&view.View{
49+
Measure: ChainNodeHeight,
50+
Aggregation: view.LastValue(),
51+
},
52+
&view.View{
53+
Measure: ChainNodeWorkerHeight,
54+
Aggregation: view.LastValue(),
55+
},
56+
&view.View{
57+
Measure: BlockReceived,
58+
Aggregation: view.Count(),
59+
},
60+
&view.View{
61+
Measure: BlockValidationFailure,
62+
Aggregation: view.Count(),
63+
TagKeys: []tag.Key{FailureType, PeerID, ReceivedFrom},
64+
},
65+
&view.View{
66+
Measure: BlockValidationSuccess,
67+
Aggregation: view.Count(),
68+
},
69+
&view.View{
70+
Measure: MessageReceived,
71+
Aggregation: view.Count(),
72+
},
73+
&view.View{
74+
Measure: MessageValidationFailure,
75+
Aggregation: view.Count(),
76+
TagKeys: []tag.Key{FailureType, MessageFrom, MessageTo, MessageNonce},
77+
},
78+
&view.View{
79+
Measure: MessageValidationSuccess,
80+
Aggregation: view.Count(),
81+
},
82+
&view.View{
83+
Measure: PeerCount,
84+
Aggregation: view.LastValue(),
85+
},
86+
// All RPC related metrics should at the very least tag the RPCMethod
87+
&view.View{
88+
Measure: RPCInvalidMethod,
89+
Aggregation: view.Count(),
90+
TagKeys: []tag.Key{RPCMethod},
91+
},
92+
&view.View{
93+
Measure: RPCRequestError,
94+
Aggregation: view.Count(),
95+
TagKeys: []tag.Key{RPCMethod},
96+
},
97+
&view.View{
98+
Measure: RPCResponseError,
99+
Aggregation: view.Count(),
100+
TagKeys: []tag.Key{RPCMethod},
101+
},
102+
}

peermgr/peermgr.go

+3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@ import (
55
"sync"
66
"time"
77

8+
"github.com/filecoin-project/lotus/metrics"
89
"github.com/filecoin-project/lotus/node/modules/dtypes"
10+
"go.opencensus.io/stats"
911
"go.uber.org/fx"
1012

1113
host "github.com/libp2p/go-libp2p-core/host"
@@ -115,6 +117,7 @@ func (pmgr *PeerMgr) Run(ctx context.Context) {
115117
} else if pcount > pmgr.maxFilPeers {
116118
log.Debug("peer count about threshold: %d > %d", pcount, pmgr.maxFilPeers)
117119
}
120+
stats.Record(ctx, metrics.PeerCount.M(int64(pmgr.getPeerCount())))
118121
}
119122
}
120123
}

0 commit comments

Comments
 (0)