diff --git a/.chloggen/batch-exporter-helper.yaml b/.chloggen/batch-exporter-helper.yaml index 613cfbac5b2..c52c432ea37 100755 --- a/.chloggen/batch-exporter-helper.yaml +++ b/.chloggen/batch-exporter-helper.yaml @@ -15,7 +15,10 @@ issues: [8122] # (Optional) One or more lines of additional information to render under the primary note. # These lines will be padded with 2 spaces and then inserted directly into the document. # Use pipe (|) for multiline entries. -subtext: +subtext: | + `WithBatcher` can be used with both regular exporter helper (e.g. NewTracesExporter) and the request-based exporter + helper (e.g. NewTracesRequestExporter). The request-based exporter helpers require `WithRequestBatchFuncs` option + providing batching functions. # Optional: The change log or logs in which this entry should be included. # e.g. '[user]' or '[user, api]' diff --git a/exporter/exporterhelper/batch_sender.go b/exporter/exporterhelper/batch_sender.go index 1a6f08ced3f..7e4dc1d754d 100644 --- a/exporter/exporterhelper/batch_sender.go +++ b/exporter/exporterhelper/batch_sender.go @@ -45,14 +45,17 @@ type batchSender struct { } // newBatchSender returns a new batch consumer component. -func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings) *batchSender { +func newBatchSender(cfg exporterbatcher.Config, set exporter.CreateSettings, + mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) *batchSender { bs := &batchSender{ - activeBatch: newEmptyBatch(), - cfg: cfg, - logger: set.Logger, - shutdownCh: make(chan struct{}), - stopped: &atomic.Bool{}, - resetTimerCh: make(chan struct{}), + activeBatch: newEmptyBatch(), + cfg: cfg, + logger: set.Logger, + mergeFunc: mf, + mergeSplitFunc: msf, + shutdownCh: make(chan struct{}), + stopped: &atomic.Bool{}, + resetTimerCh: make(chan struct{}), } return bs } diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index b81bfdc4efa..347ffa2bd01 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -148,13 +148,20 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } // BatcherOption apply changes to batcher sender. -type BatcherOption func(*batchSender) +type BatcherOption func(*batchSender) error // WithRequestBatchFuncs sets the functions for merging and splitting batches for an exporter built for custom request types. func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) BatcherOption { - return func(bs *batchSender) { + return func(bs *batchSender) error { + if mf == nil || msf == nil { + return fmt.Errorf("WithRequestBatchFuncs must be provided with non-nil functions") + } + if bs.mergeFunc != nil || bs.mergeSplitFunc != nil { + return fmt.Errorf("WithRequestBatchFuncs can be used only once with request-based exporters") + } bs.mergeFunc = mf bs.mergeSplitFunc = msf + return nil } } @@ -166,7 +173,7 @@ func WithRequestBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf expor // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithBatcher(cfg exporterbatcher.Config, opts ...BatcherOption) Option { return func(o *baseExporter) error { - bs := newBatchSender(cfg, o.set) + bs := newBatchSender(cfg, o.set, o.batchMergeFunc, o.batchMergeSplitfunc) for _, opt := range opts { opt(bs) } @@ -196,14 +203,28 @@ func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option { } } +// withBatchFuncs is used to set the functions for merging and splitting batches for OLTP-based exporters. +// It must be provided as the first option when creating a new exporter helper. +func withBatchFuncs(mf exporterbatcher.BatchMergeFunc[Request], msf exporterbatcher.BatchMergeSplitFunc[Request]) Option { + return func(o *baseExporter) error { + o.batchMergeFunc = mf + o.batchMergeSplitfunc = msf + return nil + } +} + // baseExporter contains common fields between different exporter types. type baseExporter struct { component.StartFunc component.ShutdownFunc + signal component.DataType + + batchMergeFunc exporterbatcher.BatchMergeFunc[Request] + batchMergeSplitfunc exporterbatcher.BatchMergeSplitFunc[Request] + marshaler exporterqueue.Marshaler[Request] unmarshaler exporterqueue.Unmarshaler[Request] - signal component.DataType set exporter.CreateSettings obsrep *ObsReport diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 30a73820f6d..c0585d8dc48 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -26,6 +26,8 @@ type logsRequest struct { pusher consumer.ConsumeLogsFunc } +var _ Request = (*logsRequest)(nil) + func newLogsRequest(ld plog.Logs, pusher consumer.ConsumeLogsFunc) Request { return &logsRequest{ ld: ld, @@ -82,7 +84,10 @@ func NewLogsExporter( if pusher == nil { return nil, errNilPushLogsData } - logsOpts := []Option{withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher))} + logsOpts := []Option{ + withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher)), + withBatchFuncs(mergeLogs, mergeSplitLogs), + } return NewLogsRequestExporter(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...) } diff --git a/exporter/exporterhelper/logs_batch.go b/exporter/exporterhelper/logs_batch.go new file mode 100644 index 00000000000..e5ba39594ed --- /dev/null +++ b/exporter/exporterhelper/logs_batch.go @@ -0,0 +1,136 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "errors" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/pdata/plog" +) + +// mergeLogs merges two logs requests into one. +func mergeLogs(_ context.Context, r1 Request, r2 Request) (Request, error) { + lr1, ok1 := r1.(*logsRequest) + lr2, ok2 := r2.(*logsRequest) + if !ok1 || !ok2 { + return nil, errors.New("invalid input type") + } + lr2.ld.ResourceLogs().MoveAndAppendTo(lr1.ld.ResourceLogs()) + return lr1, nil +} + +// mergeSplitLogs splits and/or merges the logs into multiple requests based on the MaxSizeConfig. +func mergeSplitLogs(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { + var ( + res []Request + destReq *logsRequest + capacityLeft = cfg.MaxSizeItems + ) + for _, req := range []Request{r1, r2} { + if req == nil { + continue + } + srcReq, ok := req.(*logsRequest) + if !ok { + return nil, errors.New("invalid input type") + } + if srcReq.ld.LogRecordCount() <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + srcReq.ld.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + capacityLeft = cfg.MaxSizeItems - destReq.ld.LogRecordCount() + continue + } + + for { + extractedLogs := extractLogs(srcReq.ld, capacityLeft) + if extractedLogs.LogRecordCount() == 0 { + break + } + capacityLeft -= extractedLogs.LogRecordCount() + if destReq == nil { + destReq = &logsRequest{ld: extractedLogs, pusher: srcReq.pusher} + } else { + extractedLogs.ResourceLogs().MoveAndAppendTo(destReq.ld.ResourceLogs()) + } + if capacityLeft == 0 { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeItems + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + return res, nil +} + +// extractLogs extracts logs from the input logs and returns a new logs with the specified number of log records. +func extractLogs(srcLogs plog.Logs, count int) plog.Logs { + destLogs := plog.NewLogs() + srcLogs.ResourceLogs().RemoveIf(func(srcRL plog.ResourceLogs) bool { + if count <= 0 { + return false + } + needToExtract := resourceLogsCount(srcRL) > count + if needToExtract { + srcRL = extractResourceLogs(srcRL, count) + } + count -= resourceLogsCount(srcRL) + srcRL.MoveTo(destLogs.ResourceLogs().AppendEmpty()) + return !needToExtract + }) + return destLogs +} + +// extractResourceLogs extracts resource logs and returns a new resource logs with the specified number of log records. +func extractResourceLogs(srcRL plog.ResourceLogs, count int) plog.ResourceLogs { + destRL := plog.NewResourceLogs() + destRL.SetSchemaUrl(srcRL.SchemaUrl()) + srcRL.Resource().CopyTo(destRL.Resource()) + srcRL.ScopeLogs().RemoveIf(func(srcSL plog.ScopeLogs) bool { + if count <= 0 { + return false + } + needToExtract := srcSL.LogRecords().Len() > count + if needToExtract { + srcSL = extractScopeLogs(srcSL, count) + } + count -= srcSL.LogRecords().Len() + srcSL.MoveTo(destRL.ScopeLogs().AppendEmpty()) + return !needToExtract + }) + return destRL +} + +// extractScopeLogs extracts scope logs and returns a new scope logs with the specified number of log records. +func extractScopeLogs(srcSL plog.ScopeLogs, count int) plog.ScopeLogs { + destSL := plog.NewScopeLogs() + destSL.SetSchemaUrl(srcSL.SchemaUrl()) + srcSL.Scope().CopyTo(destSL.Scope()) + srcSL.LogRecords().RemoveIf(func(srcLR plog.LogRecord) bool { + if count <= 0 { + return false + } + srcLR.MoveTo(destSL.LogRecords().AppendEmpty()) + count-- + return true + }) + return destSL +} + +// resourceLogsCount calculates the total number of log records in the plog.ResourceLogs. +func resourceLogsCount(rl plog.ResourceLogs) int { + count := 0 + for k := 0; k < rl.ScopeLogs().Len(); k++ { + count += rl.ScopeLogs().At(k).LogRecords().Len() + } + return count +} diff --git a/exporter/exporterhelper/logs_batch_test.go b/exporter/exporterhelper/logs_batch_test.go new file mode 100644 index 00000000000..0fc92aacbc8 --- /dev/null +++ b/exporter/exporterhelper/logs_batch_test.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/pdata/plog" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/internal/testdata" +) + +func TestMergeLogs(t *testing.T) { + lr1 := &logsRequest{ld: testdata.GenerateLogs(2)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} + res, err := mergeLogs(context.Background(), lr1, lr2) + assert.Nil(t, err) + assert.Equal(t, 5, res.(*logsRequest).ld.LogRecordCount()) +} + +func TestMergeLogsInvalidInput(t *testing.T) { + lr1 := &tracesRequest{td: testdata.GenerateTraces(2)} + lr2 := &logsRequest{ld: testdata.GenerateLogs(3)} + _, err := mergeLogs(context.Background(), lr1, lr2) + assert.Error(t, err) +} + +func TestMergeSplitLogs(t *testing.T) { + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + lr1 Request + lr2 Request + expected []*logsRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: plog.NewLogs()}, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "both_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: nil, + lr2: nil, + expected: []*logsRequest{}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: &logsRequest{ld: plog.NewLogs()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, + expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, + }, + { + name: "first_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: nil, + lr2: &logsRequest{ld: testdata.GenerateLogs(5)}, + expected: []*logsRequest{{ld: testdata.GenerateLogs(5)}}, + }, + { + name: "first_nil_second_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: nil, + lr2: &logsRequest{ld: plog.NewLogs()}, + expected: []*logsRequest{{ld: plog.NewLogs()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: &logsRequest{ld: testdata.GenerateLogs(4)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(6)}, + expected: []*logsRequest{{ld: func() plog.Logs { + logs := testdata.GenerateLogs(4) + testdata.GenerateLogs(6).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + lr1: nil, + lr2: &logsRequest{ld: testdata.GenerateLogs(10)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(4)}, + {ld: testdata.GenerateLogs(2)}, + }, + }, + { + name: "merge_and_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + lr1: &logsRequest{ld: testdata.GenerateLogs(8)}, + lr2: &logsRequest{ld: testdata.GenerateLogs(20)}, + expected: []*logsRequest{ + {ld: func() plog.Logs { + logs := testdata.GenerateLogs(8) + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(logs.ResourceLogs()) + return logs + }()}, + {ld: testdata.GenerateLogs(10)}, + {ld: testdata.GenerateLogs(8)}, + }, + }, + { + name: "scope_logs_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + lr1: &logsRequest{ld: func() plog.Logs { + ld := testdata.GenerateLogs(4) + ld.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("extra log") + return ld + }()}, + lr2: &logsRequest{ld: testdata.GenerateLogs(2)}, + expected: []*logsRequest{ + {ld: testdata.GenerateLogs(4)}, + {ld: func() plog.Logs { + ld := testdata.GenerateLogs(0) + ld.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().AppendEmpty().Body().SetStr("extra log") + testdata.GenerateLogs(2).ResourceLogs().MoveAndAppendTo(ld.ResourceLogs()) + return ld + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := mergeSplitLogs(context.Background(), tt.cfg, tt.lr1, tt.lr2) + assert.Nil(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i, r := range res { + assert.Equal(t, tt.expected[i], r.(*logsRequest)) + } + }) + + } +} + +func TestMergeSplitLogsInvalidInput(t *testing.T) { + r1 := &tracesRequest{td: testdata.GenerateTraces(2)} + r2 := &logsRequest{ld: testdata.GenerateLogs(3)} + _, err := mergeSplitLogs(context.Background(), exporterbatcher.MaxSizeConfig{}, r1, r2) + assert.Error(t, err) +} + +func TestExtractLogs(t *testing.T) { + for i := 0; i < 10; i++ { + ld := testdata.GenerateLogs(10) + extractedLogs := extractLogs(ld, i) + assert.Equal(t, i, extractedLogs.LogRecordCount()) + assert.Equal(t, 10-i, ld.LogRecordCount()) + } +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 596de71ae68..83db18229dd 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -82,7 +82,10 @@ func NewMetricsExporter( if pusher == nil { return nil, errNilPushMetricsData } - metricsOpts := []Option{withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher))} + metricsOpts := []Option{ + withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher)), + withBatchFuncs(mergeMetrics, mergeSplitMetrics), + } return NewMetricsRequestExporter(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...) } diff --git a/exporter/exporterhelper/metrics_batch.go b/exporter/exporterhelper/metrics_batch.go new file mode 100644 index 00000000000..8244034ecbb --- /dev/null +++ b/exporter/exporterhelper/metrics_batch.go @@ -0,0 +1,235 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "errors" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/pdata/pmetric" +) + +// mergeMetrics merges two metrics requests into one. +func mergeMetrics(_ context.Context, r1 Request, r2 Request) (Request, error) { + mr1, ok1 := r1.(*metricsRequest) + mr2, ok2 := r2.(*metricsRequest) + if !ok1 || !ok2 { + return nil, errors.New("invalid input type") + } + mr2.md.ResourceMetrics().MoveAndAppendTo(mr1.md.ResourceMetrics()) + return mr1, nil +} + +// mergeSplitMetrics splits and/or merges the metrics into multiple requests based on the MaxSizeConfig. +func mergeSplitMetrics(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { + var ( + res []Request + destReq *metricsRequest + capacityLeft = cfg.MaxSizeItems + ) + for _, req := range []Request{r1, r2} { + if req == nil { + continue + } + srcReq, ok := req.(*metricsRequest) + if !ok { + return nil, errors.New("invalid input type") + } + if srcReq.md.DataPointCount() <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + srcReq.md.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) + } + capacityLeft = cfg.MaxSizeItems - destReq.md.DataPointCount() + continue + } + + for { + extractedMetrics := extractMetrics(srcReq.md, capacityLeft) + if extractedMetrics.DataPointCount() == 0 { + break + } + capacityLeft -= extractedMetrics.DataPointCount() + if destReq == nil { + destReq = &metricsRequest{md: extractedMetrics, pusher: srcReq.pusher} + } else { + extractedMetrics.ResourceMetrics().MoveAndAppendTo(destReq.md.ResourceMetrics()) + } + + if capacityLeft == 0 { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeItems + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + + return res, nil +} + +// extractMetrics extracts metrics from srcMetrics until count of data points is reached. +func extractMetrics(srcMetrics pmetric.Metrics, count int) pmetric.Metrics { + destMetrics := pmetric.NewMetrics() + srcMetrics.ResourceMetrics().RemoveIf(func(srcRM pmetric.ResourceMetrics) bool { + if count <= 0 { + return false + } + needToExtract := resourceDataPointsCount(srcRM) > count + if needToExtract { + srcRM = extractResourceMetrics(srcRM, count) + } + count -= resourceDataPointsCount(srcRM) + srcRM.MoveTo(destMetrics.ResourceMetrics().AppendEmpty()) + return !needToExtract + }) + return destMetrics +} + +// extractResourceMetrics extracts resource metrics and returns a new resource metrics with the specified number of data points. +func extractResourceMetrics(srcRM pmetric.ResourceMetrics, count int) pmetric.ResourceMetrics { + destRM := pmetric.NewResourceMetrics() + destRM.SetSchemaUrl(srcRM.SchemaUrl()) + srcRM.Resource().CopyTo(destRM.Resource()) + srcRM.ScopeMetrics().RemoveIf(func(srcSM pmetric.ScopeMetrics) bool { + if count <= 0 { + return false + } + needToExtract := scopeDataPointsCount(srcSM) > count + if needToExtract { + srcSM = extractScopeMetrics(srcSM, count) + } + count -= scopeDataPointsCount(srcSM) + srcSM.MoveTo(destRM.ScopeMetrics().AppendEmpty()) + return !needToExtract + }) + return destRM +} + +// extractScopeMetrics extracts scope metrics and returns a new scope metrics with the specified number of data points. +func extractScopeMetrics(srcSM pmetric.ScopeMetrics, count int) pmetric.ScopeMetrics { + destSM := pmetric.NewScopeMetrics() + destSM.SetSchemaUrl(srcSM.SchemaUrl()) + srcSM.Scope().CopyTo(destSM.Scope()) + srcSM.Metrics().RemoveIf(func(srcMetric pmetric.Metric) bool { + if count <= 0 { + return false + } + needToExtract := metricDataPointCount(srcMetric) > count + if needToExtract { + srcMetric = extractMetricDataPoints(srcMetric, count) + } + count -= metricDataPointCount(srcMetric) + srcMetric.MoveTo(destSM.Metrics().AppendEmpty()) + return !needToExtract + }) + return destSM +} + +func extractMetricDataPoints(srcMetric pmetric.Metric, count int) pmetric.Metric { + destMetric := pmetric.NewMetric() + switch srcMetric.Type() { + case pmetric.MetricTypeGauge: + extractGaugeDataPoints(srcMetric.Gauge(), count, destMetric.SetEmptyGauge()) + case pmetric.MetricTypeSum: + extractSumDataPoints(srcMetric.Sum(), count, destMetric.SetEmptySum()) + case pmetric.MetricTypeHistogram: + extractHistogramDataPoints(srcMetric.Histogram(), count, destMetric.SetEmptyHistogram()) + case pmetric.MetricTypeExponentialHistogram: + extractExponentialHistogramDataPoints(srcMetric.ExponentialHistogram(), count, + destMetric.SetEmptyExponentialHistogram()) + case pmetric.MetricTypeSummary: + extractSummaryDataPoints(srcMetric.Summary(), count, destMetric.SetEmptySummary()) + } + return destMetric +} + +func extractGaugeDataPoints(srcGauge pmetric.Gauge, count int, destGauge pmetric.Gauge) { + srcGauge.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool { + if count <= 0 { + return false + } + srcDP.MoveTo(destGauge.DataPoints().AppendEmpty()) + count-- + return true + }) +} + +func extractSumDataPoints(srcSum pmetric.Sum, count int, destSum pmetric.Sum) { + srcSum.DataPoints().RemoveIf(func(srcDP pmetric.NumberDataPoint) bool { + if count <= 0 { + return false + } + srcDP.MoveTo(destSum.DataPoints().AppendEmpty()) + count-- + return true + }) +} + +func extractHistogramDataPoints(srcHistogram pmetric.Histogram, count int, destHistogram pmetric.Histogram) { + srcHistogram.DataPoints().RemoveIf(func(srcDP pmetric.HistogramDataPoint) bool { + if count <= 0 { + return false + } + srcDP.MoveTo(destHistogram.DataPoints().AppendEmpty()) + count-- + return true + }) +} + +func extractExponentialHistogramDataPoints(srcExponentialHistogram pmetric.ExponentialHistogram, count int, destExponentialHistogram pmetric.ExponentialHistogram) { + srcExponentialHistogram.DataPoints().RemoveIf(func(srcDP pmetric.ExponentialHistogramDataPoint) bool { + if count <= 0 { + return false + } + srcDP.MoveTo(destExponentialHistogram.DataPoints().AppendEmpty()) + count-- + return true + }) +} + +func extractSummaryDataPoints(srcSummary pmetric.Summary, count int, destSummary pmetric.Summary) { + srcSummary.DataPoints().RemoveIf(func(srcDP pmetric.SummaryDataPoint) bool { + if count <= 0 { + return false + } + srcDP.MoveTo(destSummary.DataPoints().AppendEmpty()) + count-- + return true + }) +} + +func resourceDataPointsCount(rm pmetric.ResourceMetrics) (count int) { + for i := 0; i < rm.ScopeMetrics().Len(); i++ { + count += scopeDataPointsCount(rm.ScopeMetrics().At(i)) + } + return count +} + +func scopeDataPointsCount(sm pmetric.ScopeMetrics) (count int) { + for i := 0; i < sm.Metrics().Len(); i++ { + count += metricDataPointCount(sm.Metrics().At(i)) + } + return count +} + +func metricDataPointCount(m pmetric.Metric) int { + switch m.Type() { + case pmetric.MetricTypeGauge: + return m.Gauge().DataPoints().Len() + case pmetric.MetricTypeSum: + return m.Sum().DataPoints().Len() + case pmetric.MetricTypeHistogram: + return m.Histogram().DataPoints().Len() + case pmetric.MetricTypeExponentialHistogram: + return m.ExponentialHistogram().DataPoints().Len() + case pmetric.MetricTypeSummary: + return m.Summary().DataPoints().Len() + } + return 0 +} diff --git a/exporter/exporterhelper/metrics_batch_test.go b/exporter/exporterhelper/metrics_batch_test.go new file mode 100644 index 00000000000..1a6ff471b4b --- /dev/null +++ b/exporter/exporterhelper/metrics_batch_test.go @@ -0,0 +1,166 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/pdata/pmetric" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/internal/testdata" +) + +func TestMergeMetrics(t *testing.T) { + mr1 := &metricsRequest{md: testdata.GenerateMetrics(2)} + mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} + res, err := mergeMetrics(context.Background(), mr1, mr2) + assert.Nil(t, err) + assert.Equal(t, 5, res.(*metricsRequest).md.MetricCount()) +} + +func TestMergeMetricsInvalidInput(t *testing.T) { + mr1 := &tracesRequest{td: testdata.GenerateTraces(2)} + mr2 := &metricsRequest{md: testdata.GenerateMetrics(3)} + _, err := mergeMetrics(context.Background(), mr1, mr2) + assert.Error(t, err) +} + +func TestMergeSplitMetrics(t *testing.T) { + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + mr1 Request + mr2 Request + expected []*metricsRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, + mr2: &metricsRequest{md: pmetric.NewMetrics()}, + expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, + }, + { + name: "both_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + mr1: nil, + mr2: nil, + expected: []*metricsRequest{}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + mr1: &metricsRequest{md: pmetric.NewMetrics()}, + mr2: &metricsRequest{md: testdata.GenerateMetrics(5)}, + expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, + }, + { + name: "first_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + mr1: nil, + mr2: &metricsRequest{md: testdata.GenerateMetrics(5)}, + expected: []*metricsRequest{{md: testdata.GenerateMetrics(5)}}, + }, + { + name: "first_nil_second_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + mr1: nil, + mr2: &metricsRequest{md: pmetric.NewMetrics()}, + expected: []*metricsRequest{{md: pmetric.NewMetrics()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 60}, + mr1: &metricsRequest{md: testdata.GenerateMetrics(10)}, + mr2: &metricsRequest{md: testdata.GenerateMetrics(14)}, + expected: []*metricsRequest{{md: func() pmetric.Metrics { + metrics := testdata.GenerateMetrics(10) + testdata.GenerateMetrics(14).ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) + return metrics + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 14}, + mr1: nil, + mr2: &metricsRequest{md: testdata.GenerateMetrics(15)}, // 15 metrics, 30 data points + expected: []*metricsRequest{ + {md: testdata.GenerateMetrics(7)}, // 7 metrics, 14 data points + {md: testdata.GenerateMetrics(7)}, // 7 metrics, 14 data points + {md: testdata.GenerateMetrics(1)}, // 1 metric, 2 data points + }, + }, + { + name: "split_and_merge", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 28}, + mr1: &metricsRequest{md: testdata.GenerateMetrics(7)}, // 7 metrics, 14 data points + mr2: &metricsRequest{md: testdata.GenerateMetrics(25)}, // 25 metrics, 50 data points + expected: []*metricsRequest{ + {md: func() pmetric.Metrics { + metrics := testdata.GenerateMetrics(7) + testdata.GenerateMetrics(7).ResourceMetrics().MoveAndAppendTo(metrics.ResourceMetrics()) + return metrics + }()}, + {md: testdata.GenerateMetrics(14)}, // 14 metrics, 28 data points + {md: testdata.GenerateMetrics(4)}, // 4 metrics, 8 data points + }, + }, + { + name: "scope_metrics_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 8}, + mr1: &metricsRequest{md: func() pmetric.Metrics { + md := testdata.GenerateMetrics(4) + extraScopeMetrics := md.ResourceMetrics().At(0).ScopeMetrics().AppendEmpty() + testdata.GenerateMetrics(4).ResourceMetrics().At(0).ScopeMetrics().At(0).MoveTo(extraScopeMetrics) + extraScopeMetrics.Scope().SetName("extra scope") + return md + }()}, + mr2: nil, + expected: []*metricsRequest{ + {md: testdata.GenerateMetrics(4)}, + {md: func() pmetric.Metrics { + md := testdata.GenerateMetrics(4) + md.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().SetName("extra scope") + return md + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := mergeSplitMetrics(context.Background(), tt.cfg, tt.mr1, tt.mr2) + assert.Nil(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i := range res { + assert.Equal(t, tt.expected[i], res[i].(*metricsRequest)) + } + }) + } +} + +func TestMergeSplitMetricsInvalidInput(t *testing.T) { + r1 := &tracesRequest{td: testdata.GenerateTraces(2)} + r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} + _, err := mergeSplitMetrics(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + assert.Error(t, err) +} + +func TestExtractMetrics(t *testing.T) { + for i := 0; i < 20; i++ { + md := testdata.GenerateMetrics(10) + extractedMetrics := extractMetrics(md, i) + assert.Equal(t, i, extractedMetrics.DataPointCount()) + assert.Equal(t, 20-i, md.DataPointCount()) + } +} + +func TestExtractMetricsInvalidMetric(t *testing.T) { + md := testdata.GenerateMetricsMetricTypeInvalid() + extractedMetrics := extractMetrics(md, 10) + assert.Equal(t, testdata.GenerateMetricsMetricTypeInvalid(), extractedMetrics) + assert.Equal(t, 0, md.ResourceMetrics().Len()) +} diff --git a/exporter/exporterhelper/traces_batch.go b/exporter/exporterhelper/traces_batch.go new file mode 100644 index 00000000000..74d91e6f621 --- /dev/null +++ b/exporter/exporterhelper/traces_batch.go @@ -0,0 +1,138 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "errors" + "go.opentelemetry.io/collector/pdata/ptrace" + + "go.opentelemetry.io/collector/exporter/exporterbatcher" +) + +// mergeTraces merges two traces requests into one. +func mergeTraces(_ context.Context, r1 Request, r2 Request) (Request, error) { + tr1, ok1 := r1.(*tracesRequest) + tr2, ok2 := r2.(*tracesRequest) + if !ok1 || !ok2 { + return nil, errors.New("invalid input type") + } + tr2.td.ResourceSpans().MoveAndAppendTo(tr1.td.ResourceSpans()) + return tr1, nil +} + +// mergeSplitTraces splits and/or merges the traces into multiple requests based on the MaxSizeConfig. +func mergeSplitTraces(_ context.Context, cfg exporterbatcher.MaxSizeConfig, r1 Request, r2 Request) ([]Request, error) { + var ( + res []Request + destReq *tracesRequest + capacityLeft = cfg.MaxSizeItems + ) + for _, req := range []Request{r1, r2} { + if req == nil { + continue + } + srcReq, ok := req.(*tracesRequest) + if !ok { + return nil, errors.New("invalid input type") + } + if srcReq.td.SpanCount() <= capacityLeft { + if destReq == nil { + destReq = srcReq + } else { + srcReq.td.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) + } + capacityLeft = cfg.MaxSizeItems - destReq.td.SpanCount() + continue + } + + for { + extractedTraces := extractTraces(srcReq.td, capacityLeft) + if extractedTraces.SpanCount() == 0 { + break + } + capacityLeft -= extractedTraces.SpanCount() + if destReq == nil { + destReq = &tracesRequest{td: extractedTraces, pusher: srcReq.pusher} + } else { + extractedTraces.ResourceSpans().MoveAndAppendTo(destReq.td.ResourceSpans()) + } + if capacityLeft <= 0 { + res = append(res, destReq) + destReq = nil + capacityLeft = cfg.MaxSizeItems + } + } + } + + if destReq != nil { + res = append(res, destReq) + } + return res, nil +} + +// extractTraces extracts a new traces with a maximum number of spans. +func extractTraces(srcTraces ptrace.Traces, count int) ptrace.Traces { + destTraces := ptrace.NewTraces() + srcTraces.ResourceSpans().RemoveIf(func(srcRS ptrace.ResourceSpans) bool { + if count <= 0 { + return false + } + needToExtract := resourceTracesCount(srcRS) > count + if needToExtract { + srcRS = extractResourceSpans(srcRS, count) + } + count -= resourceTracesCount(srcRS) + srcRS.MoveTo(destTraces.ResourceSpans().AppendEmpty()) + return !needToExtract + }) + return destTraces +} + +// extractResourceSpans extracts spans and returns a new resource spans with the specified number of spans. +func extractResourceSpans(srcRS ptrace.ResourceSpans, count int) ptrace.ResourceSpans { + destRS := ptrace.NewResourceSpans() + destRS.SetSchemaUrl(srcRS.SchemaUrl()) + srcRS.Resource().CopyTo(destRS.Resource()) + srcRS.ScopeSpans().RemoveIf(func(srcSS ptrace.ScopeSpans) bool { + if count <= 0 { + return false + } + needToExtract := srcSS.Spans().Len() > count + if needToExtract { + srcSS = extractScopeSpans(srcSS, count) + } + count -= srcSS.Spans().Len() + srcSS.MoveTo(destRS.ScopeSpans().AppendEmpty()) + return !needToExtract + }) + srcRS.Resource().CopyTo(destRS.Resource()) + return destRS +} + +// extractScopeSpans extracts spans and returns a new scope spans with the specified number of spans. +func extractScopeSpans(srcSS ptrace.ScopeSpans, count int) ptrace.ScopeSpans { + destSS := ptrace.NewScopeSpans() + destSS.SetSchemaUrl(srcSS.SchemaUrl()) + srcSS.Scope().CopyTo(destSS.Scope()) + srcSS.Spans().RemoveIf(func(srcSpan ptrace.Span) bool { + if count <= 0 { + return false + } + srcSpan.MoveTo(destSS.Spans().AppendEmpty()) + count-- + return true + }) + return destSS +} + +// resourceTracesCount calculates the total number of spans in the pdata.ResourceSpans. +func resourceTracesCount(rs ptrace.ResourceSpans) int { + count := 0 + rs.ScopeSpans().RemoveIf(func(ss ptrace.ScopeSpans) bool { + count += ss.Spans().Len() + return false + }) + return count +} diff --git a/exporter/exporterhelper/traces_batch_test.go b/exporter/exporterhelper/traces_batch_test.go new file mode 100644 index 00000000000..b4d3cb45548 --- /dev/null +++ b/exporter/exporterhelper/traces_batch_test.go @@ -0,0 +1,159 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "go.opentelemetry.io/collector/exporter/exporterbatcher" + "go.opentelemetry.io/collector/pdata/ptrace" + "testing" + + "github.com/stretchr/testify/assert" + + "go.opentelemetry.io/collector/internal/testdata" +) + +func TestMergeTraces(t *testing.T) { + tr1 := &tracesRequest{td: testdata.GenerateTraces(2)} + tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} + res, err := mergeTraces(context.Background(), tr1, tr2) + assert.Nil(t, err) + assert.Equal(t, 5, res.(*tracesRequest).td.SpanCount()) +} + +func TestMergeTracesInvalidInput(t *testing.T) { + tr1 := &logsRequest{ld: testdata.GenerateLogs(2)} + tr2 := &tracesRequest{td: testdata.GenerateTraces(3)} + _, err := mergeTraces(context.Background(), tr1, tr2) + assert.Error(t, err) +} + +func TestMergeSplitTraces(t *testing.T) { + tests := []struct { + name string + cfg exporterbatcher.MaxSizeConfig + tr1 Request + tr2 Request + expected []*tracesRequest + }{ + { + name: "both_requests_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: ptrace.NewTraces()}, + tr2: &tracesRequest{td: ptrace.NewTraces()}, + expected: []*tracesRequest{{td: ptrace.NewTraces()}}, + }, + { + name: "both_requests_nil", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: nil, + tr2: nil, + expected: []*tracesRequest{}, + }, + { + name: "first_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: ptrace.NewTraces()}, + tr2: &tracesRequest{td: testdata.GenerateTraces(5)}, + expected: []*tracesRequest{{td: testdata.GenerateTraces(5)}}, + }, + { + name: "second_request_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: testdata.GenerateTraces(5)}, + tr2: &tracesRequest{td: ptrace.NewTraces()}, + expected: []*tracesRequest{{td: testdata.GenerateTraces(5)}}, + }, + { + name: "first_nil_second_empty", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: nil, + tr2: &tracesRequest{td: ptrace.NewTraces()}, + expected: []*tracesRequest{{td: ptrace.NewTraces()}}, + }, + { + name: "merge_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: testdata.GenerateTraces(5)}, + tr2: &tracesRequest{td: testdata.GenerateTraces(5)}, + expected: []*tracesRequest{{td: func() ptrace.Traces { + td := testdata.GenerateTraces(5) + testdata.GenerateTraces(5).ResourceSpans().MoveAndAppendTo(td.ResourceSpans()) + return td + }()}}, + }, + { + name: "split_only", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 4}, + tr1: nil, + tr2: &tracesRequest{td: testdata.GenerateTraces(10)}, + expected: []*tracesRequest{ + {td: testdata.GenerateTraces(4)}, + {td: testdata.GenerateTraces(4)}, + {td: testdata.GenerateTraces(2)}, + }, + }, + { + name: "split_and_merge", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: testdata.GenerateTraces(4)}, + tr2: &tracesRequest{td: testdata.GenerateTraces(20)}, + expected: []*tracesRequest{ + {td: func() ptrace.Traces { + td := testdata.GenerateTraces(4) + testdata.GenerateTraces(6).ResourceSpans().MoveAndAppendTo(td.ResourceSpans()) + return td + }()}, + {td: testdata.GenerateTraces(10)}, + {td: testdata.GenerateTraces(4)}, + }, + }, + { + name: "scope_spans_split", + cfg: exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, + tr1: &tracesRequest{td: func() ptrace.Traces { + td := testdata.GenerateTraces(10) + extraScopeTraces := testdata.GenerateTraces(5) + extraScopeTraces.ResourceSpans().At(0).ScopeSpans().At(0).Scope().SetName("extra scope") + extraScopeTraces.ResourceSpans().MoveAndAppendTo(td.ResourceSpans()) + return td + }()}, + tr2: nil, + expected: []*tracesRequest{ + {td: testdata.GenerateTraces(10)}, + {td: func() ptrace.Traces { + td := testdata.GenerateTraces(5) + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().SetName("extra scope") + return td + }()}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + res, err := mergeSplitTraces(context.Background(), tt.cfg, tt.tr1, tt.tr2) + assert.Nil(t, err) + assert.Equal(t, len(tt.expected), len(res)) + for i := range res { + assert.Equal(t, tt.expected[i], res[i].(*tracesRequest)) + } + }) + } +} + +func TestMergeSplitTracesInvalidInput(t *testing.T) { + r1 := &tracesRequest{td: testdata.GenerateTraces(2)} + r2 := &metricsRequest{md: testdata.GenerateMetrics(3)} + _, err := mergeSplitTraces(context.Background(), exporterbatcher.MaxSizeConfig{MaxSizeItems: 10}, r1, r2) + assert.Error(t, err) +} + +func TestExtractTraces(t *testing.T) { + for i := 0; i < 10; i++ { + td := testdata.GenerateTraces(10) + extractedTraces := extractTraces(td, i) + assert.Equal(t, i, extractedTraces.SpanCount()) + assert.Equal(t, 10-i, td.SpanCount()) + } +}