Skip to content

Commit

Permalink
Merge pull request #349 from bonitoo-io/feat/ignore_pw
Browse files Browse the repository at this point in the history
fix: skip retrying on some errors
  • Loading branch information
vlastahajek authored Aug 25, 2022
2 parents 38ada92 + 96a6c73 commit 5b9008c
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.

### Bug fixes
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).


## 2.9.2 [2022-07-29]
### Bug fixes
- [#341](https://github.com/influxdata/influxdb-client-go/pull/341) Changing logging level of messages about discarding batch to Error.
Expand Down
92 changes: 65 additions & 27 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,38 +164,42 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if batchToWrite != nil {
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
w.retryDelay = perror.RetryAfter * 1000
} else {
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Error("Callback rejected batch, discarding")
if !batchToWrite.Evicted {
w.retryQueue.pop()
if isIgnorableError(perror) {
log.Warnf("Write error: %s", perror.Error())
} else {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
w.retryDelay = perror.RetryAfter * 1000
} else {
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
return perror
}
// store new batch (not taken from queue)
if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
if w.retryQueue.push(batch) {
log.Error("Retry buffer full, discarding oldest batch")
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Error("Callback rejected batch, discarding")
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return perror
}
} else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
log.Error("Reached maximum number of retries, discarding batch")
if !batchToWrite.Evicted {
w.retryQueue.pop()
// store new batch (not taken from queue)
if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
if w.retryQueue.push(batch) {
log.Error("Retry buffer full, discarding oldest batch")
}
} else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
log.Error("Reached maximum number of retries, discarding batch")
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
}
batchToWrite.RetryAttempts++
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
batchToWrite.RetryAttempts++
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}

w.retryDelay = w.writeOptions.RetryInterval()
Expand All @@ -211,6 +215,40 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
return nil
}

// Non-retryable errors
const (
errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
errStringPartialWrite = "partial write"
errStringPointsBeyondRP = "points beyond retention policy"
errStringUnableToParse = "unable to parse"
)

func isIgnorableError(error *http2.Error) bool {
// This "error" is an informational message about the state of the
// InfluxDB cluster.
if strings.Contains(error.Message, errStringHintedHandoffNotEmpty) {
return true
}
// Points beyond retention policy is returned when points are immediately
// discarded for being older than the retention policy. Usually this not
// a cause for concern, and we don't want to retry.
if strings.Contains(error.Message, errStringPointsBeyondRP) {
return true
}
// Other partial write errors, such as "field type conflict", are not
// correctable at this point and so the point is dropped instead of
// retrying.
if strings.Contains(error.Message, errStringPartialWrite) {
return true
}
// This error indicates an error in line protocol
// serialization, retries would not be successful.
if strings.Contains(error.Message, errStringUnableToParse) {
return true
}
return false
}

// computeRetryDelay calculates retry delay
// Retry delay is calculated as random value within the interval
// [retry_interval * exponential_base^(attempts) and retry_interval * exponential_base^(attempts+1)]
Expand Down
40 changes: 40 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"errors"
"fmt"
ilog "log"
ihttp "net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -659,3 +661,41 @@ func TestConsistencyParam(t *testing.T) {

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
}

func TestIgnoreErrors(t *testing.T) {
log.Log.SetLogLevel(log.DebugLevel)
i := 0
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
i++
w.WriteHeader(ihttp.StatusInternalServerError)
switch i {
case 1:
_, _ = w.Write([]byte(`{"error":" "write failed: hinted handoff queue not empty"`))
case 2:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: field type conflict"}`))
case 3:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: points beyond retention policy"}`))
case 4:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"unable to parse 'cpu value': invalid field format"}`))
case 5:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"gateway error"}`))
}
}))
defer server.Close()
//
opts := write.DefaultOptions()
ctx := context.Background()
srv := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), opts)

b := NewBatch("1", 20)
err := srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.Error(t, err)
}

0 comments on commit 5b9008c

Please sign in to comment.