Skip to content

Commit

Permalink
[close #108] Br debug checksum cmd (#114)
Browse files Browse the repository at this point in the history
* implement  debug checksum cmd

Signed-off-by: haojinming <[email protected]>

* add api version check

Signed-off-by: haojinming <[email protected]>

add warning

Signed-off-by: haojinming <[email protected]>

revert changes

Signed-off-by: haojinming <[email protected]>

* move common functions

Signed-off-by: haojinming <[email protected]>
  • Loading branch information
haojinming committed Jun 9, 2022
1 parent 93bccd0 commit b8eef52
Show file tree
Hide file tree
Showing 11 changed files with 217 additions and 103 deletions.
62 changes: 60 additions & 2 deletions br/cmd/br/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@ package main

import (
"context"
"crypto/tls"
"path"
"reflect"
"strings"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/migration/br/pkg/checksum"
"github.com/tikv/migration/br/pkg/conn"
"github.com/tikv/migration/br/pkg/metautil"
"github.com/tikv/migration/br/pkg/pdutil"
"github.com/tikv/migration/br/pkg/task"
"github.com/tikv/migration/br/pkg/utils"
"github.com/tikv/migration/br/pkg/version/build"
pd "github.com/tikv/pd/client"
)

// NewDebugCommand return a debug subcommand.
Expand Down Expand Up @@ -51,10 +57,9 @@ func newCheckSumCommand() *cobra.Command {
Short: "check the backup data",
Args: cobra.NoArgs,
RunE: func(cmd *cobra.Command, _ []string) error {
return errors.Errorf("checksum is unsupported")
return runRawChecksumCommand(cmd, "RawChecksum")
},
}
command.Hidden = true
return command
}

Expand All @@ -77,6 +82,7 @@ func newBackupMetaValidateCommand() *cobra.Command {
},
}
command.Flags().Uint64("offset", 0, "the offset of table id alloctor")
command.Hidden = true
return command
}

Expand Down Expand Up @@ -223,3 +229,55 @@ func setPDConfigCommand() *cobra.Command {
}
return pdConfigCmd
}

func runRawChecksumCommand(command *cobra.Command, cmdName string) error {
cfg := task.Config{LogProgress: HasLogFile()}
err := cfg.ParseFromFlags(command.Flags())
if err != nil {
command.SilenceUsage = false
return errors.Trace(err)
}

ctx := GetDefaultContext()
pdAddress := strings.Join(cfg.PD, ",")
securityOption := pd.SecurityOption{}
var tlsConf *tls.Config = nil
if cfg.TLS.IsEnabled() {
securityOption.CAPath = cfg.TLS.CA
securityOption.CertPath = cfg.TLS.Cert
securityOption.KeyPath = cfg.TLS.Key
tlsConf, err = cfg.TLS.ToTLSConfig()
if err != nil {
return errors.Trace(err)
}
}
pdCtrl, err := pdutil.NewPdController(ctx, pdAddress, tlsConf, securityOption)
if err != nil {
return errors.Trace(err)
}
storageAPIVersion, err := conn.GetTiKVApiVersion(ctx, pdCtrl.GetPDClient(), tlsConf)
if err != nil {
return errors.Trace(err)
}
_, _, backupMeta, err := task.ReadBackupMeta(ctx, metautil.MetaFile, &cfg)
if err != nil {
return errors.Trace(err)
}
fileChecksum, keyRanges := task.CalcChecksumAndRangeFromBackupMeta(ctx, backupMeta, storageAPIVersion)
if !task.CheckBackupAPIVersion(storageAPIVersion, backupMeta.ApiVersion) {
return errors.Errorf("Unsupported api version, storage:%s, backup meta:%s.",
storageAPIVersion.String(), backupMeta.ApiVersion.String())
}
checksumMethod := checksum.StorageChecksumCommand
if storageAPIVersion != backupMeta.ApiVersion {
checksumMethod = checksum.StorageScanCommand
}

executor := checksum.NewExecutor(keyRanges, cfg.PD, pdCtrl.GetPDClient(), storageAPIVersion,
cfg.ChecksumConcurrency)
err = checksum.Run(ctx, cmdName, executor, checksumMethod, fileChecksum)
if err != nil {
return errors.Trace(err)
}
return nil
}
60 changes: 1 addition & 59 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
Expand Down Expand Up @@ -66,14 +63,6 @@ const (
RegionUnit ProgressUnit = "region"
)

