Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[close #231] TiKV-BR support v5.x.x #282

Merged
merged 25 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/ci-br.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,16 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
tikv_version: [nightly]
tikv_version: [v6.1.2, nightly]
api_version: [1, 2]
with_tls: [false, true]
include:
- tikv_version: v5.0.6
api_version: 1
with_tls: true
- tikv_version: v5.4.3
api_version: 1
with_tls: true
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand All @@ -83,6 +90,7 @@ jobs:
# start tikv
# Output the api version
echo "API_VERSION=${{ matrix.api_version }}" >> $GITHUB_ENV
echo "CLUSTER_VERSION=${{ matrix.tikv_version }}" >> $GITHUB_ENV
# TODO: different deploy method for tls enabled and disabled, may use same one.
if ${{ matrix.with_tls }}; then
echo "TLS_PD_PORT=2579" >> $GITHUB_ENV
Expand Down
4 changes: 3 additions & 1 deletion br/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
default: release
all: check test build clean

SHELL := /usr/bin/env bash
SHELL := $(shell /usr/bin/which bash)

# golang
GO := GO111MODULE=on go
Expand All @@ -31,6 +31,7 @@ TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
CLUSTER_VERSION?= nightly
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: add a space before ?= and align other lines.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

TLS_CA ?=
TLS_CERT ?=
TLS_KEY ?=
Expand Down Expand Up @@ -72,6 +73,7 @@ test/integration: build/br-test build/rawkv-integration-test
--br='${TEST_BIN_PATH}' \
--br-storage=${BR_LOCAL_STORE} \
--api-version=${API_VERSION} \
--cluster-version=${CLUSTER_VERSION} \
--coverage=$(COVERAGE_DIR) \
--ca=$(TLS_CA) \
--cert=$(TLS_CERT) \
Expand Down
14 changes: 13 additions & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ import (
"reflect"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/migration/br/pkg/checksum"
"github.com/tikv/migration/br/pkg/conn"
"github.com/tikv/migration/br/pkg/feature"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/pdutil"
"github.com/tikv/migration/br/pkg/task"
"github.com/tikv/migration/br/pkg/utils"
"github.com/tikv/migration/br/pkg/version/build"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

// NewDebugCommand return a debug subcommand.
Expand Down Expand Up @@ -255,6 +258,15 @@ func runRawChecksumCommand(command *cobra.Command, cmdName string) error {
if err != nil {
return errors.Trace(err)
}
clusterVersion, err := pdCtrl.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}
featureGate := feature.NewFeatureGate(semver.New(clusterVersion))
if !featureGate.IsEnabled(feature.Checksum) {
log.Error("TiKV cluster does not support checksum.", zap.String("version", clusterVersion))
return errors.Errorf("TiKV cluster %s does not support checksum", clusterVersion)
}
storageAPIVersion, err := conn.GetTiKVApiVersion(ctx, pdCtrl.GetPDClient(), tlsConf)
if err != nil {
return errors.Trace(err)
Expand All @@ -264,7 +276,7 @@ func runRawChecksumCommand(command *cobra.Command, cmdName string) error {
return errors.Trace(err)
}
fileChecksum, keyRanges := task.CalcChecksumAndRangeFromBackupMeta(ctx, backupMeta, storageAPIVersion)
if !task.CheckBackupAPIVersion(storageAPIVersion, backupMeta.ApiVersion) {
if !task.CheckBackupAPIVersion(featureGate, storageAPIVersion, backupMeta.ApiVersion) {
return errors.Errorf("Unsupported api version, storage:%s, backup meta:%s",
storageAPIVersion.String(), backupMeta.ApiVersion.String())
}
Expand Down
70 changes: 40 additions & 30 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,46 @@ func (push *pushDown) pushBackup(
wg.Add(1)
go func() {
defer wg.Done()
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp := new(backuppb.BackupResponse)
resp.Error = &backuppb.Error{
Msg: msg,
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp := new(backuppb.BackupResponse)
resp.Error = &backuppb.Error{
Msg: msg,
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have we verified that the failpoint is effective ? This failpoint should cause an incomplete range and enter procedure fineGrainedBackup. I remember that there was a message "start fine grained backup" in log printed here, but I can't find it in CI logs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems be an issue. Now it works, see following:
image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will change the original behavior, as all regions will fail, other than only one of them. In real scenarios, there should be only a small number of regions will fail.

I think we can change FAILPOINTS of rawkv test from return("region error") to 1*return("region error") to control how many times the failpoint takes effective.
Refer to here (this change add a "one region fail" case in addition to "all regions").

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks.

msg := val.(string)
resp := new(backuppb.BackupResponse)
logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
err := SendBackup(
lctx, storeID, client, req,
func(resp *backuppb.BackupResponse) error {
Expand Down Expand Up @@ -117,7 +157,6 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
Expand All @@ -127,35 +166,6 @@ func (push *pushDown) pushBackup(
// Finished.
return res, nil
}
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())
Expand Down
39 changes: 39 additions & 0 deletions br/pkg/feature/feature_gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package feature

import "github.com/coreos/go-semver/semver"

type Feature int

const (
APIVersionConversion Feature = iota
Checksum
BackupTs
SplitRegion
)

var (
minAPIVersionConversionVersion = semver.New("6.1.0")
minChecksumVersion = semver.New("6.1.1")
minBackupTsVersion = semver.New("6.2.0")
minSplitRegionVersion = semver.New("5.2.0")
)

type Gate struct {
features map[Feature]*semver.Version
pdVersion *semver.Version
}

func NewFeatureGate(pdVersion *semver.Version) *Gate {
featureGate := new(Gate)
featureGate.features = make(map[Feature]*semver.Version)
featureGate.features[APIVersionConversion] = minAPIVersionConversionVersion
featureGate.features[Checksum] = minChecksumVersion
featureGate.features[BackupTs] = minBackupTsVersion
featureGate.features[SplitRegion] = minSplitRegionVersion
featureGate.pdVersion = pdVersion
return featureGate
}

func (f *Gate) IsEnabled(feature Feature) bool {
return f.pdVersion.Compare(*f.features[feature]) >= 0
}
33 changes: 33 additions & 0 deletions br/pkg/feature/feature_gate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package feature

import (
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
)

func TestFeatureGate(t *testing.T) {
gate := NewFeatureGate(semver.New("6.0.0"))
require.False(t, gate.IsEnabled(APIVersionConversion))
require.False(t, gate.IsEnabled(Checksum))
require.False(t, gate.IsEnabled(BackupTs))
require.True(t, gate.IsEnabled(SplitRegion))

gate = NewFeatureGate(semver.New("6.1.0"))
require.True(t, gate.IsEnabled(APIVersionConversion))
require.False(t, gate.IsEnabled(Checksum))
require.False(t, gate.IsEnabled(BackupTs))

gate = NewFeatureGate(semver.New("6.1.1"))
require.True(t, gate.IsEnabled(Checksum))

gate = NewFeatureGate(semver.New("6.2.0"))
require.True(t, gate.IsEnabled(BackupTs))

gate = NewFeatureGate(semver.New("5.1.0"))
require.False(t, gate.IsEnabled(SplitRegion))

gate = NewFeatureGate(semver.New("5.2.0"))
require.True(t, gate.IsEnabled(SplitRegion))
}
6 changes: 5 additions & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) {
return p.getClusterVersionWith(ctx, pdRequest)
}

func FormatVersionString(v []byte) string {
return strings.ReplaceAll(strings.ReplaceAll(string(v), "\"", ""), "\n", "")
}

func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) {
var err error
for _, addr := range p.addrs {
Expand All @@ -308,7 +312,7 @@ func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequ
err = e
continue
}
return string(v), nil
return FormatVersionString(v), nil
}

return "", errors.Trace(err)
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/pdutil/pd_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func TestScheduler(t *testing.T) {
require.Equal(t, scheduler, schedulers[0])
}

func TestFormatVersionString(t *testing.T) {
require.Equal(t, "5.0.0", FormatVersionString([]byte("5.0.0")))
require.Equal(t, "5.0.0", FormatVersionString([]byte("\"5.0.0\"")))
require.Equal(t, "5.0.0", FormatVersionString([]byte("\"5.0.0\"\n")))
}

func TestGetClusterVersion(t *testing.T) {
pdController := &PdController{addrs: []string{"", ""}} // two endpoints
counter := 0
Expand All @@ -87,7 +93,7 @@ func TestGetClusterVersion(t *testing.T) {
if counter <= 1 {
return nil, errors.New("mock error")
}
return []byte(`test`), nil
return []byte("\"test\"\n"), nil
}

ctx := context.Background()
Expand Down
27 changes: 17 additions & 10 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package task
import (
"context"

"github.com/coreos/go-semver/semver"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/tikv/client-go/v2/rawkv"
"github.com/tikv/migration/br/pkg/backup"
"github.com/tikv/migration/br/pkg/checksum"
"github.com/tikv/migration/br/pkg/feature"
"github.com/tikv/migration/br/pkg/glue"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/rtree"
Expand Down Expand Up @@ -103,15 +105,26 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return errors.Trace(err)
}
clusterVersion, err := mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}
brVersion := g.GetVersion()

curAPIVersion := client.GetCurAPIVersion()
cfg.adjustBackupRange(curAPIVersion)
if len(cfg.DstAPIVersion) == 0 { // if no DstAPIVersion is specified, backup to same api-version.
cfg.DstAPIVersion = curAPIVersion.String()
}
dstAPIVersion := kvrpcpb.APIVersion(kvrpcpb.APIVersion_value[cfg.DstAPIVersion])
if !CheckBackupAPIVersion(curAPIVersion, dstAPIVersion) {
return errors.Errorf("Unsupported backup api version, cur:%s, dst:%s",
curAPIVersion.String(), cfg.DstAPIVersion)
featureGate := feature.NewFeatureGate(semver.New(clusterVersion))
if !CheckBackupAPIVersion(featureGate, curAPIVersion, dstAPIVersion) {
return errors.Errorf("Unsupported backup api version in current cluster, cur:%s, dst:%s, cluster version:%s",
curAPIVersion.String(), cfg.DstAPIVersion, clusterVersion)
}
if cfg.Checksum && !featureGate.IsEnabled(feature.Checksum) {
log.Error("TiKV cluster does not support checksum, please disable checksum", zap.String("version", clusterVersion))
return errors.Errorf("Current tikv cluster version %s does not support checksum, please disable checksum", clusterVersion)
}
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
Expand All @@ -121,7 +134,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return errors.Trace(err)
}
client.SetGCTTL(cfg.GCTTL)
if curAPIVersion == kvrpcpb.APIVersion_V2 {
if featureGate.IsEnabled(feature.BackupTs) && curAPIVersion == kvrpcpb.APIVersion_V2 {
// set safepoint to avoid the logical deletion data to gc.
backupTs, err := client.UpdateBRGCSafePoint(ctx, cfg.SafeInterval)
if err != nil {
Expand All @@ -148,12 +161,6 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
}

brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}

// The number of regions need to backup
approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey)
if err != nil {
Expand Down
6 changes: 4 additions & 2 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/spf13/pflag"
"github.com/tikv/migration/br/pkg/conn"
berrors "github.com/tikv/migration/br/pkg/errors"
"github.com/tikv/migration/br/pkg/feature"
"github.com/tikv/migration/br/pkg/glue"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/storage"
Expand Down Expand Up @@ -358,7 +359,8 @@ func gcsObjectNotFound(err error) bool {
}

// CheckBackupAPIVersion return false if backup api version is not supported.
func CheckBackupAPIVersion(storageAPIVersion, dstAPIVersion kvrpcpb.APIVersion) bool {
func CheckBackupAPIVersion(gate *feature.Gate, storageAPIVersion, dstAPIVersion kvrpcpb.APIVersion) bool {
// only support apiv1/v1ttl->apiv2 if apiversions are not the same.
return storageAPIVersion == dstAPIVersion || dstAPIVersion == kvrpcpb.APIVersion_V2
return storageAPIVersion == dstAPIVersion ||
(gate.IsEnabled(feature.APIVersionConversion) && dstAPIVersion == kvrpcpb.APIVersion_V2)
}
Loading