From 96a6c732273a62c056f55c076d0afc6aeb2d7ceb Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Thu, 25 Aug 2022 11:27:06 +0200 Subject: [PATCH] fix: skip retrying on some errors --- CHANGELOG.md | 4 ++ internal/write/service.go | 92 ++++++++++++++++++++++++---------- internal/write/service_test.go | 40 +++++++++++++++ 3 files changed, 109 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e1fff5c..9594160f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ### Features - [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise. +### Bug fixes +- [#349](https://github.com/influxdata/influxdb-client-go/pull/349) Skip retrying on specific write errors (mostly partial write error). + + ## 2.9.2 [2022-07-29] ### Bug fixes - [#341](https://github.com/influxdata/influxdb-client-go/pull/341) Changing logging level of messages about discarding batch to Error. diff --git a/internal/write/service.go b/internal/write/service.go index b35ce154..90c97108 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -164,38 +164,42 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { if batchToWrite != nil { perror := w.WriteBatch(ctx, batchToWrite) if perror != nil { - if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) { - log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error()) - if perror.RetryAfter > 0 { - w.retryDelay = perror.RetryAfter * 1000 - } else { - w.retryDelay = w.computeRetryDelay(w.retryAttempts) - } - if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) { - log.Error("Callback rejected batch, discarding") - if !batchToWrite.Evicted { - w.retryQueue.pop() + if isIgnorableError(perror) { + log.Warnf("Write error: %s", perror.Error()) + } else { + if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) { + log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error()) + if perror.RetryAfter > 0 { + w.retryDelay = perror.RetryAfter * 1000 + } else { + w.retryDelay = w.computeRetryDelay(w.retryAttempts) } - return perror - } - // store new batch (not taken from queue) - if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() { - if w.retryQueue.push(batch) { - log.Error("Retry buffer full, discarding oldest batch") + if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) { + log.Error("Callback rejected batch, discarding") + if !batchToWrite.Evicted { + w.retryQueue.pop() + } + return perror } - } else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() { - log.Error("Reached maximum number of retries, discarding batch") - if !batchToWrite.Evicted { - w.retryQueue.pop() + // store new batch (not taken from queue) + if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() { + if w.retryQueue.push(batch) { + log.Error("Retry buffer full, discarding oldest batch") + } + } else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() { + log.Error("Reached maximum number of retries, discarding batch") + if !batchToWrite.Evicted { + w.retryQueue.pop() + } } + batchToWrite.RetryAttempts++ + w.retryAttempts++ + log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay) + } else { + log.Errorf("Write error: %s\n", perror.Error()) } - batchToWrite.RetryAttempts++ - w.retryAttempts++ - log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay) - } else { - log.Errorf("Write error: %s\n", perror.Error()) + return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror) } - return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror) } w.retryDelay = w.writeOptions.RetryInterval() @@ -211,6 +215,40 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { return nil } +// Non-retryable errors +const ( + errStringHintedHandoffNotEmpty = "hinted handoff queue not empty" + errStringPartialWrite = "partial write" + errStringPointsBeyondRP = "points beyond retention policy" + errStringUnableToParse = "unable to parse" +) + +func isIgnorableError(error *http2.Error) bool { + // This "error" is an informational message about the state of the + // InfluxDB cluster. + if strings.Contains(error.Message, errStringHintedHandoffNotEmpty) { + return true + } + // Points beyond retention policy is returned when points are immediately + // discarded for being older than the retention policy. Usually this not + // a cause for concern, and we don't want to retry. + if strings.Contains(error.Message, errStringPointsBeyondRP) { + return true + } + // Other partial write errors, such as "field type conflict", are not + // correctable at this point and so the point is dropped instead of + // retrying. + if strings.Contains(error.Message, errStringPartialWrite) { + return true + } + // This error indicates an error in line protocol + // serialization, retries would not be successful. + if strings.Contains(error.Message, errStringUnableToParse) { + return true + } + return false +} + // computeRetryDelay calculates retry delay // Retry delay is calculated as random value within the interval // [retry_interval * exponential_base^(attempts) and retry_interval * exponential_base^(attempts+1)] diff --git a/internal/write/service_test.go b/internal/write/service_test.go index b27419e5..e60a8a3b 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -9,6 +9,8 @@ import ( "errors" "fmt" ilog "log" + ihttp "net/http" + "net/http/httptest" "runtime" "strings" "sync" @@ -659,3 +661,41 @@ func TestConsistencyParam(t *testing.T) { require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL()) } + +func TestIgnoreErrors(t *testing.T) { + log.Log.SetLogLevel(log.DebugLevel) + i := 0 + server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) { + i++ + w.WriteHeader(ihttp.StatusInternalServerError) + switch i { + case 1: + _, _ = w.Write([]byte(`{"error":" "write failed: hinted handoff queue not empty"`)) + case 2: + _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: field type conflict"}`)) + case 3: + _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: points beyond retention policy"}`)) + case 4: + _, _ = w.Write([]byte(`{"code":"internal error", "message":"unable to parse 'cpu value': invalid field format"}`)) + case 5: + _, _ = w.Write([]byte(`{"code":"internal error", "message":"gateway error"}`)) + } + })) + defer server.Close() + // + opts := write.DefaultOptions() + ctx := context.Background() + srv := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), opts) + + b := NewBatch("1", 20) + err := srv.HandleWrite(ctx, b) + assert.NoError(t, err) + err = srv.HandleWrite(ctx, b) + assert.NoError(t, err) + err = srv.HandleWrite(ctx, b) + assert.NoError(t, err) + err = srv.HandleWrite(ctx, b) + assert.NoError(t, err) + err = srv.HandleWrite(ctx, b) + assert.Error(t, err) +}