Skip to content

Commit

Permalink
Merge 467c64e into fc55212
Browse files Browse the repository at this point in the history
  • Loading branch information
haojinming authored Nov 4, 2022
2 parents fc55212 + 467c64e commit 45b6f8d
Show file tree
Hide file tree
Showing 16 changed files with 358 additions and 137 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/ci-br.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,16 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
tikv_version: [nightly]
tikv_version: [v6.1.2, nightly]
api_version: [1, 2]
with_tls: [false, true]
include:
- tikv_version: v5.0.6
api_version: 1
with_tls: true
- tikv_version: v5.4.3
api_version: 1
with_tls: true
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand All @@ -83,6 +90,7 @@ jobs:
# start tikv
# Output the api version
echo "API_VERSION=${{ matrix.api_version }}" >> $GITHUB_ENV
echo "CLUSTER_VERSION=${{ matrix.tikv_version }}" >> $GITHUB_ENV
# TODO: different deploy method for tls enabled and disabled, may use same one.
if ${{ matrix.with_tls }}; then
echo "TLS_PD_PORT=2579" >> $GITHUB_ENV
Expand Down
24 changes: 13 additions & 11 deletions br/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,25 @@
default: release
all: check test build clean

SHELL := /usr/bin/env bash
SHELL := $(shell /usr/bin/which bash)

# golang
GO := GO111MODULE=on go
PACKAGES := go list ./...
DIRECTORIES := $(PACKAGES) | sed 's|github.com/tikv/migration/br/||'

# build & test
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
TLS_CA ?=
TLS_CERT ?=
TLS_KEY ?=
BR_BIN_PATH ?= bin/tikv-br
TEST_BIN_PATH ?= bin/tikv-br.test
COVERAGE_DIR ?= build
TEST_PARALLEL ?= 8
PD_ADDR ?= 127.0.0.1:2379
BR_LOCAL_STORE ?= /tmp/backup_restore_test
API_VERSION ?= 1
CLUSTER_VERSION ?= nightly
TLS_CA ?=
TLS_CERT ?=
TLS_KEY ?=

LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.ReleaseVersion=$(shell git describe --tags --dirty --always)"
LDFLAGS += -X "github.com/tikv/migration/br/pkg/version/build.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
Expand Down Expand Up @@ -72,6 +73,7 @@ test/integration: build/br-test build/rawkv-integration-test
--br='${TEST_BIN_PATH}' \
--br-storage=${BR_LOCAL_STORE} \
--api-version=${API_VERSION} \
--cluster-version=${CLUSTER_VERSION} \
--coverage=$(COVERAGE_DIR) \
--ca=$(TLS_CA) \
--cert=$(TLS_CERT) \
Expand Down
14 changes: 13 additions & 1 deletion br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,21 @@ import (
"reflect"
"strings"

"github.com/coreos/go-semver/semver"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/migration/br/pkg/checksum"
"github.com/tikv/migration/br/pkg/conn"
"github.com/tikv/migration/br/pkg/feature"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/pdutil"
"github.com/tikv/migration/br/pkg/task"
"github.com/tikv/migration/br/pkg/utils"
"github.com/tikv/migration/br/pkg/version/build"
pd "github.com/tikv/pd/client"
"go.uber.org/zap"
)

