Skip to content

Commit

Permalink
Implement and use component.GlobalID in place of component.StatusSource
Browse files Browse the repository at this point in the history
This is an implementation based on the thread starting with this comment:
open-telemetry#6560 (comment)
  • Loading branch information
mwear committed Aug 3, 2023
1 parent cec36a7 commit b82a38c
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 62 deletions.
7 changes: 7 additions & 0 deletions component/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,10 @@ type CreateDefaultConfigFunc func() Config
func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config {
return f()
}

// GlobalID uniquely identifies a component
type GlobalID struct {
ID ID
Kind Kind
PipelineID ID // Not empty only if the Kind is Processor
}
8 changes: 1 addition & 7 deletions component/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,6 @@ const (
StatusError
)

// StatusSource component that reports a status about itself.
// The implementation of this interface must be comparable to be useful as a map key.
type StatusSource interface {
ID() ID
}

type StatusEvent struct {
status Status
err error
Expand Down Expand Up @@ -75,5 +69,5 @@ type StatusWatcher interface {
// Extensions that implement this interface must be ready that the ComponentStatusChanged
// may be called before, after or concurrently with Component.Shutdown() call.
// The function may be called concurrently with itself.
ComponentStatusChanged(source StatusSource, event *StatusEvent)
ComponentStatusChanged(source *GlobalID, event *StatusEvent)
}
6 changes: 3 additions & 3 deletions extension/extensiontest/statuswatcher_extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings {

// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions.
func NewStatusWatcherExtensionFactory(
onStatusChanged func(source component.StatusSource, event *component.StatusEvent),
onStatusChanged func(source *component.GlobalID, event *component.StatusEvent),
) extension.Factory {
return extension.NewFactory(
"statuswatcher",
Expand All @@ -38,9 +38,9 @@ func NewStatusWatcherExtensionFactory(
type statusWatcherExtension struct {
component.StartFunc
component.ShutdownFunc
onStatusChanged func(source component.StatusSource, event *component.StatusEvent)
onStatusChanged func(source *component.GlobalID, event *component.StatusEvent)
}

func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) {
func (e statusWatcherExtension) ComponentStatusChanged(source *component.GlobalID, event *component.StatusEvent) {
e.onStatusChanged(source, event)
}
6 changes: 3 additions & 3 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ func TestComponentStatusWatcher(t *testing.T) {
factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory

// Keep track of all status changes in a map.
changedComponents := map[component.StatusSource]component.Status{}
changedComponents := map[*component.GlobalID]component.Status{}
var mux sync.Mutex
onStatusChanged := func(source component.StatusSource, event *component.StatusEvent) {
onStatusChanged := func(source *component.GlobalID, event *component.StatusEvent) {
mux.Lock()
defer mux.Unlock()
changedComponents[source] = event.Status()
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestComponentStatusWatcher(t *testing.T) {

for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID())
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
// And all must be in StatusError
assert.EqualValues(t, component.StatusError, v)
}
Expand Down
21 changes: 6 additions & 15 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,17 @@ type Extensions struct {
extMap map[component.ID]extension.Extension
}

type statusReportingExtension struct {
id component.ID
}

func (s *statusReportingExtension) GetKind() component.Kind {
return component.KindExtension
}

func (s *statusReportingExtension) ID() component.ID {
return s.id
}

// Start starts all extensions.
func (bes *Extensions) Start(ctx context.Context, host servicehost.Host) error {
bes.telemetry.Logger.Info("Starting extensions...")
for extID, ext := range bes.extMap {
extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID)
extLogger.Info("Extension is starting...")
statusSource := &statusReportingExtension{extID}
if err := ext.Start(ctx, components.NewHostWrapper(host, statusSource, extLogger)); err != nil {
globalID := &component.GlobalID{
ID: extID,
Kind: component.KindExtension,
}
if err := ext.Start(ctx, components.NewHostWrapper(host, globalID, extLogger)); err != nil {
return err
}
extLogger.Info("Extension started.")
Expand Down Expand Up @@ -86,7 +77,7 @@ func (bes *Extensions) NotifyPipelineNotReady() error {
return errs
}

func (bes *Extensions) NotifyComponentStatusChange(source component.StatusSource, event *component.StatusEvent) error {
func (bes *Extensions) NotifyComponentStatusChange(source *component.GlobalID, event *component.StatusEvent) error {
var errs error
for _, ext := range bes.extMap {
if pw, ok := ext.(component.StatusWatcher); ok {
Expand Down
2 changes: 1 addition & 1 deletion service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (host *serviceHost) ReportFatalError(err error) {
host.asyncErrorChannel <- err
}

func (host *serviceHost) ReportComponentStatus(source component.StatusSource, event *component.StatusEvent) {
func (host *serviceHost) ReportComponentStatus(source *component.GlobalID, event *component.StatusEvent) {
// TODO: What should we do if there is an error notifying here?
host.serviceExtensions.NotifyComponentStatusChange(source, event) //nolint:errcheck
}
Expand Down
4 changes: 2 additions & 2 deletions service/internal/components/host_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
// TODO: rename this to componentHost or hostComponentConnector to better reflect the purpose.
type hostWrapper struct {
servicehost.Host
component component.StatusSource
component *component.GlobalID
*zap.Logger
}

func NewHostWrapper(host servicehost.Host, component component.StatusSource, logger *zap.Logger) component.Host {
func NewHostWrapper(host servicehost.Host, component *component.GlobalID, logger *zap.Logger) component.Host {
return &hostWrapper{
host,
component,
Expand Down
46 changes: 17 additions & 29 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ type Graph struct {
pipelines map[component.ID]*pipelineNodes

// Keep track of status source per node
statusSources map[int64]*statusReportingComponent
globalIDs map[int64]*component.GlobalID
}

func Build(ctx context.Context, set Settings) (*Graph, error) {
pipelines := &Graph{
componentGraph: simple.NewDirectedGraph(),
pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)),
statusSources: make(map[int64]*statusReportingComponent),
globalIDs: make(map[int64]*component.GlobalID),
}
for pipelineID := range set.PipelineConfigs {
pipelines.pipelines[pipelineID] = &pipelineNodes{
Expand Down Expand Up @@ -91,9 +91,9 @@ func (g *Graph) createNodes(set Settings) error {
}
rcvrNode := g.createReceiver(pipelineID.Type(), recvID)
pipe.receivers[rcvrNode.ID()] = rcvrNode
g.statusSources[rcvrNode.ID()] = &statusReportingComponent{
id: recvID,
kind: component.KindReceiver,
g.globalIDs[rcvrNode.ID()] = &component.GlobalID{
ID: recvID,
Kind: component.KindReceiver,
}
}

Expand All @@ -102,9 +102,10 @@ func (g *Graph) createNodes(set Settings) error {
for _, procID := range pipelineCfg.Processors {
procNode := g.createProcessor(pipelineID, procID)
pipe.processors = append(pipe.processors, procNode)
g.statusSources[procNode.ID()] = &statusReportingComponent{
id: procID,
kind: component.KindProcessor,
g.globalIDs[procNode.ID()] = &component.GlobalID{
ID: procID,
Kind: component.KindProcessor,
PipelineID: pipelineID,
}
}

Expand All @@ -118,9 +119,9 @@ func (g *Graph) createNodes(set Settings) error {
}
expNode := g.createExporter(pipelineID.Type(), exprID)
pipe.exporters[expNode.ID()] = expNode
g.statusSources[expNode.ID()] = &statusReportingComponent{
id: expNode.componentID,
kind: component.KindExporter,
g.globalIDs[expNode.ID()] = &component.GlobalID{
ID: expNode.componentID,
Kind: component.KindExporter,
}
}
}
Expand Down Expand Up @@ -178,9 +179,9 @@ func (g *Graph) createNodes(set Settings) error {
connNode := g.createConnector(eID, rID, connID)
g.pipelines[eID].exporters[connNode.ID()] = connNode
g.pipelines[rID].receivers[connNode.ID()] = connNode
g.statusSources[connNode.ID()] = &statusReportingComponent{
id: connNode.componentID,
kind: component.KindConnector,
g.globalIDs[connNode.ID()] = &component.GlobalID{
ID: connNode.componentID,
Kind: component.KindConnector,
}
}
}
Expand Down Expand Up @@ -340,19 +341,6 @@ type pipelineNodes struct {
exporters map[int64]graph.Node
}

type statusReportingComponent struct {
kind component.Kind
id component.ID
}

func (s *statusReportingComponent) GetKind() component.Kind {
return s.kind
}

func (s *statusReportingComponent) ID() component.ID {
return s.id
}

func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error {
nodes, err := topo.Sort(g.componentGraph)
if err != nil {
Expand All @@ -371,8 +359,8 @@ func (g *Graph) StartAll(ctx context.Context, host servicehost.Host) error {
continue
}

statusSource := g.statusSources[node.ID()]
hostWrapper := components.NewHostWrapper(host, statusSource, zap.NewNop())
globalID := g.globalIDs[node.ID()]
hostWrapper := components.NewHostWrapper(host, globalID, zap.NewNop())

if compErr := comp.Start(ctx, hostWrapper); compErr != nil {
return compErr
Expand Down
2 changes: 1 addition & 1 deletion service/internal/servicehost/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Host interface {
// ReportComponentStatus is used to communicate the status of a source component to the Host.
// The Host implementations will broadcast this information to interested parties via
// StatusWatcher interface.
ReportComponentStatus(source component.StatusSource, event *component.StatusEvent)
ReportComponentStatus(source *component.GlobalID, event *component.StatusEvent)

// See component.Host for the documentation of the rest of the functions.

Expand Down
2 changes: 1 addition & 1 deletion service/internal/servicehost/nop_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type nopHost struct{}
func (n nopHost) ReportFatalError(_ error) {
}

func (n nopHost) ReportComponentStatus(_ component.StatusSource, _ *component.StatusEvent) {
func (n nopHost) ReportComponentStatus(_ *component.GlobalID, _ *component.StatusEvent) {
}

func (n nopHost) GetFactory(_ component.Kind, _ component.Type) component.Factory {
Expand Down

0 comments on commit b82a38c

Please sign in to comment.