Skip to content

Commit

Permalink
opt: add first half of upsert FK checks
Browse files Browse the repository at this point in the history
An Upsert can be decomposed into an insert half and an update half, so
we must emit the checks for both components. This commit introduces
checks for the insert half.

Release note (sql change): foreign key checks for insertions performed
by upserts are now handled by the optimizer.
  • Loading branch information
Justin Jaffray committed Feb 3, 2020
1 parent 8737691 commit 96447c8
Show file tree
Hide file tree
Showing 12 changed files with 1,370 additions and 361 deletions.
63 changes: 63 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/fk_opt
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,69 @@ DROP TABLE child2
statement ok
DROP TABLE parent2

# Upsert
# ------

statement ok
CREATE TABLE parent (p INT PRIMARY KEY, other INT)

statement ok
CREATE TABLE child (c INT PRIMARY KEY, p INT NOT NULL REFERENCES parent(p))

statement ok
INSERT INTO parent VALUES (1), (2)

# Insert case.
statement ok
INSERT INTO child VALUES (1, 1) ON CONFLICT (c) DO UPDATE SET p = 2

statement error foreign key
INSERT INTO child VALUES (2, 10) ON CONFLICT (c) DO UPDATE SET p = 2

# Update case.
statement ok
INSERT INTO child VALUES (1, 1) ON CONFLICT (c) DO UPDATE SET p = 1

statement ok
INSERT INTO child VALUES (1, 10) ON CONFLICT (c) DO UPDATE SET p = 1

statement error foreign key
INSERT INTO child VALUES (1, 10) ON CONFLICT (c) DO UPDATE SET p = 10

statement ok
TRUNCATE child

statement ok
INSERT INTO child VALUES (1, 1)

# Both insert and update case.

# Both insert and update are invalid.

statement error foreign key
INSERT INTO child VALUES (1, 1), (2, 3) ON CONFLICT (c) DO UPDATE SET p = 3

# Insert is invalid, update is valid.

statement error foreign key
INSERT INTO child VALUES (1, 2), (2, 3) ON CONFLICT (c) DO UPDATE SET p = 1

# Insert is valid, update is invalid.

statement error foreign key
INSERT INTO child VALUES (1, 2), (2, 1) ON CONFLICT (c) DO UPDATE SET p = 3

# Both insert and update are valid.

statement ok
INSERT INTO child VALUES (1, 2), (2, 1) ON CONFLICT (c) DO UPDATE SET p = 2

statement ok
DROP TABLE child

statement ok
DROP TABLE parent

# --- Tests that follow are copied from the fk tests and adjusted as needed.

