From 575dedd3e4bf459c1caf1ce8c3256e6362e1bdf4 Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Mon, 24 Feb 2020 15:08:59 +0100 Subject: [PATCH] kv: finish up impl of TargetBytes scan limit This PR finishes up the work initiated in #44925 to allow (forward and reverse) scans to specify a TargetBytes hint which (mod overshooting by one row) restricts the size of the responses. The plan is to use it in kvfetcher, however this is left as a separate commit - here we focus on testing the functionality only. Release note: None --- pkg/kv/dist_sender.go | 21 +++- pkg/roachpb/api.go | 1 + pkg/server/server_test.go | 107 ++++++++++++++---- pkg/storage/engine/mvcc.go | 2 +- .../testdata/mvcc_histories/target_bytes | 10 ++ pkg/storage/replica_evaluate.go | 8 ++ pkg/storage/replica_evaluate_test.go | 50 +++++++- 7 files changed, 170 insertions(+), 29 deletions(-) diff --git a/pkg/kv/dist_sender.go b/pkg/kv/dist_sender.go index fdd9024e9a49..2a80b1e931b4 100644 --- a/pkg/kv/dist_sender.go +++ b/pkg/kv/dist_sender.go @@ -558,7 +558,7 @@ func (ds *DistSender) initAndVerifyBatch( return roachpb.NewErrorf("empty batch") } - if ba.MaxSpanRequestKeys != 0 { + if ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0 { // Verify that the batch contains only specific range requests or the // EndTxnRequest. Verify that a batch with a ReverseScan only contains // ReverseScan range requests. @@ -672,10 +672,11 @@ func (ds *DistSender) Send( splitET = true } parts := splitBatchAndCheckForRefreshSpans(ba, splitET) - if len(parts) > 1 && ba.MaxSpanRequestKeys != 0 { + if len(parts) > 1 && (ba.MaxSpanRequestKeys != 0 || ba.TargetBytes != 0) { // We already verified above that the batch contains only scan requests of the same type. // Such a batch should never need splitting. - panic("batch with MaxSpanRequestKeys needs splitting") + log.Fatalf(ctx, "batch with MaxSpanRequestKeys=%d, TargetBytes=%d needs splitting", + log.Safe(ba.MaxSpanRequestKeys), log.Safe(ba.TargetBytes)) } errIdxOffset := 0 @@ -1152,7 +1153,7 @@ func (ds *DistSender) divideAndSendBatchToRanges( } }() - canParallelize := ba.Header.MaxSpanRequestKeys == 0 + canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 if ba.IsSingleCheckConsistencyRequest() { // Don't parallelize full checksum requests as they have to touch the // entirety of each replica of each range they touch. @@ -1213,12 +1214,14 @@ func (ds *DistSender) divideAndSendBatchToRanges( ba.UpdateTxn(resp.reply.Txn) } - mightStopEarly := ba.MaxSpanRequestKeys > 0 + mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 // Check whether we've received enough responses to exit query loop. if mightStopEarly { var replyResults int64 + var replyBytes int64 for _, r := range resp.reply.Responses { replyResults += r.GetInner().Header().NumKeys + replyBytes += r.GetInner().Header().NumBytes } // Update MaxSpanRequestKeys, if applicable. Note that ba might be // passed recursively to further divideAndSendBatchToRanges() calls. @@ -1235,6 +1238,14 @@ func (ds *DistSender) divideAndSendBatchToRanges( return } } + if ba.TargetBytes > 0 { + ba.TargetBytes -= replyBytes + if ba.TargetBytes <= 0 { + couldHaveSkippedResponses = true + resumeReason = roachpb.RESUME_KEY_LIMIT + return + } + } } } diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 4103e6e5dae4..9893ec2d1bfa 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -232,6 +232,7 @@ func (rh *ResponseHeader) combine(otherRH ResponseHeader) error { rh.ResumeSpan = otherRH.ResumeSpan rh.ResumeReason = otherRH.ResumeReason rh.NumKeys += otherRH.NumKeys + rh.NumBytes += otherRH.NumBytes rh.RangeInfos = append(rh.RangeInfos, otherRH.RangeInfos...) return nil } diff --git a/pkg/server/server_test.go b/pkg/server/server_test.go index 2312d9240332..42f7a90a7eb9 100644 --- a/pkg/server/server_test.go +++ b/pkg/server/server_test.go @@ -51,6 +51,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) var nodeTestBaseContext = testutils.NewNodeTestBaseContext() @@ -415,9 +416,10 @@ func TestMultiRangeScanDeleteRange(t *testing.T) { } } -// TestMultiRangeScanWithMaxResults tests that commands which access multiple -// ranges with MaxResults parameter are carried out properly. -func TestMultiRangeScanWithMaxResults(t *testing.T) { +// TestMultiRangeScanWithPagination tests that specifying MaxSpanResultKeys +// and/or TargetBytes to break up result sets works properly, even across +// ranges. +func TestMultiRangeScanWithPagination(t *testing.T) { defer leaktest.AfterTest(t)() testCases := []struct { splitKeys []roachpb.Key @@ -430,7 +432,7 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) { roachpb.Key("r"), roachpb.Key("w"), roachpb.Key("y")}}, } - for i, tc := range testCases { + for _, tc := range testCases { t.Run("", func(t *testing.T) { ctx := context.Background() s, _, db := serverutils.StartServer(t, base.TestServerArgs{}) @@ -451,26 +453,87 @@ func TestMultiRangeScanWithMaxResults(t *testing.T) { } } - // Try every possible ScanRequest startKey. - for start := 0; start < len(tc.keys); start++ { - // Try every possible maxResults, from 1 to beyond the size of key array. - for maxResults := 1; maxResults <= len(tc.keys)-start+1; maxResults++ { - scan := roachpb.NewScan(tc.keys[start], tc.keys[len(tc.keys)-1].Next()) - reply, err := client.SendWrappedWith( - ctx, tds, roachpb.Header{MaxSpanRequestKeys: int64(maxResults)}, scan, - ) - if err != nil { - t.Fatal(err) - } - rows := reply.(*roachpb.ScanResponse).Rows - if start+maxResults <= len(tc.keys) && len(rows) != maxResults { - t.Errorf("%d: start=%s: expected %d rows, but got %d", i, tc.keys[start], maxResults, len(rows)) - } else if start+maxResults == len(tc.keys)+1 && len(rows) != maxResults-1 { - t.Errorf("%d: expected %d rows, but got %d", i, maxResults-1, len(rows)) - } - } + // The maximum TargetBytes to use in this test. We use the bytes in + // all kvs in this test case as a ceiling. Nothing interesting + // happens above this. + var maxTargetBytes int64 + { + resp, pErr := client.SendWrapped(ctx, tds, roachpb.NewScan(tc.keys[0], tc.keys[len(tc.keys)-1].Next())) + require.Nil(t, pErr) + maxTargetBytes = resp.Header().NumBytes } + testutils.RunTrueAndFalse(t, "reverse", func(t *testing.T, reverse bool) { + // Iterate through MaxSpanRequestKeys=1..n and TargetBytes=1..m + // and (where n and m are chosen to reveal the full result set + // in one page). At each(*) combination, paginate + // both the forward and reverse scan and make sure we get the + // right result. + // + // (*) we don't increase the limits when there's only one page, + // but short circuit to something more interesting instead. + msrq := int64(1) + for targetBytes := int64(1); ; targetBytes++ { + var numPages int + t.Run(fmt.Sprintf("targetBytes=%d,maxSpanRequestKeys=%d", targetBytes, msrq), func(t *testing.T) { + req := func(span roachpb.Span) roachpb.Request { + if reverse { + return roachpb.NewReverseScan(span.Key, span.EndKey) + } + return roachpb.NewScan(span.Key, span.EndKey) + } + // Paginate. + resumeSpan := &roachpb.Span{Key: tc.keys[0], EndKey: tc.keys[len(tc.keys)-1].Next()} + var keys []roachpb.Key + for { + numPages++ + scan := req(*resumeSpan) + var ba roachpb.BatchRequest + ba.Add(scan) + ba.Header.TargetBytes = targetBytes + ba.Header.MaxSpanRequestKeys = msrq + br, pErr := tds.Send(ctx, ba) + require.Nil(t, pErr) + var rows []roachpb.KeyValue + if reverse { + rows = br.Responses[0].GetReverseScan().Rows + } else { + rows = br.Responses[0].GetScan().Rows + } + for _, kv := range rows { + keys = append(keys, kv.Key) + } + resumeSpan = br.Responses[0].GetInner().Header().ResumeSpan + t.Logf("page #%d: scan %v -> keys (after) %v resume %v", scan.Header().Span(), numPages, keys, resumeSpan) + if resumeSpan == nil { + // Done with this pagination. + break + } + } + if reverse { + for i, n := 0, len(keys); i < n-i-1; i++ { + keys[i], keys[n-i-1] = keys[n-i-1], keys[i] + } + } + require.Equal(t, tc.keys, keys) + if targetBytes == 1 || msrq < int64(len(tc.keys)) { + // Definitely more than one page in this case. + require.Less(t, 1, numPages) + } + if targetBytes >= maxTargetBytes && msrq >= int64(len(tc.keys)) { + // Definitely one page if limits are larger than result set. + require.Equal(t, 1, numPages) + } + }) + if targetBytes >= maxTargetBytes || numPages == 1 { + if msrq >= int64(len(tc.keys)) { + return + } + targetBytes = 0 + msrq++ + } + } + }) }) } } diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 3beda21be4f2..f9133dde5160 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -2317,7 +2317,7 @@ func mvccScanToBytes( if err := opts.validate(); err != nil { return MVCCScanResult{}, err } - if opts.MaxKeys < 0 { + if opts.MaxKeys < 0 || opts.TargetBytes < 0 { resumeSpan := &roachpb.Span{Key: key, EndKey: endKey} return MVCCScanResult{ResumeSpan: resumeSpan}, nil } diff --git a/pkg/storage/engine/testdata/mvcc_histories/target_bytes b/pkg/storage/engine/testdata/mvcc_histories/target_bytes index 148da0306a47..e2048338aa9e 100644 --- a/pkg/storage/engine/testdata/mvcc_histories/target_bytes +++ b/pkg/storage/engine/testdata/mvcc_histories/target_bytes @@ -69,6 +69,16 @@ scan: "c" -> /BYTES/ghijkllkjihg @0.000000123,45 scan: "a" -> /BYTES/abcdef @0.000000123,45 scan: 108 bytes (target 10000000) +# Scans with target size -1 return no results. +run ok +with ts=300,0 k=a end=z targetbytes=-1 + scan + scan reverse=true +---- +scan: resume span ["a","z") +scan: "a"-"z" -> +scan: resume span ["a","z") +scan: "a"-"z" -> run ok # Target size one byte returns one result (overshooting instead of returning nothing). diff --git a/pkg/storage/replica_evaluate.go b/pkg/storage/replica_evaluate.go index 61a0d3ea0861..795805d6cbe1 100644 --- a/pkg/storage/replica_evaluate.go +++ b/pkg/storage/replica_evaluate.go @@ -331,6 +331,14 @@ func evaluateBatch( baHeader.MaxSpanRequestKeys = -1 } } + if baHeader.TargetBytes > 0 { + retBytes := reply.Header().NumBytes + if baHeader.TargetBytes > retBytes { + baHeader.TargetBytes -= retBytes + } else { + baHeader.TargetBytes = -1 + } + } // If transactional, we use ba.Txn for each individual command and // accumulate updates to it. Once accumulated, we then remove the Txn diff --git a/pkg/storage/replica_evaluate_test.go b/pkg/storage/replica_evaluate_test.go index d41d43ba7e4f..3c6b896b8406 100644 --- a/pkg/storage/replica_evaluate_test.go +++ b/pkg/storage/replica_evaluate_test.go @@ -30,6 +30,9 @@ func TestEvaluateBatch(t *testing.T) { defer leaktest.AfterTest(t)() tcs := []testCase{ + // + // Test suite for MaxRequestSpans. + // { // We should never evaluate empty batches, but here's what would happen // if we did. @@ -196,7 +199,52 @@ func TestEvaluateBatch(t *testing.T) { require.NoError(t, err) require.Equal(t, "value-e", string(b)) }, - }} + }, + // + // Test suite for TargetBytes. + // + { + // Two scans and a target bytes limit that saturates during the + // first. + name: "scans with TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + d.ba.Add(scanArgsString("a", "c")) + d.ba.Add(getArgsString("e")) + d.ba.Add(scanArgsString("c", "e")) + d.ba.TargetBytes = 1 + // Also set a nontrivial MaxSpanRequestKeys, just to make sure + // there's no weird interaction (like it overriding TargetBytes). + // The stricter one ought to win. + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"a"}, []string{"e"}, nil) + verifyResumeSpans(t, r, "b-c", "", "c-e") + b, err := r.br.Responses[1].GetGet().Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "value-e", string(b)) + }, + }, { + // Ditto in reverse. + name: "reverse scans with TargetBytes=1", + setup: func(t *testing.T, d *data) { + writeABCDEF(t, d) + d.ba.Add(revScanArgsString("c", "e")) + d.ba.Add(getArgsString("e")) + d.ba.Add(revScanArgsString("a", "c")) + d.ba.TargetBytes = 1 + d.ba.MaxSpanRequestKeys = 3 + }, + check: func(t *testing.T, r resp) { + verifyScanResult(t, r, []string{"d"}, []string{"e"}, nil) + verifyResumeSpans(t, r, "c-c\x00", "", "a-c") + b, err := r.br.Responses[1].GetGet().Value.GetBytes() + require.NoError(t, err) + require.Equal(t, "value-e", string(b)) + }, + }, + } for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) {