Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use md5 checksum instead of crc32 #787

Merged
merged 12 commits into from
Aug 9, 2024
6 changes: 3 additions & 3 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
wg.Add(1)
go func() {
defer wg.Done()
upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange)
upstreamInfo = df.upstream.GetCountAndMd5(ctx, tableRange)
}()
downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange)
downstreamInfo = df.downstream.GetCountAndMd5(ctx, tableRange)
wg.Wait()

if upstreamInfo.Err != nil {
Expand All @@ -601,7 +601,7 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum {
return true, upstreamInfo.Count, downstreamInfo.Count, nil
}
log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum))
log.Debug("checksum doesn't match", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Uint64("upstream checksum", upstreamInfo.Checksum), zap.Uint64("downstream checksum", downstreamInfo.Checksum))
return false, upstreamInfo.Count, downstreamInfo.Count, nil
}

Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *MySQLSources) Close() {
}
}

func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()
Expand All @@ -103,7 +103,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte

for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndMd5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Expand All @@ -116,7 +116,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte
var (
err error
totalCount int64
totalChecksum int64
totalChecksum uint64
)

for range matchSources {
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
)

type ChecksumInfo struct {
Checksum int64
Checksum uint64
Count int64
Err error
Cost time.Duration
Expand Down Expand Up @@ -82,8 +82,8 @@ type Source interface {
// Multiple workers consume the ranges from the channel to compare them.
GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error)

// GetCountAndCrc32 gets the crc32 result and the count from given range.
GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo
// GetCountAndMd5 gets the md5 result and the count from given range.
GetCountAndMd5(context.Context, *splitter.RangeInfo) *ChecksumInfo

// GetCountForLackTable gets the count for tables that don't exist upstream or downstream.
GetCountForLackTable(context.Context, *splitter.RangeInfo) int64
Expand Down
8 changes: 4 additions & 4 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func TestTiDBSource(t *testing.T) {
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := tidb.GetCountAndMd5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(123))
require.Equal(t, checksum.Checksum, int64(456))
require.Equal(t, checksum.Checksum, uint64(456))
}

// Test ChunkIterator
Expand Down Expand Up @@ -392,14 +392,14 @@ func TestMysqlShardSources(t *testing.T) {

for n, tableCase := range tableCases {
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
var resChecksum int64 = 0
var resChecksum uint64 = 0
for i := 0; i < len(dbs); i++ {
resChecksum = resChecksum + 1<<i
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(1, 1<<i)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
}

checksum := shard.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := shard.GetCountAndMd5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(len(dbs)))
require.Equal(t, checksum.Checksum, resChecksum)
Expand Down
4 changes: 2 additions & 2 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo
// Close closes the database connection held by this source (s.dbConn).
// NOTE(review): anything returned by dbConn.Close is discarded here — confirm that is intended.
func (s *TiDBSource) Close() {
s.dbConn.Close()
}
func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()

matchSource := getMatchSource(s.sourceTableMap, table)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndMd5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)

cost := time.Since(beginTime)
return &ChecksumInfo{
Expand Down
33 changes: 16 additions & 17 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,16 +765,16 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
return dataSize.Int64, nil
}

// GetCountAndCRC32Checksum returns checksum code and count of some data by given condition
func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) {
// GetCountAndMd5Checksum returns checksum code and count of some data by given condition
func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, error) {
/*
calculate CRC32 checksum and count example:
mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0;
+--------+------------+
| CNT | CHECKSUM |
+--------+------------+
| 100000 | 1128664311 |
+--------+------------+
calculate MD5 checksum and count example:
mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`;
+--------+---------------------+
| CNT    | CHECKSUM            |
+--------+---------------------+
| 100000 | 3462532621352132810 |
+--------+---------------------+
1 row in set (0.46 sec)
*/
columnNames := make([]string, 0, len(tbInfo.Columns))
Expand All @@ -796,24 +796,23 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name))
}

query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args))

var count sql.NullInt64
var checksum sql.NullInt64
var checksum uint64
err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum)
if err != nil {
log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
return -1, -1, errors.Trace(err)
return -1, 0, errors.Trace(err)
}
if !count.Valid || !checksum.Valid {
if !count.Valid {
// if there is no data, the checksum will be `NULL`
log.Warn("get empty count or checksum", zap.String("sql", query), zap.Reflect("args", args))
log.Warn("get empty count", zap.String("sql", query), zap.Reflect("args", args))
return 0, 0, nil
}

return count.Int64, checksum.Int64, nil
return count.Int64, checksum, nil
}

// GetRandomValues returns some random values. Different from /pkg/dbutil.GetRandomValues, it returns multi-columns at the same time.
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestBasicTableUtilOperation(t *testing.T) {
require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 1)
}

func TestGetCountAndCRC32Checksum(t *testing.T) {
func TestGetCountAndMd5Checksum(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

Expand All @@ -271,10 +271,10 @@ func TestGetCountAndCRC32Checksum(t *testing.T) {

mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456))

count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
require.NoError(t, err)
require.Equal(t, count, int64(123))
require.Equal(t, checksum, int64(456))
require.Equal(t, checksum, uint64(0x1c8))
}

func TestGetApproximateMid(t *testing.T) {
Expand Down