Skip to content

Commit

Permalink
separate metrics with source scope
Browse files Browse the repository at this point in the history
Signed-off-by: cfzjywxk <lsswxrxr@163.com>
  • Loading branch information
cfzjywxk committed Mar 6, 2023
1 parent 31d55e5 commit abdbd94
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 65 deletions.
33 changes: 24 additions & 9 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,22 @@ var (
)

type sendReqHistCacheKey struct {
tp tikvrpc.CmdType
id uint64
staleRad bool
tp tikvrpc.CmdType
id uint64
staleRad bool
isInternal bool
}

// sendReqCounterCacheKey is the lookup key for sendReqCounterCache. It embeds
// the histogram key (command type, store ID, stale-read flag, internal flag)
// and adds the request source, because the request counters carry the source
// as an extra label while the histogram does not.
//
// The struct is built with a positional composite literal, so the field order
// must not change.
type sendReqCounterCacheKey struct {
sendReqHistCacheKey
requestSource string
}

// rpcNetLatencyCacheKey is the lookup key for rpcNetLatencyHistCache: one
// cached RPC network-latency histogram per store and per scope (internal or
// not), matching the two label values passed to TiKVRPCNetLatencyHistogram.
//
// The struct is built with a positional composite literal, so the field order
// must not change.
type rpcNetLatencyCacheKey struct {
storeID uint64
isInternal bool
}

type sendReqCounterCacheValue struct {
counter prometheus.Counter
timeCounter prometheus.Counter
Expand All @@ -506,11 +512,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
isInternal := util.IsInternalRequest(req.GetRequestSource())

histKey := sendReqHistCacheKey{
req.Type,
storeID,
staleRead,
isInternal,
}
counterKey := sendReqCounterCacheKey{
histKey,
Expand All @@ -525,7 +533,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
Expand All @@ -534,8 +543,10 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
counterKey.requestSource, strconv.FormatBool(isInternal)),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
}
sendReqCounterCache.Store(counterKey, counter)
}
Expand All @@ -546,13 +557,17 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr

if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
latHist, ok := rpcNetLatencyHistCache.Load(storeID)
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
}
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
rpcNetLatencyHistCache.Store(storeID, latHist)
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
Expand Down
6 changes: 5 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,11 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

// NOTE: Please add the region error handler in the same order of errorpb.Error.
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
isInternal := false
if req != nil {
isInternal = util.IsInternalRequest(req.GetRequestSource())
}
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc()

if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.
Expand Down
26 changes: 13 additions & 13 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var (
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
TiKVTxnWriteKVCountHistogram prometheus.Histogram
TiKVTxnWriteSizeHistogram prometheus.Histogram
TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
TiKVTxnWriteSizeHistogram *prometheus.HistogramVec
TiKVRawkvCmdHistogram *prometheus.HistogramVec
TiKVRawkvSizeHistogram *prometheus.HistogramVec
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
Expand Down Expand Up @@ -150,23 +150,23 @@ func initMetrics(namespace, subsystem string) {
Name: "request_seconds",
Help: "Bucketed histogram of sending request duration.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblType, LblStore, LblStaleRead})
}, []string{LblType, LblStore, LblStaleRead, LblScope})

TiKVSendReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_counter",
Help: "Counter of sending request with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVSendReqTimeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_time_counter",
Help: "Counter of request time with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -175,7 +175,7 @@ func initMetrics(namespace, subsystem string) {
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
}, []string{LblStore})
}, []string{LblStore, LblScope})

TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -184,7 +184,7 @@ func initMetrics(namespace, subsystem string) {
Name: "cop_duration_seconds",
Help: "Run duration of a single coprocessor task, includes backoff time.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblStore, LblStaleRead})
}, []string{LblStore, LblStaleRead, LblScope})

TiKVLockResolverCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -200,25 +200,25 @@ func initMetrics(namespace, subsystem string) {
Subsystem: subsystem,
Name: "region_err_total",
Help: "Counter of region errors.",
}, []string{LblType})
}, []string{LblType, LblScope})

TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram(
TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_kv_num",
Help: "Count of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G
})
}, []string{LblScope})

TiKVTxnWriteSizeHistogram = prometheus.NewHistogram(
TiKVTxnWriteSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_size_bytes",
Help: "Size of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(16, 4, 17), // 16Bytes ~ 64GB
})
}, []string{LblScope})

TiKVRawkvCmdHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -245,7 +245,7 @@ func initMetrics(namespace, subsystem string) {
Name: "txn_regions_num",
Help: "Number of regions in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 25), // 1 ~ 16M
}, []string{LblType})
}, []string{LblType, LblScope})

TiKVLoadSafepointCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
52 changes: 36 additions & 16 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,26 @@ var (
BackoffHistogramIsWitness prometheus.Observer
BackoffHistogramEmpty prometheus.Observer

TxnRegionsNumHistogramWithSnapshot prometheus.Observer
TxnRegionsNumHistogramPrewrite prometheus.Observer
TxnRegionsNumHistogramCommit prometheus.Observer
TxnRegionsNumHistogramCleanup prometheus.Observer
TxnRegionsNumHistogramPessimisticLock prometheus.Observer
TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer
TxnRegionsNumHistogramWithSnapshotInternal prometheus.Observer
TxnRegionsNumHistogramWithSnapshot prometheus.Observer
TxnRegionsNumHistogramPrewriteInternal prometheus.Observer
TxnRegionsNumHistogramPrewrite prometheus.Observer
TxnRegionsNumHistogramCommitInternal prometheus.Observer
TxnRegionsNumHistogramCommit prometheus.Observer
TxnRegionsNumHistogramCleanupInternal prometheus.Observer
TxnRegionsNumHistogramCleanup prometheus.Observer
TxnRegionsNumHistogramPessimisticLockInternal prometheus.Observer
TxnRegionsNumHistogramPessimisticLock prometheus.Observer
TxnRegionsNumHistogramPessimisticRollbackInternal prometheus.Observer
TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
TxnRegionsNumHistogramWithBatchCoprocessorInternal prometheus.Observer
TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer

TxnWriteKVCountHistogramInternal prometheus.Observer
TxnWriteKVCountHistogramGeneral prometheus.Observer
TxnWriteSizeHistogramInternal prometheus.Observer
TxnWriteSizeHistogramGeneral prometheus.Observer

LockResolverCountWithBatchResolve prometheus.Counter
LockResolverCountWithExpired prometheus.Counter
Expand Down Expand Up @@ -185,14 +197,22 @@ func initShortcuts() {
BackoffHistogramIsWitness = TiKVBackoffHistogram.WithLabelValues("isWitness")
BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("")

TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot")
TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite")
TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit")
TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup")
TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock")
TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback")
TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
TxnRegionsNumHistogramWithSnapshotInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblInternal)
TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblGeneral)
TxnRegionsNumHistogramPrewriteInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblInternal)
TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblGeneral)
TxnRegionsNumHistogramCommitInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblInternal)
TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblGeneral)
TxnRegionsNumHistogramCleanupInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblInternal)
TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblGeneral)
TxnRegionsNumHistogramPessimisticLockInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblInternal)
TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblGeneral)
TxnRegionsNumHistogramPessimisticRollbackInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblInternal)
TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblGeneral)
TxnRegionsNumHistogramWithBatchCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal)
TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral)
TxnWriteKVCountHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal)
TxnWriteKVCountHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral)

LockResolverCountWithBatchResolve = TiKVLockResolverCounter.WithLabelValues("batch_resolve")
LockResolverCountWithExpired = TiKVLockResolverCounter.WithLabelValues("expired")
Expand Down
11 changes: 9 additions & 2 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -696,8 +696,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
WriteKeys: c.mutations.Len(),
ResolveLock: util.ResolveLockDetail{},
}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))

isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource())
if isInternalReq {
metrics.TxnWriteKVCountHistogramInternal.Observe(float64(commitDetail.WriteKeys))
metrics.TxnWriteSizeHistogramInternal.Observe(float64(commitDetail.WriteSize))
} else {
metrics.TxnWriteKVCountHistogramGeneral.Observe(float64(commitDetail.WriteKeys))
metrics.TxnWriteSizeHistogramGeneral.Observe(float64(commitDetail.WriteSize))
}
c.hasNoNeedCommitKeys = checkCnt > 0
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()
Expand Down
13 changes: 8 additions & 5 deletions txnkv/transaction/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,22 @@ import (
"go.uber.org/zap"
)

type actionCleanup struct{}
type actionCleanup struct{ isInternal bool }

var _ twoPhaseCommitAction = actionCleanup{}

func (actionCleanup) String() string {
func (action actionCleanup) String() string {
return "cleanup"
}

func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
func (action actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
if action.isInternal {
return metrics.TxnRegionsNumHistogramCleanupInternal
}
return metrics.TxnRegionsNumHistogramCleanup
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
Expand Down Expand Up @@ -99,5 +102,5 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
}

func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
return c.doActionOnMutations(bo, actionCleanup{}, mutations)
return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
}
18 changes: 12 additions & 6 deletions txnkv/transaction/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,25 @@ import (
"go.uber.org/zap"
)

type actionCommit struct{ retry bool }
type actionCommit struct {
retry bool
isInternal bool
}

var _ twoPhaseCommitAction = actionCommit{}

func (actionCommit) String() string {
func (action actionCommit) String() string {
return "commit"
}

func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
if action.isInternal {
return metrics.TxnRegionsNumHistogramCommitInternal
}
return metrics.TxnRegionsNumHistogramCommit
}

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
StartVersion: c.startTS,
Expand Down Expand Up @@ -132,7 +138,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
if same {
continue
}
return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
return c.doActionOnMutations(bo, actionCommit{true, action.isInternal}, batch.mutations)
}

if resp.Resp == nil {
Expand Down Expand Up @@ -220,5 +226,5 @@ func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations Commi
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}

return c.doActionOnMutations(bo, actionCommit{}, mutations)
return c.doActionOnMutations(bo, actionCommit{isInternal: c.txn.isInternal()}, mutations)
}
Loading

0 comments on commit abdbd94

Please sign in to comment.