From cc47013df98c74a6fc2e49a89c39b2e3fcf376c2 Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 28 Aug 2024 04:02:10 -0400 Subject: [PATCH] Split target allocator into an internal package (#33223) Fixes https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33146 --- .chloggen/refactor-target-allocator.yaml | 27 ++ receiver/prometheusreceiver/config.go | 66 +--- receiver/prometheusreceiver/config_test.go | 31 -- .../prometheusreceiver/metrics_receiver.go | 330 ++---------------- .../targetallocator/config.go | 197 +++++++++++ .../targetallocator/config_test.go | 69 ++++ .../targetallocator/manager.go | 244 +++++++++++++ .../manager_test.go} | 129 +++---- .../targetallocator/testdata/config.yaml | 7 + .../testdata/dummy-tls-cert-file | 0 .../testdata/dummy-tls-key-file | 0 11 files changed, 633 insertions(+), 467 deletions(-) create mode 100644 .chloggen/refactor-target-allocator.yaml create mode 100644 receiver/prometheusreceiver/targetallocator/config.go create mode 100644 receiver/prometheusreceiver/targetallocator/config_test.go create mode 100644 receiver/prometheusreceiver/targetallocator/manager.go rename receiver/prometheusreceiver/{metrics_receiver_target_allocator_test.go => targetallocator/manager_test.go} (88%) create mode 100644 receiver/prometheusreceiver/targetallocator/testdata/config.yaml create mode 100644 receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-cert-file create mode 100644 receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-key-file diff --git a/.chloggen/refactor-target-allocator.yaml b/.chloggen/refactor-target-allocator.yaml new file mode 100644 index 000000000000..ea632a87843a --- /dev/null +++ b/.chloggen/refactor-target-allocator.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: prometheusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move the TargetAllocator configuration struct to an internal directory + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33146] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [api] diff --git a/receiver/prometheusreceiver/config.go b/receiver/prometheusreceiver/config.go index c4269efde038..631c562679f4 100644 --- a/receiver/prometheusreceiver/config.go +++ b/receiver/prometheusreceiver/config.go @@ -6,19 +6,17 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-co import ( "errors" "fmt" - "net/url" "os" "sort" "strings" - "time" commonconfig "github.com/prometheus/common/config" promconfig "github.com/prometheus/prometheus/config" - promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/discovery/kubernetes" - "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/confmap" "gopkg.in/yaml.v2" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" ) // Config defines configuration for Prometheus receiver. @@ -37,7 +35,7 @@ type Config struct { // ReportExtraScrapeMetrics - enables reporting of additional metrics for Prometheus client like scrape_body_size_bytes ReportExtraScrapeMetrics bool `mapstructure:"report_extra_scrape_metrics"` - TargetAllocator *TargetAllocator `mapstructure:"target_allocator"` + TargetAllocator *targetallocator.Config `mapstructure:"target_allocator"` } // Validate checks the receiver configuration is valid. @@ -48,27 +46,6 @@ func (cfg *Config) Validate() error { return nil } -type TargetAllocator struct { - confighttp.ClientConfig `mapstructure:",squash"` - Interval time.Duration `mapstructure:"interval"` - CollectorID string `mapstructure:"collector_id"` - HTTPSDConfig *PromHTTPSDConfig `mapstructure:"http_sd_config"` - HTTPScrapeConfig *PromHTTPClientConfig `mapstructure:"http_scrape_config"` -} - -func (cfg *TargetAllocator) Validate() error { - // ensure valid endpoint - if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil { - return fmt.Errorf("TargetAllocator endpoint is not valid: %s", cfg.Endpoint) - } - // ensure valid collectorID without variables - if cfg.CollectorID == "" || strings.Contains(cfg.CollectorID, "${") { - return fmt.Errorf("CollectorID is not a valid ID") - } - - return nil -} - // PromConfig is a redeclaration of promconfig.Config because we need custom unmarshaling // as prometheus "config" uses `yaml` tags. type PromConfig promconfig.Config @@ -126,43 +103,6 @@ func (cfg *PromConfig) Validate() error { return nil } -// PromHTTPSDConfig is a redeclaration of promHTTP.SDConfig because we need custom unmarshaling -// as prometheus "config" uses `yaml` tags. 
-type PromHTTPSDConfig promHTTP.SDConfig - -var _ confmap.Unmarshaler = (*PromHTTPSDConfig)(nil) - -func (cfg *PromHTTPSDConfig) Unmarshal(componentParser *confmap.Conf) error { - cfgMap := componentParser.ToStringMap() - if len(cfgMap) == 0 { - return nil - } - cfgMap["url"] = "http://placeholder" // we have to set it as else marshaling will fail - return unmarshalYAML(cfgMap, (*promHTTP.SDConfig)(cfg)) -} - -type PromHTTPClientConfig commonconfig.HTTPClientConfig - -var _ confmap.Unmarshaler = (*PromHTTPClientConfig)(nil) - -func (cfg *PromHTTPClientConfig) Unmarshal(componentParser *confmap.Conf) error { - cfgMap := componentParser.ToStringMap() - if len(cfgMap) == 0 { - return nil - } - return unmarshalYAML(cfgMap, (*commonconfig.HTTPClientConfig)(cfg)) -} - -func (cfg *PromHTTPClientConfig) Validate() error { - httpCfg := (*commonconfig.HTTPClientConfig)(cfg) - if err := validateHTTPClientConfig(httpCfg); err != nil { - return err - } - // Prometheus UnmarshalYaml implementation by default calls Validate, - // but it is safer to do it here as well. - return httpCfg.Validate() -} - func unmarshalYAML(in map[string]any, out any) error { yamlOut, err := yaml.Marshal(in) if err != nil { diff --git a/receiver/prometheusreceiver/config_test.go b/receiver/prometheusreceiver/config_test.go index 6ba48222ce92..36fa893baf1e 100644 --- a/receiver/prometheusreceiver/config_test.go +++ b/receiver/prometheusreceiver/config_test.go @@ -341,34 +341,3 @@ func TestFileSDConfigWithoutSDFile(t *testing.T) { require.NoError(t, component.ValidateConfig(cfg)) } - -func TestPromHTTPClientConfigValidateAuthorization(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.Authorization = &promConfig.Authorization{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.Authorization.CredentialsFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.Authorization.CredentialsFile = filepath.Join("testdata", "dummy-tls-cert-file") - require.NoError(t, component.ValidateConfig(cfg)) -} - -func TestPromHTTPClientConfigValidateTLSConfig(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.CertFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.CertFile = filepath.Join("testdata", "dummy-tls-cert-file") - cfg.TLSConfig.KeyFile = "none" - require.Error(t, component.ValidateConfig(cfg)) - cfg.TLSConfig.KeyFile = filepath.Join("testdata", "dummy-tls-key-file") - require.NoError(t, component.ValidateConfig(cfg)) -} - -func TestPromHTTPClientConfigValidateMain(t *testing.T) { - cfg := PromHTTPClientConfig{} - require.NoError(t, component.ValidateConfig(cfg)) - cfg.BearerToken = "foo" - cfg.BearerTokenFile = filepath.Join("testdata", "dummy-tls-key-file") - require.Error(t, component.ValidateConfig(cfg)) -} diff --git a/receiver/prometheusreceiver/metrics_receiver.go b/receiver/prometheusreceiver/metrics_receiver.go index 8868cb71534f..90613149b14b 100644 --- a/receiver/prometheusreceiver/metrics_receiver.go +++ b/receiver/prometheusreceiver/metrics_receiver.go @@ -4,20 +4,11 @@ package prometheusreceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver" import ( - "bytes" "context" - "encoding/base64" "errors" "fmt" - "hash/fnv" - "io" - "net/http" - "net/url" - "os" "reflect" "regexp" - "sort" - "strings" "sync" "time" "unsafe" @@ -25,19 +16,17 @@ import ( "github.com/go-kit/log" 
"github.com/prometheus/client_golang/prometheus" commonconfig "github.com/prometheus/common/config" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/config" + promconfig "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/discovery" - promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/scrape" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" - "gopkg.in/yaml.v2" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/internal" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" ) const ( @@ -47,33 +36,38 @@ const ( // pReceiver is the type that provides Prometheus scraper/receiver functionality. type pReceiver struct { - cfg *Config - consumer consumer.Metrics - cancelFunc context.CancelFunc - targetAllocatorStop chan struct{} - configLoaded chan struct{} - loadConfigOnce sync.Once - - settings receiver.Settings - scrapeManager *scrape.Manager - discoveryManager *discovery.Manager - httpClient *http.Client - registerer prometheus.Registerer - unregisterMetrics func() - skipOffsetting bool // for testing only + cfg *Config + consumer consumer.Metrics + cancelFunc context.CancelFunc + configLoaded chan struct{} + loadConfigOnce sync.Once + + settings receiver.Settings + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + targetAllocatorManager *targetallocator.Manager + registerer prometheus.Registerer + unregisterMetrics func() + skipOffsetting bool // for testing only } // New creates a new prometheus.Receiver reference. 
func newPrometheusReceiver(set receiver.Settings, cfg *Config, next consumer.Metrics) *pReceiver { + baseCfg := promconfig.Config(*cfg.PrometheusConfig) pr := &pReceiver{ - cfg: cfg, - consumer: next, - settings: set, - configLoaded: make(chan struct{}), - targetAllocatorStop: make(chan struct{}), + cfg: cfg, + consumer: next, + settings: set, + configLoaded: make(chan struct{}), registerer: prometheus.WrapRegistererWith( prometheus.Labels{"receiver": set.ID.String()}, prometheus.DefaultRegisterer), + targetAllocatorManager: targetallocator.NewManager( + set, + cfg.TargetAllocator, + &baseCfg, + enableNativeHistogramsGate.IsEnabled(), + ), } return pr } @@ -86,34 +80,17 @@ func (r *pReceiver) Start(ctx context.Context, host component.Host) error { logger := internal.NewZapToGokitLogAdapter(r.settings.Logger) - // add scrape configs defined by the collector configs - baseCfg := r.cfg.PrometheusConfig - err := r.initPrometheusComponents(discoveryCtx, logger, host) if err != nil { r.settings.Logger.Error("Failed to initPrometheusComponents Prometheus components", zap.Error(err)) return err } - err = r.applyCfg(baseCfg) + err = r.targetAllocatorManager.Start(ctx, host, r.scrapeManager, r.discoveryManager) if err != nil { - r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) return err } - allocConf := r.cfg.TargetAllocator - if allocConf != nil { - r.httpClient, err = r.cfg.TargetAllocator.ToClient(ctx, host, r.settings.TelemetrySettings) - if err != nil { - r.settings.Logger.Error("Failed to create http client", zap.Error(err)) - return err - } - err = r.startTargetAllocator(allocConf, baseCfg) - if err != nil { - return err - } - } - r.loadConfigOnce.Do(func() { close(r.configLoaded) }) @@ -121,255 +98,6 @@ func (r *pReceiver) Start(ctx context.Context, host component.Host) error { return nil } -func (r *pReceiver) startTargetAllocator(allocConf *TargetAllocator, baseCfg *PromConfig) error { - r.settings.Logger.Info("Starting target allocator discovery") - // immediately sync jobs, not waiting for the first tick - savedHash, err := r.syncTargetAllocator(uint64(0), allocConf, baseCfg) - if err != nil { - return err - } - go func() { - targetAllocatorIntervalTicker := time.NewTicker(allocConf.Interval) - for { - select { - case <-targetAllocatorIntervalTicker.C: - hash, newErr := r.syncTargetAllocator(savedHash, allocConf, baseCfg) - if newErr != nil { - r.settings.Logger.Error(newErr.Error()) - continue - } - savedHash = hash - case <-r.targetAllocatorStop: - targetAllocatorIntervalTicker.Stop() - r.settings.Logger.Info("Stopping target allocator") - return - } - } - }() - return nil -} - -// Calculate a hash for a scrape config map. -// This is done by marshaling to YAML because it's the most straightforward and doesn't run into problems with unexported fields. 
-func getScrapeConfigHash(jobToScrapeConfig map[string]*config.ScrapeConfig) (uint64, error) { - var err error - hash := fnv.New64() - yamlEncoder := yaml.NewEncoder(hash) - - jobKeys := make([]string, 0, len(jobToScrapeConfig)) - for jobName := range jobToScrapeConfig { - jobKeys = append(jobKeys, jobName) - } - sort.Strings(jobKeys) - - for _, jobName := range jobKeys { - _, err = hash.Write([]byte(jobName)) - if err != nil { - return 0, err - } - err = yamlEncoder.Encode(jobToScrapeConfig[jobName]) - if err != nil { - return 0, err - } - } - yamlEncoder.Close() - return hash.Sum64(), err -} - -// ConvertTLSVersion converts a string TLS version to the corresponding config.TLSVersion value in prometheus common. -func convertTLSVersion(version string) (commonconfig.TLSVersion, error) { - normalizedVersion := "TLS" + strings.ReplaceAll(version, ".", "") - - if tlsVersion, exists := commonconfig.TLSVersions[normalizedVersion]; exists { - return tlsVersion, nil - } - return 0, fmt.Errorf("unsupported TLS version: %s", version) -} - -// configureSDHTTPClientConfig configures the http client for the service discovery manager -// based on the provided TargetAllocator configuration. -func configureSDHTTPClientConfigFromTA(httpSD *promHTTP.SDConfig, allocConf *TargetAllocator) error { - httpSD.HTTPClientConfig.FollowRedirects = false - - httpSD.HTTPClientConfig.TLSConfig = commonconfig.TLSConfig{ - InsecureSkipVerify: allocConf.TLSSetting.InsecureSkipVerify, - ServerName: allocConf.TLSSetting.ServerName, - CAFile: allocConf.TLSSetting.CAFile, - CertFile: allocConf.TLSSetting.CertFile, - KeyFile: allocConf.TLSSetting.KeyFile, - } - - if allocConf.TLSSetting.CAPem != "" { - decodedCA, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CAPem)) - if err != nil { - return fmt.Errorf("failed to decode CA: %w", err) - } - httpSD.HTTPClientConfig.TLSConfig.CA = string(decodedCA) - } - - if allocConf.TLSSetting.CertPem != "" { - decodedCert, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CertPem)) - if err != nil { - return fmt.Errorf("failed to decode Cert: %w", err) - } - httpSD.HTTPClientConfig.TLSConfig.Cert = string(decodedCert) - } - - if allocConf.TLSSetting.KeyPem != "" { - decodedKey, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.KeyPem)) - if err != nil { - return fmt.Errorf("failed to decode Key: %w", err) - } - httpSD.HTTPClientConfig.TLSConfig.Key = commonconfig.Secret(decodedKey) - } - - if allocConf.TLSSetting.MinVersion != "" { - minVersion, err := convertTLSVersion(allocConf.TLSSetting.MinVersion) - if err != nil { - return err - } - httpSD.HTTPClientConfig.TLSConfig.MinVersion = minVersion - } - - if allocConf.TLSSetting.MaxVersion != "" { - maxVersion, err := convertTLSVersion(allocConf.TLSSetting.MaxVersion) - if err != nil { - return err - } - httpSD.HTTPClientConfig.TLSConfig.MaxVersion = maxVersion - } - - if allocConf.ProxyURL != "" { - proxyURL, err := url.Parse(allocConf.ProxyURL) - if err != nil { - return err - } - httpSD.HTTPClientConfig.ProxyURL = commonconfig.URL{URL: proxyURL} - } - - return nil -} - -// syncTargetAllocator request jobs from targetAllocator and update underlying receiver, if the response does not match the provided compareHash. -// baseDiscoveryCfg can be used to provide additional ScrapeConfigs which will be added to the retrieved jobs. 
-func (r *pReceiver) syncTargetAllocator(compareHash uint64, allocConf *TargetAllocator, baseCfg *PromConfig) (uint64, error) { - r.settings.Logger.Debug("Syncing target allocator jobs") - scrapeConfigsResponse, err := r.getScrapeConfigsResponse(allocConf.Endpoint) - if err != nil { - r.settings.Logger.Error("Failed to retrieve job list", zap.Error(err)) - return 0, err - } - - hash, err := getScrapeConfigHash(scrapeConfigsResponse) - if err != nil { - r.settings.Logger.Error("Failed to hash job list", zap.Error(err)) - return 0, err - } - if hash == compareHash { - // no update needed - return hash, nil - } - - // Clear out the current configurations - baseCfg.ScrapeConfigs = []*config.ScrapeConfig{} - - for jobName, scrapeConfig := range scrapeConfigsResponse { - var httpSD promHTTP.SDConfig - if allocConf.HTTPSDConfig == nil { - httpSD = promHTTP.SDConfig{ - RefreshInterval: model.Duration(30 * time.Second), - } - } else { - httpSD = promHTTP.SDConfig(*allocConf.HTTPSDConfig) - } - escapedJob := url.QueryEscape(jobName) - httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", allocConf.Endpoint, escapedJob, allocConf.CollectorID) - - err = configureSDHTTPClientConfigFromTA(&httpSD, allocConf) - if err != nil { - r.settings.Logger.Error("Failed to configure http client config", zap.Error(err)) - return 0, err - } - - scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{ - &httpSD, - } - - if allocConf.HTTPScrapeConfig != nil { - scrapeConfig.HTTPClientConfig = commonconfig.HTTPClientConfig(*allocConf.HTTPScrapeConfig) - } - - baseCfg.ScrapeConfigs = append(baseCfg.ScrapeConfigs, scrapeConfig) - } - - err = r.applyCfg(baseCfg) - if err != nil { - r.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) - return 0, err - } - - return hash, nil -} - -// instantiateShard inserts the SHARD environment variable in the returned configuration -func (r *pReceiver) instantiateShard(body []byte) []byte { - shard, ok := os.LookupEnv("SHARD") - if !ok { - shard = "0" - } - return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard)) -} - -func (r *pReceiver) getScrapeConfigsResponse(baseURL string) (map[string]*config.ScrapeConfig, error) { - scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) - _, err := url.Parse(scrapeConfigsURL) // check if valid - if err != nil { - return nil, err - } - - resp, err := r.httpClient.Get(scrapeConfigsURL) - if err != nil { - return nil, err - } - - body, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err - } - - jobToScrapeConfig := map[string]*config.ScrapeConfig{} - envReplacedBody := r.instantiateShard(body) - err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig) - if err != nil { - return nil, err - } - err = resp.Body.Close() - if err != nil { - return nil, err - } - return jobToScrapeConfig, nil -} - -func (r *pReceiver) applyCfg(cfg *PromConfig) error { - if !enableNativeHistogramsGate.IsEnabled() { - // Enforce scraping classic histograms to avoid dropping them. 
- for _, scrapeConfig := range cfg.ScrapeConfigs { - scrapeConfig.ScrapeClassicHistograms = true - } - } - - if err := r.scrapeManager.ApplyConfig((*config.Config)(cfg)); err != nil { - return err - } - - discoveryCfg := make(map[string]discovery.Configs) - for _, scrapeConfig := range cfg.ScrapeConfigs { - discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs - r.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) - } - return r.discoveryManager.ApplyConfig(discoveryCfg) -} - func (r *pReceiver) initPrometheusComponents(ctx context.Context, logger log.Logger, host component.Host) error { // Some SD mechanisms use the "refresh" package, which has its own metrics. refreshSdMetrics := discovery.NewRefreshMetrics(r.registerer) @@ -489,7 +217,9 @@ func (r *pReceiver) Shutdown(context.Context) error { if r.scrapeManager != nil { r.scrapeManager.Stop() } - close(r.targetAllocatorStop) + if r.targetAllocatorManager != nil { + r.targetAllocatorManager.Shutdown() + } if r.unregisterMetrics != nil { r.unregisterMetrics() } diff --git a/receiver/prometheusreceiver/targetallocator/config.go b/receiver/prometheusreceiver/targetallocator/config.go new file mode 100644 index 000000000000..07fc1d579a83 --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/config.go @@ -0,0 +1,197 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "encoding/base64" + "fmt" + "net/url" + "os" + "strings" + "time" + + commonconfig "github.com/prometheus/common/config" + promHTTP "github.com/prometheus/prometheus/discovery/http" + "go.opentelemetry.io/collector/config/confighttp" + "go.opentelemetry.io/collector/confmap" + "gopkg.in/yaml.v2" +) + +type Config struct { + confighttp.ClientConfig `mapstructure:",squash"` + Interval time.Duration `mapstructure:"interval"` + CollectorID string `mapstructure:"collector_id"` + HTTPSDConfig *PromHTTPSDConfig `mapstructure:"http_sd_config"` + HTTPScrapeConfig *PromHTTPClientConfig `mapstructure:"http_scrape_config"` +} + +// PromHTTPSDConfig is a redeclaration of promHTTP.SDConfig because we need custom unmarshaling +// as prometheus "config" uses `yaml` tags. 
+type PromHTTPSDConfig promHTTP.SDConfig
+
+func (cfg *Config) Validate() error {
+	// ensure valid endpoint
+	if _, err := url.ParseRequestURI(cfg.Endpoint); err != nil {
+		return fmt.Errorf("TargetAllocator endpoint is not valid: %s", cfg.Endpoint)
+	}
+	// ensure valid collectorID without variables
+	if cfg.CollectorID == "" || strings.Contains(cfg.CollectorID, "${") {
+		return fmt.Errorf("CollectorID is not a valid ID")
+	}
+
+	return nil
+}
+
+var _ confmap.Unmarshaler = (*PromHTTPSDConfig)(nil)
+
+func (cfg *PromHTTPSDConfig) Unmarshal(componentParser *confmap.Conf) error {
+	cfgMap := componentParser.ToStringMap()
+	if len(cfgMap) == 0 {
+		return nil
+	}
+	cfgMap["url"] = "http://placeholder" // we have to set it, otherwise unmarshaling will fail
+	return unmarshalYAML(cfgMap, (*promHTTP.SDConfig)(cfg))
+}
+
+type PromHTTPClientConfig commonconfig.HTTPClientConfig
+
+var _ confmap.Unmarshaler = (*PromHTTPClientConfig)(nil)
+
+func (cfg *PromHTTPClientConfig) Unmarshal(componentParser *confmap.Conf) error {
+	cfgMap := componentParser.ToStringMap()
+	if len(cfgMap) == 0 {
+		return nil
+	}
+	return unmarshalYAML(cfgMap, (*commonconfig.HTTPClientConfig)(cfg))
+}
+
+func (cfg *PromHTTPClientConfig) Validate() error {
+	httpCfg := (*commonconfig.HTTPClientConfig)(cfg)
+	if err := validateHTTPClientConfig(httpCfg); err != nil {
+		return err
+	}
+	// The Prometheus UnmarshalYAML implementation calls Validate by default,
+	// but it is safer to do it here as well.
+	return httpCfg.Validate()
+}
+
+func validateHTTPClientConfig(cfg *commonconfig.HTTPClientConfig) error {
+	if cfg.Authorization != nil {
+		if err := checkFile(cfg.Authorization.CredentialsFile); err != nil {
+			return fmt.Errorf("error checking authorization credentials file %q: %w", cfg.Authorization.CredentialsFile, err)
+		}
+	}
+
+	if err := checkTLSConfig(cfg.TLSConfig); err != nil {
+		return err
+	}
+	return nil
+}
+
+func checkFile(fn string) error {
+	// Nothing set, nothing to error on.
+	if fn == "" {
+		return nil
+	}
+	_, err := os.Stat(fn)
+	return err
+}
+
+func checkTLSConfig(tlsConfig commonconfig.TLSConfig) error {
+	if err := checkFile(tlsConfig.CertFile); err != nil {
+		return fmt.Errorf("error checking client cert file %q: %w", tlsConfig.CertFile, err)
+	}
+	if err := checkFile(tlsConfig.KeyFile); err != nil {
+		return fmt.Errorf("error checking client key file %q: %w", tlsConfig.KeyFile, err)
+	}
+	return nil
+}
+
+func unmarshalYAML(in map[string]any, out any) error {
+	yamlOut, err := yaml.Marshal(in)
+	if err != nil {
+		return fmt.Errorf("prometheus receiver: failed to marshal config to yaml: %w", err)
+	}
+
+	err = yaml.UnmarshalStrict(yamlOut, out)
+	if err != nil {
+		return fmt.Errorf("prometheus receiver: failed to unmarshal yaml to prometheus config object: %w", err)
+	}
+	return nil
+}
+
+// convertTLSVersion converts a string TLS version to the corresponding config.TLSVersion value in prometheus common.
+func convertTLSVersion(version string) (commonconfig.TLSVersion, error) {
+	normalizedVersion := "TLS" + strings.ReplaceAll(version, ".", "")
+
+	if tlsVersion, exists := commonconfig.TLSVersions[normalizedVersion]; exists {
+		return tlsVersion, nil
+	}
+	return 0, fmt.Errorf("unsupported TLS version: %s", version)
+}
+
+// configureSDHTTPClientConfigFromTA configures the http client for the service discovery manager
+// based on the provided TargetAllocator configuration.
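+// It pins FollowRedirects to false, translates the collector-side TLS settings (including
+// base64-encoded PEM blobs and min/max TLS version bounds) and the proxy URL; authentication
+// for SD requests is configured separately through HTTPSDConfig.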
+func configureSDHTTPClientConfigFromTA(httpSD *promHTTP.SDConfig, allocConf *Config) error { + httpSD.HTTPClientConfig.FollowRedirects = false + + httpSD.HTTPClientConfig.TLSConfig = commonconfig.TLSConfig{ + InsecureSkipVerify: allocConf.TLSSetting.InsecureSkipVerify, + ServerName: allocConf.TLSSetting.ServerName, + CAFile: allocConf.TLSSetting.CAFile, + CertFile: allocConf.TLSSetting.CertFile, + KeyFile: allocConf.TLSSetting.KeyFile, + } + + if allocConf.TLSSetting.CAPem != "" { + decodedCA, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CAPem)) + if err != nil { + return fmt.Errorf("failed to decode CA: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.CA = string(decodedCA) + } + + if allocConf.TLSSetting.CertPem != "" { + decodedCert, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.CertPem)) + if err != nil { + return fmt.Errorf("failed to decode Cert: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.Cert = string(decodedCert) + } + + if allocConf.TLSSetting.KeyPem != "" { + decodedKey, err := base64.StdEncoding.DecodeString(string(allocConf.TLSSetting.KeyPem)) + if err != nil { + return fmt.Errorf("failed to decode Key: %w", err) + } + httpSD.HTTPClientConfig.TLSConfig.Key = commonconfig.Secret(decodedKey) + } + + if allocConf.TLSSetting.MinVersion != "" { + minVersion, err := convertTLSVersion(allocConf.TLSSetting.MinVersion) + if err != nil { + return err + } + httpSD.HTTPClientConfig.TLSConfig.MinVersion = minVersion + } + + if allocConf.TLSSetting.MaxVersion != "" { + maxVersion, err := convertTLSVersion(allocConf.TLSSetting.MaxVersion) + if err != nil { + return err + } + httpSD.HTTPClientConfig.TLSConfig.MaxVersion = maxVersion + } + + if allocConf.ProxyURL != "" { + proxyURL, err := url.Parse(allocConf.ProxyURL) + if err != nil { + return err + } + httpSD.HTTPClientConfig.ProxyURL = commonconfig.URL{URL: proxyURL} + } + + return nil +} diff --git a/receiver/prometheusreceiver/targetallocator/config_test.go b/receiver/prometheusreceiver/targetallocator/config_test.go new file mode 100644 index 000000000000..650da0327eac --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/config_test.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "path/filepath" + "testing" + "time" + + promConfig "github.com/prometheus/common/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/confmap/confmaptest" +) + +func TestComponentConfigStruct(t *testing.T) { + require.NoError(t, componenttest.CheckConfigStruct(Config{})) +} + +func TestLoadTargetAllocatorConfig(t *testing.T) { + cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) + require.NoError(t, err) + cfg := &Config{} + + sub, err := cm.Sub("target_allocator") + require.NoError(t, err) + require.NoError(t, sub.Unmarshal(cfg)) + require.NoError(t, component.ValidateConfig(cfg)) + + assert.Equal(t, "http://localhost:8080", cfg.ClientConfig.Endpoint) + assert.Equal(t, 5*time.Second, cfg.ClientConfig.Timeout) + assert.Equal(t, "client.crt", cfg.ClientConfig.TLSSetting.CertFile) + assert.Equal(t, 30*time.Second, cfg.Interval) + assert.Equal(t, "collector-1", cfg.CollectorID) +} + +func 
TestPromHTTPClientConfigValidateAuthorization(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.Authorization = &promConfig.Authorization{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.Authorization.CredentialsFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.Authorization.CredentialsFile = filepath.Join("testdata", "dummy-tls-cert-file") + require.NoError(t, component.ValidateConfig(cfg)) +} + +func TestPromHTTPClientConfigValidateTLSConfig(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.CertFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.CertFile = filepath.Join("testdata", "dummy-tls-cert-file") + cfg.TLSConfig.KeyFile = "none" + require.Error(t, component.ValidateConfig(cfg)) + cfg.TLSConfig.KeyFile = filepath.Join("testdata", "dummy-tls-key-file") + require.NoError(t, component.ValidateConfig(cfg)) +} + +func TestPromHTTPClientConfigValidateMain(t *testing.T) { + cfg := PromHTTPClientConfig{} + require.NoError(t, component.ValidateConfig(cfg)) + cfg.BearerToken = "foo" + cfg.BearerTokenFile = filepath.Join("testdata", "dummy-tls-key-file") + require.Error(t, component.ValidateConfig(cfg)) +} diff --git a/receiver/prometheusreceiver/targetallocator/manager.go b/receiver/prometheusreceiver/targetallocator/manager.go new file mode 100644 index 000000000000..67d29fb297d5 --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/manager.go @@ -0,0 +1,244 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package targetallocator // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator" + +import ( + "bytes" + "context" + "fmt" + "hash/fnv" + "io" + "net/http" + "net/url" + "os" + "sort" + "time" + + commonconfig "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" + promHTTP "github.com/prometheus/prometheus/discovery/http" + "github.com/prometheus/prometheus/scrape" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/receiver" + "go.uber.org/zap" + "gopkg.in/yaml.v2" +) + +type Manager struct { + settings receiver.Settings + shutdown chan struct{} + cfg *Config + promCfg *promconfig.Config + scrapeManager *scrape.Manager + discoveryManager *discovery.Manager + enableNativeHistograms bool +} + +func NewManager(set receiver.Settings, cfg *Config, promCfg *promconfig.Config, enableNativeHistograms bool) *Manager { + return &Manager{ + shutdown: make(chan struct{}), + settings: set, + cfg: cfg, + promCfg: promCfg, + enableNativeHistograms: enableNativeHistograms, + } +} + +func (m *Manager) Start(ctx context.Context, host component.Host, sm *scrape.Manager, dm *discovery.Manager) error { + m.scrapeManager = sm + m.discoveryManager = dm + err := m.applyCfg() + if err != nil { + m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err)) + return err + } + if m.cfg == nil { + // the target allocator is disabled + return nil + } + httpClient, err := m.cfg.ClientConfig.ToClient(ctx, host, m.settings.TelemetrySettings) + if err != nil { + m.settings.Logger.Error("Failed to create http client", zap.Error(err)) + return err + } + m.settings.Logger.Info("Starting target allocator discovery") + // immediately sync 
jobs, not waiting for the first tick
+	savedHash, err := m.sync(uint64(0), httpClient)
+	if err != nil {
+		return err
+	}
+	go func() {
+		targetAllocatorIntervalTicker := time.NewTicker(m.cfg.Interval)
+		for {
+			select {
+			case <-targetAllocatorIntervalTicker.C:
+				hash, newErr := m.sync(savedHash, httpClient)
+				if newErr != nil {
+					m.settings.Logger.Error(newErr.Error())
+					continue
+				}
+				savedHash = hash
+			case <-m.shutdown:
+				targetAllocatorIntervalTicker.Stop()
+				m.settings.Logger.Info("Stopping target allocator")
+				return
+			}
+		}
+	}()
+	return nil
+}
+
+func (m *Manager) Shutdown() {
+	close(m.shutdown)
+}
+
+// sync requests jobs from the target allocator and updates the underlying receiver if the response does not match the provided compareHash.
+// It returns the hash of the retrieved scrape configs so unchanged responses can be skipped on subsequent calls.
+func (m *Manager) sync(compareHash uint64, httpClient *http.Client) (uint64, error) {
+	m.settings.Logger.Debug("Syncing target allocator jobs")
+	scrapeConfigsResponse, err := getScrapeConfigsResponse(httpClient, m.cfg.Endpoint)
+	if err != nil {
+		m.settings.Logger.Error("Failed to retrieve job list", zap.Error(err))
+		return 0, err
+	}
+
+	hash, err := getScrapeConfigHash(scrapeConfigsResponse)
+	if err != nil {
+		m.settings.Logger.Error("Failed to hash job list", zap.Error(err))
+		return 0, err
+	}
+	if hash == compareHash {
+		// no update needed
+		return hash, nil
+	}
+
+	// Clear out the current configurations
+	m.promCfg.ScrapeConfigs = []*promconfig.ScrapeConfig{}
+
+	for jobName, scrapeConfig := range scrapeConfigsResponse {
+		var httpSD promHTTP.SDConfig
+		if m.cfg.HTTPSDConfig == nil {
+			httpSD = promHTTP.SDConfig{
+				RefreshInterval: model.Duration(30 * time.Second),
+			}
+		} else {
+			httpSD = promHTTP.SDConfig(*m.cfg.HTTPSDConfig)
+		}
+		escapedJob := url.QueryEscape(jobName)
+		httpSD.URL = fmt.Sprintf("%s/jobs/%s/targets?collector_id=%s", m.cfg.Endpoint, escapedJob, m.cfg.CollectorID)
+
+		err = configureSDHTTPClientConfigFromTA(&httpSD, m.cfg)
+		if err != nil {
+			m.settings.Logger.Error("Failed to configure http client config", zap.Error(err))
+			return 0, err
+		}
+
+		httpSD.HTTPClientConfig.FollowRedirects = false
+		scrapeConfig.ServiceDiscoveryConfigs = discovery.Configs{
+			&httpSD,
+		}
+
+		if m.cfg.HTTPScrapeConfig != nil {
+			scrapeConfig.HTTPClientConfig = commonconfig.HTTPClientConfig(*m.cfg.HTTPScrapeConfig)
+		}
+
+		m.promCfg.ScrapeConfigs = append(m.promCfg.ScrapeConfigs, scrapeConfig)
+	}
+
+	err = m.applyCfg()
+	if err != nil {
+		m.settings.Logger.Error("Failed to apply new scrape configuration", zap.Error(err))
+		return 0, err
+	}
+
+	return hash, nil
+}
+
+func (m *Manager) applyCfg() error {
+	if !m.enableNativeHistograms {
+		// Enforce scraping classic histograms to avoid dropping them.
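+		// The Prometheus scrape loop would otherwise prefer native histograms and skip
+		// the classic series for targets that expose both representations.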
+ for _, scrapeConfig := range m.promCfg.ScrapeConfigs { + scrapeConfig.ScrapeClassicHistograms = true + } + } + + if err := m.scrapeManager.ApplyConfig(m.promCfg); err != nil { + return err + } + + discoveryCfg := make(map[string]discovery.Configs) + for _, scrapeConfig := range m.promCfg.ScrapeConfigs { + discoveryCfg[scrapeConfig.JobName] = scrapeConfig.ServiceDiscoveryConfigs + m.settings.Logger.Info("Scrape job added", zap.String("jobName", scrapeConfig.JobName)) + } + return m.discoveryManager.ApplyConfig(discoveryCfg) +} + +func getScrapeConfigsResponse(httpClient *http.Client, baseURL string) (map[string]*promconfig.ScrapeConfig, error) { + scrapeConfigsURL := fmt.Sprintf("%s/scrape_configs", baseURL) + _, err := url.Parse(scrapeConfigsURL) // check if valid + if err != nil { + return nil, err + } + + resp, err := httpClient.Get(scrapeConfigsURL) + if err != nil { + return nil, err + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + jobToScrapeConfig := map[string]*promconfig.ScrapeConfig{} + envReplacedBody := instantiateShard(body) + err = yaml.Unmarshal(envReplacedBody, &jobToScrapeConfig) + if err != nil { + return nil, err + } + err = resp.Body.Close() + if err != nil { + return nil, err + } + return jobToScrapeConfig, nil +} + +// instantiateShard inserts the SHARD environment variable in the returned configuration +func instantiateShard(body []byte) []byte { + shard, ok := os.LookupEnv("SHARD") + if !ok { + shard = "0" + } + return bytes.ReplaceAll(body, []byte("$(SHARD)"), []byte(shard)) +} + +// Calculate a hash for a scrape config map. +// This is done by marshaling to YAML because it's the most straightforward and doesn't run into problems with unexported fields. +func getScrapeConfigHash(jobToScrapeConfig map[string]*promconfig.ScrapeConfig) (uint64, error) { + var err error + hash := fnv.New64() + yamlEncoder := yaml.NewEncoder(hash) + + jobKeys := make([]string, 0, len(jobToScrapeConfig)) + for jobName := range jobToScrapeConfig { + jobKeys = append(jobKeys, jobName) + } + sort.Strings(jobKeys) + + for _, jobName := range jobKeys { + _, err = hash.Write([]byte(jobName)) + if err != nil { + return 0, err + } + err = yamlEncoder.Encode(jobToScrapeConfig[jobName]) + if err != nil { + return 0, err + } + } + yamlEncoder.Close() + return hash.Sum64(), err +} diff --git a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go b/receiver/prometheusreceiver/targetallocator/manager_test.go similarity index 88% rename from receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go rename to receiver/prometheusreceiver/targetallocator/manager_test.go index 448c54022dba..3845f75ab67d 100644 --- a/receiver/prometheusreceiver/metrics_receiver_target_allocator_test.go +++ b/receiver/prometheusreceiver/targetallocator/manager_test.go @@ -3,7 +3,7 @@ //go:build !race -package prometheusreceiver +package targetallocator import ( "context" @@ -18,17 +18,20 @@ import ( "testing" "time" + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" commonconfig "github.com/prometheus/common/config" "github.com/prometheus/common/model" promconfig "github.com/prometheus/prometheus/config" + "github.com/prometheus/prometheus/discovery" promHTTP "github.com/prometheus/prometheus/discovery/http" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/scrape" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" 
"go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/config/configtls" - "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -335,19 +338,16 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{ - BasicAuth: &commonconfig.BasicAuth{ - Username: "user", - Password: "aPassword", - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{ + BasicAuth: &commonconfig.BasicAuth{ + Username: "user", + Password: "aPassword", }, - RefreshInterval: model.Duration(60 * time.Second), }, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -431,14 +431,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -547,14 +544,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 10 * time.Second, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 10 * time.Second, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -589,14 +583,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{GlobalConfig: promconfig.DefaultGlobalConfig}, - TargetAllocator: &TargetAllocator{ - Interval: 50 * time.Millisecond, - CollectorID: "collector-1", - HTTPSDConfig: &PromHTTPSDConfig{ - HTTPClientConfig: commonconfig.HTTPClientConfig{}, - RefreshInterval: model.Duration(60 * time.Second), - }, + Interval: 50 * time.Millisecond, + CollectorID: "collector-1", + HTTPSDConfig: &PromHTTPSDConfig{ + HTTPClientConfig: commonconfig.HTTPClientConfig{}, + RefreshInterval: model.Duration(60 * time.Second), }, }, want: expectedTestResult{ @@ -666,33 +657,11 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { }, }, cfg: &Config{ - PrometheusConfig: &PromConfig{ - ScrapeConfigs: []*promconfig.ScrapeConfig{ - { - JobName: "job1", - HonorTimestamps: true, - ScrapeInterval: model.Duration(30 * time.Second), - ScrapeTimeout: model.Duration(30 * time.Second), - ScrapeProtocols: promconfig.DefaultScrapeProtocols, - MetricsPath: "/metrics", - Scheme: "http", - MetricRelabelConfigs: []*relabel.Config{ - { - Separator: ";", - Regex: 
relabel.MustNewRegexp("(.*)"),
-									Action:    relabel.Keep,
-								},
-							},
-						},
-					},
-				},
-				TargetAllocator: &TargetAllocator{
-					Interval:    10 * time.Second,
-					CollectorID: "collector-1",
-					HTTPSDConfig: &PromHTTPSDConfig{
-						HTTPClientConfig: commonconfig.HTTPClientConfig{},
-						RefreshInterval:  model.Duration(60 * time.Second),
-					},
+				Interval:    10 * time.Second,
+				CollectorID: "collector-1",
+				HTTPSDConfig: &PromHTTPSDConfig{
+					HTTPClientConfig: commonconfig.HTTPClientConfig{},
+					RefreshInterval:  model.Duration(60 * time.Second),
 				},
 			},
 			want: expectedTestResult{
@@ -715,7 +684,6 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) {
 	} {
 		t.Run(tc.desc, func(t *testing.T) {
 			ctx := context.Background()
-			cms := new(consumertest.MetricsSink)
 
 			allocator, err := setupMockTargetAllocator(tc.responses)
 			require.NoError(t, err, "Failed to create allocator", tc.responses)
@@ -723,14 +691,16 @@
 			allocator.Start()
 			defer allocator.Stop()
 
-			tc.cfg.TargetAllocator.Endpoint = allocator.srv.URL // set service URL with the automatic generated one
-			receiver := newPrometheusReceiver(receivertest.NewNopSettings(), tc.cfg, cms)
+			tc.cfg.Endpoint = allocator.srv.URL // set service URL to the automatically generated one
+			scrapeManager, discoveryManager := initPrometheusManagers(ctx, t)
 
-			require.NoError(t, receiver.Start(ctx, componenttest.NewNopHost()))
+			baseCfg := promconfig.Config{GlobalConfig: promconfig.DefaultGlobalConfig}
+			manager := NewManager(receivertest.NewNopSettings(), tc.cfg, &baseCfg, false)
+			require.NoError(t, manager.Start(ctx, componenttest.NewNopHost(), scrapeManager, discoveryManager))
 
 			allocator.wg.Wait()
 
-			providers := receiver.discoveryManager.Providers()
+			providers := discoveryManager.Providers()
 			if tc.want.empty {
 				// if no base config is supplied and the job retrieval fails then no configuration should be found
 				require.Len(t, providers, 0)
@@ -747,7 +717,7 @@
 			// are http configs applied?
sdConfig := provider.Config().(*promHTTP.SDConfig) - require.Equal(t, tc.cfg.TargetAllocator.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig) + require.Equal(t, tc.cfg.HTTPSDConfig.HTTPClientConfig, sdConfig.HTTPClientConfig) for _, group := range refresh { found := false @@ -764,7 +734,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { s.Labels["__meta_url"] = model.LabelValue(sdConfig.URL) require.Equal(t, s.Labels, group.Labels) if s.MetricRelabelConfig != nil { - for _, sc := range receiver.cfg.PrometheusConfig.ScrapeConfigs { + for _, sc := range manager.promCfg.ScrapeConfigs { if sc.JobName == s.MetricRelabelConfig.JobName { for _, mc := range sc.MetricRelabelConfigs { require.Equal(t, s.MetricRelabelConfig.MetricRelabelRegex, mc.Regex) @@ -782,7 +752,7 @@ func TestTargetAllocatorJobRetrieval(t *testing.T) { } func TestConfigureSDHTTPClientConfigFromTA(t *testing.T) { - ta := &TargetAllocator{} + ta := &Config{} ta.TLSSetting = configtls.ClientConfig{ InsecureSkipVerify: true, ServerName: "test.server", @@ -821,10 +791,23 @@ func TestConfigureSDHTTPClientConfigFromTA(t *testing.T) { assert.Equal(t, commonconfig.URL{URL: parsedProxyURL}, httpSD.HTTPClientConfig.ProxyURL) // Test case with empty TargetAllocator - emptyTA := &TargetAllocator{} + emptyTA := &Config{} emptyHTTPSD := &promHTTP.SDConfig{RefreshInterval: model.Duration(30 * time.Second)} err = configureSDHTTPClientConfigFromTA(emptyHTTPSD, emptyTA) assert.NoError(t, err) } + +func initPrometheusManagers(ctx context.Context, t *testing.T) (*scrape.Manager, *discovery.Manager) { + logger := log.NewNopLogger() + reg := prometheus.NewRegistry() + sdMetrics, err := discovery.RegisterSDMetrics(reg, discovery.NewRefreshMetrics(reg)) + require.NoError(t, err) + discoveryManager := discovery.NewManager(ctx, logger, reg, sdMetrics) + require.NotNil(t, discoveryManager) + + scrapeManager, err := scrape.NewManager(&scrape.Options{}, logger, nil, reg) + require.NoError(t, err) + return scrapeManager, discoveryManager +} diff --git a/receiver/prometheusreceiver/targetallocator/testdata/config.yaml b/receiver/prometheusreceiver/targetallocator/testdata/config.yaml new file mode 100644 index 000000000000..2f243000155d --- /dev/null +++ b/receiver/prometheusreceiver/targetallocator/testdata/config.yaml @@ -0,0 +1,7 @@ +target_allocator: + endpoint: http://localhost:8080 + timeout: 5s + tls: + cert_file: "client.crt" + interval: 30s + collector_id: collector-1 diff --git a/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-cert-file b/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-cert-file new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-key-file b/receiver/prometheusreceiver/targetallocator/testdata/dummy-tls-key-file new file mode 100644 index 000000000000..e69de29bb2d1
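
Usage sketch (not part of the patch): after this change, downstream code references targetallocator.Config instead of prometheusreceiver.TargetAllocator. The following minimal Go example is hypothetical; the endpoint, interval, and collector ID values mirror testdata/config.yaml above, and exampleConfig/main are illustrative helpers, not code from this PR.

package main

import (
	"time"

	"go.opentelemetry.io/collector/config/confighttp"

	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver/targetallocator"
)

// exampleConfig builds a receiver config whose target allocator settings
// now come from the relocated targetallocator package.
func exampleConfig() *prometheusreceiver.Config {
	return &prometheusreceiver.Config{
		TargetAllocator: &targetallocator.Config{
			// ClientConfig is embedded with `mapstructure:",squash"`, so
			// Endpoint still appears as `endpoint` at the top level in YAML.
			ClientConfig: confighttp.ClientConfig{
				Endpoint: "http://localhost:8080", // illustrative value from testdata/config.yaml
			},
			Interval:    30 * time.Second,
			CollectorID: "collector-1",
		},
	}
}

func main() {
	_ = exampleConfig()
}

Because the squashed confighttp.ClientConfig keeps YAML keys such as endpoint, timeout, and tls at the top level of the target_allocator block, existing collector configuration files are unaffected by the move, which is why the changelog entry is recorded under the api changelog only.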