From 0202227e8381688d7b74d2855cee2725011afcad Mon Sep 17 00:00:00 2001 From: Jackson Owens Date: Fri, 16 Jul 2021 14:55:02 -0400 Subject: [PATCH 1/2] storage: refactor interfaces for constructing new engines Refactor several of the various methods for constructing Engines into a single Open method that supports configuration through the functional options pattern. This removes some boilerplate configuration. An eventual goal of this refactor is to tighten the storage package interface to a narrower, more controlled interface. Release note: None --- pkg/ccl/importccl/BUILD.bazel | 1 - pkg/ccl/importccl/bench_test.go | 13 +- pkg/ccl/storageccl/engineccl/bench_test.go | 12 +- pkg/ccl/storageccl/export_test.go | 10 +- pkg/cli/BUILD.bazel | 2 - pkg/cli/debug.go | 45 ++--- pkg/cli/debug_check_store_test.go | 10 +- pkg/cli/debug_test.go | 16 +- pkg/cli/syncbench/BUILD.bazel | 1 - pkg/cli/syncbench/syncbench.go | 18 +- pkg/kv/kvserver/BUILD.bazel | 1 - .../batcheval/cmd_add_sstable_test.go | 7 +- .../batcheval/cmd_revert_range_test.go | 19 +- pkg/kv/kvserver/consistency_queue_test.go | 18 +- pkg/kv/kvserver/raft_log_queue_test.go | 10 +- pkg/kv/kvserver/replica_sideload_test.go | 11 +- pkg/kv/kvserver/replica_test.go | 10 +- pkg/kv/kvserver/store_snapshot.go | 15 +- pkg/server/config.go | 11 +- pkg/server/settings_cache_test.go | 7 +- pkg/server/sticky_engine.go | 7 +- pkg/sql/colflow/vectorized_flow_test.go | 2 +- pkg/storage/BUILD.bazel | 1 + pkg/storage/batch_test.go | 9 +- pkg/storage/bench_pebble_test.go | 38 +--- pkg/storage/bench_test.go | 12 +- pkg/storage/engine.go | 19 -- pkg/storage/engine_test.go | 56 ++---- pkg/storage/in_mem.go | 49 ++--- pkg/storage/metamorphic/generator.go | 13 +- pkg/storage/multi_iterator_test.go | 11 +- pkg/storage/mvcc_history_test.go | 5 +- pkg/storage/mvcc_incremental_iterator_test.go | 6 +- pkg/storage/mvcc_test.go | 11 -- pkg/storage/open.go | 168 ++++++++++++++++++ pkg/storage/pebble.go | 30 ---- pkg/storage/pebble_mvcc_scanner_test.go | 8 +- pkg/storage/pebble_test.go | 12 +- pkg/storage/temp_engine.go | 59 ++---- pkg/testutils/colcontainerutils/BUILD.bazel | 1 - .../colcontainerutils/diskqueuecfg.go | 8 +- .../localtestcluster/local_test_cluster.go | 15 +- 42 files changed, 370 insertions(+), 407 deletions(-) create mode 100644 pkg/storage/open.go diff --git a/pkg/ccl/importccl/BUILD.bazel b/pkg/ccl/importccl/BUILD.bazel index 9305d296bf9f..a0345c13abde 100644 --- a/pkg/ccl/importccl/BUILD.bazel +++ b/pkg/ccl/importccl/BUILD.bazel @@ -190,7 +190,6 @@ go_test( "//pkg/workload/workloadsql", "@com_github_cockroachdb_cockroach_go//crdb", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_pebble//:pebble", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_gogo_protobuf//proto", "@com_github_jackc_pgx//:pgx", diff --git a/pkg/ccl/importccl/bench_test.go b/pkg/ccl/importccl/bench_test.go index 8b38eb9fa404..e963ce35cbe1 100644 --- a/pkg/ccl/importccl/bench_test.go +++ b/pkg/ccl/importccl/bench_test.go @@ -33,7 +33,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/cockroach/pkg/workload/tpcc" - "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" ) @@ -92,18 +91,14 @@ func benchmarkWriteAndLink(b *testing.B, dir string, tables []tableSSTable) { b.SetBytes(bytes) ctx := context.Background() - cache := pebble.NewCache(server.DefaultCacheSize) - defer cache.Unref() b.ResetTimer() for i := 0; i < b.N; i++ { b.StopTimer() - cfg := storage.PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: filepath.Join(dir, `pebble`, timeutil.Now().String())}} - cfg.Opts = storage.DefaultPebbleOptions() - cfg.Opts.Cache = cache - db, err := storage.NewPebble(context.Background(), cfg) + db, err := storage.Open( + context.Background(), + storage.Filesystem(filepath.Join(dir, `pebble`, timeutil.Now().String())), + storage.CacheSize(server.DefaultCacheSize)) if err != nil { b.Fatal(err) } diff --git a/pkg/ccl/storageccl/engineccl/bench_test.go b/pkg/ccl/storageccl/engineccl/bench_test.go index c95540b25b3e..af76e0f51033 100644 --- a/pkg/ccl/storageccl/engineccl/bench_test.go +++ b/pkg/ccl/storageccl/engineccl/bench_test.go @@ -16,7 +16,6 @@ import ( "path/filepath" "testing" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -53,15 +52,10 @@ func loadTestData( exists = false } - eng, err := storage.NewPebble( + eng, err := storage.Open( ctx, - storage.PebbleConfig{ - StorageConfig: base.StorageConfig{ - Settings: cluster.MakeTestingClusterSettings(), - Dir: dir, - }, - }, - ) + storage.Filesystem(dir), + storage.Settings(cluster.MakeTestingClusterSettings())) if err != nil { return nil, err } diff --git a/pkg/ccl/storageccl/export_test.go b/pkg/ccl/storageccl/export_test.go index 4348b4fd868a..87e400a7a612 100644 --- a/pkg/ccl/storageccl/export_test.go +++ b/pkg/ccl/storageccl/export_test.go @@ -646,12 +646,10 @@ func TestRandomKeyAndTimestampExport(t *testing.T) { mkEngine := func(t *testing.T) (e storage.Engine, cleanup func()) { dir, cleanupDir := testutils.TempDir(t) - e, err := storage.NewDefaultEngine( - 0, - base.StorageConfig{ - Settings: cluster.MakeTestingClusterSettings(), - Dir: dir, - }) + e, err := storage.Open(ctx, + storage.Filesystem(dir), + storage.CacheSize(0), + storage.Settings(cluster.MakeTestingClusterSettings())) if err != nil { t.Fatal(err) } diff --git a/pkg/cli/BUILD.bazel b/pkg/cli/BUILD.bazel index 892c002472f5..7caf597d8dd0 100644 --- a/pkg/cli/BUILD.bazel +++ b/pkg/cli/BUILD.bazel @@ -206,7 +206,6 @@ go_library( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", - "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//tool", "@com_github_cockroachdb_pebble//vfs", "@com_github_cockroachdb_redact//:redact", @@ -356,7 +355,6 @@ go_test( "//pkg/workload/examples", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", - "@com_github_cockroachdb_pebble//:pebble", "@com_github_spf13_cobra//:cobra", "@com_github_spf13_pflag//:pflag", "@com_github_stretchr_testify//assert", diff --git a/pkg/cli/debug.go b/pkg/cli/debug.go index 123e09700aac..3e330863b7e1 100644 --- a/pkg/cli/debug.go +++ b/pkg/cli/debug.go @@ -61,7 +61,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" - "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/tool" "github.com/cockroachdb/pebble/vfs" humanize "github.com/dustin/go-humanize" @@ -131,6 +130,17 @@ type OpenEngineOptions struct { MustExist bool } +func (opts OpenEngineOptions) configOptions() []storage.ConfigOption { + var cfgOpts []storage.ConfigOption + if opts.ReadOnly { + cfgOpts = append(cfgOpts, storage.ReadOnly) + } + if opts.MustExist { + cfgOpts = append(cfgOpts, storage.MustExist) + } + return cfgOpts +} + // OpenExistingStore opens the rocksdb engine rooted at 'dir'. // If 'readOnly' is true, opens the store in read-only mode. func OpenExistingStore(dir string, stopper *stop.Stopper, readOnly bool) (storage.Engine, error) { @@ -144,32 +154,13 @@ func OpenEngine(dir string, stopper *stop.Stopper, opts OpenEngineOptions) (stor if err != nil { return nil, err } - - storageConfig := base.StorageConfig{ - Settings: serverCfg.Settings, - Dir: dir, - MustExist: opts.MustExist, - } - if PopulateRocksDBConfigHook != nil { - if err := PopulateRocksDBConfigHook(&storageConfig); err != nil { - return nil, err - } - } - - var db storage.Engine - - cfg := storage.PebbleConfig{ - StorageConfig: storageConfig, - Opts: storage.DefaultPebbleOptions(), - } - cfg.Opts.Cache = pebble.NewCache(server.DefaultCacheSize) - defer cfg.Opts.Cache.Unref() - - cfg.Opts.MaxOpenFiles = int(maxOpenFiles) - cfg.Opts.ReadOnly = opts.ReadOnly - - db, err = storage.NewPebble(context.Background(), cfg) - + db, err := storage.Open(context.Background(), + storage.Filesystem(dir), + storage.MaxOpenFiles(int(maxOpenFiles)), + storage.CacheSize(server.DefaultCacheSize), + storage.Settings(serverCfg.Settings), + storage.Hook(PopulateRocksDBConfigHook), + storage.CombineOptions(opts.configOptions()...)) if err != nil { return nil, err } diff --git a/pkg/cli/debug_check_store_test.go b/pkg/cli/debug_check_store_test.go index 68b21c80bd82..0f1df93c5498 100644 --- a/pkg/cli/debug_check_store_test.go +++ b/pkg/cli/debug_check_store_test.go @@ -81,12 +81,10 @@ func TestDebugCheckStore(t *testing.T) { // Introduce a stats divergence on s1. func() { - eng, err := storage.NewDefaultEngine( - 10<<20, /* 10mb */ - base.StorageConfig{ - Dir: storePaths[0], - MustExist: true, - }) + eng, err := storage.Open(ctx, + storage.Filesystem(storePaths[0]), + storage.CacheSize(10<<20 /* 10 MiB */), + storage.MustExist) require.NoError(t, err) defer eng.Close() sl := stateloader.Make(1) diff --git a/pkg/cli/debug_test.go b/pkg/cli/debug_test.go index b680922e4226..8ddb779bb9d3 100644 --- a/pkg/cli/debug_test.go +++ b/pkg/cli/debug_test.go @@ -39,22 +39,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" - "github.com/cockroachdb/pebble" ) func createStore(t *testing.T, path string) { t.Helper() - cache := pebble.NewCache(server.DefaultCacheSize) - defer cache.Unref() - cfg := storage.PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: path, - MustExist: false, - }, - } - cfg.Opts = storage.DefaultPebbleOptions() - cfg.Opts.Cache = cache - db, err := storage.NewPebble(context.Background(), cfg) + db, err := storage.Open( + context.Background(), + storage.Filesystem(path), + storage.CacheSize(server.DefaultCacheSize)) if err != nil { t.Fatal(err) } diff --git a/pkg/cli/syncbench/BUILD.bazel b/pkg/cli/syncbench/BUILD.bazel index 50cd37746299..641b806c2fc6 100644 --- a/pkg/cli/syncbench/BUILD.bazel +++ b/pkg/cli/syncbench/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/cli/syncbench", visibility = ["//visibility:public"], deps = [ - "//pkg/base", "//pkg/settings/cluster", "//pkg/storage", "//pkg/util/encoding", diff --git a/pkg/cli/syncbench/syncbench.go b/pkg/cli/syncbench/syncbench.go index ca905cc854d2..4a7d09914799 100644 --- a/pkg/cli/syncbench/syncbench.go +++ b/pkg/cli/syncbench/syncbench.go @@ -20,7 +20,6 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -122,10 +121,10 @@ type Options struct { LogOnly bool } -// Run a test of writing synchronously to the RocksDB WAL. +// Run a test of writing synchronously to the Pebble WAL. // -// TODO(tschottdorf): this should receive a RocksDB instance so that the caller -// in cli can use OpenEngine (which in turn allows to use encryption, etc). +// TODO(tschottdorf): this should receive an engine so that the caller in cli +// can use OpenEngine (which in turn allows to use encryption, etc). func Run(opts Options) error { // Check if the directory exists. _, err := os.Stat(opts.Dir) @@ -139,12 +138,11 @@ func Run(opts Options) error { fmt.Printf("writing to %s\n", opts.Dir) - db, err := storage.NewDefaultEngine( - 0, - base.StorageConfig{ - Settings: cluster.MakeTestingClusterSettings(), - Dir: opts.Dir, - }) + db, err := storage.Open( + context.Background(), + storage.Filesystem(opts.Dir), + storage.CacheSize(0), + storage.Settings(cluster.MakeTestingClusterSettings())) if err != nil { return err } diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index c9e8c670a57c..a032b3600b28 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -385,7 +385,6 @@ go_test( "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", - "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", "@com_github_google_btree//:btree", diff --git a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go index d60d0a607fd2..4c4b8e658542 100644 --- a/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_add_sstable_test.go @@ -37,16 +37,11 @@ import ( "github.com/kr/pretty" ) -// createTestPebbleEngine returns a new in-memory Pebble storage engine. -func createTestPebbleEngine() storage.Engine { - return storage.NewInMemForTesting(context.Background(), roachpb.Attributes{}, 1<<20) -} - var engineImpls = []struct { name string create func() storage.Engine }{ - {"pebble", createTestPebbleEngine}, + {"pebble", storage.NewDefaultInMemForTesting}, } func singleKVSSTable(key storage.MVCCKey, value []byte) ([]byte, error) { diff --git a/pkg/kv/kvserver/batcheval/cmd_revert_range_test.go b/pkg/kv/kvserver/batcheval/cmd_revert_range_test.go index e5de349d8d5b..77b74559fd43 100644 --- a/pkg/kv/kvserver/batcheval/cmd_revert_range_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_revert_range_test.go @@ -52,22 +52,16 @@ func getStats(t *testing.T, reader storage.Reader) enginepb.MVCCStats { return s } -// createTestRocksDBEngine returns a new in-memory RocksDB engine with 1MB of -// storage capacity. -func createTestRocksDBEngine(ctx context.Context) storage.Engine { - return storage.NewInMemForTesting(ctx, roachpb.Attributes{}, 1<<20) -} - // createTestPebbleEngine returns a new in-memory Pebble storage engine. -func createTestPebbleEngine(ctx context.Context) storage.Engine { - return storage.NewInMemForTesting(ctx, roachpb.Attributes{}, 1<<20) +func createTestPebbleEngine(ctx context.Context) (storage.Engine, error) { + return storage.Open(ctx, storage.InMemory(), + storage.MaxSize(1<<20), storage.SettingsForTesting()) } var engineImpls = []struct { name string - create func(context.Context) storage.Engine + create func(context.Context) (storage.Engine, error) }{ - {"rocksdb", createTestRocksDBEngine}, {"pebble", createTestPebbleEngine}, } @@ -85,7 +79,10 @@ func TestCmdRevertRange(t *testing.T) { // https://github.com/cockroachdb/cockroach/pull/42386 for _, engineImpl := range engineImpls { t.Run(engineImpl.name, func(t *testing.T) { - eng := engineImpl.create(ctx) + eng, err := engineImpl.create(ctx) + if err != nil { + t.Fatal(err) + } defer eng.Close() baseTime := hlc.Timestamp{WallTime: 1000} diff --git a/pkg/kv/kvserver/consistency_queue_test.go b/pkg/kv/kvserver/consistency_queue_test.go index feb0eee19792..ce69a7571efe 100644 --- a/pkg/kv/kvserver/consistency_queue_test.go +++ b/pkg/kv/kvserver/consistency_queue_test.go @@ -39,7 +39,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" - "github.com/cockroachdb/pebble" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -444,7 +443,7 @@ func TestCheckConsistencyInconsistent(t *testing.T) { // VFS to verify its contents. fs, err := stickyEngineRegistry.GetUnderlyingFS(base.StoreSpec{StickyInMemoryEngineID: strconv.FormatInt(int64(i), 10)}) assert.NoError(t, err) - cpEng := storage.InMemFromFS(context.Background(), roachpb.Attributes{}, 1<<20, 0, fs, cps[0], nil) + cpEng := storage.InMemFromFS(context.Background(), fs, cps[0], storage.CacheSize(1<<20)) defer cpEng.Close() iter := cpEng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: []byte("\xff")}) @@ -577,17 +576,10 @@ func testConsistencyQueueRecomputeStatsImpl(t *testing.T, hadEstimates bool) { const sysCountGarbage = 123000 func() { - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - opts := storage.DefaultPebbleOptions() - opts.Cache = cache - eng, err := storage.NewPebble(ctx, storage.PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: path, - MustExist: true, - }, - Opts: opts, - }) + eng, err := storage.Open(ctx, + storage.Filesystem(path), + storage.CacheSize(1<<20 /* 1 MiB */), + storage.MustExist) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/raft_log_queue_test.go b/pkg/kv/kvserver/raft_log_queue_test.go index b2e73f860497..fb5ca76e1221 100644 --- a/pkg/kv/kvserver/raft_log_queue_test.go +++ b/pkg/kv/kvserver/raft_log_queue_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -33,7 +32,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" - "github.com/cockroachdb/pebble" "github.com/stretchr/testify/assert" "go.etcd.io/etcd/raft/v3" "go.etcd.io/etcd/raft/v3/tracker" @@ -862,11 +860,9 @@ func TestTruncateLogRecompute(t *testing.T) { dir, cleanup := testutils.TempDir(t) defer cleanup() - cache := pebble.NewCache(1 << 20) - defer cache.Unref() - opts := storage.DefaultPebbleOptions() - opts.Cache = cache - eng, err := storage.NewPebble(ctx, storage.PebbleConfig{StorageConfig: base.StorageConfig{Dir: dir}, Opts: opts}) + eng, err := storage.Open(ctx, + storage.Filesystem(dir), + storage.CacheSize(1<<20 /* 1 MiB */)) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/replica_sideload_test.go b/pkg/kv/kvserver/replica_sideload_test.go index f362be72cc91..c8476782f393 100644 --- a/pkg/kv/kvserver/replica_sideload_test.go +++ b/pkg/kv/kvserver/replica_sideload_test.go @@ -24,7 +24,6 @@ import ( "strings" "testing" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" @@ -698,12 +697,10 @@ func (mr *mockSender) Recv() (*SnapshotResponse, error) { func newOnDiskEngine(t *testing.T) (func(), storage.Engine) { dir, cleanup := testutils.TempDir(t) - eng, err := storage.NewDefaultEngine( - 1<<20, - base.StorageConfig{ - Dir: dir, - MustExist: false, - }) + eng, err := storage.Open( + context.Background(), + storage.Filesystem(dir), + storage.CacheSize(1<<20 /* 1 MiB */)) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index 95cd47df620f..172b65cc631c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -233,7 +233,15 @@ func (tc *testContext) StartWithStoreConfigAndVersion( tc.gossip = gossip.NewTest(1, rpcContext, server, stopper, metric.NewRegistry(), cfg.DefaultZoneConfig) } if tc.engine == nil { - tc.engine = storage.NewInMemForTesting(context.Background(), roachpb.Attributes{Attrs: []string{"dc1", "mem"}}, 1<<20) + var err error + tc.engine, err = storage.Open(context.Background(), + storage.InMemory(), + storage.Attributes(roachpb.Attributes{Attrs: []string{"dc1", "mem"}}), + storage.MaxSize(1<<20), + storage.SettingsForTesting()) + if err != nil { + t.Fatal(err) + } stopper.AddCloser(tc.engine) } if tc.transport == nil { diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go index 93703e64aba5..658bee44d708 100644 --- a/pkg/kv/kvserver/store_snapshot.go +++ b/pkg/kv/kvserver/store_snapshot.go @@ -906,13 +906,14 @@ func SendEmptySnapshot( to roachpb.ReplicaDescriptor, ) error { // Create an engine to use as a buffer for the empty snapshot. - eng := storage.NewInMem( + eng, err := storage.Open( context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize 1MiB */ - 512<<20, /* storeSize 512 MiB */ - nil, /* settings */ - ) + storage.InMemory(), + storage.CacheSize(1<<20 /* 1 MiB */), + storage.MaxSize(512<<20 /* 512 MiB */)) + if err != nil { + return err + } defer eng.Close() var ms enginepb.MVCCStats @@ -927,7 +928,7 @@ func SendEmptySnapshot( if st.Version.IsActive(ctx, clusterversion.ReplicaVersions) { replicaVersion = st.Version.ActiveVersionOrEmpty(ctx).Version } - ms, err := stateloader.WriteInitialReplicaState( + ms, err = stateloader.WriteInitialReplicaState( ctx, eng, ms, diff --git a/pkg/server/config.go b/pkg/server/config.go index 6254d6c0c537..ebffbe86fd6f 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -514,7 +514,16 @@ func (cfg *Config) CreateEngines(ctx context.Context) (Engines, error) { } engines = append(engines, e) } else { - engines = append(engines, storage.NewInMem(ctx, spec.Attributes, cfg.CacheSize, sizeInBytes, cfg.Settings)) + e, err := storage.Open(ctx, + storage.InMemory(), + storage.Attributes(spec.Attributes), + storage.CacheSize(cfg.CacheSize), + storage.MaxSize(sizeInBytes), + storage.Settings(cfg.Settings)) + if err != nil { + return Engines{}, err + } + engines = append(engines, e) } } else { if spec.Size.Percent > 0 { diff --git a/pkg/server/settings_cache_test.go b/pkg/server/settings_cache_test.go index 09cf7d7bf3a5..d535484ec5f3 100644 --- a/pkg/server/settings_cache_test.go +++ b/pkg/server/settings_cache_test.go @@ -42,9 +42,10 @@ func TestCachedSettingsStoreAndLoad(t *testing.T) { } ctx := context.Background() - attrs := roachpb.Attributes{} - storeSize := int64(512 << 20) /* 512 MiB */ - engine := storage.NewInMemForTesting(ctx, attrs, storeSize) + engine, err := storage.Open(ctx, storage.InMemory(), + storage.MaxSize(512<<20 /* 512 MiB */), + storage.SettingsForTesting()) + require.NoError(t, err) defer engine.Close() require.NoError(t, storeCachedSettingsKVs(ctx, engine, testSettings)) diff --git a/pkg/server/sticky_engine.go b/pkg/server/sticky_engine.go index d773519bd345..03a4df8420a0 100644 --- a/pkg/server/sticky_engine.go +++ b/pkg/server/sticky_engine.go @@ -108,8 +108,11 @@ func (registry *stickyInMemEnginesRegistryImpl) GetOrCreateStickyInMemEngine( log.Infof(ctx, "creating new sticky in-mem engine %s", spec.StickyInMemoryEngineID) fs := vfs.NewMem() - engine := storage.InMemFromFS( - ctx, spec.Attributes, cfg.CacheSize, spec.Size.InBytes, fs, "", storage.MakeRandomSettingsForSeparatedIntents()) + engine := storage.InMemFromFS(ctx, fs, "", + storage.Attributes(spec.Attributes), + storage.CacheSize(cfg.CacheSize), + storage.MaxSize(spec.Size.InBytes), + storage.SettingsForTesting()) engineEntry := &stickyInMemEngine{ id: spec.StickyInMemoryEngineID, diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index adf74ffc9625..17766b8aa109 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -252,7 +252,7 @@ func TestVectorizedFlowTempDirectory(t *testing.T) { // We use an on-disk engine for this test since we're testing FS interactions // and want to get the same behavior as a non-testing environment. tempPath, dirCleanup := testutils.TempDir(t) - ngn, err := storage.NewDefaultEngine(0 /* cacheSize */, base.StorageConfig{Dir: tempPath}) + ngn, err := storage.Open(ctx, storage.Filesystem(tempPath), storage.CacheSize(0)) require.NoError(t, err) defer ngn.Close() defer dirCleanup() diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index e930953be859..079fe117f5dc 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "mvcc.go", "mvcc_incremental_iterator.go", "mvcc_logical_ops.go", + "open.go", "pebble.go", "pebble_batch.go", "pebble_file_registry.go", diff --git a/pkg/storage/batch_test.go b/pkg/storage/batch_test.go index e0c32b447014..0a29cec7aed6 100644 --- a/pkg/storage/batch_test.go +++ b/pkg/storage/batch_test.go @@ -1100,13 +1100,8 @@ func TestDecodeKey(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - e := NewInMem( - context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - nil, /* settings */ - ) + e, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + assert.NoError(t, err) defer e.Close() tests := []MVCCKey{ diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index e900470a187d..6c38a1f35efe 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -16,7 +16,6 @@ import ( "math/rand" "testing" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -24,28 +23,17 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/pebble" - "github.com/cockroachdb/pebble/vfs" ) const testCacheSize = 1 << 30 // 1 GB func setupMVCCPebble(b testing.TB, dir string) Engine { - opts := DefaultPebbleOptions() - opts.FS = vfs.Default - opts.Cache = pebble.NewCache(testCacheSize) - defer opts.Cache.Unref() - - peb, err := NewPebble( + peb, err := Open( context.Background(), - PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: dir, - Settings: makeSettingsForSeparatedIntents( - false /* oldClusterVersion */, true /* enabled */), - }, - Opts: opts, - }) + Filesystem(dir), + CacheSize(testCacheSize), + Settings(makeSettingsForSeparatedIntents( + false /* oldClusterVersion */, true /* enabled */))) if err != nil { b.Fatalf("could not create new pebble instance at %s: %+v", dir, err) } @@ -58,19 +46,11 @@ func setupMVCCInMemPebble(b testing.TB, loc string) Engine { } func setupMVCCInMemPebbleWithSettings(b testing.TB, settings *cluster.Settings) Engine { - opts := DefaultPebbleOptions() - opts.FS = vfs.NewMem() - opts.Cache = pebble.NewCache(testCacheSize) - defer opts.Cache.Unref() - - peb, err := NewPebble( + peb, err := Open( context.Background(), - PebbleConfig{ - Opts: opts, - StorageConfig: base.StorageConfig{ - Settings: settings, - }, - }) + InMemory(), + Settings(settings), + CacheSize(testCacheSize)) if err != nil { b.Fatalf("could not create new in-mem pebble instance: %+v", err) } diff --git a/pkg/storage/bench_test.go b/pkg/storage/bench_test.go index 79c157b64ce0..b09b47957b5e 100644 --- a/pkg/storage/bench_test.go +++ b/pkg/storage/bench_test.go @@ -20,7 +20,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -531,15 +530,10 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int exists = false } - eng, err := NewPebble( + eng, err := Open( context.Background(), - PebbleConfig{ - StorageConfig: base.StorageConfig{ - Settings: cluster.MakeTestingClusterSettings(), - Dir: dir, - }, - }, - ) + Filesystem(dir), + Settings(cluster.MakeTestingClusterSettings())) if err != nil { return nil, err } diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index 43b4e30384de..52ab4794ad29 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -17,7 +17,6 @@ import ( "io" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" @@ -873,24 +872,6 @@ type EncryptionRegistries struct { KeyRegistry []byte } -// NewEngine creates a new storage engine. -func NewEngine(cacheSize int64, storageConfig base.StorageConfig) (Engine, error) { - pebbleConfig := PebbleConfig{ - StorageConfig: storageConfig, - Opts: DefaultPebbleOptions(), - } - pebbleConfig.Opts.Cache = pebble.NewCache(cacheSize) - defer pebbleConfig.Opts.Cache.Unref() - - return NewPebble(context.Background(), pebbleConfig) -} - -// NewDefaultEngine allocates and returns a new, opened engine with the default configuration. -// The caller must call the engine's Close method when the engine is no longer needed. -func NewDefaultEngine(cacheSize int64, storageConfig base.StorageConfig) (Engine, error) { - return NewEngine(cacheSize, storageConfig) -} - // PutProto sets the given key to the protobuf-serialized byte string // of msg. Returns the length in bytes of key and the value. // diff --git a/pkg/storage/engine_test.go b/pkg/storage/engine_test.go index 7f74600cf4fa..9c6e9b3346c7 100644 --- a/pkg/storage/engine_test.go +++ b/pkg/storage/engine_test.go @@ -26,7 +26,6 @@ import ( "testing" "time" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -40,7 +39,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/oserror" "github.com/cockroachdb/pebble" - "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" @@ -582,10 +580,7 @@ func TestEngineMustExist(t *testing.T) { tempDir, dirCleanupFn := testutils.TempDir(t) defer dirCleanupFn() - _, err := NewEngine(0, base.StorageConfig{ - Dir: tempDir, - MustExist: true, - }) + _, err := Open(context.Background(), Filesystem(tempDir), MustExist) if err == nil { t.Fatal("expected error related to missing directory") } @@ -1185,17 +1180,10 @@ func TestCreateCheckpoint(t *testing.T) { dir, cleanup := testutils.TempDir(t) defer cleanup() - opts := DefaultPebbleOptions() - db, err := NewPebble( + db, err := Open( context.Background(), - PebbleConfig{ - StorageConfig: base.StorageConfig{ - Settings: cluster.MakeTestingClusterSettings(), - Dir: dir, - }, - Opts: opts, - }, - ) + Filesystem(dir), + Settings(cluster.MakeTestingClusterSettings())) assert.NoError(t, err) defer db.Close() @@ -1421,20 +1409,10 @@ type engineImpl struct { // These FS implementations are not in-memory. var engineRealFSImpls = []engineImpl{ {"pebble", func(t *testing.T, dir string) Engine { - - opts := DefaultPebbleOptions() - opts.FS = vfs.Default - opts.Cache = pebble.NewCache(testCacheSize) - defer opts.Cache.Unref() - - db, err := NewPebble( + db, err := Open( context.Background(), - PebbleConfig{ - StorageConfig: base.StorageConfig{ - Dir: dir, - }, - Opts: opts, - }) + Filesystem(dir), + CacheSize(testCacheSize)) if err != nil { t.Fatalf("could not create new pebble instance at %s: %+v", dir, err) } @@ -1536,13 +1514,8 @@ func TestSupportsPrev(t *testing.T) { } t.Run("pebble", func(t *testing.T) { - eng := NewInMem( - context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - nil, /* settings */ - ) + eng, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + require.NoError(t, err) defer eng.Close() runTest(t, eng, engineTest{ engineIterSupportsPrev: true, @@ -1665,14 +1638,9 @@ func TestScanSeparatedIntents(t *testing.T) { for name, enableSeparatedIntents := range map[string]bool{"interleaved": false, "separated": true} { t.Run(name, func(t *testing.T) { - settings := makeSettingsForSeparatedIntents(false, enableSeparatedIntents) - eng := NewInMem( - ctx, - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - settings, - ) + eng, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */), + Settings(makeSettingsForSeparatedIntents(false, enableSeparatedIntents))) + require.NoError(t, err) defer eng.Close() for _, key := range keys { diff --git a/pkg/storage/in_mem.go b/pkg/storage/in_mem.go index 015c62c71926..3ab2c49fa8f7 100644 --- a/pkg/storage/in_mem.go +++ b/pkg/storage/in_mem.go @@ -15,38 +15,24 @@ import ( "math/rand" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/pebble/vfs" ) -// NewInMem allocates and returns a new, opened in-memory engine. The caller -// must call the engine's Close method when the engine is no longer needed. -// -// FIXME(tschottdorf): make the signature similar to NewPebble (require a cfg). -func NewInMem( - ctx context.Context, - attrs roachpb.Attributes, - cacheSize, storeSize int64, - settings *cluster.Settings, -) Engine { - return newPebbleInMem(ctx, attrs, cacheSize, storeSize, vfs.NewMem(), "", settings) -} - // InMemFromFS allocates and returns new, opened in-memory engine. Engine // uses provided in mem file system and base directory to store data. The // caller must call obtained engine's Close method when engine is no longer // needed. -func InMemFromFS( - ctx context.Context, - attrs roachpb.Attributes, - cacheSize, storeSize int64, - fs vfs.FS, - dir string, - settings *cluster.Settings, -) Engine { - return newPebbleInMem(ctx, attrs, cacheSize, storeSize, fs, dir, settings) +func InMemFromFS(ctx context.Context, fs vfs.FS, dir string, opts ...ConfigOption) Engine { + // TODO(jackson): Replace this function with a special Location + // constructor that allows both specifying a directory and supplying your + // own VFS? + eng, err := Open(ctx, Location{dir: dir, fs: fs}, opts...) + if err != nil { + panic(err) + } + return eng } // The ForTesting functions randomize the settings for separated intents. This @@ -60,24 +46,21 @@ func InMemFromFS( // other than configuring separated intents. So the fact that we have two // inconsistent cluster.Settings is harmless. -// NewInMemForTesting allocates and returns a new, opened in-memory engine. The caller -// must call the engine's Close method when the engine is no longer needed. -func NewInMemForTesting(ctx context.Context, attrs roachpb.Attributes, storeSize int64) Engine { - settings := MakeRandomSettingsForSeparatedIntents() - return newPebbleInMem(ctx, attrs, 0 /* cacheSize */, storeSize, vfs.NewMem(), "", settings) -} - // NewDefaultInMemForTesting allocates and returns a new, opened in-memory engine with // the default configuration. The caller must call the engine's Close method // when the engine is no longer needed. func NewDefaultInMemForTesting() Engine { - return NewInMemForTesting(context.Background(), roachpb.Attributes{}, 1<<20) + eng, err := Open(context.Background(), InMemory(), SettingsForTesting(), MaxSize(1<<20)) + if err != nil { + panic(err) + } + return eng } -// MakeRandomSettingsForSeparatedIntents makes settings for which it randomly +// makeRandomSettingsForSeparatedIntents makes settings for which it randomly // picks whether the cluster understands separated intents, and if yes, // whether to write separated intents. Once made, these setting do not change. -func MakeRandomSettingsForSeparatedIntents() *cluster.Settings { +func makeRandomSettingsForSeparatedIntents() *cluster.Settings { oldClusterVersion := rand.Intn(2) == 0 enabledSeparated := rand.Intn(2) == 0 log.Infof(context.Background(), diff --git a/pkg/storage/metamorphic/generator.go b/pkg/storage/metamorphic/generator.go index ff6454d445af..0f1e809d1775 100644 --- a/pkg/storage/metamorphic/generator.go +++ b/pkg/storage/metamorphic/generator.go @@ -37,14 +37,11 @@ func makeStorageConfig(path string) base.StorageConfig { } func createTestPebbleEngine(path string, seed int64) (storage.Engine, error) { - pebbleConfig := storage.PebbleConfig{ - StorageConfig: makeStorageConfig(path), - Opts: storage.DefaultPebbleOptions(), - } - pebbleConfig.Opts.Cache = pebble.NewCache(1 << 20) - defer pebbleConfig.Opts.Cache.Unref() - - return storage.NewPebble(context.Background(), pebbleConfig) + return storage.Open( + context.Background(), + storage.Filesystem(path), + storage.CacheSize(1<<20 /* 1 MiB */), + storage.Settings(cluster.MakeTestingClusterSettings())) } func createTestPebbleManySSTs(path string, seed int64) (storage.Engine, error) { diff --git a/pkg/storage/multi_iterator_test.go b/pkg/storage/multi_iterator_test.go index 6a9e82da44d6..eb71607ad801 100644 --- a/pkg/storage/multi_iterator_test.go +++ b/pkg/storage/multi_iterator_test.go @@ -27,13 +27,10 @@ func TestMultiIterator(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - pebble := NewInMem( - context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - nil, /* settings */ - ) + pebble, err := Open(context.Background(), InMemory(), CacheSize(1<<20 /* 1 MiB */)) + if err != nil { + t.Fatal(err) + } defer pebble.Close() // Each `input` is turned into an iterator and these are passed to a new diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go index 4093dc194d0b..e69d894fd861 100644 --- a/pkg/storage/mvcc_history_test.go +++ b/pkg/storage/mvcc_history_test.go @@ -119,7 +119,10 @@ func TestMVCCHistories(t *testing.T) { } settings := makeSettingsForSeparatedIntents(oldClusterVersion, enabledSeparated) // We start from a clean slate in every test file. - engine := createTestPebbleEngineWithSettings(settings) + engine, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */), Settings(settings)) + if err != nil { + t.Fatal(err) + } defer engine.Close() reportDataEntries := func(buf *bytes.Buffer) error { diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index abe806949caa..1de8e7f0dcc8 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -911,7 +911,8 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { // regular MVCCPut operation to generate these keys, which we'll later be // copying into manually created sstables. ctx := context.Background() - db1 := NewInMemForTesting(ctx, roachpb.Attributes{}, 10<<20) + db1, err := Open(ctx, InMemory(), SettingsForTesting()) + require.NoError(t, err) defer db1.Close() put := func(key, value string, ts int64, txn *roachpb.Transaction) { @@ -946,7 +947,8 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) { // // SSTable 2: // b@2 - db2 := NewInMemForTesting(ctx, roachpb.Attributes{}, 10<<20) + db2, err := Open(ctx, InMemory(), SettingsForTesting()) + require.NoError(t, err) defer db2.Close() // NB: If the original intent was separated, iterating using an interleaving diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index baf096da94cb..bf0ccda39fec 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -25,7 +25,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -84,16 +83,6 @@ func createTestPebbleEngine() Engine { return NewDefaultInMemForTesting() } -func createTestPebbleEngineWithSettings(settings *cluster.Settings) Engine { - return NewInMem( - context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - settings, - ) -} - // TODO(sumeer): the following is legacy from when we had multiple engine // implementations. Some tests are switched over to only create Pebble, since // the create method does not provide control over cluster.Settings. Switch diff --git a/pkg/storage/open.go b/pkg/storage/open.go new file mode 100644 index 000000000000..33c535b30663 --- /dev/null +++ b/pkg/storage/open.go @@ -0,0 +1,168 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/pebble" + "github.com/cockroachdb/pebble/vfs" +) + +// A ConfigOption may be passed to Open to configure the storage engine. +type ConfigOption func(cfg *engineConfig) error + +// CombineOptions combines many options into one. +func CombineOptions(opts ...ConfigOption) ConfigOption { + return func(cfg *engineConfig) error { + for _, opt := range opts { + if err := opt(cfg); err != nil { + return err + } + } + return nil + } +} + +// ReadOnly configures an engine to be opened in read-only mode. +var ReadOnly ConfigOption = func(cfg *engineConfig) error { + cfg.Opts.ReadOnly = true + return nil +} + +// MustExist configures an engine to error on Open if the target directory +// does not contain an initialized store. +var MustExist ConfigOption = func(cfg *engineConfig) error { + cfg.MustExist = true + return nil +} + +// Attributes configures the engine's attributes. +func Attributes(attrs roachpb.Attributes) ConfigOption { + return func(cfg *engineConfig) error { + cfg.Attrs = attrs + return nil + } +} + +// MaxSize sets the intended maximum store size. MaxSize is used for +// calculating free space and making rebalancing decisions. +func MaxSize(size int64) ConfigOption { + return func(cfg *engineConfig) error { + cfg.MaxSize = size + return nil + } +} + +// MaxOpenFiles sets the maximum number of files an engine should open. +func MaxOpenFiles(count int) ConfigOption { + return func(cfg *engineConfig) error { + cfg.Opts.MaxOpenFiles = count + return nil + } + +} + +// Settings sets the cluster settings to use. +func Settings(settings *cluster.Settings) ConfigOption { + return func(cfg *engineConfig) error { + cfg.Settings = settings + return nil + } +} + +// CacheSize configures the size of the block cache. +func CacheSize(size int64) ConfigOption { + return func(cfg *engineConfig) error { + cfg.cacheSize = &size + return nil + } +} + +// Hook configures a hook to initialize additional storage options. It's used +// to initialize encryption-at-rest details in CCL builds. +func Hook(hookFunc func(*base.StorageConfig) error) ConfigOption { + return func(cfg *engineConfig) error { + if hookFunc == nil { + return nil + } + return hookFunc(&cfg.PebbleConfig.StorageConfig) + } +} + +// SettingsForTesting configures the engine's cluster settings for an engine +// used in testing. It may randomize some cluster settings to improve test +// coverage. +func SettingsForTesting() ConfigOption { + return Settings(makeRandomSettingsForSeparatedIntents()) +} + +// A Location describes where the storage engine's data will be written. A +// Location may be in-memory or on the filesystem. +type Location struct { + dir string + fs vfs.FS +} + +// Filesystem constructs a Location that instructs the storage engine to read +// and store data on the filesystem in the provided directory. +func Filesystem(dir string) Location { + return Location{ + dir: dir, + // fs is left nil intentionally, so that it will be left as the + // default of vfs.Default wrapped in vfs.WithDiskHealthChecks + // (initialized by DefaultPebbleOptions). + // TODO(jackson): Refactor to make it harder to accidentially remove + // disk health checks by setting your own VFS in a call to NewPebble. + } +} + +// InMemory constructs a Location that instructs the storage engine to store +// data in-memory. +func InMemory() Location { + return Location{ + dir: "", + fs: vfs.NewMem(), + } +} + +type engineConfig struct { + PebbleConfig + // cacheSize is stored separately so that we can avoid constructing the + // PebbleConfig.Opts.Cache until the call to Open. A Cache is created with + // a ref count of 1, so creating the Cache during execution of + // ConfigOption makes it too easy to leak a cache. + cacheSize *int64 +} + +// Open opens a new Pebble storage engine, reading and writing data to the +// provided Location, configured with the provided options. +func Open(ctx context.Context, loc Location, opts ...ConfigOption) (*Pebble, error) { + var cfg engineConfig + cfg.Dir = loc.dir + cfg.Opts = DefaultPebbleOptions() + if loc.fs != nil { + cfg.Opts.FS = loc.fs + } + for _, opt := range opts { + if err := opt(&cfg); err != nil { + return nil, err + } + } + if cfg.cacheSize != nil { + cfg.Opts.Cache = pebble.NewCache(*cfg.cacheSize) + defer cfg.Opts.Cache.Unref() + } + return NewPebble(ctx, cfg.PebbleConfig) +} diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index c16962e78fdf..8026dc4ee9c1 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -602,36 +602,6 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (*Pebble, error) { return p, nil } -func newPebbleInMem( - ctx context.Context, - attrs roachpb.Attributes, - cacheSize, storeSize int64, - fs vfs.FS, - dir string, - settings *cluster.Settings, -) *Pebble { - opts := DefaultPebbleOptions() - opts.Cache = pebble.NewCache(cacheSize) - defer opts.Cache.Unref() - - opts.FS = fs - db, err := NewPebble( - ctx, - PebbleConfig{ - StorageConfig: base.StorageConfig{ - Attrs: attrs, - Dir: dir, - MaxSize: storeSize, - Settings: settings, - }, - Opts: opts, - }) - if err != nil { - panic(err) - } - return db -} - func (p *Pebble) connectEventMetrics(ctx context.Context, eventListener *pebble.EventListener) { oldDiskSlow := eventListener.DiskSlow diff --git a/pkg/storage/pebble_mvcc_scanner_test.go b/pkg/storage/pebble_mvcc_scanner_test.go index 934042d88e74..733998d4a6d3 100644 --- a/pkg/storage/pebble_mvcc_scanner_test.go +++ b/pkg/storage/pebble_mvcc_scanner_test.go @@ -30,9 +30,11 @@ func TestMVCCScanWithManyVersionsAndSeparatedIntents(t *testing.T) { defer leaktest.AfterTest(t)() // Force separated intents for writing. - settings := makeSettingsForSeparatedIntents( - false /* oldClusterVersion */, true /* enabled */) - eng := createTestPebbleEngineWithSettings(settings) + eng, err := Open(context.Background(), InMemory(), + CacheSize(1<<20), + Settings(makeSettingsForSeparatedIntents( + false /* oldClusterVersion */, true /* enabled */))) + require.NoError(t, err) defer eng.Close() keys := []roachpb.Key{roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")} diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index d95f95cfaa3d..38282e46a48e 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -34,7 +34,6 @@ import ( "github.com/cockroachdb/datadriven" "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" - "github.com/cockroachdb/pebble/vfs" "github.com/stretchr/testify/require" ) @@ -426,15 +425,8 @@ func TestPebbleDiskSlowEmit(t *testing.T) { settings := cluster.MakeTestingClusterSettings() MaxSyncDurationFatalOnExceeded.Override(ctx, &settings.SV, false) - p := newPebbleInMem( - context.Background(), - roachpb.Attributes{}, - 1<<20, /* cacheSize */ - 512<<20, /* storeSize */ - vfs.NewMem(), - "", - settings, - ) + p, err := Open(ctx, InMemory(), CacheSize(1<<20 /* 1 MiB */), Settings(settings)) + require.NoError(t, err) defer p.Close() require.Equal(t, uint64(0), p.diskSlowCount) diff --git a/pkg/storage/temp_engine.go b/pkg/storage/temp_engine.go index e69d3e268c1b..9f46e9d450f3 100644 --- a/pkg/storage/temp_engine.go +++ b/pkg/storage/temp_engine.go @@ -15,11 +15,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/diskmap" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/fs" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/pebble" - "github.com/cockroachdb/pebble/vfs" ) // NewTempEngine creates a new engine for DistSQL processors to use when @@ -30,21 +28,6 @@ func NewTempEngine( return NewPebbleTempEngine(ctx, tempStorage, storeSpec) } -// storageConfigFromTempStorageConfigAndStoreSpec creates a base.StorageConfig -// used by both the RocksDB and Pebble temp engines from the given arguments. -func storageConfigFromTempStorageConfigAndStoreSpec( - config base.TempStorageConfig, spec base.StoreSpec, -) base.StorageConfig { - return base.StorageConfig{ - Attrs: roachpb.Attributes{}, - Dir: config.Path, - MaxSize: 0, // doesn't matter for temp storage - it's not enforced in any way. - Settings: config.Settings, - UseFileRegistry: spec.UseFileRegistry, - EncryptionOptions: spec.EncryptionOptions, - } -} - type pebbleTempEngine struct { db *pebble.DB } @@ -78,35 +61,29 @@ func NewPebbleTempEngine( func newPebbleTempEngine( ctx context.Context, tempStorage base.TempStorageConfig, storeSpec base.StoreSpec, ) (*pebbleTempEngine, fs.FS, error) { - // Default options as copied over from pebble/cmd/pebble/db.go - opts := DefaultPebbleOptions() - // Pebble doesn't currently support 0-size caches, so use a 128MB cache for - // now. - opts.Cache = pebble.NewCache(128 << 20) - defer opts.Cache.Unref() - - // The Pebble temp engine does not use MVCC Encoding. Instead, the - // caller-provided key is used as-is (with the prefix prepended). See - // pebbleMap.makeKey and pebbleMap.makeKeyWithSequence on how this works. - // Use the default bytes.Compare-like comparer. - opts.Comparer = pebble.DefaultComparer - opts.DisableWAL = true - opts.TablePropertyCollectors = nil - - storageConfig := storageConfigFromTempStorageConfigAndStoreSpec(tempStorage, storeSpec) + var loc Location if tempStorage.InMemory { - opts.FS = vfs.NewMem() - storageConfig.Dir = "" + loc = InMemory() + } else { + loc = Filesystem(tempStorage.Path) } - p, err := NewPebble( - ctx, - PebbleConfig{ - StorageConfig: storageConfig, - Opts: opts, + p, err := Open(ctx, loc, + CacheSize(128<<20), + func(cfg *engineConfig) error { + cfg.UseFileRegistry = storeSpec.UseFileRegistry + cfg.EncryptionOptions = storeSpec.EncryptionOptions + + // The Pebble temp engine does not use MVCC Encoding. Instead, the + // caller-provided key is used as-is (with the prefix prepended). See + // pebbleMap.makeKey and pebbleMap.makeKeyWithSequence on how this works. + // Use the default bytes.Compare-like comparer. + cfg.Opts.Comparer = pebble.DefaultComparer + cfg.Opts.DisableWAL = true + cfg.Opts.TablePropertyCollectors = nil + return nil }, ) - if err != nil { return nil, nil, err } diff --git a/pkg/testutils/colcontainerutils/BUILD.bazel b/pkg/testutils/colcontainerutils/BUILD.bazel index df75d69b8cfb..eb846c900f58 100644 --- a/pkg/testutils/colcontainerutils/BUILD.bazel +++ b/pkg/testutils/colcontainerutils/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/testutils/colcontainerutils", visibility = ["//visibility:public"], deps = [ - "//pkg/base", "//pkg/sql/colcontainer", "//pkg/storage", "//pkg/storage/fs", diff --git a/pkg/testutils/colcontainerutils/diskqueuecfg.go b/pkg/testutils/colcontainerutils/diskqueuecfg.go index f848183ab318..c3fada0fdaba 100644 --- a/pkg/testutils/colcontainerutils/diskqueuecfg.go +++ b/pkg/testutils/colcontainerutils/diskqueuecfg.go @@ -14,7 +14,6 @@ import ( "context" "testing" - "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/sql/colcontainer" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/fs" @@ -45,11 +44,14 @@ func NewTestingDiskQueueCfg(t testing.TB, inMem bool) (colcontainer.DiskQueueCfg } else { tempPath, dirCleanup := testutils.TempDir(t) path = tempPath - ngn, err := storage.NewDefaultEngine(0 /* cacheSize */, base.StorageConfig{Dir: tempPath}) + ngn, err := storage.Open( + context.Background(), + storage.Filesystem(tempPath), + storage.CacheSize(0)) if err != nil { t.Fatal(err) } - testingFS = ngn.(fs.FS) + testingFS = ngn cleanup = func() { ngn.Close() dirCleanup() diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index d347291ccfb4..c8aa27643cbe 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -133,13 +133,16 @@ func (ltc *LocalTestCluster) Start(t testing.TB, baseCtx *base.Config, initFacto clusterID := &cfg.RPCContext.ClusterID server := rpc.NewServer(cfg.RPCContext) // never started ltc.Gossip = gossip.New(ambient, clusterID, nc, cfg.RPCContext, server, ltc.stopper, metric.NewRegistry(), roachpb.Locality{}, zonepb.DefaultZoneConfigRef()) - ltc.Eng = storage.NewInMem( + var err error + ltc.Eng, err = storage.Open( ambient.AnnotateCtx(context.Background()), - roachpb.Attributes{}, - 0, /* cacheSize */ - 50<<20, /* storeSize */ - storage.MakeRandomSettingsForSeparatedIntents(), - ) + storage.InMemory(), + storage.CacheSize(0), + storage.MaxSize(50<<20 /* 50 MiB */), + storage.SettingsForTesting()) + if err != nil { + t.Fatal(err) + } ltc.stopper.AddCloser(ltc.Eng) ltc.Stores = kvserver.NewStores(ambient, ltc.Clock) From 251b4bdb692417a5e803389f8717d3258fa4e513 Mon Sep 17 00:00:00 2001 From: Azhng Date: Thu, 22 Jul 2021 13:11:34 -0400 Subject: [PATCH 2/2] sql,server: refactor SQL Stats control to sqlstats package Previsouly, the administrative actions on SQL Stats subsystem (e.g. reset sql stats) is directly implements on the sql.Server. The implication of this is that various parts of the system needs to maintain a reference of sql.Server one way or another. As SQL Stats subsystem grows in complexity, it's no longer feasible to piggyback off sql.Server for implementing the control logic. This commit refactors those logics into a new sslocal.Controller struct. This struct will be reponsible for all the control actions. Release note: None --- pkg/server/diagnostics/reporter.go | 2 +- pkg/server/diagnostics/reporter_test.go | 6 +- pkg/server/server_sql.go | 2 +- pkg/server/sql_stats.go | 3 +- pkg/server/stats_test.go | 12 ++-- pkg/server/tenant_status.go | 3 +- pkg/sql/conn_executor.go | 65 ++++++++----------- pkg/sql/distsql/server.go | 2 +- pkg/sql/execinfra/server_config.go | 4 +- pkg/sql/planner.go | 32 ++++----- pkg/sql/sem/builtins/builtins.go | 4 +- pkg/sql/sem/tree/eval.go | 6 +- pkg/sql/sqlstats/sslocal/BUILD.bazel | 3 + .../sqlstats/sslocal/sql_stats_controller.go | 58 +++++++++++++++++ pkg/sql/sqlstats/sslocal/sslocal_provider.go | 7 ++ pkg/sql/sqltestutils/telemetry.go | 4 +- 16 files changed, 137 insertions(+), 76 deletions(-) create mode 100644 pkg/sql/sqlstats/sslocal/sql_stats_controller.go diff --git a/pkg/server/diagnostics/reporter.go b/pkg/server/diagnostics/reporter.go index 27d73a847b84..5dda40efbd63 100644 --- a/pkg/server/diagnostics/reporter.go +++ b/pkg/server/diagnostics/reporter.go @@ -159,7 +159,7 @@ func (r *Reporter) ReportDiagnostics(ctx context.Context) { "error: %v", res.Status, b, err) return } - r.SQLServer.ResetReportedStats(ctx) + r.SQLServer.GetReportedSQLStatsController().ResetLocalSQLStats(ctx) } // CreateReport generates a new diagnostics report containing information about diff --git a/pkg/server/diagnostics/reporter_test.go b/pkg/server/diagnostics/reporter_test.go index 66d518729016..950b02a380d4 100644 --- a/pkg/server/diagnostics/reporter_test.go +++ b/pkg/server/diagnostics/reporter_test.go @@ -65,7 +65,7 @@ func TestTenantReport(t *testing.T) { setupCluster(t, tenantDB) // Clear the SQL stat pool before getting diagnostics. - rt.server.SQLServer().(*sql.Server).ResetSQLStats(ctx) + rt.server.SQLServer().(*sql.Server).GetSQLStatsController().ResetLocalSQLStats(ctx) reporter.ReportDiagnostics(ctx) require.Equal(t, 1, rt.diagServer.NumRequests()) @@ -137,7 +137,7 @@ func TestServerReport(t *testing.T) { node := rt.server.MetricsRecorder().GenerateNodeStatus(ctx) // Clear the SQL stat pool before getting diagnostics. - rt.server.SQLServer().(*sql.Server).ResetSQLStats(ctx) + rt.server.SQLServer().(*sql.Server).GetSQLStatsController().ResetLocalSQLStats(ctx) rt.server.DiagnosticsReporter().(*diagnostics.Reporter).ReportDiagnostics(ctx) keyCounts := make(map[roachpb.StoreID]int64) @@ -334,7 +334,7 @@ func TestUsageQuantization(t *testing.T) { } // Flush the SQL stat pool. - ts.SQLServer().(*sql.Server).ResetSQLStats(ctx) + ts.SQLServer().(*sql.Server).GetSQLStatsController().ResetLocalSQLStats(ctx) // Collect a round of statistics. ts.DiagnosticsReporter().(*diagnostics.Reporter).ReportDiagnostics(ctx) diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 12cf04676c49..718b8e6e103a 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -698,7 +698,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { execCfg, ) - distSQLServer.ServerConfig.SQLStatsResetter = pgServer.SQLServer + distSQLServer.ServerConfig.SQLStatsController = pgServer.SQLServer.GetSQLStatsController() // Now that we have a pgwire.Server (which has a sql.Server), we can close a // circular dependency between the rowexec.Server and sql.Server and set diff --git a/pkg/server/sql_stats.go b/pkg/server/sql_stats.go index db595471dff0..f665247a0021 100644 --- a/pkg/server/sql_stats.go +++ b/pkg/server/sql_stats.go @@ -42,7 +42,8 @@ func (s *statusServer) ResetSQLStats( return nil, status.Errorf(codes.InvalidArgument, err.Error()) } if local { - s.admin.server.sqlServer.pgServer.SQLServer.ResetSQLStats(ctx) + controller := s.sqlServer.pgServer.SQLServer.GetSQLStatsController() + controller.ResetLocalSQLStats(ctx) return response, nil } status, err := s.dialNode(ctx, requestedNodeID) diff --git a/pkg/server/stats_test.go b/pkg/server/stats_test.go index c77369d90425..07c1ba25b338 100644 --- a/pkg/server/stats_test.go +++ b/pkg/server/stats_test.go @@ -65,8 +65,8 @@ CREATE TABLE t.test (x INT PRIMARY KEY); sqlServer := s.(*TestServer).Server.sqlServer.pgServer.SQLServer // Flush stats at the beginning of the test. - sqlServer.ResetSQLStats(ctx) - sqlServer.ResetReportedStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) + sqlServer.GetReportedSQLStatsController().ResetLocalSQLStats(ctx) // Run some queries mixed with diagnostics, and ensure that the statistics // are unnaffected by the calls to report diagnostics. @@ -167,8 +167,8 @@ func TestSQLStatCollection(t *testing.T) { sqlServer := s.(*TestServer).Server.sqlServer.pgServer.SQLServer // Flush stats at the beginning of the test. - sqlServer.ResetSQLStats(ctx) - sqlServer.ResetReportedStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) + sqlServer.GetReportedSQLStatsController().ResetLocalSQLStats(ctx) // Execute some queries against the sqlDB to build up some stats. if _, err := sqlDB.Exec(` @@ -202,7 +202,7 @@ func TestSQLStatCollection(t *testing.T) { // Reset the SQL statistics, which will dump stats into the // reported statistics pool. - sqlServer.ResetSQLStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) // Query the reported statistics. stats, err = sqlServer.GetScrubbedReportingStats(ctx) @@ -248,7 +248,7 @@ func TestSQLStatCollection(t *testing.T) { } // Flush the SQL stats again. - sqlServer.ResetSQLStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) // Find our statement stat from the reported stats pool. stats, err = sqlServer.GetScrubbedReportingStats(ctx) diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 11c7fe491979..2761655c9698 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -114,7 +114,8 @@ func (t *tenantStatusServer) ListContentionEvents( func (t *tenantStatusServer) ResetSQLStats( ctx context.Context, _ *serverpb.ResetSQLStatsRequest, ) (*serverpb.ResetSQLStatsResponse, error) { - t.sqlServer.pgServer.SQLServer.ResetSQLStats(ctx) + controller := t.sqlServer.pgServer.SQLServer.GetSQLStatsController() + controller.ResetLocalSQLStats(ctx) return &serverpb.ResetSQLStatsResponse{}, nil } diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index deaa0fb78da6..5c2583767e05 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -258,11 +258,18 @@ type Server struct { // node. Newly collected statistics flow into sqlStats. sqlStats sqlstats.Provider + // sqlStatsController is the control-plane interface for sqlStats. + sqlStatsController *sslocal.Controller + // reportedStats is a pool of stats that is held for reporting, and is // cleared on a lower interval than sqlStats. Stats from sqlStats flow // into reported stats when sqlStats is cleared. reportedStats sqlstats.Provider + // reportedStatsController is the control-plane interface for + // reportedStatsController. + reportedStatsController *sslocal.Controller + reCache *tree.RegexpCache // pool is the parent monitor for all session monitors. @@ -313,6 +320,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { nil, /* resetInterval */ nil, /* reportedProvider */ ) + reportedSQLStatsController := reportedSQLStats.GetController(cfg.SQLStatusServer) sqlStats := sslocal.New( cfg.Settings, sqlstats.MaxMemSQLStatsStmtFingerprints, @@ -323,14 +331,17 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server { sqlstats.SQLStatReset, reportedSQLStats, ) + sqlStatsController := sqlStats.GetController(cfg.SQLStatusServer) s := &Server{ - cfg: cfg, - Metrics: metrics, - InternalMetrics: makeMetrics(cfg, true /* internal */), - pool: pool, - sqlStats: sqlStats, - reportedStats: reportedSQLStats, - reCache: tree.NewRegexpCache(512), + cfg: cfg, + Metrics: metrics, + InternalMetrics: makeMetrics(cfg, true /* internal */), + pool: pool, + sqlStats: sqlStats, + sqlStatsController: sqlStatsController, + reportedStats: reportedSQLStats, + reportedStatsController: reportedSQLStatsController, + reCache: tree.NewRegexpCache(512), indexUsageStats: idxusage.NewLocalIndexUsageStats(&idxusage.Config{ ChannelSize: idxusage.DefaultChannelSize, Setting: cfg.Settings, @@ -401,28 +412,16 @@ func (s *Server) Start(ctx context.Context, stopper *stop.Stopper) { s.reportedStats.Start(ctx, stopper) } -// ResetSQLStats resets the executor's collected sql statistics. -// TODO(azhng): after refactoring of sqlstats package is completed, we want to -// remove the following functions below that's related to fetching SQL stats -// and expose the provider interfaces instead. This will decouple other -// subsystems from directly consuming the SQLServer methods. -func (s *Server) ResetSQLStats(ctx context.Context) { - err := s.sqlStats.Reset(ctx) - if err != nil { - if log.V(1) { - log.Warningf(ctx, "reported SQL stats memory limit has been exceeded, some fingerprints stats are discarded: %s", err) - } - } +// GetSQLStatsController returns the sqlstats.Controller for current +// sql.Server's SQL Stats. +func (s *Server) GetSQLStatsController() *sslocal.Controller { + return s.sqlStatsController } -// ResetReportedStats resets the executor's collected reported stats. -func (s *Server) ResetReportedStats(ctx context.Context) { - err := s.reportedStats.Reset(ctx) - if err != nil { - if log.V(1) { - log.Errorf(ctx, "failed to reset reported stats: %s", err) - } - } +// GetReportedSQLStatsController returns the sqlstats.Controller for the current +// sql.Server's reported SQL Stats. +func (s *Server) GetReportedSQLStatsController() *sslocal.Controller { + return s.reportedStatsController } // GetScrubbedStmtStats returns the statement statistics by app, with the @@ -858,16 +857,6 @@ func (s *Server) newConnExecutorWithTxn( return ex } -// ResetClusterSQLStats resets the collected cluster-wide SQL statistics by calling into the statusServer. -func (s *Server) ResetClusterSQLStats(ctx context.Context) error { - req := &serverpb.ResetSQLStatsRequest{} - _, err := s.cfg.SQLStatusServer.ResetSQLStats(ctx, req) - if err != nil { - return err - } - return nil -} - type closeType int const ( @@ -2312,7 +2301,7 @@ func (ex *connExecutor) initEvalCtx(ctx context.Context, evalCtx *extendedEvalCo InternalExecutor: &ie, DB: ex.server.cfg.DB, SQLLivenessReader: ex.server.cfg.SQLLivenessReader, - SQLStatsResetter: ex.server, + SQLStatsController: ex.server.sqlStatsController, CompactEngineSpan: ex.server.cfg.CompactEngineSpanFunc, }, SessionMutator: ex.dataMutator, diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 4dfc1b111789..eb4c8fc7138b 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -330,7 +330,7 @@ func (ds *ServerImpl) setupFlow( InternalExecutor: ie, Txn: leafTxn, SQLLivenessReader: ds.ServerConfig.SQLLivenessReader, - SQLStatsResetter: ds.ServerConfig.SQLStatsResetter, + SQLStatsController: ds.ServerConfig.SQLStatsController, } evalCtx.SetStmtTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.StmtTimestampNanos)) evalCtx.SetTxnTimestamp(timeutil.Unix(0 /* sec */, req.EvalContext.TxnTimestampNanos)) diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index bfc2f0fe0223..a388c78b87a3 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -141,9 +141,9 @@ type ServerConfig struct { // gateway. RangeCache *rangecache.RangeCache - // SQLStatsResetter is an interface used to reset SQL stats without the need to + // SQLStatsController is an interface used to reset SQL stats without the need to // introduce dependency on the sql package. - SQLStatsResetter tree.SQLStatsResetter + SQLStatsController tree.SQLStatsController // SQLSQLResponseAdmissionQ is the admission queue to use for // SQLSQLResponseWork. diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 079a337f2be7..2a5cdedd9599 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" + "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/sslocal" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -414,11 +415,11 @@ func internalExtendedEvalCtx( evalContextTestingKnobs := execCfg.EvalContextTestingKnobs var indexUsageStats *idxusage.LocalIndexUsageStats - var sqlStatsResetter tree.SQLStatsResetter + var sqlStatsController tree.SQLStatsController if execCfg.InternalExecutor != nil { - sqlStatsResetter = execCfg.InternalExecutor.s if execCfg.InternalExecutor.s != nil { indexUsageStats = execCfg.InternalExecutor.s.indexUsageStats + sqlStatsController = execCfg.InternalExecutor.s.sqlStatsController } else { // If the indexUsageStats is nil from the sql.Server, we create a dummy // index usage stats collector. The sql.Server in the ExecutorConfig @@ -426,24 +427,25 @@ func internalExtendedEvalCtx( indexUsageStats = idxusage.NewLocalIndexUsageStats(&idxusage.Config{ Setting: execCfg.Settings, }) + sqlStatsController = &sslocal.Controller{} } } return extendedEvalContext{ EvalContext: tree.EvalContext{ - Txn: txn, - SessionData: sd, - TxnReadOnly: false, - TxnImplicit: true, - Settings: execCfg.Settings, - Codec: execCfg.Codec, - Context: ctx, - Mon: plannerMon, - TestingKnobs: evalContextTestingKnobs, - StmtTimestamp: stmtTimestamp, - TxnTimestamp: txnTimestamp, - InternalExecutor: execCfg.InternalExecutor, - SQLStatsResetter: sqlStatsResetter, + Txn: txn, + SessionData: sd, + TxnReadOnly: false, + TxnImplicit: true, + Settings: execCfg.Settings, + Codec: execCfg.Codec, + Context: ctx, + Mon: plannerMon, + TestingKnobs: evalContextTestingKnobs, + StmtTimestamp: stmtTimestamp, + TxnTimestamp: txnTimestamp, + InternalExecutor: execCfg.InternalExecutor, + SQLStatsController: sqlStatsController, }, SessionMutator: dataMutator, VirtualSchemas: execCfg.VirtualSchemas, diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index 2f9dbe40a6eb..7ec2f8863ae7 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -5540,11 +5540,11 @@ table's zone configuration this will return NULL.`, Types: tree.ArgTypes{}, ReturnType: tree.FixedReturnType(types.Bool), Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { - if evalCtx.SQLStatsResetter == nil { + if evalCtx.SQLStatsController == nil { return nil, errors.AssertionFailedf("sql stats resetter not set") } ctx := evalCtx.Ctx() - if err := evalCtx.SQLStatsResetter.ResetClusterSQLStats(ctx); err != nil { + if err := evalCtx.SQLStatsController.ResetClusterSQLStats(ctx); err != nil { return nil, err } return tree.MakeDBool(true), nil diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index 25f7da3021f5..c2744af2544c 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -3418,10 +3418,10 @@ var _ base.ModuleTestingKnobs = &EvalContextTestingKnobs{} // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface. func (*EvalContextTestingKnobs) ModuleTestingKnobs() {} -// SQLStatsResetter is an interface embedded in EvalCtx which can be used by +// SQLStatsController is an interface embedded in EvalCtx which can be used by // the builtins to reset SQL stats in the cluster. This interface is introduced // to avoid circular dependency. -type SQLStatsResetter interface { +type SQLStatsController interface { ResetClusterSQLStats(ctx context.Context) error } @@ -3559,7 +3559,7 @@ type EvalContext struct { SQLLivenessReader sqlliveness.Reader - SQLStatsResetter SQLStatsResetter + SQLStatsController SQLStatsController // CompactEngineSpan is used to force compaction of a span in a store. CompactEngineSpan CompactEngineSpanFunc diff --git a/pkg/sql/sqlstats/sslocal/BUILD.bazel b/pkg/sql/sqlstats/sslocal/BUILD.bazel index 94e1d57b7d21..d18b27795d2f 100644 --- a/pkg/sql/sqlstats/sslocal/BUILD.bazel +++ b/pkg/sql/sqlstats/sslocal/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "sslocal", srcs = [ "sql_stats.go", + "sql_stats_controller.go", "sslocal_provider.go", "sslocal_sink.go", "sslocal_stats_collector.go", @@ -12,8 +13,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/roachpb:with-mocks", + "//pkg/server/serverpb", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/sql/sem/tree", "//pkg/sql/sessionphase", "//pkg/sql/sqlstats", "//pkg/sql/sqlstats/ssmemstorage", diff --git a/pkg/sql/sqlstats/sslocal/sql_stats_controller.go b/pkg/sql/sqlstats/sslocal/sql_stats_controller.go new file mode 100644 index 000000000000..444551e0a55e --- /dev/null +++ b/pkg/sql/sqlstats/sslocal/sql_stats_controller.go @@ -0,0 +1,58 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sslocal + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/server/serverpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// Controller implements the SQL Stats subsystem control plane. This exposes +// administrative interfaces that can be consumed by other parts of the database +// (e.g. status server, builtins) to control the behavior of the SQL Stats +// subsystem. +type Controller struct { + sqlStats *SQLStats + statusServer serverpb.SQLStatusServer +} + +var _ tree.SQLStatsController = &Controller{} + +// NewController returns a new instance of sqlstats.Controller. +func NewController(sqlStats *SQLStats, status serverpb.SQLStatusServer) *Controller { + return &Controller{ + sqlStats: sqlStats, + statusServer: status, + } +} + +// ResetClusterSQLStats implements the tree.SQLStatsController interface. +func (s *Controller) ResetClusterSQLStats(ctx context.Context) error { + req := &serverpb.ResetSQLStatsRequest{} + _, err := s.statusServer.ResetSQLStats(ctx, req) + if err != nil { + return err + } + return nil +} + +// ResetLocalSQLStats resets the node-local sql stats. +func (s *Controller) ResetLocalSQLStats(ctx context.Context) { + err := s.sqlStats.Reset(ctx) + if err != nil { + if log.V(1) { + log.Warningf(ctx, "reported SQL stats memory limit has been exceeded, some fingerprints stats are discarded: %s", err) + } + } +} diff --git a/pkg/sql/sqlstats/sslocal/sslocal_provider.go b/pkg/sql/sqlstats/sslocal/sslocal_provider.go index e3f8435d013b..15ea964e8588 100644 --- a/pkg/sql/sqlstats/sslocal/sslocal_provider.go +++ b/pkg/sql/sqlstats/sslocal/sslocal_provider.go @@ -17,6 +17,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" @@ -46,6 +47,12 @@ func New( var _ sqlstats.Provider = &SQLStats{} +// GetController returns a sqlstats.Controller responsible for the current +// SQLStats. +func (s *SQLStats) GetController(server serverpb.SQLStatusServer) *Controller { + return NewController(s, server) +} + // Start implements sqlstats.Provider interface. func (s *SQLStats) Start(ctx context.Context, stopper *stop.Stopper) { if s.resetInterval != nil { diff --git a/pkg/sql/sqltestutils/telemetry.go b/pkg/sql/sqltestutils/telemetry.go index 9c59d6f9e713..da354fc6b7ba 100644 --- a/pkg/sql/sqltestutils/telemetry.go +++ b/pkg/sql/sqltestutils/telemetry.go @@ -246,7 +246,7 @@ func (tt *telemetryTest) RunTest( case "sql-stats": // Report diagnostics once to reset the stats. - sqlServer.ResetSQLStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) reportDiags(ctx) _, err := db.Exec(td.Input) @@ -254,7 +254,7 @@ func (tt *telemetryTest) RunTest( if err != nil { fmt.Fprintf(&buf, "error: %v\n", err) } - sqlServer.ResetSQLStats(ctx) + sqlServer.GetSQLStatsController().ResetLocalSQLStats(ctx) reportDiags(ctx) last := tt.diagSrv.LastRequestData() buf.WriteString(formatSQLStats(last.SqlStats))