Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
86186: sql: do not distribute queries with subqueries returning OIDs r=yuzefovich a=yuzefovich

If a subquery results in a DOid datum, the datum will get a type
annotation (because DOids are ambiguous) when serializing the
render expression involving the result of the subquery. As a
result, we might need to perform a cast on a remote node which
might fail, thus we prohibit the distribution of the main query.

Fixes: #86075.

Release justification: bug fix.

Release note: None

86357: colmem: improve memory accounting when memory limit is exceeded r=yuzefovich a=yuzefovich

**colmem: improve memory accounting when memory limit is exceeded**

This commit improves the memory accounting when memory limit is
exceeded. Previously, in several operators we could run into a situation
where we perform some allocations and run into a memory limit error
later, which results in those allocations being unaccounted for. In some
cases this is acceptable (when the query results in an error), but in
others the memory error is caught and spilling to disk occurs. In the
latter scenarios we would under-account, and this commit fixes most of
such situations.

Now, each disk-spilling operator instantiates a "limited" allocator that
will grow an unlimited memory account when a memory error is
encountered. The idea is that even though the denied allocations cannot
be registered with the main memory account (meaning the operator has
exceeded its memory limit), we still will draw from the
`--max-sql-memory` pool since the allocations can be live for
non-trivial amount of time. If an error occurs when growing the
unlimited memory account, then that error is returned (not the original
memory error) so that the disk spiller doesn't catch it.

This commit audits all operators in `execplan` to use the limited
allocator where appropriate. The new accounting method is only used in
a handful of places which cover most of the use cases. The goal is to
make this commit backportable whereas the follow-up commit will audit
usages of `AdjustMemoryUsage` and will not be backported.

Addresses: #64906.
Fixes: #86351.
Addresses: https://github.com/cockroachlabs/support/issues/1762.

Release justification: bug fix.

Release note: None

**colmem: audit callers of AdjustMemoryUsage**

This commit audits all callers of `Allocator.AdjustMemoryUsage` to use
the newly-exported `AdjustMemoryUsageAfterAllocation` where applicable
(meaning that if an allocation occurs before the method is called, then
the new method is now used). In many cases this won't result in a change
in the behavior since the allocators are not instantiated with limited
memory accounts, but in some cases it is still useful.

Release justification: bug fix.

Release note: None

86402: externalconn,amazon: support s3 KMS in External Connecetions r=benbardin a=adityamaru

Informs: #84228

Release note (sql change): Users can now
`CREATE EXTERNAL CONNECTION` to represent an `aws-kms`
scheme that represents an AWS KMS resource.

Release justification: low risk change to new functionality to register s3 KMS as a supported External Connection

86613: streamproducer: check the job type for replication stream r=yuzefovich a=yuzefovich

Previously, we would panic if the job id corresponded to a job type
different from the replication stream job, and this is now fixed.

Fixes: #86508.

Release justification: bug fix.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
  • Loading branch information
3 people committed Aug 23, 2022
5 parents 869d44c + 0643813 + 256223f + c6d945b + 89d6c55 commit cf533b0
Show file tree
Hide file tree
Showing 63 changed files with 830 additions and 238 deletions.
245 changes: 245 additions & 0 deletions pkg/ccl/cloudccl/amazon/s3_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"net/url"
"os"
"strings"
"testing"

"github.com/aws/aws-sdk-go/aws/credentials"
Expand Down Expand Up @@ -182,3 +183,247 @@ func TestS3ExternalConnection(t *testing.T) {
invalidS3URI))
})
}

func TestAWSKMSExternalConnection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir

tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(context.Background())

tc.WaitForNodeLiveness(t)
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Setup some dummy data.
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `USE foo`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)

