db: manually managed memTable arena memory
petermattis committed Feb 7, 2020
1 parent d156c7f commit e16cf94
Showing 6 changed files with 95 additions and 35 deletions.
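The heart of the change: each memtable's arena buffer is now allocated outside the Go heap through internal/manual and explicitly freed when the memtable's accounting is released. The manual package itself is not part of this diff; the sketch below shows one way its New/Free contract could look (a cgo-backed allocator is an assumption for illustration, not necessarily Pebble's actual implementation), which is enough to follow the call sites in db.go and open.go.

package manual

// Minimal sketch of the New/Free contract assumed by the call sites in this
// commit, not the real internal/manual package.

// #include <stdlib.h>
import "C"

import "unsafe"

// New returns a zeroed byte slice of length n whose backing memory lives
// outside the Go heap, so the GC neither scans nor frees it.
func New(n int) []byte {
	if n == 0 {
		return nil
	}
	ptr := C.calloc(C.size_t(n), 1)
	if ptr == nil {
		panic("manual: out of memory")
	}
	return unsafe.Slice((*byte)(ptr), n)
}

// Free releases a buffer previously returned by New. Freeing a nil or
// zero-capacity slice is a no-op.
func Free(b []byte) {
	if cap(b) == 0 {
		return
	}
	b = b[:cap(b)]
	C.free(unsafe.Pointer(&b[0]))
}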
db.go: 22 changes (15 additions, 7 deletions)
@@ -9,12 +9,15 @@ import (
"errors"
"fmt"
"io"
"runtime"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/record"
"github.com/cockroachdb/pebble/vfs"
)
@@ -1145,19 +1148,24 @@ func (d *DB) newMemTable(logNum, logSeqNum uint64) (*memTable, *flushableEntry)
atomic.AddInt64(&d.memTableCount, 1)
atomic.AddInt64(&d.memTableReserved, int64(size))
releaseAccountingReservation := d.opts.Cache.Reserve(size)
releaseMemAccounting := func() {
atomic.AddInt64(&d.memTableCount, -1)
atomic.AddInt64(&d.memTableReserved, -int64(size))
releaseAccountingReservation()
}

mem := newMemTable(memTableOptions{
Options: d.opts,
size: size,
arenaBuf: manual.New(int(size)),
logSeqNum: logSeqNum,
})
if invariants.Enabled {
runtime.SetFinalizer(mem, checkMemTable)
}

entry := d.newFlushableEntry(mem, logNum, logSeqNum)
entry.releaseMemAccounting = releaseMemAccounting
entry.releaseMemAccounting = func() {
manual.Free(mem.arenaBuf)
mem.arenaBuf = nil
atomic.AddInt64(&d.memTableCount, -1)
atomic.AddInt64(&d.memTableReserved, -int64(size))
releaseAccountingReservation()
}
return mem, entry
}

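newMemTable now registers checkMemTable as a finalizer, but only when invariants.Enabled is true, so the check costs nothing in normal builds. The internal/invariants package is not shown in this diff; the usual way to get such a compile-time switch is a pair of build-tag files, sketched below purely as an assumption about its shape (the file names and the "invariants" tag are illustrative).

// file: internal/invariants/invariants_off.go (default build)
//go:build !invariants

// Package invariants exposes Enabled, a constant that is true only in builds
// compiled with the "invariants" build tag; guarded checks such as the
// SetFinalizer call above compile away otherwise.
package invariants

const Enabled = false

// file: internal/invariants/invariants_on.go (built with -tags invariants)
//go:build invariants

package invariants

const Enabled = true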
internal/arenaskl/arena.go: 6 changes (2 additions, 4 deletions)
@@ -22,8 +22,6 @@ import (
"math"
"sync/atomic"
"unsafe"

"github.com/cockroachdb/pebble/internal/rawalloc"
)

// Arena is lock-free.
@@ -43,12 +41,12 @@
)

// NewArena allocates a new arena of the specified size and returns it.
func NewArena(size uint32) *Arena {
func NewArena(buf []byte) *Arena {
// Don't store data at position 0 in order to reserve offset=0 as a kind
// of nil pointer.
return &Arena{
n: 1,
buf: rawalloc.New(int(size), int(size)),
buf: buf,
}
}

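The API change above means an Arena no longer allocates its own backing memory: callers pass in a buffer and own its lifetime. A short usage sketch under that signature (note that internal packages are only importable from within Pebble itself, so this is for illustration):

package main

import (
	"bytes"

	"github.com/cockroachdb/pebble/internal/arenaskl"
	"github.com/cockroachdb/pebble/internal/manual"
)

func main() {
	// GC-managed buffer: nothing to free, as in the tests below.
	heapArena := arenaskl.NewArena(make([]byte, 64<<10))
	_ = arenaskl.NewSkiplist(heapArena, bytes.Compare)

	// Manually managed buffer: the caller must free it once the arena and
	// any skiplist built on top of it are no longer in use.
	buf := manual.New(64 << 10)
	manualArena := arenaskl.NewArena(buf)
	_ = arenaskl.NewSkiplist(manualArena, bytes.Compare)
	// ... insert keys, flush, drop all references ...
	manual.Free(buf)
}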
internal/arenaskl/arena_test.go: 6 changes (5 additions, 1 deletion)
@@ -24,10 +24,14 @@ import (
"github.com/stretchr/testify/require"
)

func newArena(n uint32) *Arena {
return NewArena(make([]byte, n))
}

// TestArenaSizeOverflow tests that large allocations do not cause Arena's
// internal size accounting to overflow and produce incorrect results.
func TestArenaSizeOverflow(t *testing.T) {
a := NewArena(math.MaxUint32)
a := newArena(math.MaxUint32)

// Allocating under the limit throws no error.
offset, _, err := a.alloc(math.MaxUint16, 0)
internal/arenaskl/skl_test.go: 40 changes (20 additions, 20 deletions)
Expand Up @@ -144,7 +144,7 @@ func lengthRev(s *Skiplist) int {

func TestEmpty(t *testing.T) {
key := makeKey("aaa")
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

require.False(t, it.Valid())
@@ -160,7 +160,7 @@ func TestEmpty(t *testing.T) {
}

func TestFull(t *testing.T) {
l := NewSkiplist(NewArena(1000), bytes.Compare)
l := NewSkiplist(newArena(1000), bytes.Compare)

foundArenaFull := false
for i := 0; i < 100; i++ {
@@ -181,7 +181,7 @@ func TestFull(t *testing.T) {
func TestBasic(t *testing.T) {
for _, inserter := range []bool{false, true} {
t.Run(fmt.Sprintf("inserter=%t", inserter), func(t *testing.T) {
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

add := l.Add
@@ -252,7 +252,7 @@ func TestConcurrentBasic(t *testing.T) {
for _, inserter := range []bool{false, true} {
t.Run(fmt.Sprintf("inserter=%t", inserter), func(t *testing.T) {
// Set testing flag to make it easier to trigger unusual race conditions.
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
l.testing = true

var wg sync.WaitGroup
@@ -298,7 +298,7 @@ func TestConcurrentOneKey(t *testing.T) {
for _, inserter := range []bool{false, true} {
t.Run(fmt.Sprintf("inserter=%t", inserter), func(t *testing.T) {
// Set testing flag to make it easier to trigger unusual race conditions.
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
l.testing = true

var wg sync.WaitGroup
@@ -352,7 +352,7 @@ func TestConcurrentOneKey(t *testing.T) {
func TestSkiplistAdd(t *testing.T) {
for _, inserter := range []bool{false, true} {
t.Run(fmt.Sprintf("inserter=%t", inserter), func(t *testing.T) {
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

add := l.Add
@@ -367,7 +367,7 @@ func TestSkiplistAdd(t *testing.T) {
require.EqualValues(t, []byte{}, it.Key().UserKey)
require.EqualValues(t, []byte{}, it.Value())

l = NewSkiplist(NewArena(arenaSize), bytes.Compare)
l = NewSkiplist(newArena(arenaSize), bytes.Compare)
it = newIterAdapter(l.NewIter(nil, nil))

add = l.Add
@@ -429,7 +429,7 @@ func TestConcurrentAdd(t *testing.T) {
const n = 100

// Set testing flag to make it easier to trigger unusual race conditions.
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
l.testing = true

start := make([]sync.WaitGroup, n)
@@ -476,7 +476,7 @@ func TestConcurrentAdd(t *testing.T) {
// TestIteratorNext tests a basic iteration over all nodes from the beginning.
func TestIteratorNext(t *testing.T) {
const n = 100
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

require.False(t, it.Valid())
@@ -501,7 +501,7 @@ func TestIteratorNext(t *testing.T) {
// TestIteratorPrev tests a basic iteration over all nodes from the end.
func TestIteratorPrev(t *testing.T) {
const n = 100
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

require.False(t, it.Valid())
@@ -526,7 +526,7 @@ func TestIteratorPrev(t *testing.T) {

func TestIteratorSeekGE(t *testing.T) {
const n = 100
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

require.False(t, it.Valid())
@@ -576,7 +576,7 @@ func TestIteratorSeekGE(t *testing.T) {

func TestIteratorSeekLT(t *testing.T) {
const n = 100
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
it := newIterAdapter(l.NewIter(nil, nil))

require.False(t, it.Valid())
@@ -627,7 +627,7 @@ func TestIteratorSeekLT(t *testing.T) {

// TODO(peter): test First and Last.
func TestIteratorBounds(t *testing.T) {
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
for i := 1; i < 10; i++ {
err := l.Add(makeIntKey(i), makeValue(i))
if err != nil {
@@ -695,7 +695,7 @@ func TestIteratorBounds(t *testing.T) {
}

func TestBytesIterated(t *testing.T) {
l := NewSkiplist(NewArena(arenaSize), bytes.Compare)
l := NewSkiplist(newArena(arenaSize), bytes.Compare)
emptySize := l.arena.Size()
for i := 0; i < 200; i++ {
bytesIterated := l.bytesIterated(t)
@@ -737,7 +737,7 @@ func BenchmarkReadWrite(b *testing.B) {
for i := 0; i <= 10; i++ {
readFrac := float32(i) / 10.0
b.Run(fmt.Sprintf("frac_%d", i*10), func(b *testing.B) {
l := NewSkiplist(NewArena(uint32((b.N+2)*maxNodeSize)), bytes.Compare)
l := NewSkiplist(newArena(uint32((b.N+2)*maxNodeSize)), bytes.Compare)
b.ResetTimer()
var count int
b.RunParallel(func(pb *testing.PB) {
@@ -762,7 +762,7 @@ func BenchmarkReadWrite(b *testing.B) {
}

func BenchmarkOrderedWrite(b *testing.B) {
l := NewSkiplist(NewArena(8<<20), bytes.Compare)
l := NewSkiplist(newArena(8<<20), bytes.Compare)
var ins Inserter
buf := make([]byte, 8)

@@ -771,15 +771,15 @@ func BenchmarkOrderedWrite(b *testing.B) {
binary.BigEndian.PutUint64(buf, uint64(i))
if err := ins.Add(l, base.InternalKey{UserKey: buf}, nil); err == ErrArenaFull {
b.StopTimer()
l = NewSkiplist(NewArena(uint32((b.N+2)*maxNodeSize)), bytes.Compare)
l = NewSkiplist(newArena(uint32((b.N+2)*maxNodeSize)), bytes.Compare)
ins = Inserter{}
b.StartTimer()
}
}
}

func BenchmarkIterNext(b *testing.B) {
l := NewSkiplist(NewArena(64<<10), bytes.Compare)
l := NewSkiplist(newArena(64<<10), bytes.Compare)
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
buf := make([]byte, 8)
for {
@@ -800,7 +800,7 @@ func BenchmarkIterNext(b *testing.B) {
}

func BenchmarkIterPrev(b *testing.B) {
l := NewSkiplist(NewArena(64<<10), bytes.Compare)
l := NewSkiplist(newArena(64<<10), bytes.Compare)
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))
buf := make([]byte, 8)
for {
@@ -852,7 +852,7 @@ func BenchmarkIterPrev(b *testing.B) {
// }

func TestInvalidInternalKeyDecoding(t *testing.T) {
a := NewArena(arenaSize)
a := newArena(arenaSize)

// We synthetically fill the arena with an invalid key
// that doesn't have an 8 byte trailer.
mem_table.go: 18 changes (17 additions, 1 deletion)
@@ -6,6 +6,7 @@ package pebble

import (
"fmt"
"os"
"sync"
"sync/atomic"
"unsafe"
@@ -47,6 +48,7 @@ func memTableEntrySize(keyBytes, valueBytes int) uint32 {
type memTable struct {
cmp Compare
equal Equal
arenaBuf []byte
skl arenaskl.Skiplist
rangeDelSkl arenaskl.Skiplist
// emptySize is the amount of allocated space in the arena when the memtable
@@ -74,10 +76,19 @@ type memTable struct {
// which is used by tests.
type memTableOptions struct {
*Options
arenaBuf []byte
size int
logSeqNum uint64
}

func checkMemTable(obj interface{}) {
m := obj.(*memTable)
if m.arenaBuf != nil {
fmt.Fprintf(os.Stderr, "%p: memTable buffer was not freed\n", m.arenaBuf)
os.Exit(1)
}
}

// newMemTable returns a new MemTable of the specified size. If size is zero,
// Options.MemTableSize is used instead.
func newMemTable(opts memTableOptions) *memTable {
@@ -89,11 +100,16 @@ func newMemTable(opts memTableOptions) *memTable {
m := &memTable{
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
arenaBuf: opts.arenaBuf,
writerRefs: 1,
logSeqNum: opts.logSeqNum,
}

arena := arenaskl.NewArena(uint32(opts.size))
if m.arenaBuf == nil {
m.arenaBuf = make([]byte, opts.size)
}

arena := arenaskl.NewArena(m.arenaBuf)
m.skl.Reset(arena, m.cmp)
m.rangeDelSkl.Reset(arena, m.cmp)
m.emptySize = arena.Size()
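checkMemTable above pairs with the runtime.SetFinalizer call in db.go: if a memTable becomes garbage while arenaBuf is still non-nil, the manually allocated buffer leaked and the process is aborted. The pattern in isolation, as a rough self-contained sketch (finalizers run at the garbage collector's discretion, so this is a best-effort debugging check, not a guarantee):

package main

import (
	"fmt"
	"os"
	"runtime"
)

// resource stands in for memTable: it owns a manually managed buffer that
// must be released exactly once before the object becomes unreachable.
type resource struct {
	buf []byte
}

func (r *resource) release() {
	// In the real code this is where manual.Free(r.buf) happens.
	r.buf = nil
}

// checkResource mirrors checkMemTable: by the time the object is collected,
// its buffer must already have been handed back.
func checkResource(obj interface{}) {
	r := obj.(*resource)
	if r.buf != nil {
		fmt.Fprintf(os.Stderr, "%p: buffer was not freed\n", r)
		os.Exit(1)
	}
}

func newResource(n int) *resource {
	r := &resource{buf: make([]byte, n)}
	runtime.SetFinalizer(r, checkResource)
	return r
}

func main() {
	r := newResource(1 << 10)
	r.release() // comment this out and the finalizer may abort the process
	r = nil
	runtime.GC() // best effort: queue the finalizer once r is unreachable
}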
open.go: 38 changes (36 additions, 2 deletions)
@@ -10,21 +10,32 @@ import (
"io"
"io/ioutil"
"os"
"runtime"
"sort"
"sync/atomic"
"time"

"github.com/cockroachdb/pebble/internal/arenaskl"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/manual"
"github.com/cockroachdb/pebble/internal/rate"
"github.com/cockroachdb/pebble/internal/record"
"github.com/cockroachdb/pebble/vfs"
)

const initialMemTableSize = 256 << 10 // 256 KB

// Open opens a LevelDB whose files live in the given directory.
func Open(dirname string, opts *Options) (*DB, error) {
func checkDB(obj interface{}) {
d := obj.(*DB)
if atomic.LoadInt32(&d.closed) == 0 {
fmt.Fprintf(os.Stderr, "%p: unreferenced DB not closed\n", d)
os.Exit(1)
}
}

// Open opens a DB whose files live in the given directory.
func Open(dirname string, opts *Options) (db *DB, _ error) {
// Make a copy of the options so that we don't mutate the passed in options.
opts = opts.Clone()
opts = opts.EnsureDefaults()
Expand All @@ -44,6 +55,25 @@ func Open(dirname string, opts *Options) (*DB, error) {
abbreviatedKey: opts.Comparer.AbbreviatedKey,
logRecycler: logRecycler{limit: opts.MemTableStopWritesThreshold + 1},
}

defer func() {
// If an error or panic occurs during open, attempt to release the manually
// allocated memory resources. Note that rather than look for an error, we
// look for the return of a nil DB pointer.
if r := recover(); db == nil {
for _, mem := range d.mu.mem.queue {
switch t := mem.flushable.(type) {
case *memTable:
manual.Free(t.arenaBuf)
t.arenaBuf = nil
}
}
if r != nil {
panic(r)
}
}
}()

if d.equal == nil {
d.equal = bytes.Equal
}
@@ -274,6 +304,10 @@ func Open(dirname string, opts *Options) (*DB, error) {
d.maybeScheduleFlush()
d.maybeScheduleCompaction()

if invariants.Enabled {
runtime.SetFinalizer(d, checkDB)
}

d.fileLock, fileLock = fileLock, nil
return d, nil
}
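Open now names its *DB return value so that a single deferred closure can tell whether ownership was handed to the caller, free the manually allocated memtables otherwise, and re-raise any panic after cleanup. The same idiom in isolation, as a minimal sketch with invented names (openThing, allocate, and release stand in for Open, manual.New, and manual.Free):

package main

import "fmt"

type thing struct{ buf []byte }

// allocate and release stand in for manual.New and manual.Free.
func allocate(n int) []byte { return make([]byte, n) }
func release(b []byte)      {}

// openThing acquires a buffer up front and must give it back on every path
// that does not hand ownership of the returned *thing to the caller.
func openThing(fail bool) (t *thing, _ error) {
	buf := allocate(1 << 10)

	defer func() {
		// A nil named return means the caller never sees the resource,
		// whether we got here via `return nil, err` or via a panic.
		if r := recover(); t == nil {
			release(buf)
			if r != nil {
				panic(r) // preserve the original panic after cleanup
			}
		}
	}()

	if fail {
		return nil, fmt.Errorf("openThing: simulated failure")
	}
	return &thing{buf: buf}, nil
}

func main() {
	if _, err := openThing(true); err != nil {
		fmt.Println("open failed; buffer already released:", err)
	}
	if th, err := openThing(false); err == nil {
		fmt.Println("open succeeded; caller now owns the buffer")
		release(th.buf)
	}
}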
