Skip to content

Commit

Permalink
Global trace forwarding implementation (#406)
Browse files Browse the repository at this point in the history
* Global trace forwarding implementation according to open-telemetry/oteps#74
  • Loading branch information
shbieng authored and lizthegrey committed May 27, 2021
1 parent 2f32fba commit 8a5791c
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 1 deletion.
41 changes: 41 additions & 0 deletions api/global/internal/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ import (
"go.opentelemetry.io/otel/api/global/internal"
"go.opentelemetry.io/otel/api/key"
"go.opentelemetry.io/otel/api/metric"
"go.opentelemetry.io/otel/api/trace"
export "go.opentelemetry.io/otel/sdk/export/metric"
sdk "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/counter"
"go.opentelemetry.io/otel/sdk/metric/aggregator/ddsketch"
"go.opentelemetry.io/otel/sdk/metric/aggregator/gauge"
"go.opentelemetry.io/otel/sdk/metric/aggregator/minmaxsumcount"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// benchFixture is copied from sdk/metric/benchmark_test.go.
Expand Down Expand Up @@ -100,3 +102,42 @@ func BenchmarkGlobalInt64CounterAddWithSDK(b *testing.B) {
cnt.Add(ctx, 1, labs)
}
}

func BenchmarkStartEndSpan(b *testing.B) {
// Comapare with BenchmarkStartEndSpan() in ../../sdk/trace/benchmark_test.go
traceBenchmark(b, func(b *testing.B) {
t := global.TraceProvider().Tracer("Benchmark StartEndSpan")
ctx := context.Background()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, span := t.Start(ctx, "/foo")
span.End()
}
})
}

func traceBenchmark(b *testing.B, fn func(*testing.B)) {
internal.ResetForTest()
b.Run("No SDK", func(b *testing.B) {
b.ReportAllocs()
fn(b)
})
b.Run("Default SDK (AlwaysSample)", func(b *testing.B) {
b.ReportAllocs()
global.SetTraceProvider(traceProvider(b, sdktrace.AlwaysSample()))
fn(b)
})
b.Run("Default SDK (NeverSample)", func(b *testing.B) {
b.ReportAllocs()
global.SetTraceProvider(traceProvider(b, sdktrace.NeverSample()))
fn(b)
})
}

func traceProvider(b *testing.B, sampler sdktrace.Sampler) trace.Provider {
tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sampler}))
if err != nil {
b.Fatalf("Failed to create trace provider with sampler: %v", err)
}
return tp
}
16 changes: 15 additions & 1 deletion api/global/internal/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
globalMeter = defaultMeterValue()

delegateMeterOnce sync.Once
delegateTraceOnce sync.Once
)

