diff --git a/.chloggen/prwreceiver-parselabels.yaml b/.chloggen/prwreceiver-parselabels.yaml new file mode 100644 index 000000000000..ded584e2bdca --- /dev/null +++ b/.chloggen/prwreceiver-parselabels.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: receiver/prometheusremotewrite + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Parse labels from Prometheus Remote Write requests into Resource and Scope Attributes + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35656] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api, user] diff --git a/receiver/prometheusremotewritereceiver/go.mod b/receiver/prometheusremotewritereceiver/go.mod index 043f23799458..cbd45116bb48 100644 --- a/receiver/prometheusremotewritereceiver/go.mod +++ b/receiver/prometheusremotewritereceiver/go.mod @@ -5,6 +5,7 @@ go 1.22.0 require ( github.com/gogo/protobuf v1.3.2 github.com/golang/snappy v0.0.4 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0 github.com/prometheus/prometheus v0.54.1 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.112.0 @@ -54,6 +55,7 @@ require ( github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect @@ -77,7 +79,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.112.0 // indirect go.opentelemetry.io/collector/pipeline v0.112.0 // indirect go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 // indirect - go.opentelemetry.io/collector/semconv v0.105.0 // indirect + go.opentelemetry.io/collector/semconv v0.112.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 // indirect go.opentelemetry.io/otel v1.31.0 // indirect go.opentelemetry.io/otel/metric v1.31.0 // indirect diff --git a/receiver/prometheusremotewritereceiver/go.sum b/receiver/prometheusremotewritereceiver/go.sum index 6f57c1f7392d..467b9734a1a5 100644 --- a/receiver/prometheusremotewritereceiver/go.sum +++ b/receiver/prometheusremotewritereceiver/go.sum @@ -328,6 +328,12 @@ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+ github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0 h1:0MJmp4O7KUQOUmQYJEGNgtf30Nhx/3nLMn0jnU4Klhw= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.111.0/go.mod h1:4PYgwpscyZUUdQVLsd7dh+LXtm1QbWCvU47P3G/7tLg= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0 h1:Ld/1EUAQ6z3CirSyf4A8waHzUAZbMPrDOno+7tb0vKM= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0/go.mod h1:wAOT1iGOOTPTw2ysr0DW2Wrfi0/TECVgiGByRQfFiV4= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 h1:kUUO8VNv/d9Tpx0NvOsRnUsz/JvZ8SWRnK+vT0cNjuU= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0/go.mod h1:SstR8PglIFBVGCZHS69bwJGl6TaCQQ5aLSEoas/8SRA= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= @@ -459,8 +465,8 @@ go.opentelemetry.io/collector/receiver v0.112.0 h1:gdTBDOPGKMZlZghtN5A7ZLNlNwCHW go.opentelemetry.io/collector/receiver v0.112.0/go.mod h1:3QmfSUiyFzRTnHUqF8fyEvQpU5q/xuwS43jGt8JXEEA= go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0 h1:SShkZsWRsFss3iWZa9JwMC7h4gD5RbWDhUcz1/9dXSs= go.opentelemetry.io/collector/receiver/receiverprofiles v0.112.0/go.mod h1:615smszDXiz4YWwXslxlAjX7FzOVDU7Bk6xARFk+zpk= -go.opentelemetry.io/collector/semconv v0.105.0 h1:8p6dZ3JfxFTjbY38d8xlQGB1TQ3nPUvs+D0RERniZ1g= -go.opentelemetry.io/collector/semconv v0.105.0/go.mod h1:yMVUCNoQPZVq/IPfrHrnntZTWsLf5YGZ7qwKulIl5hw= +go.opentelemetry.io/collector/semconv v0.112.0 h1:JPQyvZhlNLVSuVI+FScONaiFygB7h7NTZceUEKIQUEc= +go.opentelemetry.io/collector/semconv v0.112.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0 h1:UP6IpuHFkUgOQL9FFQFrZ+5LiwhhYRbi7VZSIx6Nj5s= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.56.0/go.mod h1:qxuZLtbq5QDtdeSHsS7bcf6EH6uO6jUAgk764zd3rhM= go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= diff --git a/receiver/prometheusremotewritereceiver/receiver.go b/receiver/prometheusremotewritereceiver/receiver.go index 61195af2fa3c..55d1cdbeb446 100644 --- a/receiver/prometheusremotewritereceiver/receiver.go +++ b/receiver/prometheusremotewritereceiver/receiver.go @@ -14,11 +14,13 @@ import ( "github.com/gogo/protobuf/proto" promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/model/labels" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" promremote "github.com/prometheus/prometheus/storage/remote" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap/zapcore" @@ -28,6 +30,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume return &prometheusRemoteWriteReceiver{ settings: settings, nextConsumer: nextConsumer, + jobInstanceCache: make(map[string]pmetric.ResourceMetrics), config: cfg, server: &http.Server{ ReadTimeout: 60 * time.Second, @@ -39,6 +42,8 @@ type prometheusRemoteWriteReceiver struct { settings receiver.Settings nextConsumer consumer.Metrics + jobInstanceCache map[string]pmetric.ResourceMetrics + config *Config server *http.Server } @@ -150,8 +155,71 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco } // translateV2 translates a v2 remote-write request into OTLP metrics. -// For now translateV2 is not implemented and returns an empty metrics. +// translate is not feature complete. // nolint -func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { - return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil +func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) { + var ( + badRequestErrors []error + otelMetrics = pmetric.NewMetrics() + b = labels.NewScratchBuilder(0) + stats = promremote.WriteResponseStats{} + ) + + + for _, ts := range req.Timeseries { + ls := ts.ToLabels(&b, req.Symbols) + + if !ls.Has(labels.MetricName) { + badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels")) + continue + } else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate { + badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel)) + continue + } + + var rm pmetric.ResourceMetrics + // This cache should be populated by the metric 'target_info', but we're not handling it yet. + cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] + if ok { + rm = pmetric.NewResourceMetrics() + cacheEntry.CopyTo(rm) + } else { + // A remote-write request can have multiple timeseries with the same instance and job labels. + // While they are different timeseries in Prometheus, we're handling it as the same OTLP metric + // until we support 'target_info'. + rm = otelMetrics.ResourceMetrics().AppendEmpty() + parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job")) + prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm + + + scopeAttributes := rm.ScopeMetrics().AppendEmpty().Scope().Attributes() + for _, l := range ls { + if l.Name == "instance" || l.Name == "job" || l.Name == labels.MetricName { + continue + } + scopeAttributes.PutStr(l.Name, l.Value) + } + } + + // Next step is to process metadata and samples. + } + + return otelMetrics, stats, errors.Join(badRequestErrors...) +} + +// parseJobAndInstance turns the job and instance labels service resource attributes. +// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/ +func parseJobAndInstance(dest pcommon.Map, instance, job string) { + if job != "" { + dest.PutStr("service.namespace", job) + } + if instance != "" { + parts := strings.Split(instance, "/") + if len(parts) == 2 { + dest.PutStr("service.name", parts[0]) + dest.PutStr("service.instance.id", parts[1]) + return + } + dest.PutStr("service.name", instance) + } } diff --git a/receiver/prometheusremotewritereceiver/receiver_test.go b/receiver/prometheusremotewritereceiver/receiver_test.go index 8c5c9e659cfc..c597512e5759 100644 --- a/receiver/prometheusremotewritereceiver/receiver_test.go +++ b/receiver/prometheusremotewritereceiver/receiver_test.go @@ -14,13 +14,37 @@ import ( "github.com/golang/snappy" promconfig "github.com/prometheus/prometheus/config" writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2" + "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest" ) -func setupServer(t *testing.T) { +var ( + writeV2RequestFixture = &writev2.Request{ + Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + { + LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance. + Samples: []writev2.Sample{{Value: 2, Timestamp: 2}}, + }, + }, + } +) + +func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver { t.Helper() factory := NewFactory() @@ -30,6 +54,13 @@ func setupServer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, prwReceiver, "metrics receiver creation failed") + return prwReceiver.(*prometheusRemoteWriteReceiver) +} + +func setupServer(t *testing.T) { + t.Helper() + + prwReceiver := setupMetricsReceiver(t) ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -98,3 +129,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) { }) } } + +func TestTranslateV2(t *testing.T) { + prwReceiver := setupMetricsReceiver(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + for _, tc := range []struct { + name string + request *writev2.Request + expectError string + expectedMetrics pmetric.Metrics + expectedStats remote.WriteResponseStats + }{ + { + name: "missing metric name", + request: &writev2.Request{ + Symbols: []string{"", "foo", "bar"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: "missing metric name in labels", + }, + { + name: "duplicate label", + request: &writev2.Request{ + Symbols: []string{"", "__name__", "test"}, + Timeseries: []writev2.TimeSeries{ + { + LabelsRefs: []uint32{1, 2, 1, 2}, + Samples: []writev2.Sample{{Value: 1, Timestamp: 1}}, + }, + }, + }, + expectError: `duplicate label "__name__" in labels`, + }, + { + name: "valid request", + request: writeV2RequestFixture, + expectedMetrics: func() pmetric.Metrics { + expected := pmetric.NewMetrics() + rm1 := expected.ResourceMetrics().AppendEmpty() + rmAttributes1 := rm1.Resource().Attributes() + rmAttributes1.PutStr("service.namespace", "test") + rmAttributes1.PutStr("service.name", "service-x") + rmAttributes1.PutStr("service.instance.id", "107cn001") + smAttributes1 := rm1.ScopeMetrics().AppendEmpty().Scope().Attributes() + smAttributes1.PutStr("d", "e") + smAttributes1.PutStr("foo", "bar") + + rm2 := expected.ResourceMetrics().AppendEmpty() + rmAttributes2 := rm2.Resource().Attributes() + rmAttributes2.PutStr("service.namespace", "foo") + rmAttributes2.PutStr("service.name", "bar") + smAttributes2 := rm2.ScopeMetrics().AppendEmpty().Scope().Attributes() + smAttributes2.PutStr("d", "e") + smAttributes2.PutStr("foo", "bar") + + return expected + }(), + expectedStats: remote.WriteResponseStats{}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + metrics, stats, err := prwReceiver.translateV2(ctx, tc.request) + if tc.expectError != "" { + assert.ErrorContains(t, err, tc.expectError) + return + } + + assert.NoError(t, err) + assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics)) + assert.Equal(t, tc.expectedStats, stats) + }) + } +}