Skip to content

Commit

Permalink
Merge branch 'main' into makefile-examples
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored Mar 8, 2021
2 parents a507932 + a3aa9fd commit 9013f15
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 54 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## Added

- Added `Marshler` config option to `otlphttp` to enable otlp over json or protobufs. (#1586)
- A `ForceFlush` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` to flush all registered `SpanProcessor`s. (#1608)


### Changed

- Update the `ForceFlush` method signature to the `"go.opentelemetry.io/otel/sdk/trace".SpanProcessor` to accept a `context.Context` and return an error. (#1608)
- Update the `Shutdown` method to the `"go.opentelemetry.io/otel/sdk/trace".TracerProvider` return an error on shutdown failure. (#1608)
- The SimpleSpanProcessor will now shut down the enclosed `SpanExporter` and gracefully ignore subsequent calls to `OnEnd` after `Shutdown` is called. (#1612)
- `"go.opentelemetry.io/sdk/metric/controller.basic".WithPusher` is replaced with `WithExporter` to provide consistent naming across project. (#1656)
- Added non-empty string check for trace `Attribute` keys. (#1659)
Expand All @@ -23,6 +27,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Removed the exported `SimpleSpanProcessor` and `BatchSpanProcessor` structs.
These are now returned as a SpanProcessor interface from their respective constructors. (#1638)
- Removed setting status to `Error` while recording an error as a span event in `RecordError`. (#1663)


### Fixed

Expand Down
1 change: 1 addition & 0 deletions example/zipkin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
go.opentelemetry.io/otel v0.18.0
go.opentelemetry.io/otel/exporters/trace/zipkin v0.18.0
go.opentelemetry.io/otel/sdk v0.18.0
go.opentelemetry.io/otel/trace v0.18.0
)

replace go.opentelemetry.io/otel/bridge/opencensus => ../../bridge/opencensus
Expand Down
22 changes: 15 additions & 7 deletions example/zipkin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,19 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/trace/zipkin"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
)

var logger = log.New(os.Stderr, "zipkin-example", log.Ldate|log.Ltime|log.Llongfile)

// initTracer creates a new trace provider instance and registers it as global trace provider.
func initTracer(url string) {
func initTracer(url string) func() {
// Create Zipkin Exporter and install it as a global tracer.
//
// For demoing purposes, always sample. In a production application, you should
// configure the sampler to a trace.ParentBased(trace.TraceIDRatioBased) set at the desired
// ratio.
err := zipkin.InstallNewPipeline(
exporter, err := zipkin.NewRawExporter(
url,
"zipkin-test",
zipkin.WithLogger(logger),
Expand All @@ -46,25 +47,32 @@ func initTracer(url string) {
if err != nil {
log.Fatal(err)
}

batcher := sdktrace.NewBatchSpanProcessor(exporter)

tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(batcher))
otel.SetTracerProvider(tp)

return func() {
_ = tp.Shutdown(context.Background())
}
}

func main() {
url := flag.String("zipkin", "http://localhost:9411/api/v2/spans", "zipkin url")
flag.Parse()

initTracer(*url)
shutdown := initTracer(*url)
defer shutdown()

ctx := context.Background()

tr := otel.GetTracerProvider().Tracer("component-main")
ctx, span := tr.Start(ctx, "foo")
ctx, span := tr.Start(ctx, "foo", trace.WithSpanKind(trace.SpanKindServer))
<-time.After(6 * time.Millisecond)
bar(ctx)
<-time.After(6 * time.Millisecond)
span.End()

// Wait for the spans to be exported.
<-time.After(5 * time.Second)
}

func bar(ctx context.Context) {
Expand Down
2 changes: 1 addition & 1 deletion internal/tools/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/client9/misspell v0.3.4
github.com/gogo/protobuf v1.3.2
github.com/golangci/golangci-lint v1.37.1
github.com/itchyny/gojq v0.12.1
github.com/itchyny/gojq v0.12.2
golang.org/x/tools v0.1.0
)

Expand Down
14 changes: 6 additions & 8 deletions internal/tools/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,11 @@ github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/J
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/itchyny/astgen-go v0.0.0-20210113000433-0da0671862a3 h1:l7vogWrq+zj8v5t/G69/eT13nAGs2H7cq+CI2nlnKdk=
github.com/itchyny/astgen-go v0.0.0-20210113000433-0da0671862a3/go.mod h1:296z3W7Xsrp2mlIY88ruDKscuvrkL6zXCNRtaYVshzw=
github.com/itchyny/go-flags v1.5.0/go.mod h1:lenkYuCobuxLBAd/HGFE4LRoW8D3B6iXRQfWYJ+MNbA=
github.com/itchyny/gojq v0.12.1 h1:pQJrG8LXgEbZe9hvpfjKg7UlBfieQQydIw3YQq+7WIA=
github.com/itchyny/gojq v0.12.1/go.mod h1:Y5Lz0qoT54ii+ucY/K3yNDy19qzxZvWNBMBpKUDQR/4=
github.com/itchyny/timefmt-go v0.1.1 h1:rLpnm9xxb39PEEVzO0n4IRp0q6/RmBc7Dy/rE4HrA0U=
github.com/itchyny/timefmt-go v0.1.1/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A=
github.com/itchyny/gojq v0.12.2 h1:TxhFjk1w7Vnb0SwQPeG4FxTC98O4Es+x/mPaD5Azgfs=
github.com/itchyny/gojq v0.12.2/go.mod h1:mi4PdXSlFllHyByM68JKUrbiArtEdEnNEmjbwxcQKAg=
github.com/itchyny/timefmt-go v0.1.2 h1:q0Xa4P5it6K6D7ISsbLAMwx1PnWlixDcJL6/sFs93Hs=
github.com/itchyny/timefmt-go v0.1.2/go.mod h1:0osSSCQSASBJMsIZnhAaF1C2fCBTJZXrnj37mG8/c+A=
github.com/jgautheron/goconst v0.0.0-20201117150253-ccae5bf973f3 h1:7nkB9fLPMwtn/R6qfPcHileL/x9ydlhw8XyDrLI1ZXg=
github.com/jgautheron/goconst v0.0.0-20201117150253-ccae5bf973f3/go.mod h1:aAosetZ5zaeC/2EfMeRswtxUFBpe2Hr7HzkgX4fanO4=
github.com/jingyugao/rowserrcheck v0.0.0-20210130005344-c6a0c12dd98d h1:BYDZtm80MLJpTWalkwHxNnIbO/2akQHERcfLq4TbIWE=
Expand Down Expand Up @@ -540,9 +538,9 @@ golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200602225109-6fdc65e7d980/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210113181707-4bcb84eeeb78/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b h1:kHlr0tATeLRMEiZJu5CknOw/E8V6h69sXXQFGoPtjcc=
golang.org/x/sys v0.0.0-20210301091718-77cc2087c03b/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand Down
31 changes: 22 additions & 9 deletions sdk/trace/batch_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,8 @@ func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
}

// ForceFlush exports all ended spans that have not yet been exported.
func (bsp *batchSpanProcessor) ForceFlush() {
bsp.exportSpans()
func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
return bsp.exportSpans(ctx)
}

func WithMaxQueueSize(size int) BatchSpanProcessorOption {
Expand Down Expand Up @@ -176,18 +176,19 @@ func WithBlocking() BatchSpanProcessorOption {
}

// exportSpans is a subroutine of processing and draining the queue.
func (bsp *batchSpanProcessor) exportSpans() {
func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
bsp.timer.Reset(bsp.o.BatchTimeout)

bsp.batchMutex.Lock()
defer bsp.batchMutex.Unlock()

if len(bsp.batch) > 0 {
if err := bsp.e.ExportSpans(context.Background(), bsp.batch); err != nil {
otel.Handle(err)
if err := bsp.e.ExportSpans(ctx, bsp.batch); err != nil {
return err
}
bsp.batch = bsp.batch[:0]
}
return nil
}

// processQueue removes spans from the `queue` channel until processor
Expand All @@ -196,12 +197,16 @@ func (bsp *batchSpanProcessor) exportSpans() {
func (bsp *batchSpanProcessor) processQueue() {
defer bsp.timer.Stop()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case <-bsp.stopCh:
return
case <-bsp.timer.C:
bsp.exportSpans()
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
case sd := <-bsp.queue:
bsp.batchMutex.Lock()
bsp.batch = append(bsp.batch, sd)
Expand All @@ -211,7 +216,9 @@ func (bsp *batchSpanProcessor) processQueue() {
if !bsp.timer.Stop() {
<-bsp.timer.C
}
bsp.exportSpans()
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
}
}
Expand All @@ -220,11 +227,15 @@ func (bsp *batchSpanProcessor) processQueue() {
// drainQueue awaits the any caller that had added to bsp.stopWait
// to finish the enqueue, then exports the final batch.
func (bsp *batchSpanProcessor) drainQueue() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
select {
case sd := <-bsp.queue:
if sd == nil {
bsp.exportSpans()
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
return
}

Expand All @@ -234,7 +245,9 @@ func (bsp *batchSpanProcessor) drainQueue() {
bsp.batchMutex.Unlock()

if shouldExport {
bsp.exportSpans()
if err := bsp.exportSpans(ctx); err != nil {
otel.Handle(err)
}
}
default:
close(bsp.queue)
Expand Down
9 changes: 5 additions & 4 deletions sdk/trace/batch_span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,11 @@ func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
// These should not panic.
bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan))
bsp.OnEnd(span.(sdktrace.ReadOnlySpan))
bsp.ForceFlush()
err := bsp.Shutdown(context.Background())
if err != nil {
t.Error("Error shutting the BatchSpanProcessor down\n")
if err := bsp.ForceFlush(context.Background()); err != nil {
t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err)
}
if err := bsp.Shutdown(context.Background()); err != nil {
t.Errorf("failed to Shutdown the BatchSpanProcessor: %v", err)
}
}

Expand Down
47 changes: 42 additions & 5 deletions sdk/trace/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package trace // import "go.opentelemetry.io/otel/sdk/trace"

import (
"context"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -201,19 +202,55 @@ func (p *TracerProvider) ApplyConfig(cfg Config) {
p.config.Store(&c)
}

// Shutdown shuts down the span processors in the order they were registered
// ForceFlush immediately exports all spans that have not yet been exported for
// all the registered span processors.
func (p *TracerProvider) ForceFlush(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok {
return fmt.Errorf("failed to load span processors")
}
if len(spss) == 0 {
return nil
}

for _, sps := range spss {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

if err := sps.sp.ForceFlush(ctx); err != nil {
return err
}
}
return nil
}

// Shutdown shuts down the span processors in the order they were registered.
func (p *TracerProvider) Shutdown(ctx context.Context) error {
spss, ok := p.spanProcessors.Load().(spanProcessorStates)
if !ok || len(spss) == 0 {
if !ok {
return fmt.Errorf("failed to load span processors")
}
if len(spss) == 0 {
return nil
}

for _, sps := range spss {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

var err error
sps.state.Do(func() {
if err := sps.sp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
err = sps.sp.Shutdown(ctx)
})
if err != nil {
return err
}
}
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions sdk/trace/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ func (t *basicSpanProcesor) Shutdown(context.Context) error {
return t.injectShutdownError
}

func (t *basicSpanProcesor) OnStart(parent context.Context, s ReadWriteSpan) {}
func (t *basicSpanProcesor) OnEnd(s ReadOnlySpan) {}
func (t *basicSpanProcesor) ForceFlush() {}
func (t *basicSpanProcesor) OnStart(context.Context, ReadWriteSpan) {}
func (t *basicSpanProcesor) OnEnd(ReadOnlySpan) {}
func (t *basicSpanProcesor) ForceFlush(context.Context) error {
return nil
}

func TestShutdownTraceProvider(t *testing.T) {
stp := NewTracerProvider()
Expand All @@ -51,7 +53,6 @@ func TestShutdownTraceProvider(t *testing.T) {
}

func TestFailedProcessorShutdown(t *testing.T) {
handler.Reset()
stp := NewTracerProvider()
spErr := errors.New("basic span processor shutdown failure")
sp := &basicSpanProcesor{
Expand All @@ -60,9 +61,9 @@ func TestFailedProcessorShutdown(t *testing.T) {
}
stp.RegisterSpanProcessor(sp)

_ = stp.Shutdown(context.Background())

assert.Contains(t, handler.errs, spErr)
err := stp.Shutdown(context.Background())
assert.Error(t, err)
assert.Equal(t, err, spErr)
}

func TestFailedProcessorShutdownInUnregister(t *testing.T) {
Expand All @@ -78,7 +79,6 @@ func TestFailedProcessorShutdownInUnregister(t *testing.T) {

assert.Contains(t, handler.errs, spErr)

handler.errs = nil
_ = stp.Shutdown(context.Background())
assert.Empty(t, handler.errs)
err := stp.Shutdown(context.Background())
assert.NoError(t, err)
}
3 changes: 2 additions & 1 deletion sdk/trace/simple_span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,6 @@ func (ssp *simpleSpanProcessor) Shutdown(ctx context.Context) error {
}

// ForceFlush does nothing as there is no data to flush.
func (ssp *simpleSpanProcessor) ForceFlush() {
func (ssp *simpleSpanProcessor) ForceFlush(context.Context) error {
return nil
}
7 changes: 4 additions & 3 deletions sdk/trace/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,14 +258,15 @@ func (s *span) End(options ...trace.SpanOption) {
}
}

// RecordError will record err as a span event for this span. If this span is
// not being recorded or err is nil than this method does nothing.
// RecordError will record err as a span event for this span. An additional call to
// SetStatus is required if the Status of the Span should be set to Error, this method
// does not change the Span status. If this span is not being recorded or err is nil
// than this method does nothing.
func (s *span) RecordError(err error, opts ...trace.EventOption) {
if s == nil || err == nil || !s.IsRecording() {
return
}

s.SetStatus(codes.Error, "")
opts = append(opts, trace.WithAttributes(
errorTypeKey.String(typeStr(err)),
errorMessageKey.String(err.Error()),
Expand Down
2 changes: 1 addition & 1 deletion sdk/trace/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type SpanProcessor interface {
// been exported. It should only be called when absolutely necessary, such as when
// using a FaaS provider that may suspend the process after an invocation, but before
// the Processor can export the completed spans.
ForceFlush()
ForceFlush(ctx context.Context) error
}

type spanProcessorState struct {
Expand Down
Loading

0 comments on commit 9013f15

Please sign in to comment.