Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skip retrying on some errors #349

Merged
merged 1 commit into from
Aug 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
### Features
- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise.

### Bug fixes
- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error).


## 2.9.2 [2022-07-29]
### Bug fixes
- [#341](https://github.com/influxdata/influxdb-client-go/pull/341) Changing logging level of messages about discarding batch to Error.
Expand Down
92 changes: 65 additions & 27 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,38 +164,42 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
if batchToWrite != nil {
perror := w.WriteBatch(ctx, batchToWrite)
if perror != nil {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
w.retryDelay = perror.RetryAfter * 1000
} else {
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Error("Callback rejected batch, discarding")
if !batchToWrite.Evicted {
w.retryQueue.pop()
if isIgnorableError(perror) {
log.Warnf("Write error: %s", perror.Error())
} else {
if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
if perror.RetryAfter > 0 {
w.retryDelay = perror.RetryAfter * 1000
} else {
w.retryDelay = w.computeRetryDelay(w.retryAttempts)
}
return perror
}
// store new batch (not taken from queue)
if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
if w.retryQueue.push(batch) {
log.Error("Retry buffer full, discarding oldest batch")
if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
log.Error("Callback rejected batch, discarding")
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
return perror
}
} else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
log.Error("Reached maximum number of retries, discarding batch")
if !batchToWrite.Evicted {
w.retryQueue.pop()
// store new batch (not taken from queue)
if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
if w.retryQueue.push(batch) {
log.Error("Retry buffer full, discarding oldest batch")
}
} else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
log.Error("Reached maximum number of retries, discarding batch")
if !batchToWrite.Evicted {
w.retryQueue.pop()
}
}
batchToWrite.RetryAttempts++
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
}
batchToWrite.RetryAttempts++
w.retryAttempts++
log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
} else {
log.Errorf("Write error: %s\n", perror.Error())
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}
return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
}

w.retryDelay = w.writeOptions.RetryInterval()
Expand All @@ -211,6 +215,40 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
return nil
}

// Non-retryable errors
const (
errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
errStringPartialWrite = "partial write"
errStringPointsBeyondRP = "points beyond retention policy"
errStringUnableToParse = "unable to parse"
)

func isIgnorableError(error *http2.Error) bool {
// This "error" is an informational message about the state of the
// InfluxDB cluster.
if strings.Contains(error.Message, errStringHintedHandoffNotEmpty) {
return true
}
// Points beyond retention policy is returned when points are immediately
// discarded for being older than the retention policy. Usually this not
// a cause for concern, and we don't want to retry.
if strings.Contains(error.Message, errStringPointsBeyondRP) {
return true
}
// Other partial write errors, such as "field type conflict", are not
// correctable at this point and so the point is dropped instead of
// retrying.
if strings.Contains(error.Message, errStringPartialWrite) {
return true
}
// This error indicates an error in line protocol
// serialization, retries would not be successful.
if strings.Contains(error.Message, errStringUnableToParse) {
return true
}
return false
}

// computeRetryDelay calculates retry delay
// Retry delay is calculated as random value within the interval
// [retry_interval * exponential_base^(attempts) and retry_interval * exponential_base^(attempts+1)]
Expand Down
40 changes: 40 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"errors"
"fmt"
ilog "log"
ihttp "net/http"
"net/http/httptest"
"runtime"
"strings"
"sync"
Expand Down Expand Up @@ -659,3 +661,41 @@ func TestConsistencyParam(t *testing.T) {

require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
}

func TestIgnoreErrors(t *testing.T) {
log.Log.SetLogLevel(log.DebugLevel)
i := 0
server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
i++
w.WriteHeader(ihttp.StatusInternalServerError)
switch i {
case 1:
_, _ = w.Write([]byte(`{"error":" "write failed: hinted handoff queue not empty"`))
case 2:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: field type conflict"}`))
case 3:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: points beyond retention policy"}`))
case 4:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"unable to parse 'cpu value': invalid field format"}`))
case 5:
_, _ = w.Write([]byte(`{"code":"internal error", "message":"gateway error"}`))
}
}))
defer server.Close()
//
opts := write.DefaultOptions()
ctx := context.Background()
srv := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), opts)

b := NewBatch("1", 20)
err := srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.NoError(t, err)
err = srv.HandleWrite(ctx, b)
assert.Error(t, err)
}