From 0462816d58c21ddabf626611c783657a36f9f9bc Mon Sep 17 00:00:00 2001 From: Rohan Yadav Date: Thu, 20 Feb 2020 15:37:29 -0500 Subject: [PATCH] sql: make secondary indexes not write empty k/v's + bugfixes for primary key changes Fixes #45223. Depends on #45226 to land first. This PR fixes several bugs around secondary index encodings and the k/v reads and writes performed by CRUD operations. * Fix a bug where secondary indexes would write empty k/v's for a row when one of its column families contained only NULL values. * Fix multiple bugs where updates to a table during an online primary key change could result in an inconsistent new primary key. * Fix an incorrect assumption in the updater that the old and new versions of a row always produce the same number of k/v's per index. The logic now performs a merge over the old and new index entries to decide which k/v's to insert and delete during the update operation (a Go sketch of this merge follows the patch). * Increase testing around secondary index k/v's and schema change operations. The release note is None because these are all bugs introduced in 20.1. Release note: None --- pkg/sql/alter_table.go | 20 ++ pkg/sql/backfill/backfill.go | 5 +- .../testdata/secondary_index_column_families | 193 ++++++++++++++++++ pkg/sql/row/deleter.go | 6 +- pkg/sql/row/fetcher.go | 4 +- pkg/sql/row/helper.go | 8 +- pkg/sql/row/inserter.go | 4 +- pkg/sql/row/updater.go | 125 +++++++++--- pkg/sql/schema_changer_test.go | 163 +++++++++++++++ pkg/sql/scrub_test.go | 12 +- pkg/sql/sem/builtins/builtins.go | 2 +- pkg/sql/sqlbase/index_encoding.go | 65 ++++-- pkg/sql/sqlbase/table_test.go | 2 +- 13 files changed, 548 insertions(+), 61 deletions(-) diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 9ab1bb522dce..2caffa8910ab 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -403,6 +403,7 @@ func (n *alterTableNode) startExec(params runParams) error { CreatedExplicitly: true, EncodingType: sqlbase.PrimaryIndexEncoding, Type: sqlbase.IndexDescriptor_FORWARD, + Version: sqlbase.SecondaryIndexFamilyFormatVersion, } // If the new index is requested to be sharded, set up the index descriptor @@ -444,6 +445,25 @@ func (n *alterTableNode) startExec(params runParams) error { return err } + // Ensure that the new primary index stores all columns in the table. We can't + // use AllocateIDs to fill the stored columns here because it assumes + // that the indexed columns are n.PrimaryIndex.ColumnIDs, but here we want + // to consider the indexed columns to be newPrimaryIndexDesc.ColumnIDs. + newPrimaryIndexDesc.StoreColumnNames, newPrimaryIndexDesc.StoreColumnIDs = nil, nil + for _, col := range n.tableDesc.Columns { + containsCol := false + for _, colID := range newPrimaryIndexDesc.ColumnIDs { + if colID == col.ID { + containsCol = true + break + } + } + if !containsCol { + newPrimaryIndexDesc.StoreColumnIDs = append(newPrimaryIndexDesc.StoreColumnIDs, col.ID) + newPrimaryIndexDesc.StoreColumnNames = append(newPrimaryIndexDesc.StoreColumnNames, col.Name) + } + } + if t.Interleave != nil { if err := params.p.addInterleave(params.ctx, n.tableDesc, newPrimaryIndexDesc, t.Interleave); err != nil { return err diff --git a/pkg/sql/backfill/backfill.go b/pkg/sql/backfill/backfill.go index 65ae27dc52d9..636f95e0c075 100644 --- a/pkg/sql/backfill/backfill.go +++ b/pkg/sql/backfill/backfill.go @@ -422,11 +422,12 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk( // We're resetting the length of this slice for variable length indexes such as inverted // indexes which can append entries to the end of the slice.
If we don't do this, then everything // EncodeSecondaryIndexes appends to secondaryIndexEntries for a row, would stay in the slice for - // subsequent rows and we would then have duplicates in entries on output. + // subsequent rows and we would then have duplicates in entries on output. Additionally, we do + // not want to include empty k/v pairs while backfilling. buffer = buffer[:len(ib.added)] if buffer, err = sqlbase.EncodeSecondaryIndexes( tableDesc.TableDesc(), ib.added, ib.colIdxMap, - ib.rowVals, buffer); err != nil { + ib.rowVals, buffer, false /* includeEmpty */); err != nil { return nil, nil, err } entries = append(entries, buffer...) diff --git a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families index bc41f871542d..2e90ffa4ec73 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families +++ b/pkg/sql/opt/exec/execbuilder/testdata/secondary_index_column_families @@ -354,3 +354,196 @@ message LIKE 'Scan%' ORDER BY message ---- Scan /Table/57/2/2/{0-1} + +# Ensure that when backfilling an index we only insert the needed k/vs. +statement ok +DROP TABLE IF EXISTS t; +CREATE TABLE t ( + x INT PRIMARY KEY, y INT, z INT, w INT, + FAMILY (y), FAMILY (x), FAMILY (z), FAMILY (w) +); +INSERT INTO t VALUES (1, 2, NULL, 3), (4, 5, 6, NULL), (8, 9, NULL, NULL); +CREATE INDEX i ON t (y) STORING (z, w) + +query IIII rowsort +SET TRACING=on,kv,results; +SELECT * FROM t@i; +SET TRACING=off +---- +1 2 NULL 3 +4 5 6 NULL +8 9 NULL NULL + +# Ensure by scanning that we fetch 2 k/v's for row (1, 2, NULL, 3), +# 2 k/v's for row (4, 5, 6, NULL), and 1 k/v for row (8, 9, NULL, NULL). +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'fetched%' +ORDER BY message +---- +fetched: /t/i/2/1 -> NULL +fetched: /t/i/2/1/w -> /3 +fetched: /t/i/5/4 -> NULL +fetched: /t/i/5/4/z -> /6 +fetched: /t/i/9/8 -> NULL + +statement ok +DROP TABLE IF EXISTS t; +CREATE TABLE t ( + x INT PRIMARY KEY, y INT, z INT, w INT, + FAMILY (y), FAMILY (x), FAMILY (z), FAMILY (w) +); +INSERT INTO t VALUES (1, 2, NULL, NULL) + +statement ok +BEGIN + +# Place i on the mutations queue in a delete only state. +statement ok +CREATE INDEX i ON t (y) STORING (z, w) + +statement ok +SET TRACING=on,kv,results; +UPDATE t SET z = 3 WHERE y = 2; +SET TRACING=off + +# Because i is in a delete only state, we should see a delete +# for each k/v for i for the row (1, 2, NULL, NULL). +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'Del%' +ORDER BY message +---- +Del /Table/59/2/2/1/0 +Del /Table/59/2/2/1/2/1 +Del /Table/59/2/2/1/3/1 + +statement ok +COMMIT + +query IIII +SELECT * FROM t@i +---- +1 2 3 NULL + +statement ok +DROP TABLE IF EXISTS t; +CREATE TABLE t ( + x INT PRIMARY KEY, y INT, a INT, b INT, c INT, d INT, e INT, f INT, + FAMILY (x), FAMILY (y), FAMILY (a, b), FAMILY (c, d), FAMILY (e), FAMILY (f), + INDEX i1 (y) STORING (a, b, c, d, e, f), + UNIQUE INDEX i2 (y) STORING (a, b, c, d, e, f) +); + +# Ensure we only insert the correct keys. 
+statement ok +SET TRACING=on,kv,results; +INSERT INTO t VALUES (1, 2, 3, NULL, 5, 6, NULL, 8); +SET TRACING=off + +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'InitPut%' +ORDER BY message +---- +InitPut /Table/60/2/2/1/0 -> /BYTES/ +InitPut /Table/60/2/2/1/2/1 -> /TUPLE/3:3:Int/3 +InitPut /Table/60/2/2/1/3/1 -> /TUPLE/5:5:Int/5/1:6:Int/6 +InitPut /Table/60/2/2/1/5/1 -> /TUPLE/8:8:Int/8 +InitPut /Table/60/3/2/0 -> /BYTES/0x89 +InitPut /Table/60/3/2/2/1 -> /TUPLE/3:3:Int/3 +InitPut /Table/60/3/2/3/1 -> /TUPLE/5:5:Int/5/1:6:Int/6 +InitPut /Table/60/3/2/5/1 -> /TUPLE/8:8:Int/8 + +# Test some cases of the updater. +statement ok +SET TRACING=on,kv,results; +UPDATE t SET b = 4, c = NULL, d = NULL, e = 7, f = NULL WHERE y = 2; +SET TRACING=off + +query IIIIIIII +SELECT * FROM t@i2 +---- +1 2 3 4 NULL NULL 7 NULL + +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'Put /Table/60/2/%' OR +message LIKE 'Del /Table/60/2/%' OR +message LIKE 'CPut /Table/60/2/%' +---- +CPut /Table/60/2/2/1/2/1 -> /TUPLE/3:3:Int/3/1:4:Int/4 (replacing raw_bytes:"\000\000\000\000\n3\006" timestamp:<> , if exists) +Del /Table/60/2/2/1/3/1 +CPut /Table/60/2/2/1/4/1 -> /TUPLE/7:7:Int/7 (expecting does not exist) +Del /Table/60/2/2/1/5/1 + +statement ok +INSERT INTO t VALUES (3, 3, NULL, NULL, NULL, NULL, NULL, NULL) + +statement ok +SET TRACING=on,kv,results; +UPDATE t SET a = 10, b = 11, c = 12, d = 13, e = 14, f = 15 WHERE y = 3; +SET TRACING=off + +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'Put /Table/60/2/%' OR +message LIKE 'Del /Table/60/2/%' OR +message LIKE 'CPut /Table/60/2/%' +---- +CPut /Table/60/2/3/3/2/1 -> /TUPLE/3:3:Int/10/1:4:Int/11 (expecting does not exist) +CPut /Table/60/2/3/3/3/1 -> /TUPLE/5:5:Int/12/1:6:Int/13 (expecting does not exist) +CPut /Table/60/2/3/3/4/1 -> /TUPLE/7:7:Int/14 (expecting does not exist) +CPut /Table/60/2/3/3/5/1 -> /TUPLE/8:8:Int/15 (expecting does not exist) + +statement ok +SET TRACING=on,kv,results; +UPDATE t SET a = NULL, b = NULL, c = NULL, d = NULL, e = NULL, f = NULL WHERE y = 3; +SET TRACING=off + +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'Put /Table/60/2/%' OR +message LIKE 'Del /Table/60/2/%' OR +message LIKE 'CPut /Table/60/2/%' +---- +Del /Table/60/2/3/3/2/1 +Del /Table/60/2/3/3/3/1 +Del /Table/60/2/3/3/4/1 +Del /Table/60/2/3/3/5/1 + + +statement ok +INSERT INTO t VALUES (20, 21, 22, NULL, NULL, 25, NULL, 27); +SET TRACING=on,kv,results; +UPDATE t SET y = 22 WHERE y = 21; +SET TRACING=off + +query T +SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE +message LIKE 'Put /Table/60/2/%' OR +message LIKE 'Del /Table/60/2/%' OR +message LIKE 'CPut /Table/60/2/%' +---- +Del /Table/60/2/21/20/0 +CPut /Table/60/2/22/20/0 -> /BYTES/ (expecting does not exist) +Del /Table/60/2/21/20/2/1 +CPut /Table/60/2/22/20/2/1 -> /TUPLE/3:3:Int/22 (expecting does not exist) +Del /Table/60/2/21/20/3/1 +CPut /Table/60/2/22/20/3/1 -> /TUPLE/6:6:Int/25 (expecting does not exist) +Del /Table/60/2/21/20/5/1 +CPut /Table/60/2/22/20/5/1 -> /TUPLE/8:8:Int/27 (expecting does not exist) + +query IIIIIIII rowsort +SELECT * FROM t@i1 +---- +1 2 3 4 NULL NULL 7 NULL +3 3 NULL NULL NULL NULL NULL NULL +20 22 22 NULL NULL 25 NULL 27 + +query IIIIIIII rowsort +SELECT * FROM t@i2 +---- +1 2 3 4 NULL NULL 7 NULL +3 3 NULL NULL NULL NULL NULL NULL +20 22 22 NULL NULL 25 NULL 27 diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 2c6317e23527..32f479674b97 100644 
--- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -139,8 +139,9 @@ func (rd *Deleter) DeleteRow( // Delete the row from any secondary indices. for i := range rd.Helper.Indexes { + // Include empty k/v pairs so that we delete all k/v's for this row. entries, err := sqlbase.EncodeSecondaryIndex( - rd.Helper.TableDesc.TableDesc(), &rd.Helper.Indexes[i], rd.FetchColIDtoRowIndex, values) + rd.Helper.TableDesc.TableDesc(), &rd.Helper.Indexes[i], rd.FetchColIDtoRowIndex, values, true /* includeEmpty */) if err != nil { return err } @@ -212,8 +213,9 @@ func (rd *Deleter) DeleteIndexRow( return err } } + // Include empty k/v pairs so that we delete all k/v's for this row. secondaryIndexEntry, err := sqlbase.EncodeSecondaryIndex( - rd.Helper.TableDesc.TableDesc(), idx, rd.FetchColIDtoRowIndex, values) + rd.Helper.TableDesc.TableDesc(), idx, rd.FetchColIDtoRowIndex, values, true /* includeEmpty */) if err != nil { return err } diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index a926836974fc..bcc18d72b46f 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -1358,7 +1358,9 @@ func (rf *Fetcher) checkSecondaryIndexDatumEncodings(ctx context.Context) error values[i] = table.row[i].Datum } - indexEntries, err := sqlbase.EncodeSecondaryIndex(table.desc.TableDesc(), table.index, table.colIdxMap, values) + // The code below performs incorrect checks (#45256). + indexEntries, err := sqlbase.EncodeSecondaryIndex( + table.desc.TableDesc(), table.index, table.colIdxMap, values, false /* includeEmpty */) if err != nil { return err } diff --git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index 99a5e49169c4..570cbc710d6f 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -56,13 +56,13 @@ func newRowHelper( // secondaryIndexEntries are only valid until the next call to encodeIndexes or // encodeSecondaryIndexes. func (rh *rowHelper) encodeIndexes( - colIDtoRowIndex map[sqlbase.ColumnID]int, values []tree.Datum, + colIDtoRowIndex map[sqlbase.ColumnID]int, values []tree.Datum, includeEmpty bool, ) (primaryIndexKey []byte, secondaryIndexEntries []sqlbase.IndexEntry, err error) { primaryIndexKey, err = rh.encodePrimaryIndex(colIDtoRowIndex, values) if err != nil { return nil, nil, err } - secondaryIndexEntries, err = rh.encodeSecondaryIndexes(colIDtoRowIndex, values) + secondaryIndexEntries, err = rh.encodeSecondaryIndexes(colIDtoRowIndex, values, includeEmpty) if err != nil { return nil, nil, err } @@ -86,13 +86,13 @@ func (rh *rowHelper) encodePrimaryIndex( // secondaryIndexEntries are only valid until the next call to encodeIndexes or // encodeSecondaryIndexes.
func (rh *rowHelper) encodeSecondaryIndexes( - colIDtoRowIndex map[sqlbase.ColumnID]int, values []tree.Datum, + colIDtoRowIndex map[sqlbase.ColumnID]int, values []tree.Datum, includeEmpty bool, ) (secondaryIndexEntries []sqlbase.IndexEntry, err error) { if len(rh.indexEntries) != len(rh.Indexes) { rh.indexEntries = make([]sqlbase.IndexEntry, len(rh.Indexes)) } rh.indexEntries, err = sqlbase.EncodeSecondaryIndexes( - rh.TableDesc.TableDesc(), rh.Indexes, colIDtoRowIndex, values, rh.indexEntries) + rh.TableDesc.TableDesc(), rh.Indexes, colIDtoRowIndex, values, rh.indexEntries, includeEmpty) if err != nil { return nil, err } diff --git a/pkg/sql/row/inserter.go b/pkg/sql/row/inserter.go index 77a5edd62d25..5ac4f420646a 100644 --- a/pkg/sql/row/inserter.go +++ b/pkg/sql/row/inserter.go @@ -158,7 +158,9 @@ func (ri *Inserter) InsertRow( } } - primaryIndexKey, secondaryIndexEntries, err := ri.Helper.encodeIndexes(ri.InsertColIDtoRowIndex, values) + // We don't want to insert any empty k/v's, so set includeEmpty to false. + primaryIndexKey, secondaryIndexEntries, err := ri.Helper.encodeIndexes( + ri.InsertColIDtoRowIndex, values, false /* includeEmpty */) if err != nil { return err } diff --git a/pkg/sql/row/updater.go b/pkg/sql/row/updater.go index 5c4c6eb1f7b1..6a74f3166ed2 100644 --- a/pkg/sql/row/updater.go +++ b/pkg/sql/row/updater.go @@ -319,7 +319,7 @@ func (ru *Updater) UpdateRow( } var deleteOldSecondaryIndexEntries []sqlbase.IndexEntry if ru.DeleteHelper != nil { - _, deleteOldSecondaryIndexEntries, err = ru.DeleteHelper.encodeIndexes(ru.FetchColIDtoRowIndex, oldValues) + _, deleteOldSecondaryIndexEntries, err = ru.DeleteHelper.encodeIndexes(ru.FetchColIDtoRowIndex, oldValues, true /* includeEmpty */) if err != nil { return nil, err } @@ -352,15 +352,24 @@ func (ru *Updater) UpdateRow( } for i := range ru.Helper.Indexes { - // TODO (rohany): include a version of sqlbase.EncodeSecondaryIndex that allocates index entries - // into an argument list. + // We don't want to write k/v's that have empty values, so don't include empty k/v's here. ru.oldIndexEntries[i], err = sqlbase.EncodeSecondaryIndex( - ru.Helper.TableDesc.TableDesc(), &ru.Helper.Indexes[i], ru.FetchColIDtoRowIndex, oldValues) + ru.Helper.TableDesc.TableDesc(), + &ru.Helper.Indexes[i], + ru.FetchColIDtoRowIndex, + oldValues, + false, /* includeEmpty */ + ) if err != nil { return nil, err } ru.newIndexEntries[i], err = sqlbase.EncodeSecondaryIndex( - ru.Helper.TableDesc.TableDesc(), &ru.Helper.Indexes[i], ru.FetchColIDtoRowIndex, ru.newValues) + ru.Helper.TableDesc.TableDesc(), + &ru.Helper.Indexes[i], + ru.FetchColIDtoRowIndex, + ru.newValues, + false, /* includeEmpty */ + ) if err != nil { return nil, err } @@ -436,35 +445,103 @@ func (ru *Updater) UpdateRow( for i := range ru.Helper.Indexes { index := &ru.Helper.Indexes[i] if index.Type == sqlbase.IndexDescriptor_FORWARD { - if len(ru.oldIndexEntries[i]) != len(ru.newIndexEntries[i]) { - panic("expected same number of index entries for old and new values") - } - for j := range ru.oldIndexEntries[i] { - oldEntry := &ru.oldIndexEntries[i][j] - newEntry := &ru.newIndexEntries[i][j] - var expValue *roachpb.Value - if !bytes.Equal(oldEntry.Key, newEntry.Key) { - // TODO (rohany): this check is duplicated here and above, is there a reason? 
- ru.Fks.addCheckForIndex(ru.Helper.Indexes[i].ID, ru.Helper.Indexes[i].Type) + oldIdx, newIdx := 0, 0 + oldEntries, newEntries := ru.oldIndexEntries[i], ru.newIndexEntries[i] + for oldIdx < len(oldEntries) && newIdx < len(newEntries) { + oldEntry, newEntry := &oldEntries[oldIdx], &newEntries[newIdx] + if oldEntry.Family == newEntry.Family { + // If the families are equal, then check if the keys have changed. If so, delete the old key. + // Then, issue a CPut for the new value of the key if the value has changed. + // Because the indexes will always have a k/v for family 0, it suffices to only + // add foreign key checks in this case, because we are guaranteed to enter here. + oldIdx++ + newIdx++ + var expValue *roachpb.Value + if !bytes.Equal(oldEntry.Key, newEntry.Key) { + ru.Fks.addCheckForIndex(index.ID, index.Type) + if traceKV { + log.VEventf(ctx, 2, "Del %s", keys.PrettyPrint(ru.Helper.secIndexValDirs[i], oldEntry.Key)) + } + batch.Del(oldEntry.Key) + } else if !newEntry.Value.EqualData(oldEntry.Value) { + expValue = &oldEntry.Value + } else { + continue + } + if traceKV { + k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) + v := newEntry.Value.PrettyPrint() + if expValue != nil { + log.VEventf(ctx, 2, "CPut %s -> %v (replacing %v, if exists)", k, v, expValue) + } else { + log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) + } + } + batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue) + } else if oldEntry.Family < newEntry.Family { + if oldEntry.Family == sqlbase.FamilyID(0) { + return nil, errors.AssertionFailedf( + "index entry for family 0 for table %s, index %s was not generated", + ru.Helper.TableDesc.Name, index.Name, + ) + } + // In this case, the index has a k/v for a family that does not exist in + // the new set of k/v's for the row. So, we need to delete the old k/v. if traceKV { log.VEventf(ctx, 2, "Del %s", keys.PrettyPrint(ru.Helper.secIndexValDirs[i], oldEntry.Key)) } batch.Del(oldEntry.Key) - } else if !newEntry.Value.EqualData(oldEntry.Value) { - expValue = &oldEntry.Value + oldIdx++ } else { - continue + if newEntry.Family == sqlbase.FamilyID(0) { + return nil, errors.AssertionFailedf( + "index entry for family 0 for table %s, index %s was not generated", + ru.Helper.TableDesc.Name, index.Name, + ) + } + // In this case, the index now has a k/v that did not exist in the + // old row, so we should expect to not see a value for the new + // key, and put the new key in place. + if traceKV { + k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) + v := newEntry.Value.PrettyPrint() + log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) + } + batch.CPut(newEntry.Key, &newEntry.Value, nil) + newIdx++ + } + } + for oldIdx < len(oldEntries) { + // Delete any remaining old entries that are not matched by new entries in this row. + oldEntry := &oldEntries[oldIdx] + if oldEntry.Family == sqlbase.FamilyID(0) { + return nil, errors.AssertionFailedf( + "index entry for family 0 for table %s, index %s was not generated", + ru.Helper.TableDesc.Name, index.Name, + ) + } + if traceKV { + log.VEventf(ctx, 2, "Del %s", keys.PrettyPrint(ru.Helper.secIndexValDirs[i], oldEntry.Key)) + } + batch.Del(oldEntry.Key) + oldIdx++ + } + for newIdx < len(newEntries) { + // Insert any remaining new entries that are not present in the old row. 
+ newEntry := &newEntries[newIdx] + if newEntry.Family == sqlbase.FamilyID(0) { + return nil, errors.AssertionFailedf( + "index entry for family 0 for table %s, index %s was not generated", + ru.Helper.TableDesc.Name, index.Name, + ) } if traceKV { k := keys.PrettyPrint(ru.Helper.secIndexValDirs[i], newEntry.Key) v := newEntry.Value.PrettyPrint() - if expValue != nil { - log.VEventf(ctx, 2, "CPut %s -> %v (replacing %v, if exists)", k, v, expValue) - } else { - log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) - } + log.VEventf(ctx, 2, "CPut %s -> %v (expecting does not exist)", k, v) } - batch.CPutAllowingIfNotExists(newEntry.Key, &newEntry.Value, expValue) + batch.CPut(newEntry.Key, &newEntry.Value, nil) + newIdx++ } } else { // Remove all inverted index entries, and re-add them. diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 25951c303b3a..210dc863703b 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -56,6 +56,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" + "github.com/stretchr/testify/require" ) // asyncSchemaChangerDisabled can be used to disable asynchronous processing @@ -2563,6 +2564,168 @@ CREATE TABLE t.test (k INT NOT NULL, v INT); ) } +// TestPrimaryKeyChangeKVOps tests sequences of k/v operations +// on the new primary index while it is staged as a special +// secondary index. We cannot test this in a standard logic +// test because the ability to pause the backfill process is +// only available in a unit test like this. This test is +// essentially a heavyweight poor man's logic test, but there +// doesn't seem to be a better way to achieve what is needed here. +func TestPrimaryKeyChangeKVOps(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + + backfillNotification := make(chan struct{}) + waitBeforeContinuing := make(chan struct{}) + + params, _ := tests.CreateTestServerParams() + params.Knobs = base.TestingKnobs{ + DistSQL: &execinfra.TestingKnobs{ + RunBeforeBackfillChunk: func(_ roachpb.Span) error { + backfillNotification <- struct{}{} + <-waitBeforeContinuing + return nil + }, + }, + } + s, sqlDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + + if _, err := sqlDB.Exec(` +SET experimental_enable_primary_key_changes = true; +CREATE DATABASE t; +CREATE TABLE t.test ( + x INT PRIMARY KEY, + y INT NOT NULL, + z INT, + a INT, + b INT, + c INT, + FAMILY (x), FAMILY (y), FAMILY (z, a), FAMILY (b), FAMILY (c) +) +`); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + if _, err := sqlDB.Exec(`ALTER TABLE t.test ALTER PRIMARY KEY USING COLUMNS (y)`); err != nil { + t.Error(err) + } + wg.Done() + }() + + // Wait for the new primary index to move to the DELETE_AND_WRITE_ONLY + // state, which happens right before backfilling of the index begins. + <-backfillNotification + + scanToArray := func(rows *gosql.Rows) []string { + var found []string + for rows.Next() { + var message string + if err := rows.Scan(&message); err != nil { + t.Fatal(err) + } + found = append(found, message) + } + return found + } + + // Test that we only insert the necessary k/v's.
+ rows, err := sqlDB.Query(` + SET TRACING=on,kv,results; + INSERT INTO t.test VALUES (1, 2, 3, NULL, NULL, 6); + SET TRACING=off; + SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE + message LIKE 'InitPut /Table/53/2%' ORDER BY message;`) + if err != nil { + t.Fatal(err) + } + + expected := []string{ + "InitPut /Table/53/2/2/0 -> /TUPLE/1:1:Int/1", + // TODO (rohany): this k/v is spurious and should be removed + // when #45343 is fixed. + "InitPut /Table/53/2/2/1/1 -> /INT/2", + "InitPut /Table/53/2/2/2/1 -> /TUPLE/3:3:Int/3", + "InitPut /Table/53/2/2/4/1 -> /INT/6", + } + require.Equal(t, expected, scanToArray(rows)) + + // Test that we remove all families when deleting. + rows, err = sqlDB.Query(` + SET TRACING=on, kv, results; + DELETE FROM t.test WHERE y = 2; + SET TRACING=off; + SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE + message LIKE 'Del /Table/53/2%' ORDER BY message;`) + if err != nil { + t.Fatal(err) + } + + expected = []string{ + "Del /Table/53/2/2/0", + "Del /Table/53/2/2/1/1", + "Del /Table/53/2/2/2/1", + "Del /Table/53/2/2/3/1", + "Del /Table/53/2/2/4/1", + } + require.Equal(t, expected, scanToArray(rows)) + + // Test that we update all families when the key changes. + rows, err = sqlDB.Query(` + INSERT INTO t.test VALUES (1, 2, 3, NULL, NULL, 6); + SET TRACING=on, kv, results; + UPDATE t.test SET y = 3 WHERE y = 2; + SET TRACING=off; + SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE + message LIKE 'Put /Table/53/2%' OR + message LIKE 'Del /Table/53/2%' OR + message LIKE 'CPut /Table/53/2%';`) + if err != nil { + t.Fatal(err) + } + + expected = []string{ + "Del /Table/53/2/2/0", + "CPut /Table/53/2/3/0 -> /TUPLE/1:1:Int/1 (expecting does not exist)", + // TODO (rohany): this k/v is spurious and should be removed + // when #45343 is fixed. + "Del /Table/53/2/2/1/1", + "CPut /Table/53/2/3/1/1 -> /INT/3 (expecting does not exist)", + "Del /Table/53/2/2/2/1", + "CPut /Table/53/2/3/2/1 -> /TUPLE/3:3:Int/3 (expecting does not exist)", + "Del /Table/53/2/2/4/1", + "CPut /Table/53/2/3/4/1 -> /INT/6 (expecting does not exist)", + } + require.Equal(t, expected, scanToArray(rows)) + + // Test that we only update necessary families when the key doesn't change. + rows, err = sqlDB.Query(` + SET TRACING=on, kv, results; + UPDATE t.test SET z = NULL, b = 5, c = NULL WHERE y = 3; + SET TRACING=off; + SELECT message FROM [SHOW KV TRACE FOR SESSION] WHERE + message LIKE 'Put /Table/53/2%' OR + message LIKE 'Del /Table/53/2%' OR + message LIKE 'CPut /Table/53/2%';`) + if err != nil { + t.Fatal(err) + } + + expected = []string{ + "Del /Table/53/2/3/2/1", + "CPut /Table/53/2/3/3/1 -> /INT/5 (expecting does not exist)", + "Del /Table/53/2/3/4/1", + } + require.Equal(t, expected, scanToArray(rows)) + + waitBeforeContinuing <- struct{}{} + + wg.Wait() +} + // TestPrimaryKeyIndexRewritesGetRemoved ensures that the old versions of // indexes that are being rewritten eventually get cleaned up and removed. func TestPrimaryKeyIndexRewritesGetRemoved(t *testing.T) { diff --git a/pkg/sql/scrub_test.go b/pkg/sql/scrub_test.go index 8de2f3975a4f..e93b445c2772 100644 --- a/pkg/sql/scrub_test.go +++ b/pkg/sql/scrub_test.go @@ -64,7 +64,7 @@ INSERT INTO t."tEst" VALUES (10, 20); // Construct the secondary index key that is currently in the // database. 
secondaryIndexKey, err := sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndex, colIDtoRowIndex, values) + tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -132,7 +132,7 @@ CREATE INDEX secondary ON t.test (v); // Construct datums and secondary k/v for our row values (k, v). values := []tree.Datum{tree.NewDInt(10), tree.NewDInt(314)} secondaryIndex, err := sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndexDesc, colIDtoRowIndex, values) + tableDesc, secondaryIndexDesc, colIDtoRowIndex, values, true /* includeEmpty */) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -226,7 +226,7 @@ INSERT INTO t.test VALUES (10, 20, 1337); // Generate the existing secondary index key. values := []tree.Datum{tree.NewDInt(10), tree.NewDInt(20), tree.NewDInt(1337)} secondaryIndex, err := sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndexDesc, colIDtoRowIndex, values) + tableDesc, secondaryIndexDesc, colIDtoRowIndex, values, true /* includeEmpty */) if len(secondaryIndex) != 1 { t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndex), secondaryIndex) @@ -243,7 +243,7 @@ INSERT INTO t.test VALUES (10, 20, 1337); // Generate a secondary index k/v that has a different value. values = []tree.Datum{tree.NewDInt(10), tree.NewDInt(20), tree.NewDInt(314)} secondaryIndex, err = sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndexDesc, colIDtoRowIndex, values) + tableDesc, secondaryIndexDesc, colIDtoRowIndex, values, true /* includeEmpty */) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -444,7 +444,7 @@ func TestScrubFKConstraintFKMissing(t *testing.T) { // Construct the secondary index key entry as it exists in the // database. secondaryIndexKey, err := sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndex, colIDtoRowIndex, values) + tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) if err != nil { t.Fatalf("unexpected error: %s", err) } @@ -464,7 +464,7 @@ func TestScrubFKConstraintFKMissing(t *testing.T) { // Construct the new secondary index key that will be inserted. secondaryIndexKey, err = sqlbase.EncodeSecondaryIndex( - tableDesc, secondaryIndex, colIDtoRowIndex, values) + tableDesc, secondaryIndex, colIDtoRowIndex, values, true /* includeEmpty */) if err != nil { t.Fatalf("unexpected error: %s", err) } diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 04a8ea0000bd..3af04b1a972e 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -3306,7 +3306,7 @@ may increase either contention or retry errors, or both.`, return tree.NewDBytes(tree.DBytes(res)), err } // We have a secondary index. - res, err := sqlbase.EncodeSecondaryIndex(tableDesc, indexDesc, colMap, datums) + res, err := sqlbase.EncodeSecondaryIndex(tableDesc, indexDesc, colMap, datums, true /* includeEmpty */) if err != nil { return nil, err } diff --git a/pkg/sql/sqlbase/index_encoding.go b/pkg/sql/sqlbase/index_encoding.go index d1d2ae7cbf3b..05cbaca8d5ad 100644 --- a/pkg/sql/sqlbase/index_encoding.go +++ b/pkg/sql/sqlbase/index_encoding.go @@ -812,6 +812,8 @@ func ExtractIndexKey( type IndexEntry struct { Key roachpb.Key Value roachpb.Value + // Only used for forward indexes. 
+ Family FamilyID } // valueEncodedColumn represents a composite or stored column of a secondary @@ -874,7 +876,11 @@ func EncodeInvertedIndexTableKeys(val tree.Datum, inKey []byte) (key [][]byte, e // It is somewhat duplicated here due to the different arguments that prepareOrInsertUpdateBatch needs // and uses to generate the k/v's for the row it inserts. func EncodePrimaryIndex( - tableDesc *TableDescriptor, index *IndexDescriptor, colMap map[ColumnID]int, values []tree.Datum, + tableDesc *TableDescriptor, + index *IndexDescriptor, + colMap map[ColumnID]int, + values []tree.Datum, + includeEmpty bool, ) ([]IndexEntry, error) { keyPrefix := MakeIndexKeyPrefix(tableDesc, index.ID) indexKey, _, err := EncodeIndexKey(tableDesc, index, colMap, values, keyPrefix) @@ -889,6 +895,19 @@ func EncodePrimaryIndex( var entryValue []byte indexEntries := make([]IndexEntry, 0, len(tableDesc.Families)) var columnsToEncode []valueEncodedColumn + + isIndexed := func(col ColumnID) bool { + _, ok := indexedColumns[col] + return ok + } + isComposite := func(col ColumnID) bool { + cdatum, ok := values[colMap[col]].(tree.CompositeDatum) + if ok && cdatum.IsComposite() { + return true + } + return false + } + for i := range tableDesc.Families { var err error family := &tableDesc.Families[i] @@ -901,8 +920,10 @@ func EncodePrimaryIndex( // The decoders expect that column family 0 is encoded with a TUPLE value tag, so we // don't want to use the untagged value encoding. if len(family.ColumnIDs) == 1 && family.ColumnIDs[0] == family.DefaultColumnID && family.ID != 0 { - datum := values[colMap[family.DefaultColumnID]] - if datum != tree.DNull { + datum := findColumnValue(family.DefaultColumnID, colMap, values) + // We want to include this column if its value is non-null or + // we were requested to include all of the columns. + if datum != tree.DNull || includeEmpty { col, err := tableDesc.FindColumnByID(family.DefaultColumnID) if err != nil { return nil, err @@ -911,21 +932,19 @@ func EncodePrimaryIndex( if err != nil { return nil, err } - indexEntries = append(indexEntries, IndexEntry{Key: familyKey, Value: value}) + indexEntries = append(indexEntries, IndexEntry{Key: familyKey, Value: value, Family: family.ID}) } continue } for _, colID := range family.ColumnIDs { - if _, ok := indexedColumns[colID]; !ok { + if !isIndexed(colID) { columnsToEncode = append(columnsToEncode, valueEncodedColumn{id: colID}) continue } - if cdatum, ok := values[colMap[colID]].(tree.CompositeDatum); ok { - if cdatum.IsComposite() { - columnsToEncode = append(columnsToEncode, valueEncodedColumn{id: colID, isComposite: true}) - continue - } + if isComposite(colID) { + columnsToEncode = append(columnsToEncode, valueEncodedColumn{id: colID, isComposite: true}) + continue } } sort.Sort(byID(columnsToEncode)) @@ -933,10 +952,10 @@ func EncodePrimaryIndex( if err != nil { return nil, err } - if family.ID != 0 && len(entryValue) == 0 { + if family.ID != 0 && len(entryValue) == 0 && !includeEmpty { continue } - entry := IndexEntry{Key: familyKey} + entry := IndexEntry{Key: familyKey, Family: family.ID} entry.Value.SetTuple(entryValue) indexEntries = append(indexEntries, entry) } @@ -945,19 +964,19 @@ func EncodePrimaryIndex( // EncodeSecondaryIndex encodes key/values for a secondary // index. colMap maps ColumnIDs to indices in `values`. This returns a -// slice of IndexEntry. Forward indexes will return one value, while -// inverted indices can return multiple values. +// slice of IndexEntry. 
func EncodeSecondaryIndex( tableDesc *TableDescriptor, secondaryIndex *IndexDescriptor, colMap map[ColumnID]int, values []tree.Datum, + includeEmpty bool, ) ([]IndexEntry, error) { secondaryIndexKeyPrefix := MakeIndexKeyPrefix(tableDesc, secondaryIndex.ID) // Use the primary key encoding for covering indexes. if secondaryIndex.GetEncodingType(tableDesc.PrimaryIndex.ID) == PrimaryIndexEncoding { - return EncodePrimaryIndex(tableDesc, secondaryIndex, colMap, values) + return EncodePrimaryIndex(tableDesc, secondaryIndex, colMap, values, includeEmpty) } var containsNull = false @@ -1032,7 +1051,8 @@ func EncodeSecondaryIndex( } } } - entries, err = encodeSecondaryIndexWithFamilies(familyToColumns, secondaryIndex, colMap, key, values, extraKey, entries) + entries, err = encodeSecondaryIndexWithFamilies( + familyToColumns, secondaryIndex, colMap, key, values, extraKey, entries, includeEmpty) if err != nil { return []IndexEntry{}, err } @@ -1051,6 +1071,7 @@ func encodeSecondaryIndexWithFamilies( row []tree.Datum, extraKeyCols []byte, results []IndexEntry, + includeEmpty bool, ) ([]IndexEntry, error) { var ( value []byte @@ -1098,7 +1119,12 @@ func encodeSecondaryIndexWithFamilies( if err != nil { return []IndexEntry{}, err } - entry := IndexEntry{Key: key} + entry := IndexEntry{Key: key, Family: FamilyID(familyID)} + // If we aren't looking at family 0 and don't have a value, + // don't include an entry for this k/v. + if familyID != 0 && len(value) == 0 && !includeEmpty { + continue + } // If we are looking at family 0, encode the data as BYTES, as it might // include encoded primary key columns. For other families, use the // tuple encoding for the value. @@ -1153,7 +1179,7 @@ func encodeSecondaryIndexNoFamilies( if err != nil { return IndexEntry{}, err } - entry := IndexEntry{Key: key} + entry := IndexEntry{Key: key, Family: 0} entry.Value.SetBytes(value) return entry, nil } @@ -1193,12 +1219,13 @@ func EncodeSecondaryIndexes( colMap map[ColumnID]int, values []tree.Datum, secondaryIndexEntries []IndexEntry, + includeEmpty bool, ) ([]IndexEntry, error) { if len(secondaryIndexEntries) != len(indexes) { panic("Length of secondaryIndexEntries is not equal to the number of indexes.") } for i := range indexes { - entries, err := EncodeSecondaryIndex(tableDesc, &indexes[i], colMap, values) + entries, err := EncodeSecondaryIndex(tableDesc, &indexes[i], colMap, values, includeEmpty) if err != nil { return secondaryIndexEntries, err } diff --git a/pkg/sql/sqlbase/table_test.go b/pkg/sql/sqlbase/table_test.go index 8c7a8bf76ec5..9ebc61aaebe5 100644 --- a/pkg/sql/sqlbase/table_test.go +++ b/pkg/sql/sqlbase/table_test.go @@ -224,7 +224,7 @@ func TestIndexKey(t *testing.T) { primaryIndexKV := client.KeyValue{Key: primaryKey, Value: &primaryValue} secondaryIndexEntry, err := EncodeSecondaryIndex( - &tableDesc, &tableDesc.Indexes[0], colMap, testValues) + &tableDesc, &tableDesc.Indexes[0], colMap, testValues, true /* includeEmpty */) if len(secondaryIndexEntry) != 1 { t.Fatalf("expected 1 index entry, got %d. got %#v", len(secondaryIndexEntry), secondaryIndexEntry) }
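Note: the heart of the updater change above is a merge over two lists of index entries, each sorted by column family ID in the order EncodeSecondaryIndex emits them. Below is a minimal, self-contained Go sketch of that reconciliation under simplifying assumptions: IndexEntry and FamilyID are pared-down stand-ins for the sqlbase types, string keys and values replace roachpb.Key/roachpb.Value, and printed operations stand in for the batch.Del / batch.CPut calls issued by Updater.UpdateRow. It illustrates the technique; it is not the exact implementation.

package main

import "fmt"

// FamilyID and IndexEntry are pared-down stand-ins for the sqlbase types in
// the patch; real entries carry roachpb.Key and roachpb.Value.
type FamilyID int

type IndexEntry struct {
	Key    string
	Value  string
	Family FamilyID
}

// mergeIndexEntries reconciles the old and new index entries for one row of a
// forward index. Both slices are assumed sorted by Family, the order in which
// EncodeSecondaryIndex emits them. Printed operations stand in for the
// batch.Del / batch.CPut calls issued by the updater.
func mergeIndexEntries(oldEntries, newEntries []IndexEntry) {
	oldIdx, newIdx := 0, 0
	for oldIdx < len(oldEntries) && newIdx < len(newEntries) {
		oldEntry, newEntry := oldEntries[oldIdx], newEntries[newIdx]
		switch {
		case oldEntry.Family == newEntry.Family:
			// Same family on both sides: delete-and-rewrite if the key
			// changed, conditionally overwrite if only the value changed,
			// and do nothing if the k/v is unchanged.
			oldIdx, newIdx = oldIdx+1, newIdx+1
			if oldEntry.Key != newEntry.Key {
				fmt.Printf("Del %s\n", oldEntry.Key)
				fmt.Printf("CPut %s -> %s (expecting does not exist)\n", newEntry.Key, newEntry.Value)
			} else if oldEntry.Value != newEntry.Value {
				fmt.Printf("CPut %s -> %s (replacing %s, if exists)\n", newEntry.Key, newEntry.Value, oldEntry.Value)
			}
		case oldEntry.Family < newEntry.Family:
			// This family's stored columns all became NULL: its old k/v has
			// no new counterpart, so delete it.
			fmt.Printf("Del %s\n", oldEntry.Key)
			oldIdx++
		default:
			// This family was all-NULL before and now has a value: write a
			// fresh k/v that is expected not to exist.
			fmt.Printf("CPut %s -> %s (expecting does not exist)\n", newEntry.Key, newEntry.Value)
			newIdx++
		}
	}
	// Trailing old entries have no new counterparts: delete them.
	for ; oldIdx < len(oldEntries); oldIdx++ {
		fmt.Printf("Del %s\n", oldEntries[oldIdx].Key)
	}
	// Trailing new entries have no old counterparts: write them.
	for ; newIdx < len(newEntries); newIdx++ {
		fmt.Printf("CPut %s -> %s (expecting does not exist)\n", newEntries[newIdx].Key, newEntries[newIdx].Value)
	}
}

func main() {
	// A row update where family 3's stored column went to NULL (old-only
	// entry) and family 4 gained a non-NULL value (new-only entry).
	oldEntries := []IndexEntry{
		{Key: "/t/i/2/1/0", Value: "/BYTES/", Family: 0},
		{Key: "/t/i/2/1/3/1", Value: "/6", Family: 3},
	}
	newEntries := []IndexEntry{
		{Key: "/t/i/2/1/0", Value: "/BYTES/", Family: 0},
		{Key: "/t/i/2/1/4/1", Value: "/7", Family: 4},
	}
	mergeIndexEntries(oldEntries, newEntries)
}

Because family 0 is always encoded for a forward index even when its value is empty, both input slices are guaranteed to contain a family-0 entry; the real code relies on that guarantee to attach foreign key checks exactly once per index and to assert that a family-0 entry was generated. The sketch omits those assertions for brevity.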