Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update SpanProcessor Shutdown with context and error #1264

Merged
merged 11 commits into from
Oct 27, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
They no longer track the gRPC codes. (#1214)
- The `StatusCode` field of the `SpanData` struct in the `go.opentelemetry.io/otel/sdk/export/trace` package now uses the codes package from this package instead of the gRPC project. (#1214)
- Move the `go.opentelemetry.io/otel/api/baggage` package into `go.opentelemetry.io/otel/propagators`. (#1217)
- A `Shutdown` method of `SpanProcessor` and all its implementations receives a context and returns an error. (#1264)

### Fixed

Expand Down
5 changes: 3 additions & 2 deletions example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ func main() {
log.Fatalf("failed to initialize stdout export pipeline: %v", err)
}

ctx := context.Background()

bsp := sdktrace.NewBatchSpanProcessor(exporter)
defer bsp.Shutdown()
defer func() { _ = bsp.Shutdown(ctx) }()
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(bsp))
pusher := push.New(
basic.New(
Expand Down Expand Up @@ -76,7 +78,6 @@ func main() {

valuerecorderTwo := otel.Must(meter).NewFloat64ValueRecorder("ex.com.two")

ctx := context.Background()
ctx = otel.ContextWithBaggageValues(ctx, fooKey.String("foo1"), barKey.String("bar1"))

valuerecorder := valuerecorderTwo.Bind(commonLabels...)
Expand Down
4 changes: 3 additions & 1 deletion example/namedtracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func initTracer() func() {
sdktrace.WithSpanProcessor(bsp),
)
global.SetTracerProvider(tp)
return bsp.Shutdown
return func() {
_ = bsp.Shutdown(context.Background())
}
}

func main() {
Expand Down
5 changes: 3 additions & 2 deletions example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func initProvider() func() {
pusher.Start()

return func() {
bsp.Shutdown() // shutdown the processor
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
ctx := context.Background()
_ = bsp.Shutdown(ctx) // shutdown the processor
XSAM marked this conversation as resolved.
Show resolved Hide resolved
handleErr(exp.Shutdown(ctx), "failed to stop exporter")
pusher.Stop() // pushes any last exports to the receiver
}
}
Expand Down
18 changes: 15 additions & 3 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}

// ForceFlush exports all ended spans that have not yet been exported.
Expand Down
6 changes: 3 additions & 3 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
_ = bsp.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved
}

type testOption struct {
Expand Down Expand Up @@ -222,8 +222,8 @@ func getSpanContext() otel.SpanContext {
func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})

bsp.Shutdown()
_ = bsp.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved

// Multiple call to Shutdown() should not panic.
bsp.Shutdown()
_ = bsp.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved
}
3 changes: 2 additions & 1 deletion sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -141,7 +142,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
}
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
_ = s.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved
})
}
if len(new) > 1 {
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
}

// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error {
return nil
}

// ForceFlush does nothing as there is no data to flush.
Expand Down
2 changes: 1 addition & 1 deletion sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
}

ssp.Shutdown()
_ = ssp.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 3 additions & 2 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -31,10 +32,10 @@ type SpanProcessor interface {
// and hence should not block.
OnEnd(sd *export.SpanData)

// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
Shutdown(ctx context.Context) error

// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
Expand Down
13 changes: 7 additions & 6 deletions sdk/trace/span_processor_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"time"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -33,9 +34,9 @@ type DurationFilter struct {
Max time.Duration
}

func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown() { f.Next.Shutdown() }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnEnd(sd *export.SpanData) {
if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min {
// Drop short lived spans.
Expand All @@ -59,9 +60,9 @@ type InstrumentationBlacklist struct {
Blacklist map[string]bool
}

func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) {
if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] {
// Drop spans from this instrumentation
Expand Down
5 changes: 3 additions & 2 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}

func (t *testSpanProcesor) Shutdown() {
func (t *testSpanProcesor) Shutdown(_ context.Context) error {
t.shutdownCount++
return nil
}

func (t *testSpanProcesor) ForceFlush() {
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestSpanProcessorShutdown(t *testing.T) {
tp.RegisterSpanProcessor(sp)

wantCount := 1
sp.Shutdown()
_ = sp.Shutdown(context.Background())
XSAM marked this conversation as resolved.
Show resolved Hide resolved

gotCount := sp.shutdownCount
if wantCount != gotCount {
Expand Down