statement ok
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opt/bench/stub_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ func (f *stubFactory) ConstructUpsert(
returnCols exec.ColumnOrdinalSet,
checks exec.CheckOrdinalSet,
allowAutoCommit bool,
skipFKChecks bool,
) (exec.Node, error) {
return struct{}{}, nil
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/exec/execbuilder/mutation.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
updateColOrds := ordinalSetFromColList(ups.UpdateCols)
returnColOrds := ordinalSetFromColList(ups.ReturnCols)
checkOrds := ordinalSetFromColList(ups.CheckCols)
disableExecFKs := !ups.FKFallback
node, err := b.factory.ConstructUpsert(
input.root,
tab,
Expand All @@ -375,11 +376,16 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) {
returnColOrds,
checkOrds,
b.allowAutoCommit && len(ups.Checks) == 0,
disableExecFKs,
)
if err != nil {
return execPlan{}, err
}

if err := b.buildFKChecks(ups.Checks); err != nil {
return execPlan{}, err
}

// If UPSERT returns rows, they contain all non-mutation columns from the
// table, in the same order they're defined in the table. Each output column
// value is taken from an insert, fetch, or update column, depending on the
Expand Down
10 changes: 8 additions & 2 deletions pkg/sql/opt/exec/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,11 @@ type Factory interface {
// transaction (if appropriate, i.e. if it is in an implicit transaction).
// This is false if there are multiple mutations in a statement, or the output
// of the mutation is processed through side-effecting expressions.
//
// If skipFKChecks is set, foreign keys are not checked as part of the
// execution of the upsert for the insert half. This is used when the FK
// checks are planned by the optimizer and are run separately as plan
// postqueries.
ConstructUpsert(
input Node,
table cat.Table,
Expand All @@ -431,6 +436,7 @@ type Factory interface {
returnCols ColumnOrdinalSet,
checks CheckOrdinalSet,
allowAutoCommit bool,
skipFKChecks bool,
) (Node, error)

// ConstructDelete creates a node that implements a DELETE statement. The
Expand All @@ -447,8 +453,8 @@ type Factory interface {
// of the mutation is processed through side-effecting expressions.
//
// If skipFKChecks is set, foreign keys are not checked as part of the
// execution of the insertion. This is used when the FK checks are planned by
// the optimizer and are run separately as plan postqueries.
// execution of the delete. This is used when the FK checks are planned
// by the optimizer and are run separately as plan postqueries.
ConstructDelete(
input Node,
table cat.Table,
Expand Down
102 changes: 77 additions & 25 deletions pkg/sql/opt/optbuilder/mutation_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (mb *mutationBuilder) buildFKChecksForInsert() {
}

for i, n := 0, mb.tab.OutboundForeignKeyCount(); i < n; i++ {
mb.addInsertionCheck(i, insertCols)
mb.addInsertionCheck(i, mb.insertOrds, checkAllRows)
}
}

Expand Down Expand Up @@ -1019,7 +1019,7 @@ func (mb *mutationBuilder) buildFKChecksForUpdate() {
continue
}

mb.addInsertionCheck(i, newRowColIDs)
mb.addInsertionCheck(i, newRowCols, checkAllRows)
}

// The "deletion" incurred by an update is the rows deleted for a given
Expand All @@ -1046,8 +1046,8 @@ func (mb *mutationBuilder) buildFKChecksForUpdate() {
continue
}

oldRows, colsForOldRow := mb.projectOrdinals(oldRowFKColOrdinals)
newRows, colsForNewRow := mb.projectOrdinals(newRowFKColOrdinals)
oldRows, colsForOldRow, _ := mb.projectInput(oldRowFKColOrdinals, false /* includeCanary */)
newRows, colsForNewRow, _ := mb.projectInput(newRowFKColOrdinals, false /* includeCanary */)

// The rows that no longer exist are the ones that were "deleted" by virtue
// of being updated _from_, minus the ones that were "added" by virtue of
Expand Down Expand Up @@ -1099,15 +1099,58 @@ func (mb *mutationBuilder) buildFKChecksForUpsert() {
mb.fkFallback = true
return
}
// TODO(justin): not implemented yet.
mb.fkFallback = true

mb.withID = mb.b.factory.Memo().NextWithID()

// Add the check for the insertion portion of the the upsert.
for i, n := 0, mb.tab.OutboundForeignKeyCount(); i < n; i++ {
mb.addInsertionCheck(i, mb.insertOrds, checkInsertedRows)
}

// TODO(justin): include checks for the set of updated rows.
}

// rowInclusion determines the mode in which an insertion check should be
// constructed.
type rowInclusion int

const (
checkAllRows rowInclusion = iota
checkInsertedRows
)

// addInsertionCheck adds a FK check for rows which are added to a table.
// The input to the insertion check will be the input to the mutation operator.
// insertCols is a list of the columns for the rows being inserted, indexed by
// insertOrds is a list of the columns for the rows being inserted, indexed by
// their ordinal position in the table.
func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColList) {
// rows is either checkAllRows or checkInsertedRows, and denotes whether we
// should add a filter for the canary column. It should only be
// checkInsertedRows if we are performing an UPSERT.
func (mb *mutationBuilder) addInsertionCheck(
fkOrdinal int, insertOrds []scopeOrdinal, rows rowInclusion,
) {
inputExpr, insertCols, canaryID := mb.projectInput(insertOrds, rows == checkInsertedRows)

// If we were an upsert, then we have to filter out the rows that were the ones actually inserted.
if rows == checkInsertedRows {
// We need to add a project on top of this so that the we remove the canary column.
inputExpr = mb.b.factory.ConstructProject(
mb.b.factory.ConstructSelect(
inputExpr,
memo.FiltersExpr{
mb.b.factory.ConstructFiltersItem(
mb.b.factory.ConstructIs(
mb.b.factory.ConstructVariable(canaryID),
memo.NullSingleton,
),
),
},
),
memo.ProjectionsExpr{},
insertCols.ToSet(),
)
}

fk := mb.tab.OutboundForeignKey(fkOrdinal)
numCols := fk.ColumnCount()
var notNullInputCols opt.ColSet
Expand All @@ -1125,7 +1168,7 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
// If a table column is not nullable, NULLs cannot be inserted (the
// mutation will fail). So for the purposes of FK checks, we can treat
// these columns as not null.
if mb.outScope.expr.Relational().NotNullCols.Contains(inputColID) || !mb.tab.Column(ord).IsNullable() {
if inputExpr.Relational().NotNullCols.Contains(inputColID) || !mb.tab.Column(ord).IsNullable() {
notNullInputCols.Add(inputColID)
}
}
Expand Down Expand Up @@ -1167,8 +1210,9 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
mb.b.allocScope(),
)

left, withScanCols := mb.makeFKInputScan(insertedFKCols)
private.KeyCols = withScanCols
inputExpr = mb.b.factory.ConstructProject(inputExpr, memo.ProjectionsExpr{}, insertedFKCols.ToSet())

private.KeyCols = insertedFKCols
if notNullInputCols.Len() < numCols {
// The columns we are inserting might have NULLs. These require special
// handling, depending on the match method:
Expand Down Expand Up @@ -1196,13 +1240,13 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
if !notNullInputCols.Contains(insertedFKCols[i]) {
filters = append(filters, mb.b.factory.ConstructFiltersItem(
mb.b.factory.ConstructIsNot(
mb.b.factory.ConstructVariable(withScanCols[i]),
mb.b.factory.ConstructVariable(insertedFKCols[i]),
memo.NullSingleton,
),
))
}
}
left = mb.b.factory.ConstructSelect(left, filters)
inputExpr = mb.b.factory.ConstructSelect(inputExpr, filters)

case tree.MatchFull:
// Filter out any rows which have NULLs on all referencing columns.
Expand All @@ -1215,7 +1259,7 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
// Build a filter of the form
// (a IS NOT NULL) OR (b IS NOT NULL) ...
var condition opt.ScalarExpr
for _, col := range withScanCols {
for _, col := range insertedFKCols {
is := mb.b.factory.ConstructIsNot(
mb.b.factory.ConstructVariable(col),
memo.NullSingleton,
Expand All @@ -1226,8 +1270,8 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
condition = mb.b.factory.ConstructOr(condition, is)
}
}
left = mb.b.factory.ConstructSelect(
left,
inputExpr = mb.b.factory.ConstructSelect(
inputExpr,
memo.FiltersExpr{mb.b.factory.ConstructFiltersItem(condition)},
)

Expand All @@ -1242,15 +1286,15 @@ func (mb *mutationBuilder) addInsertionCheck(fkOrdinal int, insertCols opt.ColLi
for j := 0; j < numCols; j++ {
antiJoinFilters[j] = mb.b.factory.ConstructFiltersItem(
mb.b.factory.ConstructEq(
mb.b.factory.ConstructVariable(withScanCols[j]),
mb.b.factory.ConstructVariable(insertedFKCols[j]),
mb.b.factory.ConstructVariable(scanScope.cols[j].id),
),
)
}

mb.checks = append(mb.checks, mb.b.factory.ConstructFKChecksItem(
mb.b.factory.ConstructAntiJoin(
left, scanScope.expr, antiJoinFilters, &memo.JoinPrivate{},
inputExpr, scanScope.expr, antiJoinFilters, &memo.JoinPrivate{},
),
&private,
))
Expand Down Expand Up @@ -1338,28 +1382,36 @@ func (mb *mutationBuilder) addDeletionCheck(
return true
}

// projectOrdinals returns a WithScan that returns each row in the input to the
// projectInput returns a WithScan that returns each row in the input to the
// mutation operator, projecting only the column ordinals provided, mapping them
// to new column ids. Returns the new expression along with the column ids
// they've been mapped to.
func (mb *mutationBuilder) projectOrdinals(
ords []scopeOrdinal,
) (_ memo.RelExpr, outCols opt.ColList) {
outCols = make(opt.ColList, len(ords))
inCols := make(opt.ColList, len(ords))
// If includeCanary is set, the resulting expression will also render the canary
// column and its id will be returned. Should only be set if this is an UPSERT.
func (mb *mutationBuilder) projectInput(
ords []scopeOrdinal, includeCanary bool,
) (_ memo.RelExpr, outCols opt.ColList, canaryOutID opt.ColumnID) {
outCols = make(opt.ColList, len(ords), len(ords)+1)
inCols := make(opt.ColList, len(ords), len(ords)+1)
for i := 0; i < len(ords); i++ {
c := mb.b.factory.Metadata().ColumnMeta(mb.outScope.cols[ords[i]].id)
outCols[i] = mb.md.AddColumn(c.Alias, c.Type)
inCols[i] = mb.outScope.cols[ords[i]].id
}
if includeCanary {
c := mb.md.ColumnMeta(mb.canaryColID)
canaryOutID = mb.md.AddColumn(c.Alias, c.Type)
outCols = append(outCols, canaryOutID)
inCols = append(inCols, mb.canaryColID)
}
out := mb.b.factory.ConstructWithScan(&memo.WithScanPrivate{
With: mb.withID,
InCols: inCols,
OutCols: outCols,
BindingProps: mb.outScope.expr.Relational(),
ID: mb.b.factory.Metadata().NextUniqueID(),
})
return out, outCols
return out, outCols, canaryOutID
}

// makeFKInputScan constructs a WithScan that iterates over the input to the
Expand Down
Loading

0 comments on commit 96447c8

Please sign in to comment.