// TraceProvider is the internal implementation for global.TraceProvider.
Expand All @@ -32,6 +33,18 @@ func TraceProvider() trace.Provider {

// SetTraceProvider is the internal implementation for global.SetTraceProvider.
func SetTraceProvider(tp trace.Provider) {
delegateTraceOnce.Do(func() {
current := TraceProvider()
if current == tp {
// Setting the provider to the prior default is nonsense, panic.
// Panic is acceptable because we are likely still early in the
// process lifetime.
panic("invalid Provider, the global instance cannot be reinstalled")
} else if def, ok := current.(*traceProvider); ok {
def.setDelegate(tp)
}

})
globalTracer.Store(traceProviderHolder{tp: tp})
}

Expand Down Expand Up @@ -59,7 +72,7 @@ func SetMeterProvider(mp metric.Provider) {

func defaultTracerValue() *atomic.Value {
v := &atomic.Value{}
v.Store(traceProviderHolder{tp: trace.NoopProvider{}})
v.Store(traceProviderHolder{tp: &traceProvider{}})
return v
}

Expand All @@ -74,4 +87,5 @@ func ResetForTest() {
globalTracer = defaultTracerValue()
globalMeter = defaultMeterValue()
delegateMeterOnce = sync.Once{}
delegateTraceOnce = sync.Once{}
}
118 changes: 118 additions & 0 deletions api/global/internal/trace.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package internal

/*
This file contains the forwarding implementation of the trace.Provider used as
the default global instance. Prior to initialization of an SDK, Tracers
returned by the global Provider will provide no-op functionality. This means
that all Span created prior to initialization are no-op Spans.
Once an SDK has been initialized, all provided no-op Tracers are swapped for
Tracers provided by the SDK defined Provider. However, any Span started prior
to this initialization does not change its behavior. Meaning, the Span remains
a no-op Span.
The implementation to track and swap Tracers locks all new Tracer creation
until the swap is complete. This assumes that this operation is not
performance-critical. If that assumption is incorrect, be sure to configure an
SDK prior to any Tracer creation.
*/

import (
"context"
"sync"

"go.opentelemetry.io/otel/api/trace"
)

// traceProvider is a placeholder for a configured SDK Provider.
//
// All Provider functionality is forwarded to a delegate once configured.
type traceProvider struct {
mtx sync.Mutex
tracers []*tracer

delegate trace.Provider
}

// Compile-time guarantee that traceProvider implements the trace.Provider interface.
var _ trace.Provider = &traceProvider{}

// setDelegate configures p to delegate all Provider functionality to provider.
//
// All Tracers provided prior to this function call are switched out to be
// Tracers provided by provider.
//
// Delegation only happens on the first call to this method. All subsequent
// calls result in no delegation changes.
func (p *traceProvider) setDelegate(provider trace.Provider) {
if p.delegate != nil {
return
}

p.mtx.Lock()
defer p.mtx.Unlock()

p.delegate = provider
for _, t := range p.tracers {
t.setDelegate(provider)
}

p.tracers = nil
}

// Tracer implements trace.Provider.
func (p *traceProvider) Tracer(name string) trace.Tracer {
p.mtx.Lock()
defer p.mtx.Unlock()

if p.delegate != nil {
return p.delegate.Tracer(name)
}

t := &tracer{name: name}
p.tracers = append(p.tracers, t)
return t
}

// tracer is a placeholder for a trace.Tracer.
//
// All Tracer functionality is forwarded to a delegate once configured.
// Otherwise, all functionality is forwarded to a NoopTracer.
type tracer struct {
once sync.Once
name string

delegate trace.Tracer
}

// Compile-time guarantee that tracer implements the trace.Tracer interface.
var _ trace.Tracer = &tracer{}

// setDelegate configures t to delegate all Tracer functionality to Tracers
// created by provider.
//
// All subsequent calls to the Tracer methods will be passed to the delegate.
//
// Delegation only happens on the first call to this method. All subsequent
// calls result in no delegation changes.
func (t *tracer) setDelegate(provider trace.Provider) {
t.once.Do(func() { t.delegate = provider.Tracer(t.name) })
}

// WithSpan implements trace.Tracer by forwarding the call to t.delegate if
// set, otherwise it forwards the call to a NoopTracer.
func (t *tracer) WithSpan(ctx context.Context, name string, body func(context.Context) error) error {
if t.delegate != nil {
return t.delegate.WithSpan(ctx, name, body)
}
return trace.NoopTracer{}.WithSpan(ctx, name, body)
}

// Start implements trace.Tracer by forwarding the call to t.delegate if
// set, otherwise it forwards the call to a NoopTracer.
func (t *tracer) Start(ctx context.Context, name string, opts ...trace.StartOption) (context.Context, trace.Span) {
if t.delegate != nil {
return t.delegate.Start(ctx, name, opts...)
}
return trace.NoopTracer{}.Start(ctx, name, opts...)
}
75 changes: 75 additions & 0 deletions api/global/internal/trace_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package internal_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/global/internal"
export "go.opentelemetry.io/otel/sdk/export/trace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

type testSpanProcesor struct {
// Names of Spans started.
spansStarted []string
// Names of Spans ended.
spansEnded []string
}

func (t *testSpanProcesor) OnStart(s *export.SpanData) {
t.spansStarted = append(t.spansStarted, s.Name)
}

func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s.Name)
}

func (t *testSpanProcesor) Shutdown() {}

func TestTraceDefaultSDK(t *testing.T) {
internal.ResetForTest()

ctx := context.Background()
gtp := global.TraceProvider()
tracer1 := gtp.Tracer("pre")
_, span1 := tracer1.Start(ctx, "span1")

// This should be dropped.
if err := tracer1.WithSpan(ctx, "withSpan1", func(context.Context) error { return nil }); err != nil {
t.Errorf("failed to wrap function with span prior to initialization: %v", err)
}

tp, err := sdktrace.NewProvider(sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}))
if err != nil {
t.Fatal(err)
}
tsp := &testSpanProcesor{}
tp.RegisterSpanProcessor(tsp)

global.SetTraceProvider(tp)

// This span was started before initialization, it is expected to be dropped.
span1.End()

// The existing Tracer should have been configured to now use the configured SDK.
_, span2 := tracer1.Start(ctx, "span2")
span2.End()
if err := tracer1.WithSpan(ctx, "withSpan2", func(context.Context) error { return nil }); err != nil {
t.Errorf("failed to wrap function with span post initialization: %v", err)
}

// The global trace Provider should now create Tracers that also use the newly configured SDK.
tracer2 := gtp.Tracer("post")
_, span3 := tracer2.Start(ctx, "span3")
span3.End()
if err := tracer2.WithSpan(ctx, "withSpan3", func(context.Context) error { return nil }); err != nil {
t.Errorf("failed to wrap function with span post initialization with new tracer: %v", err)
}

expected := []string{"pre/span2", "pre/withSpan2", "post/span3", "post/withSpan3"}
require.Equal(t, tsp.spansStarted, expected)
require.Equal(t, tsp.spansEnded, expected)
}

0 comments on commit 8a5791c

Please sign in to comment.