Skip to content

Commit

Permalink
kv: finish up impl of TargetBytes scan limit
Browse files Browse the repository at this point in the history
This PR finishes up the work initiated in cockroachdb#44925 to allow (forward and
reverse) scans to specify a TargetBytes hint which (mod overshooting by
one row) restricts the size of the responses.

The plan is to use it in kvfetcher, however this is left as a separate
commit - here we focus on testing the functionality only.

Release note: None
  • Loading branch information
tbg committed Feb 24, 2020
1 parent 9ecf1b0 commit 575dedd
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 29 deletions.
21 changes: 16 additions & 5 deletions pkg/kv/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (ds *DistSender) initAndVerifyBatch(
return roachpb.NewErrorf("empty batch")
}

if ba.MaxSpanRequestKeys != 0 {
if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 {
// Verify that the batch contains only specific range requests or the
// EndTxnRequest. Verify that a batch with a ReverseScan only contains
// ReverseScan range requests.
Expand Down Expand Up @@ -672,10 +672,11 @@ func (ds *DistSender) Send(
splitET = true
}
parts := splitBatchAndCheckForRefreshSpans(ba, splitET)
if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 {
if len(parts) > 1 && (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) {
// We already verified above that the batch contains only scan requests of the same type.
// Such a batch should never need splitting.
panic("batch with MaxSpanRequestKeys needs splitting")
log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting",
log.Safe(ba.MaxSpanRequestKeys), log.Safe(ba.TargetBytes))
}

errIdxOffset := 0
Expand Down Expand Up @@ -1152,7 +1153,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}
}()

canParallelize := ba.Header.MaxSpanRequestKeys == 0
canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0
if ba.IsSingleCheckConsistencyRequest() {
// Don't parallelize full checksum requests as they have to touch the
// entirety of each replica of each range they touch.
Expand Down Expand Up @@ -1213,12 +1214,14 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0
mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
var replyResults int64
var replyBytes int64
for _, r := range resp.reply.Responses {
replyResults += r.GetInner().Header().NumKeys
replyBytes += r.GetInner().Header().NumBytes
}
// Update MaxSpanRequestKeys, if applicable. Note that ba might be
// passed recursively to further divideAndSendBatchToRanges() calls.
Expand All @@ -1235,6 +1238,14 @@ func (ds *DistSender) divideAndSendBatchToRanges(
return
}
}
if ba.TargetBytes > 0 {
ba.TargetBytes -= replyBytes
if ba.TargetBytes <= 0 {
couldHaveSkippedResponses = true
resumeReason = roachpb.RESUME_KEY_LIMIT
return
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error {
rh.ResumeSpan = otherRH.ResumeSpan
rh.ResumeReason = otherRH.ResumeReason
rh.NumKeys += otherRH.NumKeys
rh.NumBytes += otherRH.NumBytes
rh.RangeInfos = append(rh.RangeInfos, otherRH.RangeInfos...)
return nil
}
Expand Down
107 changes: 85 additions & 22 deletions pkg/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var nodeTestBaseContext = testutils.NewNodeTestBaseContext()
Expand Down Expand Up @@ -415,9 +416,10 @@ func TestMultiRangeScanDeleteRange(t *testing.T) {
}
}

// TestMultiRangeScanWithMaxResults tests that commands which access multiple
// ranges with MaxResults parameter are carried out properly.
func TestMultiRangeScanWithMaxResults(t *testing.T) {
// TestMultiRangeScanWithPagination tests that specifying MaxSpanRequestKeys
// and/or TargetBytes to break up result sets works properly, even across
// ranges.
func TestMultiRangeScanWithPagination(t *testing.T) {
defer leaktest.AfterTest(t)()
testCases := []struct {
splitKeys []roachpb.Key
Expand All @@ -430,7 +432,7 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) {
roachpb.Key("r"), roachpb.Key("w"), roachpb.Key("y")}},
}

for i, tc := range testCases {
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
ctx := context.Background()
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
Expand All @@ -451,26 +453,87 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) {
}
}

