Skip to content

Commit

Permalink
[service] Replace telemetryInitializer by MeterProvider wrapper (#9454)
Browse files Browse the repository at this point in the history
**Description:** 

Refactor meter provider initialization; removes `telemetryInitializer`
and replaces it by a MeterProvider wrapper that owns the lifetime of the
OpenCensus registry and the servers associated with the MeterProvider.

**Link to tracking Issue:** Relates to #4970 (first refactor before
trying out the factory pattern in an internal package)
  • Loading branch information
mx-psi authored Feb 13, 2024
1 parent 586631b commit 069118b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 84 deletions.
76 changes: 48 additions & 28 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,10 @@ type Settings struct {

// Service represents the implementation of a component.Host.
type Service struct {
buildInfo component.BuildInfo
telemetry *telemetry.Telemetry
telemetrySettings servicetelemetry.TelemetrySettings
host *serviceHost
telemetryInitializer *telemetryInitializer
collectorConf *confmap.Conf
buildInfo component.BuildInfo
telemetrySettings servicetelemetry.TelemetrySettings
host *serviceHost
collectorConf *confmap.Conf
}

func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
Expand All @@ -87,43 +85,48 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) {
buildInfo: set.BuildInfo,
asyncErrorChannel: set.AsyncErrorChannel,
},
telemetryInitializer: newColTelemetry(disableHighCard, extendedConfig),
collectorConf: set.CollectorConf,
collectorConf: set.CollectorConf,
}
var err error
srv.telemetry, err = telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry)
tel, err := telemetry.New(ctx, telemetry.Settings{BuildInfo: set.BuildInfo, ZapOptions: set.LoggingOptions}, cfg.Telemetry)
if err != nil {
return nil, fmt.Errorf("failed to get logger: %w", err)
}
res := resource.New(set.BuildInfo, cfg.Telemetry.Resource)
pcommonRes := pdataFromSdk(res)

logger := srv.telemetry.Logger()
if err = srv.telemetryInitializer.init(res, logger, cfg.Telemetry, set.AsyncErrorChannel); err != nil {
return nil, fmt.Errorf("failed to initialize telemetry: %w", err)
logger := tel.Logger()
mp, err := newMeterProvider(
meterProviderSettings{
res: res,
logger: logger,
cfg: cfg.Telemetry.Metrics,
asyncErrorChannel: set.AsyncErrorChannel,
},
disableHighCard,
extendedConfig,
)
if err != nil {
return nil, fmt.Errorf("failed to create metric provider: %w", err)
}
srv.telemetrySettings = servicetelemetry.TelemetrySettings{
Logger: logger,
TracerProvider: srv.telemetry.TracerProvider(),
MeterProvider: srv.telemetryInitializer.mp,
MeterProvider: mp,
TracerProvider: tel.TracerProvider(),
MetricsLevel: cfg.Telemetry.Metrics.Level,
// Construct telemetry attributes from build info and config's resource attributes.
Resource: pcommonRes,
Status: status.NewReporter(srv.host.notifyComponentStatusChange, func(err error) {
if errors.Is(err, status.ErrStatusNotReady) {
srv.telemetry.Logger().Warn("Invalid transition", zap.Error(err))
logger.Warn("Invalid transition", zap.Error(err))
}
// ignore other errors as they represent invalid state transitions and are considered benign.
}),
}

// process the configuration and initialize the pipeline
if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil {
// If pipeline initialization fails then shut down the telemetry server
if shutdownErr := srv.telemetryInitializer.shutdown(); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown collector telemetry: %w", shutdownErr))
}

// If pipeline initialization fails then shut down telemetry
err = multierr.Append(err, srv.shutdownTelemetry(ctx))
return nil, err
}

Expand Down Expand Up @@ -168,6 +171,28 @@ func (srv *Service) Start(ctx context.Context) error {
return nil
}

func (srv *Service) shutdownTelemetry(ctx context.Context) error {
// The metric.MeterProvider and trace.TracerProvider interfaces do not have a Shutdown method.
// To shutdown the providers we try to cast to this interface, which matches the type signature used in the SDK.
type shutdownable interface {
Shutdown(context.Context) error
}

var err error
if prov, ok := srv.telemetrySettings.MeterProvider.(shutdownable); ok {
if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown meter provider: %w", shutdownErr))
}
}

