diff --git a/db.go b/db.go
index c4cbda3d38..5936574bfc 100644
--- a/db.go
+++ b/db.go
@@ -1352,6 +1352,9 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
 	var logSeqNum uint64
 	if b != nil {
 		logSeqNum = b.SeqNum()
+		if b.flushable != nil {
+			logSeqNum += uint64(b.Count())
+		}
 	} else {
 		logSeqNum = atomic.LoadUint64(&d.mu.versions.logSeqNum)
 	}
diff --git a/db_test.go b/db_test.go
index bcaea4e49f..6932a04464 100644
--- a/db_test.go
+++ b/db_test.go
@@ -384,11 +384,17 @@ func TestLargeBatch(t *testing.T) {
 		}
 		return info.Size()
 	}
+	memTableCreationSeqNum := func() uint64 {
+		d.mu.Lock()
+		defer d.mu.Unlock()
+		return d.mu.mem.mutable.logSeqNum
+	}
 
 	startLogNum := logNum()
 	startLogStartSize := fileSize(startLogNum)
+	startSeqNum := atomic.LoadUint64(&d.mu.versions.logSeqNum)
 
-	// Write two keys with values that are larger than the memtable size.
+	// Write a key with a value larger than the memtable size.
 	if err := d.Set([]byte("a"), bytes.Repeat([]byte("a"), 512), nil); err != nil {
 		t.Fatal(err)
 	}
@@ -409,6 +415,9 @@ func TestLargeBatch(t *testing.T) {
 	if endLogSize != 0 {
 		t.Fatalf("expected %06d.log to be empty, but found %d", endLogNum, endLogSize)
 	}
+	if creationSeqNum := memTableCreationSeqNum(); creationSeqNum <= startSeqNum {
+		t.Fatalf("expected memTable.logSeqNum=%d > largeBatch.seqNum=%d", creationSeqNum, startSeqNum)
+	}
 
 	// Verify this results in one L0 table being created.
 	err = try(100*time.Microsecond, 20*time.Second, verifyLSM("0:\n  000005:[a-a]\n"))
diff --git a/mem_table.go b/mem_table.go
index 6df1f1a4c4..420e02d98b 100644
--- a/mem_table.go
+++ b/mem_table.go
@@ -5,6 +5,7 @@
 package pebble
 
 import (
+	"bytes"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -19,6 +20,17 @@ func memTableEntrySize(keyBytes, valueBytes int) uint32 {
 	return arenaskl.MaxNodeSize(uint32(keyBytes)+8, uint32(valueBytes))
 }
 
+// memTableEmptySize is the amount of allocated space in the arena when the
+// memtable is empty.
+var memTableEmptySize = func() uint32 {
+	var pointSkl arenaskl.Skiplist
+	var rangeDelSkl arenaskl.Skiplist
+	arena := arenaskl.NewArena(16 << 10 /* 16 KB */)
+	pointSkl.Reset(arena, bytes.Compare)
+	rangeDelSkl.Reset(arena, bytes.Compare)
+	return arena.Size()
+}()
+
 // A memTable implements an in-memory layer of the LSM. A memTable is mutable,
 // but append-only. Records are added, but never removed. Deletion is supported
 // via tombstones, but it is up to higher level code (see Iterator) to support
@@ -49,9 +61,6 @@ type memTable struct {
 	equal       Equal
 	skl         arenaskl.Skiplist
 	rangeDelSkl arenaskl.Skiplist
-	// emptySize is the amount of allocated space in the arena when the memtable
-	// is empty.
-	emptySize uint32
 	// reserved tracks the amount of space used by the memtable, both by actual
 	// data stored in the memtable as well as inflight batch commit
 	// operations. This value is incremented pessimistically by prepare() in
@@ -96,7 +105,6 @@ func newMemTable(opts memTableOptions) *memTable {
 	arena := arenaskl.NewArena(uint32(opts.size))
 	m.skl.Reset(arena, m.cmp)
 	m.rangeDelSkl.Reset(arena, m.cmp)
-	m.emptySize = arena.Size()
 	return m
 }
 
@@ -228,7 +236,7 @@ func (m *memTable) availBytes() uint32 {
 }
 
 func (m *memTable) inuseBytes() uint64 {
-	return uint64(m.skl.Size() - m.emptySize)
+	return uint64(m.skl.Size() - memTableEmptySize)
 }
 
 func (m *memTable) totalBytes() uint64 {
@@ -241,7 +249,7 @@ func (m *memTable) close() error {
 
 // empty returns whether the MemTable has no key/value pairs.
 func (m *memTable) empty() bool {
-	return m.skl.Size() == m.emptySize
+	return m.skl.Size() == memTableEmptySize
 }
 
 // A rangeTombstoneFrags holds a set of fragmented range tombstones generated
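A note on the makeRoomForWrite change above: a batch is assigned a contiguous block of sequence numbers, one per record, starting at `SeqNum()`. When a large batch is queued as a flushable, the memtable created behind it must be seeded with the first sequence number *after* the batch, or the memtable's `logSeqNum` would overlap the batch's range. A minimal sketch of the arithmetic (the helper name is hypothetical, not part of the patch):

```go
// nextMemTableSeqNum mirrors the makeRoomForWrite change: a flushable batch
// occupies the half-open sequence number range [SeqNum, SeqNum+Count), so the
// memtable queued behind it must start at SeqNum+Count.
func nextMemTableSeqNum(batchSeqNum uint64, batchCount uint32) uint64 {
	return batchSeqNum + uint64(batchCount)
}
```

The new assertion in TestLargeBatch checks exactly this: the memtable created after the large batch must have a creation sequence number strictly greater than the batch's.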
diff --git a/open.go b/open.go
index 480fcdef21..cd799e3c9d 100644
--- a/open.go
+++ b/open.go
@@ -33,16 +33,17 @@ func Open(dirname string, opts *Options) (*DB, error) {
 	}
 
 	d := &DB{
-		cacheID:        opts.Cache.NewID(),
-		dirname:        dirname,
-		walDirname:     opts.WALDir,
-		opts:           opts,
-		cmp:            opts.Comparer.Compare,
-		equal:          opts.Comparer.Equal,
-		merge:          opts.Merger.Merge,
-		split:          opts.Comparer.Split,
-		abbreviatedKey: opts.Comparer.AbbreviatedKey,
-		logRecycler:    logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
+		cacheID:             opts.Cache.NewID(),
+		dirname:             dirname,
+		walDirname:          opts.WALDir,
+		opts:                opts,
+		cmp:                 opts.Comparer.Compare,
+		equal:               opts.Comparer.Equal,
+		merge:               opts.Merger.Merge,
+		split:               opts.Comparer.Split,
+		abbreviatedKey:      opts.Comparer.AbbreviatedKey,
+		largeBatchThreshold: (opts.MemTableSize - int(memTableEmptySize)) / 2,
+		logRecycler:         logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
 	}
 	if d.equal == nil {
 		d.equal = bytes.Equal
@@ -145,11 +146,13 @@ func Open(dirname string, opts *Options) (*DB, error) {
 		}
 	}
 
-	{
+	// In read-only mode, we replay directly into the mutable memtable but never
+	// flush it. We need to delay creation of the memtable until we know the
+	// sequence number of the first batch that will be inserted.
+	if !d.opts.ReadOnly {
 		var entry *flushableEntry
 		d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, d.mu.versions.logSeqNum)
 		d.mu.mem.queue = append(d.mu.mem.queue, entry)
-		d.largeBatchThreshold = (d.opts.MemTableSize - int(d.mu.mem.mutable.emptySize)) / 2
 	}
 
 	ls, err := opts.FS.List(d.walDirname)
@@ -306,7 +309,9 @@ func (d *DB) replayWAL(
 		// In read-only mode, we replay directly into the mutable memtable which will
 		// never be flushed.
 		mem = d.mu.mem.mutable
-		entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
+		if mem != nil {
+			entry = d.mu.mem.queue[len(d.mu.mem.queue)-1]
+		}
 	}
 
 	// Flushes the current memtable, if not nil.
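The `if mem != nil` guard in replayWAL pairs with the now-conditional memtable creation in Open: in read-only mode the mutable memtable does not exist until the first batch is replayed. A hypothetical sketch of the lazy-creation step (`ensureMutable` is an illustrative name, not a helper in the patch; it reuses the `newMemTable` call shown above):

```go
// ensureMutable lazily creates the mutable memtable during read-only WAL
// replay, seeding it with the sequence number of the batch about to be
// replayed rather than a possibly stale versionSet.logSeqNum.
func (d *DB) ensureMutable(seqNum uint64) {
	if d.mu.mem.mutable == nil {
		var entry *flushableEntry
		d.mu.mem.mutable, entry = d.newMemTable(0 /* logNum */, seqNum)
		d.mu.mem.queue = append(d.mu.mem.queue, entry)
	}
}
```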
diff --git a/open_test.go b/open_test.go
index 938d7e235e..47eed41b1a 100644
--- a/open_test.go
+++ b/open_test.go
@@ -375,8 +375,6 @@ func TestOpenReadOnly(t *testing.T) {
 }
 
 func TestOpenWALReplay(t *testing.T) {
-	mem := vfs.NewMem()
-
 	largeValue := []byte(strings.Repeat("a", 100<<10))
 	hugeValue := []byte(strings.Repeat("b", 10<<20))
 	checkIter := func(iter *Iterator) {
@@ -392,62 +390,127 @@ func TestOpenWALReplay(t *testing.T) {
 			t.Fatalf("%s\n%s", strings.Join(diff, "\n"), keys)
 		}
 	}
+
 	for _, readOnly := range []bool{false, true} {
-		t.Logf("read-only: %t", readOnly)
-		// Create a new DB and populate it with some data.
-		dir := fmt.Sprint(readOnly)
-		d, err := Open(dir, &Options{
-			FS:           mem,
-			MemTableSize: 32 << 20,
-		})
-		require.NoError(t, err)
-		// All these values will fit in a single memtable, so on closing the db there
-		// will be no sst and all the data is in a single WAL.
-		require.NoError(t, d.Set([]byte("1"), largeValue, nil))
-		require.NoError(t, d.Set([]byte("2"), largeValue, nil))
-		require.NoError(t, d.Set([]byte("3"), largeValue, nil))
-		require.NoError(t, d.Set([]byte("4"), hugeValue, nil))
-		require.NoError(t, d.Set([]byte("5"), largeValue, nil))
-		checkIter(d.NewIter(nil))
-		require.NoError(t, d.Close())
-		files, err := mem.List(dir)
-		require.NoError(t, err)
-		sort.Strings(files)
-		logCount, sstCount := 0, 0
-		for _, fname := range files {
-			t.Log(fname)
-			if strings.HasSuffix(fname, ".sst") {
-				sstCount++
+		t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
+			// Create a new DB and populate it with some data.
+			const dir = ""
+			mem := vfs.NewMem()
+			d, err := Open(dir, &Options{
+				FS:           mem,
+				MemTableSize: 32 << 20,
+			})
+			require.NoError(t, err)
+			// All these values will fit in a single memtable, so on closing the db there
+			// will be no sst and all the data is in a single WAL.
+			require.NoError(t, d.Set([]byte("1"), largeValue, nil))
+			require.NoError(t, d.Set([]byte("2"), largeValue, nil))
+			require.NoError(t, d.Set([]byte("3"), largeValue, nil))
+			require.NoError(t, d.Set([]byte("4"), hugeValue, nil))
+			require.NoError(t, d.Set([]byte("5"), largeValue, nil))
+			checkIter(d.NewIter(nil))
+			require.NoError(t, d.Close())
+			files, err := mem.List(dir)
+			require.NoError(t, err)
+			sort.Strings(files)
+			logCount, sstCount := 0, 0
+			for _, fname := range files {
+				if strings.HasSuffix(fname, ".sst") {
+					sstCount++
+				}
+				if strings.HasSuffix(fname, ".log") {
+					logCount++
+				}
 			}
-			if strings.HasSuffix(fname, ".log") {
-				logCount++
+			require.Equal(t, 0, sstCount)
+			// The memtable size starts at 256KB and doubles up to 32MB so we expect 7
+			// logs (one for each doubling).
+			require.Equal(t, 7, logCount)
+
+			// Re-open the DB with a smaller memtable. Values for 1, 2 will fit in the first memtable;
+			// value for 3 will go in the next memtable; value for 4 will be in a flushable batch
+			// which will cause the previous memtable to be flushed; value for 5 will go in the next
+			// memtable.
+			d, err = Open(dir, &Options{
+				FS:           mem,
+				MemTableSize: 300 << 10,
+				ReadOnly:     readOnly,
+			})
+			if err != nil {
+				t.Fatal(err)
+			}
+			if readOnly {
+				d.mu.Lock()
+				require.Equal(t, 10, len(d.mu.mem.queue))
+				require.NotNil(t, d.mu.mem.mutable)
+				d.mu.Unlock()
+			}
+			checkIter(d.NewIter(nil))
+			require.NoError(t, d.Close())
+		})
+	}
+}
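The log-count assertion in the rewritten test deserves a quick derivation. Assuming the WAL is rotated each time the memtable size doubles, growing from the initial 256KB memtable to the configured 32MB cap takes log2(32MB/256KB) = 7 doublings, hence 7 log files. A sketch of the arithmetic (hypothetical helper, not test code):

```go
// expectedLogCount counts memtable-size doublings between the initial size
// and the configured cap; each doubling corresponds to one rotated WAL.
// expectedLogCount(256<<10, 32<<20) == 7, matching the assertion above.
func expectedLogCount(initialSize, maxSize int) int {
	n := 0
	for size := initialSize; size < maxSize; size *= 2 {
		n++
	}
	return n
}
```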
+
+// Similar to TestOpenWALReplay, except we test replay behavior after a
+// memtable has been flushed. We test all 3 reasons for flushing: forced, size,
+// and large-batch.
+func TestOpenWALReplay2(t *testing.T) {
+	for _, readOnly := range []bool{false, true} {
+		t.Run(fmt.Sprintf("read-only=%t", readOnly), func(t *testing.T) {
+			for _, reason := range []string{"forced", "size", "large-batch"} {
+				t.Run(reason, func(t *testing.T) {
+					mem := vfs.NewMem()
+					d, err := Open("", &Options{
+						FS:           mem,
+						MemTableSize: 256 << 10,
+					})
+					require.NoError(t, err)
+
+					switch reason {
+					case "forced":
+						require.NoError(t, d.Set([]byte("1"), nil, nil))
+						require.NoError(t, d.Flush())
+						require.NoError(t, d.Set([]byte("2"), nil, nil))
+					case "size":
+						largeValue := []byte(strings.Repeat("a", 100<<10))
+						require.NoError(t, d.Set([]byte("1"), largeValue, nil))
+						require.NoError(t, d.Set([]byte("2"), largeValue, nil))
+						require.NoError(t, d.Set([]byte("3"), largeValue, nil))
+					case "large-batch":
+						largeValue := []byte(strings.Repeat("a", d.largeBatchThreshold))
+						require.NoError(t, d.Set([]byte("1"), nil, nil))
+						require.NoError(t, d.Set([]byte("2"), largeValue, nil))
+						require.NoError(t, d.Set([]byte("3"), nil, nil))
+					}
+					require.NoError(t, d.Close())
+
+					files, err := mem.List("")
+					require.NoError(t, err)
+					sort.Strings(files)
+					sstCount := 0
+					for _, fname := range files {
+						if strings.HasSuffix(fname, ".sst") {
+							sstCount++
+						}
+					}
+					require.Equal(t, 1, sstCount)
+
+					// Re-open the DB, in both read-write and read-only mode, to verify
+					// that the WAL written after the memtable flush replays correctly.
+					d, err = Open("", &Options{
+						FS:           mem,
+						MemTableSize: 300 << 10,
+						ReadOnly:     readOnly,
+					})
+					if err != nil {
+						t.Fatal(err)
+					}
+					require.NoError(t, d.Close())
+				})
 			}
-		}
-		require.Equal(t, 0, sstCount)
-		// The memtable size starts at 256KB and doubles up to 32MB so we expect 5
-		// logs (one for each doubling).
-		require.Equal(t, 7, logCount)
-
-		// Re-open the DB with a smaller memtable. Values for 1, 2 will fit in the first memtable;
-		// value for 3 will go in the next memtable; value for 4 will be in a flushable batch
-		// which will cause the previous memtable to be flushed; value for 5 will go in the next
-		// memtable
-		d, err = Open(dir, &Options{
-			FS:           mem,
-			MemTableSize: 300 << 10,
-			ReadOnly:     readOnly,
 		})
-		if err != nil {
-			t.Fatal(err)
-		}
-		if readOnly {
-			d.mu.Lock()
-			require.Equal(t, 10, len(d.mu.mem.queue))
-			require.NotNil(t, d.mu.mem.mutable)
-			d.mu.Unlock()
-		}
-		checkIter(d.NewIter(nil))
-		require.NoError(t, d.Close())
 	}
 }
diff --git a/version_set.go b/version_set.go
index 9fe3c0f82f..5d9e4f8d5a 100644
--- a/version_set.go
+++ b/version_set.go
@@ -292,7 +292,8 @@ func (vs *versionSet) logUnlock() {
 // (see logLock). Will unconditionally release the manifest lock (via
 // logUnlock) even if an error occurs.
 //
-// inProgressCompactions is called while DB.mu is held, to get the list of in-progress compactions.
+// inProgressCompactions is called while DB.mu is held, to get the list of
+// in-progress compactions.
 func (vs *versionSet) logAndApply(
 	jobID int,
 	ve *versionEdit,
@@ -312,6 +313,7 @@ func (vs *versionSet) logAndApply(
 				ve.MinUnflushedLogNum))
 		}
 	}
+
 	// This is the next manifest filenum, but if the current file is too big we
 	// will write this ve to the next file which means what ve encodes is the
 	// current filenum and not the next one.
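The "large-batch" case in TestOpenWALReplay2 leans on `d.largeBatchThreshold`, which Open now derives from the package-level `memTableEmptySize` instead of a live memtable. A sketch of the predicate that threshold feeds in the write path (the helper and its parameters are illustrative, assuming the patch's definitions):

```go
// isLargeBatch sketches how the threshold is consumed: a batch at least half
// the usable memtable capacity (arena size minus the empty-arena overhead) is
// written to the WAL and queued as its own flushable instead of being applied
// to a memtable.
func isLargeBatch(batchSize, memTableSize int, memTableEmptySize uint32) bool {
	largeBatchThreshold := (memTableSize - int(memTableEmptySize)) / 2
	return batchSize >= largeBatchThreshold
}
```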
@@ -320,8 +322,16 @@ func (vs *versionSet) logAndApply(
 	ve.NextFileNum = vs.nextFileNum
 
 	// LastSeqNum is set to the current upper bound on the assigned sequence
-	// numbers.
+	// numbers. Note that this is exactly the behavior of RocksDB. LastSeqNum is
+	// used to initialize versionSet.logSeqNum and versionSet.visibleSeqNum on
+	// replay. It must be higher than any sequence number written to an sstable,
+	// including sequence numbers in ingested files. Note that LastSeqNum is not
+	// (and cannot be) the minimum unflushed sequence number. This is fallout
+	// from ingestion, which allows a sequence number X to be assigned to an
+	// ingested sstable even though sequence number X-1 resides in an unflushed
+	// memtable.
 	ve.LastSeqNum = atomic.LoadUint64(&vs.logSeqNum)
+
 	currentVersion := vs.currentVersion()
 	var newVersion *version
@@ -332,7 +342,8 @@ func (vs *versionSet) logAndApply(
 		newManifestFileNum = vs.getNextFileNum()
 	}
 
-	// Grab certain values before releasing vs.mu, in case createManifest() needs to be called.
+	// Grab certain values before releasing vs.mu, in case createManifest() needs
+	// to be called.
 	minUnflushedLogNum := vs.minUnflushedLogNum
 	nextFileNum := vs.nextFileNum
 
@@ -447,7 +458,7 @@ func (vs *versionSet) incrementFlushes() {
 
 // createManifest creates a manifest file that contains a snapshot of vs.
 func (vs *versionSet) createManifest(
-	dirname string, fileNum uint64, minUnflushedLogNum uint64, nextFileNum uint64,
+	dirname string, fileNum, minUnflushedLogNum, nextFileNum uint64,
 ) (err error) {
 	var (
 		filename = base.MakeFilename(vs.fs, dirname, fileTypeManifest, fileNum)
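To make the expanded LastSeqNum comment concrete: because ingestion can assign sequence number X to an sstable while X-1 still lives in an unflushed memtable, LastSeqNum is only an upper bound, and replay must take the maximum of it and anything observed in the WALs. Illustrative only, not code from the patch:

```go
// replayLogSeqNum picks the starting point for versionSet.logSeqNum on
// replay: an upper bound over both the manifest's LastSeqNum and the highest
// sequence number seen while replaying WALs.
func replayLogSeqNum(manifestLastSeqNum, maxWALSeqNum uint64) uint64 {
	if maxWALSeqNum > manifestLastSeqNum {
		return maxWALSeqNum
	}
	return manifestLastSeqNum
}
```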