-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce component status reporting #6560
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package componenttest // import "go.opentelemetry.io/collector/component/componenttest" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
) | ||
|
||
// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. | ||
func NewStatusWatcherExtensionCreateSettings() component.ExtensionCreateSettings { | ||
return component.ExtensionCreateSettings{ | ||
TelemetrySettings: NewNopTelemetrySettings(), | ||
BuildInfo: component.NewDefaultBuildInfo(), | ||
} | ||
} | ||
|
||
type statusWatcherExtensionConfig struct { | ||
config.ExtensionSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct | ||
} | ||
|
||
// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory that constructs nop extensions. | ||
func NewStatusWatcherExtensionFactory( | ||
onStatusChanged func(source component.StatusSource, event *component.StatusEvent), | ||
) component.ExtensionFactory { | ||
return component.NewExtensionFactory( | ||
"statuswatcher", | ||
func() component.ExtensionConfig { | ||
return &statusWatcherExtensionConfig{ | ||
ExtensionSettings: config.NewExtensionSettings(component.NewID("statuswatcher")), | ||
} | ||
}, | ||
func(context.Context, component.ExtensionCreateSettings, component.ExtensionConfig) (component.Extension, error) { | ||
return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil | ||
}, | ||
component.StabilityLevelStable) | ||
} | ||
|
||
// statusWatcherExtension stores consumed traces and metrics for testing purposes. | ||
type statusWatcherExtension struct { | ||
nopComponent | ||
onStatusChanged func(source component.StatusSource, event *component.StatusEvent) | ||
} | ||
|
||
func (e statusWatcherExtension) ComponentStatusChanged(source component.StatusSource, event *component.StatusEvent) { | ||
e.onStatusChanged(source, event) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package componenttest // import "go.opentelemetry.io/collector/component/componenttest" | ||
|
||
import ( | ||
"context" | ||
|
||
"go.opentelemetry.io/collector/component" | ||
"go.opentelemetry.io/collector/config" | ||
"go.opentelemetry.io/collector/consumer" | ||
"go.opentelemetry.io/collector/consumer/consumertest" | ||
) | ||
|
||
// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. | ||
func NewUnhealthyProcessorCreateSettings() component.ProcessorCreateSettings { | ||
return component.ProcessorCreateSettings{ | ||
TelemetrySettings: NewNopTelemetrySettings(), | ||
BuildInfo: component.NewDefaultBuildInfo(), | ||
} | ||
} | ||
|
||
type unhealthyProcessorConfig struct { | ||
config.ProcessorSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct | ||
} | ||
|
||
// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. | ||
func NewUnhealthyProcessorFactory() component.ProcessorFactory { | ||
return component.NewProcessorFactory( | ||
"unhealthy", | ||
func() component.ProcessorConfig { | ||
return &unhealthyProcessorConfig{ | ||
ProcessorSettings: config.NewProcessorSettings(component.NewID("nop")), | ||
} | ||
}, | ||
component.WithTracesProcessor(createUnhealthyTracesProcessor, component.StabilityLevelStable), | ||
component.WithMetricsProcessor(createUnhealthyMetricsProcessor, component.StabilityLevelStable), | ||
component.WithLogsProcessor(createUnhealthyLogsProcessor, component.StabilityLevelStable), | ||
) | ||
} | ||
|
||
func createUnhealthyTracesProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Traces) (component.TracesProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
func createUnhealthyMetricsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Metrics) (component.MetricsProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
func createUnhealthyLogsProcessor(context.Context, component.ProcessorCreateSettings, component.ProcessorConfig, consumer.Logs) (component.LogsProcessor, error) { | ||
return unhealthyProcessorInstance, nil | ||
} | ||
|
||
var unhealthyProcessorInstance = &unhealthyProcessor{ | ||
Consumer: consumertest.NewNop(), | ||
} | ||
|
||
// unhealthyProcessor stores consumed traces and metrics for testing purposes. | ||
type unhealthyProcessor struct { | ||
nopComponent | ||
consumertest.Consumer | ||
} | ||
|
||
func (unhealthyProcessor) Start(ctx context.Context, host component.Host) error { | ||
go func() { | ||
evt, _ := component.NewStatusEvent(component.StatusError) | ||
host.ReportComponentStatus(evt) | ||
}() | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package component | ||
|
||
import ( | ||
"errors" | ||
) | ||
|
||
type Status int32 | ||
|
||
const ( | ||
StatusOK Status = iota | ||
StatusError | ||
) | ||
|
||
// StatusSource component that reports a status about itself. | ||
// The implementation of this interface must be comparable to be useful as a map key. | ||
type StatusSource interface { | ||
ID() ID | ||
} | ||
Comment on lines
+28
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you need this interface at all? Simply pass the ID? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a key that uniquely identifies the component. ID is not a unique identification. You may have a processor and a receiver that have the same ID (we don't enforce that receivers and processors don't have the same type string - although in reality they are all unique in core and contrib). Why it needs to be unique? So that the StatusWatcher can maintain a Alternatively we should enforce uniqueness of ID across all component types. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interesting point... I understand now the need, let me think a bit more, probably better to have this as a struct if we can define it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yet another alternate is to require the Host to track the aggregate component status. In that case StatusWatcher will only be notified of that single aggregate bool status. But in that case we have to hard-code the aggregation logic in the Host and lose flexibility in allowing the extension to decide what to do with individual component statuses. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For the "global component ID", I think we need something like:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This also requires #6540 otherwise you can still have duplicates even with GlobalID. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good point about that. Now I am more confused after I looked at your implementation https://github.com/open-telemetry/opentelemetry-collector/pull/6560/files#diff-bcb3fc758a54acf0dda21a0d9c79b6cf6e6adcf58d0a282c93be50553523b09dR41 and https://github.com/open-telemetry/opentelemetry-collector/pull/6560/files#diff-19abf8e6f60ec275051d1447d4d97f4c55b449192f2cb946e3e4ea44ff49fcc0R69 At least in the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a test that verifies the uniqueness of |
||
|
||
type StatusEvent struct { | ||
status Status | ||
err error | ||
} | ||
|
||
func (ev *StatusEvent) Status() Status { | ||
return ev.status | ||
} | ||
|
||
// Err returns the error associated with the ComponentEvent. | ||
func (ev *StatusEvent) Err() error { | ||
return ev.err | ||
} | ||
Comment on lines
+43
to
+46
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can this be a "Description" instead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you mean to rename to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
// statusEventOption applies options to a StatusEvent. | ||
type statusEventOption func(*StatusEvent) error | ||
|
||
// WithError sets the error object of the Event. It is optional | ||
// and should only be applied to an Event of type ComponentError. | ||
func WithError(err error) statusEventOption { | ||
return func(o *StatusEvent) error { | ||
if o.status == StatusOK { | ||
return errors.New("event with ComponentOK cannot have an error") | ||
} | ||
o.err = err | ||
return nil | ||
} | ||
} | ||
|
||
// NewStatusEvent creates and returns a StatusEvent with default and provided | ||
// options. Will return an error if an error is provided for a non-error event | ||
// type (status.ComponentOK). | ||
// If the timestamp is not provided will set it to time.Now(). | ||
func NewStatusEvent(status Status, options ...statusEventOption) (*StatusEvent, error) { | ||
ev := StatusEvent{ | ||
status: status, | ||
} | ||
|
||
for _, opt := range options { | ||
if err := opt(&ev); err != nil { | ||
return nil, err | ||
} | ||
} | ||
|
||
return &ev, nil | ||
} | ||
|
||
// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry | ||
// Collector that is to be implemented by extensions interested in changes to component | ||
// status. | ||
Comment on lines
+81
to
+83
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any Component or just extensions? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Only extensions can watch. Any component can be a source. We can add support for any component to watch but that requires more work and I don't know if we need it. |
||
type StatusWatcher interface { | ||
// ComponentStatusChanged notifies about a change in the source component status. | ||
// Extensions that implement this interface must be ready that the ComponentStatusChanged | ||
// may be called before, after or concurrently with Component.Shutdown() call. | ||
// The function may be called concurrently with itself. | ||
ComponentStatusChanged(source StatusSource, event *StatusEvent) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
package component | ||
|
||
import ( | ||
"fmt" | ||
"testing" | ||
"unsafe" | ||
) | ||
|
||
func TestStatusEventSize(t *testing.T) { | ||
fmt.Printf("StatusEvent size=%d", unsafe.Sizeof(StatusEvent{})) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addition of this method is a breaking change. See failure on Jaeger repo: https://github.com/open-telemetry/opentelemetry-collector/actions/runs/3481700391/jobs/5823123326
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@bogdandrutu we have a problem with this.
Adding a method to this interface breaks external codebases (like Jaeger).
I do not see a good way to handle this gracefully. We can add the reporting capability to a different interface and probe for it in the component's, but that does not look very nice to me, e.g.
Another approach would be to add a new optional
Start2(ctx context.Context, host component.Host2)
to Component2 interface and components can opt in to it, but must continue to support Start(). This looks even uglier.If we don't want to handle this gracefully then we must coordinate this change with Jaeger.
Any other ideas?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I submitted a contrib PR that should remove the dependency on Jaeger collector, so we can make contrib tests pass, but I believe we will still likely need to run through a breaking change.
I see a few other problems with the current state of this interface:
Get
which goes against Go recommendations, maybe we want to change them and removeGet
prefix.I believe we need to take the opportunity and make a bigger refactoring that would allow us to solve all the outlined problems at once. We discussed the problem with @bogdandrutu and came up with the following plan:
Keep only one "core" method
ReportComponentStatus
theHost
interface and move all other "getter" methods to optional interfaces. The optional interfaces for exporter and extension getters can be moved to their own packages (exporter/extension), so we don't need to change returned values fromExtension|Exporter
toComponent
as done in #6553 anymore.I'll play with it, submit a draft PR, then we can combine it with this PR and try to migrate to the new interfaces as gracefully as feasible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm catching up on this PR. It looks like this is where it stalled out. Some thoughts on how to move forward:
I'm proposing that we deprecate
GetExporters
(#7370). I don't believe there are valid reasons to need this function or others like it, such asGetReceivers
,GetProcessors
, etc.Between renaming
GetExtensions
toExtensions
, deprecatingReportFatalError
andGetExporters
, and addingReportComponentStatus
, it looks like we want quite a different interface. Hard to believe we will just live with this as is, so ideally we just need to identify the best migration option. It looks like we no longer need to coordinate with Jaeger but presumably this will affect other components defined elsewhere.I also tried to identify a graceful option. The best I came up with is quite ugly, but would require no unannounced breaking changes and does not require changes to any other interfaces (i.e.
component.Component
):component.Host2
.ReportFatalError
,GetExporters
, andGetExtensions
to alert users of these specific functions. Recommend the ugly solution of asserting the optional interfaces (temporarily).component.Host
to alert all all affected. However, clearly indicate that this interface will be changed tocomponent.Host
, rather than removed.component.Host
to matchcomponent.Host2
. Undeprecatecomponent.Host
and deprecatecomponent.Host2