Skip to content

Commit

Permalink
[configgrpc] Use own compressors for zstd (open-telemetry#10323)
Browse files Browse the repository at this point in the history
Uses our own version of the zstd compressor for gRPC servers. The code
for it is based on the gzip compressor that comes built-in with gRPC.

Benchmarks before this PR:
```
Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc

sm_log_requestgoos: linux
goarch: amd64
pkg: go.opentelemetry.io/collector/config/configgrpc
cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16         	   71594	     19066 ns/op	     615 B/op	       4 allocs/op
sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16         	  151503	      8544 ns/op	     640 B/op	       6 allocs/op
sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16       	 3632570	       303.8 ns/op	     304 B/op	       3 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16         	   68114	     16938 ns/op	     748 B/op	       4 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16         	  138091	      8047 ns/op	     896 B/op	       6 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16       	 3081198	       402.5 ns/op	     400 B/op	       3 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16        	   43414	     27174 ns/op	     386 B/op	       3 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16        	  117534	      9903 ns/op	   10112 B/op	       6 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16      	 1000000	      1190 ns/op	     528 B/op	       2 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16       	   67275	     17508 ns/op	     700 B/op	       4 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16       	  196862	      6137 ns/op	     848 B/op	       6 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16     	 3595815	       331.7 ns/op	     272 B/op	       2 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16       	   64105	     19104 ns/op	     844 B/op	       4 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16       	  169221	      6929 ns/op	    1120 B/op	       6 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16     	 2602239	       473.0 ns/op	     336 B/op	       2 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16      	   33861	     36473 ns/op	     904 B/op	       4 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16      	  107828	     10596 ns/op	   16832 B/op	       6 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16    	  725080	      1540 ns/op	     689 B/op	       2 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16      	   76315	     16394 ns/op	     496 B/op	       4 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16      	  193314	      5957 ns/op	     688 B/op	       6 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16    	 3558649	       345.2 ns/op	     208 B/op	       2 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16      	   68497	     18413 ns/op	     699 B/op	       4 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16      	  177841	      6520 ns/op	    1136 B/op	       6 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16    	 2354102	       497.4 ns/op	     272 B/op	       2 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16    	   21943	     54603 ns/op	    1941 B/op	       5 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16    	   71260	     16077 ns/op	   25312 B/op	       6 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 	  335415	      3026 ns/op	    1200 B/op	       2 allocs/op
PASS
ok  	go.opentelemetry.io/collector/config/configgrpc	37.766s
```

After this version:
```
Running tool: /usr/bin/go test -benchmem -run=^$ -bench ^BenchmarkCompressors$ go.opentelemetry.io/collector/config/configgrpc

sm_log_requestgoos: linux
goarch: amd64
pkg: go.opentelemetry.io/collector/config/configgrpc
cpu: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
BenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_162/compressor_gzip-16         	   74952	     15710 ns/op	     603 B/op	       4 allocs/op
sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_159/compressor_zstd-16         	  156784	      6966 ns/op	     208 B/op	       2 allocs/op
sm_log_requestBenchmarkCompressors/sm_log_request/raw_bytes_160/compressed_bytes_178/compressor_snappy-16       	 2216174	       510.4 ns/op	     308 B/op	       3 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_219/compressor_gzip-16         	   68095	     18569 ns/op	     736 B/op	       4 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_209/compressor_zstd-16         	  150705	      8849 ns/op	     294 B/op	       2 allocs/op
md_log_requestBenchmarkCompressors/md_log_request/raw_bytes_242/compressed_bytes_260/compressor_snappy-16       	 2149710	       556.8 ns/op	     406 B/op	       3 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_253/compressor_gzip-16        	   40040	     26159 ns/op	     368 B/op	       3 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_216/compressor_zstd-16        	  123043	     10254 ns/op	     299 B/op	       2 allocs/op
lg_log_requestBenchmarkCompressors/lg_log_request/raw_bytes_4850/compressed_bytes_454/compressor_snappy-16      	  726780	      1457 ns/op	     533 B/op	       2 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_203/compressor_gzip-16       	   64660	     18186 ns/op	     701 B/op	       4 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_201/compressor_zstd-16       	  193225	      6267 ns/op	     273 B/op	       2 allocs/op
sm_trace_requestBenchmarkCompressors/sm_trace_request/raw_bytes_231/compressed_bytes_220/compressor_snappy-16     	 2925073	       418.2 ns/op	     276 B/op	       2 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_249/compressor_gzip-16       	   61320	     20641 ns/op	     846 B/op	       4 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_256/compressor_zstd-16       	  190965	      6440 ns/op	     321 B/op	       2 allocs/op
md_trace_requestBenchmarkCompressors/md_trace_request/raw_bytes_329/compressed_bytes_279/compressor_snappy-16     	 2051575	       656.8 ns/op	     341 B/op	       2 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_303/compressor_gzip-16      	   30097	     40680 ns/op	     907 B/op	       4 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_258/compressor_zstd-16      	  127027	      8437 ns/op	     363 B/op	       2 allocs/op
lg_trace_requestBenchmarkCompressors/lg_trace_request/raw_bytes_7025/compressed_bytes_591/compressor_snappy-16    	  716541	      1803 ns/op	     694 B/op	       2 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_140/compressor_gzip-16      	   82287	     15054 ns/op	     496 B/op	       4 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_137/compressor_zstd-16      	  230558	      5470 ns/op	     221 B/op	       2 allocs/op
sm_metric_requestBenchmarkCompressors/sm_metric_request/raw_bytes_183/compressed_bytes_152/compressor_snappy-16    	 2759403	       417.1 ns/op	     211 B/op	       2 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_194/compressor_gzip-16      	   58208	     18925 ns/op	     702 B/op	       4 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_198/compressor_zstd-16      	  199226	      6247 ns/op	     256 B/op	       2 allocs/op
md_metric_requestBenchmarkCompressors/md_metric_request/raw_bytes_376/compressed_bytes_222/compressor_snappy-16    	 2065202	       609.8 ns/op	     276 B/op	       2 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_601/compressor_gzip-16    	   20583	     59762 ns/op	    1945 B/op	       5 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_559/compressor_zstd-16    	   98254	     13152 ns/op	     728 B/op	       2 allocs/op
lg_metric_requestBenchmarkCompressors/lg_metric_request/raw_bytes_10991/compressed_bytes_1055/compressor_snappy-16 	  389401	      3976 ns/op	    1209 B/op	       2 allocs/op
PASS
ok  	go.opentelemetry.io/collector/config/configgrpc	40.394s
```

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

---------

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling authored and mx-psi committed Jun 5, 2024
1 parent 0ab388b commit f5f1866
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 6 deletions.
13 changes: 13 additions & 0 deletions .chloggen/jpkroehling-configgrpc-use-own-compressors-for-zstd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: Use own compressors for zstd

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Before this change, the zstd compressor we used didn't respect the max message size.

# One or more tracking issues or pull requests related to the change
issues: [10323]
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"google.golang.org/grpc"
Expand All @@ -28,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/configcompression"
grpcInternal "go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configopaque"
"go.opentelemetry.io/collector/config/configtelemetry"
Expand Down Expand Up @@ -426,7 +426,7 @@ func getGRPCCompressionName(compressionType configcompression.Type) (string, err
case configcompression.TypeSnappy:
return snappy.Name, nil
case configcompression.TypeZstd:
return zstd.Name, nil
return grpcInternal.ZstdName, nil
default:
return "", fmt.Errorf("unsupported compression type %q", compressionType)
}
Expand Down
4 changes: 2 additions & 2 deletions config/configgrpc/configgrpc_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
"testing"

"github.com/mostynb/go-grpc-compression/nonclobbering/snappy"
"github.com/mostynb/go-grpc-compression/nonclobbering/zstd"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/status"

"go.opentelemetry.io/collector/config/configgrpc/internal"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
Expand All @@ -27,7 +27,7 @@ func BenchmarkCompressors(b *testing.B) {

compressors := make([]encoding.Compressor, 0)
compressors = append(compressors, encoding.GetCompressor(gzip.Name))
compressors = append(compressors, encoding.GetCompressor(zstd.Name))
compressors = append(compressors, encoding.GetCompressor(internal.ZstdName))
compressors = append(compressors, encoding.GetCompressor(snappy.Name))

for _, payload := range payloads {
Expand Down
2 changes: 1 addition & 1 deletion config/configgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module go.opentelemetry.io/collector/config/configgrpc
go 1.21.0

require (
github.com/klauspost/compress v1.17.2
github.com/mostynb/go-grpc-compression v1.2.2
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector v0.102.0
Expand Down Expand Up @@ -36,7 +37,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.1 // indirect
Expand Down
83 changes: 83 additions & 0 deletions config/configgrpc/internal/zstd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright The OpenTelemetry Authors
// Copyright 2017 gRPC authors
// SPDX-License-Identifier: Apache-2.0

package internal // import "go.opentelemetry.io/collector/config/configgrpc/internal"

import (
"errors"
"io"
"sync"

"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)

const ZstdName = "zstd"

func init() {
encoding.RegisterCompressor(NewZstdCodec())
}

type writer struct {
*zstd.Encoder
pool *sync.Pool
}

func NewZstdCodec() encoding.Compressor {
c := &compressor{}
c.poolCompressor.New = func() any {
zw, _ := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1), zstd.WithWindowSize(512*1024))
return &writer{Encoder: zw, pool: &c.poolCompressor}
}
return c
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
}

func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Encoder.Close()
}

type reader struct {
*zstd.Decoder
pool *sync.Pool
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := zstd.NewReader(r)
if err != nil {
return nil, err
}
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if errors.Is(err, io.EOF) {
z.pool.Put(z)
}
return n, err
}

func (c *compressor) Name() string {
return ZstdName
}

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}
41 changes: 41 additions & 0 deletions config/configgrpc/internal/zstd_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package internal

import (
"bytes"
"io"
"testing"

"github.com/stretchr/testify/require"
)

func Test_zstdCodec_CompressDecompress(t *testing.T) {
// prepare
msg := []byte("Hello world.")
compressed := &bytes.Buffer{}

// zstd header, for sanity checking
header := []byte{40, 181, 47, 253}

c := NewZstdCodec()
cWriter, err := c.Compress(compressed)
require.NoError(t, err)
require.NotNil(t, cWriter)

_, err = cWriter.Write(msg)
require.NoError(t, err)
cWriter.Close()

cReader, err := c.Decompress(compressed)
require.NoError(t, err)
require.NotNil(t, cReader)

uncompressed, err := io.ReadAll(cReader)
require.NoError(t, err)
require.Equal(t, msg, uncompressed)

// test header
require.Equal(t, header, compressed.Bytes()[:4])
}
3 changes: 2 additions & 1 deletion receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ func TestGRPCMaxRecvSize(t *testing.T) {
require.NoError(t, err)

td := testdata.GenerateTraces(50000)
require.Error(t, exportTraces(cc, td))
err = exportTraces(cc, td)
require.Error(t, err)
assert.NoError(t, cc.Close())
require.NoError(t, recv.Shutdown(context.Background()))

Expand Down

0 comments on commit f5f1866

Please sign in to comment.