Skip to content

Commit

Permalink
prometheusremotewrite: release lock when wal empty
Browse files Browse the repository at this point in the history
Previously, the wal tailing routine held an exclusive lock over the
entire wal, while waiting for new sections to be written.

This led to a deadlock situtation, as a locked wal naturally can not be written to.

To remedy this, the lock is now only held during actual read attempts,
not in between.

Furthermore, inotify watching of the wal dir has been replaced with
channel-based signaling between the producers and the tailing routine,
so that the latter blocks until any new writes have happened.
  • Loading branch information
sh0rez committed Apr 12, 2023
1 parent e4a2605 commit 45fa20d
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 89 deletions.
154 changes: 65 additions & 89 deletions exporter/prometheusremotewriteexporter/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"sync/atomic"
"time"

"github.com/fsnotify/fsnotify"
"github.com/gogo/protobuf/proto"
"github.com/prometheus/prometheus/prompb"
"github.com/tidwall/wal"
Expand All @@ -41,8 +40,11 @@ type prweWAL struct {

stopOnce sync.Once
stopChan chan struct{}
rNotify chan struct{}
rWALIndex *atomic.Uint64
wWALIndex *atomic.Uint64

log *zap.Logger
}

const (
Expand Down Expand Up @@ -77,13 +79,16 @@ func newWAL(walConfig *WALConfig, exportSink func(context.Context, []*prompb.Wri
return nil, errNilConfig
}

return &prweWAL{
wal := prweWAL{
exportSink: exportSink,
walConfig: walConfig,
stopChan: make(chan struct{}),
rNotify: make(chan struct{}),
rWALIndex: &atomic.Uint64{},
wWALIndex: &atomic.Uint64{},
}, nil
}

return &wal, nil
}

func (wc *WALConfig) createWAL() (*wal.Log, string, error) {
Expand Down Expand Up @@ -126,7 +131,9 @@ func (prwe *prweWAL) retrieveWALIndices() (err error) {
if err != nil {
return fmt.Errorf("prometheusremotewriteexporter: failed to retrieve the first WAL index: %w", err)
}
prwe.rWALIndex.Store(rIndex)

// NOTE: github.com/tidwall/wal reading starts at 1, but FirstIndex() may return 0 if the wal is empty
prwe.rWALIndex.Store(max(rIndex, 1))

wIndex, err := prwe.wal.LastIndex()
if err != nil {
Expand Down Expand Up @@ -155,6 +162,7 @@ func (prwe *prweWAL) run(ctx context.Context) (err error) {
if err != nil {
return
}
prwe.log = logger

if err = prwe.retrieveWALIndices(); err != nil {
logger.Error("unable to start write-ahead log", zap.Error(err))
Expand Down Expand Up @@ -324,113 +332,81 @@ func (prwe *prweWAL) persistToWAL(requests []*prompb.WriteRequest) error {
return err
}
wIndex := prwe.wWALIndex.Add(1)
prwe.log.Debug("write", zap.Uint64("index", wIndex))
batch.Write(wIndex, protoBlob)
}

// notify possibly waiting tailing routine of write
select {
case prwe.rNotify <- struct{}{}:
// tailing routine is actively waiting for write
default:
// no receiver, ignore
}

return prwe.wal.WriteBatch(batch)
}

func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (wreq *prompb.WriteRequest, err error) {
prwe.mu.Lock()
defer prwe.mu.Unlock()

var protoBlob []byte
for i := 0; i < 12; i++ {
// Firstly check if we've been terminated, then exit if so.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-prwe.stopChan:
return nil, fmt.Errorf("attempt to read from WAL after stopped")
default:
}

if index <= 0 {
index = 1
}

if prwe.wal == nil {
return nil, fmt.Errorf("attempt to read from closed WAL")
}
// read repeatedly attempts to fetch a *prompb.WriteRequest from the WAL,
// aborting on error.
// prwe.mu is temporarily acquired for the individual read attempts only,
// allowing writes to happen while waiting
func (prwe *prweWAL) readPrompbFromWAL(ctx context.Context, index uint64) (*prompb.WriteRequest, error) {

protoBlob, err = prwe.wal.Read(index)
if err == nil { // The read succeeded.
req := new(prompb.WriteRequest)
if err = proto.Unmarshal(protoBlob, req); err != nil {
return nil, err
}
if prwe == nil {
return nil, fmt.Errorf("attempt to read from closed WAL")
}

// Now increment the WAL's read index.
prwe.rWALIndex.Add(1)
try := func() (*prompb.WriteRequest, error) {
prwe.mu.Lock()
defer prwe.mu.Unlock()

return req, nil
prwe.log.Debug("read", zap.Uint64("index", index))
protoBlob, err := prwe.wal.Read(index)
if err != nil {
return nil, err
}

if !errors.Is(err, wal.ErrNotFound) {
var req prompb.WriteRequest
if err := proto.Unmarshal(protoBlob, &req); err != nil {
return nil, err
}

if index <= 1 {
// This could be the very first attempted read, so try again, after a small sleep.
time.Sleep(time.Duration(1<<i) * time.Millisecond)
continue
}
prwe.rWALIndex.Add(1)
return &req, nil
}

// Otherwise, we couldn't find the record, let's try watching
// the WAL file until perhaps there is a write to it.
walWatcher, werr := fsnotify.NewWatcher()
if werr != nil {
return nil, werr
}
if werr = walWatcher.Add(prwe.walPath); werr != nil {
return nil, werr
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-prwe.stopChan:
return nil, fmt.Errorf("attempt to read from WAL after stopped")
default:
}

// Watch until perhaps there is a write to the WAL file.
watchCh := make(chan error)
wErr := err
go func() {
defer func() {
watchCh <- wErr
close(watchCh)
// Close the file watcher.
walWatcher.Close()
}()
req, err := try()
if errors.Is(err, wal.ErrNotFound) {
prwe.log.Debug("wal empty - waiting for write")

select {
case <-ctx.Done(): // If the context was cancelled, bail out ASAP.
wErr = ctx.Err()
return

case event, ok := <-walWatcher.Events:
if !ok {
return
}
switch event.Op {
case fsnotify.Remove:
// The file got deleted.
// TODO: Add capabilities to search for the updated file.
case fsnotify.Rename:
// Renamed, we don't have information about the renamed file's new name.
case fsnotify.Write:
// Finally a write, let's try reading again, but after some watch.
wErr = nil
}

case eerr, ok := <-walWatcher.Errors:
if ok {
wErr = eerr
}
case <-prwe.rNotify:
case <-ctx.Done():
return nil, ctx.Err()
}
}()

if gerr := <-watchCh; gerr != nil {
return nil, gerr
continue
} else if err != nil {
return nil, err
}

// Otherwise a write occurred might have occurred,
// and we can sleep for a little bit then try again.
time.Sleep(time.Duration(1<<i) * time.Millisecond)
return req, nil
}
}

func max(a, b uint64) uint64 {
if a > b {
return a
}
return nil, err
return b
}
63 changes: 63 additions & 0 deletions exporter/prometheusremotewriteexporter/wal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ package prometheusremotewriteexporter

import (
"context"
"os"
"sort"
"testing"
"time"

"github.com/prometheus/prometheus/prompb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func doNothingExportSink(_ context.Context, reqL []*prompb.WriteRequest) error {
Expand Down Expand Up @@ -167,3 +169,64 @@ func TestWAL_persist(t *testing.T) {
require.Equal(t, reqLFromWAL[0], reqL[0])
require.Equal(t, reqLFromWAL[1], reqL[1])
}

func TestWAL_E2E(t *testing.T) {
in := []*prompb.WriteRequest{
series("mem_used_percent", 0, 0),
series("mem_used_percent", 15, 34),
series("mem_used_percent", 30, 99),
}
out := make([]*prompb.WriteRequest, 0, len(in))

done := make(chan struct{})
sink := func(ctx context.Context, reqs []*prompb.WriteRequest) error {
out = append(out, reqs...)
if len(out) >= len(in) {
close(done)
}
return nil
}

wal, err := newWAL(&WALConfig{
Directory: t.TempDir(),
}, sink)
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(t.TempDir())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
ctx = contextWithLogger(ctx, log)

if err := wal.run(ctx); err != nil {
panic(err)
}

if err := wal.persistToWAL(in); err != nil {
panic(err)
}

// wait until the tail routine is no longer busy
wal.rNotify <- struct{}{}
cancel()

// wait until we received all series
<-done
require.Equal(t, in, out)
}

func series(name string, ts int64, value float64) *prompb.WriteRequest {
return &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{{Name: "__name__", Value: name}},
Samples: []prompb.Sample{{Value: value, Timestamp: ts}},
},
},
}
}

0 comments on commit 45fa20d

Please sign in to comment.