type StorageConfig struct {
APIVersion int `json:"api-version"`
EnableTTL bool `json:"enable-ttl"`
}
type StoreConfig struct {
Storage StorageConfig `json:"storage"`
}

// Client is a client instructs TiKV how to do a backup.
type Client struct {
mgr ClientMgr
Expand All @@ -91,7 +80,7 @@ func NewBackupClient(ctx context.Context, mgr ClientMgr, config *tls.Config) (*C
log.Info("new backup client")
pdClient := mgr.GetPDClient()
clusterID := pdClient.GetClusterID(ctx)
curAPIVer, err := GetCurrentTiKVApiVersion(ctx, mgr.GetPDClient(), config)
curAPIVer, err := conn.GetTiKVApiVersion(ctx, mgr.GetPDClient(), config)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -142,53 +131,6 @@ func (bc *Client) GetTS(ctx context.Context, duration time.Duration, ts uint64)
return backupTS, nil
}

func GetCurrentTiKVApiVersion(ctx context.Context, pdClient pd.Client, tlsConf *tls.Config) (kvrpcpb.APIVersion, error) {
allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, conn.SkipTiFlash)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
} else if len(allStores) == 0 {
return kvrpcpb.APIVersion_V1, errors.New("store are empty")
}
schema := "http"
httpClient := http.Client{}
if tlsConf != nil {
httpClient = http.Client{
Transport: &http.Transport{TLSClientConfig: tlsConf},
}
schema = "https"
}
url := fmt.Sprintf("%s://%s/config", schema, allStores[0].StatusAddress)
resp, err := httpClient.Get(url)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
var cfg StoreConfig
if err := json.Unmarshal(body, &cfg); err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
var apiVersion kvrpcpb.APIVersion
if cfg.Storage.APIVersion == 0 { // in old version without apiversion config. it's APIV1.
apiVersion = kvrpcpb.APIVersion_V1
} else if cfg.Storage.APIVersion == 1 {
if cfg.Storage.EnableTTL {
apiVersion = kvrpcpb.APIVersion_V1TTL
} else {
apiVersion = kvrpcpb.APIVersion_V1
}
} else if cfg.Storage.APIVersion == 2 {
apiVersion = kvrpcpb.APIVersion_V2
} else {
errMsg := fmt.Sprintf("Invalid apiversion %d", cfg.Storage.APIVersion)
return kvrpcpb.APIVersion_V1, errors.New(errMsg)
}
return apiVersion, nil
}

func (bc *Client) GetCurAPIVersion() kvrpcpb.APIVersion {
return bc.curAPIVer
}
Expand Down
29 changes: 0 additions & 29 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
. "github.com/pingcap/check"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -226,31 +225,3 @@ func (r *testBackup) TestCheckBackupIsLocked(c *C) {
err = backup.CheckBackupStorageIsLocked(ctx, r.storage)
c.Assert(err, ErrorMatches, "backup lock file and sst file exist in(.+)")
}

