From 952dd9f4a374d131f1866fb45b15fd327da99cd7 Mon Sep 17 00:00:00 2001 From: SungJin1212 Date: Thu, 26 Dec 2024 20:02:19 +0900 Subject: [PATCH] Support metadata federated query Signed-off-by: SungJin1212 --- CHANGELOG.md | 1 + pkg/api/handlers.go | 6 +- pkg/cortex/cortex.go | 1 + pkg/cortex/modules.go | 7 +- pkg/querier/metadata_handler.go | 11 +- .../metadata_merge_querier.go | 109 +++++++++++++ .../metadata_merge_querier_test.go | 146 ++++++++++++++++++ 7 files changed, 275 insertions(+), 6 deletions(-) create mode 100644 pkg/querier/tenantfederation/metadata_merge_querier.go create mode 100644 pkg/querier/tenantfederation/metadata_merge_querier_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index cfbb04b6d3..60d3a50dd2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [FEATURE] Query Frontend: Support a metadata federated query when `-tenant-federation.enabled=true`. #6461 * [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388 diff --git a/pkg/api/handlers.go b/pkg/api/handlers.go index d931d4a40d..2af395e89c 100644 --- a/pkg/api/handlers.go +++ b/pkg/api/handlers.go @@ -162,7 +162,7 @@ func NewQuerierHandler( queryable storage.SampleAndChunkQueryable, exemplarQueryable storage.ExemplarQueryable, engine promql.QueryEngine, - distributor Distributor, + metadataQuerier querier.MetadataQuerier, reg prometheus.Registerer, logger log.Logger, ) http.Handler { @@ -266,7 +266,7 @@ func NewQuerierHandler( // TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in: // https://github.com/prometheus/prometheus/pull/7125/files - router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor)) + router.Path(path.Join(prefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier)) router.Path(path.Join(prefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger)) router.Path(path.Join(prefix, "/api/v1/read")).Methods("POST").Handler(promRouter) router.Path(path.Join(prefix, "/api/v1/query")).Methods("GET", "POST").Handler(promRouter) @@ -279,7 +279,7 @@ func NewQuerierHandler( // TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in: // https://github.com/prometheus/prometheus/pull/7125/files - router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(distributor)) + router.Path(path.Join(legacyPrefix, "/api/v1/metadata")).Handler(querier.MetadataHandler(metadataQuerier)) router.Path(path.Join(legacyPrefix, "/api/v1/read")).Handler(querier.RemoteReadHandler(queryable, logger)) router.Path(path.Join(legacyPrefix, "/api/v1/read")).Methods("POST").Handler(legacyPromRouter) router.Path(path.Join(legacyPrefix, "/api/v1/query")).Methods("GET", "POST").Handler(legacyPromRouter) diff --git a/pkg/cortex/cortex.go b/pkg/cortex/cortex.go index 03ceb6e3cc..ee62bf7ee5 100644 --- a/pkg/cortex/cortex.go +++ b/pkg/cortex/cortex.go @@ -311,6 +311,7 @@ type Cortex struct { RuntimeConfig *runtimeconfig.Manager QuerierQueryable prom_storage.SampleAndChunkQueryable ExemplarQueryable prom_storage.ExemplarQueryable + MetadataQuerier querier.MetadataQuerier QuerierEngine promql.QueryEngine QueryFrontendTripperware tripperware.Tripperware diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index a771c22116..3e51ca27fe 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -260,6 +260,9 @@ func (t *Cortex) initQueryable() (serv services.Service, err error) { // Create a querier queryable and PromQL engine t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine = querier.New(t.Cfg.Querier, t.Overrides, t.Distributor, t.StoreQueryables, querierRegisterer, util_log.Logger) + // Use distributor as default MetadataQuerier + t.MetadataQuerier = t.Distributor + // Register the default endpoints that are always enabled for the querier module t.API.RegisterQueryable(t.QuerierQueryable, t.Distributor) @@ -274,6 +277,8 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // federation. byPassForSingleQuerier := true t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) + t.MetadataQuerier = tenantfederation.NewMetadataQuerier(t.MetadataQuerier, t.Cfg.TenantFederation.MaxConcurrent, prometheus.DefaultRegisterer) + } return nil, nil } @@ -335,7 +340,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) { t.QuerierQueryable, t.ExemplarQueryable, t.QuerierEngine, - t.Distributor, + t.MetadataQuerier, prometheus.DefaultRegisterer, util_log.Logger, ) diff --git a/pkg/querier/metadata_handler.go b/pkg/querier/metadata_handler.go index 1db757d354..dc81d8739f 100644 --- a/pkg/querier/metadata_handler.go +++ b/pkg/querier/metadata_handler.go @@ -1,11 +1,18 @@ package querier import ( + "context" "net/http" + "github.com/prometheus/prometheus/scrape" + "github.com/cortexproject/cortex/pkg/util" ) +type MetadataQuerier interface { + MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) +} + type metricMetadata struct { Type string `json:"type"` Help string `json:"help"` @@ -25,9 +32,9 @@ type metadataResult struct { // MetadataHandler returns metric metadata held by Cortex for a given tenant. // It is kept and returned as a set. -func MetadataHandler(d Distributor) http.Handler { +func MetadataHandler(m MetadataQuerier) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - resp, err := d.MetricsMetadata(r.Context()) + resp, err := m.MetricsMetadata(r.Context()) if err != nil { w.WriteHeader(http.StatusBadRequest) util.WriteJSONResponse(w, metadataResult{Status: statusError, Error: err.Error()}) diff --git a/pkg/querier/tenantfederation/metadata_merge_querier.go b/pkg/querier/tenantfederation/metadata_merge_querier.go new file mode 100644 index 0000000000..4a51a19d65 --- /dev/null +++ b/pkg/querier/tenantfederation/metadata_merge_querier.go @@ -0,0 +1,109 @@ +package tenantfederation + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/scrape" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/querier" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/concurrency" + "github.com/cortexproject/cortex/pkg/util/spanlogger" +) + +// NewMetadataQuerier returns a MetadataQuerier that merges metric +// metadata for multiple tenants. +func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier { + return &mergeMetadataQuerier{ + upstream: upstream, + maxConcurrent: maxConcurrent, + + tenantsPerMetadataQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "querier_federated_tenants_per_metadata_query", + Help: "Number of tenants per metadata query.", + Buckets: []float64{1, 2, 4, 8, 16, 32, 64}, + }), + } +} + +type mergeMetadataQuerier struct { + maxConcurrent int + tenantsPerMetadataQuery prometheus.Histogram + upstream querier.MetadataQuerier +} + +type metadataSelectJob struct { + pos int + querier querier.MetadataQuerier + id string +} + +// MetricsMetadata returns aggregated metadata for multiple tenants +func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { + log, ctx := spanlogger.New(ctx, "mergeMetadataQuerier.MetricsMetadata") + defer log.Span.Finish() + + tenantIds, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + + m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds))) + + if len(tenantIds) == 1 { + return m.upstream.MetricsMetadata(ctx) + } + + jobs := make([]interface{}, len(tenantIds)) + results := make([][]scrape.MetricMetadata, len(tenantIds)) + + var jobPos int + for _, tenantId := range tenantIds { + jobs[jobPos] = &metadataSelectJob{ + pos: jobPos, + querier: m.upstream, + id: tenantId, + } + jobPos++ + } + + run := func(ctx context.Context, jobIntf interface{}) error { + job, ok := jobIntf.(*metadataSelectJob) + if !ok { + return fmt.Errorf("unexpected type %T", jobIntf) + } + + res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id)) + if err != nil { + return errors.Wrapf(err, "error exemplars querying %s %s", job.id, err) + } + + results[job.pos] = res + return nil + } + + err = concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) + if err != nil { + return nil, err + } + + // deduplicate for the same MetricMetadata across all tenants + var ret []scrape.MetricMetadata + deduplicated := make(map[scrape.MetricMetadata]struct{}) + for _, metadata := range results { + for _, m := range metadata { + if _, ok := deduplicated[m]; !ok { + ret = append(ret, m) + deduplicated[m] = struct{}{} + } + } + } + + return ret, nil +} diff --git a/pkg/querier/tenantfederation/metadata_merge_querier_test.go b/pkg/querier/tenantfederation/metadata_merge_querier_test.go new file mode 100644 index 0000000000..1157218e25 --- /dev/null +++ b/pkg/querier/tenantfederation/metadata_merge_querier_test.go @@ -0,0 +1,146 @@ +package tenantfederation + +import ( + "context" + "fmt" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/scrape" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/tenant" +) + +var ( + expectedSingleTenantsMetadataMetrics = ` +# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query. +# TYPE cortex_querier_federated_tenants_per_metadata_query histogram +cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_metadata_query_sum 1 +cortex_querier_federated_tenants_per_metadata_query_count 1 +` + + expectedTwoTenantsMetadataMetrics = ` +# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query. +# TYPE cortex_querier_federated_tenants_per_metadata_query histogram +cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 0 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_metadata_query_sum 2 +cortex_querier_federated_tenants_per_metadata_query_count 1 +` +) + +type mockMetadataQuerier struct { + tenantIdToMetadata map[string][]scrape.MetricMetadata +} + +func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { + // Due to lint check for `ensure the query path is supporting multiple tenants` + ids, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + + id := ids[0] + if res, ok := m.tenantIdToMetadata[id]; !ok { + return nil, fmt.Errorf("tenant not found, tenantId: %s", id) + } else { + return res, nil + } +} + +func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { + // set a multi tenant resolver + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + tests := []struct { + name string + tenantIdToMetadata map[string][]scrape.MetricMetadata + orgId string + expectedResults []scrape.MetricMetadata + expectedMetrics string + }{ + { + name: "single tenant", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-1", + expectedResults: []scrape.MetricMetadata{ + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedSingleTenantsMetadataMetrics, + }, + { + name: "should be merged two tenants results", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + }, + "user-2": { + {Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + }, + orgId: "user-1|user-2", + expectedResults: []scrape.MetricMetadata{ + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + {Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + {Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + { + name: "should be deduplicated when the same metadata exist", + tenantIdToMetadata: map[string][]scrape.MetricMetadata{ + "user-1": { + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + {Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + }, + "user-2": { + {Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + }, + }, + orgId: "user-1|user-2", + expectedResults: []scrape.MetricMetadata{ + {Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, + {Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, + }, + expectedMetrics: expectedTwoTenantsMetadataMetrics, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + upstream := mockMetadataQuerier{ + tenantIdToMetadata: test.tenantIdToMetadata, + } + + mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) + metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId)) + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query")) + require.Equal(t, test.expectedResults, metadata) + }) + } +}