Skip to content

Commit

Permalink
[receiver/prometheusremotewrite] Parse labels into resource and scope…
Browse files Browse the repository at this point in the history
… attributes

When translating Prometheus metrics to OTLP, we rely on the metric called 'target_info' that may come in different remote write requests.
The target_info metric is used to populate resource attributes of different OTel metrics.
Currently, we're not doing supporting the correct usage of 'target_info'.

Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Oct 30, 2024
1 parent 48cd5d6 commit 1d30b01
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 4 deletions.
27 changes: 27 additions & 0 deletions .chloggen/prwreceiver-parselabels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: receiver/prometheusremotewrite

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Parse labels from Prometheus Remote Write requests into Resource Attributes

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [35656]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Warning - The HTTP Server still doesn't pass metrics to the next consumer. The component is unusable for now.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api, user]
2 changes: 2 additions & 0 deletions receiver/prometheusremotewritereceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.22.0
require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.111.0
github.com/prometheus/prometheus v0.54.1
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.112.0
Expand Down Expand Up @@ -54,6 +55,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.111.0 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions receiver/prometheusremotewritereceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 71 additions & 3 deletions receiver/prometheusremotewritereceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ import (

"github.com/gogo/protobuf/proto"
promconfig "github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/labels"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
promremote "github.com/prometheus/prometheus/storage/remote"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.uber.org/zap/zapcore"
Expand All @@ -28,6 +30,7 @@ func newRemoteWriteReceiver(settings receiver.Settings, cfg *Config, nextConsume
return &prometheusRemoteWriteReceiver{
settings: settings,
nextConsumer: nextConsumer,
jobInstanceCache: make(map[string]pmetric.ResourceMetrics),
config: cfg,
server: &http.Server{
ReadTimeout: 60 * time.Second,
Expand All @@ -39,6 +42,8 @@ type prometheusRemoteWriteReceiver struct {
settings receiver.Settings
nextConsumer consumer.Metrics

jobInstanceCache map[string]pmetric.ResourceMetrics

config *Config
server *http.Server
}
Expand Down Expand Up @@ -150,8 +155,71 @@ func (prw *prometheusRemoteWriteReceiver) parseProto(contentType string) (promco
}

// translateV2 translates a v2 remote-write request into OTLP metrics.
// For now translateV2 is not implemented and returns an empty metrics.
// translate is not feature complete.
// nolint
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, _ *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
return pmetric.NewMetrics(), promremote.WriteResponseStats{}, nil
func (prw *prometheusRemoteWriteReceiver) translateV2(_ context.Context, req *writev2.Request) (pmetric.Metrics, promremote.WriteResponseStats, error) {
var (
badRequestErrors []error
otelMetrics = pmetric.NewMetrics()
b = labels.NewScratchBuilder(0)
stats = promremote.WriteResponseStats{}
)


for _, ts := range req.Timeseries {
ls := ts.ToLabels(&b, req.Symbols)

if !ls.Has(labels.MetricName) {
badRequestErrors = append(badRequestErrors, fmt.Errorf("missing metric name in labels"))
continue
} else if duplicateLabel, hasDuplicate := ls.HasDuplicateLabelNames(); hasDuplicate {
badRequestErrors = append(badRequestErrors, fmt.Errorf("duplicate label %q in labels", duplicateLabel))
continue
}

var rm pmetric.ResourceMetrics
// This cache should be populated by the metric 'target_info', but we're not handling it yet.
cacheEntry, ok := prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")]
if ok {
rm = pmetric.NewResourceMetrics()
cacheEntry.CopyTo(rm)
} else {
// A remote-write request can have multiple timeseries with the same instance and job labels.
// While they are different timeseries in Prometheus, we're handling it as the same OTLP metric
// until we support 'target_info'.
rm = otelMetrics.ResourceMetrics().AppendEmpty()
parseJobAndInstance(rm.Resource().Attributes(), ls.Get("instance"), ls.Get("job"))
prw.jobInstanceCache[ls.Get("instance")+ls.Get("job")] = rm


scopeAttributes := rm.ScopeMetrics().AppendEmpty().Scope().Attributes()
for _, l := range ls {
if l.Name == "instance" || l.Name == "job" || l.Name == labels.MetricName {
continue
}
scopeAttributes.PutStr(l.Name, l.Value)
}
}

// Next step is to process metadata and samples.
}

return otelMetrics, stats, errors.Join(badRequestErrors...)
}

// parseJobAndInstance turns the job and instance labels service resource attributes.
// Following the specification at https://opentelemetry.io/docs/specs/otel/compatibility/prometheus_and_openmetrics/
func parseJobAndInstance(dest pcommon.Map, instance, job string) {
if job != "" {
dest.PutStr("service.namespace", job)
}
if instance != "" {
parts := strings.Split(instance, "/")
if len(parts) == 2 {
dest.PutStr("service.name", parts[0])
dest.PutStr("service.instance.id", parts[1])
return
}
dest.PutStr("service.name", instance)
}
}
112 changes: 111 additions & 1 deletion receiver/prometheusremotewritereceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,37 @@ import (
"github.com/golang/snappy"
promconfig "github.com/prometheus/prometheus/config"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
)

func setupServer(t *testing.T) {
var (
writeV2RequestFixture = &writev2.Request{
Symbols: []string{"", "__name__", "test_metric1", "job", "test", "instance", "service-x/107cn001", "d", "e", "foo", "bar", "f", "g", "h", "i", "Test gauge for test purposes", "Maybe op/sec who knows (:", "Test counter for test purposes"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Symbolized writeRequestFixture.Timeseries[0].Labels
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
{
LabelsRefs: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, // Same series as first. Should use the same resource metrics.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
{
LabelsRefs: []uint32{1, 2, 3, 9, 5, 10, 7, 8, 9, 10}, // This series has different label values for job and instance.
Samples: []writev2.Sample{{Value: 2, Timestamp: 2}},
},
},
}
)

func setupMetricsReceiver(t *testing.T) *prometheusRemoteWriteReceiver {
t.Helper()

factory := NewFactory()
Expand All @@ -30,6 +54,13 @@ func setupServer(t *testing.T) {
assert.NoError(t, err)
assert.NotNil(t, prwReceiver, "metrics receiver creation failed")

return prwReceiver.(*prometheusRemoteWriteReceiver)
}

func setupServer(t *testing.T) {
t.Helper()

prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

Expand Down Expand Up @@ -98,3 +129,82 @@ func TestHandlePRWContentTypeNegotiation(t *testing.T) {
})
}
}

func TestTranslateV2(t *testing.T) {
prwReceiver := setupMetricsReceiver(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

for _, tc := range []struct {
name string
request *writev2.Request
expectError string
expectedMetrics pmetric.Metrics
expectedStats remote.WriteResponseStats
}{
{
name: "missing metric name",
request: &writev2.Request{
Symbols: []string{"", "foo", "bar"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: "missing metric name in labels",
},
{
name: "duplicate label",
request: &writev2.Request{
Symbols: []string{"", "__name__", "test"},
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: []uint32{1, 2, 1, 2},
Samples: []writev2.Sample{{Value: 1, Timestamp: 1}},
},
},
},
expectError: `duplicate label "__name__" in labels`,
},
{
name: "valid request",
request: writeV2RequestFixture,
expectedMetrics: func() pmetric.Metrics {
expected := pmetric.NewMetrics()
rm1 := expected.ResourceMetrics().AppendEmpty()
rmAttributes1 := rm1.Resource().Attributes()
rmAttributes1.PutStr("service.namespace", "test")
rmAttributes1.PutStr("service.name", "service-x")
rmAttributes1.PutStr("service.instance.id", "107cn001")
smAttributes1 := rm1.ScopeMetrics().AppendEmpty().Scope().Attributes()
smAttributes1.PutStr("d", "e")
smAttributes1.PutStr("foo", "bar")

rm2 := expected.ResourceMetrics().AppendEmpty()
rmAttributes2 := rm2.Resource().Attributes()
rmAttributes2.PutStr("service.namespace", "foo")
rmAttributes2.PutStr("service.name", "bar")
smAttributes2 := rm2.ScopeMetrics().AppendEmpty().Scope().Attributes()
smAttributes2.PutStr("d", "e")
smAttributes2.PutStr("foo", "bar")

return expected
}(),
expectedStats: remote.WriteResponseStats{},
},
} {
t.Run(tc.name, func(t *testing.T) {
metrics, stats, err := prwReceiver.translateV2(ctx, tc.request)
if tc.expectError != "" {
assert.ErrorContains(t, err, tc.expectError)
return
}

assert.NoError(t, err)
assert.NoError(t, pmetrictest.CompareMetrics(tc.expectedMetrics, metrics))
assert.Equal(t, tc.expectedStats, stats)
})
}
}

0 comments on commit 1d30b01

Please sign in to comment.