createExternalConnection := func(externalConnectionName, uri string) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
backupAndRestoreFromExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) {
backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName)
kmsURI := fmt.Sprintf("external://%s", kmsExternalConnectionName)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s' WITH kms='%s'`, backupURI, kmsURI))
sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar, kms='%s'`, backupURI, kmsURI))
sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{})
sqlDB.Exec(t, `DROP DATABASE bar CASCADE`)
}

// If environment credentials are not present, we want to
// skip all AWS KMS tests, including auth-implicit, even though
// it is not used in auth-implicit.
_, err := credentials.NewEnvCredentials().Get()
if err != nil {
skip.IgnoreLint(t, "Test only works with AWS credentials")
}

q := make(url.Values)
expect := map[string]string{
"AWS_ACCESS_KEY_ID": amazon.AWSAccessKeyParam,
"AWS_SECRET_ACCESS_KEY": amazon.AWSSecretParam,
}
for env, param := range expect {
v := os.Getenv(env)
if v == "" {
skip.IgnoreLintf(t, "%s env var must be set", env)
}
q.Add(param, v)
}
// Get AWS KMS region from env variable.
kmsRegion := os.Getenv("AWS_KMS_REGION")
if kmsRegion == "" {
skip.IgnoreLint(t, "AWS_KMS_REGION env var must be set")
}
q.Add(amazon.KMSRegionParam, kmsRegion)

// Get AWS Key identifier from env variable.
keyID := os.Getenv("AWS_KMS_KEY_ARN")
if keyID == "" {
skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set")
}

bucket := os.Getenv("AWS_S3_BUCKET")
if bucket == "" {
skip.IgnoreLint(t, "AWS_S3_BUCKET env var must be set")
}

// Create an external connection where we will write the backup.
backupURI := fmt.Sprintf("s3://%s/backup?%s=%s", bucket,
cloud.AuthParam, cloud.AuthParamImplicit)
backupExternalConnectionName := "backup"
createExternalConnection(backupExternalConnectionName, backupURI)

t.Run("auth-implicit", func(t *testing.T) {
// Set the AUTH to implicit.
params := make(url.Values)
params.Add(cloud.AuthParam, cloud.AuthParamImplicit)
params.Add(amazon.KMSRegionParam, kmsRegion)

kmsURI := fmt.Sprintf("aws-kms:///%s?%s", keyID, params.Encode())
createExternalConnection("auth-implicit-kms", kmsURI)
backupAndRestoreFromExternalConnection(backupExternalConnectionName,
"auth-implicit-kms")
})

t.Run("auth-specified", func(t *testing.T) {
kmsURI := fmt.Sprintf("aws-kms:///%s?%s", keyID, q.Encode())
createExternalConnection("auth-specified-kms", kmsURI)
backupAndRestoreFromExternalConnection(backupExternalConnectionName,
"auth-specified-kms")
})

t.Run("kms-uses-incorrect-external-connection-type", func(t *testing.T) {
// Point the KMS to the External Connection object that represents an
// ExternalStorage. This should be disallowed.
backupExternalConnectionURI := fmt.Sprintf("external://%s", backupExternalConnectionName)
sqlDB.ExpectErr(t,
"KMS cannot use object of type STORAGE",
fmt.Sprintf(`BACKUP DATABASE foo INTO '%s' WITH kms='%s'`,
backupExternalConnectionURI, backupExternalConnectionURI))
})
}

func TestAWSKMSExternalConnectionAssumeRole(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

dir, dirCleanupFn := testutils.TempDir(t)
defer dirCleanupFn()

params := base.TestClusterArgs{}
params.ServerArgs.ExternalIODir = dir

tc := testcluster.StartTestCluster(t, 1, params)
defer tc.Stopper().Stop(context.Background())

tc.WaitForNodeLiveness(t)
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Setup some dummy data.
sqlDB.Exec(t, `CREATE DATABASE foo`)
sqlDB.Exec(t, `USE foo`)
sqlDB.Exec(t, `CREATE TABLE foo (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)

