Skip to content

Commit

Permalink
Fix metric SDK race condition (#293)
Browse files Browse the repository at this point in the history
* Initial fix

* benchmark acquire

* Rename handle benchmarks
  • Loading branch information
jmacd authored Nov 6, 2019
1 parent 68bd627 commit 2546646
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 95 deletions.
150 changes: 57 additions & 93 deletions sdk/metric/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/otel/api/core"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
Expand Down Expand Up @@ -66,6 +67,16 @@ func (bf *benchFixture) AggregatorFor(rec export.Record) export.Aggregator {
func (bf *benchFixture) Export(ctx context.Context, rec export.Record, agg export.Aggregator) {
}

func makeLabelSets(n int) [][]core.KeyValue {
r := make([][]core.KeyValue, n)

for i := 0; i < n; i++ {
r[i] = makeLabels(1)
}

return r
}

func makeLabels(n int) []core.KeyValue {
used := map[string]bool{}
l := make([]core.KeyValue, n)
Expand Down Expand Up @@ -117,69 +128,98 @@ func BenchmarkLabels_16(b *testing.B) {
// Note: performance does not depend on label set size for the
// benchmarks below.

func BenchmarkInt64CounterAdd(b *testing.B) {
ctx := context.Background()
func BenchmarkAcquireNewHandle(b *testing.B) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labelSets := makeLabelSets(b.N)
cnt := fix.sdk.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)

for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1, labs)
cnt.AcquireHandle(labels[i])
}
}

func BenchmarkInt64CounterAcquireHandle(b *testing.B) {
func BenchmarkAcquireExistingHandle(b *testing.B) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
labelSets := makeLabelSets(b.N)
cnt := fix.sdk.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)

for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
cnt.AcquireHandle(labels[i]).Release()
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := cnt.AcquireHandle(labs)
handle.Release()
cnt.AcquireHandle(labels[i])
}
}

func BenchmarkInt64CounterHandleAdd(b *testing.B) {
func BenchmarkAcquireReleaseExistingHandle(b *testing.B) {
fix := newFixture(b)
labelSets := makeLabelSets(b.N)
cnt := fix.sdk.NewInt64Counter("int64.counter")
labels := make([]metric.LabelSet, b.N)

for i := 0; i < b.N; i++ {
labels[i] = fix.sdk.Labels(labelSets[i]...)
cnt.AcquireHandle(labels[i]).Release()
}

b.ResetTimer()

for i := 0; i < b.N; i++ {
cnt.AcquireHandle(labels[i]).Release()
}
}

// Counters

func BenchmarkInt64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
cnt := fix.sdk.NewInt64Counter("int64.counter")
handle := cnt.AcquireHandle(labs)

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle.Add(ctx, 1)
cnt.Add(ctx, 1, labs)
}
}

func BenchmarkFloat64CounterAdd(b *testing.B) {
func BenchmarkInt64CounterHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
cnt := fix.sdk.NewFloat64Counter("float64.counter")
cnt := fix.sdk.NewInt64Counter("int64.counter")
handle := cnt.AcquireHandle(labs)

b.ResetTimer()

for i := 0; i < b.N; i++ {
cnt.Add(ctx, 1.1, labs)
handle.Add(ctx, 1)
}
}

func BenchmarkFloat64CounterAcquireHandle(b *testing.B) {
func BenchmarkFloat64CounterAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
cnt := fix.sdk.NewFloat64Counter("float64.counter")

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := cnt.AcquireHandle(labs)
handle.Release()
cnt.Add(ctx, 1.1, labs)
}
}

Expand Down Expand Up @@ -212,19 +252,6 @@ func BenchmarkInt64GaugeAdd(b *testing.B) {
}
}

func BenchmarkInt64GaugeAcquireHandle(b *testing.B) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
gau := fix.sdk.NewInt64Gauge("int64.gauge")

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := gau.AcquireHandle(labs)
handle.Release()
}
}

func BenchmarkInt64GaugeHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
Expand Down Expand Up @@ -252,19 +279,6 @@ func BenchmarkFloat64GaugeAdd(b *testing.B) {
}
}

func BenchmarkFloat64GaugeAcquireHandle(b *testing.B) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
gau := fix.sdk.NewFloat64Gauge("float64.gauge")

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := gau.AcquireHandle(labs)
handle.Release()
}
}

func BenchmarkFloat64GaugeHandleAdd(b *testing.B) {
ctx := context.Background()
fix := newFixture(b)
Expand Down Expand Up @@ -294,19 +308,6 @@ func benchmarkInt64MeasureAdd(b *testing.B, name string) {
}
}

func benchmarkInt64MeasureAcquireHandle(b *testing.B, name string) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
mea := fix.sdk.NewInt64Measure(name)

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := mea.AcquireHandle(labs)
handle.Release()
}
}

func benchmarkInt64MeasureHandleAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
Expand Down Expand Up @@ -334,19 +335,6 @@ func benchmarkFloat64MeasureAdd(b *testing.B, name string) {
}
}

