From c0f39cbd1c6ae679947fb4fbc541deab02dfc468 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Mon, 4 Nov 2024 18:15:47 -0300 Subject: [PATCH 01/10] [receiver/prometheusremotewrite] Parse labels into resource and metric attributes Signed-off-by: Arthur Silva Sens --- .chloggen/prwreceiver-parselabels.yaml | 27 ++++ receiver/prometheusremotewritereceiver/go.mod | 8 ++ .../prometheusremotewritereceiver/receiver.go | 117 ++++++++++++++++-- .../receiver_test.go | 115 ++++++++++++++++- 4 files changed, 258 insertions(+), 9 deletions(-) create mode 100644 .chloggen/prwreceiver-parselabels.yaml diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml new file mode 100644 index 000000000000..3cd2cad7bb3a --- /dev/null +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35656] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now. 
+ +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api, user] \ No newline at end of file diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 8d0fa103b091..9023fffa73e1 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 @@ -56,6 +57,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -105,3 +107,9 @@ require ( k8s.io/klog/v2 v2.130.1 // indirect k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect ) + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../pkg/pdatautil + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest + +replace 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 61195af2fa3c..99508ea64608 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -14,11 +14,13 @@ import ( "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" promremote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" @@ -26,9 +28,10 @@ import ( func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { return &prometheusRemoteWriteReceiver{ - settings: settings, - nextConsumer: nextConsumer, - config: cfg, + settings: settings, + nextConsumer: nextConsumer, + config: cfg, + jobInstanceCache: make(map[string]pmetric.ResourceMetrics), server: &http.Server{ ReadTimeout: 60 * time.Second, }, @@ -39,8 +42,9 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics - config *Config - server *http.Server + jobInstanceCache map[string]pmetric.ResourceMetrics + config *Config + server *http.Server } func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error { @@ -150,8 +154,105 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco } // translateV2 translates a v2 remote-write request into OTLP metrics. 
-// For now translateV2 is not implemented and returns an empty metrics. +// translate is not feature complete. // nolint -func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { - return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { + var ( + badRequestErrors []error + otelMetrics = pmetric.NewMetrics() + b = labels.NewScratchBuilder(0) + stats = promremote.WriteResponseStats{} + ) + + for _, ts := range req.Timeseries { + ls := ts.ToLabels(&b, req.Symbols) + + if !ls.Has(labels.MetricName) { + badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels")) + continue + } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { + badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + continue + } + + var rm pmetric.ResourceMetrics + // This cache should be populated by the metric 'target_info', but we're not handling it yet. + cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] + if ok { + rm = pmetric.NewResourceMetrics() + cacheEntry.CopyTo(rm) + } else { + // A remote-write request can have multiple timeseries with the same instance and job labels. + // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric + // until we support 'target_info'. 
+ rm = otelMetrics.ResourceMetrics().AppendEmpty() + parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job")) + prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm + } + + switch ts.Metadata.Type { + case writev2.Metadata_METRIC_TYPE_COUNTER: + addCounterDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_GAUGE: + addGaugeDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_SUMMARY: + addSummaryDatapoints(rm, ls, ts) + case writev2.Metadata_METRIC_TYPE_HISTOGRAM: + addHistogramDatapoints(rm, ls, ts) + default: + badRequestErrors = append(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) + } + } + + return otelMetrics, stats, errors.Join(badRequestErrors...) +} + +// parseJobAndInstance turns the job and instance labels service resource attributes. +// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/ +func parseJobAndInstance(dest pcommon.Map, instance, job string) { + if job != "" { + dest.PutStr("service.namespace", job) + } + if instance != "" { + parts := strings.Split(instance, "/") + if len(parts) == 2 { + dest.PutStr("service.name", parts[0]) + dest.PutStr("service.instance.id", parts[1]) + return + } + dest.PutStr("service.name", instance) + } +} + +func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) { + m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge() + addDatapoints(m.DataPoints(), ls, ts) +} + +func addSummaryDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +func addHistogramDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2.TimeSeries) { + // TODO: Implement this function +} + +// addDatapoints 
adds the labels to the datapoints attributes. +// TODO: We're still not handling several fields that make a datapoint complete, e.g. StartTimestamp, +// Timestamp, Value, etc. +func addDatapoints(datapoints pmetric.NumberDataPointSlice, ls labels.Labels, _ writev2.TimeSeries) { + attributes := datapoints.AppendEmpty().Attributes() + + for _, l := range ls { + if l.Name == "instance" || l.Name == "job" || // Become resource attributes "service.name", "service.instance.id" and "service.namespace" + l.Name == labels.MetricName || // Becomes metric name + l.Name == "otel_scope_name" || l.Name == "otel_scope_version" { // Becomes scope name and version + continue + } + attributes.PutStr(l.Name, l.Value) + } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 8c5c9e659cfc..649d58e93fb2 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -14,13 +14,40 @@ import ( "github.com/golang/snappy" promconfig "github.com/prometheus/prometheus/config" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func setupServer(t *testing.T) { +var ( + writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: 
writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, + } +) + +func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() factory := NewFactory() @@ -30,6 +57,13 @@ func setupServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, prwReceiver, "metrics receiver creation failed") + return prwReceiver.(*prometheusRemoteWriteReceiver) +} + +func setupServer(t *testing.T) { + t.Helper() + + prwReceiver := setupMetricsReceiver(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -98,3 +132,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { }) } } + +func TestTranslateV2(t *testing.T) { + prwReceiver := setupMetricsReceiver(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + for _, tc := range []struct { + name string + request *writev2.Request + expectError string + expectedMetrics pmetric.Metrics + expectedStats remote.WriteResponseStats + }{ + { + name: "missing metric name", + request: &writev2.Request{ + Symbols: []string{"", "foo", "bar"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: "missing metric name in labels", + }, + { + name: "duplicate label", + request: &writev2.Request{ + Symbols: 
[]string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: `duplicate label "__name__" in labels`, + }, + { + name: "valid request", + request: writeV2RequestFixture, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rm1 := expected.ResourceMetrics().AppendEmpty() + rmAttributes1 := rm1.Resource().Attributes() + rmAttributes1.PutStr("service.namespace", "test") + rmAttributes1.PutStr("service.name", "service-x") + rmAttributes1.PutStr("service.instance.id", "107cn001") + mAttributes1 := rm1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + mAttributes1.PutStr("d", "e") + mAttributes1.PutStr("foo", "bar") + + rm2 := expected.ResourceMetrics().AppendEmpty() + rmAttributes2 := rm2.Resource().Attributes() + rmAttributes2.PutStr("service.namespace", "foo") + rmAttributes2.PutStr("service.name", "bar") + mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + mAttributes2.PutStr("d", "e") + mAttributes2.PutStr("foo", "bar") + + return expected + }(), + expectedStats: remote.WriteResponseStats{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) + if tc.expectError != "" { + assert.ErrorContains(t, err, tc.expectError) + return + } + + assert.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics)) + assert.Equal(t, tc.expectedStats, stats) + }) + } +} From 0fef5045f1de51fa80206c1ab1c45ec161bfbeb6 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Fri, 29 Nov 2024 13:34:08 -0300 Subject: [PATCH 02/10] Address comments Signed-off-by: Arthur Silva Sens --- .chloggen/prwreceiver-parselabels.yaml | 2 +- receiver/prometheusremotewritereceiver/go.mod | 4 +- 
.../prometheusremotewritereceiver/receiver.go | 42 ++++++++++--------- .../receiver_test.go | 40 +++++++++--------- 4 files changed, 44 insertions(+), 44 deletions(-) diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml index 3cd2cad7bb3a..0e36950cd213 100644 --- a/.chloggen/prwreceiver-parselabels.yaml +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -24,4 +24,4 @@ subtext: Warning - The HTTP Server still doesn't pass metrics to the next consum # Include 'user' if the change is relevant to end users. # Include 'api' if there is a change to a library API. # Default: '[user]' -change_logs: [api, user] \ No newline at end of file +change_logs: [user] \ No newline at end of file diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 9023fffa73e1..45487ba1d7ad 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -5,7 +5,7 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.115.1-0.20241206185113-3f3e208e71b8 @@ -57,7 +57,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.114.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib 
v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 99508ea64608..1f8ed7a9ab2b 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "github.com/cespare/xxhash/v2" "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/model/labels" @@ -31,7 +32,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume settings: settings, nextConsumer: nextConsumer, config: cfg, - jobInstanceCache: make(map[string]pmetric.ResourceMetrics), + jobInstanceCache: make(map[uint64]pmetric.ResourceMetrics), server: &http.Server{ ReadTimeout: 60 * time.Second, }, @@ -42,7 +43,7 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics - jobInstanceCache map[string]pmetric.ResourceMetrics + jobInstanceCache map[uint64]pmetric.ResourceMetrics config *Config server *http.Server } @@ -158,26 +159,27 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco // nolint func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { var ( - badRequestErrors []error + badRequestErrors error otelMetrics = pmetric.NewMetrics() - b = labels.NewScratchBuilder(0) + labelsBuilder = labels.NewScratchBuilder(0) stats = promremote.WriteResponseStats{} ) for _, ts := range req.Timeseries { - ls := ts.ToLabels(&b, req.Symbols) + ls := ts.ToLabels(&labelsBuilder, req.Symbols) if !ls.Has(labels.MetricName) { - badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels")) + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("missing metric name in labels")) continue } else if duplicateLabel, 
hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { - badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) continue } var rm pmetric.ResourceMetrics // This cache should be populated by the metric 'target_info', but we're not handling it yet. - cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] + hashedJobAndInstance := xxhash.Sum64String(ls.Get("job") + ls.Get("instance")) + cacheEntry, ok := prw.jobInstanceCache[hashedJobAndInstance] if ok { rm = pmetric.NewResourceMetrics() cacheEntry.CopyTo(rm) @@ -186,8 +188,8 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric // until we support 'target_info'. rm = otelMetrics.ResourceMetrics().AppendEmpty() - parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job")) - prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm + parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) + prw.jobInstanceCache[hashedJobAndInstance] = rm } switch ts.Metadata.Type { @@ -200,27 +202,27 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr case writev2.Metadata_METRIC_TYPE_HISTOGRAM: addHistogramDatapoints(rm, ls, ts) default: - badRequestErrors = append(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) + badRequestErrors = errors.Join(badRequestErrors, fmt.Errorf("unsupported metric type %q for metric %q", ts.Metadata.Type, ls.Get(labels.MetricName))) } } - return otelMetrics, stats, errors.Join(badRequestErrors...) + return otelMetrics, stats, badRequestErrors } // parseJobAndInstance turns the job and instance labels service resource attributes. 
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/ -func parseJobAndInstance(dest pcommon.Map, instance, job string) { - if job != "" { - dest.PutStr("service.namespace", job) - } +func parseJobAndInstance(dest pcommon.Map, job, instance string) { if instance != "" { - parts := strings.Split(instance, "/") + dest.PutStr("service.instance.id", instance) + } + if job != "" { + parts := strings.Split(job, "/") if len(parts) == 2 { - dest.PutStr("service.name", parts[0]) - dest.PutStr("service.instance.id", parts[1]) + dest.PutStr("service.namespace", parts[0]) + dest.PutStr("service.name", parts[1]) return } - dest.PutStr("service.name", instance) + dest.PutStr("service.name", job) } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 649d58e93fb2..d5afe567ab6e 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -24,28 +24,26 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -var ( - writeV2RequestFixture = &writev2.Request{ - Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, - Timeseries: []writev2.TimeSeries{ - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels - Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, - }, - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. 
- Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - }, - { - Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, - LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. - Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, - }, +var writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, }, - } -) + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, + LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. 
+ Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, +} func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() From 138f1431e383a00f3a65643af7a80de0adad320c Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 10 Dec 2024 11:06:19 -0300 Subject: [PATCH 03/10] Fix test for job/instance label translations Signed-off-by: Arthur Silva Sens --- .../prometheusremotewritereceiver/receiver_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index d5afe567ab6e..b23156bcff5a 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -25,7 +25,7 @@ import ( ) var writeV2RequestFixture = &writev2.Request{ - Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Symbols: []string{"", "__name__", "test_metric1", "job", "service-x/test", "instance", "107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, Timeseries: []writev2.TimeSeries{ { Metadata: writev2.Metadata{Type: writev2.Metadata_METRIC_TYPE_GAUGE}, @@ -176,8 +176,8 @@ func TestTranslateV2(t *testing.T) { expected := pmetric.NewMetrics() rm1 := expected.ResourceMetrics().AppendEmpty() rmAttributes1 := rm1.Resource().Attributes() - rmAttributes1.PutStr("service.namespace", "test") - rmAttributes1.PutStr("service.name", "service-x") + rmAttributes1.PutStr("service.namespace", "service-x") + rmAttributes1.PutStr("service.name", "test") rmAttributes1.PutStr("service.instance.id", "107cn001") mAttributes1 := 
rm1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() mAttributes1.PutStr("d", "e") @@ -185,8 +185,8 @@ func TestTranslateV2(t *testing.T) { rm2 := expected.ResourceMetrics().AppendEmpty() rmAttributes2 := rm2.Resource().Attributes() - rmAttributes2.PutStr("service.namespace", "foo") - rmAttributes2.PutStr("service.name", "bar") + rmAttributes2.PutStr("service.name", "foo") + rmAttributes2.PutStr("service.instance.id", "bar") mAttributes2 := rm2.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() mAttributes2.PutStr("d", "e") mAttributes2.PutStr("foo", "bar") From 0f5a55437279ca74bbebb3dcb564a55c2281dd21 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 10 Dec 2024 11:06:55 -0300 Subject: [PATCH 04/10] cache resource instead of resourcemetrics Signed-off-by: Arthur Silva Sens --- receiver/prometheusremotewritereceiver/receiver.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 1f8ed7a9ab2b..70819afb5363 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -32,7 +32,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume settings: settings, nextConsumer: nextConsumer, config: cfg, - jobInstanceCache: make(map[uint64]pmetric.ResourceMetrics), + jobInstanceCache: make(map[uint64]pcommon.Resource), server: &http.Server{ ReadTimeout: 60 * time.Second, }, @@ -43,7 +43,7 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics - jobInstanceCache map[uint64]pmetric.ResourceMetrics + jobInstanceCache map[uint64]pcommon.Resource config *Config server *http.Server } @@ -182,14 +182,15 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr 
cacheEntry, ok := prw.jobInstanceCache[hashedJobAndInstance] if ok { rm = pmetric.NewResourceMetrics() - cacheEntry.CopyTo(rm) + cacheEntry.CopyTo(rm.Resource()) } else { // A remote-write request can have multiple timeseries with the same instance and job labels. // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric // until we support 'target_info'. + // TODO: Use 'target_info' to populate the resource attributes instead of caching job and instance. rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.jobInstanceCache[hashedJobAndInstance] = rm + prw.jobInstanceCache[hashedJobAndInstance] = rm.Resource() } switch ts.Metadata.Type { From 29094b7cec0cd9705899a89e995b43cf60a4bfbd Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 10 Dec 2024 11:26:48 -0300 Subject: [PATCH 05/10] make gotidy Signed-off-by: Arthur Silva Sens --- receiver/prometheusremotewritereceiver/go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 45487ba1d7ad..9a0168567d07 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/receiver/promet go 1.22.0 require ( + github.com/cespare/xxhash/v2 v2.3.0 github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.114.0 @@ -30,7 +31,6 @@ require ( github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect github.com/aws/aws-sdk-go v1.54.19 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dennwc/varint v1.0.0 // indirect 
github.com/felixge/httpsnoop v1.0.4 // indirect @@ -57,7 +57,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.114.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.115.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect From c899c69405d1fc4e57e82040a6e10c729e6ba000 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 10 Dec 2024 11:29:51 -0300 Subject: [PATCH 06/10] Use separator in job+instance cache Signed-off-by: Arthur Silva Sens --- receiver/prometheusremotewritereceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 70819afb5363..9487fc1fd838 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -178,7 +178,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr var rm pmetric.ResourceMetrics // This cache should be populated by the metric 'target_info', but we're not handling it yet. 
- hashedJobAndInstance := xxhash.Sum64String(ls.Get("job") + ls.Get("instance")) + hashedJobAndInstance := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) cacheEntry, ok := prw.jobInstanceCache[hashedJobAndInstance] if ok { rm = pmetric.NewResourceMetrics() From 475378eeef94d34da1a5b2896398c3afd945227e Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Tue, 10 Dec 2024 11:57:52 -0300 Subject: [PATCH 07/10] Update .chloggen/prwreceiver-parselabels.yaml --- .chloggen/prwreceiver-parselabels.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml index 0e36950cd213..3157f8d8165e 100644 --- a/.chloggen/prwreceiver-parselabels.yaml +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -7,7 +7,7 @@ change_type: enhancement component: receiver/prometheusremotewrite # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes +note: Parse labels from Prometheus Remote Write requests into Resource and Metric Attributes. # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. 
issues: [35656] From ed564ef79588ca758d9ba481c7a37ca45a637e23 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 12 Dec 2024 10:13:35 -0300 Subject: [PATCH 08/10] Add TODO to cache metric name+type+unit Signed-off-by: Arthur Silva Sens --- receiver/prometheusremotewritereceiver/receiver.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 9487fc1fd838..25fefa436c65 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -232,6 +232,9 @@ func addCounterDatapoints(_ pmetric.ResourceMetrics, _ labels.Labels, _ writev2. } func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2.TimeSeries) { + // TODO: Cache metric name+type+unit and look up cache before creating new empty metric. + // In OTel name+type+unit is the unique identifier of a metric and we should not create + // a new metric if it already exists. 
m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge() addDatapoints(m.DataPoints(), ls, ts) } From eba9ab9997203ce6be9143e4b4c230aaf9b886c6 Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 12 Dec 2024 12:32:26 -0300 Subject: [PATCH 09/10] Replace inter request cache with intra request cache Signed-off-by: Arthur Silva Sens --- .../prometheusremotewritereceiver/receiver.go | 35 ++++++++++--------- .../receiver_test.go | 13 +++++-- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 25fefa436c65..0fc3ba579453 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -29,10 +29,9 @@ import ( func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsumer consumer.Metrics) (receiver.Metrics, error) { return &prometheusRemoteWriteReceiver{ - settings: settings, - nextConsumer: nextConsumer, - config: cfg, - jobInstanceCache: make(map[uint64]pcommon.Resource), + settings: settings, + nextConsumer: nextConsumer, + config: cfg, server: &http.Server{ ReadTimeout: 60 * time.Second, }, @@ -43,9 +42,8 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics - jobInstanceCache map[uint64]pcommon.Resource - config *Config - server *http.Server + config *Config + server *http.Server } func (prw *prometheusRemoteWriteReceiver) Start(ctx context.Context, host component.Host) error { @@ -163,6 +161,11 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr otelMetrics = pmetric.NewMetrics() labelsBuilder = labels.NewScratchBuilder(0) stats = promremote.WriteResponseStats{} + // Prometheus Remote-Write can send multiple time series with the same labels in the same request. 
+ // Instead of creating a whole new OTLP metric, we just append the new sample to the existing OTLP metric. + // This cache is called "intra" because in the future we'll have an "interRequestCache" to cache resourceAttributes + // between requests based on the metric "target_info". + intraRequestCache = make(map[uint64]pmetric.ResourceMetrics) ) for _, ts := range req.Timeseries { @@ -177,20 +180,15 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } var rm pmetric.ResourceMetrics - // This cache should be populated by the metric 'target_info', but we're not handling it yet. - hashedJobAndInstance := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) - cacheEntry, ok := prw.jobInstanceCache[hashedJobAndInstance] + hashedLabels := xxhash.Sum64String(string(ls.Bytes(make([]byte, 0)))) + intraCacheEntry, ok := intraRequestCache[hashedLabels] if ok { - rm = pmetric.NewResourceMetrics() - cacheEntry.CopyTo(rm.Resource()) + // We found the same time series in the same request, so we should append to the same OTLP metric. + rm = intraCacheEntry } else { - // A remote-write request can have multiple timeseries with the same instance and job labels. - // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric - // until we support 'target_info'. - // TODO: Use 'target_info' to populate the resource attributes instead of caching job and instance. rm = otelMetrics.ResourceMetrics().AppendEmpty() parseJobAndInstance(rm.Resource().Attributes(), ls.Get("job"), ls.Get("instance")) - prw.jobInstanceCache[hashedJobAndInstance] = rm.Resource() + intraRequestCache[hashedLabels] = rm } switch ts.Metadata.Type { @@ -235,6 +233,9 @@ func addGaugeDatapoints(rm pmetric.ResourceMetrics, ls labels.Labels, ts writev2 // TODO: Cache metric name+type+unit and look up cache before creating new empty metric. 
// In OTel name+type+unit is the unique identifier of a metric and we should not create // a new metric if it already exists. + + // TODO: Check if Scope is already present by comparing labels "otel_scope_name" and "otel_scope_version" + // with Scope.Name and Scope.Version. If it is present, we should append to the existing Scope. m := rm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge() addDatapoints(m.DataPoints(), ls, ts) } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index b23156bcff5a..c1e46602452d 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -179,9 +179,16 @@ func TestTranslateV2(t *testing.T) { rmAttributes1.PutStr("service.namespace", "service-x") rmAttributes1.PutStr("service.name", "test") rmAttributes1.PutStr("service.instance.id", "107cn001") - mAttributes1 := rm1.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() - mAttributes1.PutStr("d", "e") - mAttributes1.PutStr("foo", "bar") + sm1 := rm1.ScopeMetrics().AppendEmpty() + sm1Attributes := sm1.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + sm1Attributes.PutStr("d", "e") + sm1Attributes.PutStr("foo", "bar") + // Since we don't check "scope_name" and "scope_version", we end up with duplicated scope metrics for repeated series. + // TODO: Properly handle scope metrics. 
+ sm2 := rm1.ScopeMetrics().AppendEmpty() + sm2Attributes := sm2.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty().Attributes() + sm2Attributes.PutStr("d", "e") + sm2Attributes.PutStr("foo", "bar") rm2 := expected.ResourceMetrics().AppendEmpty() rmAttributes2 := rm2.Resource().Attributes() From 3bc2ef39673234e481a6283d569bd51ae11c275d Mon Sep 17 00:00:00 2001 From: Arthur Silva Sens Date: Thu, 12 Dec 2024 13:27:45 -0300 Subject: [PATCH 10/10] Cache based on job+instance only Signed-off-by: Arthur Silva Sens --- receiver/prometheusremotewritereceiver/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 0fc3ba579453..007d5a08199e 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -180,7 +180,7 @@ func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *wr } var rm pmetric.ResourceMetrics - hashedLabels := xxhash.Sum64String(string(ls.Bytes(make([]byte, 0)))) + hashedLabels := xxhash.Sum64String(ls.Get("job") + string([]byte{'\xff'}) + ls.Get("instance")) intraCacheEntry, ok := intraRequestCache[hashedLabels] if ok { // We found the same time series in the same request, so we should append to the same OTLP metric.