if prov, ok := srv.telemetrySettings.TracerProvider.(shutdownable); ok {
if shutdownErr := prov.Shutdown(ctx); shutdownErr != nil {
err = multierr.Append(err, fmt.Errorf("failed to shutdown tracer provider: %w", shutdownErr))
}
}
return err
}

// Shutdown the service. Shutdown will do the following steps in order:
// 1. Notify extensions that the pipeline is shutting down.
// 2. Shutdown all pipelines.
Expand All @@ -194,13 +219,8 @@ func (srv *Service) Shutdown(ctx context.Context) error {

srv.telemetrySettings.Logger.Info("Shutdown complete.")

if err := srv.telemetry.Shutdown(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown telemetry: %w", err))
}
errs = multierr.Append(errs, srv.shutdownTelemetry(ctx))

if err := srv.telemetryInitializer.shutdown(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown collector telemetry: %w", err))
}
return errs
}

Expand Down Expand Up @@ -231,7 +251,7 @@ func (srv *Service) initExtensionsAndPipeline(ctx context.Context, set Settings,

if cfg.Telemetry.Metrics.Level != configtelemetry.LevelNone && cfg.Telemetry.Metrics.Address != "" {
// The process telemetry initialization requires the ballast size, which is available after the extensions are initialized.
if err = proctelemetry.RegisterProcessMetrics(srv.telemetryInitializer.mp, getBallastSize(srv.host)); err != nil {
if err = proctelemetry.RegisterProcessMetrics(srv.telemetrySettings.MeterProvider, getBallastSize(srv.host)); err != nil {
return fmt.Errorf("failed to register process metrics: %w", err)
}
}
Expand Down
93 changes: 44 additions & 49 deletions service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,57 +29,46 @@ const (
zapKeyTelemetryLevel = "level"
)

type telemetryInitializer struct {
type meterProvider struct {
*sdkmetric.MeterProvider
ocRegistry *ocmetric.Registry
mp metric.MeterProvider
servers []*http.Server

disableHighCardinality bool
extendedConfig bool
}

func newColTelemetry(disableHighCardinality bool, extendedConfig bool) *telemetryInitializer {
return &telemetryInitializer{
mp: noopmetric.NewMeterProvider(),
disableHighCardinality: disableHighCardinality,
extendedConfig: extendedConfig,
}
type meterProviderSettings struct {
res *resource.Resource
logger *zap.Logger
cfg telemetry.MetricsConfig
asyncErrorChannel chan error
}

func (tel *telemetryInitializer) init(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) {
logger.Info(
func newMeterProvider(set meterProviderSettings, disableHighCardinality bool, extendedConfig bool) (metric.MeterProvider, error) {
if set.cfg.Level == configtelemetry.LevelNone || (set.cfg.Address == "" && len(set.cfg.Readers) == 0) {
set.logger.Info(
"Skipping telemetry setup.",
zap.String(zapKeyTelemetryAddress, cfg.Metrics.Address),
zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()),
zap.String(zapKeyTelemetryAddress, set.cfg.Address),
zap.String(zapKeyTelemetryLevel, set.cfg.Level.String()),
)
return nil
return noopmetric.NewMeterProvider(), nil
}

logger.Info("Setting up own telemetry...")
return tel.initMetrics(res, logger, cfg, asyncErrorChannel)
}

func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap.Logger, cfg telemetry.Config, asyncErrorChannel chan error) error {
// Initialize the ocRegistry, still used by the process metrics.
tel.ocRegistry = ocmetric.NewRegistry()

if len(cfg.Metrics.Address) != 0 {
if tel.extendedConfig {
logger.Warn("service::telemetry::metrics::address is being deprecated in favor of service::telemetry::metrics::readers")
set.logger.Info("Setting up own telemetry...")
if len(set.cfg.Address) != 0 {
if extendedConfig {
set.logger.Warn("service::telemetry::metrics::address is being deprecated in favor of service::telemetry::metrics::readers")
}
host, port, err := net.SplitHostPort(cfg.Metrics.Address)
host, port, err := net.SplitHostPort(set.cfg.Address)
if err != nil {
return err
return nil, err
}
portInt, err := strconv.Atoi(port)
if err != nil {
return err
return nil, err
}
if cfg.Metrics.Readers == nil {
cfg.Metrics.Readers = []config.MetricReader{}
if set.cfg.Readers == nil {
set.cfg.Readers = []config.MetricReader{}
}
cfg.Metrics.Readers = append(cfg.Metrics.Readers, config.MetricReader{
set.cfg.Readers = append(set.cfg.Readers, config.MetricReader{
Pull: &config.PullMetricReader{
Exporter: config.MetricExporter{
Prometheus: &config.Prometheus{
Expand All @@ -91,41 +80,47 @@ func (tel *telemetryInitializer) initMetrics(res *resource.Resource, logger *zap
})
}

metricproducer.GlobalManager().AddProducer(tel.ocRegistry)
mp := &meterProvider{
// Initialize the ocRegistry, still used by the process metrics.
ocRegistry: ocmetric.NewRegistry(),
}
metricproducer.GlobalManager().AddProducer(mp.ocRegistry)
opts := []sdkmetric.Option{}
for _, reader := range cfg.Metrics.Readers {
for _, reader := range set.cfg.Readers {
// https://github.com/open-telemetry/opentelemetry-collector/issues/8045
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, asyncErrorChannel)
r, server, err := proctelemetry.InitMetricReader(context.Background(), reader, set.asyncErrorChannel)
if err != nil {
return err
return nil, err
}
if server != nil {
tel.servers = append(tel.servers, server)
logger.Info(
mp.servers = append(mp.servers, server)
set.logger.Info(
"Serving metrics",
zap.String(zapKeyTelemetryAddress, server.Addr),
zap.String(zapKeyTelemetryLevel, cfg.Metrics.Level.String()),
zap.String(zapKeyTelemetryLevel, set.cfg.Level.String()),
)
}
opts = append(opts, sdkmetric.WithReader(r))
}

mp, err := proctelemetry.InitOpenTelemetry(res, opts, tel.disableHighCardinality)
var err error
mp.MeterProvider, err = proctelemetry.InitOpenTelemetry(set.res, opts, disableHighCardinality)
if err != nil {
return err
return nil, err
}
tel.mp = mp
return nil
return mp, nil
}

func (tel *telemetryInitializer) shutdown() error {
metricproducer.GlobalManager().DeleteProducer(tel.ocRegistry)
// Shutdown the meter provider and all the associated resources.
// The type signature of this method matches that of the sdkmetric.MeterProvider.
func (mp *meterProvider) Shutdown(ctx context.Context) error {
metricproducer.GlobalManager().DeleteProducer(mp.ocRegistry)

var errs error
for _, server := range tel.servers {
for _, server := range mp.servers {
if server != nil {
errs = multierr.Append(errs, server.Close())
}
}
return errs
return multierr.Append(errs, mp.MeterProvider.Shutdown(ctx))
}
19 changes: 12 additions & 7 deletions service/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,6 @@ func TestTelemetryInit(t *testing.T) {
},
} {
t.Run(tc.name, func(t *testing.T) {
tel := newColTelemetry(tc.disableHighCard, tc.extendedConfig)
buildInfo := component.NewDefaultBuildInfo()
if tc.extendedConfig {
tc.cfg.Metrics.Readers = []config.MetricReader{
{
Expand All @@ -299,19 +297,26 @@ func TestTelemetryInit(t *testing.T) {
},
}
}
otelRes := resource.New(buildInfo, tc.cfg.Resource)
err := tel.init(otelRes, zap.NewNop(), *tc.cfg, make(chan error))
set := meterProviderSettings{
res: resource.New(component.NewDefaultBuildInfo(), tc.cfg.Resource),
logger: zap.NewNop(),
cfg: tc.cfg.Metrics,
asyncErrorChannel: make(chan error),
}
mp, err := newMeterProvider(set, tc.disableHighCard, tc.extendedConfig)
require.NoError(t, err)
defer func() {
require.NoError(t, tel.shutdown())
if prov, ok := mp.(interface{ Shutdown(context.Context) error }); ok {
require.NoError(t, prov.Shutdown(context.Background()))
}
}()

v := createTestMetrics(t, tel.mp)
v := createTestMetrics(t, mp)
defer func() {
view.Unregister(v)
}()

metrics := getMetricsFromPrometheus(t, tel.servers[0].Handler)
metrics := getMetricsFromPrometheus(t, mp.(*meterProvider).servers[0].Handler)
require.Equal(t, len(tc.expectedMetrics), len(metrics))

for metricName, metricValue := range tc.expectedMetrics {
Expand Down

0 comments on commit 069118b

Please sign in to comment.