Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics: seperate metrics with source scope for txn command #723

Merged
merged 3 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 24 additions & 9 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,16 +487,22 @@ var (
)

type sendReqHistCacheKey struct {
tp tikvrpc.CmdType
id uint64
staleRad bool
tp tikvrpc.CmdType
id uint64
staleRad bool
isInternal bool
}

type sendReqCounterCacheKey struct {
sendReqHistCacheKey
requestSource string
}

type rpcNetLatencyCacheKey struct {
storeID uint64
isInternal bool
}

type sendReqCounterCacheValue struct {
counter prometheus.Counter
timeCounter prometheus.Counter
Expand All @@ -506,11 +512,13 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
elapsed := time.Since(start)
secs := elapsed.Seconds()
storeID := req.Context.GetPeer().GetStoreId()
isInternal := util.IsInternalRequest(req.GetRequestSource())

histKey := sendReqHistCacheKey{
req.Type,
storeID,
staleRead,
isInternal,
}
counterKey := sendReqCounterCacheKey{
histKey,
Expand All @@ -525,7 +533,8 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead))
hist = metrics.TiKVSendReqHistogram.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), strconv.FormatBool(isInternal))
sendReqHistCache.Store(histKey, hist)
}
counter, ok := sendReqCounterCache.Load(counterKey)
Expand All @@ -534,8 +543,10 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr
storeIDStr = strconv.FormatUint(storeID, 10)
}
counter = sendReqCounterCacheValue{
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead), counterKey.requestSource),
metrics.TiKVSendReqCounter.WithLabelValues(reqType, storeIDStr, strconv.FormatBool(staleRead),
counterKey.requestSource, strconv.FormatBool(isInternal)),
metrics.TiKVSendReqTimeCounter.WithLabelValues(reqType, storeIDStr,
strconv.FormatBool(staleRead), counterKey.requestSource, strconv.FormatBool(isInternal)),
}
sendReqCounterCache.Store(counterKey, counter)
}
Expand All @@ -546,13 +557,17 @@ func (c *RPCClient) updateTiKVSendReqHistogram(req *tikvrpc.Request, resp *tikvr

if execDetail := resp.GetExecDetailsV2(); execDetail != nil &&
execDetail.TimeDetail != nil && execDetail.TimeDetail.TotalRpcWallTimeNs > 0 {
latHist, ok := rpcNetLatencyHistCache.Load(storeID)
cacheKey := rpcNetLatencyCacheKey{
storeID,
isInternal,
}
latHist, ok := rpcNetLatencyHistCache.Load(cacheKey)
if !ok {
if len(storeIDStr) == 0 {
storeIDStr = strconv.FormatUint(storeID, 10)
}
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr)
rpcNetLatencyHistCache.Store(storeID, latHist)
latHist = metrics.TiKVRPCNetLatencyHistogram.WithLabelValues(storeIDStr, strconv.FormatBool(isInternal))
rpcNetLatencyHistCache.Store(cacheKey, latHist)
}
latency := elapsed - time.Duration(execDetail.TimeDetail.TotalRpcWallTimeNs)*time.Nanosecond
latHist.(prometheus.Observer).Observe(latency.Seconds())
Expand Down
6 changes: 5 additions & 1 deletion internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,7 +1571,11 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
}

// NOTE: Please add the region error handler in the same order of errorpb.Error.
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
isInternal := false
if req != nil {
isInternal = util.IsInternalRequest(req.GetRequestSource())
}
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr), strconv.FormatBool(isInternal)).Inc()

if notLeader := regionErr.GetNotLeader(); notLeader != nil {
// Retry if error is `NotLeader`.
Expand Down
26 changes: 13 additions & 13 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ var (
TiKVCoprocessorHistogram *prometheus.HistogramVec
TiKVLockResolverCounter *prometheus.CounterVec
TiKVRegionErrorCounter *prometheus.CounterVec
TiKVTxnWriteKVCountHistogram prometheus.Histogram
TiKVTxnWriteSizeHistogram prometheus.Histogram
TiKVTxnWriteKVCountHistogram *prometheus.HistogramVec
TiKVTxnWriteSizeHistogram *prometheus.HistogramVec
TiKVRawkvCmdHistogram *prometheus.HistogramVec
TiKVRawkvSizeHistogram *prometheus.HistogramVec
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
Expand Down Expand Up @@ -150,23 +150,23 @@ func initMetrics(namespace, subsystem string) {
Name: "request_seconds",
Help: "Bucketed histogram of sending request duration.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblType, LblStore, LblStaleRead})
}, []string{LblType, LblStore, LblStaleRead, LblScope})

TiKVSendReqCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_counter",
Help: "Counter of sending request with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVSendReqTimeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "request_time_counter",
Help: "Counter of request time with multi dimensions.",
}, []string{LblType, LblStore, LblStaleRead, LblSource})
}, []string{LblType, LblStore, LblStaleRead, LblSource, LblScope})

TiKVRPCNetLatencyHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -175,7 +175,7 @@ func initMetrics(namespace, subsystem string) {
Name: "rpc_net_latency_seconds",
Help: "Bucketed histogram of time difference between TiDB and TiKV.",
Buckets: prometheus.ExponentialBuckets(5e-5, 2, 18), // 50us ~ 6.5s
}, []string{LblStore})
}, []string{LblStore, LblScope})

TiKVCoprocessorHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -184,7 +184,7 @@ func initMetrics(namespace, subsystem string) {
Name: "cop_duration_seconds",
Help: "Run duration of a single coprocessor task, includes backoff time.",
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 29), // 0.5ms ~ 1.5days
}, []string{LblStore, LblStaleRead})
}, []string{LblStore, LblStaleRead, LblScope})

TiKVLockResolverCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand All @@ -200,25 +200,25 @@ func initMetrics(namespace, subsystem string) {
Subsystem: subsystem,
Name: "region_err_total",
Help: "Counter of region errors.",
}, []string{LblType})
}, []string{LblType, LblScope})

TiKVTxnWriteKVCountHistogram = prometheus.NewHistogram(
TiKVTxnWriteKVCountHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_kv_num",
Help: "Count of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 4, 17), // 1 ~ 4G
})
}, []string{LblScope})

TiKVTxnWriteSizeHistogram = prometheus.NewHistogram(
TiKVTxnWriteSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "txn_write_size_bytes",
Help: "Size of kv pairs to write in a transaction.",
Buckets: prometheus.ExponentialBuckets(16, 4, 17), // 16Bytes ~ 64GB
})
}, []string{LblScope})

TiKVRawkvCmdHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand All @@ -245,7 +245,7 @@ func initMetrics(namespace, subsystem string) {
Name: "txn_regions_num",
Help: "Number of regions in a transaction.",
Buckets: prometheus.ExponentialBuckets(1, 2, 25), // 1 ~ 16M
}, []string{LblType})
}, []string{LblType, LblScope})

TiKVLoadSafepointCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Expand Down
57 changes: 41 additions & 16 deletions metrics/shortcuts.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,27 @@ var (
BackoffHistogramIsWitness prometheus.Observer
BackoffHistogramEmpty prometheus.Observer

TxnRegionsNumHistogramWithSnapshot prometheus.Observer
TxnRegionsNumHistogramPrewrite prometheus.Observer
TxnRegionsNumHistogramCommit prometheus.Observer
TxnRegionsNumHistogramCleanup prometheus.Observer
TxnRegionsNumHistogramPessimisticLock prometheus.Observer
TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer
TxnRegionsNumHistogramWithSnapshotInternal prometheus.Observer
TxnRegionsNumHistogramWithSnapshot prometheus.Observer
TxnRegionsNumHistogramPrewriteInternal prometheus.Observer
TxnRegionsNumHistogramPrewrite prometheus.Observer
TxnRegionsNumHistogramCommitInternal prometheus.Observer
TxnRegionsNumHistogramCommit prometheus.Observer
TxnRegionsNumHistogramCleanupInternal prometheus.Observer
TxnRegionsNumHistogramCleanup prometheus.Observer
TxnRegionsNumHistogramPessimisticLockInternal prometheus.Observer
TxnRegionsNumHistogramPessimisticLock prometheus.Observer
TxnRegionsNumHistogramPessimisticRollbackInternal prometheus.Observer
TxnRegionsNumHistogramPessimisticRollback prometheus.Observer
TxnRegionsNumHistogramWithCoprocessorInternal prometheus.Observer
TxnRegionsNumHistogramWithCoprocessor prometheus.Observer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TxnRegionsNumHistogramWithCoprocessorInternal is missing

TxnRegionsNumHistogramWithBatchCoprocessorInternal prometheus.Observer
TxnRegionsNumHistogramWithBatchCoprocessor prometheus.Observer

TxnWriteKVCountHistogramInternal prometheus.Observer
TxnWriteKVCountHistogramGeneral prometheus.Observer
TxnWriteSizeHistogramInternal prometheus.Observer
TxnWriteSizeHistogramGeneral prometheus.Observer

LockResolverCountWithBatchResolve prometheus.Counter
LockResolverCountWithExpired prometheus.Counter
Expand Down Expand Up @@ -185,14 +198,26 @@ func initShortcuts() {
BackoffHistogramIsWitness = TiKVBackoffHistogram.WithLabelValues("isWitness")
BackoffHistogramEmpty = TiKVBackoffHistogram.WithLabelValues("")

TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot")
TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite")
TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit")
TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup")
TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock")
TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback")
TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor")
TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor")
TxnRegionsNumHistogramWithSnapshotInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblInternal)
TxnRegionsNumHistogramWithSnapshot = TiKVTxnRegionsNumHistogram.WithLabelValues("snapshot", LblGeneral)
TxnRegionsNumHistogramPrewriteInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblInternal)
TxnRegionsNumHistogramPrewrite = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_prewrite", LblGeneral)
TxnRegionsNumHistogramCommitInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblInternal)
TxnRegionsNumHistogramCommit = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_commit", LblGeneral)
TxnRegionsNumHistogramCleanupInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblInternal)
TxnRegionsNumHistogramCleanup = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_cleanup", LblGeneral)
TxnRegionsNumHistogramPessimisticLockInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblInternal)
TxnRegionsNumHistogramPessimisticLock = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_lock", LblGeneral)
TxnRegionsNumHistogramPessimisticRollbackInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblInternal)
TxnRegionsNumHistogramPessimisticRollback = TiKVTxnRegionsNumHistogram.WithLabelValues("2pc_pessimistic_rollback", LblGeneral)
TxnRegionsNumHistogramWithCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal)
TxnRegionsNumHistogramWithCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral)
TxnRegionsNumHistogramWithBatchCoprocessorInternal = TiKVTxnRegionsNumHistogram.WithLabelValues("coprocessor", LblInternal)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type "coprocessor" is wrong, and the init of TxnRegionsNumHistogramWithCoprocessor and TxnRegionsNumHistogramWithCoprocessorInternal is missing.

TxnRegionsNumHistogramWithBatchCoprocessor = TiKVTxnRegionsNumHistogram.WithLabelValues("batch_coprocessor", LblGeneral)
TxnWriteKVCountHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal)
TxnWriteKVCountHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral)
Copy link
Contributor

@you06 you06 Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TxnWriteKVCountHistogramInternal and TxnWriteKVCountHistogramGeneral are not initialized.

Unluckily there is no codegen by macro for such works :(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing TxnWriteSizeHistogramInternal and TxnWriteSizeHistogramGeneral are added.

TxnWriteSizeHistogramInternal = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblInternal)
TxnWriteSizeHistogramGeneral = TiKVTxnWriteKVCountHistogram.WithLabelValues(LblGeneral)

LockResolverCountWithBatchResolve = TiKVLockResolverCounter.WithLabelValues("batch_resolve")
LockResolverCountWithExpired = TiKVLockResolverCounter.WithLabelValues("expired")
Expand Down
15 changes: 13 additions & 2 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ type twoPhaseCommitter struct {

// assertion error happened when initializing mutations, could be false positive if pessimistic lock is lost
stashedAssertionError error

// isInternal means it's related to an internal transaction. It's only used by `asyncPessimisticRollback` as the
// committer may contain a nil `txn` pointer.
isInternal bool
}

