Skip to content

Commit

Permalink
1232: update SpanProcessor Shutdown with context and error
Browse files Browse the repository at this point in the history
  • Loading branch information
ilyakaznacheev committed Oct 18, 2020
1 parent d6dd84f commit 4e58bb8
Show file tree
Hide file tree
Showing 11 changed files with 70 additions and 20 deletions.
5 changes: 3 additions & 2 deletions example/basic/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ func main() {
log.Fatalf("failed to initialize stdout export pipeline: %v", err)
}

ctx := context.Background()

bsp := sdktrace.NewBatchSpanProcessor(exporter)
defer bsp.Shutdown()
defer bsp.Shutdown(ctx)
tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(bsp))
pusher := push.New(
basic.New(
Expand Down Expand Up @@ -76,7 +78,6 @@ func main() {

valuerecorderTwo := otel.Must(meter).NewFloat64ValueRecorder("ex.com.two")

ctx := context.Background()
ctx = otel.ContextWithBaggageValues(ctx, fooKey.String("foo1"), barKey.String("bar1"))

valuerecorder := valuerecorderTwo.Bind(commonLabels...)
Expand Down
4 changes: 3 additions & 1 deletion example/namedtracer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ func initTracer() func() {
sdktrace.WithSpanProcessor(bsp),
)
global.SetTracerProvider(tp)
return bsp.Shutdown
return func() {
bsp.Shutdown(context.Background())
}
}

func main() {
Expand Down
5 changes: 3 additions & 2 deletions example/otel-collector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ func initProvider() func() {
pusher.Start()

return func() {
bsp.Shutdown() // shutdown the processor
handleErr(exp.Shutdown(context.Background()), "failed to stop exporter")
ctx := context.Background()
bsp.Shutdown(ctx) // shutdown the processor
handleErr(exp.Shutdown(ctx), "failed to stop exporter")
pusher.Stop() // pushes any last exports to the receiver
}
}
Expand Down
18 changes: 15 additions & 3 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,11 +125,23 @@ func (bsp *BatchSpanProcessor) OnEnd(sd *export.SpanData) {

// Shutdown flushes the queue and waits until all spans are processed.
// It only executes once. Subsequent call does nothing.
func (bsp *BatchSpanProcessor) Shutdown() {
func (bsp *BatchSpanProcessor) Shutdown(ctx context.Context) error {
var err error
bsp.stopOnce.Do(func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
wait := make(chan struct{})
go func() {
close(bsp.stopCh)
bsp.stopWait.Wait()
close(wait)
}()
// Wait until the wait group is done or the context is cancelled
select {
case <-wait:
case <-ctx.Done():
err = ctx.Err()
}
})
return err
}

// ForceFlush exports all ended spans that have not yet been exported.
Expand Down
35 changes: 32 additions & 3 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
bsp.OnStart(&export.SpanData{})
bsp.OnEnd(&export.SpanData{})
bsp.ForceFlush()
bsp.Shutdown()
bsp.Shutdown(context.Background())
}

type testOption struct {
Expand Down Expand Up @@ -222,8 +222,37 @@ func getSpanContext() otel.SpanContext {
func TestBatchSpanProcessorShutdown(t *testing.T) {
bsp := sdktrace.NewBatchSpanProcessor(&testBatchExporter{})

bsp.Shutdown()
bsp.Shutdown(context.Background())

// Multiple call to Shutdown() should not panic.
bsp.Shutdown()
bsp.Shutdown(context.Background())
}

type testSlowExporter struct {
}

func (t *testSlowExporter) ExportSpans(ctx context.Context, _ []*export.SpanData) error {
// log.Fatal("ExportSpans aaaa")
<-ctx.Done()
return ctx.Err()
}

func (t *testSlowExporter) Shutdown(context.Context) error { return nil }

// func TestBatchSpanProcessorShutdownCancel(t *testing.T) {
// sd := export.SpanData{
// SpanContext: otel.SpanContext{TraceFlags: otel.FlagsSampled},
// }
// bsp := sdktrace.NewBatchSpanProcessor(&testSlowExporter{},
// // sdktrace.WithBatchTimeout(0),
// sdktrace.WithMaxExportBatchSize(1),
// sdktrace.WithMaxQueueSize(1),
// sdktrace.WithBlocking(),
// )
// // These should not panic.
// bsp.OnStart(&sd)
// bsp.OnEnd(&sd)
// bsp.ForceFlush()
// ctx, cancel := context.WithCancel(context.Background())
// bsp.Shutdown(ctx)
// }
3 changes: 2 additions & 1 deletion sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -141,7 +142,7 @@ func (p *TracerProvider) UnregisterSpanProcessor(s SpanProcessor) {
}
if stopOnce != nil {
stopOnce.state.Do(func() {
s.Shutdown()
s.Shutdown(context.Background())
})
}
if len(new) > 1 {
Expand Down
3 changes: 2 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func (ssp *SimpleSpanProcessor) OnEnd(sd *export.SpanData) {
}

// Shutdown method does nothing. There is no data to cleanup.
func (ssp *SimpleSpanProcessor) Shutdown() {
func (ssp *SimpleSpanProcessor) Shutdown(_ context.Context) error {
return nil
}

// ForceFlush does nothing as there is no data to flush.
Expand Down
2 changes: 1 addition & 1 deletion sdk/trace/simple_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,5 +84,5 @@ func TestSimpleSpanProcessorShutdown(t *testing.T) {
t.Errorf("Error creating new instance of SimpleSpanProcessor\n")
}

ssp.Shutdown()
ssp.Shutdown(context.Background())
}
5 changes: 3 additions & 2 deletions sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"sync"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -31,10 +32,10 @@ type SpanProcessor interface {
// and hence should not block.
OnEnd(sd *export.SpanData)

// Shutdown is invoked when SDK shutsdown. Use this call to cleanup any processor
// Shutdown is invoked when SDK shuts down. Use this call to cleanup any processor
// data. No calls to OnStart and OnEnd method is invoked after Shutdown call is
// made. It should not be blocked indefinitely.
Shutdown()
Shutdown(ctx context.Context) error

// ForceFlush exports all ended spans to the configured Exporter that have not yet
// been exported. It should only be called when absolutely necessary, such as when
Expand Down
5 changes: 3 additions & 2 deletions sdk/trace/span_processor_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package trace

import (
"context"
"time"

export "go.opentelemetry.io/otel/sdk/export/trace"
Expand All @@ -34,7 +35,7 @@ type DurationFilter struct {
}

func (f DurationFilter) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f DurationFilter) Shutdown() { f.Next.Shutdown() }
func (f DurationFilter) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f DurationFilter) ForceFlush() { f.Next.ForceFlush() }
func (f DurationFilter) OnEnd(sd *export.SpanData) {
if f.Min > 0 && sd.EndTime.Sub(sd.StartTime) < f.Min {
Expand All @@ -60,7 +61,7 @@ type InstrumentationBlacklist struct {
}

func (f InstrumentationBlacklist) OnStart(sd *export.SpanData) { f.Next.OnStart(sd) }
func (f InstrumentationBlacklist) Shutdown() { f.Next.Shutdown() }
func (f InstrumentationBlacklist) Shutdown(ctx context.Context) error { return f.Next.Shutdown(ctx) }
func (f InstrumentationBlacklist) ForceFlush() { f.Next.ForceFlush() }
func (f InstrumentationBlacklist) OnEnd(sd *export.SpanData) {
if f.Blacklist != nil && f.Blacklist[sd.InstrumentationLibrary.Name] {
Expand Down
5 changes: 3 additions & 2 deletions sdk/trace/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ func (t *testSpanProcesor) OnEnd(s *export.SpanData) {
t.spansEnded = append(t.spansEnded, s)
}

func (t *testSpanProcesor) Shutdown() {
func (t *testSpanProcesor) Shutdown(_ context.Context) error {
t.shutdownCount++
return nil
}

func (t *testSpanProcesor) ForceFlush() {
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestSpanProcessorShutdown(t *testing.T) {
tp.RegisterSpanProcessor(sp)

wantCount := 1
sp.Shutdown()
sp.Shutdown(nil)

gotCount := sp.shutdownCount
if wantCount != gotCount {
Expand Down

0 comments on commit 4e58bb8

Please sign in to comment.