// Try every possible ScanRequest startKey.
for start := 0; start < len(tc.keys); start++ {
// Try every possible maxResults, from 1 to beyond the size of key array.
for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ {
scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next())
reply, err := client.SendWrappedWith(
ctx, tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan,
)
if err != nil {
t.Fatal(err)
}
rows := reply.(*roachpb.ScanResponse).Rows
if start+maxResults <= len(tc.keys) && len(rows) != maxResults {
t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows))
} else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 {
t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows))
}
}
// The maximum TargetBytes to use in this test. We use the bytes in
// all kvs in this test case as a ceiling. Nothing interesting
// happens above this.
var maxTargetBytes int64
{
resp, pErr := client.SendWrapped(ctx, tds, roachpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next()))
require.Nil(t, pErr)
maxTargetBytes = resp.Header().NumBytes
}

testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) {
	// Iterate through MaxSpanRequestKeys=1..n and TargetBytes=1..m (where
	// n and m are chosen to reveal the full result set in one page). At
	// each(*) combination, paginate the scan (forward or reverse, as
	// selected by the `reverse` flag) and make sure we get the right
	// result.
	//
	// (*) we don't increase the limits when there's only one page,
	// but short circuit to something more interesting instead.
	msrq := int64(1)
	for targetBytes := int64(1); ; targetBytes++ {
		// numPages is declared outside the subtest closure so that the
		// loop control below can inspect how many pages were needed.
		var numPages int
		t.Run(fmt.Sprintf("targetBytes=%d,maxSpanRequestKeys=%d", targetBytes, msrq), func(t *testing.T) {
			// req builds a scan over span in the direction under test.
			req := func(span roachpb.Span) roachpb.Request {
				if reverse {
					return roachpb.NewReverseScan(span.Key, span.EndKey)
				}
				return roachpb.NewScan(span.Key, span.EndKey)
			}
			// Paginate: start with the span covering all keys and keep
			// re-issuing the scan over the returned resume span until
			// there is none left.
			resumeSpan := &roachpb.Span{Key: tc.keys[0], EndKey: tc.keys[len(tc.keys)-1].Next()}
			var keys []roachpb.Key
			for {
				numPages++
				scan := req(*resumeSpan)
				var ba roachpb.BatchRequest
				ba.Add(scan)
				ba.Header.TargetBytes = targetBytes
				ba.Header.MaxSpanRequestKeys = msrq
				br, pErr := tds.Send(ctx, ba)
				require.Nil(t, pErr)
				var rows []roachpb.KeyValue
				if reverse {
					rows = br.Responses[0].GetReverseScan().Rows
				} else {
					rows = br.Responses[0].GetScan().Rows
				}
				for _, kv := range rows {
					keys = append(keys, kv.Key)
				}
				resumeSpan = br.Responses[0].GetInner().Header().ResumeSpan
				// The page number feeds %d and the scanned span feeds the
				// first %v; the arguments must follow the verb order.
				t.Logf("page #%d: scan %v -> keys (after) %v resume %v", numPages, scan.Header().Span(), keys, resumeSpan)
				if resumeSpan == nil {
					// Done with this pagination.
					break
				}
			}
			if reverse {
				// Keys were collected in descending order; flip them in
				// place so they can be compared against tc.keys.
				for i, n := 0, len(keys); i < n-i-1; i++ {
					keys[i], keys[n-i-1] = keys[n-i-1], keys[i]
				}
			}
			require.Equal(t, tc.keys, keys)
			if targetBytes == 1 || msrq < int64(len(tc.keys)) {
				// Definitely more than one page in this case.
				require.Less(t, 1, numPages)
			}
			if targetBytes >= maxTargetBytes && msrq >= int64(len(tc.keys)) {
				// Definitely one page if limits are larger than result set.
				require.Equal(t, 1, numPages)
			}
		})
		// Once the byte limit hits the ceiling, or a single page already
		// sufficed, larger TargetBytes values add nothing new. Either stop
		// (the key limit also covers every key) or bump the key limit and
		// restart the byte limit; the loop's post statement turns the 0
		// back into 1.
		if targetBytes >= maxTargetBytes || numPages == 1 {
			if msrq >= int64(len(tc.keys)) {
				return
			}
			targetBytes = 0
			msrq++
		}
	}
})
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/engine/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -2317,7 +2317,7 @@ func mvccScanToBytes(
if err := opts.validate(); err != nil {
return MVCCScanResult{}, err
}
if opts.MaxKeys < 0 {
if opts.MaxKeys < 0 || opts.TargetBytes < 0 {
resumeSpan := &roachpb.Span{Key: key, EndKey: endKey}
return MVCCScanResult{ResumeSpan: resumeSpan}, nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/engine/testdata/mvcc_histories/target_bytes
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ scan: "c" -> /BYTES/ghijkllkjihg @0.000000123,45
scan: "a" -> /BYTES/abcdef @0.000000123,45
scan: 108 bytes (target 10000000)

# Scans with target size -1 return no results.
run ok
with ts=300,0 k=a end=z targetbytes=-1
scan
scan reverse=true
----
scan: resume span ["a","z")
scan: "a"-"z" -> <no data>
scan: resume span ["a","z")
scan: "a"-"z" -> <no data>

run ok
# Target size one byte returns one result (overshooting instead of returning nothing).
Expand Down
8 changes: 8 additions & 0 deletions pkg/storage/replica_evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,14 @@ func evaluateBatch(
baHeader.MaxSpanRequestKeys = -1
}
}
if baHeader.TargetBytes > 0 {
retBytes := reply.Header().NumBytes
if baHeader.TargetBytes > retBytes {
baHeader.TargetBytes -= retBytes
} else {
baHeader.TargetBytes = -1
}
}

// If transactional, we use ba.Txn for each individual command and
// accumulate updates to it. Once accumulated, we then remove the Txn
Expand Down
50 changes: 49 additions & 1 deletion pkg/storage/replica_evaluate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ func TestEvaluateBatch(t *testing.T) {
defer leaktest.AfterTest(t)()

tcs := []testCase{
//
// Test suite for MaxSpanRequestKeys.
//
{
// We should never evaluate empty batches, but here's what would happen
// if we did.
Expand Down Expand Up @@ -196,7 +199,52 @@ func TestEvaluateBatch(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
}}
},
//
// Test suite for TargetBytes.
//
{
// Two scans and a target bytes limit that saturates during the
// first.
name: "scans with TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(scanArgsString("a", "c"))
d.ba.Add(getArgsString("e"))
d.ba.Add(scanArgsString("c", "e"))
d.ba.TargetBytes = 1
// Also set a nontrivial MaxSpanRequestKeys, just to make sure
// there's no weird interaction (like it overriding TargetBytes).
// The stricter one ought to win.
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"a"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "b-c", "", "c-e")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
}, {
// Ditto in reverse.
name: "reverse scans with TargetBytes=1",
setup: func(t *testing.T, d *data) {
writeABCDEF(t, d)
d.ba.Add(revScanArgsString("c", "e"))
d.ba.Add(getArgsString("e"))
d.ba.Add(revScanArgsString("a", "c"))
d.ba.TargetBytes = 1
d.ba.MaxSpanRequestKeys = 3
},
check: func(t *testing.T, r resp) {
verifyScanResult(t, r, []string{"d"}, []string{"e"}, nil)
verifyResumeSpans(t, r, "c-c\x00", "", "a-c")
b, err := r.br.Responses[1].GetGet().Value.GetBytes()
require.NoError(t, err)
require.Equal(t, "value-e", string(b))
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
Expand Down

0 comments on commit 575dedd

Please sign in to comment.