diff --git a/processor/k8sattributesprocessor/client_test.go b/processor/k8sattributesprocessor/client_test.go
index c69e0638301a..dca90b6bc3d9 100644
--- a/processor/k8sattributesprocessor/client_test.go
+++ b/processor/k8sattributesprocessor/client_test.go
@@ -9,10 +9,8 @@ import (
 	"go.opentelemetry.io/collector/component"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
 )
 
@@ -37,19 +35,43 @@ func selectors() (labels.Selector, fields.Selector) {
 }
 
 // newFakeClient instantiates a new FakeClient object and satisfies the ClientProvider type
-func newFakeClient(_ component.TelemetrySettings, _ k8sconfig.APIConfig, rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
-	cs := fake.NewSimpleClientset()
-
+func newFakeClient(
+	_ component.TelemetrySettings,
+	rules kube.ExtractionRules,
+	filters kube.Filters,
+	associations []kube.Association,
+	_ kube.Excludes,
+	_ *kube.InformerProviders,
+	_ bool,
+	_ time.Duration,
+) (kube.Client, error) {
 	ls, fs := selectors()
+	closeCh := make(chan struct{})
+	informer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
+	if err != nil {
+		return nil, err
+	}
+	nsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
+	if err != nil {
+		return nil, err
+	}
+	nodeInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
+	if err != nil {
+		return nil, err
+	}
+	rsInformer, err := kube.NewFakeInformer("", ls, fs, nil, closeCh)
+	if err != nil {
+		return nil, err
+	}
 	return &fakeClient{
 		Pods:         map[kube.PodIdentifier]*kube.Pod{},
 		Rules:        rules,
 		Filters:      filters,
 		Associations: associations,
-		Informer:           kube.NewFakeInformer(cs, "", ls, fs),
-		NamespaceInformer:  kube.NewFakeInformer(cs, "", ls, fs),
-		NodeInformer:       kube.NewFakeInformer(cs, "", ls, fs),
-		ReplicaSetInformer: kube.NewFakeInformer(cs, "", ls, fs),
+		Informer:           informer,
+		NamespaceInformer:  nsInformer,
+		NodeInformer:       nodeInformer,
+		ReplicaSetInformer: rsInformer,
 		StopCh:             make(chan struct{}),
 	}, nil
 }
diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go
index c3b66d049cac..b4078ea962a5 100644
--- a/processor/k8sattributesprocessor/factory.go
+++ b/processor/k8sattributesprocessor/factory.go
@@ -16,12 +16,10 @@ import (
 	"go.opentelemetry.io/collector/processor/xprocessor"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
-	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
 )
 
 var (
-	kubeClientProvider   = kube.ClientProvider(nil)
 	consumerCapabilities = consumer.Capabilities{MutatesData: true}
 	defaultExcludes      = ExcludeConfig{Pods: []ExcludePodConfig{{Name: "jaeger-agent"}, {Name: "jaeger-collector"}}}
 )
diff --git a/processor/k8sattributesprocessor/factory_test.go b/processor/k8sattributesprocessor/factory_test.go
index ee1714423f39..b3496a519445 100644
--- a/processor/k8sattributesprocessor/factory_test.go
+++ b/processor/k8sattributesprocessor/factory_test.go
@@ -24,9 +24,6 @@ func TestCreateDefaultConfig(t *testing.T) {
 func TestCreateProcessor(t *testing.T) {
 	factory := NewFactory()
 
-	realClient := kubeClientProvider
-	kubeClientProvider = newFakeClient
-
 	cfg := factory.CreateDefaultConfig()
 	params := processortest.NewNopSettings()
 
@@ -64,7 +61,4 @@ func TestCreateProcessor(t *testing.T) {
 	pp, err = factory.(xprocessor.Factory).CreateProfiles(context.Background(), params, cfg, consumertest.NewNop())
 	assert.NotNil(t, pp)
 	assert.NoError(t, err)
-
-	// Switch it back so other tests run afterwards will not fail on unexpected state
-	kubeClientProvider = realClient
 }
diff --git a/processor/k8sattributesprocessor/internal/kube/client.go b/processor/k8sattributesprocessor/internal/kube/client.go
index a984e7f85cbd..54bcbcc9ec35 100644
--- a/processor/k8sattributesprocessor/internal/kube/client.go
+++ b/processor/k8sattributesprocessor/internal/kube/client.go
@@ -23,10 +23,8 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
-	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/metadata"
 )
 
@@ -40,20 +38,23 @@ var enableRFC3339Timestamp = featuregate.GlobalRegistry().MustRegister(
 
 // WatchClient is the main interface provided by this package to a kubernetes cluster.
 type WatchClient struct {
-	m                      sync.RWMutex
-	deleteMut              sync.Mutex
-	logger                 *zap.Logger
-	kc                     kubernetes.Interface
-	informer               cache.SharedInformer
-	namespaceInformer      cache.SharedInformer
-	nodeInformer           cache.SharedInformer
-	replicasetInformer     cache.SharedInformer
-	replicasetRegex        *regexp.Regexp
-	cronJobRegex           *regexp.Regexp
-	deleteQueue            []deleteRequest
-	stopCh                 chan struct{}
-	waitForMetadata        bool
-	waitForMetadataTimeout time.Duration
+	m                             sync.RWMutex
+	deleteMut                     sync.Mutex
+	logger                        *zap.Logger
+	informer                      cache.SharedInformer
+	podHandlerRegistration        cache.ResourceEventHandlerRegistration
+	namespaceInformer             cache.SharedInformer
+	namespaceHandlerRegistration  cache.ResourceEventHandlerRegistration
+	nodeInformer                  cache.SharedInformer
+	nodeHandlerRegistration       cache.ResourceEventHandlerRegistration
+	replicasetInformer            cache.SharedInformer
+	replicasetHandlerRegistration cache.ResourceEventHandlerRegistration
+	replicasetRegex               *regexp.Regexp
+	cronJobRegex                  *regexp.Regexp
+	deleteQueue                   []deleteRequest
+	stopCh                        chan struct{}
+	waitForMetadata               bool
+	waitForMetadataTimeout        time.Duration
 
 	// A map containing Pod related data, used to associate them with resources.
 	// Key can be either an IP address or Pod UID
@@ -89,15 +90,11 @@ var cronJobRegex = regexp.MustCompile(`^(.*)-[0-9]+$`)
 
 // New initializes a new k8s Client.
 func New(
 	set component.TelemetrySettings,
-	apiCfg k8sconfig.APIConfig,
 	rules ExtractionRules,
 	filters Filters,
 	associations []Association,
 	exclude Excludes,
-	newClientSet APIClientsetProvider,
-	newInformer InformerProvider,
-	newNamespaceInformer InformerProviderNamespace,
-	newReplicaSetInformer InformerProviderReplicaSet,
+	informerProviders *InformerProviders,
 	waitForMetadata bool,
 	waitForMetadataTimeout time.Duration,
 ) (Client, error) {
@@ -124,15 +121,6 @@ func New(
 	c.Namespaces = map[string]*Namespace{}
 	c.Nodes = map[string]*Node{}
 	c.ReplicaSets = map[string]*ReplicaSet{}
-	if newClientSet == nil {
-		newClientSet = k8sconfig.MakeClient
-	}
-
-	kc, err := newClientSet(apiCfg)
-	if err != nil {
-		return nil, err
-	}
-	c.kc = kc
 
 	labelSelector, fieldSelector, err := selectorsFromFilters(c.Filters)
 	if err != nil {
@@ -143,56 +131,52 @@ func New(
 		zap.String("labelSelector", labelSelector.String()),
 		zap.String("fieldSelector", fieldSelector.String()),
 	)
-	if newInformer == nil {
-		newInformer = newSharedInformer
-	}
-
-	if newNamespaceInformer == nil {
-		switch {
-		case c.extractNamespaceLabelsAnnotations():
-			// if rules to extract metadata from namespace is configured use namespace shared informer containing
-			// all namespaces including kube-system which contains cluster uid information (kube-system-uid)
-			newNamespaceInformer = newNamespaceSharedInformer
-		case rules.ClusterUID:
-			// use kube-system shared informer to only watch kube-system namespace
-			// reducing overhead of watching all the namespaces
-			newNamespaceInformer = newKubeSystemSharedInformer
-		default:
-			newNamespaceInformer = NewNoOpInformer
-		}
-	}
 
-	c.informer = newInformer(c.kc, c.Filters.Namespace, labelSelector, fieldSelector)
-	err = c.informer.SetTransform(
-		func(object any) (any, error) {
-			originalPod, success := object.(*api_v1.Pod)
-			if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
-				return object, nil
-			}
+	podTransformFunc := func(object any) (any, error) {
+		originalPod, success := object.(*api_v1.Pod)
+		if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
+			return object, nil
+		}
 
-			return removeUnnecessaryPodData(originalPod, c.Rules), nil
-		},
+		return removeUnnecessaryPodData(originalPod, c.Rules), nil
+	}
+	c.informer, err = informerProviders.PodInformerProvider(
+		c.Filters.Namespace,
+		labelSelector,
+		fieldSelector,
+		podTransformFunc,
+		c.stopCh,
 	)
 	if err != nil {
 		return nil, err
 	}
 
-	c.namespaceInformer = newNamespaceInformer(c.kc)
+	// if rules to extract metadata from namespace is configured use namespace shared informer containing
+	// all namespaces including kube-system which contains cluster uid information (kube-system-uid)
+	if rules.extractNamespaceLabelsAnnotations() || rules.ClusterUID {
+		fs := fields.Everything()
+		if !rules.extractNamespaceLabelsAnnotations() {
+			fs = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace)
+		}
+		c.namespaceInformer, err = informerProviders.NamespaceInformerProvider(fs, c.stopCh)
+		if err != nil {
+			return nil, err
+		}
+	}
 
 	if rules.DeploymentName || rules.DeploymentUID {
-		if newReplicaSetInformer == nil {
-			newReplicaSetInformer = newReplicaSetSharedInformer
-		}
-		c.replicasetInformer = newReplicaSetInformer(c.kc, c.Filters.Namespace)
-		err = c.replicasetInformer.SetTransform(
-			func(object any) (any, error) {
-				originalReplicaset, success := object.(*apps_v1.ReplicaSet)
-				if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
-					return object, nil
-				}
+		transformFunc := func(object any) (any, error) {
+			originalReplicaset, success := object.(*apps_v1.ReplicaSet)
+			if !success { // means this is a cache.DeletedFinalStateUnknown, in which case we do nothing
+				return object, nil
+			}
 
-				return removeUnnecessaryReplicaSetData(originalReplicaset), nil
-			},
+			return removeUnnecessaryReplicaSetData(originalReplicaset), nil
+		}
+		c.replicasetInformer, err = informerProviders.ReplicaSetInformerProvider(
+			c.Filters.Namespace,
+			transformFunc,
+			c.stopCh,
 		)
 		if err != nil {
 			return nil, err
@@ -200,7 +184,10 @@ func New(
 	}
 
 	if c.extractNodeLabelsAnnotations() || c.extractNodeUID() {
-		c.nodeInformer = k8sconfig.NewNodeSharedInformer(c.kc, c.Filters.Node, 5*time.Minute)
+		c.nodeInformer, err = informerProviders.NodeInformerProvider(c.Filters.Node, 5*time.Minute, c.stopCh)
+		if err != nil {
+			return nil, err
+		}
 	}
 
 	return c, err
@@ -208,8 +195,11 @@ func New(
 
 // Start registers pod event handlers and starts watching the kubernetes cluster for pod changes.
 func (c *WatchClient) Start() error {
+	var err error
+	c.m.Lock()
+	defer c.m.Unlock()
 	synced := make([]cache.InformerSynced, 0)
-	reg, err := c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	c.podHandlerRegistration, err = c.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    c.handlePodAdd,
 		UpdateFunc: c.handlePodUpdate,
 		DeleteFunc: c.handlePodDelete,
@@ -217,22 +207,22 @@ func (c *WatchClient) Start() error {
 	if err != nil {
 		return err
 	}
-	synced = append(synced, reg.HasSynced)
-	go c.informer.Run(c.stopCh)
+	synced = append(synced, c.podHandlerRegistration.HasSynced)
 
-	reg, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    c.handleNamespaceAdd,
-		UpdateFunc: c.handleNamespaceUpdate,
-		DeleteFunc: c.handleNamespaceDelete,
-	})
-	if err != nil {
-		return err
+	if c.namespaceInformer != nil {
+		c.namespaceHandlerRegistration, err = c.namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+			AddFunc:    c.handleNamespaceAdd,
+			UpdateFunc: c.handleNamespaceUpdate,
+			DeleteFunc: c.handleNamespaceDelete,
+		})
+		if err != nil {
+			return err
+		}
+		synced = append(synced, c.namespaceHandlerRegistration.HasSynced)
 	}
-	synced = append(synced, reg.HasSynced)
-	go c.namespaceInformer.Run(c.stopCh)
 
-	if c.Rules.DeploymentName || c.Rules.DeploymentUID {
-		reg, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	if c.replicasetInformer != nil {
+		c.replicasetHandlerRegistration, err = c.replicasetInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc:    c.handleReplicaSetAdd,
 			UpdateFunc: c.handleReplicaSetUpdate,
 			DeleteFunc: c.handleReplicaSetDelete,
@@ -240,12 +230,11 @@ func (c *WatchClient) Start() error {
 		if err != nil {
 			return err
 		}
-		synced = append(synced, reg.HasSynced)
-		go c.replicasetInformer.Run(c.stopCh)
+		synced = append(synced, c.replicasetHandlerRegistration.HasSynced)
 	}
 
 	if c.nodeInformer != nil {
-		reg, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+		c.nodeHandlerRegistration, err = c.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 			AddFunc:    c.handleNodeAdd,
 			UpdateFunc: c.handleNodeUpdate,
 			DeleteFunc: c.handleNodeDelete,
@@ -253,10 +242,8 @@ func (c *WatchClient) Start() error {
 		if err != nil {
 			return err
 		}
-		synced = append(synced, reg.HasSynced)
-		go c.nodeInformer.Run(c.stopCh)
+		synced = append(synced, c.nodeHandlerRegistration.HasSynced)
 	}
-
 	if c.waitForMetadata {
 		timeoutCh := make(chan struct{})
 		t := time.AfterFunc(c.waitForMetadataTimeout, func() {
@@ -272,6 +259,42 @@ func (c *WatchClient) Start() error {
 
 // Stop signals the the k8s watcher/informer to stop watching for new events.
 func (c *WatchClient) Stop() {
+	c.m.Lock()
+	defer c.m.Unlock()
+	var eventHandlerRemovalErrors []error
+	if c.podHandlerRegistration != nil {
+		if err := c.informer.RemoveEventHandler(c.podHandlerRegistration); err != nil {
+			eventHandlerRemovalErrors = append(eventHandlerRemovalErrors, err)
+		}
+		c.podHandlerRegistration = nil
+	}
+
+	if c.namespaceHandlerRegistration != nil {
+		if err := c.namespaceInformer.RemoveEventHandler(c.namespaceHandlerRegistration); err != nil {
+			eventHandlerRemovalErrors = append(eventHandlerRemovalErrors, err)
+		}
+		c.namespaceHandlerRegistration = nil
+	}
+
+	if c.replicasetInformer != nil && c.replicasetHandlerRegistration != nil {
+		if err := c.replicasetInformer.RemoveEventHandler(c.replicasetHandlerRegistration); err != nil {
+			eventHandlerRemovalErrors = append(eventHandlerRemovalErrors, err)
+		}
+		c.replicasetHandlerRegistration = nil
+	}
+
+	if c.nodeInformer != nil && c.nodeHandlerRegistration != nil {
+		if err := c.nodeInformer.RemoveEventHandler(c.nodeHandlerRegistration); err != nil {
+			eventHandlerRemovalErrors = append(eventHandlerRemovalErrors, err)
+		}
+		c.nodeHandlerRegistration = nil
+	}
+
+	if len(eventHandlerRemovalErrors) > 0 {
+		multiErr := errors.Join(eventHandlerRemovalErrors...)
+		c.logger.Error("error removing event handlers from informers", zap.Error(multiErr))
+	}
+
 	close(c.stopCh)
 }
@@ -985,22 +1008,6 @@ func (c *WatchClient) addOrUpdateNamespace(namespace *api_v1.Namespace) {
 	c.m.Unlock()
 }
 
-func (c *WatchClient) extractNamespaceLabelsAnnotations() bool {
-	for _, r := range c.Rules.Labels {
-		if r.From == MetadataFromNamespace {
-			return true
-		}
-	}
-
-	for _, r := range c.Rules.Annotations {
-		if r.From == MetadataFromNamespace {
-			return true
-		}
-	}
-
-	return false
-}
-
 func (c *WatchClient) extractNodeLabelsAnnotations() bool {
 	for _, r := range c.Rules.Labels {
 		if r.From == MetadataFromNode {
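Taken together, the client.go changes move clientset creation out of New and tie every informer to a provider plus the client's stop channel. A minimal sketch of the resulting call pattern follows; startKubeClient and its argument values are illustrative, not part of this patch, and only New, Start, Stop, and NewDefaultInformerProviders come from the diff above.

// Illustrative wiring of the reworked client. Assumed imports: "time",
// "go.opentelemetry.io/collector/component", "k8s.io/client-go/kubernetes",
// and the internal kube package changed in this diff.
func startKubeClient(set component.TelemetrySettings, cs kubernetes.Interface) (kube.Client, error) {
	providers := kube.NewDefaultInformerProviders(cs) // per-client informers, not shared
	c, err := kube.New(
		set,
		kube.ExtractionRules{DeploymentName: true}, // enables the ReplicaSet informer
		kube.Filters{},
		nil, // associations
		kube.Excludes{},
		providers,
		false,          // waitForMetadata
		10*time.Second, // waitForMetadataTimeout
	)
	if err != nil {
		return nil, err
	}
	// Start now only registers event handlers; Stop removes them again, so an
	// informer injected through custom providers can outlive this client.
	return c, c.Start()
}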
diff --git a/processor/k8sattributesprocessor/internal/kube/client_test.go b/processor/k8sattributesprocessor/internal/kube/client_test.go
index de701f6fd673..f2987d5d72e4 100644
--- a/processor/k8sattributesprocessor/internal/kube/client_test.go
+++ b/processor/k8sattributesprocessor/internal/kube/client_test.go
@@ -4,7 +4,6 @@
 package kube
 
 import (
-	"fmt"
 	"regexp"
 	"testing"
 	"time"
@@ -21,17 +20,9 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/selection"
-	"k8s.io/client-go/kubernetes"
-	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 )
 
-func newFakeAPIClientset(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
-	return fake.NewSimpleClientset(), nil
-}
-
 func newPodIdentifier(from string, name string, value string) PodIdentifier {
 	if from == "connection" {
 		name = ""
@@ -144,58 +135,159 @@ func nodeAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj any)) {
 	assert.Equal(t, "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", got.NodeUID)
 }
 
-func TestDefaultClientset(t *testing.T) {
-	c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, nil, false, 10*time.Second)
-	require.EqualError(t, err, "invalid authType for kubernetes: ")
-	assert.Nil(t, c)
-
-	c, err = New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, nil, false, 10*time.Second)
-	assert.NoError(t, err)
-	assert.NotNil(t, c)
-}
-
 func TestBadFilters(t *testing.T) {
-	c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, Excludes{}, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second)
+	c, err := New(
+		componenttest.NewNopTelemetrySettings(),
+		ExtractionRules{},
+		Filters{Fields: []FieldFilter{{Op: selection.Exists}}},
+		[]Association{},
+		Excludes{},
+		NewFakeInformerProviders(),
+		false,
+		10*time.Second,
+	)
 	assert.Error(t, err)
 	assert.Nil(t, c)
 }
 
-func TestClientStartStop(t *testing.T) {
-	c, _ := newTestClient(t)
-	ctr := c.informer.GetController()
-	require.IsType(t, &FakeController{}, ctr)
-	fctr := ctr.(*FakeController)
-	require.NotNil(t, fctr)
+func TestNewClientInformers(t *testing.T) {
+	testCases := []struct {
+		name               string
+		extractionRules    ExtractionRules
+		namespaceInformer  bool
+		replicaSetInformer bool
+		nodeInformer       bool
+	}{
+		{
+			name:            "default",
+			extractionRules: ExtractionRules{},
+		},
+		{
+			name: "namespace labels",
+			extractionRules: ExtractionRules{
+				Labels: []FieldExtractionRule{
+					{
+						From: MetadataFromNamespace,
+					},
+				},
+			},
+			namespaceInformer: true,
+		},
+		{
+			name: "namespace annotations",
+			extractionRules: ExtractionRules{
+				Annotations: []FieldExtractionRule{
+					{
+						From: MetadataFromNamespace,
+					},
+				},
+			},
+			namespaceInformer: true,
+		},
+		{
+			name: "cluster id",
+			extractionRules: ExtractionRules{
+				ClusterUID: true,
+			},
+			namespaceInformer: true,
+		},
+		{
+			name: "deployment name",
+			extractionRules: ExtractionRules{
+				DeploymentName: true,
+			},
+			replicaSetInformer: true,
+		},
+		{
+			name: "deployment uid",
+			extractionRules: ExtractionRules{
+				DeploymentUID: true,
+			},
+			replicaSetInformer: true,
+		},
+		{
+			name: "node labels",
+			extractionRules: ExtractionRules{
+				Labels: []FieldExtractionRule{
+					{
+						From: MetadataFromNode,
+					},
+				},
+			},
+			nodeInformer: true,
+		},
+		{
+			name: "node annotations",
+			extractionRules: ExtractionRules{
+				Annotations: []FieldExtractionRule{
+					{
+						From: MetadataFromNode,
+					},
+				},
+			},
+			nodeInformer: true,
+		},
+		{
+			name: "node uid",
+			extractionRules: ExtractionRules{
+				NodeUID: true,
+			},
+			nodeInformer: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			client, _ := newTestClientWithRulesAndFilters(t, Filters{}, tc.extractionRules)
+			assert.NotNil(t, client)
+			assert.NotNil(t, client.informer)
+			assert.Equal(t, tc.namespaceInformer, client.namespaceInformer != nil)
+			assert.Equal(t, tc.replicaSetInformer, client.replicasetInformer != nil)
+			assert.Equal(t, tc.nodeInformer, client.nodeInformer != nil)
+		})
+	}
+}
+
+func TestClientStartStop(t *testing.T) {
+	rulesForAllInformers := ExtractionRules{
+		DeploymentName: true,
+		ClusterUID:     true,
+		NodeUID:        true,
+	}
+	c, _ := newTestClientWithRulesAndFilters(t, Filters{}, rulesForAllInformers)
+	informers := []cache.SharedInformer{
+		c.informer, c.namespaceInformer, c.replicasetInformer, c.nodeInformer,
+	}
+	for _, informer := range informers {
+		ctr := informer.GetController()
+		require.IsType(t, &FakeController{}, ctr)
+		fctr := ctr.(*FakeController)
+		require.NotNil(t, fctr)
+	}
 	done := make(chan struct{})
-	assert.False(t, fctr.HasStopped())
 	go func() {
 		assert.NoError(t, c.Start())
 		close(done)
 	}()
-	c.Stop()
 	<-done
-	time.Sleep(time.Second)
-	assert.True(t, fctr.HasStopped())
-}
+	c.Stop()
 
-func TestConstructorErrors(t *testing.T) {
-	er := ExtractionRules{}
-	ff := Filters{}
-	t.Run("client-provider-call", func(t *testing.T) {
-		var gotAPIConfig k8sconfig.APIConfig
-		apiCfg := k8sconfig.APIConfig{
-			AuthType: "test-auth-type",
-		}
-		clientProvider := func(c k8sconfig.APIConfig) (kubernetes.Interface, error) {
-			gotAPIConfig = c
-			return nil, fmt.Errorf("error creating k8s client")
-		}
-		c, err := New(componenttest.NewNopTelemetrySettings(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, NewFakeNamespaceInformer, nil, false, 10*time.Second)
-		assert.Nil(t, c)
-		require.EqualError(t, err, "error creating k8s client")
-		assert.Equal(t, apiCfg, gotAPIConfig)
-	})
+	for _, informer := range informers {
+		ctr := informer.GetController()
+		require.IsType(t, &FakeController{}, ctr)
+		fctr := ctr.(*FakeController)
+		require.NotNil(t, fctr)
+		assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+			assert.True(collect, fctr.HasStopped())
+		}, time.Second, time.Millisecond)
+	}
 }
 
 func TestPodAdd(t *testing.T) {
@@ -619,7 +711,7 @@ func TestGetIgnoredPod(t *testing.T) {
 }
 
 func TestHandlerWrongType(t *testing.T) {
-	c, logs := newTestClientWithRulesAndFilters(t, Filters{})
+	c, logs := newTestClient(t)
 	assert.Equal(t, 0, logs.Len())
 	c.handlePodAdd(1)
 	c.handlePodDelete(1)
@@ -631,7 +723,7 @@ func TestHandlerWrongType(t *testing.T) {
 }
 
 func TestExtractionRules(t *testing.T) {
-	c, _ := newTestClientWithRulesAndFilters(t, Filters{})
+	c, _ := newTestClient(t)
 
 	// Disable saving ip into k8s.pod.ip
 	c.Associations[0].Sources[0].Name = ""
@@ -987,7 +1079,7 @@ func TestExtractionRules(t *testing.T) {
 }
 
 func TestReplicaSetExtractionRules(t *testing.T) {
-	c, _ := newTestClientWithRulesAndFilters(t, Filters{})
+	c, _ := newTestClient(t)
 
 	// Disable saving ip into k8s.pod.ip
 	c.Associations[0].Sources[0].Name = ""
@@ -1143,7 +1235,7 @@ func TestReplicaSetExtractionRules(t *testing.T) {
 }
 
 func TestNamespaceExtractionRules(t *testing.T) {
-	c, _ := newTestClientWithRulesAndFilters(t, Filters{})
+	c, _ := newTestClient(t)
 
 	namespace := &api_v1.Namespace{
 		ObjectMeta: meta_v1.ObjectMeta{
@@ -1239,7 +1331,7 @@ func TestNamespaceExtractionRules(t *testing.T) {
 }
 
 func TestNodeExtractionRules(t *testing.T) {
-	c, _ := newTestClientWithRulesAndFilters(t, Filters{})
+	c, _ := newTestClient(t)
 
 	node := &api_v1.Node{
 		ObjectMeta: meta_v1.ObjectMeta{
@@ -1390,7 +1482,7 @@ func TestFilters(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			c, _ := newTestClientWithRulesAndFilters(t, tc.filters)
+			c, _ := newTestClientWithRulesAndFilters(t, tc.filters, ExtractionRules{})
 			inf := c.informer.(*FakeInformer)
 			assert.Equal(t, tc.filters.Namespace, inf.namespace)
 			assert.Equal(t, tc.labels, inf.labelSelector.String())
@@ -1864,7 +1956,6 @@ func TestErrorSelectorsFromFilters(t *testing.T) {
 }
 
 func TestExtractNamespaceLabelsAnnotations(t *testing.T) {
-	c, _ := newTestClientWithRulesAndFilters(t, Filters{})
 	testCases := []struct {
 		name                   string
 		shouldExtractNamespace bool
@@ -1921,13 +2012,12 @@ func TestExtractNamespaceLabelsAnnotations(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			c.Rules = tc.rules
-			assert.Equal(t, tc.shouldExtractNamespace, c.extractNamespaceLabelsAnnotations())
+			assert.Equal(t, tc.shouldExtractNamespace, tc.rules.extractNamespaceLabelsAnnotations())
 		})
 	}
 }
 
-func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *observer.ObservedLogs) {
+func newTestClientWithRulesAndFilters(t *testing.T, f Filters, rules ExtractionRules) (*WatchClient, *observer.ObservedLogs) {
 	set := componenttest.NewNopTelemetrySettings()
 	observedLogger, logs := observer.New(zapcore.WarnLevel)
 	set.Logger = zap.New(observedLogger)
@@ -1954,13 +2044,22 @@ func newTestClientWithRulesAndFilters(t *testing.T, f Filters) (*WatchClient, *o
 			},
 		},
 	}
-	c, err := New(set, k8sconfig.APIConfig{}, ExtractionRules{}, f, associations, exclude, newFakeAPIClientset, NewFakeInformer, NewFakeNamespaceInformer, NewFakeReplicaSetInformer, false, 10*time.Second)
+	c, err := New(
+		set,
+		rules,
+		f,
+		associations,
+		exclude,
+		NewFakeInformerProviders(),
+		false,
+		10*time.Second,
+	)
 	require.NoError(t, err)
 	return c.(*WatchClient), logs
 }
 
 func newTestClient(t *testing.T) (*WatchClient, *observer.ObservedLogs) {
-	return newTestClientWithRulesAndFilters(t, Filters{})
+	return newTestClientWithRulesAndFilters(t, Filters{}, ExtractionRules{})
 }
 
 type neverSyncedFakeClient struct {
@@ -1994,23 +2093,46 @@ func TestWaitForMetadata(t *testing.T) {
 		err: false,
 	}, {
 		name: "wait but never synced",
-		informerProvider: func(client kubernetes.Interface, namespace string, labelSelector labels.Selector, fieldSelector fields.Selector) cache.SharedInformer {
-			return &neverSyncedFakeClient{NewFakeInformer(client, namespace, labelSelector, fieldSelector)}
+		informerProvider: func(
+			namespace string,
+			labelSelector labels.Selector,
+			fieldSelector fields.Selector,
+			transformFunc cache.TransformFunc,
+			stopCh <-chan struct{},
+		) (cache.SharedInformer, error) {
+			informer, err := NewFakeInformer(namespace, labelSelector, fieldSelector, transformFunc, stopCh)
+			if err != nil {
+				return nil, err
+			}
+			return &neverSyncedFakeClient{informer}, nil
 		},
 		err: true,
 	}}
 
 	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			c, err := New(componenttest.NewNopTelemetrySettings(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, tc.informerProvider, nil, nil, true, 1*time.Second)
-			require.NoError(t, err)
-
-			err = c.Start()
-			if tc.err {
-				require.Error(t, err)
-			} else {
+		t.Run(
+			tc.name, func(t *testing.T) {
+				informerProviders := NewFakeInformerProviders()
+				informerProviders.PodInformerProvider = tc.informerProvider
+				c, err := New(
+					componenttest.NewNopTelemetrySettings(),
+					ExtractionRules{},
+					Filters{},
+					[]Association{},
+					Excludes{},
+					informerProviders,
+					true,
+					1*time.Second,
+				)
 				require.NoError(t, err)
-			}
-		})
+
+				err = c.Start()
+				if tc.err {
+					require.Error(t, err)
+				} else {
+					require.NoError(t, err)
+				}
+			},
+		)
 	}
 }
diff --git a/processor/k8sattributesprocessor/internal/kube/fake_informer.go b/processor/k8sattributesprocessor/internal/kube/fake_informer.go
index 15d63c4e9c0c..4fb87b3c58e6 100644
--- a/processor/k8sattributesprocessor/internal/kube/fake_informer.go
+++ b/processor/k8sattributesprocessor/internal/kube/fake_informer.go
@@ -21,18 +21,30 @@ type FakeInformer struct {
 	fieldSelector fields.Selector
 }
 
+func NewFakeInformerProviders() *InformerProviders {
+	return &InformerProviders{
+		PodInformerProvider:        NewFakeInformer,
+		NamespaceInformerProvider:  NewFakeNamespaceInformer,
+		ReplicaSetInformerProvider: NewFakeReplicaSetInformer,
+		NodeInformerProvider:       NewFakeNodeInformer,
+	}
+}
+
 func NewFakeInformer(
-	_ kubernetes.Interface,
 	namespace string,
 	labelSelector labels.Selector,
 	fieldSelector fields.Selector,
-) cache.SharedInformer {
-	return &FakeInformer{
+	_ cache.TransformFunc,
+	closeCh <-chan struct{},
+) (cache.SharedInformer, error) {
+	informer := &FakeInformer{
 		FakeController: &FakeController{},
 		namespace:      namespace,
 		labelSelector:  labelSelector,
 		fieldSelector:  fieldSelector,
 	}
+	go informer.Run(closeCh)
+	return informer, nil
 }
 
 func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
@@ -68,11 +80,15 @@ type FakeNamespaceInformer struct {
 }
 
 func NewFakeNamespaceInformer(
-	_ kubernetes.Interface,
-) cache.SharedInformer {
-	return &FakeInformer{
+	fs fields.Selector,
+	closeCh <-chan struct{},
+) (cache.SharedInformer, error) {
+	informer := &FakeInformer{
 		FakeController: &FakeController{},
+		fieldSelector:  fs,
 	}
+	go informer.Run(closeCh)
+	return informer, nil
 }
 
 func (f *FakeNamespaceInformer) AddEventHandler(_ cache.ResourceEventHandler) {}
@@ -93,12 +109,15 @@ type FakeReplicaSetInformer struct {
 }
 
 func NewFakeReplicaSetInformer(
-	_ kubernetes.Interface,
 	_ string,
-) cache.SharedInformer {
-	return &FakeInformer{
+	_ cache.TransformFunc,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
+	informer := &FakeInformer{
 		FakeController: &FakeController{},
 	}
+	go informer.Run(stopCh)
+	return informer, nil
 }
 
 func (f *FakeReplicaSetInformer) AddEventHandler(_ cache.ResourceEventHandler) {}
@@ -118,6 +137,18 @@ func (f *FakeReplicaSetInformer) GetController() cache.Controller {
 	return f.FakeController
 }
 
+func NewFakeNodeInformer(
+	_ string,
+	_ time.Duration,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
+	informer := &FakeInformer{
+		FakeController: &FakeController{},
+	}
+	go informer.Run(stopCh)
+	return informer, nil
+}
+
 type FakeController struct {
 	sync.Mutex
 	stopped bool
@@ -154,10 +185,13 @@ type NoOpInformer struct {
 
 func NewNoOpInformer(
 	_ kubernetes.Interface,
+	stopCh <-chan struct{},
 ) cache.SharedInformer {
-	return &NoOpInformer{
+	informer := &NoOpInformer{
 		NoOpController: &NoOpController{},
 	}
+	go informer.Run(stopCh)
+	return informer
 }
 
 func (f *NoOpInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
diff --git a/processor/k8sattributesprocessor/internal/kube/informer.go b/processor/k8sattributesprocessor/internal/kube/informer.go
index 85459afe53bb..11d009f45a21 100644
--- a/processor/k8sattributesprocessor/internal/kube/informer.go
+++ b/processor/k8sattributesprocessor/internal/kube/informer.go
@@ -5,6 +5,7 @@ package kube // import "github.com/open-telemetry/opentelemetry-collector-contri
 
 import (
 	"context"
+	"time"
 
 	apps_v1 "k8s.io/api/apps/v1"
 	api_v1 "k8s.io/api/core/v1"
@@ -15,44 +16,106 @@ import (
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+
+	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 )
 
 const kubeSystemNamespace = "kube-system"
 
+// InformerProviders holds factory functions for informers needed by the kube client. The intent is to facilitate
+// informer sharing by letting the processor factory pass in custom implementations of these factories.
+type InformerProviders struct {
+	PodInformerProvider        InformerProvider
+	NamespaceInformerProvider  InformerProviderNamespace
+	ReplicaSetInformerProvider InformerProviderReplicaSet
+	NodeInformerProvider       InformerProviderNode
+}
+
+// NewDefaultInformerProviders returns informer providers using the given client. These are not shared, and exist
+// to provide a simple default implementation.
+func NewDefaultInformerProviders(client kubernetes.Interface) *InformerProviders {
+	podInformerProvider := func(
+		namespace string,
+		labelSelector labels.Selector,
+		fieldSelector fields.Selector,
+		transformFunc cache.TransformFunc,
+		stopCh <-chan struct{},
+	) (cache.SharedInformer, error) {
+		return newSharedInformer(client, namespace, labelSelector, fieldSelector, transformFunc, stopCh)
+	}
+
+	namespaceInformerProvider := func(
+		fs fields.Selector,
+		stopCh <-chan struct{},
+	) (cache.SharedInformer, error) {
+		return newNamespaceSharedInformer(client, fs, stopCh)
+	}
+
+	replicaSetInformerProvider := func(
+		namespace string,
+		transformFunc cache.TransformFunc,
+		stopCh <-chan struct{},
+	) (cache.SharedInformer, error) {
+		return newReplicaSetSharedInformer(client, namespace, transformFunc, stopCh)
+	}
+
+	nodeInformerProvider := func(
+		nodeName string,
+		watchSyncPeriod time.Duration,
+		stopCh <-chan struct{},
+	) (cache.SharedInformer, error) {
+		return newNodeSharedInformer(client, nodeName, watchSyncPeriod, stopCh)
+	}
+
+	return &InformerProviders{
+		PodInformerProvider:        podInformerProvider,
+		NamespaceInformerProvider:  namespaceInformerProvider,
+		ReplicaSetInformerProvider: replicaSetInformerProvider,
+		NodeInformerProvider:       nodeInformerProvider,
+	}
+}
+
 // InformerProvider defines a function type that returns a new SharedInformer. It is used to
 // allow passing custom shared informers to the watch client.
 type InformerProvider func(
-	client kubernetes.Interface,
 	namespace string,
 	labelSelector labels.Selector,
 	fieldSelector fields.Selector,
-) cache.SharedInformer
+	transformFunc cache.TransformFunc,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error)
 
 // InformerProviderNamespace defines a function type that returns a new SharedInformer. It is used to
 // allow passing custom shared informers to the watch client for fetching namespace objects.
 type InformerProviderNamespace func(
-	client kubernetes.Interface,
-) cache.SharedInformer
+	fieldSelector fields.Selector,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error)
 
 // InformerProviderNode defines a function type that returns a new SharedInformer. It is used to
 // allow passing custom shared informers to the watch client for fetching node objects.
 type InformerProviderNode func(
-	client kubernetes.Interface,
-) cache.SharedInformer
+	nodeName string,
+	watchSyncPeriod time.Duration,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error)
 
 // InformerProviderReplicaSet defines a function type that returns a new SharedInformer. It is used to
 // allow passing custom shared informers to the watch client.
 type InformerProviderReplicaSet func(
-	client kubernetes.Interface,
 	namespace string,
-) cache.SharedInformer
+	transformFunc cache.TransformFunc,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error)
 
 func newSharedInformer(
 	client kubernetes.Interface,
 	namespace string,
 	ls labels.Selector,
 	fs fields.Selector,
-) cache.SharedInformer {
+	transformFunc cache.TransformFunc,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
 	informer := cache.NewSharedInformer(
 		&cache.ListWatch{
 			ListFunc:  informerListFuncWithSelectors(client, namespace, ls, fs),
@@ -61,7 +124,14 @@ func newSharedInformer(
 		&api_v1.Pod{},
 		watchSyncPeriod,
 	)
-	return informer
+	if transformFunc != nil {
+		err := informer.SetTransform(transformFunc)
+		if err != nil {
+			return nil, err
+		}
+	}
+	go informer.Run(stopCh)
+	return informer, nil
 }
 
 func informerListFuncWithSelectors(client kubernetes.Interface, namespace string, ls labels.Selector, fs fields.Selector) cache.ListFunc {
@@ -80,49 +150,37 @@ func informerWatchFuncWithSelectors(client kubernetes.Interface, namespace strin
 	}
 }
 
-// newKubeSystemSharedInformer watches only kube-system namespace
-func newKubeSystemSharedInformer(
-	client kubernetes.Interface,
-) cache.SharedInformer {
-	informer := cache.NewSharedInformer(
-		&cache.ListWatch{
-			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
-				opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
-				return client.CoreV1().Namespaces().List(context.Background(), opts)
-			},
-			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
-				opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", kubeSystemNamespace).String()
-				return client.CoreV1().Namespaces().Watch(context.Background(), opts)
-			},
-		},
-		&api_v1.Namespace{},
-		watchSyncPeriod,
-	)
-	return informer
-}
-
 func newNamespaceSharedInformer(
 	client kubernetes.Interface,
-) cache.SharedInformer {
+	fs fields.Selector,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
 	informer := cache.NewSharedInformer(
 		&cache.ListWatch{
-			ListFunc:  namespaceInformerListFunc(client),
-			WatchFunc: namespaceInformerWatchFunc(client),
+			ListFunc:  namespaceInformerListFunc(client, fs),
+			WatchFunc: namespaceInformerWatchFunc(client, fs),
 		},
 		&api_v1.Namespace{},
 		watchSyncPeriod,
 	)
-	return informer
+	go informer.Run(stopCh)
+	return informer, nil
 }
 
-func namespaceInformerListFunc(client kubernetes.Interface) cache.ListFunc {
+func namespaceInformerListFunc(client kubernetes.Interface, fs fields.Selector) cache.ListFunc {
 	return func(opts metav1.ListOptions) (runtime.Object, error) {
+		if fs != nil {
+			opts.FieldSelector = fs.String()
+		}
 		return client.CoreV1().Namespaces().List(context.Background(), opts)
 	}
 }
 
-func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFunc {
+func namespaceInformerWatchFunc(client kubernetes.Interface, fs fields.Selector) cache.WatchFunc {
 	return func(opts metav1.ListOptions) (watch.Interface, error) {
+		if fs != nil {
+			opts.FieldSelector = fs.String()
+		}
 		return client.CoreV1().Namespaces().Watch(context.Background(), opts)
 	}
 }
@@ -130,7 +188,9 @@ func namespaceInformerWatchFunc(client kubernetes.Interface) cache.WatchFunc {
 func newReplicaSetSharedInformer(
 	client kubernetes.Interface,
 	namespace string,
-) cache.SharedInformer {
+	transformFunc cache.TransformFunc,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
 	informer := cache.NewSharedInformer(
 		&cache.ListWatch{
 			ListFunc:  replicasetListFuncWithSelectors(client, namespace),
@@ -139,7 +199,14 @@ func newReplicaSetSharedInformer(
 		&apps_v1.ReplicaSet{},
 		watchSyncPeriod,
 	)
-	return informer
+	if transformFunc != nil {
+		err := informer.SetTransform(transformFunc)
+		if err != nil {
+			return nil, err
+		}
+	}
+	go informer.Run(stopCh)
+	return informer, nil
 }
 
 func replicasetListFuncWithSelectors(client kubernetes.Interface, namespace string) cache.ListFunc {
@@ -153,3 +220,14 @@ func replicasetWatchFuncWithSelectors(client kubernetes.Interface, namespace str
 		return client.AppsV1().ReplicaSets(namespace).Watch(context.Background(), opts)
 	}
 }
+
+func newNodeSharedInformer(
+	client kubernetes.Interface,
+	nodeName string,
+	watchSyncPeriod time.Duration,
+	stopCh <-chan struct{},
+) (cache.SharedInformer, error) {
+	informer := k8sconfig.NewNodeSharedInformer(client, nodeName, watchSyncPeriod)
+	go informer.Run(stopCh)
+	return informer, nil
+}
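The provider indirection above is what makes informer sharing possible, even though NewDefaultInformerProviders deliberately does not share. As a hedged sketch of what a custom provider could look like (sharedPodInformerProvider is hypothetical and not part of this patch; assumed imports are "sync", the k8s.io selector and cache packages, and the internal kube package):

// Hypothetical sharing wrapper around the new InformerProvider signature.
// The first call builds an informer via base; later calls reuse it and ignore
// their own selectors, so this only suits clients with identical filters.
func sharedPodInformerProvider(base kube.InformerProvider) kube.InformerProvider {
	var (
		mu     sync.Mutex
		cached cache.SharedInformer
	)
	return func(
		namespace string,
		ls labels.Selector,
		fs fields.Selector,
		transform cache.TransformFunc,
		stopCh <-chan struct{},
	) (cache.SharedInformer, error) {
		mu.Lock()
		defer mu.Unlock()
		if cached == nil {
			informer, err := base(namespace, ls, fs, transform, stopCh)
			if err != nil {
				return nil, err
			}
			cached = informer
		}
		return cached, nil
	}
}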
diff --git a/processor/k8sattributesprocessor/internal/kube/informer_test.go b/processor/k8sattributesprocessor/internal/kube/informer_test.go
index 74931220dd81..d4049b5bc452 100644
--- a/processor/k8sattributesprocessor/internal/kube/informer_test.go
+++ b/processor/k8sattributesprocessor/internal/kube/informer_test.go
@@ -12,32 +12,73 @@ import (
 	api_v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/selection"
+	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/tools/cache"
-
-	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 )
 
-func Test_newSharedInformer(t *testing.T) {
+func Test_DefaultInformerProviders(t *testing.T) {
+	client := fake.NewClientset()
 	labelSelector, fieldSelector, err := selectorsFromFilters(Filters{})
 	require.NoError(t, err)
-	client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	require.NoError(t, err)
-	informer := newSharedInformer(client, "testns", labelSelector, fieldSelector)
-	assert.NotNil(t, informer)
+	providers := NewDefaultInformerProviders(client)
+	require.NotNil(t, providers)
+	closeCh := make(chan struct{})
+
+	podInformer, err := providers.PodInformerProvider("testns", labelSelector, fieldSelector, nil, closeCh)
+	assert.NoError(t, err)
+	assert.NotNil(t, podInformer)
+	assert.False(t, podInformer.IsStopped())
+
+	namespaceInformer, err := providers.NamespaceInformerProvider(fieldSelector, closeCh)
+	assert.NoError(t, err)
+	assert.NotNil(t, namespaceInformer)
+	assert.False(t, namespaceInformer.IsStopped())
+
+	replicasetInformer, err := providers.ReplicaSetInformerProvider("testns", nil, closeCh)
+	assert.NoError(t, err)
+	assert.NotNil(t, replicasetInformer)
+	assert.False(t, replicasetInformer.IsStopped())
+
+	nodeInformer, err := providers.NodeInformerProvider("testNode", time.Minute, closeCh)
+	assert.NoError(t, err)
+	assert.NotNil(t, nodeInformer)
+	assert.False(t, nodeInformer.IsStopped())
+
+	close(closeCh)
+	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+		assert.True(collect, podInformer.IsStopped())
+		assert.True(collect, namespaceInformer.IsStopped())
+		assert.True(collect, replicasetInformer.IsStopped())
+		assert.True(collect, nodeInformer.IsStopped())
+	}, time.Second*5, time.Millisecond)
 }
 
-func Test_newSharedNamespaceInformer(t *testing.T) {
-	client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
+func Test_newSharedInformer(t *testing.T) {
+	labelSelector, fieldSelector, err := selectorsFromFilters(Filters{})
 	require.NoError(t, err)
-	informer := newNamespaceSharedInformer(client)
+	client := fake.NewClientset()
+	stopCh := make(chan struct{})
+	informer, err := newSharedInformer(client, "testns", labelSelector, fieldSelector, nil, stopCh)
+	assert.NoError(t, err)
 	assert.NotNil(t, informer)
+	assert.False(t, informer.IsStopped())
+	close(stopCh)
+	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+		assert.True(collect, informer.IsStopped())
+	}, time.Second*5, time.Millisecond)
 }
 
-func Test_newKubeSystemSharedInformer(t *testing.T) {
-	client, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	require.NoError(t, err)
-	informer := newKubeSystemSharedInformer(client)
+func Test_newSharedNamespaceInformer(t *testing.T) {
+	client := fake.NewClientset()
+	stopCh := make(chan struct{})
+	informer, err := newNamespaceSharedInformer(client, nil, stopCh)
+	assert.NoError(t, err)
 	assert.NotNil(t, informer)
+	assert.False(t, informer.IsStopped())
+	close(stopCh)
+	assert.EventuallyWithT(t, func(collect *assert.CollectT) {
+		assert.True(collect, informer.IsStopped())
+	}, time.Second*5, time.Millisecond)
 }
 
 func Test_informerListFuncWithSelectors(t *testing.T) {
@@ -58,8 +99,7 @@ func Test_informerListFuncWithSelectors(t *testing.T) {
 		},
 	})
 	assert.NoError(t, err)
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	assert.NoError(t, err)
+	c := fake.NewSimpleClientset()
 	listFunc := informerListFuncWithSelectors(c, "test-ns", ls, fs)
 	opts := metav1.ListOptions{}
 	obj, err := listFunc(opts)
@@ -68,9 +108,8 @@ func Test_informerListFuncWithSelectors(t *testing.T) {
 }
 
 func Test_namespaceInformerListFunc(t *testing.T) {
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	assert.NoError(t, err)
-	listFunc := namespaceInformerListFunc(c)
+	c := fake.NewClientset()
+	listFunc := namespaceInformerListFunc(c, nil)
 	opts := metav1.ListOptions{}
 	obj, err := listFunc(opts)
 	assert.NoError(t, err)
@@ -95,8 +134,7 @@ func Test_informerWatchFuncWithSelectors(t *testing.T) {
 		},
 	})
 	assert.NoError(t, err)
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	assert.NoError(t, err)
+	c := fake.NewClientset()
 	watchFunc := informerWatchFuncWithSelectors(c, "test-ns", ls, fs)
 	opts := metav1.ListOptions{}
 	obj, err := watchFunc(opts)
@@ -105,9 +143,8 @@ func Test_informerWatchFuncWithSelectors(t *testing.T) {
 }
 
 func Test_namespaceInformerWatchFunc(t *testing.T) {
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
-	assert.NoError(t, err)
-	watchFunc := namespaceInformerWatchFunc(c)
+	c := fake.NewClientset()
+	watchFunc := namespaceInformerWatchFunc(c, nil)
 	opts := metav1.ListOptions{}
 	obj, err := watchFunc(opts)
 	assert.NoError(t, err)
@@ -116,9 +153,9 @@ func Test_namespaceInformerWatchFunc(t *testing.T) {
 
 func Test_fakeInformer(t *testing.T) {
 	// nothing real to test here. just to make coverage happy
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
+	stopCh := make(chan struct{})
+	i, err := NewFakeInformer("ns", nil, nil, nil, stopCh)
 	assert.NoError(t, err)
-	i := NewFakeInformer(c, "ns", nil, nil)
 	_, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second)
 	assert.NoError(t, err)
 	i.HasSynced()
@@ -128,10 +165,9 @@ func Test_fakeInformer(t *testing.T) {
 }
 
 func Test_fakeNamespaceInformer(t *testing.T) {
-	// nothing real to test here. just to make coverage happy
-	c, err := newFakeAPIClientset(k8sconfig.APIConfig{})
+	stopCh := make(chan struct{})
+	i, err := NewFakeNamespaceInformer(nil, stopCh)
 	assert.NoError(t, err)
-	i := NewFakeNamespaceInformer(c)
 	_, err = i.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{}, time.Second)
 	assert.NoError(t, err)
 	i.HasSynced()
diff --git a/processor/k8sattributesprocessor/internal/kube/kube.go b/processor/k8sattributesprocessor/internal/kube/kube.go
index 9faeee2452dd..362b0c8159c8 100644
--- a/processor/k8sattributesprocessor/internal/kube/kube.go
+++ b/processor/k8sattributesprocessor/internal/kube/kube.go
@@ -96,7 +96,15 @@ type Client interface {
 }
 
 // ClientProvider defines a func type that returns a new Client.
-type ClientProvider func(component.TelemetrySettings, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, Excludes, APIClientsetProvider, InformerProvider, InformerProviderNamespace, InformerProviderReplicaSet, bool, time.Duration) (Client, error)
+type ClientProvider func(
+	component.TelemetrySettings,
+	ExtractionRules,
+	Filters,
+	[]Association,
+	Excludes,
+	*InformerProviders,
+	bool, time.Duration,
+) (Client, error)
 
 // APIClientsetProvider defines a func type that initializes and return a new kubernetes
 // Clientset object.
@@ -248,6 +256,22 @@ func (rules *ExtractionRules) IncludesOwnerMetadata() bool {
 	return false
 }
 
+func (rules *ExtractionRules) extractNamespaceLabelsAnnotations() bool {
+	for _, r := range rules.Labels {
+		if r.From == MetadataFromNamespace {
+			return true
+		}
+	}
+
+	for _, r := range rules.Annotations {
+		if r.From == MetadataFromNamespace {
+			return true
+		}
+	}
+
+	return false
+}
+
 // FieldExtractionRule is used to specify which fields to extract from pod fields
 // and inject into spans as attributes.
 type FieldExtractionRule struct {
diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go
index e656a41469bc..c69483229cff 100644
--- a/processor/k8sattributesprocessor/processor.go
+++ b/processor/k8sattributesprocessor/processor.go
@@ -33,6 +33,9 @@ type kubernetesprocessor struct {
 	telemetrySettings component.TelemetrySettings
 	logger            *zap.Logger
 	apiConfig         k8sconfig.APIConfig
+	k8sClientProvider kube.APIClientsetProvider
+	kcProvider        kube.ClientProvider
+	informerProviders *kube.InformerProviders
 	kc                kube.Client
 	passthroughMode   bool
 	rules             kube.ExtractionRules
@@ -44,11 +47,24 @@ type kubernetesprocessor struct {
 }
 
 func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, kubeClient kube.ClientProvider) error {
-	if kubeClient == nil {
-		kubeClient = kube.New
+	if kp.informerProviders == nil {
+		k8sClient, err := kp.k8sClientProvider(kp.apiConfig)
+		if err != nil {
+			return err
+		}
+		kp.informerProviders = kube.NewDefaultInformerProviders(k8sClient)
 	}
 	if !kp.passthroughMode {
-		kc, err := kubeClient(set, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, nil, kp.waitForMetadata, kp.waitForMetadataTimeout)
+		kc, err := kubeClient(
+			set,
+			kp.rules,
+			kp.filters,
+			kp.podAssociations,
+			kp.podIgnore,
+			kp.informerProviders,
+			kp.waitForMetadata,
+			kp.waitForMetadataTimeout,
+		)
 		if err != nil {
 			return err
 		}
@@ -58,6 +74,14 @@ func (kp *kubernetesprocessor) initKubeClient(set component.TelemetrySettings, k
 }
 
 func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) error {
+	if kp.k8sClientProvider == nil {
+		kp.k8sClientProvider = k8sconfig.MakeClient
+	}
+
+	if kp.kcProvider == nil {
+		kp.kcProvider = kube.New
+	}
+
 	allOptions := append(createProcessorOpts(kp.cfg), kp.options...)
 	for _, opt := range allOptions {
@@ -70,7 +94,7 @@ func (kp *kubernetesprocessor) Start(_ context.Context, host component.Host) err
 
 	// This might have been set by an option already
 	if kp.kc == nil {
-		err := kp.initKubeClient(kp.telemetrySettings, kubeClientProvider)
+		err := kp.initKubeClient(kp.telemetrySettings, kp.kcProvider)
 		if err != nil {
 			kp.logger.Error("Could not initialize kube client", zap.Error(err))
 			componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
diff --git a/processor/k8sattributesprocessor/processor_test.go b/processor/k8sattributesprocessor/processor_test.go
index ee51cc82d9f0..88d1cd01c280 100644
--- a/processor/k8sattributesprocessor/processor_test.go
+++ b/processor/k8sattributesprocessor/processor_test.go
@@ -28,6 +28,8 @@ import (
 	"go.opentelemetry.io/collector/processor/processortest"
 	"go.opentelemetry.io/collector/processor/xprocessor"
 	conventions "go.opentelemetry.io/collector/semconv/v1.8.0"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/fake"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
 	"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor/internal/kube"
@@ -47,7 +49,7 @@ func newPodIdentifier(from string, name string, value string) kube.PodIdentifier
 
 func newTracesProcessor(cfg component.Config, next consumer.Traces, options ...option) (processor.Traces, error) {
 	opts := options
-	opts = append(opts, withKubeClientProvider(newFakeClient))
+	opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset))
 	set := processortest.NewNopSettings()
 	return createTracesProcessorWithOptions(
 		context.Background(),
@@ -60,7 +62,7 @@ func newTracesProcessor(cfg component.Config, next consumer.Traces, options ...o
 
 func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metrics, options ...option) (processor.Metrics, error) {
 	opts := options
-	opts = append(opts, withKubeClientProvider(newFakeClient))
+	opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset))
 	set := processortest.NewNopSettings()
 	return createMetricsProcessorWithOptions(
 		context.Background(),
@@ -73,7 +75,7 @@ func newMetricsProcessor(cfg component.Config, nextMetricsConsumer consumer.Metr
 
 func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, options ...option) (processor.Logs, error) {
 	opts := options
-	opts = append(opts, withKubeClientProvider(newFakeClient))
+	opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset))
 	set := processortest.NewNopSettings()
 	return createLogsProcessorWithOptions(
 		context.Background(),
@@ -86,7 +88,7 @@ func newLogsProcessor(cfg component.Config, nextLogsConsumer consumer.Logs, opti
 
 func newProfilesProcessor(cfg component.Config, nextProfilesConsumer xconsumer.Profiles, options ...option) (xprocessor.Profiles, error) {
 	opts := options
-	opts = append(opts, withKubeClientProvider(newFakeClient))
+	opts = append(opts, withKubeClientProvider(newFakeClient), withAPIClientsetProvider(newFakeClientset))
 	set := processortest.NewNopSettings()
 	return createProfilesProcessorWithOptions(
 		context.Background(),
@@ -97,10 +99,23 @@ func newProfilesProcessor(cfg component.Config, nextProfilesConsumer xconsumer.P
 	)
 }
 
+func newFakeClientset(_ k8sconfig.APIConfig) (kubernetes.Interface, error) {
+	return fake.NewClientset(), nil
+}
+
+// withAPIClientsetProvider provides a factory method for creating the K8s clientset.
+func withAPIClientsetProvider(provider kube.APIClientsetProvider) option {
+	return func(p *kubernetesprocessor) error {
+		p.k8sClientProvider = provider
+		return nil
+	}
+}
+
 // withKubeClientProvider sets the specific implementation for getting K8s Client instances
 func withKubeClientProvider(kcp kube.ClientProvider) option {
 	return func(p *kubernetesprocessor) error {
-		return p.initKubeClient(p.telemetrySettings, kcp)
+		p.kcProvider = kcp
+		return nil
 	}
 }
 
@@ -137,6 +152,11 @@ func newMultiTest(
 	errFunc func(err error),
 	options ...option,
 ) *multiTest {
+	if errFunc == nil {
+		errFunc = func(err error) {
+			require.NoError(t, err)
+		}
+	}
 	m := &multiTest{
 		t:         t,
 		nextTrace: new(consumertest.TracesSink),
@@ -262,11 +282,19 @@ func (m *multiTest) assertResource(batchNum int, resourceFunc func(res pcommon.R
 
 func TestNewProcessor(t *testing.T) {
 	cfg := NewFactory().CreateDefaultConfig()
-	newMultiTest(t, cfg, nil)
+	newMultiTest(t, cfg, nil, withAPIClientsetProvider(newFakeClientset))
 }
 
 func TestProcessorBadClientProvider(t *testing.T) {
-	clientProvider := func(_ component.TelemetrySettings, _ k8sconfig.APIConfig, _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.InformerProviderNamespace, _ kube.InformerProviderReplicaSet, _ bool, _ time.Duration) (kube.Client, error) {
+	clientProvider := func(
+		_ component.TelemetrySettings,
+		_ kube.ExtractionRules,
+		_ kube.Filters,
+		_ []kube.Association,
+		_ kube.Excludes,
+		_ *kube.InformerProviders,
+		_ bool, _ time.Duration,
+	) (kube.Client, error) {
 		return nil, fmt.Errorf("bad client error")
 	}
 
@@ -1521,7 +1549,6 @@ func TestRealClient(t *testing.T) {
 		func(err error) {
 			require.EqualError(t, err, "unable to load k8s config, KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT must be defined")
 		},
-		withKubeClientProvider(kubeClientProvider),
 		withAPIConfig(k8sconfig.APIConfig{AuthType: "none"}),
 	)
 }
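For reference, the two injection points that replace the old kubeClientProvider package global compose like this in a test. Both options and newMultiTest appear in the diff above; the test name and body are an illustrative sketch, not part of this patch:

func TestIllustrativeWiring(t *testing.T) {
	cfg := NewFactory().CreateDefaultConfig()
	// withAPIClientsetProvider swaps the clientset factory used to build the
	// default informer providers; withKubeClientProvider swaps the kube.Client
	// constructor itself. Neither touches package-level state, so tests no
	// longer need to save and restore a global provider.
	newMultiTest(t, cfg, nil,
		withAPIClientsetProvider(newFakeClientset),
		withKubeClientProvider(newFakeClient),
	)
}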