Skip to content

Commit

Permalink
Histogram metric fixes for container insights (#124)
Browse files Browse the repository at this point in the history
* bug fix - for container insights prometheus sourced metrics, convert them to deltas.  Also fix linter errors

* fix some metrics which are not actually histograms
  • Loading branch information
chadpatel authored Oct 18, 2023
1 parent 1f23619 commit 6e9de26
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 46 deletions.
38 changes: 30 additions & 8 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,11 @@ type numberDataPointSlice struct {

// histogramDataPointSlice is a wrapper for pmetric.HistogramDataPointSlice
type histogramDataPointSlice struct {
// Todo:(khanhntd) Calculate delta value for count and sum value with histogram
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
deltaMetricMetadata
pmetric.HistogramDataPointSlice
}

type exponentialHistogramDataPointSlice struct {
// TODO: Calculate delta value for count and sum value with exponential histogram
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/18245
deltaMetricMetadata
pmetric.ExponentialHistogramDataPointSlice
}
Expand Down Expand Up @@ -146,16 +142,40 @@ func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationS
}

// CalculateDeltaDatapoints retrieves the HistogramDataPoint at the given index.
func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
func (dps histogramDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.HistogramDataPointSlice.At(i)
labels := createLabels(metric.Attributes(), instrumentationScopeName)
timestamp := unixNanoToMilliseconds(metric.Timestamp())

sum := metric.Sum()
count := metric.Count()

var datapoints []dataPoint

if dps.adjustToDelta {
var delta interface{}
mKey := aws.NewKey(dps.deltaMetricMetadata, labels)
delta, retained := calculators.summary.Calculate(mKey, summaryMetricEntry{sum, count}, metric.Timestamp().AsTime())

// If a delta to the previous data point could not be computed use the current metric value instead
if !retained && dps.retainInitialValueForDelta {
retained = true
delta = summaryMetricEntry{sum, count}
}

if !retained {
return datapoints, retained
}
summaryMetricDelta := delta.(summaryMetricEntry)
sum = summaryMetricDelta.sum
count = summaryMetricDelta.count
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricStats{
Count: metric.Count(),
Sum: metric.Sum(),
Count: count,
Sum: sum,
Max: metric.Max(),
Min: metric.Min(),
},
Expand Down Expand Up @@ -350,6 +370,8 @@ func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Lo
}
case pmetric.MetricTypeHistogram:
metric := pmd.Histogram()
// the prometheus histograms from the container insights should be adjusted for delta
metricMetadata.adjustToDelta = metadata.receiver == containerInsightsReceiver
dps = histogramDataPointSlice{
metricMetadata,
metric.DataPoints(),
Expand All @@ -368,7 +390,7 @@ func getDataPoints(pmd pmetric.Metric, metadata cWMetricMetadata, logger *zap.Lo
// attribute processor) from resource metrics. If it exists, and equals to prometheus, the sum and count will be
// converted.
// For more information: https://github.com/open-telemetry/opentelemetry-collector/blob/main/receiver/prometheusreceiver/DESIGN.md#summary
metricMetadata.adjustToDelta = metadata.receiver == prometheusReceiver
metricMetadata.adjustToDelta = metadata.receiver == prometheusReceiver || metadata.receiver == containerInsightsReceiver
dps = summaryDataPointSlice{
metricMetadata,
metric.DataPoints(),
Expand Down
101 changes: 82 additions & 19 deletions exporter/awsemfexporter/datapoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,52 @@ func TestCalculateDeltaDatapoints_HistogramDataPointSlice(t *testing.T) {
assert.True(t, retained)
assert.Equal(t, 1, histogramDatapointSlice.Len())
assert.Equal(t, tc.expectedDatapoint, dps[0])

})
}
}

func TestCalculateDeltaDatapoints_HistogramDataPointSlice_Delta(t *testing.T) {
cumulativeDeltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", false)

histogramDPS := pmetric.NewHistogramDataPointSlice()
histogramDP := histogramDPS.AppendEmpty()
histogramDP.SetCount(uint64(17))
histogramDP.SetSum(17.13)
histogramDP.SetMin(10)
histogramDP.SetMax(30)
histogramDP.Attributes().PutStr("label1", "value1")

histogramDatapointSlice := histogramDataPointSlice{cumulativeDeltaMetricMetadata, histogramDPS}
emfCalcs := setupEmfCalculators()
defer require.NoError(t, shutdownEmfCalculators(emfCalcs))
dps, retained := histogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs)

assert.False(t, retained)
assert.Equal(t, 1, histogramDatapointSlice.Len())
assert.Equal(t, 0, len(dps))

dps, retained = histogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs)
assert.True(t, retained)
assert.Equal(t, 1, histogramDatapointSlice.Len())
assert.Equal(t, dataPoint{
name: "foo",
value: &cWMetricStats{Sum: 0, Count: 0, Min: 10, Max: 30},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
}, dps[0])

histogramDatapointSlice.HistogramDataPointSlice.At(0).SetCount(uint64(27))
histogramDatapointSlice.HistogramDataPointSlice.At(0).SetSum(27.27)
histogramDP.SetMin(5)
histogramDP.SetMax(40)

dps, retained = histogramDatapointSlice.CalculateDeltaDatapoints(0, instrLibName, false, emfCalcs)
assert.True(t, retained)
assert.Equal(t, 1, histogramDatapointSlice.Len())
assert.Equal(t, dataPoint{
name: "foo",
value: &cWMetricStats{Sum: 10.14, Count: 10, Min: 5, Max: 40},
labels: map[string]string{oTellibDimensionKey: instrLibName, "label1": "value1"},
}, dps[0])
}

func TestCalculateDeltaDatapoints_ExponentialHistogramDataPointSlice(t *testing.T) {
Expand Down Expand Up @@ -600,54 +642,75 @@ func TestCreateLabels(t *testing.T) {
func TestGetDataPoints(t *testing.T) {
logger := zap.NewNop()

normalDeltraMetricMetadata := generateDeltaMetricMetadata(false, "foo", false)
normalDeltaMetricMetadata := generateDeltaMetricMetadata(false, "foo", false)
cumulativeDeltaMetricMetadata := generateDeltaMetricMetadata(true, "foo", false)

testCases := []struct {
name string
isPrometheusMetrics bool
receiver string
metric pmetric.Metrics
expectedDatapointSlice dataPoints
expectedAttributes map[string]interface{}
}{
{
name: "Int gauge",
isPrometheusMetrics: false,
receiver: "",
metric: generateTestGaugeMetric("foo", intValueType),
expectedDatapointSlice: numberDataPointSlice{normalDeltraMetricMetadata, pmetric.NumberDataPointSlice{}},
expectedDatapointSlice: numberDataPointSlice{normalDeltaMetricMetadata, pmetric.NumberDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Double sum",
isPrometheusMetrics: false,
receiver: "",
metric: generateTestSumMetric("foo", doubleValueType),
expectedDatapointSlice: numberDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.NumberDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Histogram",
isPrometheusMetrics: false,
receiver: "",
metric: generateTestHistogramMetric("foo"),
expectedDatapointSlice: histogramDataPointSlice{normalDeltaMetricMetadata, pmetric.HistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Histogram from ContainerInsights",
receiver: containerInsightsReceiver,
metric: generateTestHistogramMetric("foo"),
expectedDatapointSlice: histogramDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.HistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Histogram from Prometheus",
receiver: prometheusReceiver,
metric: generateTestHistogramMetric("foo"),
expectedDatapointSlice: histogramDataPointSlice{normalDeltaMetricMetadata, pmetric.HistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "ExponentialHistogram",
isPrometheusMetrics: false,
receiver: "",
metric: generateTestExponentialHistogramMetric("foo"),
expectedDatapointSlice: exponentialHistogramDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.ExponentialHistogramDataPointSlice{}},
expectedDatapointSlice: exponentialHistogramDataPointSlice{normalDeltaMetricMetadata, pmetric.ExponentialHistogramDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Summary from SDK",
isPrometheusMetrics: false,
receiver: "",
metric: generateTestSummaryMetric("foo"),
expectedDatapointSlice: summaryDataPointSlice{normalDeltraMetricMetadata, pmetric.SummaryDataPointSlice{}},
expectedDatapointSlice: summaryDataPointSlice{normalDeltaMetricMetadata, pmetric.SummaryDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Summary from Prometheus",
isPrometheusMetrics: true,
receiver: prometheusReceiver,
metric: generateTestSummaryMetric("foo"),
expectedDatapointSlice: summaryDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.SummaryDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
},
{
name: "Summary from ContainerInsights",
receiver: containerInsightsReceiver,
metric: generateTestSummaryMetric("foo"),
expectedDatapointSlice: summaryDataPointSlice{cumulativeDeltaMetricMetadata, pmetric.SummaryDataPointSlice{}},
expectedAttributes: map[string]interface{}{"label1": "value1"},
Expand All @@ -661,20 +724,15 @@ func TestGetDataPoints(t *testing.T) {
metadata := generateTestMetricMetadata("namespace", time.Now().UnixNano()/int64(time.Millisecond), "log-group", "log-stream", "cloudwatch-otel", metric.Type())

t.Run(tc.name, func(t *testing.T) {

if tc.isPrometheusMetrics {
metadata.receiver = prometheusReceiver
} else {
metadata.receiver = ""
}

metadata.receiver = tc.receiver
dps := getDataPoints(metric, metadata, logger)
assert.NotNil(t, dps)
assert.Equal(t, reflect.TypeOf(tc.expectedDatapointSlice), reflect.TypeOf(dps))
switch convertedDPS := dps.(type) {
case numberDataPointSlice:
expectedDPS := tc.expectedDatapointSlice.(numberDataPointSlice)
assert.Equal(t, expectedDPS.deltaMetricMetadata, convertedDPS.deltaMetricMetadata)
assert.Equal(t, expectedDPS.adjustToDelta, convertedDPS.adjustToDelta)
assert.Equal(t, 1, convertedDPS.Len())
dp := convertedDPS.NumberDataPointSlice.At(0)
switch dp.ValueType() {
Expand All @@ -685,13 +743,17 @@ func TestGetDataPoints(t *testing.T) {
}
assert.Equal(t, tc.expectedAttributes, dp.Attributes().AsRaw())
case histogramDataPointSlice:
expectedDPS := tc.expectedDatapointSlice.(histogramDataPointSlice)
assert.Equal(t, expectedDPS.adjustToDelta, convertedDPS.adjustToDelta)
assert.Equal(t, 1, convertedDPS.Len())
dp := convertedDPS.HistogramDataPointSlice.At(0)
assert.Equal(t, 35.0, dp.Sum())
assert.Equal(t, uint64(18), dp.Count())
assert.Equal(t, []float64{0, 10}, dp.ExplicitBounds().AsRaw())
assert.Equal(t, tc.expectedAttributes, dp.Attributes().AsRaw())
case exponentialHistogramDataPointSlice:
expectedDPS := tc.expectedDatapointSlice.(exponentialHistogramDataPointSlice)
assert.Equal(t, expectedDPS.adjustToDelta, convertedDPS.adjustToDelta)
assert.Equal(t, 1, convertedDPS.Len())
dp := convertedDPS.ExponentialHistogramDataPointSlice.At(0)
assert.Equal(t, float64(0), dp.Sum())
Expand All @@ -703,6 +765,7 @@ func TestGetDataPoints(t *testing.T) {
case summaryDataPointSlice:
expectedDPS := tc.expectedDatapointSlice.(summaryDataPointSlice)
assert.Equal(t, expectedDPS.deltaMetricMetadata, convertedDPS.deltaMetricMetadata)
assert.Equal(t, expectedDPS.adjustToDelta, convertedDPS.adjustToDelta)
assert.Equal(t, 1, convertedDPS.Len())
dp := convertedDPS.SummaryDataPointSlice.At(0)
assert.Equal(t, 15.0, dp.Sum())
Expand Down
10 changes: 10 additions & 0 deletions exporter/awsemfexporter/metric_translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
Expand All @@ -27,6 +28,7 @@ const (
singleDimensionRollupOnly = "SingleDimensionRollupOnly"

prometheusReceiver = "prometheus"
containerInsightsReceiver = "awscontainerinsight"
attributeReceiver = "receiver"
fieldPrometheusMetricType = "prom_metric_type"
)
Expand Down Expand Up @@ -124,6 +126,14 @@ func (mt metricTranslator) translateOTelToGroupedMetric(rm pmetric.ResourceMetri
if receiver, ok := rm.Resource().Attributes().Get(attributeReceiver); ok {
metricReceiver = receiver.Str()
}

if serviceName, ok := rm.Resource().Attributes().Get("service.name"); ok {
if strings.HasPrefix(serviceName.Str(), "containerInsightsKubeAPIServerScraper") {
// the prometheus metrics that come from the container insight receiver need to be clearly tagged as coming from container insights
metricReceiver = containerInsightsReceiver
}
}

for j := 0; j < ilms.Len(); j++ {
ilm := ilms.At(j)
if ilm.Scope().Name() != "" {
Expand Down
Loading

0 comments on commit 6e9de26

Please sign in to comment.