diff --git a/CHANGELOG.md b/CHANGELOG.md index 075e594a..573e6e32 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## 2.3.0 [in progress] +### Documentation +1. [#241](https://github.com/influxdata/influxdb-client-go/pull/241) Documentation improvements: + - [Custom server API example](https://pkg.go.dev/github.com/influxdata/influxdb-client-go/v2#example-Client-CustomServerAPICall) now shows how to create DBRP mapping + - Improved documentation about concurrency + ## 2.2.3 [2021-04-01] ### Bug fixes 1. [#236](https://github.com/influxdata/influxdb-client-go/pull/236) Setting MaxRetries to zero value disables retry strategy. diff --git a/README.md b/README.md index fcf96530..9deaf467 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ This repository contains the reference Go client for InfluxDB 2. - [Basic Example](#basic-example) - [Writes in Detail](#writes) - [Queries in Detail](#queries) + - [Concurrency](#concurrency) - [InfluxDB 1.8 API compatibility](#influxdb-18-api-compatibility) - [Contributing](#contributing) - [License](#license) @@ -377,7 +378,68 @@ func main() { client.Close() } ``` +### Concurrency +InfluxDB Go Client can be used in a concurrent environment. All its functions are thread safe. +The best practise is to use a single `Client` instance per server URL. This ensures optimized resources usage, +most importantly reusing HTTP connections. + +Client ensures that there is a single instance of each server API sub-client for the specific area. E.g. a single `WriteAPI` instance for each org/bucket pair, +a single `QueryAPI` for each org. + +Such a single API sub-client instance can be used concurrently: +``` +package main + +import ( + "math/rand" + "sync" + "time" + + influxdb2 "github.com/influxdata/influxdb-client-go" + "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +func main() { + // Create client + client := influxdb2.NewClient("http://localhost:8086", "my-token") + // Ensure closing the client + defer client.Close() + + // Get write client + writeApi := client.WriteAPI("my-org", "my-bucket") + + // Create channel for points feeding + pointsCh := make(chan *write.Point, 200) + + threads := 5 + + var wg sync.WaitGroup + go func(points int) { + for i := 0; i < points; i++ { + p := influxdb2.NewPoint("meas", + map[string]string{"tag": "tagvalue"}, + map[string]interface{}{"val1": rand.Int63n(1000), "val2": rand.Float64()*100.0 - 50.0}, + time.Now()) + pointsCh <- p + } + close(pointsCh) + }(1000000) + + // Launch write routines + for t := 0; t < threads; t++ { + wg.Add(1) + go func() { + for p := range pointsCh { + writeApi.WritePoint(p) + } + wg.Done() + }() + } + // Wait for writes complete + wg.Wait() +} +``` ## InfluxDB 1.8 API compatibility diff --git a/api/write.go b/api/write.go index e5be0ab1..00aeaac9 100644 --- a/api/write.go +++ b/api/write.go @@ -16,6 +16,8 @@ import ( ) // WriteAPI is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server. +// WriteAPI is allowed to be used concurrently. +// When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines. type WriteAPI interface { // WriteRecord writes asynchronously line protocol record into bucket. // WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size. diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index 0cf5f0db..e285911e 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -15,6 +15,10 @@ import ( // WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server. // It doesn't implicitly create batches of points. It is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches. +// +// WriteAPIBlocking is allowed to be used concurrently. +// When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines. +// // To add implicit batching, use a wrapper, such as: // type writer struct { // batch []*write.Point diff --git a/client.go b/client.go index 6df54eb1..86505dbd 100644 --- a/client.go +++ b/client.go @@ -42,11 +42,14 @@ type Client interface { ServerURL() string // HTTPService returns underlying HTTP service object used by client HTTPService() http.Service - // WriteAPI returns the asynchronous, non-blocking, Write client + // WriteAPI returns the asynchronous, non-blocking, Write client. + // Ensures using a single WriteAPI instance for each org/bucket pair. WriteAPI(org, bucket string) api.WriteAPI - // WriteAPIBlocking returns the synchronous, blocking, Write client + // WriteAPIBlocking returns the synchronous, blocking, Write client. + // Ensures using a single WriteAPIBlocking instance for each org/bucket pair. WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking - // QueryAPI returns Query client + // QueryAPI returns Query client. + // Ensures using a single QueryAPI instance each org. QueryAPI(org string) api.QueryAPI // AuthorizationsAPI returns Authorizations API client. AuthorizationsAPI() api.AuthorizationsAPI