createExternalConnection := func(externalConnectionName, uri string) {
sqlDB.Exec(t, fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
disallowedCreateExternalConnection := func(externalConnectionName, uri string) {
sqlDB.ExpectErr(t, "(PermissionDenied|AccessDenied|PERMISSION_DENIED)",
fmt.Sprintf(`CREATE EXTERNAL CONNECTION '%s' AS '%s'`, externalConnectionName, uri))
}
backupAndRestoreFromExternalConnection := func(backupExternalConnectionName, kmsExternalConnectionName string) {
backupURI := fmt.Sprintf("external://%s", backupExternalConnectionName)
kmsURI := fmt.Sprintf("external://%s", kmsExternalConnectionName)
sqlDB.Exec(t, fmt.Sprintf(`BACKUP DATABASE foo INTO '%s' WITH kms='%s'`, backupURI, kmsURI))
sqlDB.Exec(t, fmt.Sprintf(`RESTORE DATABASE foo FROM LATEST IN '%s' WITH new_db_name = bar, kms='%s'`, backupURI, kmsURI))
sqlDB.CheckQueryResults(t, `SELECT * FROM bar.foo`, [][]string{{"1"}, {"2"}, {"3"}})
sqlDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{})
sqlDB.Exec(t, `DROP DATABASE bar CASCADE`)
}

// If environment credentials are not present, we want to
// skip all AWS KMS tests, including auth-implicit, even though
// it is not used in auth-implicit.
_, err := credentials.NewEnvCredentials().Get()
if err != nil {
skip.IgnoreLint(t, "Test only works with AWS credentials")
}

q := make(url.Values)
expect := map[string]string{
"AWS_ACCESS_KEY_ID": amazon.AWSAccessKeyParam,
"AWS_SECRET_ACCESS_KEY": amazon.AWSSecretParam,
"AWS_ASSUME_ROLE": amazon.AssumeRoleParam,
}
for env, param := range expect {
v := os.Getenv(env)
if v == "" {
skip.IgnoreLintf(t, "%s env var must be set", env)
}
q.Add(param, v)
}
// Get AWS KMS region from env variable.
kmsRegion := os.Getenv("AWS_KMS_REGION")
if kmsRegion == "" {
skip.IgnoreLint(t, "AWS_KMS_REGION env var must be set")
}
q.Add(amazon.KMSRegionParam, kmsRegion)
q.Set(cloud.AuthParam, cloud.AuthParamSpecified)

// Get AWS Key identifier from env variable.
keyID := os.Getenv("AWS_KMS_KEY_ARN")
if keyID == "" {
skip.IgnoreLint(t, "AWS_KMS_KEY_ARN env var must be set")
}

bucket := os.Getenv("AWS_S3_BUCKET")
if bucket == "" {
skip.IgnoreLint(t, "AWS_S3_BUCKET env var must be set")
}

// Create an external connection where we will write the backup.
backupURI := fmt.Sprintf("s3://%s/backup?%s=%s", bucket,
cloud.AuthParam, cloud.AuthParamImplicit)
backupExternalConnectionName := "backup"
createExternalConnection(backupExternalConnectionName, backupURI)

t.Run("auth-assume-role-implicit", func(t *testing.T) {
// You can create an IAM that can access AWS KMS
// in the AWS console, then set it up locally.
// https://docs.aws.com/cli/latest/userguide/cli-configure-role.html
// We only run this test if default role exists.
credentialsProvider := credentials.SharedCredentialsProvider{}
_, err := credentialsProvider.Retrieve()
if err != nil {
skip.IgnoreLint(t, err)
}

// Create params for implicit user.
params := make(url.Values)
params.Add(cloud.AuthParam, cloud.AuthParamImplicit)
params.Add(amazon.AssumeRoleParam, q.Get(amazon.AssumeRoleParam))
params.Add(amazon.KMSRegionParam, kmsRegion)

uri := fmt.Sprintf("aws-kms:///%s?%s", keyID, params.Encode())
createExternalConnection("auth-assume-role-implicit", uri)
backupAndRestoreFromExternalConnection(backupExternalConnectionName, "auth-assume-role-implicit")
})

t.Run("auth-assume-role-specified", func(t *testing.T) {
uri := fmt.Sprintf("aws-kms:///%s?%s", keyID, q.Encode())
createExternalConnection("auth-assume-role-specified", uri)
backupAndRestoreFromExternalConnection(backupExternalConnectionName, "auth-assume-role-specified")
})

t.Run("auth-assume-role-chaining", func(t *testing.T) {
roleChainStr := os.Getenv("AWS_ROLE_ARN_CHAIN")
roleChain := strings.Split(roleChainStr, ",")

// First verify that none of the individual roles in the chain can be used
// to access the KMS.
for i, role := range roleChain {
i := i
q.Set(amazon.AssumeRoleParam, role)
disallowedKMSURI := fmt.Sprintf("aws-kms:///%s?%s", keyID, q.Encode())
disallowedECName := fmt.Sprintf("auth-assume-role-chaining-disallowed-%d", i)
disallowedCreateExternalConnection(disallowedECName, disallowedKMSURI)
}

// Finally, check that the chain of roles can be used to access the KMS.
q.Set(amazon.AssumeRoleParam, roleChainStr)
uri := fmt.Sprintf("aws-kms:///%s?%s", keyID, q.Encode())
createExternalConnection("auth-assume-role-chaining", uri)
backupAndRestoreFromExternalConnection(backupExternalConnectionName, "auth-assume-role-chaining")
})
}
3 changes: 3 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/crdb_internal
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,6 @@ SELECT crdb_internal.stream_ingestion_stats_json(unique_rowid());

query error pq: crdb_internal\.stream_ingestion_stats_json\(\): job.*is not a stream ingestion job
SELECT crdb_internal.stream_ingestion_stats_json(id) FROM (SELECT id FROM system.jobs LIMIT 1);

