Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: efficient write blocking synchronization #354

Merged
merged 3 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
## [unreleased]
### Bug fixes
- [#354](https://github.com/influxdata/influxdb-client-go/pull/354) More efficient synchronization in WriteAPIBlocking.


## 2.10.0 [2022-08-25]
### Features
Expand All @@ -17,7 +20,7 @@
- [#344](https://github.com/influxdata/influxdb-client-go/pull/344) `WriteAPI.Flush()` writes also batches from the retry queue.

### Test
- [#345](https://github.com/influxdata/influxdb-client-go/pul/345) Added makefile for simplifing testing from command line.
- [#345](https://github.com/influxdata/influxdb-client-go/pul/345) Added makefile for simplifying testing from command line.

## 2.9.1 [2022-06-24]
### Bug fixes
Expand Down
23 changes: 12 additions & 11 deletions api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,18 @@ type WriteAPIImpl struct {
service *iwrite.Service
writeBuffer []string

errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
errCh chan error
writeCh chan *iwrite.Batch
bufferCh chan string
writeStop chan struct{}
bufferStop chan struct{}
bufferFlush chan struct{}
doneCh chan struct{}
bufferInfoCh chan writeBuffInfoReq
writeInfoCh chan writeBuffInfoReq
writeOptions *write.Options
closingMu *sync.Mutex
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
isErrChReader int32
}

Expand Down
48 changes: 29 additions & 19 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"strings"
"sync"
"sync/atomic"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
Expand Down Expand Up @@ -51,9 +52,10 @@ type WriteAPIBlocking interface {
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
batching bool
batch []string
mu sync.Mutex
// more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
batching int32
batch []string
mu sync.Mutex
}

// NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
Expand All @@ -69,28 +71,26 @@ func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Se
}

func (w *writeAPIBlocking) EnableBatching() {
w.mu.Lock()
defer w.mu.Unlock()
if !w.batching {
w.batching = true
if atomic.LoadInt32(&w.batching) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be easier to enforce or document that EnableBatching can be called before any write is performed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be easier to document it. I don't think it will be easier to enforce it. The way how it is safe.

w.mu.Lock()
w.batching = 1
w.batch = make([]string, 0, w.writeOptions.BatchSize())
w.mu.Unlock()
}
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
w.mu.Lock()
defer w.mu.Unlock()
body := line
if w.batching {
if atomic.LoadInt32(&w.batching) > 0 {
w.mu.Lock()
defer w.mu.Unlock()
w.batch = append(w.batch, line)
if len(w.batch) == int(w.writeOptions.BatchSize()) {
body = strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.flush(ctx)
} else {
return nil
}
}
err := w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
if err != nil {
return err
}
Expand All @@ -112,13 +112,23 @@ func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point
return w.write(ctx, line)
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
w.mu.Lock()
defer w.mu.Unlock()
if w.batching && len(w.batch) > 0 {
// flush is unsychronized helper for creating and sending batch
// Must be called from synchronized block
func (w *writeAPIBlocking) flush(ctx context.Context) error {
if len(w.batch) > 0 {
body := strings.Join(w.batch, "\n")
w.batch = w.batch[:0]
return w.service.WriteBatch(ctx, iwrite.NewBatch(body, w.writeOptions.MaxRetryTime()))
b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())
return w.service.WriteBatch(ctx, b)
}
return nil
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
if atomic.LoadInt32(&w.batching) > 0 {
w.mu.Lock()
defer w.mu.Unlock()
return w.flush(ctx)
}
return nil
}
6 changes: 5 additions & 1 deletion internal/test/http_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ func (t *HTTPService) DoHTTPRequestWithResponse(_ *http.Request, _ http2.Request
// DoPostRequest reads http request, validates URL and stores data in the request
func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reader, requestCallback http2.RequestCallback, _ http2.ResponseCallback) *http2.Error {
req, err := http.NewRequest("POST", url, nil)
t.lock.Lock()
t.requests++
t.lock.Unlock()
if err != nil {
return http2.NewError(err)
}
Expand All @@ -134,7 +136,9 @@ func (t *HTTPService) DoPostRequest(_ context.Context, url string, body io.Reade
body, _ = gzip.NewReader(body)
t.wasGzip = true
}
assert.Equal(t.t, fmt.Sprintf("%swrite?bucket=my-bucket&org=my-org&precision=ns", t.serverURL), url)
if t.t != nil {
assert.Equal(t.t, fmt.Sprintf("%swrite?bucket=my-bucket&org=my-org&precision=ns", t.serverURL), url)
}

if t.ReplyError() != nil {
return t.ReplyError()
Expand Down