Skip to content

Commit

Permalink
kvserver: record ScanStats once per BatchRequest
Browse files Browse the repository at this point in the history
Previously, we would record a `kvpb.ScanStats` object into the trace for
each evaluated Get, Scan, and ReverseScan command. This was suboptimal
for two reasons:
- it required allocating a separate `kvpb.ScanStats` object for each command
- it required propagating all of these separate objects via the
tracing infrastructure, which could cause the tracing limits to be
reached, resulting in some objects being dropped.

This commit instead changes the ScanStats to be tracked at the
BatchRequest level, so we only need to record a single object per
BatchRequest. This results in reduced granularity, but that is still
sufficient for SQL's needs, since SQL simply aggregates all
`kvpb.ScanStats` from a single SQL processor into one object. As
a result, the tpch_concurrency metric averaged over 20 runs increased
from 76.75 to 84.75.

Additionally, this commit makes it so that we track the number of Gets,
Scans, and ReverseScans actually evaluated as part of the BatchResponse.
This information is plumbed through a couple of protos but is not
exposed in any SQL Observability virtual tables. Still, because it is
present in the protos, this information will be included in the trace.

Release note: None
  • Loading branch information
yuzefovich committed Feb 22, 2023
1 parent b3231c0 commit 1154be5
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 41 deletions.
6 changes: 4 additions & 2 deletions pkg/kv/kvpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1761,7 +1761,8 @@ func (s *ScanStats) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("scan stats: stepped %d times (%d internal); seeked %d times (%d internal); "+
"block-bytes: (total %s, cached %s); "+
"points: (count %s, key-bytes %s, value-bytes %s, tombstoned: %s) "+
"ranges: (count %s), (contained-points %s, skipped-points %s)",
"ranges: (count %s), (contained-points %s, skipped-points %s) "+
"evaluated requests: %d gets, %d scans, %d reverse scans",
s.NumInterfaceSteps, s.NumInternalSteps, s.NumInterfaceSeeks, s.NumInternalSeeks,
humanizeutil.IBytes(int64(s.BlockBytes)),
humanizeutil.IBytes(int64(s.BlockBytesInCache)),
Expand All @@ -1771,7 +1772,8 @@ func (s *ScanStats) SafeFormat(w redact.SafePrinter, _ rune) {
humanizePointCount(s.PointsCoveredByRangeTombstones),
humanizePointCount(s.RangeKeyCount),
humanizePointCount(s.RangeKeyContainedPoints),
humanizePointCount(s.RangeKeySkippedPoints))
humanizePointCount(s.RangeKeySkippedPoints),
s.NumGets, s.NumScans, s.NumReverseScans)
if s.SeparatedPointCount != 0 {
w.Printf(" separated: (count: %s, bytes: %s, bytes-fetched: %s)",
humanizePointCount(s.SeparatedPointCount),
Expand Down
12 changes: 10 additions & 2 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3208,8 +3208,8 @@ message ContentionEvent {
(gogoproto.stdduration) = true];
}

// ScanStats is a message that will be attached to BatchResponses containing
// information about what happened during each scan and get in the request.
// ScanStats is a message that tracks miscellaneous statistics of all Gets,
// Scans, and ReverseScans in a single BatchResponse.
message ScanStats {
option (gogoproto.goproto_stringer) = false;

Expand All @@ -3231,4 +3231,12 @@ message ScanStats {
uint64 separated_point_count = 14;
uint64 separated_point_value_bytes = 15;
uint64 separated_point_value_bytes_fetched = 16;

// NumGets, NumScans, and NumReverseScans track the number of Gets, Scans,
// and ReverseScans, respectively, that were actually evaluated as part of the
// BatchResponse. These don't include requests that were not evaluated due to
// reaching the BatchResponse's limits or an error.
uint64 num_gets = 17;
uint64 num_scans = 18;
uint64 num_reverse_scans = 19;
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func Get(
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func ReverseScan(
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func Scan(
Inconsistent: h.ReadConsistency != kvpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
ScanStats: cArgs.ScanStats,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/declare.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ type CommandArgs struct {
Args kvpb.Request
Now hlc.ClockTimestamp
// *Stats should be mutated to reflect any writes made by the command.
Stats *enginepb.MVCCStats
Stats *enginepb.MVCCStats
// ScanStats should be mutated to reflect Get and Scan/ReverseScan reads
// made by the command.
ScanStats *kvpb.ScanStats
Concurrency *concurrency.Guard
Uncertainty uncertainty.Interval
DontInterleaveIntents bool
Expand Down
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/kr/pretty"
)
Expand Down Expand Up @@ -233,6 +235,18 @@ func evaluateBatch(
cantDeferWTOE bool
}

// Only collect the scan stats if tracing is enabled.
var ss *kvpb.ScanStats
if sp := tracing.SpanFromContext(ctx); sp.RecordingType() != tracingpb.RecordingOff {
ss = &kvpb.ScanStats{}
defer func() {
if ss.NumGets != 0 || ss.NumScans != 0 || ss.NumReverseScans != 0 {
// Only record non-empty ScanStats.
sp.RecordStructured(ss)
}
}()
}

// TODO(tbg): if we introduced an "executor" helper here that could carry state
// across the slots in the batch while we execute them, this code could come
// out a lot less ad-hoc.
Expand Down Expand Up @@ -278,7 +292,7 @@ func evaluateBatch(
// may carry a response transaction and in the case of WriteTooOldError
// (which is sometimes deferred) it is fully populated.
curResult, err := evaluateCommand(
ctx, readWriter, rec, ms, baHeader, args, reply, g, st, ui, evalPath,
ctx, readWriter, rec, ms, ss, baHeader, args, reply, g, st, ui, evalPath,
)

if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil {
Expand Down Expand Up @@ -479,6 +493,7 @@ func evaluateCommand(
readWriter storage.ReadWriter,
rec batcheval.EvalContext,
ms *enginepb.MVCCStats,
ss *kvpb.ScanStats,
h kvpb.Header,
args kvpb.Request,
reply kvpb.Response,
Expand All @@ -501,6 +516,7 @@ func evaluateCommand(
Args: args,
Now: now,
Stats: ms,
ScanStats: ss,
Concurrency: g,
Uncertainty: ui,
DontInterleaveIntents: evalPath == readOnlyWithoutInterleavedIntents,
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/execinfrapb/component_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ message KVStats {
// alongside total execution time for operators that perform KV work.
optional util.optional.Duration kv_cpu_time = 11 [(gogoproto.nullable) = false,
(gogoproto.customname) = "KVCPUTime"];

optional util.optional.Uint num_gets = 21 [(gogoproto.nullable) = false];
optional util.optional.Uint num_scans = 22 [(gogoproto.nullable) = false];
optional util.optional.Uint num_reverse_scans = 23 [(gogoproto.nullable) = false];
}

// ExecStats contains statistics about the execution of a component.
Expand Down
11 changes: 10 additions & 1 deletion pkg/sql/execstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ type ScanStats struct {
SeparatedPointValueBytesFetched uint64
// ConsumedRU is the number of RUs that were consumed during the course of a
// scan.
ConsumedRU uint64
ConsumedRU uint64
NumGets uint64
NumScans uint64
NumReverseScans uint64
}

// PopulateKVMVCCStats adds data from the input ScanStats to the input KVStats.
Expand All @@ -110,6 +113,9 @@ func PopulateKVMVCCStats(kvStats *execinfrapb.KVStats, ss *ScanStats) {
kvStats.RangeKeyCount = optional.MakeUint(ss.RangeKeyCount)
kvStats.RangeKeyContainedPoints = optional.MakeUint(ss.RangeKeyContainedPoints)
kvStats.RangeKeySkippedPoints = optional.MakeUint(ss.RangeKeySkippedPoints)
kvStats.NumGets = optional.MakeUint(ss.NumGets)
kvStats.NumScans = optional.MakeUint(ss.NumScans)
kvStats.NumReverseScans = optional.MakeUint(ss.NumReverseScans)
}

// GetScanStats is a helper function to calculate scan stats from the given
Expand Down Expand Up @@ -143,6 +149,9 @@ func GetScanStats(ctx context.Context, recording tracingpb.Recording) (scanStats
scanStats.SeparatedPointCount += ss.SeparatedPointCount
scanStats.SeparatedPointValueBytes += ss.SeparatedPointValueBytes
scanStats.SeparatedPointValueBytesFetched += ss.SeparatedPointValueBytesFetched
scanStats.NumGets += ss.NumGets
scanStats.NumScans += ss.NumScans
scanStats.NumReverseScans += ss.NumReverseScans
} else if pbtypes.Is(any, &tc) {
if err := pbtypes.UnmarshalAny(any, &tc); err != nil {
return
Expand Down
1 change: 0 additions & 1 deletion pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ go_library(
"//pkg/util/sysutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//oserror",
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/col_mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ func mvccScanToCols(
if err != nil {
return MVCCScanResult{}, err
}
if err = finalizeScanResult(ctx, mvccScanner, &res, opts.errOnIntents()); err != nil {
if err = finalizeScanResult(mvccScanner, &res, opts); err != nil {
return MVCCScanResult{}, err
}
return res, nil
Expand Down
71 changes: 39 additions & 32 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
)
Expand Down Expand Up @@ -886,6 +885,7 @@ type MVCCGetOptions struct {
Tombstones bool
FailOnMoreRecent bool
Txn *roachpb.Transaction
ScanStats *kvpb.ScanStats
Uncertainty uncertainty.Interval
// MemoryAccount is used for tracking memory allocations.
MemoryAccount *mon.BoundAccount
Expand Down Expand Up @@ -1150,8 +1150,11 @@ func mvccGetWithValueHeader(
mvccScanner.init(opts.Txn, opts.Uncertainty, &results)
mvccScanner.get(ctx)

// If we have a trace, emit the scan stats that we produced.
recordIteratorStats(ctx, mvccScanner.parent)
// If we're tracking the ScanStats, include the stats from this Get.
if opts.ScanStats != nil {
recordIteratorStats(mvccScanner.parent, opts.ScanStats)
opts.ScanStats.NumGets++
}

if mvccScanner.err != nil {
return optionalValue{}, nil, enginepb.MVCCValueHeader{}, mvccScanner.err
Expand Down Expand Up @@ -3616,36 +3619,31 @@ func MVCCDeleteRangeUsingTombstone(
return nil
}

func recordIteratorStats(ctx context.Context, iter MVCCIterator) {
sp := tracing.SpanFromContext(ctx)
if sp.RecordingType() == tracingpb.RecordingOff {
// Short-circuit before doing any work.
return
}
// recordIteratorStats updates the provided ScanStats (which is assumed to be
// non-nil) with the MVCC stats from iter.
func recordIteratorStats(iter MVCCIterator, scanStats *kvpb.ScanStats) {
iteratorStats := iter.Stats()
stats := &iteratorStats.Stats
steps := stats.ReverseStepCount[pebble.InterfaceCall] + stats.ForwardStepCount[pebble.InterfaceCall]
seeks := stats.ReverseSeekCount[pebble.InterfaceCall] + stats.ForwardSeekCount[pebble.InterfaceCall]
internalSteps := stats.ReverseStepCount[pebble.InternalIterCall] + stats.ForwardStepCount[pebble.InternalIterCall]
internalSeeks := stats.ReverseSeekCount[pebble.InternalIterCall] + stats.ForwardSeekCount[pebble.InternalIterCall]
sp.RecordStructured(&kvpb.ScanStats{
NumInterfaceSeeks: uint64(seeks),
NumInternalSeeks: uint64(internalSeeks),
NumInterfaceSteps: uint64(steps),
NumInternalSteps: uint64(internalSteps),
BlockBytes: stats.InternalStats.BlockBytes,
BlockBytesInCache: stats.InternalStats.BlockBytesInCache,
KeyBytes: stats.InternalStats.KeyBytes,
ValueBytes: stats.InternalStats.ValueBytes,
PointCount: stats.InternalStats.PointCount,
PointsCoveredByRangeTombstones: stats.InternalStats.PointsCoveredByRangeTombstones,
RangeKeyCount: uint64(stats.RangeKeyStats.Count),
RangeKeyContainedPoints: uint64(stats.RangeKeyStats.ContainedPoints),
RangeKeySkippedPoints: uint64(stats.RangeKeyStats.SkippedPoints),
SeparatedPointCount: stats.InternalStats.SeparatedPointValue.Count,
SeparatedPointValueBytes: stats.InternalStats.SeparatedPointValue.ValueBytes,
SeparatedPointValueBytesFetched: stats.InternalStats.SeparatedPointValue.ValueBytesFetched,
})
scanStats.NumInterfaceSeeks += uint64(seeks)
scanStats.NumInternalSeeks += uint64(internalSeeks)
scanStats.NumInterfaceSteps += uint64(steps)
scanStats.NumInternalSteps += uint64(internalSteps)
scanStats.BlockBytes += stats.InternalStats.BlockBytes
scanStats.BlockBytesInCache += stats.InternalStats.BlockBytesInCache
scanStats.KeyBytes += stats.InternalStats.KeyBytes
scanStats.ValueBytes += stats.InternalStats.ValueBytes
scanStats.PointCount += stats.InternalStats.PointCount
scanStats.PointsCoveredByRangeTombstones += stats.InternalStats.PointsCoveredByRangeTombstones
scanStats.RangeKeyCount += uint64(stats.RangeKeyStats.Count)
scanStats.RangeKeyContainedPoints += uint64(stats.RangeKeyStats.ContainedPoints)
scanStats.RangeKeySkippedPoints += uint64(stats.RangeKeyStats.SkippedPoints)
scanStats.SeparatedPointCount += stats.InternalStats.SeparatedPointValue.Count
scanStats.SeparatedPointValueBytes += stats.InternalStats.SeparatedPointValue.ValueBytes
scanStats.SeparatedPointValueBytesFetched += stats.InternalStats.SeparatedPointValue.ValueBytesFetched
}

// mvccScanInit performs some preliminary checks on the validity of options for
Expand Down Expand Up @@ -3733,7 +3731,7 @@ func mvccScanToBytes(
}

res.KVData = results.finish()
if err = finalizeScanResult(ctx, mvccScanner, &res, opts.errOnIntents()); err != nil {
if err = finalizeScanResult(mvccScanner, &res, opts); err != nil {
return MVCCScanResult{}, err
}
return res, nil
Expand All @@ -3743,20 +3741,28 @@ func mvccScanToBytes(
// completed successfully. It also performs some additional auxiliary tasks
// (like recording iterators stats).
func finalizeScanResult(
ctx context.Context, mvccScanner *pebbleMVCCScanner, res *MVCCScanResult, errOnIntents bool,
mvccScanner *pebbleMVCCScanner, res *MVCCScanResult, opts MVCCScanOptions,
) error {
res.NumKeys, res.NumBytes, _ = mvccScanner.results.sizeInfo(0 /* lenKey */, 0 /* lenValue */)

// If we have a trace, emit the scan stats that we produced.
recordIteratorStats(ctx, mvccScanner.parent)
// If we're tracking the ScanStats, include the stats from this Scan /
// ReverseScan.
if opts.ScanStats != nil {
recordIteratorStats(mvccScanner.parent, opts.ScanStats)
if opts.Reverse {
opts.ScanStats.NumReverseScans++
} else {
opts.ScanStats.NumScans++
}
}

var err error
res.Intents, err = buildScanIntents(mvccScanner.intentsRepr())
if err != nil {
return err
}

if errOnIntents && len(res.Intents) > 0 {
if opts.errOnIntents() && len(res.Intents) > 0 {
return &kvpb.WriteIntentError{Intents: res.Intents}
}
return nil
Expand Down Expand Up @@ -3833,6 +3839,7 @@ type MVCCScanOptions struct {
Reverse bool
FailOnMoreRecent bool
Txn *roachpb.Transaction
ScanStats *kvpb.ScanStats
Uncertainty uncertainty.Interval
// MaxKeys is the maximum number of kv pairs returned from this operation.
// The zero value represents an unbounded scan. If the limit stops the scan,
Expand Down

0 comments on commit 1154be5

Please sign in to comment.