-
Notifications
You must be signed in to change notification settings - Fork 807
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Support metadata federated query (#6461)
Signed-off-by: SungJin1212 <[email protected]> Signed-off-by: Charlie Le <[email protected]> Co-authored-by: Charlie Le <[email protected]>
- Loading branch information
1 parent
7ec57ac
commit 4b8203c
Showing
7 changed files
with
274 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package tenantfederation | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promauto" | ||
"github.com/prometheus/prometheus/scrape" | ||
"github.com/weaveworks/common/user" | ||
|
||
"github.com/cortexproject/cortex/pkg/querier" | ||
"github.com/cortexproject/cortex/pkg/tenant" | ||
"github.com/cortexproject/cortex/pkg/util/concurrency" | ||
"github.com/cortexproject/cortex/pkg/util/spanlogger" | ||
) | ||
|
||
// NewMetadataQuerier returns a MetadataQuerier that merges metric | ||
// metadata for multiple tenants. | ||
func NewMetadataQuerier(upstream querier.MetadataQuerier, maxConcurrent int, reg prometheus.Registerer) querier.MetadataQuerier { | ||
return &mergeMetadataQuerier{ | ||
upstream: upstream, | ||
maxConcurrent: maxConcurrent, | ||
|
||
tenantsPerMetadataQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ | ||
Namespace: "cortex", | ||
Name: "querier_federated_tenants_per_metadata_query", | ||
Help: "Number of tenants per metadata query.", | ||
Buckets: []float64{1, 2, 4, 8, 16, 32, 64}, | ||
}), | ||
} | ||
} | ||
|
||
type mergeMetadataQuerier struct { | ||
maxConcurrent int | ||
tenantsPerMetadataQuery prometheus.Histogram | ||
upstream querier.MetadataQuerier | ||
} | ||
|
||
type metadataSelectJob struct { | ||
pos int | ||
querier querier.MetadataQuerier | ||
id string | ||
} | ||
|
||
// MetricsMetadata returns aggregated metadata for multiple tenants | ||
func (m *mergeMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { | ||
log, ctx := spanlogger.New(ctx, "mergeMetadataQuerier.MetricsMetadata") | ||
defer log.Span.Finish() | ||
|
||
tenantIds, err := tenant.TenantIDs(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
m.tenantsPerMetadataQuery.Observe(float64(len(tenantIds))) | ||
|
||
if len(tenantIds) == 1 { | ||
return m.upstream.MetricsMetadata(ctx) | ||
} | ||
|
||
jobs := make([]interface{}, len(tenantIds)) | ||
results := make([][]scrape.MetricMetadata, len(tenantIds)) | ||
|
||
var jobPos int | ||
for _, tenantId := range tenantIds { | ||
jobs[jobPos] = &metadataSelectJob{ | ||
pos: jobPos, | ||
querier: m.upstream, | ||
id: tenantId, | ||
} | ||
jobPos++ | ||
} | ||
|
||
run := func(ctx context.Context, jobIntf interface{}) error { | ||
job, ok := jobIntf.(*metadataSelectJob) | ||
if !ok { | ||
return fmt.Errorf("unexpected type %T", jobIntf) | ||
} | ||
|
||
res, err := job.querier.MetricsMetadata(user.InjectOrgID(ctx, job.id)) | ||
if err != nil { | ||
return errors.Wrapf(err, "error exemplars querying %s %s", job.id, err) | ||
} | ||
|
||
results[job.pos] = res | ||
return nil | ||
} | ||
|
||
err = concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// deduplicate for the same MetricMetadata across all tenants | ||
var ret []scrape.MetricMetadata | ||
deduplicated := make(map[scrape.MetricMetadata]struct{}) | ||
for _, metadata := range results { | ||
for _, m := range metadata { | ||
if _, ok := deduplicated[m]; !ok { | ||
ret = append(ret, m) | ||
deduplicated[m] = struct{}{} | ||
} | ||
} | ||
} | ||
|
||
return ret, nil | ||
} |
146 changes: 146 additions & 0 deletions
146
pkg/querier/tenantfederation/metadata_merge_querier_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,146 @@ | ||
package tenantfederation | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/testutil" | ||
"github.com/prometheus/prometheus/scrape" | ||
"github.com/stretchr/testify/require" | ||
"github.com/weaveworks/common/user" | ||
|
||
"github.com/cortexproject/cortex/pkg/tenant" | ||
) | ||
|
||
var ( | ||
expectedSingleTenantsMetadataMetrics = ` | ||
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query. | ||
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_sum 1 | ||
cortex_querier_federated_tenants_per_metadata_query_count 1 | ||
` | ||
|
||
expectedTwoTenantsMetadataMetrics = ` | ||
# HELP cortex_querier_federated_tenants_per_metadata_query Number of tenants per metadata query. | ||
# TYPE cortex_querier_federated_tenants_per_metadata_query histogram | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="1"} 0 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="2"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="4"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="8"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="16"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="32"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="64"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_bucket{le="+Inf"} 1 | ||
cortex_querier_federated_tenants_per_metadata_query_sum 2 | ||
cortex_querier_federated_tenants_per_metadata_query_count 1 | ||
` | ||
) | ||
|
||
type mockMetadataQuerier struct { | ||
tenantIdToMetadata map[string][]scrape.MetricMetadata | ||
} | ||
|
||
func (m *mockMetadataQuerier) MetricsMetadata(ctx context.Context) ([]scrape.MetricMetadata, error) { | ||
// Due to lint check for `ensure the query path is supporting multiple tenants` | ||
ids, err := tenant.TenantIDs(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
id := ids[0] | ||
if res, ok := m.tenantIdToMetadata[id]; !ok { | ||
return nil, fmt.Errorf("tenant not found, tenantId: %s", id) | ||
} else { | ||
return res, nil | ||
} | ||
} | ||
|
||
func Test_mergeMetadataQuerier_MetricsMetadata(t *testing.T) { | ||
// set a multi tenant resolver | ||
tenant.WithDefaultResolver(tenant.NewMultiResolver()) | ||
|
||
tests := []struct { | ||
name string | ||
tenantIdToMetadata map[string][]scrape.MetricMetadata | ||
orgId string | ||
expectedResults []scrape.MetricMetadata | ||
expectedMetrics string | ||
}{ | ||
{ | ||
name: "single tenant", | ||
tenantIdToMetadata: map[string][]scrape.MetricMetadata{ | ||
"user-1": { | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
}, | ||
}, | ||
orgId: "user-1", | ||
expectedResults: []scrape.MetricMetadata{ | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
}, | ||
expectedMetrics: expectedSingleTenantsMetadataMetrics, | ||
}, | ||
{ | ||
name: "should be merged two tenants results", | ||
tenantIdToMetadata: map[string][]scrape.MetricMetadata{ | ||
"user-1": { | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
}, | ||
"user-2": { | ||
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, | ||
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, | ||
}, | ||
}, | ||
orgId: "user-1|user-2", | ||
expectedResults: []scrape.MetricMetadata{ | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, | ||
{Metric: "metadata3", Help: "metadata3 help", Type: "gauge", Unit: ""}, | ||
}, | ||
expectedMetrics: expectedTwoTenantsMetadataMetrics, | ||
}, | ||
{ | ||
name: "should be deduplicated when the same metadata exist", | ||
tenantIdToMetadata: map[string][]scrape.MetricMetadata{ | ||
"user-1": { | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, | ||
}, | ||
"user-2": { | ||
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, | ||
}, | ||
}, | ||
orgId: "user-1|user-2", | ||
expectedResults: []scrape.MetricMetadata{ | ||
{Metric: "metadata1", Help: "metadata1 help", Type: "gauge", Unit: ""}, | ||
{Metric: "metadata2", Help: "metadata2 help", Type: "counter", Unit: ""}, | ||
}, | ||
expectedMetrics: expectedTwoTenantsMetadataMetrics, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
t.Run(test.name, func(t *testing.T) { | ||
reg := prometheus.NewPedanticRegistry() | ||
upstream := mockMetadataQuerier{ | ||
tenantIdToMetadata: test.tenantIdToMetadata, | ||
} | ||
|
||
mergeMetadataQuerier := NewMetadataQuerier(&upstream, defaultMaxConcurrency, reg) | ||
metadata, err := mergeMetadataQuerier.MetricsMetadata(user.InjectOrgID(context.Background(), test.orgId)) | ||
require.NoError(t, err) | ||
require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_metadata_query")) | ||
require.Equal(t, test.expectedResults, metadata) | ||
}) | ||
} | ||
} |