query error pq: crdb_internal\.replication_stream_spec\(\): job.*is not a replication stream job
SELECT crdb_internal.replication_stream_spec(crdb_internal.create_sql_schema_telemetry_job())
6 changes: 5 additions & 1 deletion pkg/ccl/streamingccl/streamproducer/stream_lifetime.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ func getReplicationStreamSpec(
planCtx := dsp.NewPlanningCtx(evalCtx.Ctx(), jobExecCtx.ExtendedEvalContext(),
nil /* planner */, noTxn, sql.DistributionTypeSystemTenantOnly)

replicatedSpans := j.Details().(jobspb.StreamReplicationDetails).Spans
details, ok := j.Details().(jobspb.StreamReplicationDetails)
if !ok {
return nil, errors.Errorf("job with id %d is not a replication stream job", streamID)
}
replicatedSpans := details.Spans
spans := make([]roachpb.Span, 0, len(replicatedSpans))
for _, span := range replicatedSpans {
spans = append(spans, *span)
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/amazon/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
name = "amazon",
srcs = [
"aws_kms.go",
"aws_kms_connection.go",
"s3_connection.go",
"s3_storage.go",
],
Expand Down
37 changes: 25 additions & 12 deletions pkg/cloud/amazon/aws_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/cockroachdb/errors"
)

const awsScheme = "aws"
const (
awsScheme = "aws"
awsKMSScheme = "aws-kms"
)

type awsKMS struct {
kms *kms.KMS
Expand All @@ -34,7 +37,7 @@ type awsKMS struct {
var _ cloud.KMS = &awsKMS{}

func init() {
cloud.RegisterKMSFromURIFactory(MakeAWSKMS, awsScheme)
cloud.RegisterKMSFromURIFactory(MakeAWSKMS, awsScheme, awsKMSScheme)
}

type kmsURIParams struct {
Expand All @@ -48,19 +51,25 @@ type kmsURIParams struct {
delegateRoleARNs []string
}

func resolveKMSURIParams(kmsURI url.URL) kmsURIParams {
assumeRole, delegateRoles := cloud.ParseRoleString(kmsURI.Query().Get(AssumeRoleParam))
func resolveKMSURIParams(kmsURI cloud.ConsumeURL) (kmsURIParams, error) {
assumeRole, delegateRoles := cloud.ParseRoleString(kmsURI.ConsumeParam(AssumeRoleParam))
params := kmsURIParams{
accessKey: kmsURI.Query().Get(AWSAccessKeyParam),
secret: kmsURI.Query().Get(AWSSecretParam),
tempToken: kmsURI.Query().Get(AWSTempTokenParam),
endpoint: kmsURI.Query().Get(AWSEndpointParam),
region: kmsURI.Query().Get(KMSRegionParam),
auth: kmsURI.Query().Get(cloud.AuthParam),
accessKey: kmsURI.ConsumeParam(AWSAccessKeyParam),
secret: kmsURI.ConsumeParam(AWSSecretParam),
tempToken: kmsURI.ConsumeParam(AWSTempTokenParam),
endpoint: kmsURI.ConsumeParam(AWSEndpointParam),
region: kmsURI.ConsumeParam(KMSRegionParam),
auth: kmsURI.ConsumeParam(cloud.AuthParam),
roleARN: assumeRole,
delegateRoleARNs: delegateRoles,
}

// Validate that all the passed in parameters are supported.
if unknownParams := kmsURI.RemainingQueryParams(); len(unknownParams) > 0 {
return kmsURIParams{}, errors.Errorf(
`unknown KMS query parameters: %s`, strings.Join(unknownParams, ", "))
}

// AWS secrets often contain + characters, which must be escaped when
// included in a query string; otherwise, they represent a space character.
// More than a few users have been bitten by this.
Expand All @@ -69,7 +78,7 @@ func resolveKMSURIParams(kmsURI url.URL) kmsURIParams {
// contain spaces. We can convert any space characters we see to +
// characters to recover the original secret.
params.secret = strings.Replace(params.secret, " ", "+", -1)
return params
return params, nil
}

// MakeAWSKMS is the factory method which returns a configured, ready-to-use
Expand All @@ -84,7 +93,11 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
}

// Extract the URI parameters required to setup the AWS KMS session.
kmsURIParams := resolveKMSURIParams(*kmsURI)
kmsConsumeURL := cloud.ConsumeURL{URL: kmsURI}
kmsURIParams, err := resolveKMSURIParams(kmsConsumeURL)
if err != nil {
return nil, err
}
region := kmsURIParams.region
awsConfig := &aws.Config{
Credentials: credentials.NewStaticCredentials(kmsURIParams.accessKey,
Expand Down
Loading

0 comments on commit cf533b0

Please sign in to comment.