diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index e9fc4b11e748..d9d4daa57d42 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -581,7 +581,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) { } // ensure that we do not emit a resolved timestamp - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { return true } @@ -616,7 +616,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) { ) // allow the changefeed to emit resolved events now - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { return false } @@ -973,14 +973,14 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) { s.SystemServer.DB(), s.Codec, "d", "foo") tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) - // ShouldSkipResolved should ensure that once the backfill begins, the following resolved events + // FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events // that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some // but not all of the time, which results in a checkpoint eventually being created haveGaps := false var backfillTimestamp hlc.Timestamp var initialCheckpoint roachpb.SpanGroup var foundCheckpoint int32 - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after checkpoint set to avoid eventually resolving the full span if initialCheckpoint.Len() > 0 { return true @@ -1077,7 +1077,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Emit resolved events for the majority of spans. Be extra paranoid and ensure that // we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing). haveGaps := false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { rndMu.Lock() defer rndMu.Unlock() @@ -1159,7 +1159,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolvedFoo []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if !r.Span.Equal(fooTableSpan) { resolvedFoo = append(resolvedFoo, r.Span) } diff --git a/pkg/ccl/changefeedccl/bench_test.go b/pkg/ccl/changefeedccl/bench_test.go index 1982c82c0710..08a7f06d1d33 100644 --- a/pkg/ccl/changefeedccl/bench_test.go +++ b/pkg/ccl/changefeedccl/bench_test.go @@ -255,14 +255,14 @@ func createBenchmarkChangefeed( if err != nil { return nil, nil, err } - tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) { + tickFn := func(ctx context.Context) (jobspb.ResolvedSpan, error) { event, err := buf.Get(ctx) if err != nil { - return nil, err + return jobspb.ResolvedSpan{}, err } if event.Type() == kvevent.TypeKV { if err := eventConsumer.ConsumeEvent(ctx, event); err != nil { - return nil, err + return jobspb.ResolvedSpan{}, err } } return event.Resolved(), nil diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 1d423312bd2a..eac7ee4217c9 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -542,7 +542,7 @@ func (ca *changeAggregator) tick() error { a := event.DetachAlloc() a.Release(ca.Ctx) resolved := event.Resolved() - if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) { + if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) { return ca.noteResolvedSpan(resolved) } case kvevent.TypeFlush: @@ -555,8 +555,8 @@ func (ca *changeAggregator) tick() error { // noteResolvedSpan periodically flushes Frontier progress from the current // changeAggregator node to the changeFrontier node to allow the changeFrontier // to persist the overall changefeed's progress -func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) error { - advanced, err := ca.frontier.ForwardResolvedSpan(*resolved) +func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error { + advanced, err := ca.frontier.ForwardResolvedSpan(resolved) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index e184676fa126..92f236fb43e6 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -1636,14 +1636,14 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { s.SystemServer.DB(), s.Codec, "d", "foo") tableSpan := fooDesc.PrimaryIndexSpan(s.Codec) - // ShouldSkipResolved should ensure that once the backfill begins, the following resolved events + // FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events // that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some // but not all of the time, which results in a checkpoint eventually being created haveGaps := false var backfillTimestamp hlc.Timestamp var initialCheckpoint roachpb.SpanGroup var foundCheckpoint int32 - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after checkpoint set to avoid eventually resolving the full span if initialCheckpoint.Len() > 0 { return true @@ -1706,7 +1706,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { var secondCheckpoint roachpb.SpanGroup foundCheckpoint = 0 haveGaps = false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { // Stop resolving anything after second checkpoint set to avoid backfill completion if secondCheckpoint.Len() > 0 { return true @@ -1756,7 +1756,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolved []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { resolved = append(resolved, r.Span) return false } @@ -5660,7 +5660,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // Emit resolved events for majority of spans. Be extra paranoid and ensure that // we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing). haveGaps := false - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if r.Span.Equal(tableSpan) { // Do not emit resolved events for the entire table span. // We "simulate" large table by splitting single table span into many parts, so @@ -5743,7 +5743,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) { // Collect spans we attempt to resolve after when we resume. var resolved []roachpb.Span - knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool { if !r.Span.Equal(tableSpan) { resolved = append(resolved, r.Span) } @@ -6866,7 +6866,7 @@ func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) { // an effect of never advancing the frontier, and thus never flushing // the sink due to frontier advancement. The only time we flush the sink // is if the memory pressure causes flush request to be delivered. - knobs.ShouldSkipResolved = func(_ *jobspb.ResolvedSpan) bool { + knobs.FilterSpanWithMutation = func(_ *jobspb.ResolvedSpan) bool { return true } diff --git a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel index 634817e3d252..ab8d3f203d04 100644 --- a/pkg/ccl/changefeedccl/kvevent/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvevent/BUILD.bazel @@ -36,15 +36,18 @@ go_test( name = "kvevent_test", srcs = [ "alloc_test.go", + "bench_test.go", "blocking_buffer_test.go", ], embed = [":kvevent"], deps = [ + "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/roachpb", "//pkg/settings/cluster", "//pkg/sql/randgen", "//pkg/sql/rowenc/keyside", + "//pkg/sql/sem/tree", "//pkg/sql/types", "//pkg/util/ctxgroup", "//pkg/util/encoding", diff --git a/pkg/ccl/changefeedccl/kvevent/bench_test.go b/pkg/ccl/changefeedccl/kvevent/bench_test.go new file mode 100644 index 000000000000..1ab45b2a4704 --- /dev/null +++ b/pkg/ccl/changefeedccl/kvevent/bench_test.go @@ -0,0 +1,121 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package kvevent_test + +import ( + "context" + "math/rand" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +func BenchmarkMemBuffer(b *testing.B) { + rand, _ := randutil.NewTestRand() + + run := func() { + ba, release := getBoundAccountWithBudget(4096) + defer release() + + metrics := kvevent.MakeMetrics(time.Minute) + + // Arrange for mem buffer to notify us when it waits for resources. + waitCh := make(chan struct{}, 1) + notifyWait := func(ctx context.Context, poolName string, r quotapool.Request) { + select { + case waitCh <- struct{}{}: + default: + } + } + + st := cluster.MakeTestingClusterSettings() + buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait)) + defer func() { + require.NoError(b, buf.CloseWithReason(context.Background(), nil)) + }() + + producerCtx, stopProducers := context.WithCancel(context.Background()) + wg := ctxgroup.WithContext(producerCtx) + defer func() { + _ = wg.Wait() // Ignore error -- this group returns context cancellation. + }() + + numRows := 0 + wg.GoCtx(func(ctx context.Context) error { + for { + err := buf.Add(ctx, kvevent.MakeResolvedEvent(generateSpan(b, rand), hlc.Timestamp{}, jobspb.ResolvedSpan_NONE)) + if err != nil { + return err + } + numRows++ + } + }) + + <-waitCh + writtenRows := numRows + + for i := 0; i < writtenRows; i++ { + e, err := buf.Get(context.Background()) + if err != nil { + b.Fatal("could not read from buffer") + } + a := e.DetachAlloc() + a.Release(context.Background()) + } + stopProducers() + } + + for i := 0; i < b.N; i++ { + run() + } +} + +func generateSpan(b *testing.B, rng *rand.Rand) roachpb.Span { + start := rng.Intn(2 << 20) + end := start + rng.Intn(2<<20) + startDatum := tree.NewDInt(tree.DInt(start)) + endDatum := tree.NewDInt(tree.DInt(end)) + const tableID = 42 + + startKey, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(tableID), + startDatum, + encoding.Ascending, + ) + if err != nil { + b.Fatal("could not generate key") + } + + endKey, err := keyside.Encode( + keys.SystemSQLCodec.TablePrefix(tableID), + endDatum, + encoding.Ascending, + ) + if err != nil { + b.Fatal("could not generate key") + } + + return roachpb.Span{ + Key: startKey, + EndKey: endKey, + } +} diff --git a/pkg/ccl/changefeedccl/kvevent/event.go b/pkg/ccl/changefeedccl/kvevent/event.go index 81fa1b2eae91..64ae7b9d04f7 100644 --- a/pkg/ccl/changefeedccl/kvevent/event.go +++ b/pkg/ccl/changefeedccl/kvevent/event.go @@ -92,7 +92,7 @@ type Event struct { kv roachpb.KeyValue prevVal roachpb.Value flush bool - resolved *jobspb.ResolvedSpan + resolved jobspb.ResolvedSpan backfillTimestamp hlc.Timestamp bufferAddTimestamp time.Time approxSize int @@ -104,7 +104,7 @@ func (b *Event) Type() Type { if b.kv.Key != nil { return TypeKV } - if b.resolved != nil { + if b.resolved.Span.Key != nil { return TypeResolved } if b.flush { @@ -132,7 +132,7 @@ func (b *Event) PrevValue() roachpb.Value { // Resolved will be non-nil if this is a resolved timestamp event (i.e. IsKV() // returns false). -func (b *Event) Resolved() *jobspb.ResolvedSpan { +func (b *Event) Resolved() jobspb.ResolvedSpan { return b.resolved } @@ -202,14 +202,15 @@ func (b *Event) DetachAlloc() Alloc { func MakeResolvedEvent( span roachpb.Span, ts hlc.Timestamp, boundaryType jobspb.ResolvedSpan_BoundaryType, ) Event { - return Event{ - resolved: &jobspb.ResolvedSpan{ + e := Event{ + resolved: jobspb.ResolvedSpan{ Span: span, Timestamp: ts, BoundaryType: boundaryType, }, - approxSize: span.Size() + ts.Size() + 4, } + e.approxSize = e.resolved.Size() + 1 + return e } // MakeKVEvent returns KV event. diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go index 9ec8f58449a5..b5360e074d2c 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner_test.go @@ -33,7 +33,7 @@ type recordResolvedWriter struct { func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error { if e.Type() == kvevent.TypeResolved { - r.resolved = append(r.resolved, *e.Resolved()) + r.resolved = append(r.resolved, e.Resolved()) } return nil } diff --git a/pkg/ccl/changefeedccl/testing_knobs.go b/pkg/ccl/changefeedccl/testing_knobs.go index 4c4061a6fd7f..cafb5c04986a 100644 --- a/pkg/ccl/changefeedccl/testing_knobs.go +++ b/pkg/ccl/changefeedccl/testing_knobs.go @@ -31,9 +31,9 @@ type TestingKnobs struct { // It allows the tests to muck with the Sink, and even return altogether different // implementation. WrapSink func(s Sink, jobID jobspb.JobID) Sink - // ShouldSkipResolved is a filter returning true if the resolved span event should - // be skipped. - ShouldSkipResolved func(resolved *jobspb.ResolvedSpan) bool + // FilterSpanWithMutation is a filter returning true if the resolved span event should + // be skipped. This method takes a pointer in case resolved spans need to be mutated. + FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool // FeedKnobs are kvfeed testing knobs. FeedKnobs kvfeed.TestingKnobs // NullSinkIsExternalIOAccounted controls whether we record diff --git a/pkg/cmd/dev/test.go b/pkg/cmd/dev/test.go index 7569213a40fc..7bf7b5d43367 100644 --- a/pkg/cmd/dev/test.go +++ b/pkg/cmd/dev/test.go @@ -41,6 +41,7 @@ const ( streamOutputFlag = "stream-output" testArgsFlag = "test-args" vModuleFlag = "vmodule" + showDiffFlag = "show-diff" ) func makeTestCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command { @@ -105,6 +106,7 @@ pkg/kv/kvserver:kvserver_test) instead.`, testCmd.Flags().Bool(streamOutputFlag, false, "stream test output during run") testCmd.Flags().String(testArgsFlag, "", "additional arguments to pass to the go test binary") testCmd.Flags().String(vModuleFlag, "", "comma-separated list of pattern=N settings for file-filtered logging") + testCmd.Flags().Bool(showDiffFlag, false, "generate a diff for expectation mismatches when possible") return testCmd } @@ -128,6 +130,7 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error { showLogs = mustGetFlagBool(cmd, showLogsFlag) count = mustGetFlagInt(cmd, countFlag) vModule = mustGetFlagString(cmd, vModuleFlag) + showDiff = mustGetFlagBool(cmd, showDiffFlag) // These are tests that require access to another directory for // --rewrite. These can either be single directories or @@ -274,6 +277,9 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error { } } } + if showDiff { + args = append(args, "--test_arg", "-show-diff") + } if timeout > 0 && !stress { args = append(args, fmt.Sprintf("--test_timeout=%d", int(timeout.Seconds()))) diff --git a/pkg/cmd/dev/testlogic.go b/pkg/cmd/dev/testlogic.go index a9b708a0fb4f..6613fd45ce9a 100644 --- a/pkg/cmd/dev/testlogic.go +++ b/pkg/cmd/dev/testlogic.go @@ -59,6 +59,7 @@ func makeTestLogicCmd(runE func(cmd *cobra.Command, args []string) error) *cobra testLogicCmd.Flags().Bool(stressFlag, false, "run tests under stress") testLogicCmd.Flags().String(stressArgsFlag, "", "additional arguments to pass to stress") testLogicCmd.Flags().String(testArgsFlag, "", "additional arguments to pass to go test binary") + testLogicCmd.Flags().Bool(showDiffFlag, false, "generate a diff for expectation mismatches when possible") addCommonBuildFlags(testLogicCmd) return testLogicCmd @@ -85,6 +86,7 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error { stress = mustGetFlagBool(cmd, stressFlag) stressCmdArgs = mustGetFlagString(cmd, stressArgsFlag) testArgs = mustGetFlagString(cmd, testArgsFlag) + showDiff = mustGetFlagBool(cmd, showDiffFlag) ) if rewrite { ignoreCache = true @@ -210,6 +212,9 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error { args = append(args, fmt.Sprintf("--test_env=COCKROACH_WORKSPACE=%s", workspace)) args = append(args, "--test_arg", "-rewrite") } + if showDiff { + args = append(args, "--test_arg", "-show-diff") + } if timeout > 0 && !stress { args = append(args, fmt.Sprintf("--test_timeout=%d", int(timeout.Seconds()))) diff --git a/pkg/cmd/roachtest/tests/kv.go b/pkg/cmd/roachtest/tests/kv.go index 8257644f148f..471bc1935142 100644 --- a/pkg/cmd/roachtest/tests/kv.go +++ b/pkg/cmd/roachtest/tests/kv.go @@ -57,6 +57,7 @@ func registerKV(r registry.Registry) { encryption bool sequential bool admissionControlDisabled bool + globalMVCCRangeTombstone bool concMultiplier int ssds int raid0 bool @@ -86,7 +87,11 @@ func registerKV(r registry.Registry) { if opts.ssds > 1 && !opts.raid0 { startOpts.RoachprodOpts.StoreCount = opts.ssds } - c.Start(ctx, t.L(), startOpts, install.MakeClusterSettings(), c.Range(1, nodes)) + settings := install.MakeClusterSettings() + if opts.globalMVCCRangeTombstone { + settings.Env = append(settings.Env, "COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE=true") + } + c.Start(ctx, t.L(), startOpts, settings, c.Range(1, nodes)) db := c.Conn(ctx, t.L(), 1) defer db.Close() @@ -180,6 +185,8 @@ func registerKV(r registry.Registry) { {nodes: 3, cpus: 32, readPercent: 95, admissionControlDisabled: true}, {nodes: 3, cpus: 32, readPercent: 0, splits: -1 /* no splits */}, {nodes: 3, cpus: 32, readPercent: 95, splits: -1 /* no splits */}, + {nodes: 3, cpus: 32, readPercent: 0, globalMVCCRangeTombstone: true}, + {nodes: 3, cpus: 32, readPercent: 95, globalMVCCRangeTombstone: true}, // Configs with large block sizes. {nodes: 3, cpus: 8, readPercent: 0, blockSize: 1 << 12 /* 4 KB */}, @@ -258,6 +265,9 @@ func registerKV(r registry.Registry) { if opts.admissionControlDisabled { nameParts = append(nameParts, "no-admission") } + if opts.globalMVCCRangeTombstone { + nameParts = append(nameParts, "mvcc-range-keys=global") + } if opts.concMultiplier != 0 { // support legacy test name which didn't include this multiplier nameParts = append(nameParts, fmt.Sprintf("conc=%d", opts.concMultiplier)) } diff --git a/pkg/cmd/roachtest/tests/ycsb.go b/pkg/cmd/roachtest/tests/ycsb.go index 6b2a0e39a023..5bc1fa4c8b0e 100644 --- a/pkg/cmd/roachtest/tests/ycsb.go +++ b/pkg/cmd/roachtest/tests/ycsb.go @@ -29,6 +29,7 @@ const envYCSBFlags = "ROACHTEST_YCSB_FLAGS" func registerYCSB(r registry.Registry) { workloads := []string{"A", "B", "C", "D", "E", "F"} cpusConfigs := []int{8, 32} + cpusWithGlobalMVCCRangeTombstone := 32 // concurrencyConfigs contains near-optimal concurrency levels for each // (workload, cpu count) combination. All of these figures were tuned on GCP @@ -43,7 +44,9 @@ func registerYCSB(r registry.Registry) { "F": {8: 96, 32: 144}, } - runYCSB := func(ctx context.Context, t test.Test, c cluster.Cluster, wl string, cpus int) { + runYCSB := func( + ctx context.Context, t test.Test, c cluster.Cluster, wl string, cpus int, rangeTombstone bool, + ) { // For now, we only want to run the zfs tests on GCE, since only GCE supports // starting roachprod instances on zfs. if c.Spec().FileSystem == spec.Zfs && c.Spec().Cloud != spec.GCE { @@ -57,9 +60,14 @@ func registerYCSB(r registry.Registry) { t.Fatalf("missing concurrency for (workload, cpus) = (%s, %d)", wl, cpus) } + settings := install.MakeClusterSettings() + if rangeTombstone { + settings.Env = append(settings.Env, "COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE=true") + } + c.Put(ctx, t.Cockroach(), "./cockroach", c.Range(1, nodes)) c.Put(ctx, t.DeprecatedWorkload(), "./workload", c.Node(nodes+1)) - c.Start(ctx, t.L(), option.DefaultStartOpts(), install.MakeClusterSettings(), c.Range(1, nodes)) + c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, c.Range(1, nodes)) err := WaitFor3XReplication(ctx, t, c.Conn(ctx, t.L(), 1)) require.NoError(t, err) @@ -98,7 +106,7 @@ func registerYCSB(r registry.Registry) { Owner: registry.OwnerTestEng, Cluster: r.MakeClusterSpec(4, spec.CPU(cpus)), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runYCSB(ctx, t, c, wl, cpus) + runYCSB(ctx, t, c, wl, cpus, false /* rangeTombstone */) }, }) @@ -108,7 +116,18 @@ func registerYCSB(r registry.Registry) { Owner: registry.OwnerStorage, Cluster: r.MakeClusterSpec(4, spec.CPU(cpus), spec.SetFileSystem(spec.Zfs)), Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { - runYCSB(ctx, t, c, wl, cpus) + runYCSB(ctx, t, c, wl, cpus, false /* rangeTombstone */) + }, + }) + } + + if cpus == cpusWithGlobalMVCCRangeTombstone { + r.Add(registry.TestSpec{ + Name: fmt.Sprintf("%s/mvcc-range-keys=global", name), + Owner: registry.OwnerTestEng, + Cluster: r.MakeClusterSpec(4, spec.CPU(cpus)), + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + runYCSB(ctx, t, c, wl, cpus, true /* rangeTombstone */) }, }) } diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set.go b/pkg/kv/kvclient/kvcoord/condensable_span_set.go index 96765b3400af..a9b25aa5e989 100644 --- a/pkg/kv/kvclient/kvcoord/condensable_span_set.go +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set.go @@ -27,6 +27,13 @@ import ( // "footprint" of the set to grow, so the set should be thought of as on // overestimate. type condensableSpanSet struct { + // TODO(nvanbenschoten): It feels like there is a lot that we could do with + // this data structure to 1) reduce the per span overhead, and 2) avoid the + // retention of many small keys for the duration of a transaction. For + // instance, we could allocate a single large block of memory and copy keys + // into it. We could also store key lengths inline to minimize the per-span + // overhead. Recognizing that many spans are actually point keys would also + // help. s []roachpb.Span bytes int64 @@ -238,9 +245,7 @@ func (s *condensableSpanSet) bytesSize() int64 { } func spanSize(sp roachpb.Span) int64 { - return int64(len(sp.Key) + len(sp.EndKey)) -} - -func keySize(k roachpb.Key) int64 { - return int64(len(k)) + // Since the span is included into a []roachpb.Span, we also need to account + // for the overhead of storing it in that slice. + return roachpb.SpanOverhead + int64(cap(sp.Key)+cap(sp.EndKey)) } diff --git a/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go index 530bd77622b9..61887b0f53c3 100644 --- a/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go +++ b/pkg/kv/kvclient/kvcoord/condensable_span_set_test.go @@ -27,9 +27,9 @@ func TestCondensableSpanSetMergeContiguousSpans(t *testing.T) { s := condensableSpanSet{} s.insert(roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("b")}) s.insert(roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("c")}) - require.Equal(t, int64(4), s.bytes) + require.Equal(t, 4+2*roachpb.SpanOverhead, s.bytes) s.mergeAndSort() - require.Equal(t, int64(2), s.bytes) + require.Equal(t, 2+roachpb.SpanOverhead, s.bytes) } func TestCondensableSpanSetEstimateSize(t *testing.T) { @@ -51,32 +51,32 @@ func TestCondensableSpanSetEstimateSize(t *testing.T) { name: "new spans fit without merging", set: []roachpb.Span{ab, bc}, newSpans: []roachpb.Span{ab}, - mergeThreshold: 100, - expEstimate: 6, + mergeThreshold: 100 + 3*roachpb.SpanOverhead, + expEstimate: 6 + 3*roachpb.SpanOverhead, }, { // The set gets merged, the new spans don't. name: "set needs merging", set: []roachpb.Span{ab, bc}, newSpans: []roachpb.Span{ab}, - mergeThreshold: 5, - expEstimate: 4, + mergeThreshold: 5 + 2*roachpb.SpanOverhead, + expEstimate: 4 + 2*roachpb.SpanOverhead, }, { // The set gets merged, and then it gets merged again with the newSpans. name: "new spans fit without merging", set: []roachpb.Span{ab, bc}, newSpans: []roachpb.Span{ab, bc}, - mergeThreshold: 5, - expEstimate: 2, + mergeThreshold: 5 + 2*roachpb.SpanOverhead, + expEstimate: 2 + roachpb.SpanOverhead, }, { // Everything gets merged, but it still doesn't fit. name: "new spans dont fit", set: []roachpb.Span{ab, bc}, newSpans: []roachpb.Span{ab, bc, largeSpan}, - mergeThreshold: 5, - expEstimate: 12, + mergeThreshold: 5 + 2*roachpb.SpanOverhead, + expEstimate: 12 + 2*roachpb.SpanOverhead, }, } for _, tc := range tests { diff --git a/pkg/kv/kvclient/kvcoord/txn_intercepter_pipeliner_client_test.go b/pkg/kv/kvclient/kvcoord/txn_intercepter_pipeliner_client_test.go index 87a1036cabac..06d3ce43dd72 100644 --- a/pkg/kv/kvclient/kvcoord/txn_intercepter_pipeliner_client_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_intercepter_pipeliner_client_test.go @@ -43,23 +43,25 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { fTof0 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("f0")} g := roachpb.Span{Key: roachpb.Key("g"), EndKey: roachpb.Key(nil)} g0Tog1 := roachpb.Span{Key: roachpb.Key("g0"), EndKey: roachpb.Key("g1")} - fTog1Closed := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")} + fTog1 := roachpb.Span{Key: roachpb.Key("f"), EndKey: roachpb.Key("g1")} testCases := []struct { span roachpb.Span expLocks []roachpb.Span - expLocksSize int64 + expLocksSize int64 // doesn't include the span overhead }{ {span: a, expLocks: []roachpb.Span{a}, expLocksSize: 1}, {span: b, expLocks: []roachpb.Span{a, b}, expLocksSize: 2}, {span: c, expLocks: []roachpb.Span{a, b, c}, expLocksSize: 3}, {span: d, expLocks: []roachpb.Span{a, b, c, d}, expLocksSize: 10}, - // Note that c-e condenses and then lists first. - {span: e, expLocks: []roachpb.Span{cToEClosed, a, b}, expLocksSize: 5}, - {span: fTof0, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0}, expLocksSize: 8}, - {span: g, expLocks: []roachpb.Span{cToEClosed, a, b, fTof0, g}, expLocksSize: 9}, - {span: g0Tog1, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed}, expLocksSize: 9}, + // Note that c-e condenses and then lists first, we proceed to condense + // a-b too to get under half of the threshold. + {span: e, expLocks: []roachpb.Span{cToEClosed, aToBClosed}, expLocksSize: 6}, + {span: fTof0, expLocks: []roachpb.Span{cToEClosed, aToBClosed, fTof0}, expLocksSize: 9}, + {span: g, expLocks: []roachpb.Span{cToEClosed, aToBClosed, fTof0, g}, expLocksSize: 10}, + // f-g1 condenses and then aToBClosed gets reordered with cToEClosed. + {span: g0Tog1, expLocks: []roachpb.Span{fTog1, aToBClosed, cToEClosed}, expLocksSize: 9}, // Add a key in the middle of a span, which will get merged on commit. - {span: c, expLocks: []roachpb.Span{fTog1Closed, cToEClosed, aToBClosed, c}, expLocksSize: 10}, + {span: c, expLocks: []roachpb.Span{fTog1, aToBClosed, cToEClosed, c}, expLocksSize: 10}, } splits := []roachpb.Span{ {Key: roachpb.Key("a"), EndKey: roachpb.Key("c")}, @@ -78,12 +80,14 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { descDB := kvcoord.TestingMockRangeDescriptorDBForDescs(descs...) s := createTestDB(t) st := s.Store.ClusterSettings() - kvcoord.TrackedWritesMaxSize.Override(ctx, &st.SV, 10) /* 10 bytes and it will condense */ + // 10 bytes for the keys and 192 bytes for the span overhead, and then it + // will condense. + kvcoord.TrackedWritesMaxSize.Override(ctx, &st.SV, 10+4*roachpb.SpanOverhead) defer s.Stop() // Check end transaction locks, which should be condensed and split // at range boundaries. - expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1Closed} + expLocks := []roachpb.Span{aToBClosed, cToEClosed, fTog1} sendFn := func(_ context.Context, ba roachpb.BatchRequest) (*roachpb.BatchResponse, error) { resp := ba.CreateReply() resp.Txn = ba.Txn @@ -145,7 +149,7 @@ func TestTxnPipelinerCondenseLockSpans(t *testing.T) { } locksSize := int64(0) for _, i := range locks { - locksSize += int64(len(i.Key) + len(i.EndKey)) + locksSize += int64(len(i.Key) + len(i.EndKey)) // ignoring the span overhead } if a, e := locksSize, tc.expLocksSize; a != e { t.Errorf("%d: keys size expected %d; got %d", i, e, a) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go index 2cfff87423ed..1342d572fc22 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go @@ -1068,3 +1068,7 @@ func (a *inFlightWriteAlloc) clear() { } *a = (*a)[:0] } + +func keySize(k roachpb.Key) int64 { + return int64(len(k)) +} diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go index 4b06da3c4cb1..933826e2017f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner_test.go @@ -1493,7 +1493,7 @@ func TestTxnPipelinerSavepoints(t *testing.T) { require.Empty(t, tp.ifWrites.len()) } -// TestTxnCoordSenderCondenseLockSpans2 verifies that lock spans are condensed +// TestTxnPipelinerCondenseLockSpans2 verifies that lock spans are condensed // along range boundaries when they exceed the maximum intent bytes threshold. func TestTxnPipelinerCondenseLockSpans2(t *testing.T) { defer leaktest.AfterTest(t)() @@ -1708,10 +1708,12 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { // The 0-based index of the request that's expected to be rejected. -1 if no // request is expected to be rejected. expRejectIdx int + maxSize int64 }{ {name: "large request", reqs: []roachpb.BatchRequest{largeWrite}, expRejectIdx: 0, + maxSize: int64(len(largeAs)) - 1 + roachpb.SpanOverhead, }, {name: "requests that add up", reqs: []roachpb.BatchRequest{ @@ -1719,6 +1721,9 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { putBatchNoAsyncConsensus(roachpb.Key("bbbb"), nil), putBatchNoAsyncConsensus(roachpb.Key("cccc"), nil)}, expRejectIdx: 2, + // maxSize is such that first two requests fit and the third one + // goes above the limit. + maxSize: 9 + 2*roachpb.SpanOverhead, }, {name: "async requests that add up", // Like the previous test, but this time the requests run with async @@ -1729,6 +1734,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { putBatch(roachpb.Key("bbbb"), nil), putBatch(roachpb.Key("cccc"), nil)}, expRejectIdx: 2, + maxSize: 10 + roachpb.SpanOverhead, }, { name: "response goes over budget, next request rejected", @@ -1737,6 +1743,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { reqs: []roachpb.BatchRequest{delRange, putBatch(roachpb.Key("a"), nil)}, resp: []*roachpb.BatchResponse{delRangeResp}, expRejectIdx: 1, + maxSize: 10 + roachpb.SpanOverhead, }, { name: "response goes over budget", @@ -1746,12 +1753,18 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { reqs: []roachpb.BatchRequest{delRange}, resp: []*roachpb.BatchResponse{delRangeResp}, expRejectIdx: -1, + maxSize: 10 + roachpb.SpanOverhead, }, { // Request keys overlap, so they don't count twice. name: "overlapping requests", reqs: []roachpb.BatchRequest{mediumWrite, mediumWrite, mediumWrite}, expRejectIdx: -1, + // Our estimation logic for rejecting requests based on size + // consults both the in-flight write set (which doesn't account for + // the span overhead) as well as the lock footprint (which accounts + // for the span overhead). + maxSize: 16 + roachpb.SpanOverhead, }, } for _, tc := range testCases { @@ -1761,7 +1774,7 @@ func TestTxnPipelinerRejectAboveBudget(t *testing.T) { } tp, mockSender := makeMockTxnPipeliner(nil /* iter */) - TrackedWritesMaxSize.Override(ctx, &tp.st.SV, 10) /* reject when exceeding 10 bytes */ + TrackedWritesMaxSize.Override(ctx, &tp.st.SV, tc.maxSize) rejectTxnOverTrackedWritesBudget.Override(ctx, &tp.st.SV, true) txn := makeTxnProto() diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index 166aad8b9fe2..bf61f077b051 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -78,10 +78,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, []roachpb.Span{getArgs.Span(), delRangeArgs.Span()}, - tsr.refreshFootprint.asSlice()) + expSpans := []roachpb.Span{getArgs.Span(), delRangeArgs.Span()} + require.Equal(t, expSpans, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(3), tsr.refreshFootprint.bytes) + require.Equal(t, 3+int64(len(expSpans))*roachpb.SpanOverhead, tsr.refreshFootprint.bytes) require.Zero(t, tsr.refreshedTimestamp) // Scan with limit. Only the scanned keys are added to the refresh spans. @@ -104,11 +104,10 @@ func TestTxnSpanRefresherCollectsSpans(t *testing.T) { require.Nil(t, pErr) require.NotNil(t, br) - require.Equal(t, - []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}}, - tsr.refreshFootprint.asSlice()) + expSpans = []roachpb.Span{getArgs.Span(), delRangeArgs.Span(), {Key: scanArgs.Key, EndKey: keyC}} + require.Equal(t, expSpans, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(5), tsr.refreshFootprint.bytes) + require.Equal(t, 5+int64(len(expSpans))*roachpb.SpanOverhead, tsr.refreshFootprint.bytes) require.Zero(t, tsr.refreshedTimestamp) } @@ -863,8 +862,8 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { keyC := roachpb.Key("c") keyD, keyE := roachpb.Key("d"), roachpb.Key("e") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3) + // Set MaxTxnRefreshSpansBytes limit to 3 bytes plus the span overhead. + MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3+roachpb.SpanOverhead) // Send a batch below the limit. var ba roachpb.BatchRequest @@ -879,7 +878,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) require.Zero(t, tsr.refreshedTimestamp) - require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes) // Send another batch that pushes us above the limit. The tracked spans are // adjacent so the spans will be merged, but not condensed. @@ -893,7 +892,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Equal(t, []roachpb.Span{{Key: keyA, EndKey: keyC}}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes) require.False(t, tsr.refreshFootprint.condensed) require.Equal(t, int64(0), tsr.refreshMemoryLimitExceeded.Count()) require.Zero(t, tsr.refreshedTimestamp) @@ -916,7 +915,7 @@ func TestTxnSpanRefresherMaxTxnRefreshSpansBytes(t *testing.T) { require.Equal(t, int64(0), tsr.refreshFailWithCondensedSpans.Count()) // Return a transaction retry error and make sure the metric indicating that - // we did not retry due to the refresh span bytes in incremented. + // we did not retry due to the refresh span bytes is incremented. mockSender.MockSend(func(request roachpb.BatchRequest) (batchResponse *roachpb.BatchResponse, r *roachpb.Error) { return nil, roachpb.NewErrorWithTxn( roachpb.NewTransactionRetryError(roachpb.RETRY_SERIALIZABLE, ""), ba.Txn) @@ -1088,8 +1087,8 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { keyA, keyB := roachpb.Key("a"), roachpb.Key("b") keyC, keyD := roachpb.Key("c"), roachpb.Key("d") - // Set MaxTxnRefreshSpansBytes limit to 3 bytes. - MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3) + // Set MaxTxnRefreshSpansBytes limit to 3 bytes plus the span overhead. + MaxTxnRefreshSpansBytes.Override(ctx, &tsr.st.SV, 3+roachpb.SpanOverhead) // Send a batch below the limit. var ba roachpb.BatchRequest @@ -1103,7 +1102,7 @@ func TestTxnSpanRefresherEpochIncrement(t *testing.T) { require.Equal(t, []roachpb.Span{scanArgs.Span()}, tsr.refreshFootprint.asSlice()) require.False(t, tsr.refreshInvalid) - require.Equal(t, int64(2), tsr.refreshFootprint.bytes) + require.Equal(t, 2+roachpb.SpanOverhead, tsr.refreshFootprint.bytes) require.Zero(t, tsr.refreshedTimestamp) // Incrementing the transaction epoch clears the spans. diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 6d00fd74fb65..88ccae035884 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", + "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index f039e0569f47..e9ee7a2cfd03 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" @@ -359,6 +360,9 @@ func (f *RangeFeed) processEvents( f.onSSTable(ctx, ev.SST) case ev.DeleteRange != nil: if f.onDeleteRange == nil { + if kvserverbase.GlobalMVCCRangeTombstoneForTesting { + continue + } return errors.AssertionFailedf( "received unexpected rangefeed DeleteRange event with no OnDeleteRange handler: %s", ev) } diff --git a/pkg/kv/kvserver/batcheval/cmd_init_put.go b/pkg/kv/kvserver/batcheval/cmd_init_put.go index 1ec82e748183..6c2c432cc1c6 100644 --- a/pkg/kv/kvserver/batcheval/cmd_init_put.go +++ b/pkg/kv/kvserver/batcheval/cmd_init_put.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" ) @@ -32,6 +33,10 @@ func InitPut( args := cArgs.Args.(*roachpb.InitPutRequest) h := cArgs.Header + if args.FailOnTombstones && kvserverbase.GlobalMVCCRangeTombstoneForTesting { + args.FailOnTombstones = false + } + var err error if args.Blind { err = storage.MVCCBlindInitPut( diff --git a/pkg/kv/kvserver/kvserverbase/BUILD.bazel b/pkg/kv/kvserver/kvserverbase/BUILD.bazel index 09d46d29899d..a424377a5adb 100644 --- a/pkg/kv/kvserver/kvserverbase/BUILD.bazel +++ b/pkg/kv/kvserver/kvserverbase/BUILD.bazel @@ -18,6 +18,7 @@ go_library( "//pkg/kv/kvserver/kvserverpb", "//pkg/roachpb", "//pkg/settings", + "//pkg/util/envutil", "//pkg/util/errorutil", "//pkg/util/hlc", "//pkg/util/quotapool", diff --git a/pkg/kv/kvserver/kvserverbase/base.go b/pkg/kv/kvserver/kvserverbase/base.go index 2f427b0996df..68da6429a4f8 100644 --- a/pkg/kv/kvserver/kvserverbase/base.go +++ b/pkg/kv/kvserver/kvserverbase/base.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" @@ -227,3 +228,10 @@ var SplitByLoadMergeDelay = settings.RegisterDurationSetting( // MaxCommandSizeDefault is the default for the kv.raft.command.max_size // cluster setting. const MaxCommandSizeDefault = 64 << 20 + +// GlobalMVCCRangeTombstoneForTesting will write an MVCC range tombstone at the +// bottom of the SQL table data keyspace during cluster bootstrapping, for +// performance and correctness testing. This shouldn't affect data written above +// it, but activates range key-specific code paths in the storage layer. +var GlobalMVCCRangeTombstoneForTesting = envutil.EnvOrDefaultBool( + "COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE", false) diff --git a/pkg/kv/kvserver/store_init.go b/pkg/kv/kvserver/store_init.go index 4cbd9f971021..7d594c6944b3 100644 --- a/pkg/kv/kvserver/store_init.go +++ b/pkg/kv/kvserver/store_init.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -201,6 +202,14 @@ func WriteInitialClusterData( // NOTE: We don't do stats computations in any of the puts below. Instead, // we write everything and then compute the stats over the whole range. + // If requested, write an MVCC range tombstone at the bottom of the + // keyspace, for performance and correctness testing. + if kvserverbase.GlobalMVCCRangeTombstoneForTesting { + if err := writeGlobalMVCCRangeTombstone(ctx, batch, desc, now.Prev()); err != nil { + return err + } + } + // Range descriptor. if err := storage.MVCCPutProto( ctx, batch, nil /* ms */, keys.RangeDescriptorKey(desc.StartKey), @@ -268,3 +277,29 @@ func WriteInitialClusterData( return nil } + +// writeGlobalMVCCRangeTombstone writes an MVCC range tombstone across the +// entire table data keyspace of the range. This is used to test that storage +// operations are correct and performant in the presence of range tombstones. An +// MVCC range tombstone below all other data should in principle not affect +// anything at all. +func writeGlobalMVCCRangeTombstone( + ctx context.Context, w storage.Writer, desc *roachpb.RangeDescriptor, ts hlc.Timestamp, +) error { + rangeKey := storage.MVCCRangeKey{ + StartKey: desc.StartKey.AsRawKey(), + EndKey: desc.EndKey.AsRawKey(), + Timestamp: ts, + } + if rangeKey.EndKey.Compare(keys.TableDataMin) <= 0 { + return nil + } + if rangeKey.StartKey.Compare(keys.TableDataMin) < 0 { + rangeKey.StartKey = keys.TableDataMin + } + if err := w.PutMVCCRangeKey(rangeKey, storage.MVCCValue{}); err != nil { + return err + } + log.Warningf(ctx, "wrote global MVCC range tombstone %s", rangeKey) + return nil +}