From 37dc2c89ee37f1b5f375cbf01ac3b1f686cb3e32 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Mon, 31 Jul 2023 17:11:49 -0700 Subject: [PATCH] Get things working after rebase This picks up where https://github.com/open-telemetry/opentelemetry-collector/pull/6560 left off. The first step is to get the code introduced in that PR working with the collector as it is today. There were significant changes to how pipelines are built and the component package was split into separate packages based on type (extension, processor, etc). This commit makes the necessary changes to get everything working, likely not in the most ideal way, but it's a start that we can iterate on. --- .../extensiontest}/statuswatcher_extension.go | 30 ++++----- otelcol/collector_test.go | 65 +++++++++++++++++++ otelcol/testdata/otelcol-statuswatcher.yaml | 2 +- .../processortest}/unhealthy_processor.go | 40 +++++------- service/host.go | 2 +- service/internal/graph/graph.go | 60 +++++++++++++++-- service/internal/graph/graph_test.go | 17 ++--- service/internal/servicehost/host.go | 4 +- service/internal/servicehost/nop_host.go | 4 +- service/service.go | 3 +- 10 files changed, 169 insertions(+), 58 deletions(-) rename {component/componenttest => extension/extensiontest}/statuswatcher_extension.go (65%) rename {component/componenttest => processor/processortest}/unhealthy_processor.go (52%) diff --git a/component/componenttest/statuswatcher_extension.go b/extension/extensiontest/statuswatcher_extension.go similarity index 65% rename from component/componenttest/statuswatcher_extension.go rename to extension/extensiontest/statuswatcher_extension.go index ffc01a76296..ee0280bf7c1 100644 --- a/component/componenttest/statuswatcher_extension.go +++ b/extension/extensiontest/statuswatcher_extension.go @@ -12,39 +12,34 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package componenttest // import "go.opentelemetry.io/collector/component/componenttest" +package extensiontest // import "go.opentelemetry.io/collector/extension/extensiontest" import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension" ) // NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. -func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { - return component.ExtensionCreateSettings{ - TelemetrySettings: NewNopTelemetrySettings(), +func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings { + return extension.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), } } -type statusWatcherExtensionConfig struct { - config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct -} - // NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. 
func NewStatusWatcherExtensionFactory( onStatusChanged func(source component.StatusSource, event *component.StatusEvent), -) component.ExtensionFactory { - return component.NewExtensionFactory( +) extension.Factory { + return extension.NewFactory( "statuswatcher", - func() component.ExtensionConfig { - return &statusWatcherExtensionConfig{ - ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), - } + func() component.Config { + return &struct{}{} }, - func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { + func(context.Context, extension.CreateSettings, component.Config) (component.Component, error) { return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil }, component.StabilityLevelStable) @@ -52,7 +47,8 @@ func NewStatusWatcherExtensionFactory( // statusWatcherExtension stores consumed traces and metrics for testing purposes. type statusWatcherExtension struct { - nopComponent + component.StartFunc + component.ShutdownFunc onStatusChanged func(source component.StatusSource, event *component.StatusEvent) } diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index 1a0ad8d0607..c84eb820ef6 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -19,6 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/converter/expandconverter" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/processor/processortest" ) func TestStateString(t *testing.T) { @@ -151,6 +153,69 @@ func TestCollectorReportError(t *testing.T) { assert.Equal(t, StateClosed, col.GetState()) } +func TestComponentStatusWatcher(t *testing.T) { + factories, err := nopFactories() + assert.NoError(t, err) + + // Use a processor factory that creates "unhealthy" processor: one that + // always reports StatusError after successful Start. 
+ unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory() + factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory + + // Keep track of all status changes in a map. + changedComponents := map[component.StatusSource]component.Status{} + var mux sync.Mutex + onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) { + mux.Lock() + defer mux.Unlock() + changedComponents[source] = event.Status() + } + + // Add a "statuswatcher" extension that will receive notifications when processor + // status changes. + factory := extensiontest.NewStatusWatcherExtensionFactory(onStatusChanged) + factories.Extensions[factory.Type()] = factory + + // Read config from file. This config uses 3 "unhealthy" processors. + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) + require.NoError(t, err) + + // Create a collector + col, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: validProvider, + }) + require.NoError(t, err) + + // Start the newly created collector. + wg := startCollector(context.Background(), t, col) + + // The "unhealthy" processors will now begin to asynchronously report StatusError. + // We expect to see these reports. + assert.Eventually(t, func() bool { + mux.Lock() + defer mux.Unlock() + + for k, v := range changedComponents { + // All processors must report a status change with the same ID + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID()) + // And all must be in StatusError + assert.EqualValues(t, component.StatusError, v) + } + // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml + // We must have exactly 3 items in our map. 
This ensures that the "source" argument + // passed to status change func is unique per instance of source component despite + // components having the same IDs (having same ID for different component instances + // is a normal situation for processors). + return len(changedComponents) == 3 + }, time.Second, time.Millisecond*10) + + col.Shutdown() + wg.Wait() + assert.Equal(t, StateClosed, col.GetState()) +} + func TestCollectorSendSignal(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml index 34e6ea80063..2dcc322d341 100644 --- a/otelcol/testdata/otelcol-statuswatcher.yaml +++ b/otelcol/testdata/otelcol-statuswatcher.yaml @@ -19,7 +19,7 @@ service: pipelines: traces: receivers: [nop] - processors: [nop,unhealthy,unhealthy] + processors: [nop,unhealthy] exporters: [nop] metrics: receivers: [nop] diff --git a/component/componenttest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go similarity index 52% rename from component/componenttest/unhealthy_processor.go rename to processor/processortest/unhealthy_processor.go index 2a84874fa7b..64a52d1d570 100644 --- a/component/componenttest/unhealthy_processor.go +++ b/processor/processortest/unhealthy_processor.go @@ -12,53 +12,48 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package componenttest // import "go.opentelemetry.io/collector/component/componenttest" +package processortest // import "go.opentelemetry.io/collector/processor/processortest" import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor" ) // NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. -func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { - return component.ProcessorCreateSettings{ - TelemetrySettings: NewNopTelemetrySettings(), +func NewUnhealthyProcessorCreateSettings() processor.CreateSettings { + return processor.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), } } -type unhealthyProcessorConfig struct { - config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct -} - // NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. 
-func NewUnhealthyProcessorFactory() component.ProcessorFactory { - return component.NewProcessorFactory( +func NewUnhealthyProcessorFactory() processor.Factory { + return processor.NewFactory( "unhealthy", - func() component.ProcessorConfig { - return &unhealthyProcessorConfig{ - ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), - } + func() component.Config { + return &struct{}{} }, - component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), - component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), - component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), + processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable), + processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable), ) } -func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { +func createUnhealthyTracesProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Traces) (processor.Traces, error) { return unhealthyProcessorInstance, nil } -func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { +func createUnhealthyMetricsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Metrics) (processor.Metrics, error) { return unhealthyProcessorInstance, nil } -func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { +func createUnhealthyLogsProcessor(context.Context, processor.CreateSettings, component.Config, consumer.Logs) (processor.Logs, error) { return 
unhealthyProcessorInstance, nil } @@ -68,7 +63,8 @@ var unhealthyProcessorInstance = &unhealthyProcessor{ // unhealthyProcessor stores consumed traces and metrics for testing purposes. type unhealthyProcessor struct { - nopComponent + component.StartFunc + component.ShutdownFunc consumertest.Consumer } diff --git a/service/host.go b/service/host.go index ff0d6a07aa2..1f2ef006bf3 100644 --- a/service/host.go +++ b/service/host.go @@ -40,7 +40,7 @@ func (host *serviceHost) ReportFatalError(err error) { } func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) { - host.extensions.NotifyComponentStatusChange(source, event) + host.serviceExtensions.NotifyComponentStatusChange(source, event) } func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 5b8a404d66f..5f6338ec458 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -10,6 +10,7 @@ import ( "strings" "go.uber.org/multierr" + "go.uber.org/zap" "gonum.org/v1/gonum/graph" "gonum.org/v1/gonum/graph/simple" "gonum.org/v1/gonum/graph/topo" @@ -22,6 +23,8 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" + "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/pipelines" ) @@ -45,12 +48,16 @@ type Graph struct { // Keep track of how nodes relate to pipelines, so we can declare edges in the graph. 
pipelines map[component.ID]*pipelineNodes + + // Keep track of status source per node + statusSources map[int64]*statusReportingComponent } func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)), + statusSources: make(map[int64]*statusReportingComponent), } for pipelineID := range set.PipelineConfigs { pipelines.pipelines[pipelineID] = &pipelineNodes{ @@ -84,12 +91,21 @@ func (g *Graph) createNodes(set Settings) error { } rcvrNode := g.createReceiver(pipelineID.Type(), recvID) pipe.receivers[rcvrNode.ID()] = rcvrNode + g.statusSources[rcvrNode.ID()] = &statusReportingComponent{ + id: recvID, + kind: component.KindReceiver, + } } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { - pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID)) + procNode := g.createProcessor(pipelineID, procID) + pipe.processors = append(pipe.processors, procNode) + g.statusSources[procNode.ID()] = &statusReportingComponent{ + id: procID, + kind: component.KindProcessor, + } } pipe.fanOutNode = newFanOutNode(pipelineID) @@ -102,6 +118,10 @@ func (g *Graph) createNodes(set Settings) error { } expNode := g.createExporter(pipelineID.Type(), exprID) pipe.exporters[expNode.ID()] = expNode + g.statusSources[expNode.ID()] = &statusReportingComponent{ + id: expNode.componentID, + kind: component.KindExporter, + } } } @@ -158,6 +178,10 @@ func (g *Graph) createNodes(set Settings) error { connNode := g.createConnector(eID, rID, connID) g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode + g.statusSources[connNode.ID()] = &statusReportingComponent{ + id: connNode.componentID, + kind: component.KindConnector, + } } } } @@ -316,7 +340,20 @@ type pipelineNodes struct { exporters map[int64]graph.Node } -func (g *Graph) 
StartAll(ctx context.Context, host component.Host) error { +type statusReportingComponent struct { + kind component.Kind + id component.ID +} + +func (s *statusReportingComponent) GetKind() component.Kind { + return s.kind +} + +func (s *statusReportingComponent) ID() component.ID { + return s.id +} + +func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error { nodes, err := topo.Sort(g.componentGraph) if err != nil { return err @@ -326,12 +363,27 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error { // are started before upstream components. This ensures that each // component's consumer is ready to consume. for i := len(nodes) - 1; i >= 0; i-- { - comp, ok := nodes[i].(component.Component) + node := nodes[i] + comp, ok := node.(component.Component) + if !ok { // Skip capabilities/fanout nodes continue } - if compErr := comp.Start(ctx, host); compErr != nil { + + statusSource, ok := g.statusSources[node.ID()] + + if !ok { + // TODO: this should not happen. I'm not sure this code path will remain, but if it does + // we should ensure that we have a valid nop value for statusSource. 
+ } + + // note: there is no longer a per-component logger, hence the zap.NewNop() + // we should be able to remove the logger from components.NewHostWrapper as we deprecate + // and remove host.ReportFatalError + hostWrapper := components.NewHostWrapper(host, statusSource, zap.NewNop()) + + if compErr := comp.Start(ctx, hostWrapper); compErr != nil { return compErr } } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 1279ff145d5..9357f2a3d49 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/internal/testcomponents" "go.opentelemetry.io/collector/service/pipelines" ) @@ -146,7 +147,7 @@ func TestGraphStartStop(t *testing.T) { pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) } - require.NoError(t, pg.StartAll(ctx, componenttest.NewNopHost())) + require.NoError(t, pg.StartAll(ctx, servicehost.NewNopHost())) for _, edge := range tt.edges { assert.Greater(t, ctx.order[edge[0]], ctx.order[edge[1]]) } @@ -173,7 +174,7 @@ func TestGraphStartStopCycle(t *testing.T) { pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) pg.componentGraph.SetEdge(simple.Edge{F: c1, T: p1}) // loop back - err := pg.StartAll(context.Background(), componenttest.NewNopHost()) + err := pg.StartAll(context.Background(), servicehost.NewNopHost()) assert.Error(t, err) assert.Contains(t, err.Error(), `topo: no topological ordering: cyclic components`) @@ -194,7 +195,7 @@ func TestGraphStartStopComponentError(t *testing.T) { shutdownErr: errors.New("bar"), }, }) - assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()), "foo") + assert.EqualError(t, pg.StartAll(context.Background(), servicehost.NewNopHost()), 
"foo") assert.EqualError(t, pg.ShutdownAll(context.Background()), "bar") } @@ -667,7 +668,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { assert.Equal(t, len(test.pipelineConfigs), len(pg.pipelines)) - assert.NoError(t, pg.StartAll(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, pg.StartAll(context.Background(), servicehost.NewNopHost())) mutatingPipelines := make(map[component.ID]bool, len(test.pipelineConfigs)) @@ -2027,7 +2028,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2041,7 +2042,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2055,7 +2056,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) }) @@ -2075,7 +2076,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } pipelines, err := Build(context.Background(), set) assert.NoError(t, err) - assert.Error(t, pipelines.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Error(t, pipelines.StartAll(context.Background(), servicehost.NewNopHost())) assert.Error(t, pipelines.ShutdownAll(context.Background())) 
}) } diff --git a/service/internal/servicehost/host.go b/service/internal/servicehost/host.go index 754f483e49c..fe8181d24e3 100644 --- a/service/internal/servicehost/host.go +++ b/service/internal/servicehost/host.go @@ -33,6 +33,6 @@ type Host interface { ReportFatalError(err error) GetFactory(kind component.Kind, componentType component.Type) component.Factory - GetExtensions() map[component.ID]component.Extension - GetExporters() map[component.DataType]map[component.ID]component.Exporter + GetExtensions() map[component.ID]component.Component + GetExporters() map[component.DataType]map[component.ID]component.Component } diff --git a/service/internal/servicehost/nop_host.go b/service/internal/servicehost/nop_host.go index 7a5717624fb..62c265f2a3e 100644 --- a/service/internal/servicehost/nop_host.go +++ b/service/internal/servicehost/nop_host.go @@ -31,11 +31,11 @@ func (n nopHost) GetFactory(kind component.Kind, componentType component.Type) c return nil } -func (n nopHost) GetExtensions() map[component.ID]component.Extension { +func (n nopHost) GetExtensions() map[component.ID]component.Component { return nil } -func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Exporter { +func (n nopHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return nil } diff --git a/service/service.go b/service/service.go index ed9540ec948..c17a029bed1 100644 --- a/service/service.go +++ b/service/service.go @@ -29,6 +29,7 @@ import ( "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/servicehost" "go.opentelemetry.io/collector/service/telemetry" ) @@ -234,7 +235,7 @@ func (srv *Service) Logger() *zap.Logger { return srv.telemetrySettings.Logger } -func getBallastSize(host component.Host) uint64 { +func getBallastSize(host servicehost.Host) uint64 { for _, 
ext := range host.GetExtensions() { if bExt, ok := ext.(interface{ GetBallastSize() uint64 }); ok { return bExt.GetBallastSize()