Skip to content

Commit

Permalink
Update Histogram Extrema and Sum to be generic (#3870)
Browse files Browse the repository at this point in the history
* Update Histogram Extrema and Sum to be generic

* Update metric SDK

* Update exporters

* Add changes to changelog
  • Loading branch information
MrAlias authored Mar 29, 2023
1 parent 63a0f51 commit f4a9d78
Show file tree
Hide file tree
Showing 14 changed files with 123 additions and 106 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Changed

- The `Extrema` in `go.opentelemetry.io/otel/sdk/metric/metricdata` is redefined with a generic argument of `[N int64 | float64]`. (#3870)
- Move No-Op implementation from `go.opentelemetry.io/otel/metric` into its own package `go.opentelemetry.io/otel/metric/noop`. (#3941)
- `metric.NewNoopMeterProvider` is replaced with `noop.NewMeterProvider`

Expand Down
8 changes: 5 additions & 3 deletions exporters/otlp/otlpmetric/internal/transform/metricdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func Histogram[N int64 | float64](h metricdata.Histogram[N]) (*mpb.Metric_Histog
func HistogramDataPoints[N int64 | float64](dPts []metricdata.HistogramDataPoint[N]) []*mpb.HistogramDataPoint {
out := make([]*mpb.HistogramDataPoint, 0, len(dPts))
for _, dPt := range dPts {
sum := dPt.Sum
sum := float64(dPt.Sum)
hdp := &mpb.HistogramDataPoint{
Attributes: AttrIter(dPt.Attributes.Iter()),
StartTimeUnixNano: uint64(dPt.StartTime.UnixNano()),
Expand All @@ -186,10 +186,12 @@ func HistogramDataPoints[N int64 | float64](dPts []metricdata.HistogramDataPoint
ExplicitBounds: dPt.Bounds,
}
if v, ok := dPt.Min.Value(); ok {
hdp.Min = &v
vF64 := float64(v)
hdp.Min = &vF64
}
if v, ok := dPt.Max.Value(); ok {
hdp.Max = &v
vF64 := float64(v)
hdp.Max = &vF64
}
out = append(out, hdp)
}
Expand Down
12 changes: 6 additions & 6 deletions exporters/otlp/otlpmetric/internal/transform/metricdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ var (
Count: 30,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 30, 0},
Min: metricdata.NewExtrema(minA),
Max: metricdata.NewExtrema(maxA),
Sum: sumA,
Min: metricdata.NewExtrema(int64(minA)),
Max: metricdata.NewExtrema(int64(maxA)),
Sum: int64(sumA),
}, {
Attributes: bob,
StartTime: start,
Time: end,
Count: 3,
Bounds: []float64{1, 5},
BucketCounts: []uint64{0, 1, 2},
Min: metricdata.NewExtrema(minB),
Max: metricdata.NewExtrema(maxB),
Sum: sumB,
Min: metricdata.NewExtrema(int64(minB)),
Max: metricdata.NewExtrema(int64(maxB)),
Sum: int64(sumB),
}}
otelHDPFloat64 = []metricdata.HistogramDataPoint[float64]{{
Attributes: alice,
Expand Down
2 changes: 1 addition & 1 deletion exporters/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func addHistogramMetric[N int64 | float64](ch chan<- prometheus.Metric, histogra
cumulativeCount += dp.BucketCounts[i]
buckets[bound] = cumulativeCount
}
m, err := prometheus.NewConstHistogram(desc, dp.Count, dp.Sum, buckets, values...)
m, err := prometheus.NewConstHistogram(desc, dp.Count, float64(dp.Sum), buckets, values...)
if err != nil {
otel.Handle(err)
continue
Expand Down
15 changes: 10 additions & 5 deletions sdk/metric/internal/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ var (
bob = attribute.NewSet(attribute.String("user", "bob"), attribute.Bool("admin", false))
carol = attribute.NewSet(attribute.String("user", "carol"), attribute.Bool("admin", false))

monoIncr = setMap{alice: 1, bob: 10, carol: 2}
nonMonoIncr = setMap{alice: 1, bob: -1, carol: 2}

// Sat Jan 01 2000 00:00:00 GMT+0000.
staticTime = time.Unix(946684800, 0)
staticNowFunc = func() time.Time { return staticTime }
Expand All @@ -52,8 +49,16 @@ var (
}
)

func monoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: 10, carol: 2}
}

func nonMonoIncr[N int64 | float64]() setMap[N] {
return setMap[N]{alice: 1, bob: -1, carol: 2}
}

// setMap maps attribute sets to a number.
type setMap map[attribute.Set]int
type setMap[N int64 | float64] map[attribute.Set]N

// expectFunc is a function that returns an Aggregation of expected values for
// a cycle that contains m measurements (total across all goroutines). Each
Expand All @@ -79,7 +84,7 @@ type aggregatorTester[N int64 | float64] struct {
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap, eFunc expectFunc) func(*testing.T) {
func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
t.Run("Comparable", func(t *testing.T) {
Expand Down
29 changes: 12 additions & 17 deletions sdk/metric/internal/histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ import (
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

type buckets struct {
type buckets[N int64 | float64] struct {
counts []uint64
count uint64
sum float64
min, max float64
sum N
min, max N
}

// newBuckets returns buckets with n bins.
func newBuckets(n int) *buckets {
return &buckets{counts: make([]uint64, n)}
func newBuckets[N int64 | float64](n int) *buckets[N] {
return &buckets[N]{counts: make([]uint64, n)}
}

func (b *buckets) bin(idx int, value float64) {
func (b *buckets[N]) bin(idx int, value N) {
b.counts[idx]++
b.count++
b.sum += value
Expand All @@ -52,7 +52,7 @@ func (b *buckets) bin(idx int, value float64) {
type histValues[N int64 | float64] struct {
bounds []float64

values map[attribute.Set]*buckets
values map[attribute.Set]*buckets[N]
valuesMu sync.Mutex
}

Expand All @@ -66,24 +66,19 @@ func newHistValues[N int64 | float64](bounds []float64) *histValues[N] {
sort.Float64s(b)
return &histValues[N]{
bounds: b,
values: make(map[attribute.Set]*buckets),
values: make(map[attribute.Set]*buckets[N]),
}
}

// Aggregate records the measurement value, scoped by attr, and aggregates it
// into a histogram.
func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Accept all types to satisfy the Aggregator interface. However, since
// the Aggregation produced by this Aggregator is only float64, convert
// here to only use this type.
v := float64(value)

// This search will return an index in the range [0, len(s.bounds)], where
// it will return len(s.bounds) if value is greater than the last element
// of s.bounds. This aligns with the buckets in that the length of buckets
// is len(s.bounds)+1, with the last bucket representing:
// (s.bounds[len(s.bounds)-1], +∞).
idx := sort.SearchFloat64s(s.bounds, v)
idx := sort.SearchFloat64s(s.bounds, float64(value))

s.valuesMu.Lock()
defer s.valuesMu.Unlock()
Expand All @@ -97,12 +92,12 @@ func (s *histValues[N]) Aggregate(value N, attr attribute.Set) {
// Then,
//
// buckets = (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, +∞)
b = newBuckets(len(s.bounds) + 1)
b = newBuckets[N](len(s.bounds) + 1)
// Ensure min and max are recorded values (not zero), for new buckets.
b.min, b.max = v, v
b.min, b.max = value, value
s.values[attr] = b
}
b.bin(idx, v)
b.bin(idx, value)
}

// NewDeltaHistogram returns an Aggregator that summarizes a set of
Expand Down
49 changes: 28 additions & 21 deletions sdk/metric/internal/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,41 +48,41 @@ func testHistogram[N int64 | float64](t *testing.T) {
CycleN: defaultCycles,
}

incr := monoIncr
incr := monoIncr[N]()
eFunc := deltaHistExpecter[N](incr)
t.Run("Delta", tester.Run(NewDeltaHistogram[N](histConf), incr, eFunc))
eFunc = cumuHistExpecter[N](incr)
t.Run("Cumulative", tester.Run(NewCumulativeHistogram[N](histConf), incr, eFunc))
}

func deltaHistExpecter[N int64 | float64](incr setMap) expectFunc {
func deltaHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
h := metricdata.Histogram[N]{Temporality: metricdata.DeltaTemporality}
return func(m int) metricdata.Aggregation {
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPoint[N](a, float64(v), uint64(m)))
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(m)))
}
return h
}
}

func cumuHistExpecter[N int64 | float64](incr setMap) expectFunc {
func cumuHistExpecter[N int64 | float64](incr setMap[N]) expectFunc {
var cycle int
h := metricdata.Histogram[N]{Temporality: metricdata.CumulativeTemporality}
return func(m int) metricdata.Aggregation {
cycle++
h.DataPoints = make([]metricdata.HistogramDataPoint[N], 0, len(incr))
for a, v := range incr {
h.DataPoints = append(h.DataPoints, hPoint[N](a, float64(v), uint64(cycle*m)))
h.DataPoints = append(h.DataPoints, hPoint[N](a, v, uint64(cycle*m)))
}
return h
}
}

// hPoint returns an HistogramDataPoint that started and ended now with multi
// number of measurements values v. It includes a min and max (set to v).
func hPoint[N int64 | float64](a attribute.Set, v float64, multi uint64) metricdata.HistogramDataPoint[N] {
idx := sort.SearchFloat64s(bounds, v)
func hPoint[N int64 | float64](a attribute.Set, v N, multi uint64) metricdata.HistogramDataPoint[N] {
idx := sort.SearchFloat64s(bounds, float64(v))
counts := make([]uint64, len(bounds)+1)
counts[idx] += multi
return metricdata.HistogramDataPoint[N]{
Expand All @@ -94,25 +94,32 @@ func hPoint[N int64 | float64](a attribute.Set, v float64, multi uint64) metricd
BucketCounts: counts,
Min: metricdata.NewExtrema(v),
Max: metricdata.NewExtrema(v),
Sum: v * float64(multi),
Sum: v * N(multi),
}
}

func TestBucketsBin(t *testing.T) {
b := newBuckets(3)
assertB := func(counts []uint64, count uint64, sum, min, max float64) {
assert.Equal(t, counts, b.counts)
assert.Equal(t, count, b.count)
assert.Equal(t, sum, b.sum)
assert.Equal(t, min, b.min)
assert.Equal(t, max, b.max)
}
t.Run("Int64", testBucketsBin[int64]())
t.Run("Float64", testBucketsBin[float64]())
}

func testBucketsBin[N int64 | float64]() func(t *testing.T) {
return func(t *testing.T) {
b := newBuckets[N](3)
assertB := func(counts []uint64, count uint64, sum, min, max N) {
assert.Equal(t, counts, b.counts)
assert.Equal(t, count, b.count)
assert.Equal(t, sum, b.sum)
assert.Equal(t, min, b.min)
assert.Equal(t, max, b.max)
}

assertB([]uint64{0, 0, 0}, 0, 0, 0, 0)
b.bin(1, 2)
assertB([]uint64{0, 1, 0}, 1, 2, 0, 2)
b.bin(0, -1)
assertB([]uint64{1, 1, 0}, 2, 1, -1, 2)
assertB([]uint64{0, 0, 0}, 0, 0, 0, 0)
b.bin(1, 2)
assertB([]uint64{0, 1, 0}, 1, 2, 0, 2)
b.bin(0, -1)
assertB([]uint64{1, 1, 0}, 2, 1, -1, 2)
}
}

func testHistImmutableBounds[N int64 | float64](newA func(aggregation.ExplicitBucketHistogram) Aggregator[N], getBounds func(Aggregator[N]) []float64) func(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions sdk/metric/internal/lastvalue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
CycleN: defaultCycles,
}

eFunc := func(increments setMap) expectFunc {
eFunc := func(increments setMap[N]) expectFunc {
data := make([]metricdata.DataPoint[N], 0, len(increments))
for a, v := range increments {
point := metricdata.DataPoint[N]{Attributes: a, Time: now(), Value: N(v)}
Expand All @@ -46,7 +46,7 @@ func testLastValue[N int64 | float64]() func(*testing.T) {
gauge := metricdata.Gauge[N]{DataPoints: data}
return func(int) metricdata.Aggregation { return gauge }
}
incr := monoIncr
incr := monoIncr[N]()
return tester.Run(NewLastValue[N](), incr, eFunc(incr))
}

Expand Down
30 changes: 15 additions & 15 deletions sdk/metric/internal/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,71 +39,71 @@ func testSum[N int64 | float64](t *testing.T) {
}

t.Run("Delta", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := deltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = deltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewDeltaSum[N](mono), incr, eFunc))
})

t.Run("Cumulative", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := cumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = cumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewCumulativeSum[N](mono), incr, eFunc))
})

t.Run("PreComputedDelta", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := preDeltaExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = preDeltaExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedDeltaSum[N](mono), incr, eFunc))
})

t.Run("PreComputedCumulative", func(t *testing.T) {
incr, mono := monoIncr, true
incr, mono := monoIncr[N](), true
eFunc := preCumuExpecter[N](incr, mono)
t.Run("Monotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))

incr, mono = nonMonoIncr, false
incr, mono = nonMonoIncr[N](), false
eFunc = preCumuExpecter[N](incr, mono)
t.Run("NonMonotonic", tester.Run(NewPrecomputedCumulativeSum[N](mono), incr, eFunc))
})
}

func deltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
func deltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v*m)))
sum.DataPoints = append(sum.DataPoints, point(a, v*N(m)))
}
return sum
}
}

func cumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
var cycle int
func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
var cycle N
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(m int) metricdata.Aggregation {
cycle++
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
for a, v := range incr {
sum.DataPoints = append(sum.DataPoints, point(a, N(v*cycle*m)))
sum.DataPoints = append(sum.DataPoints, point(a, v*cycle*N(m)))
}
return sum
}
}

func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono}
last := make(map[attribute.Set]N)
return func(int) metricdata.Aggregation {
Expand All @@ -117,7 +117,7 @@ func preDeltaExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
}
}

func preCumuExpecter[N int64 | float64](incr setMap, mono bool) expectFunc {
func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc {
sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono}
return func(int) metricdata.Aggregation {
sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr))
Expand Down
Loading

0 comments on commit f4a9d78

Please sign in to comment.