// NewDebugCommand return a debug subcommand.
Expand Down Expand Up @@ -255,6 +258,15 @@ func runRawChecksumCommand(command *cobra.Command, cmdName string) error {
if err != nil {
return errors.Trace(err)
}
clusterVersion, err := pdCtrl.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}
featureGate := feature.NewFeatureGate(semver.New(clusterVersion))
if !featureGate.IsEnabled(feature.Checksum) {
log.Error("TiKV cluster does not support checksum.", zap.String("version", clusterVersion))
return errors.Errorf("TiKV cluster %s does not support checksum", clusterVersion)
}
storageAPIVersion, err := conn.GetTiKVApiVersion(ctx, pdCtrl.GetPDClient(), tlsConf)
if err != nil {
return errors.Trace(err)
Expand All @@ -264,7 +276,7 @@ func runRawChecksumCommand(command *cobra.Command, cmdName string) error {
return errors.Trace(err)
}
fileChecksum, keyRanges := task.CalcChecksumAndRangeFromBackupMeta(ctx, backupMeta, storageAPIVersion)
if !task.CheckBackupAPIVersion(storageAPIVersion, backupMeta.ApiVersion) {
if !task.CheckBackupAPIVersion(featureGate, storageAPIVersion, backupMeta.ApiVersion) {
return errors.Errorf("Unsupported api version, storage:%s, backup meta:%s",
storageAPIVersion.String(), backupMeta.ApiVersion.String())
}
Expand Down
70 changes: 40 additions & 30 deletions br/pkg/backup/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,46 @@ func (push *pushDown) pushBackup(
wg.Add(1)
go func() {
defer wg.Done()
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp := new(backuppb.BackupResponse)
resp.Error = &backuppb.Error{
Msg: msg,
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp := new(backuppb.BackupResponse)
resp.Error = &backuppb.Error{
Msg: msg,
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
msg := val.(string)
resp := new(backuppb.BackupResponse)
logutil.CL(ctx).Debug("failpoint tikv-region-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
push.respCh <- responseAndStore{
Resp: resp,
Store: store,
}
})
err := SendBackup(
lctx, storeID, client, req,
func(resp *backuppb.BackupResponse) error {
Expand Down Expand Up @@ -117,7 +157,6 @@ func (push *pushDown) pushBackup(
close(push.respCh)
}()

regionErrorIngestedOnce := false
for {
select {
case respAndStore, ok := <-push.respCh:
Expand All @@ -127,35 +166,6 @@ func (push *pushDown) pushBackup(
// Finished.
return res, nil
}
failpoint.Inject("backup-storage-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint backup-storage-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-rw-error", func(val failpoint.Value) {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-rw-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
Msg: msg,
}
})
failpoint.Inject("tikv-region-error", func(val failpoint.Value) {
if !regionErrorIngestedOnce {
msg := val.(string)
logutil.CL(ctx).Debug("failpoint tikv-regionh-error injected.", zap.String("msg", msg))
resp.Error = &backuppb.Error{
// Msg: msg,
Detail: &backuppb.Error_RegionError{
RegionError: &errorpb.Error{
Message: msg,
},
},
}
}
regionErrorIngestedOnce = true
})
if resp.GetError() == nil {
// None error means range has been backuped successfully.
res.Put(resp.GetStartKey(), resp.GetEndKey(), resp.GetFiles())
Expand Down
39 changes: 39 additions & 0 deletions br/pkg/feature/feature_gate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package feature

import "github.com/coreos/go-semver/semver"

type Feature int

const (
APIVersionConversion Feature = iota
Checksum
BackupTs
SplitRegion
)

var (
minAPIVersionConversionVersion = semver.New("6.1.0")
minChecksumVersion = semver.New("6.1.1")
minBackupTsVersion = semver.New("6.2.0")
minSplitRegionVersion = semver.New("5.2.0")
)

type Gate struct {
features map[Feature]*semver.Version
pdVersion *semver.Version
}

func NewFeatureGate(pdVersion *semver.Version) *Gate {
featureGate := new(Gate)
featureGate.features = make(map[Feature]*semver.Version)
featureGate.features[APIVersionConversion] = minAPIVersionConversionVersion
featureGate.features[Checksum] = minChecksumVersion
featureGate.features[BackupTs] = minBackupTsVersion
featureGate.features[SplitRegion] = minSplitRegionVersion
featureGate.pdVersion = pdVersion
return featureGate
}

func (f *Gate) IsEnabled(feature Feature) bool {
return f.pdVersion.Compare(*f.features[feature]) >= 0
}
33 changes: 33 additions & 0 deletions br/pkg/feature/feature_gate_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package feature

import (
"testing"

"github.com/coreos/go-semver/semver"
"github.com/stretchr/testify/require"
)

func TestFeatureGate(t *testing.T) {
gate := NewFeatureGate(semver.New("6.0.0"))
require.False(t, gate.IsEnabled(APIVersionConversion))
require.False(t, gate.IsEnabled(Checksum))
require.False(t, gate.IsEnabled(BackupTs))
require.True(t, gate.IsEnabled(SplitRegion))

gate = NewFeatureGate(semver.New("6.1.0"))
require.True(t, gate.IsEnabled(APIVersionConversion))
require.False(t, gate.IsEnabled(Checksum))
require.False(t, gate.IsEnabled(BackupTs))

gate = NewFeatureGate(semver.New("6.1.1"))
require.True(t, gate.IsEnabled(Checksum))

gate = NewFeatureGate(semver.New("6.2.0"))
require.True(t, gate.IsEnabled(BackupTs))

gate = NewFeatureGate(semver.New("5.1.0"))
require.False(t, gate.IsEnabled(SplitRegion))

gate = NewFeatureGate(semver.New("5.2.0"))
require.True(t, gate.IsEnabled(SplitRegion))
}
6 changes: 5 additions & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ func (p *PdController) GetClusterVersion(ctx context.Context) (string, error) {
return p.getClusterVersionWith(ctx, pdRequest)
}

func FormatVersionString(v []byte) string {
return strings.ReplaceAll(strings.ReplaceAll(string(v), "\"", ""), "\n", "")
}

func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequest) (string, error) {
var err error
for _, addr := range p.addrs {
Expand All @@ -308,7 +312,7 @@ func (p *PdController) getClusterVersionWith(ctx context.Context, get pdHTTPRequ
err = e
continue
}
return string(v), nil
return FormatVersionString(v), nil
}

return "", errors.Trace(err)
Expand Down
8 changes: 7 additions & 1 deletion br/pkg/pdutil/pd_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ func TestScheduler(t *testing.T) {
require.Equal(t, scheduler, schedulers[0])
}

func TestFormatVersionString(t *testing.T) {
require.Equal(t, "5.0.0", FormatVersionString([]byte("5.0.0")))
require.Equal(t, "5.0.0", FormatVersionString([]byte("\"5.0.0\"")))
require.Equal(t, "5.0.0", FormatVersionString([]byte("\"5.0.0\"\n")))
}

func TestGetClusterVersion(t *testing.T) {
pdController := &PdController{addrs: []string{"", ""}} // two endpoints
counter := 0
Expand All @@ -87,7 +93,7 @@ func TestGetClusterVersion(t *testing.T) {
if counter <= 1 {
return nil, errors.New("mock error")
}
return []byte(`test`), nil
return []byte("\"test\"\n"), nil
}

ctx := context.Background()
Expand Down
27 changes: 17 additions & 10 deletions br/pkg/task/backup_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package task
import (
"context"

"github.com/coreos/go-semver/semver"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/tikv/client-go/v2/rawkv"
"github.com/tikv/migration/br/pkg/backup"
"github.com/tikv/migration/br/pkg/checksum"
"github.com/tikv/migration/br/pkg/feature"
"github.com/tikv/migration/br/pkg/glue"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/rtree"
Expand Down Expand Up @@ -103,15 +105,26 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
if err != nil {
return errors.Trace(err)
}
clusterVersion, err := mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}
brVersion := g.GetVersion()

curAPIVersion := client.GetCurAPIVersion()
cfg.adjustBackupRange(curAPIVersion)
if len(cfg.DstAPIVersion) == 0 { // if no DstAPIVersion is specified, backup to same api-version.
cfg.DstAPIVersion = curAPIVersion.String()
}
dstAPIVersion := kvrpcpb.APIVersion(kvrpcpb.APIVersion_value[cfg.DstAPIVersion])
if !CheckBackupAPIVersion(curAPIVersion, dstAPIVersion) {
return errors.Errorf("Unsupported backup api version, cur:%s, dst:%s",
curAPIVersion.String(), cfg.DstAPIVersion)
featureGate := feature.NewFeatureGate(semver.New(clusterVersion))
if !CheckBackupAPIVersion(featureGate, curAPIVersion, dstAPIVersion) {
return errors.Errorf("Unsupported backup api version in current cluster, cur:%s, dst:%s, cluster version:%s",
curAPIVersion.String(), cfg.DstAPIVersion, clusterVersion)
}
if cfg.Checksum && !featureGate.IsEnabled(feature.Checksum) {
log.Error("TiKV cluster does not support checksum, please disable checksum", zap.String("version", clusterVersion))
return errors.Errorf("Current tikv cluster version %s does not support checksum, please disable checksum", clusterVersion)
}
opts := storage.ExternalStorageOptions{
NoCredentials: cfg.NoCreds,
Expand All @@ -121,7 +134,7 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
return errors.Trace(err)
}
client.SetGCTTL(cfg.GCTTL)
if curAPIVersion == kvrpcpb.APIVersion_V2 {
if featureGate.IsEnabled(feature.BackupTs) && curAPIVersion == kvrpcpb.APIVersion_V2 {
// set safepoint to avoid the logical deletion data to gc.
backupTs, err := client.UpdateBRGCSafePoint(ctx, cfg.SafeInterval)
if err != nil {
Expand All @@ -148,12 +161,6 @@ func RunBackupRaw(c context.Context, g glue.Glue, cmdName string, cfg *RawKvConf
}
}

brVersion := g.GetVersion()
clusterVersion, err := mgr.GetClusterVersion(ctx)
if err != nil {
return errors.Trace(err)
}

// The number of regions need to backup
approximateRegions, err := mgr.GetRegionCount(ctx, backupRange.StartKey, backupRange.EndKey)
if err != nil {
Expand Down
Loading

0 comments on commit 45b6f8d

Please sign in to comment.