diff --git a/component/componenttest/nop_host.go b/component/componenttest/nop_host.go index b535b674af6..c24584b2b2a 100644 --- a/component/componenttest/nop_host.go +++ b/component/componenttest/nop_host.go @@ -28,6 +28,8 @@ func NewNopHost() component.Host { func (nh *nopHost) ReportFatalError(_ error) {} +func (hw *nopHost) ReportComponentStatus(event *component.StatusEvent) {} + func (nh *nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory { return nil } diff --git a/component/componenttest/statuswatcher_extension.go b/component/componenttest/statuswatcher_extension.go new file mode 100644 index 00000000000..ffc01a76296 --- /dev/null +++ b/component/componenttest/statuswatcher_extension.go @@ -0,0 +1,61 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" +) + +// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. +func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { + return component.ExtensionCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type statusWatcherExtensionConfig struct { + config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. +func NewStatusWatcherExtensionFactory( + onStatusChanged func(source component.StatusSource, event *component.StatusEvent), +) component.ExtensionFactory { + return component.NewExtensionFactory( + "statuswatcher", + func() component.ExtensionConfig { + return &statusWatcherExtensionConfig{ + ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), + } + }, + func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil + }, + component.StabilityLevelStable) +} + +// statusWatcherExtension stores consumed traces and metrics for testing purposes. +type statusWatcherExtension struct { + nopComponent + onStatusChanged func(source component.StatusSource, event *component.StatusEvent) +} + +func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { + e.onStatusChanged(source, event) +} diff --git a/component/componenttest/unhealthy_processor.go b/component/componenttest/unhealthy_processor.go new file mode 100644 index 00000000000..2a84874fa7b --- /dev/null +++ b/component/componenttest/unhealthy_processor.go @@ -0,0 +1,81 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest // import "go.opentelemetry.io/collector/component/componenttest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" +) + +// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. +func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { + return component.ProcessorCreateSettings{ + TelemetrySettings: NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +type unhealthyProcessorConfig struct { + config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct +} + +// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. +func NewUnhealthyProcessorFactory() component.ProcessorFactory { + return component.NewProcessorFactory( + "unhealthy", + func() component.ProcessorConfig { + return &unhealthyProcessorConfig{ + ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), + } + }, + component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), + component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + ) +} + +func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { + return unhealthyProcessorInstance, nil +} + +var unhealthyProcessorInstance = &unhealthyProcessor{ + Consumer: consumertest.NewNop(), +} + +// unhealthyProcessor stores consumed traces and metrics for testing purposes. +type unhealthyProcessor struct { + nopComponent + consumertest.Consumer +} + +func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { + go func() { + evt, _ := component.NewStatusEvent(component.StatusError) + host.ReportComponentStatus(evt) + }() + return nil +} diff --git a/component/host.go b/component/host.go index 92c395b0306..3c39fbde9eb 100644 --- a/component/host.go +++ b/component/host.go @@ -23,8 +23,17 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.65.0] Use ReportComponentStatus instead (with an event of type status.ComponentError) ReportFatalError(err error) + // ReportComponentStatus can be used by a component to communicate its status to the Host. + // The Host implementations may broadcast this information to interested parties via + // StatusWatcher interface. + // May be called by the component any time after Component.Start is called or while + // Component.Start call is executing. + // May be called concurrently with itself. + ReportComponentStatus(event *StatusEvent) + // GetFactory of the specified kind. Returns the factory for a component type. // This allows components to create other components. For example: // func (r MyReceiver) Start(host component.Host) error { diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..42de901445f --- /dev/null +++ b/component/status.go @@ -0,0 +1,90 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package component + +import ( + "errors" +) + +type Status int32 + +const ( + StatusOK Status = iota + StatusError +) + +// StatusSource component that reports a status about itself. +// The implementation of this interface must be comparable to be useful as a map key. +type StatusSource interface { + ID() ID +} + +type StatusEvent struct { + status Status + err error +} + +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Err returns the error associated with the ComponentEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// statusEventOption applies options to a StatusEvent. +type statusEventOption func(*StatusEvent) error + +// WithError sets the error object of the Event. It is optional +// and should only be applied to an Event of type ComponentError. +func WithError(err error) statusEventOption { + return func(o *StatusEvent) error { + if o.status == StatusOK { + return errors.New("event with ComponentOK cannot have an error") + } + o.err = err + return nil + } +} + +// NewStatusEvent creates and returns a StatusEvent with default and provided +// options. Will return an error if an error is provided for a non-error event +// type (status.ComponentOK). +// If the timestamp is not provided will set it to time.Now(). +func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { + ev := StatusEvent{ + status: status, + } + + for _, opt := range options { + if err := opt(&ev); err != nil { + return nil, err + } + } + + return &ev, nil +} + +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // ComponentStatusChanged notifies about a change in the source component status. + // Extensions that implement this interface must be ready that the ComponentStatusChanged + // may be called before, after or concurrently with Component.Shutdown() call. + // The function may be called concurrently with itself. + ComponentStatusChanged(source StatusSource, event *StatusEvent) +} diff --git a/component/status_test.go b/component/status_test.go new file mode 100644 index 00000000000..65603b04555 --- /dev/null +++ b/component/status_test.go @@ -0,0 +1,11 @@ +package component + +import ( + "fmt" + "testing" + "unsafe" +) + +func TestStatusEventSize(t *testing.T) { + fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) +} diff --git a/service/collector.go b/service/collector.go index 4250e9874a5..abcaeaa4422 100644 --- a/service/collector.go +++ b/service/collector.go @@ -28,9 +28,9 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/service/internal/grpclog" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // State defines Collector's state. @@ -278,7 +278,7 @@ func (col *Collector) setCollectorState(state State) { col.state.Store(int32(state)) } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { var ballastSize uint64 extensions := host.GetExtensions() for _, extension := range extensions { diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 1e20f430290..f1a47b0bc5c 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -25,6 +25,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -36,13 +37,26 @@ type Extensions struct { extMap map[component.ID]component.Extension } +type statusReportingExtension struct { + id component.ID +} + +func (s *statusReportingExtension) GetKind() component.Kind { + return component.KindExtension +} + +func (s *statusReportingExtension) ID() component.ID { + return s.id +} + // Start starts all extensions. -func (bes *Extensions) Start(ctx context.Context, host component.Host) error { +func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := extensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + statusSource := &statusReportingExtension{extID} + if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -83,6 +97,16 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } +func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error { + var errs error + for _, ext := range bes.extMap { + if pw, ok := ext.(component.StatusWatcher); ok { + pw.ComponentStatusChanged(source, event) + } + } + return errs +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Extension { result := make(map[component.ID]component.Extension, len(bes.extMap)) for extID, v := range bes.extMap { diff --git a/service/host.go b/service/host.go index 31be010608d..195b6e6be0a 100644 --- a/service/host.go +++ b/service/host.go @@ -18,9 +18,10 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/pipelines" + "go.opentelemetry.io/collector/service/internal/servicehost" ) -var _ component.Host = (*serviceHost)(nil) +var _ servicehost.Host = (*serviceHost)(nil) type serviceHost struct { asyncErrorChannel chan error @@ -34,10 +35,15 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.65.0] Replaced by ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } +func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { + host.extensions.NotifyComponentStatusChange(source, event) +} + func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 78428e4e2f0..67beaf0b981 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -20,17 +20,21 @@ import ( "go.uber.org/zap" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) // hostWrapper adds behavior on top of the component.Host being passed when starting the built components. +// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { - component.Host + servicehost.Host + component component.StatusSource *zap.Logger } -func NewHostWrapper(host component.Host, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { return &hostWrapper{ host, + component, logger, } } @@ -41,6 +45,12 @@ func (hw *hostWrapper) ReportFatalError(err error) { hw.Host.ReportFatalError(err) } +var emptyComponentID = component.ID{} + +func (hw *hostWrapper) ReportComponentStatus(event *component.StatusEvent) { + hw.Host.ReportComponentStatus(hw.component, event) +} + // RegisterZPages is used by zpages extension to register handles from service. // When the wrapper is passed to the extension it won't be successful when casting // the interface, for the time being expose the interface here. diff --git a/service/internal/components/host_wrapper_test.go b/service/internal/components/host_wrapper_test.go index 720669bd847..0738df63600 100644 --- a/service/internal/components/host_wrapper_test.go +++ b/service/internal/components/host_wrapper_test.go @@ -18,12 +18,17 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/zap" - "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/servicehost" ) func Test_newHostWrapper(t *testing.T) { - hw := NewHostWrapper(componenttest.NewNopHost(), zap.NewNop()) + hw := NewHostWrapper(servicehost.NewNopHost(), nil, zap.NewNop()) hw.ReportFatalError(errors.New("test error")) + ev, err := component.NewStatusEvent(component.StatusOK) + assert.NoError(t, err) + hw.ReportComponentStatus(ev) } diff --git a/service/internal/pipelines/pipelines.go b/service/internal/pipelines/pipelines.go index 24907012a86..bcb6e0879a0 100644 --- a/service/internal/pipelines/pipelines.go +++ b/service/internal/pipelines/pipelines.go @@ -28,6 +28,7 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/service/internal/components" "go.opentelemetry.io/collector/service/internal/fanoutconsumer" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -65,18 +66,32 @@ type Pipelines struct { pipelines map[component.ID]*builtPipeline } +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + // StartAll starts all pipelines. // // Start with exporters, processors (in reverse configured order), then receivers. // This is important so that components that are earlier in the pipeline and reference components that are // later in the pipeline do not start sending data to later components which are not yet started. -func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { +func (bps *Pipelines) StartAll(ctx context.Context, host servicehost.Host) error { bps.telemetry.Logger.Info("Starting exporters...") for dt, expByID := range bps.allExporters { for expID, exp := range expByID { expLogger := exporterLogger(bps.telemetry.Logger, expID, dt) expLogger.Info("Exporter is starting...") - if err := exp.Start(ctx, components.NewHostWrapper(host, expLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindExporter, expID} + if err := exp.Start(ctx, components.NewHostWrapper(host, statusSource, expLogger)); err != nil { return err } expLogger.Info("Exporter started.") @@ -86,9 +101,12 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { bps.telemetry.Logger.Info("Starting processors...") for pipelineID, bp := range bps.pipelines { for i := len(bp.processors) - 1; i >= 0; i-- { - procLogger := processorLogger(bps.telemetry.Logger, bp.processors[i].id, pipelineID) + processor := bp.processors[i] + procID := processor.id + procLogger := processorLogger(bps.telemetry.Logger, procID, pipelineID) procLogger.Info("Processor is starting...") - if err := bp.processors[i].comp.Start(ctx, components.NewHostWrapper(host, procLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindProcessor, procID} + if err := processor.comp.Start(ctx, components.NewHostWrapper(host, statusSource, procLogger)); err != nil { return err } procLogger.Info("Processor started.") @@ -100,7 +118,8 @@ func (bps *Pipelines) StartAll(ctx context.Context, host component.Host) error { for recvID, recv := range recvByID { recvLogger := receiverLogger(bps.telemetry.Logger, recvID, dt) recvLogger.Info("Receiver is starting...") - if err := recv.Start(ctx, components.NewHostWrapper(host, recvLogger)); err != nil { + statusSource := &statusReportingComponent{component.KindReceiver, recvID} + if err := recv.Start(ctx, components.NewHostWrapper(host, statusSource, recvLogger)); err != nil { return err } recvLogger.Info("Receiver started.") diff --git a/service/internal/pipelines/pipelines_test.go b/service/internal/pipelines/pipelines_test.go index 2bc2ed63096..d92f05db19f 100644 --- a/service/internal/pipelines/pipelines_test.go +++ b/service/internal/pipelines/pipelines_test.go @@ -32,6 +32,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/testdata" "go.opentelemetry.io/collector/service/internal/configunmarshaler" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" ) @@ -95,7 +96,7 @@ func TestBuild(t *testing.T) { pipelines, err := Build(context.Background(), toSettings(factories, cfg)) assert.NoError(t, err) - assert.NoError(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) // Verify exporters created, started and empty. for _, expID := range test.exporterIDs { @@ -311,7 +312,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -325,7 +326,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -339,7 +340,7 @@ func TestFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go new file mode 100644 index 00000000000..754f483e49c --- /dev/null +++ b/service/internal/servicehost/host.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// Host mirrors component.Host interface, with one important difference: servicehost.Host +// is not associated with a component and thus ReportComponentStatus() requires the source +// component to be explicitly specified. +type Host interface { + // ReportComponentStatus is used to communicate the status of a source component to the Host. + // The Host implementations will broadcast this information to interested parties via + // StatusWatcher interface. + ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + + // See component.Host for the documentation of the rest of the functions. + + // Deprecated: [0.65.0] Replaced by ReportComponentStatus. + ReportFatalError(err error) + + GetFactory(kind component.Kind, componentType component.Type) component.Factory + GetExtensions() map[component.ID]component.Extension + GetExporters() map[component.DataType]map[component.ID]component.Exporter +} diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go new file mode 100644 index 00000000000..7a5717624fb --- /dev/null +++ b/service/internal/servicehost/nop_host.go @@ -0,0 +1,45 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package servicehost + +import ( + "go.opentelemetry.io/collector/component" +) + +// nopHost mocks a receiver.ReceiverHost for test purposes. +type nopHost struct{} + +func (n nopHost) ReportFatalError(err error) { +} + +func (n nopHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +} + +func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { + return nil +} + +func (n nopHost) GetExtensions() map[component.ID]component.Extension { + return nil +} + +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { + return nil +} + +// NewNopHost returns a new instance of nopHost with proper defaults for most tests. +func NewNopHost() Host { + return &nopHost{} +} diff --git a/service/service_test.go b/service/service_test.go index c65d322a80b..248b7dc4432 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -19,7 +19,9 @@ import ( "fmt" "net/http" "path/filepath" + "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -223,3 +225,73 @@ func createExampleService(t *testing.T, factories component.Factories) *service }) return srv } + +func TestComponentStatusWatcher(t *testing.T) { + factories, err := componenttest.NopFactories() + require.NoError(t, err) + + // Use a processor factory that creates "unhealthy" processor: one that + // always reports StatusError after successful Start. + unhealthyProcessorFactory := componenttest.NewUnhealthyProcessorFactory() + factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory + + // Keep track of all status changes in a map. + changedComponents := map[component.StatusSource]component.Status{} + var mux sync.Mutex + onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) { + mux.Lock() + defer mux.Unlock() + changedComponents[source] = event.Status() + } + + // Add a "statuswatcher" extension that will receive notifications when processor + // status changes. + factory := componenttest.NewStatusWatcherExtensionFactory(onStatusChanged) + factories.Extensions[factory.Type()] = factory + + // Read config from file. This config uses 4 "unhealthy" processors. + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) + require.NoError(t, err) + validCfg, err := validProvider.Get(context.Background(), factories) + require.NoError(t, err) + + // Create a service + telemetry := newColTelemetry(featuregate.NewRegistry()) + srv, err := newService(&settings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + Config: validCfg, + telemetry: telemetry, + }) + require.NoError(t, err) + + // Start the service. + require.NoError(t, srv.Start(context.Background())) + + // The "unhealthy" processors will now begin to asynchronously report StatusError. + // We expect to see these reports. + + assert.Eventually(t, func() bool { + mux.Lock() + defer mux.Unlock() + + for k, v := range changedComponents { + // All processors must report a status change with the same ID + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID()) + // And all must be in StatusError + assert.EqualValues(t, component.StatusError, v) + } + + // We have 4 processors with exactly the same ID in otelcol-statuswatcher.yaml + // We must have exactly 4 items in our map. This ensures that the "source" argument + // passed to status change func is unique per instance of source component despite + // components having the same IDs (having same ID for different component instances + // is a normal situation for processors, but also not prohibited for other component types). + return len(changedComponents) == 4 + }, time.Second, time.Millisecond*10) + + t.Cleanup(func() { + assert.NoError(t, telemetry.shutdown()) + assert.NoError(t, srv.Shutdown(context.Background())) + }) +} diff --git a/service/testdata/otelcol-statuswatcher.yaml b/service/testdata/otelcol-statuswatcher.yaml new file mode 100644 index 00000000000..34e6ea80063 --- /dev/null +++ b/service/testdata/otelcol-statuswatcher.yaml @@ -0,0 +1,31 @@ +receivers: + nop: + +processors: + nop: + unhealthy: + +exporters: + nop: + +extensions: + statuswatcher: + +service: + telemetry: + metrics: + address: localhost:8888 + extensions: [statuswatcher] + pipelines: + traces: + receivers: [nop] + processors: [nop,unhealthy,unhealthy] + exporters: [nop] + metrics: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + logs: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop]