diff --git a/component/component.go b/component/component.go index 9a6a95d798a..d58a7ff0954 100644 --- a/component/component.go +++ b/component/component.go @@ -175,3 +175,10 @@ type CreateDefaultConfigFunc func() Config func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config { return f() } + +// GlobalID uniquely identifies a component +type GlobalID struct { + ID ID + Kind Kind + PipelineID ID // Not empty only if the Kind is Processor +} diff --git a/component/status.go b/component/status.go index d797a16b3e7..dcf001ec6ce 100644 --- a/component/status.go +++ b/component/status.go @@ -14,12 +14,6 @@ const ( StatusError ) -// StatusSource component that reports a status about itself. -// The implementation of this interface must be comparable to be useful as a map key. -type StatusSource interface { - ID() ID -} - type StatusEvent struct { status Status err error @@ -75,5 +69,5 @@ type StatusWatcher interface { // Extensions that implement this interface must be ready that the ComponentStatusChanged // may be called before, after or concurrently with Component.Shutdown() call. // The function may be called concurrently with itself. - ComponentStatusChanged(source StatusSource, event *StatusEvent) + ComponentStatusChanged(source *GlobalID, event *StatusEvent) } diff --git a/extension/extensiontest/statuswatcher_extension.go b/extension/extensiontest/statuswatcher_extension.go index cba925f4435..aaa1b358beb 100644 --- a/extension/extensiontest/statuswatcher_extension.go +++ b/extension/extensiontest/statuswatcher_extension.go @@ -21,7 +21,7 @@ func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings { // NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. func NewStatusWatcherExtensionFactory( - onStatusChanged func(source component.StatusSource, event *component.StatusEvent), + onStatusChanged func(source *component.GlobalID, event *component.StatusEvent), ) extension.Factory { return extension.NewFactory( "statuswatcher", @@ -38,9 +38,9 @@ func NewStatusWatcherExtensionFactory( type statusWatcherExtension struct { component.StartFunc component.ShutdownFunc - onStatusChanged func(source component.StatusSource, event *component.StatusEvent) + onStatusChanged func(source *component.GlobalID, event *component.StatusEvent) } -func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { +func (e statusWatcherExtension) ComponentStatusChanged(source *component.GlobalID, event *component.StatusEvent) { e.onStatusChanged(source, event) } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index 3b01deb6e73..81b2a2702cc 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -161,9 +161,9 @@ func TestComponentStatusWatcher(t *testing.T) { factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory // Keep track of all status changes in a map. - changedComponents := map[component.StatusSource]component.Status{} + changedComponents := map[*component.GlobalID]component.Status{} var mux sync.Mutex - onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) { + onStatusChanged := func(source *component.GlobalID, event *component.StatusEvent) { mux.Lock() defer mux.Unlock() changedComponents[source] = event.Status() @@ -197,7 +197,7 @@ func TestComponentStatusWatcher(t *testing.T) { for k, v := range changedComponents { // All processors must report a status change with the same ID - assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID()) + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) // And all must be in StatusError assert.EqualValues(t, component.StatusError, v) } diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 2a1577e2b7c..4f6b7c34328 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -26,26 +26,17 @@ type Extensions struct { extMap map[component.ID]extension.Extension } -type statusReportingExtension struct { - id component.ID -} - -func (s *statusReportingExtension) GetKind() component.Kind { - return component.KindExtension -} - -func (s *statusReportingExtension) ID() component.ID { - return s.id -} - // Start starts all extensions. func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error { bes.telemetry.Logger.Info("Starting extensions...") for extID, ext := range bes.extMap { extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") - statusSource := &statusReportingExtension{extID} - if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil { + globalID := &component.GlobalID{ + ID: extID, + Kind: component.KindExtension, + } + if err := ext.Start(ctx, components.NewHostWrapper(host, globalID, extLogger)); err != nil { return err } extLogger.Info("Extension started.") @@ -86,7 +77,7 @@ func (bes *Extensions) NotifyPipelineNotReady() error { return errs } -func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error { +func (bes *Extensions) NotifyComponentStatusChange(source *component.GlobalID, event *component.StatusEvent) error { var errs error for _, ext := range bes.extMap { if pw, ok := ext.(component.StatusWatcher); ok { diff --git a/service/host.go b/service/host.go index 1a3d9bba261..dbe188f191d 100644 --- a/service/host.go +++ b/service/host.go @@ -39,7 +39,7 @@ func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } -func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { +func (host *serviceHost) ReportComponentStatus(source *component.GlobalID, event *component.StatusEvent) { // TODO: What should we do if there is an error notifying here? host.serviceExtensions.NotifyComponentStatusChange(source, event) //nolint:errcheck } diff --git a/service/internal/components/host_wrapper.go b/service/internal/components/host_wrapper.go index 245be162a65..a020aa5d96e 100644 --- a/service/internal/components/host_wrapper.go +++ b/service/internal/components/host_wrapper.go @@ -16,11 +16,11 @@ import ( // TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose. type hostWrapper struct { servicehost.Host - component component.StatusSource + component *component.GlobalID *zap.Logger } -func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host { +func NewHostWrapper(host servicehost.Host, component *component.GlobalID, logger *zap.Logger) component.Host { return &hostWrapper{ host, component, diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 41e990295fc..036e5938243 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -50,14 +50,14 @@ type Graph struct { pipelines map[component.ID]*pipelineNodes // Keep track of status source per node - statusSources map[int64]*statusReportingComponent + globalIDs map[int64]*component.GlobalID } func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)), - statusSources: make(map[int64]*statusReportingComponent), + globalIDs: make(map[int64]*component.GlobalID), } for pipelineID := range set.PipelineConfigs { pipelines.pipelines[pipelineID] = &pipelineNodes{ @@ -91,9 +91,9 @@ func (g *Graph) createNodes(set Settings) error { } rcvrNode := g.createReceiver(pipelineID.Type(), recvID) pipe.receivers[rcvrNode.ID()] = rcvrNode - g.statusSources[rcvrNode.ID()] = &statusReportingComponent{ - id: recvID, - kind: component.KindReceiver, + g.globalIDs[rcvrNode.ID()] = &component.GlobalID{ + ID: recvID, + Kind: component.KindReceiver, } } @@ -102,9 +102,10 @@ func (g *Graph) createNodes(set Settings) error { for _, procID := range pipelineCfg.Processors { procNode := g.createProcessor(pipelineID, procID) pipe.processors = append(pipe.processors, procNode) - g.statusSources[procNode.ID()] = &statusReportingComponent{ - id: procID, - kind: component.KindProcessor, + g.globalIDs[procNode.ID()] = &component.GlobalID{ + ID: procID, + Kind: component.KindProcessor, + PipelineID: pipelineID, } } @@ -118,9 +119,9 @@ func (g *Graph) createNodes(set Settings) error { } expNode := g.createExporter(pipelineID.Type(), exprID) pipe.exporters[expNode.ID()] = expNode - g.statusSources[expNode.ID()] = &statusReportingComponent{ - id: expNode.componentID, - kind: component.KindExporter, + g.globalIDs[expNode.ID()] = &component.GlobalID{ + ID: expNode.componentID, + Kind: component.KindExporter, } } } @@ -178,9 +179,9 @@ func (g *Graph) createNodes(set Settings) error { connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode - g.statusSources[connNode.ID()] = &statusReportingComponent{ - id: connNode.componentID, - kind: component.KindConnector, + g.globalIDs[connNode.ID()] = &component.GlobalID{ + ID: connNode.componentID, + Kind: component.KindConnector, } } } @@ -340,19 +341,6 @@ type pipelineNodes struct { exporters map[int64]graph.Node } -type statusReportingComponent struct { - kind component.Kind - id component.ID -} - -func (s *statusReportingComponent) GetKind() component.Kind { - return s.kind -} - -func (s *statusReportingComponent) ID() component.ID { - return s.id -} - func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { @@ -371,8 +359,8 @@ func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error { continue } - statusSource := g.statusSources[node.ID()] - hostWrapper := components.NewHostWrapper(host, statusSource, zap.NewNop()) + globalID := g.globalIDs[node.ID()] + hostWrapper := components.NewHostWrapper(host, globalID, zap.NewNop()) if compErr := comp.Start(ctx, hostWrapper); compErr != nil { return compErr diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go index 74406735620..78100565208 100644 --- a/service/internal/servicehost/host.go +++ b/service/internal/servicehost/host.go @@ -16,7 +16,7 @@ type Host interface { // ReportComponentStatus is used to communicate the status of a source component to the Host. // The Host implementations will broadcast this information to interested parties via // StatusWatcher interface. - ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) + ReportComponentStatus(source *component.GlobalID, event *component.StatusEvent) // See component.Host for the documentation of the rest of the functions. diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go index 926ea122a5a..9fd802e4e4c 100644 --- a/service/internal/servicehost/nop_host.go +++ b/service/internal/servicehost/nop_host.go @@ -13,7 +13,7 @@ type nopHost struct{} func (n nopHost) ReportFatalError(_ error) { } -func (n nopHost) ReportComponentStatus(_ component.StatusSource, _ *component.StatusEvent) { +func (n nopHost) ReportComponentStatus(_ *component.GlobalID, _ *component.StatusEvent) { } func (n nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {