Skip to content

Commit

Permalink
Move exemplar types to non-internal package (#5747)
Browse files Browse the repository at this point in the history
Part of #5249

This makes all existing types designed to implement the public Exemplar
API public by moving most of `internal/exemplar` to `exemplar`. The only
types that are not being made public are `exemplar.Drop`, and
`exemplar.FilteredReservoir`. Those types are moved to
`internal/aggregate`, and are renamed to `DropReservoir` and
`FilteredExemplarReservoir`.

The following types are made public:

* `exemplar.Exemplar`
* `exemplar.Filter`
* `exemplar.SampledFilter`
* `exemplar.AlwaysOnFilter`
* `exemplar.HistogramReservoir`
* `exemplar.FixedSizeReservoir`
* `exemplar.Reservoir`
* `exemplar.Value`
* `exemplar.ValueType`
  • Loading branch information
dashpole authored Sep 26, 2024
1 parent 6edc7a6 commit 481f498
Show file tree
Hide file tree
Showing 29 changed files with 214 additions and 172 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## [Unreleased]

### Added

- Add `go.opentelemetry.io/otel/sdk/metric/exemplar` package which includes `Exemplar`, `Filter`, `SampledFilter`, `AlwaysOnFilter`, `HistogramReservoir`, `FixedSizeReservoir`, `Reservoir`, `Value` and `ValueType` types. These will be used for configuring the exemplar reservoir for the metrics sdk. (#5747)

### Changed

- Enable exemplars by default in `go.opentelemetry.io/otel/sdk/metric`. Exemplars can be disabled by setting `OTEL_METRICS_EXEMPLAR_FILTER=always_off` (#5778)
Expand Down
15 changes: 8 additions & 7 deletions sdk/metric/exemplar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
"runtime"
"slices"

"go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
"go.opentelemetry.io/otel/sdk/metric/exemplar"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
)

// reservoirFunc returns the appropriately configured exemplar reservoir
Expand All @@ -18,7 +19,7 @@ import (
// Note: This will only return non-nil values when the experimental exemplar
// feature is enabled and the OTEL_METRICS_EXEMPLAR_FILTER environment variable
// is not set to always_off.
func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredReservoir[N] {
func reservoirFunc[N int64 | float64](agg Aggregation) func() aggregate.FilteredExemplarReservoir[N] {
// https://github.com/open-telemetry/opentelemetry-specification/blob/d4b241f451674e8f611bb589477680341006ad2b/specification/configuration/sdk-environment-variables.md#exemplar
const filterEnvKey = "OTEL_METRICS_EXEMPLAR_FILTER"

Expand All @@ -28,7 +29,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
case "always_on":
filter = exemplar.AlwaysOnFilter
case "always_off":
return exemplar.Drop
return aggregate.DropReservoir
case "trace_based":
fallthrough
default:
Expand All @@ -41,9 +42,9 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
a, ok := agg.(AggregationExplicitBucketHistogram)
if ok && len(a.Boundaries) > 0 {
cp := slices.Clone(a.Boundaries)
return func() exemplar.FilteredReservoir[N] {
return func() aggregate.FilteredExemplarReservoir[N] {
bounds := cp
return exemplar.NewFilteredReservoir[N](filter, exemplar.Histogram(bounds))
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewHistogramReservoir(bounds))
}
}

Expand Down Expand Up @@ -71,7 +72,7 @@ func reservoirFunc[N int64 | float64](agg Aggregation) func() exemplar.FilteredR
}
}

return func() exemplar.FilteredReservoir[N] {
return exemplar.NewFilteredReservoir[N](filter, exemplar.FixedSize(n))
return func() aggregate.FilteredExemplarReservoir[N] {
return aggregate.NewFilteredExemplarReservoir[N](filter, exemplar.NewFixedSizeReservoir(n))
}
}
3 changes: 3 additions & 0 deletions sdk/metric/exemplar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Metric SDK Exemplars

[![PkgGoDev](https://pkg.go.dev/badge/go.opentelemetry.io/otel/sdk/metric/exemplar)](https://pkg.go.dev/go.opentelemetry.io/otel/sdk/metric/exemplar)
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

// Package exemplar provides an implementation of the OpenTelemetry exemplar
// reservoir to be used in metric collection pipelines.
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand All @@ -12,15 +12,21 @@ import (
"go.opentelemetry.io/otel/attribute"
)

// FixedSize returns a [Reservoir] that samples at most k exemplars. If there
// are k or less measurements made, the Reservoir will sample each one. If
// there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
func FixedSize(k int) Reservoir {
return newRandRes(newStorage(k))
// NewFixedSizeReservoir returns a [FixedSizeReservoir] that samples at most
// k exemplars. If there are k or less measurements made, the Reservoir will
// sample each one. If there are more than k, the Reservoir will then randomly
// sample all additional measurement with a decreasing probability.
func NewFixedSizeReservoir(k int) *FixedSizeReservoir {
return newFixedSizeReservoir(newStorage(k))
}

type randRes struct {
var _ Reservoir = &FixedSizeReservoir{}

// FixedSizeReservoir is a [Reservoir] that samples at most k exemplars. If
// there are k or less measurements made, the Reservoir will sample each one.
// If there are more than k, the Reservoir will then randomly sample all
// additional measurement with a decreasing probability.
type FixedSizeReservoir struct {
*storage

// count is the number of measurement seen.
Expand All @@ -39,8 +45,8 @@ type randRes struct {
rng *rand.Rand
}

func newRandRes(s *storage) *randRes {
r := &randRes{
func newFixedSizeReservoir(s *storage) *FixedSizeReservoir {
r := &FixedSizeReservoir{
storage: s,
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
}
Expand All @@ -50,7 +56,7 @@ func newRandRes(s *storage) *randRes {

// randomFloat64 returns, as a float64, a uniform pseudo-random number in the
// open interval (0.0,1.0).
func (r *randRes) randomFloat64() float64 {
func (r *FixedSizeReservoir) randomFloat64() float64 {
// TODO: This does not return a uniform number. rng.Float64 returns a
// uniformly random int in [0,2^53) that is divided by 2^53. Meaning it
// returns multiples of 2^-53, and not all floating point numbers between 0
Expand All @@ -75,7 +81,18 @@ func (r *randRes) randomFloat64() float64 {
return f
}

func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *FixedSizeReservoir) Offer(ctx context.Context, t time.Time, n Value, a []attribute.KeyValue) {
// The following algorithm is "Algorithm L" from Li, Kim-Hung (4 December
// 1994). "Reservoir-Sampling Algorithms of Time Complexity
// O(n(1+log(N/n)))". ACM Transactions on Mathematical Software. 20 (4):
Expand Down Expand Up @@ -131,7 +148,7 @@ func (r *randRes) Offer(ctx context.Context, t time.Time, n Value, a []attribute
}

// reset resets r to the initial state.
func (r *randRes) reset() {
func (r *FixedSizeReservoir) reset() {
// This resets the number of exemplars known.
r.count = 0
// Random index inserts should only happen after the storage is full.
Expand All @@ -153,7 +170,7 @@ func (r *randRes) reset() {

// advance updates the count at which the offered measurement will overwrite an
// existing exemplar.
func (r *randRes) advance() {
func (r *FixedSizeReservoir) advance() {
// Calculate the next value in the random number series.
//
// The current value of r.w is based on the max of a distribution of random
Expand All @@ -180,7 +197,10 @@ func (r *randRes) advance() {
r.next += int64(math.Log(r.randomFloat64())/math.Log(1-r.w)) + 1
}

func (r *randRes) Collect(dest *[]Exemplar) {
// Collect returns all the held exemplars.
//
// The Reservoir state is preserved after this call.
func (r *FixedSizeReservoir) Collect(dest *[]Exemplar) {
r.storage.Collect(dest)
// Call reset here even though it will reset r.count and restart the random
// number series. This will persist any old exemplars as long as no new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFixedSize(t *testing.T) {
func TestNewFixedSizeReservoir(t *testing.T) {
t.Run("Int64", ReservoirTest[int64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))

t.Run("Float64", ReservoirTest[float64](func(n int) (Reservoir, int) {
return FixedSize(n), n
return NewFixedSizeReservoir(n), n
}))
}

func TestFixedSizeSamplingCorrectness(t *testing.T) {
func TestNewFixedSizeReservoirSamplingCorrectness(t *testing.T) {
intensity := 0.1
sampleSize := 1000

Expand All @@ -38,13 +38,13 @@ func TestFixedSizeSamplingCorrectness(t *testing.T) {
// Sort to test position bias.
slices.Sort(data)

r := FixedSize(sampleSize)
r := NewFixedSizeReservoir(sampleSize)
for _, value := range data {
r.Offer(context.Background(), staticTime, NewValue(value), nil)
}

var sum float64
for _, m := range r.(*randRes).store {
for _, m := range r.store {
sum += m.Value.Float64()
}
mean := sum / float64(sampleSize)
Expand Down
62 changes: 62 additions & 0 deletions sdk/metric/exemplar/histogram_reservoir.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
"slices"
"sort"
"time"

"go.opentelemetry.io/otel/attribute"
)

// NewHistogramReservoir returns a [HistogramReservoir] that samples the last
// measurement that falls within a histogram bucket. The histogram bucket
// upper-boundaries are define by bounds.
//
// The passed bounds will be sorted by this function.
func NewHistogramReservoir(bounds []float64) *HistogramReservoir {
slices.Sort(bounds)
return &HistogramReservoir{
bounds: bounds,
storage: newStorage(len(bounds) + 1),
}
}

var _ Reservoir = &HistogramReservoir{}

// HistogramReservoir is a [Reservoir] that samples the last measurement that
// falls within a histogram bucket. The histogram bucket upper-boundaries are
// define by bounds.
type HistogramReservoir struct {
*storage

// bounds are bucket bounds in ascending order.
bounds []float64
}

// Offer accepts the parameters associated with a measurement. The
// parameters will be stored as an exemplar if the Reservoir decides to
// sample the measurement.
//
// The passed ctx needs to contain any baggage or span that were active
// when the measurement was made. This information may be used by the
// Reservoir in making a sampling decision.
//
// The time t is the time when the measurement was made. The v and a
// parameters are the value and dropped (filtered) attributes of the
// measurement respectively.
func (r *HistogramReservoir) Offer(ctx context.Context, t time.Time, v Value, a []attribute.KeyValue) {
var x float64
switch v.Type() {
case Int64ValueType:
x = float64(v.Int64())
case Float64ValueType:
x = v.Float64()
default:
panic("unknown value type")
}
r.store[sort.SearchFloat64s(r.bounds, x)] = newMeasurement(ctx, t, v, a)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import "testing"
func TestHist(t *testing.T) {
bounds := []float64{0, 100}
t.Run("Int64", ReservoirTest[int64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))

t.Run("Float64", ReservoirTest[float64](func(int) (Reservoir, int) {
return Histogram(bounds), len(bounds)
return NewHistogramReservoir(bounds), len(bounds)
}))
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import (
"context"
Expand Down Expand Up @@ -35,7 +35,7 @@ func (r *storage) Collect(dest *[]Exemplar) {
continue
}

m.Exemplar(&(*dest)[n])
m.exemplar(&(*dest)[n])
n++
}
*dest = (*dest)[:n]
Expand Down Expand Up @@ -66,8 +66,8 @@ func newMeasurement(ctx context.Context, ts time.Time, v Value, droppedAttr []at
}
}

// Exemplar returns m as an [Exemplar].
func (m measurement) Exemplar(dest *Exemplar) {
// exemplar returns m as an [Exemplar].
func (m measurement) exemplar(dest *Exemplar) {
dest.FilteredAttributes = m.FilteredAttributes
dest.Time = m.Time
dest.Value = m.Value
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar"
package exemplar // import "go.opentelemetry.io/otel/sdk/metric/exemplar"

import "math"

Expand Down
File renamed without changes.
Loading

0 comments on commit 481f498

Please sign in to comment.