Merge pull request #524 from cockroachdb/pmattis/open-read-only
db: fix WAL replay in read-only mode
petermattis authored Feb 10, 2020
2 parents 94802b4 + 213ca72 commit 7ad59b7
Showing 6 changed files with 177 additions and 78 deletions.
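
For orientation, the path this commit fixes can be exercised roughly as follows. This is a minimal sketch, assuming the API as it appears in the tests in this diff (Open, Options.FS/MemTableSize/ReadOnly, Set, NewIter returning an *Iterator); it is illustrative, not part of the commit.

// Sketch: write into the WAL, then re-open the same store read-only so
// that Open must replay the WAL into the mutable memtable without ever
// flushing it.
package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	fs := vfs.NewMem()

	d, err := pebble.Open("", &pebble.Options{FS: fs, MemTableSize: 32 << 20})
	if err != nil {
		log.Fatal(err)
	}
	if err := d.Set([]byte("a"), []byte("apple"), nil); err != nil {
		log.Fatal(err)
	}
	// Close without flushing; the value for "a" lives only in the WAL.
	if err := d.Close(); err != nil {
		log.Fatal(err)
	}

	// Re-open read-only: the WAL is replayed directly into the mutable
	// memtable, which is the path fixed by this change.
	d, err = pebble.Open("", &pebble.Options{FS: fs, ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer d.Close()

	iter := d.NewIter(nil)
	defer iter.Close()
	for valid := iter.First(); valid; valid = iter.Next() {
		log.Printf("%s = %s", iter.Key(), iter.Value())
	}
}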
3 changes: 3 additions & 0 deletions db.go
@@ -1352,6 +1352,9 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
var logSeqNum uint64
if b != nil {
logSeqNum = b.SeqNum()
if b.flushable != nil {
logSeqNum += uint64(b.Count())
}
} else {
logSeqNum = atomic.LoadUint64(&d.mu.versions.logSeqNum)
}
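
The db.go hunk above handles batches that are committed as flushables: a batch with sequence number S and Count() == N owns sequence numbers [S, S+N), so a memtable created behind it must be stamped with S+N rather than S. A standalone sketch of that arithmetic, using stand-in types rather than pebble's:

package main

import "fmt"

// fakeBatch stands in for pebble's Batch; only the fields needed for the
// sequence-number arithmetic are modeled.
type fakeBatch struct {
	seqNum    uint64 // sequence number of the batch's first operation
	count     uint32 // number of operations in the batch
	flushable bool   // true when the batch is large enough to be queued as a flushable
}

// nextMemtableSeqNum mirrors the logic added to makeRoomForWrite: a new
// memtable created behind a flushable batch must not claim any sequence
// number already owned by that batch.
func nextMemtableSeqNum(b *fakeBatch) uint64 {
	logSeqNum := b.seqNum
	if b.flushable {
		logSeqNum += uint64(b.count)
	}
	return logSeqNum
}

func main() {
	b := &fakeBatch{seqNum: 100, count: 3, flushable: true}
	// The batch owns 100, 101, 102; the next memtable starts at 103.
	fmt.Println(nextMemtableSeqNum(b))
}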
11 changes: 10 additions & 1 deletion db_test.go
@@ -384,11 +384,17 @@ func TestLargeBatch(t *testing.T) {
}
return info.Size()
}
memTableCreationSeqNum := func() uint64 {
d.mu.Lock()
defer d.mu.Unlock()
return d.mu.mem.mutable.logSeqNum
}

startLogNum := logNum()
startLogStartSize := fileSize(startLogNum)
startSeqNum := atomic.LoadUint64(&d.mu.versions.logSeqNum)

// Write two keys with values that are larger than the memtable size.
// Write a key with a value larger than the memtable size.
if err := d.Set([]byte("a"), bytes.Repeat([]byte("a"), 512), nil); err != nil {
t.Fatal(err)
}
@@ -409,6 +415,9 @@ func TestLargeBatch(t *testing.T) {
if endLogSize != 0 {
t.Fatalf("expected %06d.log to be empty, but found %d", endLogNum, endLogSize)
}
if creationSeqNum := memTableCreationSeqNum(); creationSeqNum <= startSeqNum {
t.Fatalf("expected memTable.logSeqNum=%d > largeBatch.seqNum=%d", creationSeqNum, startSeqNum)
}

// Verify this results in one L0 table being created.
err = try(100*time.Microsecond, 20*time.Second, verifyLSM("0:\n 000005:[a-a]\n"))
20 changes: 14 additions & 6 deletions mem_table.go
@@ -5,6 +5,7 @@
package pebble

import (
"bytes"
"fmt"
"sync"
"sync/atomic"
@@ -19,6 +20,17 @@ func memTableEntrySize(keyBytes, valueBytes int) uint32 {
return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
}

// memTableEmptySize is the amount of allocated space in the arena when the
// memtable is empty.
var memTableEmptySize = func() uint32 {
var pointSkl arenaskl.Skiplist
var rangeDelSkl arenaskl.Skiplist
arena := arenaskl.NewArena(16 << 10 /* 16 KB */)
pointSkl.Reset(arena, bytes.Compare)
rangeDelSkl.Reset(arena, bytes.Compare)
return arena.Size()
}()

// A memTable implements an in-memory layer of the LSM. A memTable is mutable,
// but append-only. Records are added, but never removed. Deletion is supported
// via tombstones, but it is up to higher level code (see Iterator) to support
@@ -49,9 +61,6 @@ type memTable struct {
equal Equal
skl arenaskl.Skiplist
rangeDelSkl arenaskl.Skiplist
// emptySize is the amount of allocated space in the arena when the memtable
// is empty.
emptySize uint32
// reserved tracks the amount of space used by the memtable, both by actual
// data stored in the memtable as well as inflight batch commit
// operations. This value is incremented pessimistically by prepare() in
@@ -96,7 +105,6 @@ func newMemTable(opts memTableOptions) *memTable {
arena := arenaskl.NewArena(uint32(opts.size))
m.skl.Reset(arena, m.cmp)
m.rangeDelSkl.Reset(arena, m.cmp)
m.emptySize = arena.Size()
return m
}

@@ -228,7 +236,7 @@ func (m *memTable) availBytes() uint32 {
}

func (m *memTable) inuseBytes() uint64 {
return uint64(m.skl.Size() - m.emptySize)
return uint64(m.skl.Size() - memTableEmptySize)
}

func (m *memTable) totalBytes() uint64 {
@@ -241,7 +249,7 @@ func (m *memTable) close() error {

// empty returns whether the MemTable has no key/value pairs.
func (m *memTable) empty() bool {
return m.skl.Size() == m.emptySize
return m.skl.Size() == memTableEmptySize
}

// A rangeTombstoneFrags holds a set of fragmented range tombstones generated
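
memTableEmptySize replaces the per-memtable emptySize field, which lets Open compute the large-batch threshold before any memtable exists (see the open.go hunk below, where largeBatchThreshold = (MemTableSize - memTableEmptySize) / 2). A numeric sketch with an assumed overhead value:

package main

import "fmt"

func main() {
	// Assumed values for illustration; the real memTableEmptySize is
	// whatever the two empty arena-backed skiplists consume.
	const memTableSize = 256 << 10    // 256 KB memtable
	const memTableEmptySize = 1 << 10 // ~1 KB arena overhead (assumption)

	// Mirrors open.go: a batch larger than half the usable memtable space
	// is treated as "large" and queued as a flushable instead of being
	// applied to the memtable.
	largeBatchThreshold := (memTableSize - memTableEmptySize) / 2
	fmt.Println(largeBatchThreshold) // 130560 bytes with the numbers above
}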
31 changes: 18 additions & 13 deletions open.go
@@ -33,16 +33,17 @@ func Open(dirname string, opts *Options) (*DB, error) {
}

d := &DB{
cacheID: opts.Cache.NewID(),
dirname: dirname,
walDirname: opts.WALDir,
opts: opts,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
merge: opts.Merger.Merge,
split: opts.Comparer.Split,
abbreviatedKey: opts.Comparer.AbbreviatedKey,
logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
cacheID: opts.Cache.NewID(),
dirname: dirname,
walDirname: opts.WALDir,
opts: opts,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
merge: opts.Merger.Merge,
split: opts.Comparer.Split,
abbreviatedKey: opts.Comparer.AbbreviatedKey,
largeBatchThreshold: (opts.MemTableSize - int(memTableEmptySize)) / 2,
logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
}
if d.equal == nil {
d.equal = bytes.Equal
@@ -145,11 +146,13 @@ func Open(dirname string, opts *Options) (*DB, error) {
}
}

{
// In read-only mode, we replay directly into the mutable memtable but never
// flush it. We need to delay creation of the memtable until we know the
// sequence number of the first batch that will be inserted.
if !d.opts.ReadOnly {
var entry *flushableEntry
d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum)
d.mu.mem.queue = append(d.mu.mem.queue, entry)
d.largeBatchThreshold = (d.opts.MemTableSize - int(d.mu.mem.mutable.emptySize)) / 2
}

ls, err := opts.FS.List(d.walDirname)
@@ -306,7 +309,7 @@ func (d *DB) replayWAL(
// In read-only mode, we replay directly into the mutable memtable which will
// never be flushed.
mem = d.mu.mem.mutable
entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
if mem != nil {
entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
}
}

// Flushes the current memtable, if not nil.
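
The open.go hunks above delay creation of the mutable memtable in read-only mode until the sequence number of the first replayed batch is known; the lazy-creation code itself lives in a collapsed portion of replayWAL. A standalone sketch of that idea, using stand-in types rather than pebble's:

package main

import "fmt"

// memTable and replayState are illustrative stand-ins, not pebble's types.
type memTable struct{ logSeqNum uint64 }

type replayState struct {
	mutable *memTable
}

// memtableFor lazily creates the mutable memtable the first time a batch is
// replayed, stamping it with that batch's sequence number so the memtable's
// creation seqnum never exceeds an entry written into it.
func (s *replayState) memtableFor(batchSeqNum uint64) *memTable {
	if s.mutable == nil {
		s.mutable = &memTable{logSeqNum: batchSeqNum}
	}
	return s.mutable
}

func main() {
	var s replayState
	m := s.memtableFor(42) // first replayed batch has seqnum 42
	fmt.Println(m.logSeqNum)
	m = s.memtableFor(50) // later batches reuse the same memtable
	fmt.Println(m.logSeqNum)
}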
171 changes: 117 additions & 54 deletions open_test.go
@@ -375,8 +375,6 @@ func TestOpenReadOnly(t *testing.T) {
}

func TestOpenWALReplay(t *testing.T) {
mem := vfs.NewMem()

largeValue := []byte(strings.Repeat("a", 100<<10))
hugeValue := []byte(strings.Repeat("b", 10<<20))
checkIter := func(iter *Iterator) {
@@ -392,62 +390,127 @@ func TestOpenWALReplay(t *testing.T) {
t.Fatalf("%s\n%s", strings.Join(diff, "\n"), keys)
}
}

for _, readOnly := range []bool{false, true} {
t.Logf("read-only: %t", readOnly)
// Create a new DB and populate it with some data.
dir := fmt.Sprint(readOnly)
d, err := Open(dir, &Options{
FS: mem,
MemTableSize: 32 << 20,
})
require.NoError(t, err)
// All these values will fit in a single memtable, so on closing the db there
// will be no sst and all the data is in a single WAL.
require.NoError(t, d.Set([]byte("1"), largeValue, nil))
require.NoError(t, d.Set([]byte("2"), largeValue, nil))
require.NoError(t, d.Set([]byte("3"), largeValue, nil))
require.NoError(t, d.Set([]byte("4"), hugeValue, nil))
require.NoError(t, d.Set([]byte("5"), largeValue, nil))
checkIter(d.NewIter(nil))
require.NoError(t, d.Close())
files, err := mem.List(dir)
require.NoError(t, err)
sort.Strings(files)
logCount, sstCount := 0, 0
for _, fname := range files {
t.Log(fname)
if strings.HasSuffix(fname, ".sst") {
sstCount++
t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
// Create a new DB and populate it with some data.
const dir = ""
mem := vfs.NewMem()
d, err := Open(dir, &Options{
FS: mem,
MemTableSize: 32 << 20,
})
require.NoError(t, err)
// All these values will fit in a single memtable, so on closing the db there
// will be no sst and all the data is in a single WAL.
require.NoError(t, d.Set([]byte("1"), largeValue, nil))
require.NoError(t, d.Set([]byte("2"), largeValue, nil))
require.NoError(t, d.Set([]byte("3"), largeValue, nil))
require.NoError(t, d.Set([]byte("4"), hugeValue, nil))
require.NoError(t, d.Set([]byte("5"), largeValue, nil))
checkIter(d.NewIter(nil))
require.NoError(t, d.Close())
files, err := mem.List(dir)
require.NoError(t, err)
sort.Strings(files)
logCount, sstCount := 0, 0
for _, fname := range files {
if strings.HasSuffix(fname, ".sst") {
sstCount++
}
if strings.HasSuffix(fname, ".log") {
logCount++
}
}
if strings.HasSuffix(fname, ".log") {
logCount++
require.Equal(t, 0, sstCount)
// The memtable size starts at 256KB and doubles up to 32MB so we expect 7
// logs (one for each doubling).
require.Equal(t, 7, logCount)

// Re-open the DB with a smaller memtable. Values for 1, 2 will fit in the first memtable;
// value for 3 will go in the next memtable; value for 4 will be in a flushable batch
// which will cause the previous memtable to be flushed; value for 5 will go in the next
// memtable
d, err = Open(dir, &Options{
FS: mem,
MemTableSize: 300 << 10,
ReadOnly: readOnly,
})
if err != nil {
t.Fatal(err)
}
if readOnly {
d.mu.Lock()
require.Equal(t, 10, len(d.mu.mem.queue))
require.NotNil(t, d.mu.mem.mutable)
d.mu.Unlock()
}
checkIter(d.NewIter(nil))
require.NoError(t, d.Close())
})
}
}

// Similar to TestOpenWALReplay, except we test replay behavior after a
// memtable has been flushed. We test all 3 reasons for flushing: forced, size,
// and large-batch.
func TestOpenWALReplay2(t *testing.T) {
for _, readOnly := range []bool{false, true} {
t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
for _, reason := range []string{"forced", "size", "large-batch"} {
t.Run(reason, func(t *testing.T) {
mem := vfs.NewMem()
d, err := Open("", &Options{
FS: mem,
MemTableSize: 256 << 10,
})
require.NoError(t, err)

switch reason {
case "forced":
require.NoError(t, d.Set([]byte("1"), nil, nil))
require.NoError(t, d.Flush())
require.NoError(t, d.Set([]byte("2"), nil, nil))
case "size":
largeValue := []byte(strings.Repeat("a", 100<<10))
require.NoError(t, d.Set([]byte("1"), largeValue, nil))
require.NoError(t, d.Set([]byte("2"), largeValue, nil))
require.NoError(t, d.Set([]byte("3"), largeValue, nil))
case "large-batch":
largeValue := []byte(strings.Repeat("a", d.largeBatchThreshold))
require.NoError(t, d.Set([]byte("1"), nil, nil))
require.NoError(t, d.Set([]byte("2"), largeValue, nil))
require.NoError(t, d.Set([]byte("3"), nil, nil))
}
require.NoError(t, d.Close())

files, err := mem.List("")
require.NoError(t, err)
sort.Strings(files)
sstCount := 0
for _, fname := range files {
if strings.HasSuffix(fname, ".sst") {
sstCount++
}
}
require.Equal(t, 1, sstCount)

// Re-open the DB (read-only or read-write per the outer loop) and verify
// that the WAL written after the flush replays cleanly.
d, err = Open("", &Options{
FS: mem,
MemTableSize: 300 << 10,
ReadOnly: readOnly,
})
if err != nil {
t.Fatal(err)
}
require.NoError(t, d.Close())
})
}
}
}
}

19 changes: 15 additions & 4 deletions version_set.go
@@ -292,7 +292,8 @@ func (vs *versionSet) logUnlock() {
// (see logLock). Will unconditionally release the manifest lock (via
// logUnlock) even if an error occurs.
//
// inProgressCompactions is called while DB.mu is held, to get the list of in-progress compactions.
// inProgressCompactions is called while DB.mu is held, to get the list of
// in-progress compactions.
func (vs *versionSet) logAndApply(
jobID int,
ve *versionEdit,
@@ -312,6 +313,7 @@ func (vs *versionSet) logAndApply(
ve.MinUnflushedLogNum))
}
}

// This is the next manifest filenum, but if the current file is too big we
// will write this ve to the next file which means what ve encodes is the
// current filenum and not the next one.
@@ -320,8 +322,16 @@ func (vs *versionSet) logAndApply(
ve.NextFileNum = vs.nextFileNum

// LastSeqNum is set to the current upper bound on the assigned sequence
// numbers.
// numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
// used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
// replay. It must be higher than any sequence number written to an
// sstable, including sequence numbers in ingested files. Note that
// LastSeqNum is not (and cannot be) the minimum unflushed sequence
// number. This is fallout from ingestion which allows a sequence number X to
// be assigned to an ingested sstable even though sequence number X-1 resides
// in an unflushed memtable.
ve.LastSeqNum = atomic.LoadUint64(&vs.logSeqNum)

currentVersion := vs.currentVersion()
var newVersion *version

@@ -332,7 +342,8 @@ func (vs *versionSet) logAndApply(
newManifestFileNum = vs.getNextFileNum()
}

// Grab certain values before releasing vs.mu, in case createManifest() needs to be called.
// Grab certain values before releasing vs.mu, in case createManifest() needs
// to be called.
minUnflushedLogNum := vs.minUnflushedLogNum
nextFileNum := vs.nextFileNum

@@ -447,7 +458,7 @@ func (vs *versionSet) incrementFlushes() {

// createManifest creates a manifest file that contains a snapshot of vs.
func (vs *versionSet) createManifest(
dirname string, fileNum uint64, minUnflushedLogNum uint64, nextFileNum uint64,
dirname string, fileNum, minUnflushedLogNum, nextFileNum uint64,
) (err error) {
var (
filename = base.MakeFilename(vs.fs, dirname, fileTypeManifest, fileNum)
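
The expanded LastSeqNum comment above is easiest to see with concrete numbers. A standalone illustration with made-up values:

package main

import "fmt"

func main() {
	// Hypothetical state when a version edit is written: an sstable was
	// ingested at sequence number 105 while sequence number 104 still sits
	// in an unflushed memtable.
	const ingestedSeqNum = 105
	const unflushedSeqNum = 104

	// LastSeqNum must bound every assigned sequence number, including the
	// ingested one ...
	lastSeqNum := ingestedSeqNum

	// ... so it cannot double as a "minimum unflushed" sequence number,
	// because a smaller seqnum is still unflushed.
	fmt.Println(lastSeqNum > unflushedSeqNum) // true
}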