func benchmarkFloat64MeasureAcquireHandle(b *testing.B, name string) {
fix := newFixture(b)
labs := fix.sdk.Labels(makeLabels(1)...)
mea := fix.sdk.NewFloat64Measure(name)

b.ResetTimer()

for i := 0; i < b.N; i++ {
handle := mea.AcquireHandle(labs)
handle.Release()
}
}

func benchmarkFloat64MeasureHandleAdd(b *testing.B, name string) {
ctx := context.Background()
fix := newFixture(b)
Expand All @@ -367,10 +355,6 @@ func BenchmarkInt64MaxSumCountAdd(b *testing.B) {
benchmarkInt64MeasureAdd(b, "int64.maxsumcount")
}

func BenchmarkInt64MaxSumCountAcquireHandle(b *testing.B) {
benchmarkInt64MeasureAcquireHandle(b, "int64.maxsumcount")
}

func BenchmarkInt64MaxSumCountHandleAdd(b *testing.B) {
benchmarkInt64MeasureHandleAdd(b, "int64.maxsumcount")
}
Expand All @@ -379,10 +363,6 @@ func BenchmarkFloat64MaxSumCountAdd(b *testing.B) {
benchmarkFloat64MeasureAdd(b, "float64.maxsumcount")
}

func BenchmarkFloat64MaxSumCountAcquireHandle(b *testing.B) {
benchmarkFloat64MeasureAcquireHandle(b, "float64.maxsumcount")
}

func BenchmarkFloat64MaxSumCountHandleAdd(b *testing.B) {
benchmarkFloat64MeasureHandleAdd(b, "float64.maxsumcount")
}
Expand All @@ -393,10 +373,6 @@ func BenchmarkInt64DDSketchAdd(b *testing.B) {
benchmarkInt64MeasureAdd(b, "int64.ddsketch")
}

func BenchmarkInt64DDSketchAcquireHandle(b *testing.B) {
benchmarkInt64MeasureAcquireHandle(b, "int64.ddsketch")
}

func BenchmarkInt64DDSketchHandleAdd(b *testing.B) {
benchmarkInt64MeasureHandleAdd(b, "int64.ddsketch")
}
Expand All @@ -405,10 +381,6 @@ func BenchmarkFloat64DDSketchAdd(b *testing.B) {
benchmarkFloat64MeasureAdd(b, "float64.ddsketch")
}

func BenchmarkFloat64DDSketchAcquireHandle(b *testing.B) {
benchmarkFloat64MeasureAcquireHandle(b, "float64.ddsketch")
}

func BenchmarkFloat64DDSketchHandleAdd(b *testing.B) {
benchmarkFloat64MeasureHandleAdd(b, "float64.ddsketch")
}
Expand All @@ -419,10 +391,6 @@ func BenchmarkInt64ArrayAdd(b *testing.B) {
benchmarkInt64MeasureAdd(b, "int64.array")
}

func BenchmarkInt64ArrayAcquireHandle(b *testing.B) {
benchmarkInt64MeasureAcquireHandle(b, "int64.array")
}

func BenchmarkInt64ArrayHandleAdd(b *testing.B) {
benchmarkInt64MeasureHandleAdd(b, "int64.array")
}
Expand All @@ -431,10 +399,6 @@ func BenchmarkFloat64ArrayAdd(b *testing.B) {
benchmarkFloat64MeasureAdd(b, "float64.array")
}

func BenchmarkFloat64ArrayAcquireHandle(b *testing.B) {
benchmarkFloat64MeasureAcquireHandle(b, "float64.array")
}

func BenchmarkFloat64ArrayHandleAdd(b *testing.B) {
benchmarkFloat64MeasureHandleAdd(b, "float64.array")
}
12 changes: 10 additions & 2 deletions sdk/metric/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,19 @@ func (i *instrument) Meter() api.Meter {
}

func (i *instrument) acquireHandle(ls *labels) *record {
// Create lookup key for sync.Map
// Create lookup key for sync.Map (one allocation)
mk := mapkey{
descriptor: i.descriptor,
encoded: ls.encoded,
}

if actual, ok := i.meter.current.Load(mk); ok {
// Existing record case, only one allocation so far.
rec := actual.(*record)
atomic.AddInt64(&rec.refcount, 1)
return rec
}

// There's a memory allocation here.
rec := &record{
labels: ls,
Expand All @@ -174,6 +181,8 @@ func (i *instrument) acquireHandle(ls *labels) *record {
modifiedEpoch: 0,
}

rec.recorder = i.meter.exporter.AggregatorFor(rec)

// Load/Store: there's a memory allocation to place `mk` into
// an interface here.
if actual, loaded := i.meter.current.LoadOrStore(mk, rec); loaded {
Expand All @@ -182,7 +191,6 @@ func (i *instrument) acquireHandle(ls *labels) *record {
atomic.AddInt64(&rec.refcount, 1)
return rec
}
rec.recorder = i.meter.exporter.AggregatorFor(rec)

i.meter.addPrimary(rec)
return rec
Expand Down

0 comments on commit 2546646

Please sign in to comment.