Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [processor/interval] Implement the main logic #30827

Closed
wants to merge 11 commits into from
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ processor/deltatorateprocessor/ @open-telemetry/collect
processor/filterprocessor/ @open-telemetry/collector-contrib-approvers @TylerHelmuth @boostchicken
processor/groupbyattrsprocessor/ @open-telemetry/collector-contrib-approvers @rnishtala-sumo
processor/groupbytraceprocessor/ @open-telemetry/collector-contrib-approvers @jpkrohling
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams
processor/intervalprocessor/ @open-telemetry/collector-contrib-approvers @RichieSams @sh0rez
processor/k8sattributesprocessor/ @open-telemetry/collector-contrib-approvers @dmitryax @rmfitzpatrick @fatsheep9146 @TylerHelmuth
processor/logstransformprocessor/ @open-telemetry/collector-contrib-approvers @djaglowski @dehaansa
processor/metricsgenerationprocessor/ @open-telemetry/collector-contrib-approvers @Aneurysm9
Expand Down
10 changes: 6 additions & 4 deletions internal/exp/metrics/go.mod
Original file line number Diff line number Diff line change
@@ -1,28 +1,30 @@
module github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
go 1.21

require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.95.0
github.com/stretchr/testify v1.8.4
go.opentelemetry.io/collector/pdata v1.2.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231106174013-bbf56f31fb17 // indirect
google.golang.org/grpc v1.61.0 // indirect
google.golang.org/protobuf v1.32.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

go 1.21

toolchain go1.21.1
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil => ../../../pkg/pdatautil
9 changes: 9 additions & 0 deletions internal/exp/metrics/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions internal/exp/metrics/staleness/map.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// Map is an abstraction over a map
type Map[T any] interface {
// Load the value at key. If it does not exist, the boolean will be false and the value returned will be the zero value
Load(key identity.Stream) (T, bool)
// Store the given key value pair in the map
Store(key identity.Stream, value T)
// Remove the value at key from the map
Delete(key identity.Stream)
// Items returns an iterator function that in future go version can be used with range
// See: https://go.dev/wiki/RangefuncExperiment
Items() func(yield func(identity.Stream, T) bool) bool
}

// RawMap implementation

var _ Map[time.Time] = (*RawMap[identity.Stream, time.Time])(nil)

// RawMap is an implementation of the Map interface using a standard golang map
type RawMap[K comparable, V any] map[K]V

func (rm *RawMap[K, V]) Load(key K) (V, bool) {
value, ok := (*rm)[key]
return value, ok
}

func (rm *RawMap[K, V]) Store(key K, value V) {
(*rm)[key] = value
}

func (rm *RawMap[K, V]) Delete(key K) {
delete(*rm, key)
}

func (rm *RawMap[K, V]) Items() func(yield func(K, V) bool) bool {
return func(yield func(K, V) bool) bool {
for k, v := range *rm {
if !yield(k, v) {
break
}
}
return false
}
}
111 changes: 111 additions & 0 deletions internal/exp/metrics/staleness/priority_queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/staleness"

