Skip to content

Commit

Permalink
[service] Move TracerProvider initialization to service/telemetry pac…
Browse files Browse the repository at this point in the history
…kage (#9384)

Second redo of
#8171 that
does not depend on
#9131

Link to tracking Issue: Updates
#8170

---------

Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
mx-psi and Alex Boten authored Jan 30, 2024
1 parent 4087433 commit 07a00ff
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 64 deletions.
4 changes: 2 additions & 2 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
collectorConf: set.CollectorConf,
}
var err error
srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{ZapOptions: set.LoggingOptions}, cfg.Telemetry)
srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
Expand All @@ -104,7 +104,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
}
srv.telemetrySettings = servicetelemetry.TelemetrySettings{
Logger: logger,
TracerProvider: srv.telemetryInitializer.tp,
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: srv.telemetryInitializer.mp,
MetricsLevel: cfg.Telemetry.Metrics.Level,
// Construct telemetry attributes from build info and config's resource attributes.
Expand Down
57 changes: 0 additions & 57 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,17 @@ package service // import "go.opentelemetry.io/collector/service"

import (
"context"
"errors"
"net"
"net/http"
"strconv"

ocmetric "go.opencensus.io/metric"
"go.opencensus.io/metric/metricproducer"
"go.opentelemetry.io/contrib/config"
"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
noopmetric "go.opentelemetry.io/otel/metric/noop"
"go.opentelemetry.io/otel/propagation"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.uber.org/zap"

Expand All @@ -34,20 +27,11 @@ import (
const (
zapKeyTelemetryAddress = "address"
zapKeyTelemetryLevel = "level"

// supported trace propagators
traceContextPropagator = "tracecontext"
b3Propagator = "b3"
)

var (
errUnsupportedPropagator = errors.New("unsupported trace propagator")
)

type telemetryInitializer struct {
ocRegistry *ocmetric.Registry
mp metric.MeterProvider
tp trace.TracerProvider
servers []*http.Server

disableHighCardinality bool
Expand All @@ -57,7 +41,6 @@ type telemetryInitializer struct {
func newColTelemetry(disableHighCardinality bool, extendedConfig bool) *telemetryInitializer {
return &telemetryInitializer{
mp: noopmetric.NewMeterProvider(),
tp: nooptrace.NewTracerProvider(),
disableHighCardinality: disableHighCardinality,
extendedConfig: extendedConfig,
}
Expand All @@ -74,34 +57,9 @@ func (tel *telemetryInitializer) init(res *resource.Resource, logger *zap.Logger
}

logger.Info("Setting up own telemetry...")

if tp, err := tel.initTraces(res, cfg); err == nil {
tel.tp = tp
} else {
return err
}

if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
return err
}

return tel.initMetrics(res, logger, cfg, asyncErrorChannel)
}

func (tel *telemetryInitializer) initTraces(res *resource.Resource, cfg telemetry.Config) (trace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{}
for _, processor := range cfg.Traces.Processors {
sp, err := proctelemetry.InitSpanProcessor(context.Background(), processor)
if err != nil {
return nil, err
}
opts = append(opts, sdktrace.WithSpanProcessor(sp))
}
return proctelemetry.InitTracerProvider(res, opts)
}

func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()
Expand Down Expand Up @@ -171,18 +129,3 @@ func (tel *telemetryInitializer) shutdown() error {
}
return errs
}

func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) {
var textMapPropagators []propagation.TextMapPropagator
for _, prop := range props {
switch prop {
case traceContextPropagator:
textMapPropagators = append(textMapPropagators, propagation.TraceContext{})
case b3Propagator:
textMapPropagators = append(textMapPropagators, b3.New())
default:
return nil, errUnsupportedPropagator
}
}
return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil
}
71 changes: 66 additions & 5 deletions service/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,30 @@ package telemetry // import "go.opentelemetry.io/collector/service/telemetry"

import (
"context"
"errors"

"go.opentelemetry.io/contrib/propagators/b3"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/service/internal/proctelemetry"
"go.opentelemetry.io/collector/service/internal/resource"
)

const (
// supported trace propagators
traceContextPropagator = "tracecontext"
b3Propagator = "b3"
)

var (
errUnsupportedPropagator = errors.New("unsupported trace propagator")
)

type Telemetry struct {
Expand All @@ -35,25 +53,68 @@ func (t *Telemetry) Shutdown(ctx context.Context) error {

// Settings holds configuration for building Telemetry.
type Settings struct {
BuildInfo component.BuildInfo
ZapOptions []zap.Option
}

// New creates a new Telemetry from Config.
func New(_ context.Context, set Settings, cfg Config) (*Telemetry, error) {
func New(ctx context.Context, set Settings, cfg Config) (*Telemetry, error) {
logger, err := newLogger(cfg.Logs, set.ZapOptions)
if err != nil {
return nil, err
}
tp := sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(alwaysRecord()),
)

tp, err := newTracerProvider(ctx, set, cfg)
if err != nil {
return nil, err
}

return &Telemetry{
logger: logger,
tracerProvider: tp,
}, nil
}

func newTracerProvider(ctx context.Context, set Settings, cfg Config) (*sdktrace.TracerProvider, error) {
opts := []sdktrace.TracerProviderOption{sdktrace.WithSampler(alwaysRecord())}

This comment has been minimized.

Copy link
@tdx

tdx Mar 11, 2024

This opt will override sampler configured from ENV.

for _, processor := range cfg.Traces.Processors {
sp, err := proctelemetry.InitSpanProcessor(ctx, processor)
if err != nil {
return nil, err
}
opts = append(opts, sdktrace.WithSpanProcessor(sp))
}

res := resource.New(set.BuildInfo, cfg.Resource)
tp, err := proctelemetry.InitTracerProvider(res, opts)
if err != nil {
return nil, err
}

if tp, err := textMapPropagatorFromConfig(cfg.Traces.Propagators); err == nil {
otel.SetTextMapPropagator(tp)
} else {
return nil, err
}

return tp, nil
}

func textMapPropagatorFromConfig(props []string) (propagation.TextMapPropagator, error) {
var textMapPropagators []propagation.TextMapPropagator
for _, prop := range props {
switch prop {
case traceContextPropagator:
textMapPropagators = append(textMapPropagators, propagation.TraceContext{})
case b3Propagator:
textMapPropagators = append(textMapPropagators, b3.New())
default:
return nil, errUnsupportedPropagator
}
}
return propagation.NewCompositeTextMapPropagator(textMapPropagators...), nil
}

func newLogger(cfg LogsConfig, options []zap.Option) (*zap.Logger, error) {
// Copied from NewProductionConfig.
zapCfg := &zap.Config{
Expand Down

0 comments on commit 07a00ff

Please sign in to comment.