type memBufferMutations struct {
Expand Down Expand Up @@ -696,8 +700,15 @@ func (c *twoPhaseCommitter) initKeysAndMutations(ctx context.Context) error {
WriteKeys: c.mutations.Len(),
ResolveLock: util.ResolveLockDetail{},
}
metrics.TiKVTxnWriteKVCountHistogram.Observe(float64(commitDetail.WriteKeys))
metrics.TiKVTxnWriteSizeHistogram.Observe(float64(commitDetail.WriteSize))

isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource())
Copy link
Contributor

@you06 you06 Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
isInternalReq := util.IsInternalRequest(c.txn.GetRequestSource())
isInternalReq := c.txn.isInternal()

if isInternalReq {
metrics.TxnWriteKVCountHistogramInternal.Observe(float64(commitDetail.WriteKeys))
metrics.TxnWriteSizeHistogramInternal.Observe(float64(commitDetail.WriteSize))
} else {
metrics.TxnWriteKVCountHistogramGeneral.Observe(float64(commitDetail.WriteKeys))
metrics.TxnWriteSizeHistogramGeneral.Observe(float64(commitDetail.WriteSize))
}
c.hasNoNeedCommitKeys = checkCnt > 0
c.lockTTL = txnLockTTL(txn.startTime, size)
c.priority = txn.priority.ToPB()
Expand Down
13 changes: 8 additions & 5 deletions txnkv/transaction/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,22 @@ import (
"go.uber.org/zap"
)

type actionCleanup struct{}
type actionCleanup struct{ isInternal bool }

var _ twoPhaseCommitAction = actionCleanup{}

func (actionCleanup) String() string {
func (action actionCleanup) String() string {
return "cleanup"
}

func (actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
func (action actionCleanup) tiKVTxnRegionsNumHistogram() prometheus.Observer {
if action.isInternal {
return metrics.TxnRegionsNumHistogramCleanupInternal
}
return metrics.TxnRegionsNumHistogramCleanup
}

func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
req := tikvrpc.NewRequest(tikvrpc.CmdBatchRollback, &kvrpcpb.BatchRollbackRequest{
Keys: batch.mutations.GetKeys(),
StartVersion: c.startTS,
Expand Down Expand Up @@ -99,5 +102,5 @@ func (actionCleanup) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer
}

func (c *twoPhaseCommitter) cleanupMutations(bo *retry.Backoffer, mutations CommitterMutations) error {
return c.doActionOnMutations(bo, actionCleanup{}, mutations)
return c.doActionOnMutations(bo, actionCleanup{isInternal: c.txn.isInternal()}, mutations)
}
18 changes: 12 additions & 6 deletions txnkv/transaction/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,25 @@ import (
"go.uber.org/zap"
)

type actionCommit struct{ retry bool }
type actionCommit struct {
retry bool
isInternal bool
}

var _ twoPhaseCommitAction = actionCommit{}

func (actionCommit) String() string {
func (action actionCommit) String() string {
return "commit"
}

func (actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
func (action actionCommit) tiKVTxnRegionsNumHistogram() prometheus.Observer {
if action.isInternal {
return metrics.TxnRegionsNumHistogramCommitInternal
}
return metrics.TxnRegionsNumHistogramCommit
}

func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
keys := batch.mutations.GetKeys()
req := tikvrpc.NewRequest(tikvrpc.CmdCommit, &kvrpcpb.CommitRequest{
StartVersion: c.startTS,
Expand Down Expand Up @@ -132,7 +138,7 @@ func (actionCommit) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer,
if same {
continue
}
return c.doActionOnMutations(bo, actionCommit{true}, batch.mutations)
return c.doActionOnMutations(bo, actionCommit{true, action.isInternal}, batch.mutations)
}

if resp.Resp == nil {
Expand Down Expand Up @@ -220,5 +226,5 @@ func (c *twoPhaseCommitter) commitMutations(bo *retry.Backoffer, mutations Commi
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}

return c.doActionOnMutations(bo, actionCommit{}, mutations)
return c.doActionOnMutations(bo, actionCommit{isInternal: c.txn.isInternal()}, mutations)
}
Loading