import (
"container/heap"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

// PriorityQueue represents a way to store entries sorted by their priority.
// Pop() will return the oldest entry of the set.
type PriorityQueue interface {
// Update will add or update an entry, and reshuffle the queue internally as needed to keep it sorted
Update(id identity.Stream, newPrio time.Time)
// Peek will return the entry at the HEAD of the queue *without* removing it from the queue
Peek() (identity.Stream, time.Time)
// Pop will remove the entry at the HEAD of the queue and return it
Pop() (identity.Stream, time.Time)
// Len will return the number of entries in the queue
Len() int
}

// heapQueue implements heap.Interface.
// We use it as the inner implementation of a heap-based sorted queue
type heapQueue []*queueItem

type queueItem struct {
key identity.Stream
prio time.Time
index int
}

func (pq heapQueue) Len() int { return len(pq) }

func (pq heapQueue) Less(i, j int) bool {
// We want Pop to give us the lowest priority
return pq[i].prio.Before(pq[j].prio)
}

func (pq heapQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}

func (pq *heapQueue) Push(x any) {
n := len(*pq)
item := x.(*queueItem)
item.index = n
*pq = append(*pq, item)
}

func (pq *heapQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}

type heapPriorityQueue struct {
inner heapQueue
itemLookup map[identity.Stream]*queueItem
}

func NewPriorityQueue() PriorityQueue {
pq := &heapPriorityQueue{
inner: heapQueue{},
itemLookup: map[identity.Stream]*queueItem{},
}
heap.Init(&pq.inner)

return pq
}

func (pq *heapPriorityQueue) Update(id identity.Stream, newPrio time.Time) {
// Check if the entry already exists in the queue
item, ok := pq.itemLookup[id]
if ok {
// If so, we can update it in place
item.prio = newPrio
heap.Fix(&pq.inner, item.index)
} else {
item = &queueItem{
key: id,
prio: newPrio,
}
heap.Push(&pq.inner, item)
pq.itemLookup[id] = item
}
}

func (pq *heapPriorityQueue) Peek() (identity.Stream, time.Time) {
val := pq.inner[0]
return val.key, val.prio
}

func (pq *heapPriorityQueue) Pop() (identity.Stream, time.Time) {
val := heap.Pop(&pq.inner).(*queueItem)
delete(pq.itemLookup, val.key)
return val.key, val.prio
}

func (pq *heapPriorityQueue) Len() int {
return pq.inner.Len()
}
116 changes: 116 additions & 0 deletions internal/exp/metrics/staleness/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package staleness

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
)

func TestPriorityQueueImpl(t *testing.T) {
t.Parallel()

pq := NewPriorityQueue()

idA := generateStreamID(t, map[string]any{
"aaa": "123",
})
idB := generateStreamID(t, map[string]any{
"bbb": "456",
})
idC := generateStreamID(t, map[string]any{
"ccc": "789",
})

initialTime := time.Time{}
prioA := initialTime.Add(2 * time.Second)
prioB := initialTime.Add(1 * time.Second)
prioC := initialTime.Add(3 * time.Second)

pq.Update(idA, prioA)
pq.Update(idB, prioB)
pq.Update(idC, prioC)

// The first item should be B
id, prio := pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// If we peek again, nothing should change
id, prio = pq.Peek()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idB, id)
require.Equal(t, prioB, prio)

// Now if we peek again, it should be the next item
id, prio = pq.Peek()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idA, id)
require.Equal(t, prioA, prio)

// One last time
id, prio = pq.Peek()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// Pop should return the same thing
id, prio = pq.Pop()
require.Equal(t, idC, id)
require.Equal(t, prioC, prio)

// The queue should now be empty
require.Equal(t, 0, pq.Len())

// And the inner lookup map should also be empty
require.IsType(t, &heapPriorityQueue{}, pq)
heapQueue := pq.(*heapPriorityQueue)
require.Len(t, heapQueue.itemLookup, 0)
}

func generateStreamID(t *testing.T, attributes map[string]any) identity.Stream {
res := pcommon.NewResource()
err := res.Attributes().FromRaw(map[string]any{
"foo": "bar",
"asdf": "qwer",
})
require.NoError(t, err)

scope := pcommon.NewInstrumentationScope()
scope.SetName("TestScope")
scope.SetVersion("v1.2.3")
err = scope.Attributes().FromRaw(map[string]any{
"aaa": "bbb",
"ccc": "ddd",
})
require.NoError(t, err)

metric := pmetric.NewMetric()

sum := metric.SetEmptySum()
sum.SetIsMonotonic(true)
sum.SetAggregationTemporality(pmetric.AggregationTemporalityDelta)

dp := sum.DataPoints().AppendEmpty()
dp.SetStartTimestamp(678)
dp.SetTimestamp(789)
dp.SetDoubleValue(123.456)
err = dp.Attributes().FromRaw(attributes)
require.NoError(t, err)

return identity.OfStream(identity.OfResourceMetric(res, scope, metric), dp)
}
Loading
Loading