Skip to content

Commit

Permalink
internal/counter: fix windows mapped file extension
Browse files Browse the repository at this point in the history
On Windows, unmapping the previous mappedFile in mmap.Mmap caused
panics when counter pointers were read concurrent to the remapping.

Upon investigation, it appears that the unmapping was only necessary for
tests, to ensure that previous mappings were cleaned up (and therefore
that test files can be deleted). A call to runtime.SetFinalizer(...,
mappedFile.close) appeared to serve a similar purpose, yet for an
unknown reason finalizers were never run.

Deeper investigation revealed that there was simply one bug in file
cleanup (coincidentally already noted in a TODO): after storing the
newly mapped file in file.newCounter1 and invalidating counters, we can
close the previous mappedFile.

Therefore:
- fix the cleanup in file.newCounter1
- remove the unmap in mmap.Mmap on windows
- remove the now unnecessary 'existing' parameter in mmap APIs
- remove the SetFinalizer call
- add a test for multiple concurrent mappings of a file
- add an end-to-end test for concurrent file extension
- change ReadCounter to read by memory mapping the file, in an attempt
  to avoid msync issues

For golang/go#68311
Fixes golang/go#68358

Change-Id: I27b6f4f4939e93f7c76f920d553848bf014be236
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/597278
LUCI-TryBot-Result: Go LUCI <[email protected]>
Reviewed-by: Hyang-Ah Hana Kim <[email protected]>
  • Loading branch information
findleyr committed Jul 11, 2024
1 parent 9bf37a1 commit 8c19bfb
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 65 deletions.
108 changes: 108 additions & 0 deletions internal/counter/concurrent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package counter_test

import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"

"golang.org/x/telemetry/counter/countertest"
"golang.org/x/telemetry/internal/counter"
"golang.org/x/telemetry/internal/regtest"
"golang.org/x/telemetry/internal/telemetry"
"golang.org/x/telemetry/internal/testenv"
)

func TestConcurrentExtension(t *testing.T) {
testenv.SkipIfUnsupportedPlatform(t)

// This test verifies that files may be concurrently extended: when one file
// discovers that its entries exceed the mapped data, it remaps the data.

// Both programs populate enough new records to extend the file multiple
// times.
const numCounters = 50000
prog1 := regtest.NewProgram(t, "inc1", func() int {
for i := 0; i < numCounters; i++ {
counter.New(fmt.Sprint("gophers", i)).Inc()
}
return 0
})
prog2 := regtest.NewProgram(t, "inc2", func() int {
for i := numCounters; i < 2*numCounters; i++ {
counter.New(fmt.Sprint("gophers", i)).Inc()
}
return 0
})

dir := t.TempDir()
now := time.Now().UTC()

// Run a no-op program in the telemetry dir to ensure that the weekends file
// exists, and avoid the race described in golang/go#68390.
// (We could also call countertest.Open here, but better to avoid mutating
// state in the current process for a test that is otherwise hermetic)
prog0 := regtest.NewProgram(t, "init", func() int { return 0 })
if _, err := regtest.RunProgAsOf(t, dir, now, prog0); err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(2)

// Run the programs concurrently.
go func() {
defer wg.Done()
if out, err := regtest.RunProgAsOf(t, dir, now, prog1); err != nil {
t.Errorf("prog1 failed: %v; output:\n%s", err, out)
}
}()
go func() {
defer wg.Done()
if out, err := regtest.RunProgAsOf(t, dir, now, prog2); err != nil {
t.Errorf("prog2 failed: %v; output:\n%s", err, out)
}
}()

wg.Wait()

counts := readCountsForDir(t, telemetry.NewDir(dir).LocalDir())
if got, want := len(counts), 2*numCounters; got != want {
t.Errorf("Got %d counters, want %d", got, want)
}

for name, value := range counts {
if value != 1 {
t.Errorf("count(%s) = %d, want 1", name, value)
}
}
}

func readCountsForDir(t *testing.T, dir string) map[string]uint64 {
entries, err := os.ReadDir(dir)
if err != nil {
t.Fatal(err)
}
var countFiles []string
for _, entry := range entries {
if strings.HasSuffix(entry.Name(), ".count") {
countFiles = append(countFiles, filepath.Join(dir, entry.Name()))
}
}
if len(countFiles) != 1 {
t.Fatalf("found %d count files, want 1; directory contents: %v", len(countFiles), entries)
}

counters, _, err := countertest.ReadFile(countFiles[0])
if err != nil {
t.Fatal(err)
}
return counters
}
26 changes: 25 additions & 1 deletion internal/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (c *Counter) releaseLock(state counterStateBits) {
}
}

