diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index c5199dea0306..88a3dff5fd0e 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -132,8 +132,6 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) { gzipBuffer := bytes.NewBuffer(make([]byte, 0, c.config.MaxContentLengthLogs)) gzipWriter.Reset(gzipBuffer) - defer gzipWriter.Close() - // Callback when each batch is to be sent. send := func(ctx context.Context, buf *bytes.Buffer) (err error) { shouldCompress := buf.Len() >= minCompressionLen && !c.config.DisableCompression @@ -146,7 +144,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (err error) { return fmt.Errorf("failed copying buffer to gzip writer: %v", err) } - if err = gzipWriter.Flush(); err != nil { + if err = gzipWriter.Close(); err != nil { return fmt.Errorf("failed flushing compressed data to gzip writer: %v", err) } diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 25d9be1e29ae..b8949c14ea5e 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -14,6 +14,7 @@ package splunkhecexporter import ( + "bytes" "compress/gzip" "context" "errors" @@ -128,7 +129,7 @@ func createLogData(numResources int, numLibraries int, numRecords int) pdata.Log type CapturingData struct { testing *testing.T - receivedRequest chan string + receivedRequest chan []byte statusCode int checkCompression bool } @@ -146,7 +147,7 @@ func (c *CapturingData) ServeHTTP(w http.ResponseWriter, r *http.Request) { panic(err) } go func() { - c.receivedRequest <- string(body) + c.receivedRequest <- body }() w.WriteHeader(c.statusCode) } @@ -163,7 +164,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin cfg.DisableCompression = disableCompression cfg.Token = "1234-1234" - receivedRequest := make(chan string) + receivedRequest := make(chan []byte) capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression} s := &http.Server{ Handler: &capture, @@ -184,7 +185,7 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin assert.NoError(t, err) select { case request := <-receivedRequest: - return request, nil + return string(request), nil case <-time.After(1 * time.Second): return "", errors.New("timeout") } @@ -202,7 +203,7 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) ( cfg.DisableCompression = disableCompression cfg.Token = "1234-1234" - receivedRequest := make(chan string) + receivedRequest := make(chan []byte) capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression} s := &http.Server{ Handler: &capture, @@ -223,13 +224,13 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) ( assert.NoError(t, err) select { case request := <-receivedRequest: - return request, nil + return string(request), nil case <-time.After(1 * time.Second): return "", errors.New("timeout") } } -func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) { +func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([][]byte, error) { listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { panic(err) @@ -238,7 +239,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) { cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" cfg.Token = "1234-1234" - receivedRequest := make(chan string) + receivedRequest := make(chan []byte) capture := CapturingData{testing: t, receivedRequest: receivedRequest, statusCode: 200, checkCompression: !cfg.DisableCompression} s := &http.Server{ Handler: &capture, @@ -256,7 +257,7 @@ func runLogExport(cfg *Config, ld pdata.Logs, t *testing.T) ([]string, error) { err = exporter.ConsumeLogs(context.Background(), ld) assert.NoError(t, err) - var requests []string + var requests [][]byte for { select { case request := <-receivedRequest: @@ -286,8 +287,13 @@ func TestReceiveLogs(t *testing.T) { type wantType struct { batches []string numBatches int + compressed bool } + // The test cases depend on the constant minCompressionLen = 1500. + // If the constant changed, the test cases with want.compressed=true must be updated. + require.Equal(t, minCompressionLen, 1500) + tests := []struct { name string conf *Config @@ -348,6 +354,62 @@ func TestReceiveLogs(t *testing.T) { numBatches: 2, }, }, + { + name: "1 compressed batch of 1837 bytes, make sure the event size is more than minCompressionLen=1500 to trigger compression", + logs: createLogData(1, 1, 10), + conf: func() *Config { + return NewFactory().CreateDefaultConfig().(*Config) + }(), + want: wantType{ + batches: []string{ + `{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n", + }, + numBatches: 1, + compressed: true, + }, + }, + { + name: "2 compressed batches - 1652 bytes each, make sure the log size is more than minCompressionLen=1500 to trigger compression", + logs: createLogData(1, 1, 18), // comes to HEC events payload size - 1837 bytes + conf: func() *Config { + cfg := NewFactory().CreateDefaultConfig().(*Config) + cfg.MaxContentLengthLogs = 1700 + return cfg + }(), + want: wantType{ + batches: []string{ + `{"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.001,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.002,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.003,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.004,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.005,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.006,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.007,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.008,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n", + `{"time":0.009,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.01,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.011,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.012,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.013,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.014,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.015,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.016,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n" + + `{"time":0.017,"host":"myhost","source":"myapp","sourcetype":"myapp-type","index":"myindex","event":"mylog","fields":{"custom":"custom","host.name":"myhost","service.name":"myapp"}}` + "\n\r\n\r\n", + }, + numBatches: 2, + compressed: true, + }, + }, } for _, test := range tests { @@ -359,7 +421,11 @@ func TestReceiveLogs(t *testing.T) { for i := 0; i < test.want.numBatches; i++ { require.NotZero(t, got[i]) - assert.Equal(t, test.want.batches[i], got[i]) + if test.want.compressed { + validateCompressedEqual(t, test.want.batches[i], got[i]) + } else { + assert.Equal(t, test.want.batches[i], string(got[i])) + } } }) @@ -391,7 +457,7 @@ func TestReceiveMetricsWithCompression(t *testing.T) { } func TestErrorReceived(t *testing.T) { - receivedRequest := make(chan string) + receivedRequest := make(chan []byte) capture := CapturingData{receivedRequest: receivedRequest, statusCode: 500} listener, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { @@ -720,3 +786,15 @@ func TestSubLogs(t *testing.T) { // The name of the sole log record should be 1_1_2. assert.Equal(t, "1_1_2", got.ResourceLogs().At(0).InstrumentationLibraryLogs().At(0).Logs().At(0).Name()) } + +// validateCompressedEqual validates that GZipped `got` contains `expected` string +func validateCompressedEqual(t *testing.T, expected string, got []byte) { + z, err := gzip.NewReader(bytes.NewReader(got)) + require.NoError(t, err) + defer z.Close() + + p, err := ioutil.ReadAll(z) + require.NoError(t, err) + + assert.Equal(t, expected, string(p)) +}