func (r *testBackup) TestGetCurrentTiKVApiVersion(c *C) {
ctx := context.Background()

httpmock.Activate()
defer httpmock.DeactivateAndReset()
// Exact URL match
httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":false}}`))

apiVer, err := backup.GetCurrentTiKVApiVersion(ctx, r.mockPDClient, nil)
c.Assert(err, IsNil)
c.Assert(apiVer, Equals, kvrpcpb.APIVersion_V1)

httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":true}}`))

apiVer, err = backup.GetCurrentTiKVApiVersion(ctx, r.mockPDClient, nil)
c.Assert(err, IsNil)
c.Assert(apiVer, Equals, kvrpcpb.APIVersion_V1TTL)

httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`))

apiVer, err = backup.GetCurrentTiKVApiVersion(ctx, r.mockPDClient, nil)
c.Assert(err, IsNil)
c.Assert(apiVer, Equals, kvrpcpb.APIVersion_V2)
}
3 changes: 3 additions & 0 deletions br/pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,9 @@ func (exec *Executor) Execute(

func Run(ctx context.Context, cmdName string,
executor *Executor, method StorageChecksumMethod, expect Checksum) error {
if executor.apiVersion != kvrpcpb.APIVersion_V1 {
fmt.Printf("\033[1;37;41m%s\033[0m\n", "Warning: TiKV cluster is TTL enabled, checksum may be mismatch if some data expired during backup/restore.")
}
glue := new(gluetikv.Glue)
updateCh := glue.StartProgress(ctx, cmdName+" Checksum", int64(len(executor.keyRanges)), false)
progressCallBack := func(unit backup.ProgressUnit) {
Expand Down
60 changes: 60 additions & 0 deletions br/pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ package conn
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"sync"
"time"
Expand All @@ -14,8 +17,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/txnlock"
berrors "github.com/tikv/migration/br/pkg/errors"
Expand Down Expand Up @@ -416,3 +421,58 @@ func (mgr *Mgr) Close() {

mgr.PdController.Close()
}

type StorageConfig struct {
APIVersion int `json:"api-version"`
EnableTTL bool `json:"enable-ttl"`
}
type StoreConfig struct {
Storage StorageConfig `json:"storage"`
}

func GetTiKVApiVersion(ctx context.Context, pdClient pd.Client, tlsConf *tls.Config) (kvrpcpb.APIVersion, error) {
allStores, err := conn.GetAllTiKVStoresWithRetry(ctx, pdClient, conn.SkipTiFlash)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
} else if len(allStores) == 0 {
return kvrpcpb.APIVersion_V1, errors.New("store are empty")
}
schema := "http"
httpClient := http.Client{}
if tlsConf != nil {
httpClient = http.Client{
Transport: &http.Transport{TLSClientConfig: tlsConf},
}
schema = "https"
}
url := fmt.Sprintf("%s://%s/config", schema, allStores[0].StatusAddress)
resp, err := httpClient.Get(url)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
var cfg StoreConfig
if err := json.Unmarshal(body, &cfg); err != nil {
return kvrpcpb.APIVersion_V1, errors.Trace(err)
}
var apiVersion kvrpcpb.APIVersion
if cfg.Storage.APIVersion == 0 { // in old version without apiversion config. it's APIV1.
apiVersion = kvrpcpb.APIVersion_V1
} else if cfg.Storage.APIVersion == 1 {
if cfg.Storage.EnableTTL {
apiVersion = kvrpcpb.APIVersion_V1TTL
} else {
apiVersion = kvrpcpb.APIVersion_V1
}
} else if cfg.Storage.APIVersion == 2 {
apiVersion = kvrpcpb.APIVersion_V2
} else {
errMsg := fmt.Sprintf("Invalid apiversion %d", cfg.Storage.APIVersion)
return kvrpcpb.APIVersion_V1, errors.New(errMsg)
}
return apiVersion, nil
}
44 changes: 44 additions & 0 deletions br/pkg/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"context"
"testing"

"github.com/jarcoal/httpmock"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/require"
"github.com/tikv/migration/br/pkg/pdutil"
Expand Down Expand Up @@ -269,3 +271,45 @@ func TestGetConnOnCanceledContext(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "context canceled")
}

type mockPDClient struct {
pd.Client
}

func (c *mockPDClient) GetAllStores(ctx context.Context, opts ...pd.GetStoreOption) ([]*metapb.Store, error) {
store := &metapb.Store{
Id: 0,
Address: "127.0.0.1",
}
return []*metapb.Store{store}, nil
}

func TestGetTiKVApiVersion(t *testing.T) {
ctx := context.Background()

mockPdClient := mockPDClient{}

httpmock.Activate()
defer httpmock.DeactivateAndReset()
// Exact URL match
httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":false}}`))

apiVer, err := GetTiKVApiVersion(ctx, &mockPdClient, nil)
require.Equal(t, err, nil)
require.Equal(t, apiVer, kvrpcpb.APIVersion_V1)

httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":1, "enable-ttl":true}}`))

apiVer, err = GetTiKVApiVersion(ctx, &mockPdClient, nil)
require.Equal(t, err, nil)
require.Equal(t, apiVer, kvrpcpb.APIVersion_V1TTL)

httpmock.RegisterResponder("GET", `=~^/config`,
httpmock.NewStringResponder(200, `{"storage":{"api-version":2, "enable-ttl":true}}`))

apiVer, err = GetTiKVApiVersion(ctx, &mockPdClient, nil)
require.Equal(t, err, nil)
require.Equal(t, apiVer, kvrpcpb.APIVersion_V2)
}
Loading

0 comments on commit b8eef52

Please sign in to comment.