// add wraps the atomic.Uint64.Add operation to handle integer overflow.
func (c *Counter) add(n uint64) uint64 {
count := c.ptr.count
for {
Expand Down Expand Up @@ -340,7 +341,7 @@ func readFile(f *file) (*File, error) {
func ReadFile(name string) (counters, stackCounters map[string]uint64, _ error) {
// TODO: Document the format of the stackCounters names.

data, err := os.ReadFile(name)
data, err := readMapped(name)
if err != nil {
return nil, nil, fmt.Errorf("failed to read from file: %v", err)
}
Expand All @@ -359,3 +360,26 @@ func ReadFile(name string) (counters, stackCounters map[string]uint64, _ error)
}
return counters, stackCounters, nil
}

// readMapped reads the contents of the given file by memory mapping.
//
// This avoids file synchronization issues.
func readMapped(name string) ([]byte, error) {
f, err := os.OpenFile(name, os.O_RDWR, 0666)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
}
mapping, err := memmap(f)
if err != nil {
return nil, err
}
data := make([]byte, fi.Size())
copy(data, mapping.Data)
munmap(mapping)
return data, nil
}
24 changes: 11 additions & 13 deletions internal/counter/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func TestBasic(t *testing.T) {

t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()
var f file
defer close(&f)
c := f.New("gophers")
Expand Down Expand Up @@ -82,9 +81,9 @@ func TestParallel(t *testing.T) {

t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()
var f file
defer close(&f)

c := f.New("manygophers")

var wg sync.WaitGroup
Expand Down Expand Up @@ -122,8 +121,9 @@ func TestParallel(t *testing.T) {
}
}

// this is needed in Windows so that the generated testing.go file
// can clean up the temporary test directory
// close ensures that the given mapped file is closed. On Windows, this is
// necessary prior to test cleanup.
// TODO(rfindley): rename.
func close(f *file) {
mf := f.current.Load()
if mf == nil {
Expand All @@ -137,7 +137,7 @@ func TestLarge(t *testing.T) {
testenv.SkipIfUnsupportedPlatform(t)
t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()

var f file
defer close(&f)
f.rotate()
Expand Down Expand Up @@ -184,7 +184,6 @@ func TestRepeatedNew(t *testing.T) {

t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()
var f file
defer close(&f)
f.rotate()
Expand Down Expand Up @@ -224,7 +223,7 @@ func TestNewFile(t *testing.T) {

t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()

now := CounterTime().UTC()
year, month, day := now.Date()
// preserve time location as done in (*file).filename.
Expand Down Expand Up @@ -357,8 +356,9 @@ func TestWeekends(t *testing.T) {
if weekends != ends.Weekday() {
t.Errorf("weekends %s unexpecteledy not end day %s", weekends, ends.Weekday())
}
// needed for Windows
// On Windows, we must unmap f.current before removing files below.
close(&f)

// remove files for the next iteration of the loop
for _, f := range fis {
os.Remove(filepath.Join(telemetry.Default.LocalDir(), f.Name()))
Expand All @@ -377,7 +377,6 @@ func TestStack(t *testing.T) {
testenv.SkipIfUnsupportedPlatform(t)
t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()
var f file
defer close(&f)
f.rotate()
Expand Down Expand Up @@ -508,10 +507,9 @@ func setup(t *testing.T) {
telemetry.Default = telemetry.NewDir(t.TempDir()) // new dir for each test
os.MkdirAll(telemetry.Default.LocalDir(), 0777)
os.MkdirAll(telemetry.Default.UploadDir(), 0777)
}

func restore() {
CounterTime = func() time.Time { return time.Now().UTC() }
t.Cleanup(func() {
CounterTime = func() time.Time { return time.Now().UTC() }
})
}

func (f *file) New(name string) *Counter {
Expand Down
62 changes: 44 additions & 18 deletions internal/counter/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,22 @@ type file struct {
buildInfo *debug.BuildInfo
timeBegin, timeEnd time.Time
err error
current atomic.Pointer[mappedFile] // may be read without holding mu, but may be nil
// current holds the current file mapping, which may change when the file is
// rotated or extended.
//
// current may be read without holding mu, but may be nil.
//
// The cleanup logic for file mappings is complicated, because invalidating
// counter pointers is reentrant: [file.invalidateCounters] may call
// [file.lookup], which acquires mu. Therefore, writing current must be done
// as follows:
// 1. record the previous value of current
// 2. Store a new value in current
// 3. unlock mu
// 4. call invalidateCounters
// 5. close the previous mapped value from (1)
// TODO(rfindley): simplify
current atomic.Pointer[mappedFile]
}

var defaultFile file
Expand Down Expand Up @@ -292,7 +307,7 @@ func (f *file) rotate1() time.Time {
}
name := filepath.Join(dir, baseName)

m, err := openMapped(name, meta, nil)
m, err := openMapped(name, meta)
if err != nil {
// Mapping failed:
// If there used to be a mapped file, after cleanup
Expand Down Expand Up @@ -334,8 +349,10 @@ func (f *file) newCounter1(name string) (v *atomic.Uint64, cleanup func()) {
cleanup = nop
if newM != nil {
f.current.Store(newM)
// TODO(rfindley): shouldn't this close f.current?
cleanup = f.invalidateCounters
cleanup = func() {
f.invalidateCounters()
current.close()
}
}
return v, cleanup
}
Expand Down Expand Up @@ -386,7 +403,7 @@ type mappedFile struct {

// existing should be nil the first time this is called for a file,
// and when remapping, should be the previous mappedFile.
func openMapped(name string, meta string, existing *mappedFile) (_ *mappedFile, err error) {
func openMapped(name string, meta string) (_ *mappedFile, err error) {
hdr, err := mappedHeader(meta)
if err != nil {
return nil, err
Expand All @@ -402,13 +419,13 @@ func openMapped(name string, meta string, existing *mappedFile) (_ *mappedFile,
f: f,
meta: meta,
}
// without this files cannot be cleanedup on Windows (affects tests)
runtime.SetFinalizer(m, (*mappedFile).close)

defer func() {
if err != nil {
m.close()
}
}()

info, err := f.Stat()
if err != nil {
return nil, err
Expand All @@ -433,16 +450,11 @@ func openMapped(name string, meta string, existing *mappedFile) (_ *mappedFile,
}

// Map into memory.
var mapping mmap.Data
if existing != nil {
mapping, err = memmap(f, existing.mapping)
} else {
mapping, err = memmap(f, nil)
}
mapping, err := memmap(f)
if err != nil {
return nil, err
}
m.mapping = &mapping
m.mapping = mapping
if !bytes.HasPrefix(m.mapping.Data, hdr) {
return nil, fmt.Errorf("counter: header mismatch")
}
Expand Down Expand Up @@ -597,7 +609,11 @@ func (m *mappedFile) newCounter(name string) (v *atomic.Uint64, m1 *mappedFile,
}()

v, headOff, head, ok := m.lookup(name)
for !ok {
for tries := 0; !ok; tries++ {
if tries >= 10 {
debugPrintf("corrupt: failed to remap after 10 tries")
return nil, nil, errCorrupt
}
// Lookup found an invalid pointer,
// perhaps because the file has grown larger than the mapping.
limit := m.load32(m.hdrLen + limitOff)
Expand All @@ -606,10 +622,12 @@ func (m *mappedFile) newCounter(name string) (v *atomic.Uint64, m1 *mappedFile,
debugPrintf("corrupt1\n")
return nil, nil, errCorrupt
}
newM, err := openMapped(m.f.Name(), m.meta, m)
newM, err := openMapped(m.f.Name(), m.meta)
if err != nil {
return nil, nil, err
}
// If m != orig, this is at least the second time around the loop
// trying to open the mapping. Close the previous attempt.
if m != orig {
m.close()
}
Expand Down Expand Up @@ -690,8 +708,16 @@ func (m *mappedFile) extend(end uint32) (*mappedFile, error) {
return nil, err
}
}
newM, err := openMapped(m.f.Name(), m.meta, m)
m.f.Close()
newM, err := openMapped(m.f.Name(), m.meta)
if err != nil {
return nil, err
}
if int64(len(newM.mapping.Data)) < int64(end) {
// File system or logic bug: new file is somehow not extended.
// See go.dev/issue/68311, where this appears to have been happening.
newM.close()
return nil, errCorrupt
}
return newM, err
}

Expand Down
Loading

0 comments on commit 8c19bfb

Please sign in to comment.