Skip to content

Commit

Permalink
engine: add byte limit to MVCCScan
Browse files Browse the repository at this point in the history
A fledgling step towards cockroachdb#19721 is allowing incoming KV requests to
bound the size of the response in terms of bytes rather than rows.
This commit adds a TargetBytes field to MVCCScanOptions to address
this need: scans stop once the size of the result meets or exceeds the
threshold (at least one key will be added, regardless of its size),
and return a ResumeSpan as appropriate.

The classic example of the problem this addresses is a table in which
each row is, say, ~1mb in size. A full table scan will currently fetch
data from KV in batches of [10k], causing at least 10GB of data held in
memory at any given moment. This sort of thing does happen in practice;
we have a long-failing roachtest cockroachdb#33660 because of just that, and
anecdotally, OOMs in production clusters are regularly caused by
individual queries consuming excessive amounts of memory at the KV
level.

Plumbing this limit into a header field on BatchRequest and down to the
engine level will allow the batching in [10k] to become byte-sized in
nature, thus avoiding one obvious source of OOMs. This doesn't solve cockroachdb#19721
in general (many such requests could work together to consume a lot of
memory after all), but it's a sane baby step that might just avoid a
large portion of OOMs already.

[10k]: https://github.com/cockroachdb/cockroach/blob/0a658c19cd164e7c021eaff7f73db173f0650e8c/pkg/sql/row/kv_batch_fetcher.go#L25-L29

Release note: None
  • Loading branch information
tbg committed Feb 3, 2020
1 parent e49530d commit 14b6cc3
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 15 deletions.
2 changes: 2 additions & 0 deletions c-deps/libroach/chunked_buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@ void chunkedBuffer::Put(const rocksdb::Slice& key, const rocksdb::Slice& value)
put(key.data(), key.size(), value.size());
put(value.data(), value.size(), 0);
count_++;
bytes_ += sizeof(size_buf) + key.size() + value.size(); // see (*pebbleResults).put
}

void chunkedBuffer::Clear() {
for (int i = 0; i < bufs_.size(); i++) {
delete[] bufs_[i].data;
}
count_ = 0;
bytes_ = 0;
buf_ptr_ = nullptr;
bufs_.clear();
}
Expand Down
3 changes: 3 additions & 0 deletions c-deps/libroach/chunked_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,16 @@ class chunkedBuffer {

// Get the number of key/value pairs written to this chunkedBuffer.
int Count() const { return count_; }
// Get the number of bytes written to this chunkedBuffer.
int NumBytes() const { return bytes_; }

private:
void put(const char* data, int len, int next_size_hint);

private:
std::vector<DBSlice> bufs_;
int64_t count_;
int64_t bytes_;
char* buf_ptr_;
};

Expand Down
4 changes: 2 additions & 2 deletions c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -366,8 +366,8 @@ typedef struct {
DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTxn txn,
bool inconsistent, bool tombstones, bool fail_on_more_recent);
DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool inconsistent, bool reverse,
bool tombstones, bool fail_on_more_recent);
int64_t max_keys, int64_t target_bytes, DBTxn txn, bool inconsistent,
bool reverse, bool tombstones, bool fail_on_more_recent);

// DBStatsResult contains various runtime stats for RocksDB.
typedef struct {
Expand Down
14 changes: 7 additions & 7 deletions c-deps/libroach/mvcc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -275,21 +275,21 @@ DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTx
// different than the start key. This is a bit of a hack.
const DBSlice end = {0, 0};
ScopedStats scoped_iter(iter);
mvccForwardScanner scanner(iter, key, end, timestamp, 1 /* max_keys */, txn, inconsistent,
tombstones, fail_on_more_recent);
mvccForwardScanner scanner(iter, key, end, timestamp, 1 /* max_keys */, 0 /* target_bytes */, txn,
inconsistent, tombstones, fail_on_more_recent);
return scanner.get();
}

DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool inconsistent, bool reverse,
bool tombstones, bool fail_on_more_recent) {
int64_t max_keys, int64_t target_bytes, DBTxn txn, bool inconsistent,
bool reverse, bool tombstones, bool fail_on_more_recent) {
ScopedStats scoped_iter(iter);
if (reverse) {
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, txn, inconsistent, tombstones,
fail_on_more_recent);
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, target_bytes, txn,
inconsistent, tombstones, fail_on_more_recent);
return scanner.scan();
} else {
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, txn, inconsistent, tombstones,
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, target_bytes, txn, inconsistent, tombstones,
fail_on_more_recent);
return scanner.scan();
}
Expand Down
9 changes: 7 additions & 2 deletions c-deps/libroach/mvcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ static const int kMaxItersBeforeSeek = 10;
template <bool reverse> class mvccScanner {
public:
mvccScanner(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp, int64_t max_keys,
DBTxn txn, bool inconsistent, bool tombstones, bool fail_on_more_recent)
int64_t target_bytes, DBTxn txn, bool inconsistent, bool tombstones, bool fail_on_more_recent)
: iter_(iter),
iter_rep_(iter->rep.get()),
start_key_(ToSlice(start)),
end_key_(ToSlice(end)),
max_keys_(max_keys),
target_bytes_(target_bytes),
timestamp_(timestamp),
txn_id_(ToSlice(txn.id)),
txn_epoch_(txn.epoch),
Expand Down Expand Up @@ -557,6 +558,9 @@ template <bool reverse> class mvccScanner {
// instructed to include tombstones in the results.
if (value.size() > 0 || tombstones_) {
kvs_->Put(cur_raw_key_, value);
if (target_bytes_ > 0 && kvs_->NumBytes() >= target_bytes_) {
max_keys_ = kvs_->Count();
}
if (kvs_->Count() == max_keys_) {
return false;
}
Expand Down Expand Up @@ -745,7 +749,8 @@ template <bool reverse> class mvccScanner {
rocksdb::Iterator* const iter_rep_;
const rocksdb::Slice start_key_;
const rocksdb::Slice end_key_;
const int64_t max_keys_;
int64_t max_keys_;
const int64_t target_bytes_; // see MVCCScanOptions
const DBTimestamp timestamp_;
const rocksdb::Slice txn_id_;
const uint32_t txn_epoch_;
Expand Down
13 changes: 13 additions & 0 deletions pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,7 @@ func mvccScanToBytes(
end: endKey,
ts: timestamp,
maxKeys: max,
targetBytes: opts.TargetBytes,
inconsistent: opts.Inconsistent,
tombstones: opts.Tombstones,
failOnMoreRecent: opts.FailOnMoreRecent,
Expand Down Expand Up @@ -2430,6 +2431,18 @@ type MVCCScanOptions struct {
Reverse bool
FailOnMoreRecent bool
Txn *roachpb.Transaction
// TargetBytes is a byte threshold to limit the amount of data pulled into
// memory during a Scan operation. Once the target is satisfied (i.e. met or
// exceeded) by the emitted KV pairs, iteration stops (with a
// ResumeSpan as appropriate). In particular, at least one kv pair is
// returned (when one exists).
//
// The number of bytes a particular kv pair accrues depends on internal data
// structures, but it is guaranteed to exceed that of the bytes stored in
// the key and value itself.
//
// The zero value indicates no limit.
TargetBytes int64
}

func (opts *MVCCScanOptions) validate() error {
Expand Down
10 changes: 9 additions & 1 deletion pkg/storage/engine/mvcc_history_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,12 @@ func cmdScan(e *evalCtx) error {
e.scanArg("max", &imax)
max = int64(imax)
}
vals, _, intents, err := MVCCScan(e.ctx, e.engine, key, endKey, max, ts, opts)
if key := "targetbytes"; e.hasArg(key) {
var tb int
e.scanArg(key, &tb)
opts.TargetBytes = int64(tb)
}
vals, resumeSpan, intents, err := MVCCScan(e.ctx, e.engine, key, endKey, max, ts, opts)
// NB: the error is returned below. This ensures the test can
// ascertain no result is populated in the intents when an error
// occurs.
Expand All @@ -713,6 +718,9 @@ func cmdScan(e *evalCtx) error {
for _, val := range vals {
fmt.Fprintf(e.results.buf, "scan: %v -> %v @%v\n", val.Key, val.Value.PrettyPrint(), val.Value.Timestamp)
}
if resumeSpan != nil {
fmt.Fprintf(e.results.buf, "scan: resume span [%s,%s)\n", resumeSpan.Key, resumeSpan.EndKey)
}
if len(vals) == 0 {
fmt.Fprintf(e.results.buf, "scan: %v-%v -> <no data>\n", key, endKey)
}
Expand Down
17 changes: 16 additions & 1 deletion pkg/storage/engine/pebble_mvcc_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
// expected by MVCCScanDecodeKeyValue.
type pebbleResults struct {
// count is the number of key/value pairs added so far; put increments it
// once per call.
count int64
// bytes is the running total of the encoded sizes of all pairs added by
// put (put adds its lenToAdd to this field on every call).
bytes int64
// repr is the buffer that put encodes the key into and copies the value
// into.
repr []byte
// NOTE(review): bufs presumably holds buffers that have been filled
// already and is what finish returns -- confirm against put/finish.
bufs [][]byte
}
Expand Down Expand Up @@ -76,6 +77,7 @@ func (p *pebbleResults) put(key MVCCKey, value []byte) {
encodeKeyToBuf(p.repr[startIdx+kvLenSize:startIdx+kvLenSize+lenKey], key, lenKey)
copy(p.repr[startIdx+kvLenSize+lenKey:], value)
p.count++
p.bytes += int64(lenToAdd)
}

func (p *pebbleResults) finish() [][]byte {
Expand All @@ -96,8 +98,13 @@ type pebbleMVCCScanner struct {
start, end roachpb.Key
// Timestamp with which MVCCScan/MVCCGet was called.
ts hlc.Timestamp
// Max number of keys to return.
// Max number of keys to return. Note that targetBytes below is implemented
// by mutating maxKeys. (In particular, one must not assume that if maxKeys
// is zero initially it will always be zero).
maxKeys int64
// Stop adding keys once p.result.bytes matches or exceeds this threshold,
// if nonzero.
targetBytes int64
// Transaction epoch and sequence number.
txn *roachpb.Transaction
txnEpoch enginepb.TxnEpoch
Expand Down Expand Up @@ -549,6 +556,14 @@ func (p *pebbleMVCCScanner) addAndAdvance(val []byte) bool {
// to include tombstones in the results.
if len(val) > 0 || p.tombstones {
p.results.put(p.curMVCCKey(), val)
if p.targetBytes > 0 && p.results.bytes >= p.targetBytes {
// When the target bytes are met or exceeded, stop producing more
// keys. We implement this by reducing maxKeys to the current
// number of keys.
//
// TODO(bilal): see if this can be implemented more transparently.
p.maxKeys = p.results.count
}
if p.results.count == p.maxKeys {
return false
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2485,8 +2485,8 @@ func (r *rocksDBIterator) MVCCScan(

r.clearState()
state := C.MVCCScan(
r.iter, goToCSlice(start), goToCSlice(end),
goToCTimestamp(timestamp), C.int64_t(max),
r.iter, goToCSlice(start), goToCSlice(end), goToCTimestamp(timestamp),
C.int64_t(max), C.int64_t(opts.TargetBytes),
goToCTxn(opts.Txn), C.bool(opts.Inconsistent),
C.bool(opts.Reverse), C.bool(opts.Tombstones),
C.bool(opts.FailOnMoreRecent),
Expand Down
Loading

0 comments on commit 14b6cc3

Please sign in to comment.