Skip to content

Commit

Permalink
Update SpanProcessor Shutdown with context and error (#1264)
Browse files Browse the repository at this point in the history
* 1232: update SpanProcessor Shutdown with context and error

* 1232: add changelog info

* 1232: fix CI error, rm commented code

* 1232: fix CI unhandled error

* 1232: Done commit properly

* Add shutdown error handling

* Merge branch 'master' into update-span-processor

* Revert now unneeded context declaration move

Co-authored-by: Tyler Yahn <[email protected]>
Co-authored-by: Tyler Yahn <[email protected]>
  • Loading branch information
3 people authored Oct 27, 2020
1 parent 412ee70 commit a6b31e0
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
They no longer track the gRPC codes. (#1214)
- The `StatusCode` field of the `SpanData` struct in the `go.opentelemetry.io/otel/sdk/export/trace` package now uses the codes package from this package instead of the gRPC project. (#1214)
- Move the `go.opentelemetry.io/otel/api/baggage` package into `go.opentelemetry.io/otel/propagators`. (#1217)
- A `Shutdown` method of `SpanProcessor` and all its implementations receives a context and returns an error. (#1264)

### Fixed

Expand Down
5 changes: 3 additions & 2 deletions example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func initProvider() func() {
pusher.Start()

return func() {
handleErr(tracerProvider.Shutdown(context.Background()), "failed to shutdown provider")
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
ctx := context.Background()
handleErr(tracerProvider.Shutdown(ctx), "failed to shutdown provider")
handleErr(exp.Shutdown(ctx), "failed to stop exporter")
pusher.Stop() // pushes any last exports to the receiver
}
}
Expand Down
18 changes: 15 additions & 3 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}

// ForceFlush exports all ended spans that have not yet been exported.
Expand Down
15 changes: 12 additions & 3 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}

type testOption struct {
Expand Down Expand Up @@ -222,8 +225,14 @@ func getSpanContext() otel.SpanContext {
func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})

bsp.Shutdown()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}

// Multiple call to Shutdown() should not panic.
bsp.Shutdown()
err = bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
}
}
5 changes: 3 additions & 2 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/global"
export "go.opentelemetry.io/otel/sdk/export/trace"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/resource"
Expand Down Expand Up @@ -142,7 +143,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
}
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
global.Handle(s.Shutdown(context.Background()))
})
}
if len(new) > 1 {
Expand Down Expand Up @@ -190,7 +191,7 @@ func (p *TracerProvider) Shutdown(ctx context.Context) error {

for _, sps := range spss {
sps.state.Do(func() {
sps.sp.Shutdown()
global.Handle(sps.sp.Shutdown(ctx))
})
}
return nil
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ type basicSpanProcesor struct {
running bool
}

func (t *basicSpanProcesor) Shutdown() {
func (t *basicSpanProcesor) Shutdown(context.Context) error {
t.running = false
return nil
}

func (t *basicSpanProcesor) OnStart(s *export.SpanData) {}
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
}

// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error {
return nil
}

// ForceFlush does nothing as there is no data to flush.
Expand Down
6 changes: 5 additions & 1 deletion sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,11 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) {
ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{})
if ssp == nil {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
return
}

ssp.Shutdown()
err := ssp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the SimpleSpanProcessor down\n")
}
}
5 changes: 3 additions & 2 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -31,10 +32,10 @@ type SpanProcessor interface {
// and hence should not block.
OnEnd(sd *export.SpanData)

// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
Shutdown(ctx context.Context) error

// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
Expand Down
13 changes: 7 additions & 6 deletions sdk/trace/span_processor_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"time"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -33,9 +34,9 @@ type DurationFilter struct {
Max time.Duration
}

func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown() { f.Next.Shutdown() }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnEnd(sd *export.SpanData) {
if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min {
// Drop short lived spans.
Expand All @@ -59,9 +60,9 @@ type InstrumentationBlacklist struct {
Blacklist map[string]bool
}

func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) {
if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] {
// Drop spans from this instrumentation
Expand Down
24 changes: 14 additions & 10 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import (
export "go.opentelemetry.io/otel/sdk/export/trace"
)

type testSpanProcesor struct {
type testSpanProcessor struct {
name string
spansStarted []*export.SpanData
spansEnded []*export.SpanData
shutdownCount int
}

func (t *testSpanProcesor) OnStart(s *export.SpanData) {
func (t *testSpanProcessor) OnStart(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnStart",
Value: label.StringValue(t.name),
Expand All @@ -38,7 +38,7 @@ func (t *testSpanProcesor) OnStart(s *export.SpanData) {
t.spansStarted = append(t.spansStarted, s)
}

func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
func (t *testSpanProcessor) OnEnd(s *export.SpanData) {
kv := label.KeyValue{
Key: "OnEnd",
Value: label.StringValue(t.name),
Expand All @@ -47,11 +47,12 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}

func (t *testSpanProcesor) Shutdown() {
func (t *testSpanProcessor) Shutdown(_ context.Context) error {
t.shutdownCount++
return nil
}

func (t *testSpanProcesor) ForceFlush() {
func (t *testSpanProcessor) ForceFlush() {
}

func TestRegisterSpanProcessort(t *testing.T) {
Expand Down Expand Up @@ -181,7 +182,10 @@ func TestSpanProcessorShutdown(t *testing.T) {
tp.RegisterSpanProcessor(sp)

wantCount := 1
sp.Shutdown()
err := sp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the testSpanProcessor down\n")
}

gotCount := sp.shutdownCount
if wantCount != gotCount {
Expand Down Expand Up @@ -216,12 +220,12 @@ func TestMultipleUnregisterSpanProcessorCalls(t *testing.T) {
}
}

func NewTestSpanProcessor(name string) *testSpanProcesor {
return &testSpanProcesor{name: name}
func NewTestSpanProcessor(name string) *testSpanProcessor {
return &testSpanProcessor{name: name}
}

func NewNamedTestSpanProcessors(names []string) []*testSpanProcesor {
tsp := []*testSpanProcesor{}
func NewNamedTestSpanProcessors(names []string) []*testSpanProcessor {
tsp := []*testSpanProcessor{}
for _, n := range names {
tsp = append(tsp, NewTestSpanProcessor(n))
}
Expand Down

0 comments on commit a6b31e0

Please sign in to comment.