diff --git a/CHANGELOG.md b/CHANGELOG.md index 654251221c1..5e70598495d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed +- The SimpleSpanProcessor will now shut down the enclosed `SpanExporter` and gracefully ignore subsequent calls to `OnEnd` after `Shutdown` is called. (#1612) - Added non-empty string check for trace `Attribute` keys. (#1659) - Add `description` to SpanStatus only when `StatusCode` is set to error. (#1662) diff --git a/sdk/trace/simple_span_processor.go b/sdk/trace/simple_span_processor.go index 78d76b7d35e..88e753f8b41 100644 --- a/sdk/trace/simple_span_processor.go +++ b/sdk/trace/simple_span_processor.go @@ -16,15 +16,18 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace" import ( "context" + "sync" "go.opentelemetry.io/otel" export "go.opentelemetry.io/otel/sdk/export/trace" ) // simpleSpanProcessor is a SpanProcessor that synchronously sends all -// SpanSnapshots to a trace.Exporter when the span finishes. +// completed Spans to a trace.Exporter immediately. type simpleSpanProcessor struct { - e export.SpanExporter + exporterMu sync.RWMutex + exporter export.SpanExporter + stopOnce sync.Once } var _ SpanProcessor = (*simpleSpanProcessor)(nil) @@ -33,28 +36,43 @@ var _ SpanProcessor = (*simpleSpanProcessor)(nil) // send completed spans to the exporter immediately. func NewSimpleSpanProcessor(exporter export.SpanExporter) SpanProcessor { ssp := &simpleSpanProcessor{ - e: exporter, + exporter: exporter, } return ssp } -// OnStart method does nothing. -func (ssp *simpleSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) { -} +// OnStart does nothing. +func (ssp *simpleSpanProcessor) OnStart(context.Context, ReadWriteSpan) {} -// OnEnd method exports a ReadOnlySpan using the associated exporter. +// OnEnd immediately exports a ReadOnlySpan. func (ssp *simpleSpanProcessor) OnEnd(s ReadOnlySpan) { - if ssp.e != nil && s.SpanContext().IsSampled() { + ssp.exporterMu.RLock() + defer ssp.exporterMu.RUnlock() + + if ssp.exporter != nil && s.SpanContext().IsSampled() { ss := s.Snapshot() - if err := ssp.e.ExportSpans(context.Background(), []*export.SpanSnapshot{ss}); err != nil { + if err := ssp.exporter.ExportSpans(context.Background(), []*export.SpanSnapshot{ss}); err != nil { otel.Handle(err) } } } -// Shutdown method does nothing. There is no data to cleanup. -func (ssp *simpleSpanProcessor) Shutdown(_ context.Context) error { - return nil +// Shutdown shuts down the exporter this SimpleSpanProcessor exports to. +func (ssp *simpleSpanProcessor) Shutdown(ctx context.Context) error { + var err error + ssp.stopOnce.Do(func() { + ssp.exporterMu.Lock() + exporter := ssp.exporter + // Set exporter to nil so subsequent calls to OnEnd are ignored + // gracefully. + ssp.exporter = nil + ssp.exporterMu.Unlock() + + // Clear the ssp.exporter prior to shutting it down so if that creates + // a span that needs to be exported there is no deadlock. + err = exporter.Shutdown(ctx) + }) + return err } // ForceFlush does nothing as there is no data to flush. diff --git a/sdk/trace/simple_span_processor_test.go b/sdk/trace/simple_span_processor_test.go index 181dd31790e..a5c8ff0531d 100644 --- a/sdk/trace/simple_span_processor_test.go +++ b/sdk/trace/simple_span_processor_test.go @@ -24,8 +24,14 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" ) +var ( + tid, _ = trace.TraceIDFromHex("01020304050607080102040810203040") + sid, _ = trace.SpanIDFromHex("0102040810203040") +) + type testExporter struct { - spans []*export.SpanSnapshot + spans []*export.SpanSnapshot + shutdown bool } func (t *testExporter) ExportSpans(ctx context.Context, ss []*export.SpanSnapshot) error { @@ -33,36 +39,27 @@ func (t *testExporter) ExportSpans(ctx context.Context, ss []*export.SpanSnapsho return nil } -func (t *testExporter) Shutdown(context.Context) error { return nil } +func (t *testExporter) Shutdown(context.Context) error { + t.shutdown = true + return nil +} var _ export.SpanExporter = (*testExporter)(nil) func TestNewSimpleSpanProcessor(t *testing.T) { - ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{}) - if ssp == nil { - t.Errorf("Error creating new instance of SimpleSpanProcessor\n") + if ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{}); ssp == nil { + t.Error("failed to create new SimpleSpanProcessor") } } func TestNewSimpleSpanProcessorWithNilExporter(t *testing.T) { - ssp := sdktrace.NewSimpleSpanProcessor(nil) - if ssp == nil { - t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n") + if ssp := sdktrace.NewSimpleSpanProcessor(nil); ssp == nil { + t.Error("failed to create new SimpleSpanProcessor with nil exporter") } } -func TestSimpleSpanProcessorOnEnd(t *testing.T) { - tp := basicTracerProvider(t) - te := testExporter{} - ssp := sdktrace.NewSimpleSpanProcessor(&te) - if ssp == nil { - t.Errorf("Error creating new instance of SimpleSpanProcessor with nil Exporter\n") - } - - tp.RegisterSpanProcessor(ssp) +func startSpan(tp trace.TracerProvider) trace.Span { tr := tp.Tracer("SimpleSpanProcessor") - tid, _ := trace.TraceIDFromHex("01020304050607080102040810203040") - sid, _ := trace.SpanIDFromHex("0102040810203040") sc := trace.SpanContext{ TraceID: tid, SpanID: sid, @@ -70,7 +67,16 @@ func TestSimpleSpanProcessorOnEnd(t *testing.T) { } ctx := trace.ContextWithRemoteSpanContext(context.Background(), sc) _, span := tr.Start(ctx, "OnEnd") - span.End() + return span +} + +func TestSimpleSpanProcessorOnEnd(t *testing.T) { + tp := basicTracerProvider(t) + te := testExporter{} + ssp := sdktrace.NewSimpleSpanProcessor(&te) + + tp.RegisterSpanProcessor(ssp) + startSpan(tp).End() wantTraceID := tid gotTraceID := te.spans[0].SpanContext.TraceID @@ -80,14 +86,60 @@ func TestSimpleSpanProcessorOnEnd(t *testing.T) { } func TestSimpleSpanProcessorShutdown(t *testing.T) { - ssp := sdktrace.NewSimpleSpanProcessor(&testExporter{}) - if ssp == nil { - t.Errorf("Error creating new instance of SimpleSpanProcessor\n") - return + exporter := &testExporter{} + ssp := sdktrace.NewSimpleSpanProcessor(exporter) + + // Ensure we can export a span before we test we cannot after shutdown. + tp := basicTracerProvider(t) + tp.RegisterSpanProcessor(ssp) + startSpan(tp).End() + nExported := len(exporter.spans) + if nExported != 1 { + t.Error("failed to verify span export") } - err := ssp.Shutdown(context.Background()) - if err != nil { - t.Error("Error shutting the SimpleSpanProcessor down\n") + if err := ssp.Shutdown(context.Background()); err != nil { + t.Errorf("shutting the SimpleSpanProcessor down: %v", err) } + if !exporter.shutdown { + t.Error("SimpleSpanProcessor.Shutdown did not shut down exporter") + } + + startSpan(tp).End() + if len(exporter.spans) > nExported { + t.Error("exported span to shutdown exporter") + } +} + +func TestSimpleSpanProcessorShutdownOnEndConcurrency(t *testing.T) { + exporter := &testExporter{} + ssp := sdktrace.NewSimpleSpanProcessor(exporter) + tp := basicTracerProvider(t) + tp.RegisterSpanProcessor(ssp) + + stop := make(chan struct{}) + done := make(chan struct{}) + go func() { + defer func() { + done <- struct{}{} + }() + for { + select { + case <-stop: + return + default: + startSpan(tp).End() + } + } + }() + + if err := ssp.Shutdown(context.Background()); err != nil { + t.Errorf("shutting the SimpleSpanProcessor down: %v", err) + } + if !exporter.shutdown { + t.Error("SimpleSpanProcessor.Shutdown did not shut down exporter") + } + + stop <- struct{}{} + <-done }