storage: constrain span of rangedel in ClearRange to keys in range
Constrains the width of the range deletion tombstone to the span of keys
actually present within the range. If the range contains no KV entries, the
rangedel is skipped entirely.

Before this change, when a snapshot was received, the SST written for the
range carried a range deletion tombstone spanning the entire range, regardless
of which keys were actually present or whether the range was empty. This
resulted in excessively wide tombstones, which have significant performance
implications because wide tombstones impede compaction.

Fixes #44048.

Release note: None.
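
To make the change concrete, here is a minimal sketch of the pattern this commit moves to, assuming the ConstrainToKeys and ClearRangeWithHeuristic helpers shown in the diff below; the helper function and variable names here (writeConstrainedRangeDel, eng, sst, desc) are illustrative only and not part of the commit:

// Hypothetical helper, for illustration only.
func writeConstrainedRangeDel(eng storage.Engine, sst storage.Writer, desc *roachpb.RangeDescriptor) error {
	span := roachpb.Span{Key: desc.StartKey.AsRawKey(), EndKey: desc.EndKey.AsRawKey()}
	// Narrow the candidate span to the keys actually present in the range.
	constrained, empty, err := rditer.ConstrainToKeys(eng, span)
	if err != nil {
		return err
	}
	if empty {
		// Empty range: skip the rangedel entirely.
		return nil
	}
	// Write a rangedel only as wide as the data actually present.
	return storage.ClearRangeWithHeuristic(eng, sst, constrained.Key, constrained.EndKey)
}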

Update documentation.

Constrain width of range deletion tombstone for subsumed ranges.

Address Nathan's reviews.

Address comments and fix pkg renaming issues with engine to storage.
Owen Qian committed Mar 5, 2020
1 parent d6c3a5e commit 37a1bd1
Showing 9 changed files with 340 additions and 125 deletions.
117 changes: 77 additions & 40 deletions pkg/kv/kvserver/client_merge_test.go
@@ -2851,11 +2851,33 @@ func TestStoreRangeMergeSlowWatcher(t *testing.T) {
}
}

// TestStoreRangeMergeRaftSnapshot sets up a situation which triggers a
// snapshot that spans both a range merge and split and verifies that the
// contents of the SSTs that are created for ingestion as a result of the
// snapshot are what we expect them to be.
//
// The test first creates four ranges: [a, b), [b, c), [c, e), and [e, /Max).
// Keys are inserted at a, b, c and d0 through d9. It then drops traffic to the
// [a, b) replica on store2, after which the above ranges are iteratively merged
// together to form [a, e) and [e, /Max), then finally split to form
// [a, d), [d, e), [e, /Max). Once this is all done, the Raft log is truncated
// to ensure that the replica cannot be caught up with incremental diffs.
//
// This setup allows us to verify that the receiving replica:
// 1. Creates an SST for the user keys in the snapshot with a range del
// tombstone that is only as wide as the keys that are present in the range.
// 2. This SST contains all of the user keys that we've inserted.
// 3. Creates SSTs which correctly clear keys contained in subsumed replicas,
// again with tombstones that are constrained to the keys present in the
// range.
func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
defer leaktest.AfterTest(t)()

// We will be testing the SSTs written on store2's engine.
var receivingEng, sendingEng storage.Engine
var receivingEng storage.Engine
// Used to set the MVCCKey.Timestamp of the manually inserted keys so we
// can create keys that are byte-by-byte equal to the ones in the snapshot.
keyTimestamps := make(map[string]hlc.Timestamp)
ctx := context.Background()
storeCfg := kvserver.TestStoreConfig(nil)
storeCfg.TestingKnobs.DisableReplicateQueue = true
@@ -2905,41 +2927,48 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
// tombstones and range deletion tombstones.
var expectedSSTs [][]byte

// Construct SST #1 through #3 as numbered above, but only ultimately
// keep the 3rd one.
keyRanges := rditer.MakeReplicatedKeyRanges(inSnap.State.Desc)
it := rditer.NewReplicaDataIterator(inSnap.State.Desc, sendingEng, true /* replicatedOnly */, false /* seekEnd */)
defer it.Close()
// Write a range deletion tombstone to each of the SSTs then put in the
// kv entries from the sender of the snapshot.
for _, r := range keyRanges {
// Construct SST #3 as numbered above. We set up the scenario that
// triggered the snapshot, so we should be able to predict exactly what
// should be in the SST.
{
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
if err := sst.ClearRange(r.Start, r.End); err != nil {
// Expect the user KeyRange to be [a, d), but we only have keys at
// a, b and c, so we expect rditer.ConstrainToKeys to restrict the
// width of the range del tombstone to [a, c.Next()).
if err := sst.ClearRange(
storage.MakeMVCCMetadataKey(roachpb.Key("a")),
storage.MakeMVCCMetadataKey(roachpb.Key("c").Next()),
); err != nil {
return err
}

// Keep adding kv data to the SST until the key exceeds the
// bounds of the range, then proceed to the next range.
for ; ; it.Next() {
valid, err := it.Valid()
if err != nil {
return err
}
if !valid || r.End.Key.Compare(it.Key().Key) <= 0 {
if err := sst.Finish(); err != nil {
return err
}
sst.Close()
expectedSSTs = append(expectedSSTs, sstFile.Data())
break
}
if err := sst.Put(it.Key(), it.Value()); err != nil {
return err
}
// Insert keys "a", "b" and "c" with timestamps that match the
// sender's, each with a value of 1.
createValFunc := func(key string, val int64) (storage.MVCCKey, roachpb.Value) {
k := roachpb.Key(key)
mvccKey := storage.MVCCKey{Key: k, Timestamp: keyTimestamps[string(k)]}
var v roachpb.Value
v.SetInt(val)
v.InitChecksum(k)
return mvccKey, v
}
k1, v1 := createValFunc("a", int64(1))
k2, v2 := createValFunc("b", int64(1))
k3, v3 := createValFunc("c", int64(1))
if err := sst.Put(k1, v1.RawBytes); err != nil {
return err
}
if err := sst.Put(k2, v2.RawBytes); err != nil {
return err
}
if err := sst.Put(k3, v3.RawBytes); err != nil {
return err
}

sst.Close()
expectedSSTs = append(expectedSSTs, sstFile.Data())
}
expectedSSTs = expectedSSTs[2:]

// Construct SSTs #5 and #6: range-id local keys of subsumed replicas
// with RangeIDs 3 and 4.
@@ -2967,16 +2996,12 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
sstFile := &storage.MemFile{}
sst := storage.MakeIngestionSSTWriter(sstFile)
defer sst.Close()
desc := roachpb.RangeDescriptor{
StartKey: roachpb.RKey("d"),
EndKey: roachpb.RKeyMax,
}
r := rditer.MakeUserKeyRange(&desc)
if err := storage.ClearRangeWithHeuristic(receivingEng, &sst, r.Start.Key, r.End.Key); err != nil {
// Expect there to be keys d0 through d9 in the subsumed [d, /Max) user
// KeyRange, so the range del tombstone should span exactly these keys.
if err := storage.ClearRangeWithHeuristic(receivingEng, &sst, roachpb.Key("d0"), roachpb.Key("d9").Next()); err != nil {
return err
}
err := sst.Finish()
if err != nil {
if err := sst.Finish(); err != nil {
return err
}
expectedSSTs = append(expectedSSTs, sstFile.Data())
@@ -2993,7 +3018,7 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
}
}
if len(mismatchedSstsIdx) != 0 {
return errors.Errorf("SST indices %v don't match", mismatchedSstsIdx)
return errors.Errorf("actual and expected SST indices %v don't match", mismatchedSstsIdx)
}
return nil
}
@@ -3007,7 +3032,6 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.Start(t, 3)
defer mtc.Stop()
store0, store2 := mtc.Store(0), mtc.Store(2)
sendingEng = store0.Engine()
receivingEng = store2.Engine()
distSender := mtc.distSenders[0]

@@ -3017,9 +3041,15 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(key)); pErr != nil {
t.Fatal(pErr)
}
if _, pErr := client.SendWrapped(ctx, distSender, incrementArgs(key, 1)); pErr != nil {
// Manually send the request so we can store the timestamp.
ba := roachpb.BatchRequest{}
ba.Add(incrementArgs(key, 1))
br, pErr := distSender.Send(ctx, ba)
if pErr != nil {
t.Fatal(pErr)
}
keyTimestamps[string(key)] = br.Timestamp

mtc.waitForValues(key, []int64{1, 1, 1})
}

@@ -3035,6 +3065,13 @@ func TestStoreRangeMergeRaftSnapshot(t *testing.T) {
mtc.waitForValues(key, []int64{1, 1, 1})
}

// Split [d, /Max) into [d, e) and [e, /Max) so we can predict the
// contents of [a, d) without having to worry about metadata keys that will
// now be in [e, /Max) instead.
if _, pErr := client.SendWrapped(ctx, distSender, adminSplitArgs(roachpb.Key("e"))); pErr != nil {
t.Fatal(pErr)
}

aRepl0 := store0.LookupReplica(roachpb.RKey("a"))

// Start dropping all Raft traffic to the first range on store2.
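
A detail worth noting from the test above: the expected SSTs are compared byte-for-byte against the ones produced by the snapshot, so the manually constructed MVCC keys must carry the exact timestamps the sender wrote. A condensed, hedged sketch of that bookkeeping (structure simplified; key, distSender and keyTimestamps are the test's own variables):

// Send the increment through the DistSender directly so the response,
// and with it the write timestamp, is available to the test.
ba := roachpb.BatchRequest{}
ba.Add(incrementArgs(key, 1))
br, pErr := distSender.Send(ctx, ba)
if pErr != nil {
	t.Fatal(pErr)
}
keyTimestamps[string(key)] = br.Timestamp

// Later, rebuild a byte-identical MVCC key for the expected SST.
mvccKey := storage.MVCCKey{Key: key, Timestamp: keyTimestamps[string(key)]}
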
31 changes: 29 additions & 2 deletions pkg/kv/kvserver/rditer/replica_data_iter.go
@@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/pkg/errors"
)

// KeyRange is a helper struct for the ReplicaDataIterator.
@@ -24,10 +25,10 @@ type KeyRange struct {

// ReplicaDataIterator provides a complete iteration over all key / value
// rows in a range, including all system-local metadata and user data.
// The ranges keyRange slice specifies the key ranges which comprise
// The ranges KeyRange slice specifies the key ranges which comprise
// all of the range's data.
//
// A ReplicaDataIterator provides a subset of the engine.Iterator interface.
// A ReplicaDataIterator provides a subset of the storage.Iterator interface.
//
// TODO(tschottdorf): the API is awkward. By default, ReplicaDataIterator uses
// a byte allocator which needs to be reset manually using `ResetAllocator`.
@@ -50,6 +51,32 @@ func MakeAllKeyRanges(d *roachpb.RangeDescriptor) []KeyRange {
}
}

// ConstrainToKeys returns a Span constrained to the keys present in the given Range.
// Returns an empty Span and true if the Range contains no keys; otherwise it returns
// the constrained Span and false.
func ConstrainToKeys(
reader storage.Reader, span roachpb.Span,
) (_ roachpb.Span, empty bool, _ error) {
it := reader.NewIterator(storage.IterOptions{LowerBound: span.Key, UpperBound: span.EndKey})
defer it.Close()
it.SeekGE(storage.MakeMVCCMetadataKey(span.Key))
if valid, err := it.Valid(); err != nil {
return roachpb.Span{}, true, errors.Wrapf(err, "unexpected error when constraining non-empty span")
} else if !valid {
return roachpb.Span{}, true, nil
}
startKey := it.Key().Key

it.SeekLT(storage.MakeMVCCMetadataKey(span.EndKey))
if valid, err := it.Valid(); err != nil {
return roachpb.Span{}, true, errors.Wrapf(err, "unexpected error when constraining non-empty span")
} else if !valid {
return roachpb.Span{}, true, nil
}
endKey := it.Key().Key.Next()

return roachpb.Span{Key: startKey, EndKey: endKey}, false, nil
}

// MakeReplicatedKeyRanges returns all key ranges that are fully Raft
// replicated for the given Range.
//
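
From a caller's perspective, a short hedged usage sketch of ConstrainToKeys; the engine contents assumed here mirror the unit tests below and are not part of the diff:

// Hypothetical caller, for illustration only.
func constrainedTombstoneSpan(eng storage.Reader) (roachpb.Span, bool, error) {
	// Suppose eng contains keys "c", "d" and "h". Constraining [b, q) then
	// yields [c, "h".Next()); if no keys fall in [b, q), empty is returned as
	// true and the caller should skip the range deletion tombstone altogether.
	span := roachpb.Span{Key: roachpb.Key("b"), EndKey: roachpb.Key("q")}
	return rditer.ConstrainToKeys(eng, span)
}
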
64 changes: 64 additions & 0 deletions pkg/kv/kvserver/rditer/replica_data_iter_test.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func fakePrevKey(k []byte) roachpb.Key {
@@ -265,3 +266,66 @@ func TestReplicaDataIterator(t *testing.T) {
})
}
}

func TestConstrainToKeysEmptyRange(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := storage.NewDefaultInMem()
defer eng.Close()

span, empty, err := ConstrainToKeys(eng, roachpb.Span{Key: roachpb.Key("a"), EndKey: roachpb.Key("z")})
require.NoError(t, err)
require.True(t, empty)
require.Equal(t, roachpb.Span{}, span)
}

func TestConstrainToKeysNonEmptyRange(t *testing.T) {
defer leaktest.AfterTest(t)()

eng := storage.NewDefaultInMem()
defer eng.Close()

originalSpan := roachpb.Span{
Key: roachpb.Key("b"),
EndKey: roachpb.Key("q"),
}

// Insert a single key, d, and expect [d, d.Next()).
err := storage.MVCCPut(context.Background(), eng, nil, roachpb.Key("d"), hlc.Timestamp{}, roachpb.MakeValueFromString("value"), nil)
require.NoError(t, err)
span, empty, err := ConstrainToKeys(eng, originalSpan)
require.NoError(t, err)
require.False(t, empty)
require.Equal(t, roachpb.Key("d"), span.Key)
require.Equal(t, roachpb.Key("d").Next(), span.EndKey)

// Insert a second key, h, and expect [d, h.Next()).
err = storage.MVCCPut(context.Background(), eng, nil, roachpb.Key("h"), hlc.Timestamp{}, roachpb.MakeValueFromString("value"), nil)
require.NoError(t, err)
span, empty, err = ConstrainToKeys(eng, originalSpan)
require.NoError(t, err)
require.False(t, empty)
require.Equal(t, roachpb.Key("d"), span.Key)
require.Equal(t, roachpb.Key("h").Next(), span.EndKey)

// Insert a third key, c, and expect [c, h.Next()).
err = storage.MVCCPut(context.Background(), eng, nil, roachpb.Key("c"), hlc.Timestamp{}, roachpb.MakeValueFromString("value"), nil)
require.NoError(t, err)
span, empty, err = ConstrainToKeys(eng, originalSpan)
require.NoError(t, err)
require.False(t, empty)
require.Equal(t, roachpb.Key("c"), span.Key)
require.Equal(t, roachpb.Key("h").Next(), span.EndKey)

// Insert a key before the start key and beyond the end key of the original
// span. Expect the constrained span to remain unchanged.
err = storage.MVCCPut(context.Background(), eng, nil, roachpb.Key("a"), hlc.Timestamp{}, roachpb.MakeValueFromString("value"), nil)
require.NoError(t, err)
err = storage.MVCCPut(context.Background(), eng, nil, roachpb.Key("z"), hlc.Timestamp{}, roachpb.MakeValueFromString("value"), nil)
require.NoError(t, err)
span, empty, err = ConstrainToKeys(eng, originalSpan)
require.NoError(t, err)
require.False(t, empty)
require.Equal(t, roachpb.Key("c"), span.Key)
require.Equal(t, roachpb.Key("h").Next(), span.EndKey)
}
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_command.go
@@ -1740,8 +1740,11 @@ func execChangeReplicasTxn(
// three SSTs from them for direct ingestion: one for the replicated range-ID
// local keys, one for the range local keys, and one for the user keys. The
// reason it creates three separate SSTs is to prevent overlaps with the
// memtable and existing SSTs in RocksDB. Each of the SSTs also has a range
// deletion tombstone to delete the existing data in the range.
// memtable and existing SSTs in RocksDB. These SST files are created lazily,
// so in the case where the keyspace is empty, no file will be created. Each
// of the SSTs has a range deletion tombstone written to it to delete the
// existing data in the range; however, if the keyspace is empty, the
// tombstone is omitted.
//
// Applying the snapshot: After the recipient has received the message
// indicating it has all the data, it hands it all to
35 changes: 21 additions & 14 deletions pkg/kv/kvserver/replica_raftstorage.go
@@ -577,10 +577,10 @@ func snapshot(
// They are managed by the caller, including cleaning up obsolete on-disk
// payloads in case the log tail is replaced.
//
// NOTE: This method takes a engine.Writer because reads are unnecessary when
// NOTE: This method takes a storage.Writer because reads are unnecessary when
// prevLastIndex is 0 and prevLastTerm is invalidLastTerm. In the case where
// reading is necessary (I.E. entries are getting overwritten or deleted), a
// engine.ReadWriter must be passed in.
// storage.ReadWriter must be passed in.
func (r *Replica) append(
ctx context.Context,
writer storage.Writer,
@@ -606,11 +606,11 @@ func (r *Replica) append(
if ent.Index > prevLastIndex {
err = storage.MVCCBlindPut(ctx, writer, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
} else {
// We type assert `writer` to also be an engine.ReadWriter only in
// We type assert `writer` to also be a storage.ReadWriter only in
// the case where we're replacing existing entries.
eng, ok := writer.(storage.ReadWriter)
if !ok {
panic("expected writer to be a engine.ReadWriter when overwriting log entries")
panic("expected writer to be a storage.ReadWriter when overwriting log entries")
}
err = storage.MVCCPut(ctx, eng, &diff, key, hlc.Timestamp{}, value, nil /* txn */)
}
@@ -623,11 +623,11 @@ func (r *Replica) append(
lastTerm := entries[len(entries)-1].Term
// Delete any previously appended log entries which never committed.
if prevLastIndex > 0 {
// We type assert `writer` to also be an engine.ReadWriter only in the
// We type assert `writer` to also be a storage.ReadWriter only in the
// case where we're deleting existing entries.
eng, ok := writer.(storage.ReadWriter)
if !ok {
panic("expected writer to be a engine.ReadWriter when deleting log entries")
panic("expected writer to be a storage.ReadWriter when deleting log entries")
}
for i := lastIndex + 1; i <= prevLastIndex; i++ {
// Note that the caller is in charge of deleting any sideloaded payloads
@@ -1066,14 +1066,21 @@ func (r *Replica) clearSubsumedReplicaDiskData(
subsumedReplSSTFile := &storage.MemFile{}
subsumedReplSST := storage.MakeIngestionSSTWriter(subsumedReplSSTFile)
defer subsumedReplSST.Close()
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
&subsumedReplSST,
keyRanges[i].End.Key,
totalKeyRanges[i].End.Key,
); err != nil {
subsumedReplSST.Close()
return err
unconstrainedSpan := roachpb.Span{Key: keyRanges[i].End.Key, EndKey: totalKeyRanges[i].End.Key}
span, empty, err := rditer.ConstrainToKeys(r.Engine(), unconstrainedSpan)
if err != nil {
return errors.Wrapf(err, "error constraining width of range deletion tombstone")
}
if !empty {
if err := storage.ClearRangeWithHeuristic(
r.store.Engine(),
&subsumedReplSST,
span.Key,
span.EndKey,
); err != nil {
subsumedReplSST.Close()
return err
}
}
if err := subsumedReplSST.Finish(); err != nil {
return err