84230: kvcoord: account for the span overhead when condensing refresh spans r=yuzefovich a=yuzefovich

Previously, we only accounted for the lengths of the key and the end
key of each span when estimating memory usage while condensing the
refresh spans set. However, each span also carries the non-trivial
fixed overhead (48 bytes) of the `roachpb.Span` object itself, which
we previously ignored. As a result, the actual footprint of the
refresh spans could significantly exceed the target size, especially
when the keys are small. For example, in a recently collected core
dump, I saw the refresh spans taking up about 24MB in the heap whereas
the target setting is only 4MiB. This memory is currently not tracked
by the memory accounting system at all, so such overshoots are quite
bad, especially given the recent bump of the setting from 256KiB to
4MiB.
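
For illustration, here is a minimal sketch of the adjusted estimate (stand-in
types, not the actual `kvcoord` code): charge the fixed size of the span
struct itself in addition to the key bytes it points at.

```go
package main

import (
	"fmt"
	"unsafe"
)

// Span is a stand-in for roachpb.Span: two byte-slice headers, 24 bytes
// each on 64-bit platforms, i.e. 48 bytes of fixed overhead per span.
type Span struct {
	Key    []byte
	EndKey []byte
}

// spanSize mirrors the adjusted accounting: the fixed struct overhead plus
// the key lengths, rather than the key lengths alone.
func spanSize(s Span) int64 {
	spanOverhead := int64(unsafe.Sizeof(Span{})) // 48 on 64-bit platforms
	return spanOverhead + int64(len(s.Key)) + int64(len(s.EndKey))
}

func main() {
	s := Span{Key: []byte("a"), EndKey: []byte("b")}
	// With 1-byte keys the overhead dominates: 48 + 1 + 1 = 50 bytes,
	// versus the 2 bytes the old estimate would have charged.
	fmt.Println(spanSize(s))
}
```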

Addresses: #64906.
Addresses: #81451.

Release note (ops change): The way we track memory against
`kv.transaction.max_intents_bytes` and
`kv.transaction.max_refresh_spans_bytes` has been adjusted to be more
precise (we no longer ignore some of the overhead). As a result, the
stability of CRDB improves (we're less likely to OOM); however, this
change effectively reduces the budgets determined by those cluster
settings. In practice, this means that
- intents might be tracked more coarsely (due to span coalescing),
which makes intent resolution less efficient, and
- refresh spans also become more coarse, making it more likely that
`ReadWithinUncertaintyIntervalError`s are returned to the user rather
than retried transparently.

85156: changefeedccl: reduce allocations in kvevent blocking buffer  r=jayshrivastava a=jayshrivastava

This change removes a pointer from the kvevent.Event struct, reducing overall allocations. The hope is that this reduces the amount of work the Go GC has to do, which should ultimately reduce SQL latency. During backfills, the allocations in kv events add up quickly, so eliminating even one pointer is significant.

See #84709 for more info. I'm not closing the issue with this PR since we may decide to reduce more pointers in future PRs using some of the ideas in the issue comments.
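
The following minimal sketch (stand-in types, not the actual `kvevent` code)
illustrates why storing the resolved span by value rather than behind a
pointer removes one heap allocation per resolved event:

```go
package main

import (
	"fmt"
	"testing"
)

// resolvedSpan stands in for jobspb.ResolvedSpan.
type resolvedSpan struct {
	key, endKey []byte
	ts          int64
}

// Before: the span lives behind a pointer, so constructing an event
// heap-allocates a separate object for the GC to track.
type eventWithPointer struct{ resolved *resolvedSpan }

// After: the span is embedded by value and lives and dies with the event.
type eventWithValue struct{ resolved resolvedSpan }

// Package-level sinks force the values to escape, so the measurement below
// reflects what happens when events outlive the constructing function.
var (
	sinkPtr eventWithPointer
	sinkVal eventWithValue
)

func main() {
	withPtr := testing.AllocsPerRun(1000, func() {
		sinkPtr = eventWithPointer{resolved: &resolvedSpan{ts: 1}}
	})
	withVal := testing.AllocsPerRun(1000, func() {
		sinkVal = eventWithValue{resolved: resolvedSpan{ts: 1}}
	})
	fmt.Println(withPtr, withVal) // typically 1 vs 0 allocations per event
}
```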

Here are the benchmark results:
```
name          old time/op    new time/op    delta
MemBuffer-10    98.1µs ± 0%    95.8µs ± 1%   -2.35%  (p=0.008 n=5+5)

name          old alloc/op   new alloc/op   delta
MemBuffer-10    76.9kB ± 0%    64.4kB ± 0%  -16.17%  (p=0.008 n=5+5)

name          old allocs/op  new allocs/op  delta
MemBuffer-10       859 ± 0%       675 ± 0%  -21.42%  (p=0.008 n=5+5)

```



85368: roachtest: add KV/YCSB benchmarks with global MVCC range tombstones r=jbowens a=erikgrinaker

**kvserver: add env var to write global MVCC range tombstone**

This patch adds the env var `COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE`.
When enabled, it will write a global MVCC range tombstone across the
entire table data keyspan during cluster bootstrapping. This can be used
to test performance and correctness in the presence of MVCC range
tombstones, by activating range key-specific code paths while not
semantically affecting the data above it.
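
A minimal sketch of how such a gate could be wired up during bootstrap (this
is not the actual patch: the package layout and `writeRangeTombstone` are
hypothetical placeholders; only the env var name comes from this commit):

```go
package bootstrap // illustrative package name

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/envutil"
)

// globalMVCCRangeTombstone gates the write; the env var is read once.
var globalMVCCRangeTombstone = envutil.EnvOrDefaultBool(
	"COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE", false)

// maybeWriteGlobalRangeTombstone lays a single MVCC range tombstone over the
// entire table data keyspan during cluster bootstrap when the env var is set.
func maybeWriteGlobalRangeTombstone(ctx context.Context, eng storage.Engine) error {
	if !globalMVCCRangeTombstone {
		return nil
	}
	// Data written later sits above the tombstone, so it is not semantically
	// affected, while range-key-specific code paths still get exercised.
	return writeRangeTombstone(ctx, eng, keys.TableDataMin, keys.TableDataMax)
}

// writeRangeTombstone is a hypothetical placeholder for the storage-layer
// call the real patch uses; it is not an actual CockroachDB API.
func writeRangeTombstone(
	ctx context.Context, eng storage.Engine, start, end roachpb.Key,
) error {
	return nil
}
```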

Touches #84384.

Release note: None

**roachtest: add KV/YCSB benchmarks with global MVCC range tombstones**

This patch adds a set of benchmark variants that write a single MVCC
range tombstone across the entire SQL keyspan at cluster start, via the
`COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE` env var. Even though this range
tombstone will not affect the data written during the benchmarks, it
activates range key-specific code paths in the storage layer, which can
have a significant impact on performance.

The new benchmarks are:

* `kv0/enc=false/nodes=3/cpu=32/mvcc-range-keys=global`
* `kv95/enc=false/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/A/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/B/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/C/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/D/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/E/nodes=3/cpu=32/mvcc-range-keys=global`
* `ycsb/F/nodes=3/cpu=32/mvcc-range-keys=global`
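
A rough sketch of how a benchmark variant might inject the env var when
starting the cluster (the roachtest/roachprod helpers and field names below
are assumptions based on common roachtest patterns, not code from this patch):

```go
package tests // illustrative package name

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
)

// startWithGlobalRangeTombstone starts the cluster, optionally setting the
// env var so that bootstrap writes a single MVCC range tombstone over the
// table data keyspan before the workload begins.
func startWithGlobalRangeTombstone(
	ctx context.Context, t test.Test, c cluster.Cluster, enabled bool,
) {
	settings := install.MakeClusterSettings()
	if enabled {
		settings.Env = append(settings.Env,
			"COCKROACH_GLOBAL_MVCC_RANGE_TOMBSTONE=true")
	}
	c.Start(ctx, t.L(), option.DefaultStartOpts(), settings)
}
```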

Resolves #84384.


Release note: None

85424: cmd/dev: add support for --show-diff flag from logictests r=rytaft a=rytaft

This commit adds support for the `--show-diff` flag when running tests
with `dev`. This flag is used by the logictests in order to show diffs
between the expected and actual output.
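
For example, an illustrative invocation (assuming the logictest package path)
would be `dev test pkg/sql/logictest --show-diff`.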

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Rebecca Taft <[email protected]>
5 people committed Aug 1, 2022
5 parents 3be0e86 + 649c7fa + e8e664c + a1d87ac + 7465232 commit 9c4662b
Showing 25 changed files with 321 additions and 77 deletions.
12 changes: 6 additions & 6 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
@@ -581,7 +581,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
}

// ensure that we do not emit a resolved timestamp
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
return true
}

@@ -616,7 +616,7 @@ func TestAlterChangefeedAddTargetErrors(t *testing.T) {
)

// allow the changefeed to emit resolved events now
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
return false
}

@@ -973,14 +973,14 @@ func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
s.SystemServer.DB(), s.Codec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec)

// ShouldSkipResolved should ensure that once the backfill begins, the following resolved events
// FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events
// that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some
// but not all of the time, which results in a checkpoint eventually being created
haveGaps := false
var backfillTimestamp hlc.Timestamp
var initialCheckpoint roachpb.SpanGroup
var foundCheckpoint int32
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after checkpoint set to avoid eventually resolving the full span
if initialCheckpoint.Len() > 0 {
return true
@@ -1077,7 +1077,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {
// Emit resolved events for the majority of spans. Be extra paranoid and ensure that
// we have at least 1 span for which we don't emit resolvedFoo timestamp (to force checkpointing).
haveGaps := false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
rndMu.Lock()
defer rndMu.Unlock()

@@ -1159,7 +1159,7 @@ func TestAlterChangefeedAddTargetsDuringBackfill(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolvedFoo []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if !r.Span.Equal(fooTableSpan) {
resolvedFoo = append(resolvedFoo, r.Span)
}
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/bench_test.go
@@ -255,14 +255,14 @@ func createBenchmarkChangefeed(
if err != nil {
return nil, nil, err
}
tickFn := func(ctx context.Context) (*jobspb.ResolvedSpan, error) {
tickFn := func(ctx context.Context) (jobspb.ResolvedSpan, error) {
event, err := buf.Get(ctx)
if err != nil {
return nil, err
return jobspb.ResolvedSpan{}, err
}
if event.Type() == kvevent.TypeKV {
if err := eventConsumer.ConsumeEvent(ctx, event); err != nil {
return nil, err
return jobspb.ResolvedSpan{}, err
}
}
return event.Resolved(), nil
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -542,7 +542,7 @@ func (ca *changeAggregator) tick() error {
a := event.DetachAlloc()
a.Release(ca.Ctx)
resolved := event.Resolved()
if ca.knobs.ShouldSkipResolved == nil || !ca.knobs.ShouldSkipResolved(resolved) {
if ca.knobs.FilterSpanWithMutation == nil || !ca.knobs.FilterSpanWithMutation(&resolved) {
return ca.noteResolvedSpan(resolved)
}
case kvevent.TypeFlush:
@@ -555,8 +555,8 @@ func (ca *changeAggregator) tick() error {
// noteResolvedSpan periodically flushes Frontier progress from the current
// changeAggregator node to the changeFrontier node to allow the changeFrontier
// to persist the overall changefeed's progress
func (ca *changeAggregator) noteResolvedSpan(resolved *jobspb.ResolvedSpan) error {
advanced, err := ca.frontier.ForwardResolvedSpan(*resolved)
func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error {
advanced, err := ca.frontier.ForwardResolvedSpan(resolved)
if err != nil {
return err
}
14 changes: 7 additions & 7 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1636,14 +1636,14 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
s.SystemServer.DB(), s.Codec, "d", "foo")
tableSpan := fooDesc.PrimaryIndexSpan(s.Codec)

// ShouldSkipResolved should ensure that once the backfill begins, the following resolved events
// FilterSpanWithMutation should ensure that once the backfill begins, the following resolved events
// that are for that backfill (are of the timestamp right after the backfill timestamp) resolve some
// but not all of the time, which results in a checkpoint eventually being created
haveGaps := false
var backfillTimestamp hlc.Timestamp
var initialCheckpoint roachpb.SpanGroup
var foundCheckpoint int32
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after checkpoint set to avoid eventually resolving the full span
if initialCheckpoint.Len() > 0 {
return true
@@ -1706,7 +1706,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {
var secondCheckpoint roachpb.SpanGroup
foundCheckpoint = 0
haveGaps = false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
// Stop resolving anything after second checkpoint set to avoid backfill completion
if secondCheckpoint.Len() > 0 {
return true
@@ -1756,7 +1756,7 @@ func TestChangefeedSchemaChangeBackfillCheckpoint(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolved []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
resolved = append(resolved, r.Span)
return false
}
@@ -5660,7 +5660,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {
// Emit resolved events for majority of spans. Be extra paranoid and ensure that
// we have at least 1 span for which we don't emit resolved timestamp (to force checkpointing).
haveGaps := false
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if r.Span.Equal(tableSpan) {
// Do not emit resolved events for the entire table span.
// We "simulate" large table by splitting single table span into many parts, so
@@ -5743,7 +5743,7 @@ func TestChangefeedBackfillCheckpoint(t *testing.T) {

// Collect spans we attempt to resolve after when we resume.
var resolved []roachpb.Span
knobs.ShouldSkipResolved = func(r *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(r *jobspb.ResolvedSpan) bool {
if !r.Span.Equal(tableSpan) {
resolved = append(resolved, r.Span)
}
@@ -6866,7 +6866,7 @@ func TestChangefeedFlushesSinkToReleaseMemory(t *testing.T) {
// an effect of never advancing the frontier, and thus never flushing
// the sink due to frontier advancement. The only time we flush the sink
// is if the memory pressure causes flush request to be delivered.
knobs.ShouldSkipResolved = func(_ *jobspb.ResolvedSpan) bool {
knobs.FilterSpanWithMutation = func(_ *jobspb.ResolvedSpan) bool {
return true
}

3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/BUILD.bazel
@@ -36,15 +36,18 @@ go_test(
name = "kvevent_test",
srcs = [
"alloc_test.go",
"bench_test.go",
"blocking_buffer_test.go",
],
embed = [":kvevent"],
deps = [
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/sql/randgen",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/encoding",
121 changes: 121 additions & 0 deletions pkg/ccl/changefeedccl/kvevent/bench_test.go
@@ -0,0 +1,121 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package kvevent_test

import (
"context"
"math/rand"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/stretchr/testify/require"
)

func BenchmarkMemBuffer(b *testing.B) {
rand, _ := randutil.NewTestRand()

run := func() {
ba, release := getBoundAccountWithBudget(4096)
defer release()

metrics := kvevent.MakeMetrics(time.Minute)

// Arrange for mem buffer to notify us when it waits for resources.
waitCh := make(chan struct{}, 1)
notifyWait := func(ctx context.Context, poolName string, r quotapool.Request) {
select {
case waitCh <- struct{}{}:
default:
}
}

st := cluster.MakeTestingClusterSettings()
buf := kvevent.NewMemBuffer(ba, &st.SV, &metrics, quotapool.OnWaitStart(notifyWait))
defer func() {
require.NoError(b, buf.CloseWithReason(context.Background(), nil))
}()

producerCtx, stopProducers := context.WithCancel(context.Background())
wg := ctxgroup.WithContext(producerCtx)
defer func() {
_ = wg.Wait() // Ignore error -- this group returns context cancellation.
}()

numRows := 0
wg.GoCtx(func(ctx context.Context) error {
for {
err := buf.Add(ctx, kvevent.MakeResolvedEvent(generateSpan(b, rand), hlc.Timestamp{}, jobspb.ResolvedSpan_NONE))
if err != nil {
return err
}
numRows++
}
})

<-waitCh
writtenRows := numRows

for i := 0; i < writtenRows; i++ {
e, err := buf.Get(context.Background())
if err != nil {
b.Fatal("could not read from buffer")
}
a := e.DetachAlloc()
a.Release(context.Background())
}
stopProducers()
}

for i := 0; i < b.N; i++ {
run()
}
}

func generateSpan(b *testing.B, rng *rand.Rand) roachpb.Span {
start := rng.Intn(2 << 20)
end := start + rng.Intn(2<<20)
startDatum := tree.NewDInt(tree.DInt(start))
endDatum := tree.NewDInt(tree.DInt(end))
const tableID = 42

startKey, err := keyside.Encode(
keys.SystemSQLCodec.TablePrefix(tableID),
startDatum,
encoding.Ascending,
)
if err != nil {
b.Fatal("could not generate key")
}

endKey, err := keyside.Encode(
keys.SystemSQLCodec.TablePrefix(tableID),
endDatum,
encoding.Ascending,
)
if err != nil {
b.Fatal("could not generate key")
}

return roachpb.Span{
Key: startKey,
EndKey: endKey,
}
}
13 changes: 7 additions & 6 deletions pkg/ccl/changefeedccl/kvevent/event.go
@@ -92,7 +92,7 @@ type Event struct {
kv roachpb.KeyValue
prevVal roachpb.Value
flush bool
resolved *jobspb.ResolvedSpan
resolved jobspb.ResolvedSpan
backfillTimestamp hlc.Timestamp
bufferAddTimestamp time.Time
approxSize int
@@ -104,7 +104,7 @@ func (b *Event) Type() Type {
if b.kv.Key != nil {
return TypeKV
}
if b.resolved != nil {
if b.resolved.Span.Key != nil {
return TypeResolved
}
if b.flush {
@@ -132,7 +132,7 @@ func (b *Event) PrevValue() roachpb.Value {

// Resolved will be non-nil if this is a resolved timestamp event (i.e. IsKV()
// returns false).
func (b *Event) Resolved() *jobspb.ResolvedSpan {
func (b *Event) Resolved() jobspb.ResolvedSpan {
return b.resolved
}

@@ -202,14 +202,15 @@ func (b *Event) DetachAlloc() Alloc {
func MakeResolvedEvent(
span roachpb.Span, ts hlc.Timestamp, boundaryType jobspb.ResolvedSpan_BoundaryType,
) Event {
return Event{
resolved: &jobspb.ResolvedSpan{
e := Event{
resolved: jobspb.ResolvedSpan{
Span: span,
Timestamp: ts,
BoundaryType: boundaryType,
},
approxSize: span.Size() + ts.Size() + 4,
}
e.approxSize = e.resolved.Size() + 1
return e
}

// MakeKVEvent returns KV event.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner_test.go
@@ -33,7 +33,7 @@ type recordResolvedWriter struct {

func (r *recordResolvedWriter) Add(ctx context.Context, e kvevent.Event) error {
if e.Type() == kvevent.TypeResolved {
r.resolved = append(r.resolved, *e.Resolved())
r.resolved = append(r.resolved, e.Resolved())
}
return nil
}
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/testing_knobs.go
@@ -31,9 +31,9 @@ type TestingKnobs struct {
// It allows the tests to muck with the Sink, and even return altogether different
// implementation.
WrapSink func(s Sink, jobID jobspb.JobID) Sink
// ShouldSkipResolved is a filter returning true if the resolved span event should
// be skipped.
ShouldSkipResolved func(resolved *jobspb.ResolvedSpan) bool
// FilterSpanWithMutation is a filter returning true if the resolved span event should
// be skipped. This method takes a pointer in case resolved spans need to be mutated.
FilterSpanWithMutation func(resolved *jobspb.ResolvedSpan) bool
// FeedKnobs are kvfeed testing knobs.
FeedKnobs kvfeed.TestingKnobs
// NullSinkIsExternalIOAccounted controls whether we record
6 changes: 6 additions & 0 deletions pkg/cmd/dev/test.go
@@ -41,6 +41,7 @@ const (
streamOutputFlag = "stream-output"
testArgsFlag = "test-args"
vModuleFlag = "vmodule"
showDiffFlag = "show-diff"
)

func makeTestCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command {
@@ -105,6 +106,7 @@ pkg/kv/kvserver:kvserver_test) instead.`,
testCmd.Flags().Bool(streamOutputFlag, false, "stream test output during run")
testCmd.Flags().String(testArgsFlag, "", "additional arguments to pass to the go test binary")
testCmd.Flags().String(vModuleFlag, "", "comma-separated list of pattern=N settings for file-filtered logging")
testCmd.Flags().Bool(showDiffFlag, false, "generate a diff for expectation mismatches when possible")
return testCmd
}

@@ -128,6 +130,7 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error {
showLogs = mustGetFlagBool(cmd, showLogsFlag)
count = mustGetFlagInt(cmd, countFlag)
vModule = mustGetFlagString(cmd, vModuleFlag)
showDiff = mustGetFlagBool(cmd, showDiffFlag)

// These are tests that require access to another directory for
// --rewrite. These can either be single directories or
@@ -274,6 +277,9 @@ func (d *dev) test(cmd *cobra.Command, commandLine []string) error {
}
}
}
if showDiff {
args = append(args, "--test_arg", "-show-diff")
}
if timeout > 0 && !stress {
args = append(args, fmt.Sprintf("--test_timeout=%d", int(timeout.Seconds())))

Expand Down