diff --git a/CHANGELOG.md b/CHANGELOG.md index 590753ed3c..95e75b569c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 * [FEATURE] Ruler: Add support for per-user external labels #6340 +* [FEATURE] Query Frontend: Support an exemplar federated query when `-tenant-federation.enabled=true`. #6455 * [ENHANCEMENT] Query Frontend: Add a `source` label to query stat metrics. #6470 * [ENHANCEMENT] Querier: Add a `-tenant-federation.max-concurrent` flags to configure the number of worker processing federated query and add a `cortex_querier_federated_tenants_per_query` histogram to track the number of tenants per query. #6449 * [ENHANCEMENT] Query Frontend: Add a number of series in the query response to the query stat log. #6423 @@ -63,6 +64,7 @@ * [BUGFIX] Ingester: Fix regression on usage of cortex_ingester_queried_chunks. #6398 * [BUGFIX] Ingester: Fix possible race condition when `active series per LabelSet` is configured. #6409 * [BUGFIX] Query Frontend: Fix @ modifier not being applied correctly on sub queries. #6450 +* [BUGFIX] Cortex Redis flags with multiple dots #6476 ## 1.18.1 2024-10-14 diff --git a/MAINTAINERS.md b/MAINTAINERS.md index 4715b51d7f..4be39c598b 100644 --- a/MAINTAINERS.md +++ b/MAINTAINERS.md @@ -11,6 +11,7 @@ ### Triaggers -| Name | Email | GitHub | Company | -|-------------|----------------------|--------------|-----------------| -| Sungjin Lee | tjdwls1201@gmail.com | @SungJin1212 | KakaoEnterprise | +| Name | Email | GitHub | Company | +|-----------------|----------------------------|-----------------|---------------------| +| Sungjin Lee | tjdwls1201@gmail.com | @SungJin1212 | KakaoEnterprise | +| Anand Rajagopal | anand.rajagopal@icloud.com | @rajagopalanand | Amazon Web Services | diff --git a/RELEASE.md b/RELEASE.md index 85577bf52f..1f4041e7ad 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -42,7 +42,7 @@ The release shepherd is responsible for the entire release series of a minor rel * We aim to keep the master branch in a working state at all times. In principle, it should be possible to cut a release from master at any time. In practice, things might not work out as nicely. A few days before the pre-release is scheduled, the shepherd should check the state of master. Following their best judgement, the shepherd should try to expedite bug fixes that are still in progress but should make it into the release. On the other hand, the shepherd may hold back merging last-minute invasive and risky changes that are better suited for the next minor release. * On the date listed in the table above, the release shepherd cuts the first pre-release (using the suffix `-rc.0`) and creates a new branch called `release-.` starting at the commit tagged for the pre-release. In general, a pre-release is considered a release candidate (that's what `rc` stands for) and should therefore not contain any known bugs that are planned to be fixed in the final release. -* With the pre-release, the release shepherd is responsible for coordinating or running the release candidate in any [end user](https://github.com/cortexproject/cortex/blob/master/ADOPTERS.md) production environment for 3 days. This is typically done in Grafana Labs or Weaveworks but we are looking for more volunteers! 
+* With the pre-release, the release shepherd is responsible for coordinating or running the release candidate in any [end user](https://github.com/cortexproject/cortex/blob/master/ADOPTERS.md) production environment for 3 days. * If regressions or critical bugs are detected, they need to get fixed before cutting a new pre-release (called `-rc.1`, `-rc.2`, etc.). See the next section for details on cutting an individual release. diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index f6141c08d0..8e441f1cdb 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -755,25 +755,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching @@ -990,25 +990,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. 
- # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching @@ -1231,25 +1231,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md index 8efa0365ba..a8d9bfa7a0 100644 --- a/docs/blocks-storage/store-gateway.md +++ b/docs/blocks-storage/store-gateway.md @@ -859,25 +859,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. 
- # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching @@ -1094,25 +1094,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching @@ -1335,25 +1335,25 @@ blocks_storage: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate # against. If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. 
Client-side caching diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index 70978a36bd..6a36afefb0 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -1299,25 +1299,25 @@ bucket_store: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate against. # If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.index-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.index-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching is @@ -1532,25 +1532,25 @@ bucket_store: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate against. # If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.chunks-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching is @@ -1770,25 +1770,25 @@ bucket_store: # Path to the client certificate file, which will be used for # authenticating with the server. Also requires the key path to be # configured. 
- # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-cert-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-cert-path [tls_cert_path: | default = ""] # Path to the key file for the client certificate. Also requires the # client certificate to be configured. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-key-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-key-path [tls_key_path: | default = ""] # Path to the CA certificates file to validate server certificate against. # If not set, the host's root CA certificates are used. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-ca-path + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-ca-path [tls_ca_path: | default = ""] # Override the expected name on the server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-server-name + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-server-name [tls_server_name: | default = ""] # Skip validating server certificate. - # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis..tls-insecure-skip-verify + # CLI flag: -blocks-storage.bucket-store.metadata-cache.redis.tls-insecure-skip-verify [tls_insecure_skip_verify: | default = false] # If not zero then client-side caching is enabled. Client-side caching is
diff --git a/docs/guides/rule-evaluations-via-query-frontend.md b/docs/guides/rule-evaluations-via-query-frontend.md
new file mode 100644
index 0000000000..04e62bceee
--- /dev/null
+++ b/docs/guides/rule-evaluations-via-query-frontend.md
@@ -0,0 +1,47 @@
+---
+title: "Rule evaluations via query frontend"
+linkTitle: "Rule evaluations via query frontend"
+weight: 10
+slug: rule-evaluations-via-query-frontend
+---
+
+This guide explains how to configure the Ruler to evaluate rules via the Query Frontend instead of the Ingesters/Store Gateways, and the pros and cons of evaluating rules this way.
+
+## How to enable
+
+By default, the Ruler queries both Ingesters and Store Gateways, depending on the rule's time range, to evaluate rules (alerting rules or recording rules). If `-ruler.frontend-address` is set, the Ruler queries the Query Frontend to evaluate rules instead.
+The address should be the Query Frontend's gRPC listen address in `host:port` format.
+
+You can configure it via a CLI flag:
+```
+-ruler.frontend-address=query-frontend.svc.cluster.local:9095
+```
+Or via YAML:
+```yaml
+ruler:
+  frontend_address: query-frontend.svc.cluster.local:9095
+```
+
+In addition, you can configure the gRPC client used between the Ruler and the Query Frontend; please refer to the `frontend_client` section in the [ruler config](../configuration/config-file-reference.md#ruler_config).
+
+## Configure query response format
+You can configure the query response format via `-ruler.query-response-format`; it controls how query results are retrieved from the Query Frontend.
+The supported values are `protobuf` and `json`. We recommend the default, `protobuf`, because query results containing native histograms are only supported with `protobuf`.
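+
+Putting these options together, the following is a minimal configuration sketch rather than an authoritative reference: the address is the example value used above, and the `query_response_format` YAML key is assumed from the `-ruler.query-response-format` flag name, so double-check it against the [ruler config](../configuration/config-file-reference.md#ruler_config).
+
+```yaml
+ruler:
+  # gRPC listen address of the Query Frontend (host:port).
+  frontend_address: query-frontend.svc.cluster.local:9095
+  # Format used to retrieve query results from the Query Frontend.
+  # The default, protobuf, is required for results containing native histograms.
+  query_response_format: protobuf
+```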
+
+## Pros and Cons
+If this feature is enabled, the query execution path is as follows:
+
+Ruler -> Query Frontend -> Query Scheduler -> Querier -> Ingester/Store Gateway
+
+Compared to the default path (Ruler -> Ingester/Store Gateway), this adds more hops, which has pros and cons for query performance.
+
+### Pros
+- Rule evaluation performance can improve when there are enough Queriers pulling queries from the Query Scheduler, so that queued rule queries are fetched in a reasonable time (i.e. the extra hops do not hurt query performance). In such an environment, query performance can also benefit from Query Frontend features like vertical query sharding.
+- The Ruler can use fewer resources, as it doesn't need to run a query engine to evaluate rules.
+
+### Cons
+- If there are not enough Queriers, adding rule queries to the Query Scheduler could cause a query starvation problem (queries in the Query Scheduler are not fetched in a reasonable time), so rules may not be evaluated in time.
+
+You can use the `cortex_prometheus_rule_evaluation_duration_seconds` metric to decide whether to use `-ruler.frontend-address`.
\ No newline at end of file
diff --git a/docs/roadmap.md b/docs/roadmap.md index 73a4a3557c..45815e35f7 100644 --- a/docs/roadmap.md +++ b/docs/roadmap.md @@ -9,39 +9,31 @@ This document highlights some ideas for major features we'd like to implement in To get a more complete overview of planned features and current work, see the [issue tracker](https://github.com/cortexproject/cortex/issues). Note that these are not ordered by priority. -## Helm charts and other packaging +Last updated: January 4, 2025 -We have a [helm chart](https://github.com/cortexproject/cortex-helm-chart) but it needs work before it can be effectively utilised by different backends. We also don't provide an official set of dashboards and alerts to our users yet. This is one of the most requested features and something we will tackle in the immediate future. We also plan on publishing debs, rpms along with guides on how to run Cortex on bare-metal. +## Short-term (< 6 months) -## Auth Gateway +### Support for Prometheus Remote Write 2.0 -Cortex server has a simple authentication mechanism (X-Scope-OrgId) but users can't use the multitenancy features out of the box without complicated proxy configuration. It's hard to support all the different authentication mechanisms used by different companies but plan to have a simple but opinionated auth-gateway that provides value out of the box. The configuration could be as simple as: +[Prometheus Remote Write 2.0](https://prometheus.io/docs/specs/remote_write_spec_2_0/) -``` -tenants: -- name: infra-team - password: basic-auth-password -- name: api-team - password: basic-auth-password2 -``` +* adds a new Protobuf Message with new features enabling more use cases and wider adoption on top of performance and cost savings +* deprecates the previous Protobuf Message from a 1.0 Remote-Write specification +* adds mandatory X-Prometheus-Remote-Write-*-Written HTTP response headers for reliability purposes -## Billing and Usage analytics +For more information tracking this, please see [issue #6116](https://github.com/cortexproject/cortex/issues/6116). -We have all the metrics to track how many series, samples and queries each tenant is sending but don't have dashboards that help with this. We plan to have dashboards and UIs that will help operators monitor and control each tenants usage out of the box. +## Long-term (> 6 months) -## Downsampling -Downsampling means storing fewer samples, e.g. one per minute instead of one every 15 seconds. -This makes queries over long periods more efficient. It can reduce storage space slightly if the full-detail data is discarded.
+### CNCF Graduation Status -## Per-metric retention +Cortex was accepted to the CNCF on September 20, 2018 and moved to the Incubating maturity level on August 20, 2020. The Cortex maintainers are working towards promoting the project to the graduation status. -Cortex blocks storage supports deleting all data for a tenant after a time period (e.g. 3 months, 1 year), but we would also like to have custom retention for subsets of metrics (e.g. delete server metrics but retain business metrics). +For more information tracking this, please see [issue #6075](https://github.com/cortexproject/cortex/issues/6075). -## Exemplar support -[Exemplars](https://docs.google.com/document/d/1ymZlc9yuTj8GvZyKz1r3KDRrhaOjZ1W1qZVW_5Gj7gA/edit) -let you link metric samples to other data, such as distributed tracing. -As of early 2021 Prometheus will collect exemplars and send them via remote write, but Cortex needs to be extended to handle them. +### Downsampling -## Scalability +[Downsampling](https://thanos.io/tip/components/compact.md/#downsampling) means storing fewer samples, e.g. one per minute instead of one every 15 seconds. +This makes queries over long periods more efficient. It can reduce storage space slightly if the full-detail data is discarded. -Scalability has always been a focus for the project, but there is a lot more work to be done. We can now scale to 100s of Millions of active series but 1 Billion active series is still an unknown. +For more information tracking this, please see [issue #4322](https://github.com/cortexproject/cortex/issues/4322). diff --git a/go.mod b/go.mod index d1cdc26870..2199b824fd 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( github.com/stretchr/testify v1.10.0 github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf - github.com/thanos-io/thanos v0.37.3-0.20241224143735-2d041dc774da + github.com/thanos-io/thanos v0.37.3-0.20250106173420-0e95c464dd42 github.com/uber/jaeger-client-go v2.30.0+incompatible github.com/weaveworks/common v0.0.0-20230728070032-dd9e68f319d5 go.etcd.io/etcd/api/v3 v3.5.17 diff --git a/go.sum b/go.sum index bcbf2121ac..8b7d78ed2f 100644 --- a/go.sum +++ b/go.sum @@ -1657,8 +1657,8 @@ github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97 h1:VjG0mwhN1Dkn github.com/thanos-io/objstore v0.0.0-20241111205755-d1dd89d41f97/go.mod h1:vyzFrBXgP+fGNG2FopEGWOO/zrIuoy7zt3LpLeezRsw= github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf h1:JFh4PjC9yQidiFi4qMWbPddIgsLWPIsSEbXs75+tLxs= github.com/thanos-io/promql-engine v0.0.0-20241217103156-9dbff30059cf/go.mod h1:wx0JlRZtsB2S10JYUgeg5GqLfMxw31SzArP+28yyE00= -github.com/thanos-io/thanos v0.37.3-0.20241224143735-2d041dc774da h1:xnaeDaL1kOUgqA7BL6bPO5v5K66imXUweVYk8HqDFsA= -github.com/thanos-io/thanos v0.37.3-0.20241224143735-2d041dc774da/go.mod h1:E+olRxu9jl7KknntphHYXhLieVaMXXl/Q/Ioh6tj+oE= +github.com/thanos-io/thanos v0.37.3-0.20250106173420-0e95c464dd42 h1:UagJirP9rmR/m5ne4DyPZvAa+Yvkc5qiiPGl+jVVK9M= +github.com/thanos-io/thanos v0.37.3-0.20250106173420-0e95c464dd42/go.mod h1:VOu1neDpx4n/2OCQmfT/0RMU85UzhO35ce0S3Ew+NSk= github.com/tjhop/slog-gokit v0.1.2 h1:pmQI4SvU9h4gA0vIQsdhJQSqQg4mOmsPykG2/PM3j1I= github.com/tjhop/slog-gokit v0.1.2/go.mod h1:8fhlcp8C8ELbg3GCyKv06tgt4B5sDq2P1r2DQAu1HuM= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod 
h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 1e1485f219..85fe9fca2e 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -801,7 +801,46 @@ var comparer = cmp.Comparer(func(x, y model.Value) bool { const fraction = 1.e-10 // 0.00000001% return cmp.Equal(l, r, cmpopts.EquateNaNs(), cmpopts.EquateApprox(fraction, epsilon)) } + // count_values returns a metrics with one label {"value": "1.012321"} + compareValueMetrics := func(l, r model.Metric) (valueMetric bool, equals bool) { + lLabels := model.LabelSet(l).Clone() + rLabels := model.LabelSet(r).Clone() + var ( + lVal, rVal model.LabelValue + lFloat, rFloat float64 + ok bool + err error + ) + + if lVal, ok = lLabels["value"]; !ok { + return false, false + } + + if rVal, ok = rLabels["value"]; !ok { + return false, false + } + + if lFloat, err = strconv.ParseFloat(string(lVal), 64); err != nil { + return false, false + } + if rFloat, err = strconv.ParseFloat(string(rVal), 64); err != nil { + return false, false + } + + // Exclude the value label in comparison. + delete(lLabels, "value") + delete(rLabels, "value") + + if !lLabels.Equal(rLabels) { + return false, false + } + + return true, compareFloats(lFloat, rFloat) + } compareMetrics := func(l, r model.Metric) bool { + if valueMetric, equals := compareValueMetrics(l, r); valueMetric { + return equals + } return l.Equal(r) } diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5b8a3640b0..6ca9f5c82d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -274,6 +274,7 @@ func (t *Cortex) initTenantFederation() (serv services.Service, err error) { // federation. byPassForSingleQuerier := true t.QuerierQueryable = querier.NewSampleAndChunkQueryable(tenantfederation.NewQueryable(t.QuerierQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer)) + t.ExemplarQueryable = tenantfederation.NewExemplarQueryable(t.ExemplarQueryable, t.Cfg.TenantFederation.MaxConcurrent, byPassForSingleQuerier, prometheus.DefaultRegisterer) } return nil, nil } @@ -607,6 +608,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) { queryEngine = engine.New(engine.Opts{ EngineOpts: opts, LogicalOptimizers: logicalplan.AllOptimizers, + EnableAnalysis: true, }) } else { queryEngine = promql.NewEngine(opts) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index e9a80374cd..8927cd13f4 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -225,6 +225,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor queryEngine = engine.New(engine.Opts{ EngineOpts: opts, LogicalOptimizers: logicalplan.AllOptimizers, + EnableAnalysis: true, }) } else { queryEngine = promql.NewEngine(opts) diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable.go b/pkg/querier/tenantfederation/exemplar_merge_queryable.go new file mode 100644 index 0000000000..45e519af75 --- /dev/null +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable.go @@ -0,0 +1,215 @@ +package tenantfederation + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/weaveworks/common/user" + + 
"github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util/concurrency" + "github.com/cortexproject/cortex/pkg/util/spanlogger" +) + +// NewExemplarQueryable returns a exemplarQueryable that iterates through all the +// tenant IDs that are part of the request and aggregates the results from each +// tenant's ExemplarQuerier by sending of subsequent requests. +// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed +// and results for request with a single exemplar querier will not contain the +// "__tenant_id__" label. This allows a smoother transition, when enabling +// tenant federation in a cluster. +// The result contains a label "__tenant_id__" to identify the tenant ID that +// it originally resulted from. +// If the label "__tenant_id__" is already existing, its value is overwritten +// by the tenant ID and the previous value is exposed through a new label +// prefixed with "original_". This behaviour is not implemented recursively. +func NewExemplarQueryable(upstream storage.ExemplarQueryable, maxConcurrent int, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { + return NewMergeExemplarQueryable(defaultTenantLabel, maxConcurrent, tenantExemplarQuerierCallback(upstream), byPassWithSingleQuerier, reg) +} + +func tenantExemplarQuerierCallback(exemplarQueryable storage.ExemplarQueryable) MergeExemplarQuerierCallback { + return func(ctx context.Context) ([]string, []storage.ExemplarQuerier, error) { + tenantIDs, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, nil, err + } + + var queriers = make([]storage.ExemplarQuerier, len(tenantIDs)) + for pos, tenantID := range tenantIDs { + q, err := exemplarQueryable.ExemplarQuerier(user.InjectOrgID(ctx, tenantID)) + if err != nil { + return nil, nil, err + } + queriers[pos] = q + } + + return tenantIDs, queriers, nil + } +} + +// MergeExemplarQuerierCallback returns the underlying exemplar queriers and their +// IDs relevant for the query. +type MergeExemplarQuerierCallback func(ctx context.Context) (ids []string, queriers []storage.ExemplarQuerier, err error) + +// NewMergeExemplarQueryable returns a queryable that merges results from multiple +// underlying ExemplarQueryables. +// By setting byPassWithSingleQuerier to true the mergeExemplarQuerier gets by-passed +// and results for request with a single exemplar querier will not contain the +// "__tenant_id__" label. This allows a smoother transition, when enabling +// tenant federation in a cluster. +// Results contain a label `idLabelName` to identify the underlying exemplar queryable +// that it originally resulted from. +// If the label `idLabelName` is already existing, its value is overwritten and +// the previous value is exposed through a new label prefixed with "original_". +// This behaviour is not implemented recursively. 
+func NewMergeExemplarQueryable(idLabelName string, maxConcurrent int, callback MergeExemplarQuerierCallback, byPassWithSingleQuerier bool, reg prometheus.Registerer) storage.ExemplarQueryable { + return &mergeExemplarQueryable{ + idLabelName: idLabelName, + byPassWithSingleQuerier: byPassWithSingleQuerier, + callback: callback, + maxConcurrent: maxConcurrent, + + tenantsPerExemplarQuery: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Namespace: "cortex", + Name: "querier_federated_tenants_per_exemplar_query", + Help: "Number of tenants per exemplar query.", + Buckets: []float64{1, 2, 4, 8, 16, 32, 64}, + }), + } +} + +type mergeExemplarQueryable struct { + idLabelName string + maxConcurrent int + byPassWithSingleQuerier bool + callback MergeExemplarQuerierCallback + tenantsPerExemplarQuery prometheus.Histogram +} + +// ExemplarQuerier returns a new mergeExemplarQuerier which aggregates results from +// multiple exemplar queriers into a single result. +func (m *mergeExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + ids, queriers, err := m.callback(ctx) + if err != nil { + return nil, err + } + + m.tenantsPerExemplarQuery.Observe(float64(len(ids))) + + if m.byPassWithSingleQuerier && len(queriers) == 1 { + return queriers[0], nil + } + + return &mergeExemplarQuerier{ + ctx: ctx, + idLabelName: m.idLabelName, + maxConcurrent: m.maxConcurrent, + tenantIds: ids, + queriers: queriers, + byPassWithSingleQuerier: m.byPassWithSingleQuerier, + }, nil +} + +// mergeExemplarQuerier aggregates the results from underlying exemplar queriers +// and adds a label `idLabelName` to identify the exemplar queryable that +// `seriesLabels` resulted from. +// If the label `idLabelName` is already existing, its value is overwritten and +// the previous value is exposed through a new label prefixed with "original_". +// This behaviour is not implemented recursively. +type mergeExemplarQuerier struct { + ctx context.Context + idLabelName string + maxConcurrent int + tenantIds []string + queriers []storage.ExemplarQuerier + byPassWithSingleQuerier bool +} + +type exemplarSelectJob struct { + pos int + querier storage.ExemplarQuerier + id string +} + +// Select returns aggregated exemplars within given time range for multiple tenants. +func (m mergeExemplarQuerier) Select(start, end int64, matchers ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { + log, ctx := spanlogger.New(m.ctx, "mergeExemplarQuerier.Select") + defer log.Span.Finish() + + // filter out tenants to query and unrelated matchers + allMatchedTenantIds, allUnrelatedMatchers := filterAllTenantsAndMatchers(m.idLabelName, m.tenantIds, matchers) + jobs := make([]interface{}, len(allMatchedTenantIds)) + results := make([][]exemplar.QueryResult, len(allMatchedTenantIds)) + + var jobPos int + for idx, tenantId := range m.tenantIds { + if _, ok := allMatchedTenantIds[tenantId]; !ok { + // skip tenantIds that should not be queried + continue + } + + jobs[jobPos] = &exemplarSelectJob{ + pos: jobPos, + querier: m.queriers[idx], + id: tenantId, + } + jobPos++ + } + + run := func(ctx context.Context, jobIntf interface{}) error { + job, ok := jobIntf.(*exemplarSelectJob) + if !ok { + return fmt.Errorf("unexpected type %T", jobIntf) + } + + res, err := job.querier.Select(start, end, allUnrelatedMatchers...) 
+ if err != nil { + return errors.Wrapf(err, "error exemplars querying %s %s", rewriteLabelName(m.idLabelName), job.id) + } + + // append __tenant__ label to `seriesLabels` to identify each tenants + for i, e := range res { + e.SeriesLabels = setLabelsRetainExisting(e.SeriesLabels, labels.Label{ + Name: m.idLabelName, + Value: job.id, + }) + res[i] = e + } + + results[job.pos] = res + return nil + } + + err := concurrency.ForEach(ctx, jobs, m.maxConcurrent, run) + if err != nil { + return nil, err + } + + var ret []exemplar.QueryResult + for _, exemplars := range results { + ret = append(ret, exemplars...) + } + + return ret, nil +} + +func filterAllTenantsAndMatchers(idLabelName string, tenantIds []string, allMatchers [][]*labels.Matcher) (map[string]struct{}, [][]*labels.Matcher) { + allMatchedTenantIds := make(map[string]struct{}) + allUnrelatedMatchers := make([][]*labels.Matcher, len(allMatchers)) + + for idx, matchers := range allMatchers { + matchedTenantIds, unrelatedMatchers := filterValuesByMatchers(idLabelName, tenantIds, matchers...) + for tenantId := range matchedTenantIds { + allMatchedTenantIds[tenantId] = struct{}{} + } + allUnrelatedMatchers[idx] = unrelatedMatchers + } + + return allMatchedTenantIds, allUnrelatedMatchers +} diff --git a/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go new file mode 100644 index 0000000000..309f2dea53 --- /dev/null +++ b/pkg/querier/tenantfederation/exemplar_merge_queryable_test.go @@ -0,0 +1,374 @@ +package tenantfederation + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/prometheus/model/exemplar" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/require" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/tenant" +) + +var ( + expectedSingleTenantsExemplarMetrics = ` +# HELP cortex_querier_federated_tenants_per_exemplar_query Number of tenants per exemplar query. +# TYPE cortex_querier_federated_tenants_per_exemplar_query histogram +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="1"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="2"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_exemplar_query_sum 1 +cortex_querier_federated_tenants_per_exemplar_query_count 1 +` + + expectedTwoTenantsExemplarMetrics = ` +# HELP cortex_querier_federated_tenants_per_exemplar_query Number of tenants per exemplar query. 
+# TYPE cortex_querier_federated_tenants_per_exemplar_query histogram +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="1"} 0 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="2"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="4"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="8"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="16"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="32"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="64"} 1 +cortex_querier_federated_tenants_per_exemplar_query_bucket{le="+Inf"} 1 +cortex_querier_federated_tenants_per_exemplar_query_sum 2 +cortex_querier_federated_tenants_per_exemplar_query_count 1 +` +) + +type mockExemplarQueryable struct { + exemplarQueriers map[string]storage.ExemplarQuerier +} + +func (m *mockExemplarQueryable) ExemplarQuerier(ctx context.Context) (storage.ExemplarQuerier, error) { + // Due to lint check for `ensure the query path is supporting multiple tenants` + ids, err := tenant.TenantIDs(ctx) + if err != nil { + return nil, err + } + + id := ids[0] + if _, ok := m.exemplarQueriers[id]; ok { + return m.exemplarQueriers[id], nil + } else { + return nil, errors.New("failed to get exemplar querier") + } +} + +type mockExemplarQuerier struct { + res []exemplar.QueryResult + err error +} + +func (m *mockExemplarQuerier) Select(_, _ int64, _ ...[]*labels.Matcher) ([]exemplar.QueryResult, error) { + if m.err != nil { + return nil, m.err + } + + return m.res, nil +} + +// getFixtureExemplarResult1 returns fixture examplar1 +func getFixtureExemplarResult1() []exemplar.QueryResult { + res := []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + } + return res +} + +// getFixtureExemplarResult2 returns fixture examplar +func getFixtureExemplarResult2() []exemplar.QueryResult { + res := []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + } + return res +} + +func Test_MergeExemplarQuerier_Select(t *testing.T) { + // set a multi tenant resolver + tenant.WithDefaultResolver(tenant.NewMultiResolver()) + + tests := []struct { + name string + upstream mockExemplarQueryable + matcher [][]*labels.Matcher + orgId string + expectedResult []exemplar.QueryResult + expectedErr error + expectedMetrics string + }{ + { + name: "should be treated as single tenant", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-1", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: expectedSingleTenantsExemplarMetrics, + }, + { + name: "two tenants results should be aggregated", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": 
&mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-1|user-2", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, + { + name: "should return the matched tenant query results", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__tenant_id__", "user-1"), + }}, + orgId: "user-1|user-2", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, + { + name: "when the '__tenant_id__' label exist, should be converted to the 'original___tenant_id__'", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", defaultTenantLabel, "tenant"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + }}, + "user-2": &mockExemplarQuerier{res: getFixtureExemplarResult2()}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-1|user-2", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1", "original___tenant_id__", "tenant"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + }, + expectedMetrics: expectedTwoTenantsExemplarMetrics, + }, + { + name: "get error from one querier, should get error", + upstream: mockExemplarQueryable{exemplarQueriers: map[string]storage.ExemplarQuerier{ + "user-1": &mockExemplarQuerier{res: getFixtureExemplarResult1()}, + "user-2": &mockExemplarQuerier{err: errors.New("some error")}, + }}, + matcher: [][]*labels.Matcher{{ + labels.MustNewMatcher(labels.MatchEqual, "__name__", "exemplar_series"), + }}, + orgId: "user-1|user-2", + expectedResult: []exemplar.QueryResult{ + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-1"), + Exemplars: []exemplar.Exemplar{ + { + Labels: 
labels.FromStrings("traceID", "123"), + Value: 123, + Ts: 1734942337900, + }, + }, + }, + { + SeriesLabels: labels.FromStrings("__name__", "exemplar_series", "__tenant_id__", "user-2"), + Exemplars: []exemplar.Exemplar{ + { + Labels: labels.FromStrings("traceID", "456"), + Value: 456, + Ts: 1734942338000, + }, + }, + }, + }, + expectedErr: errors.New("some error"), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + reg := prometheus.NewPedanticRegistry() + exemplarQueryable := NewExemplarQueryable(&test.upstream, defaultMaxConcurrency, true, reg) + ctx := user.InjectOrgID(context.Background(), test.orgId) + q, err := exemplarQueryable.ExemplarQuerier(ctx) + require.NoError(t, err) + + result, err := q.Select(mint, maxt, test.matcher...) + if test.expectedErr != nil { + require.Error(t, err) + } else { + require.NoError(t, err) + require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(test.expectedMetrics), "cortex_querier_federated_tenants_per_exemplar_query")) + require.Equal(t, test.expectedResult, result) + } + }) + } +} + +func Test_filterAllTenantsAndMatchers(t *testing.T) { + idLabelName := defaultTenantLabel + + tests := []struct { + name string + tenantIds []string + allMatchers [][]*labels.Matcher + expectedLenAllMatchedTenantIds int + expectedUnrelatedMatchersCnt int + }{ + { + name: "Should match all tenants", + tenantIds: []string{"user-1", "user-2"}, + allMatchers: [][]*labels.Matcher{ + { + labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"), + }, + }, + expectedLenAllMatchedTenantIds: 2, + expectedUnrelatedMatchersCnt: 1, + }, + { + name: "Should match target tenant with the `idLabelName` matcher", + tenantIds: []string{"user-1", "user-2"}, + allMatchers: [][]*labels.Matcher{ + { + labels.MustNewMatcher(labels.MatchEqual, defaultTenantLabel, "user-1"), + }, + }, + expectedLenAllMatchedTenantIds: 1, + expectedUnrelatedMatchersCnt: 0, + }, + { + name: "Should match all tenants with the retained label name matcher", + tenantIds: []string{"user-1", "user-2"}, + allMatchers: [][]*labels.Matcher{ + { + labels.MustNewMatcher(labels.MatchEqual, retainExistingPrefix+defaultTenantLabel, "user-1"), + }, + }, + expectedLenAllMatchedTenantIds: 2, + expectedUnrelatedMatchersCnt: 1, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + allMatchedTenantIds, allUnrelatedMatchers := filterAllTenantsAndMatchers(idLabelName, test.tenantIds, test.allMatchers) + matcherCnt := 0 + for _, unrelatedMatchers := range allUnrelatedMatchers { + for _, matcher := range unrelatedMatchers { + if matcher.Name != "" { + matcherCnt++ + } + } + } + require.Equal(t, test.expectedLenAllMatchedTenantIds, len(allMatchedTenantIds)) + require.Equal(t, test.expectedUnrelatedMatchersCnt, matcherCnt) + }) + } +} diff --git a/pkg/ring/kv/etcd/mock_test.go b/pkg/ring/kv/etcd/mock_test.go index 58bf917528..4089aa9732 100644 --- a/pkg/ring/kv/etcd/mock_test.go +++ b/pkg/ring/kv/etcd/mock_test.go @@ -306,16 +306,20 @@ func TestMockKV_Watch(t *testing.T) { // returned wait group. 
setupWatchTest := func(key string, prefix bool) (*mockKV, context.CancelFunc, chan *clientv3.Event, *sync.WaitGroup) { kv := newMockKV() - // Use a condition to make sure the goroutine has started using the watch before + // Use a WaitGroup to make sure the goroutine has started using the watch before // we do anything to the mockKV that would emit an event the watcher is expecting - cond := sync.NewCond(&sync.Mutex{}) - wg := sync.WaitGroup{} + started := sync.WaitGroup{} + // Use another WaitGroup so that callers can tell when the channel returned by watch + // method is closed and the watch is complete. + complete := sync.WaitGroup{} + ch := make(chan *clientv3.Event) ctx, cancel := context.WithCancel(context.Background()) - wg.Add(1) + started.Add(1) + complete.Add(1) go func() { - defer wg.Done() + defer complete.Done() var ops []clientv3.OpOption if prefix { @@ -323,7 +327,7 @@ func TestMockKV_Watch(t *testing.T) { } watch := kv.Watch(ctx, key, ops...) - cond.Broadcast() + started.Done() for e := range watch { if len(e.Events) > 0 { @@ -332,33 +336,29 @@ func TestMockKV_Watch(t *testing.T) { } }() - // Wait for the watcher goroutine to start actually watching - cond.L.Lock() - cond.Wait() - cond.L.Unlock() - - return kv, cancel, ch, &wg + started.Wait() + return kv, cancel, ch, &complete } - t.Run("watch stopped by context", func(t *testing.T) { + t.Run("watch stopped by context", func(*testing.T) { // Ensure we can use the cancel method of the context given to the watch // to stop the watch - _, cancel, _, wg := setupWatchTest("/bar", false) + _, cancel, _, complete := setupWatchTest("/bar", false) cancel() - wg.Wait() + complete.Wait() }) - t.Run("watch stopped by close", func(t *testing.T) { + t.Run("watch stopped by close", func(*testing.T) { // Ensure we can use the Close method of the mockKV given to the watch // to stop the watch - kv, _, _, wg := setupWatchTest("/bar", false) + kv, _, _, complete := setupWatchTest("/bar", false) _ = kv.Close() - wg.Wait() + complete.Wait() }) t.Run("watch exact key", func(t *testing.T) { // watch for events with key "/bar" and send them via the channel - kv, cancel, ch, wg := setupWatchTest("/bar", false) + kv, cancel, ch, complete := setupWatchTest("/bar", false) _, err := kv.Put(context.Background(), "/foo", "1") require.NoError(t, err) @@ -371,12 +371,12 @@ func TestMockKV_Watch(t *testing.T) { assert.Equal(t, []byte("/bar"), event.Kv.Key) cancel() - wg.Wait() + complete.Wait() }) t.Run("watch prefix match", func(t *testing.T) { // watch for events with the prefix "/b" and send them via the channel - kv, cancel, ch, wg := setupWatchTest("/b", true) + kv, cancel, ch, complete := setupWatchTest("/b", true) _, err := kv.Delete(context.Background(), "/foo") require.NoError(t, err) @@ -389,6 +389,6 @@ func TestMockKV_Watch(t *testing.T) { assert.Equal(t, []byte("/bar"), event.Kv.Key) cancel() - wg.Wait() + complete.Wait() }) } diff --git a/pkg/storage/tsdb/redis_client_config.go b/pkg/storage/tsdb/redis_client_config.go index a0b6529db8..deb871f0b9 100644 --- a/pkg/storage/tsdb/redis_client_config.go +++ b/pkg/storage/tsdb/redis_client_config.go @@ -2,7 +2,6 @@ package tsdb import ( "flag" - "time" "github.com/pkg/errors" @@ -60,7 +59,7 @@ func (cfg *RedisClientConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix st f.StringVar(&cfg.MasterName, prefix+"master-name", "", "Specifies the master's name. Must be not empty for Redis Sentinel.") f.IntVar(&cfg.CacheSize, prefix+"cache-size", 0, "If not zero then client-side caching is enabled. 
Client-side caching is when data is stored in memory instead of fetching data each time. See https://redis.io/docs/manual/client-side-caching/ for more info.") f.BoolVar(&cfg.TLSEnabled, prefix+"tls-enabled", false, "Whether to enable tls for redis connection.") - cfg.TLS.RegisterFlagsWithPrefix(prefix, f) + cfg.TLS.RegisterFlagsWithPrefix(prefix[:len(prefix)-1], f) cfg.SetAsyncCircuitBreaker.RegisterFlagsWithPrefix(f, prefix+"set-async.") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go index 1afaabb786..9beac22a51 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/binary_reader.go @@ -66,7 +66,7 @@ func newCRC32() hash.Hash32 { return crc32.New(castagnoliTable) } -// LazyBinaryReaderMetrics holds metrics tracked by LazyBinaryReader. +// BinaryReaderMetrics holds metrics tracked by BinaryReader. type BinaryReaderMetrics struct { downloadDuration prometheus.Histogram loadDuration prometheus.Histogram @@ -76,14 +76,18 @@ type BinaryReaderMetrics struct { func NewBinaryReaderMetrics(reg prometheus.Registerer) *BinaryReaderMetrics { return &BinaryReaderMetrics{ downloadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "indexheader_download_duration_seconds", - Help: "Duration of the index-header download from objstore in seconds.", - Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300}, + Name: "indexheader_download_duration_seconds", + Help: "Duration of the index-header download from objstore in seconds.", + Buckets: []float64{0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300}, + NativeHistogramMaxBucketNumber: 256, + NativeHistogramBucketFactor: 1.1, }), loadDuration: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "indexheader_load_duration_seconds", - Help: "Duration of the index-header loading in seconds.", - Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300}, + Name: "indexheader_load_duration_seconds", + Help: "Duration of the index-header loading in seconds.", + Buckets: []float64{0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 15, 30, 60, 90, 120, 300}, + NativeHistogramMaxBucketNumber: 256, + NativeHistogramBucketFactor: 1.1, }), } } @@ -97,7 +101,12 @@ type BinaryTOC struct { } // WriteBinary build index header from the pieces of index in object storage, and cached in file if necessary. 
-func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string) ([]byte, error) { +func WriteBinary(ctx context.Context, bkt objstore.BucketReader, id ulid.ULID, filename string, downloadDuration prometheus.Histogram) ([]byte, error) { + start := time.Now() + + defer func() { + downloadDuration.Observe(time.Since(start).Seconds()) + }() var tmpDir = "" if filename != "" { tmpDir = filepath.Dir(filename) @@ -569,15 +578,14 @@ func NewBinaryReader(ctx context.Context, logger log.Logger, bkt objstore.Bucket level.Debug(logger).Log("msg", "failed to read index-header from disk; recreating", "path", binfn, "err", err) start := time.Now() - if _, err := WriteBinary(ctx, bkt, id, binfn); err != nil { + if _, err := WriteBinary(ctx, bkt, id, binfn, metrics.downloadDuration); err != nil { return nil, errors.Wrap(err, "write index header") } - metrics.loadDuration.Observe(time.Since(start).Seconds()) level.Debug(logger).Log("msg", "built index-header file", "path", binfn, "elapsed", time.Since(start)) return newFileBinaryReader(binfn, postingOffsetsInMemSampling, metrics) } else { - buf, err := WriteBinary(ctx, bkt, id, "") + buf, err := WriteBinary(ctx, bkt, id, "", metrics.downloadDuration) if err != nil { return nil, errors.Wrap(err, "generate index header") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go index 2b36bf8025..c7a0a7a271 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go +++ b/vendor/github.com/thanos-io/thanos/pkg/block/indexheader/lazy_binary_reader.go @@ -115,12 +115,11 @@ func NewLazyBinaryReader( level.Debug(logger).Log("msg", "the index-header doesn't exist on disk; recreating", "path", indexHeaderFile) start := time.Now() - if _, err := WriteBinary(ctx, bkt, id, indexHeaderFile); err != nil { + if _, err := WriteBinary(ctx, bkt, id, indexHeaderFile, binaryReaderMetrics.downloadDuration); err != nil { return nil, errors.Wrap(err, "write index header") } level.Debug(logger).Log("msg", "built index-header file", "path", indexHeaderFile, "elapsed", time.Since(start)) - binaryReaderMetrics.downloadDuration.Observe(time.Since(start).Seconds()) } } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go index e700ffee7b..5737fac949 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_client.go @@ -102,6 +102,10 @@ type updatableServerSelector interface { // resolve. No attempt is made to connect to the server. If any // error occurs, no changes are made to the internal server list. SetServers(servers ...string) error + + // PickServerForKeys is like PickServer but returns a map of server address + // and corresponding keys. + PickServerForKeys(keys []string) (map[string][]string, error) } // MemcachedClientConfig is the config accepted by RemoteCacheClient. @@ -571,20 +575,13 @@ func (c *memcachedClient) getMultiSingle(ctx context.Context, keys []string) (it // *except* that keys sharded to the same server will be together. The order of keys // returned may change from call to call. 
func (c *memcachedClient) sortKeysByServer(keys []string) []string { - bucketed := make(map[string][]string) - - for _, key := range keys { - addr, err := c.selector.PickServer(key) - // If we couldn't determine the correct server, return keys in existing order - if err != nil { - return keys - } - - addrString := addr.String() - bucketed[addrString] = append(bucketed[addrString], key) + bucketed, err := c.selector.PickServerForKeys(keys) + // No need to pick server and sort keys if no more than 1 server. + if err != nil || len(bucketed) <= 1 { + return keys } - var out []string + out := make([]string, 0, len(keys)) for srv := range bucketed { out = append(out, bucketed[srv]...) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_server_selector.go b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_server_selector.go index 5426d6af33..bc1f36706f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_server_selector.go +++ b/vendor/github.com/thanos-io/thanos/pkg/cacheutil/memcached_server_selector.go @@ -5,6 +5,7 @@ package cacheutil import ( "net" + "strings" "sync" "github.com/bradfitz/gomemcache/memcache" @@ -12,15 +13,6 @@ import ( "github.com/facette/natsort" ) -var ( - addrsPool = sync.Pool{ - New: func() interface{} { - addrs := make([]net.Addr, 0, 64) - return &addrs - }, - } -) - // MemcachedJumpHashSelector implements the memcache.ServerSelector // interface, utilizing a jump hash to distribute keys to servers. // @@ -30,9 +22,8 @@ var ( // with consistent DNS names where the naturally sorted order // is predictable (ie. Kubernetes statefulsets). type MemcachedJumpHashSelector struct { - // To avoid copy and pasting all memcache server list logic, - // we embed it and implement our features on top of it. - servers memcache.ServerList + mu sync.RWMutex + addrs []net.Addr } // SetServers changes a MemcachedJumpHashSelector's set of servers at @@ -53,52 +44,111 @@ func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { copy(sortedServers, servers) natsort.Sort(sortedServers) - return s.servers.SetServers(sortedServers...) + naddr := make([]net.Addr, len(servers)) + var err error + for i, server := range sortedServers { + naddr[i], err = parseStaticAddr(server) + if err != nil { + return err + } + } + + s.mu.Lock() + defer s.mu.Unlock() + s.addrs = naddr + return nil } // PickServer returns the server address that a given item // should be shared onto. func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { - // Unfortunately we can't read the list of server addresses from - // the original implementation, so we use Each() to fetch all of them. - addrs := *(addrsPool.Get().(*[]net.Addr)) - err := s.servers.Each(func(addr net.Addr) error { - addrs = append(addrs, addr) - return nil - }) - if err != nil { - return nil, err + s.mu.RLock() + defer s.mu.RUnlock() + if len(s.addrs) == 0 { + return nil, memcache.ErrNoServers + } else if len(s.addrs) == 1 { + return s.addrs[0], nil } + return pickServerWithJumpHash(s.addrs, key), nil +} + +// Each iterates over each server and calls the given function. +// If f returns a non-nil error, iteration will stop and that +// error will be returned. 
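`sortKeysByServer` above now delegates to the selector's new `PickServerForKeys`, which buckets all keys by their target server in one pass and skips the work entirely when there is at most one server. A sketch of the underlying idea, grouping keys by jump-hashing them onto a server list; the `jumpHash` helper below is the standard Lamping–Veach implementation and stands in for the package's unexported helper, which the hunk does not show:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// jumpHash is the standard Lamping–Veach jump consistent hash; it stands in
// for the unexported helper used by pickServerWithJumpHash.
func jumpHash(key uint64, numBuckets int) int32 {
	var b, j int64 = -1, 0
	for j < int64(numBuckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

// groupKeysByServer buckets keys by the server they hash to, so a multi-get
// can be issued once per server instead of picking a server per key.
func groupKeysByServer(servers, keys []string) map[string][]string {
	out := make(map[string][]string, len(servers))
	for _, k := range keys {
		srv := servers[jumpHash(xxhash.Sum64String(k), len(servers))]
		out[srv] = append(out[srv], k)
	}
	return out
}

func main() {
	servers := []string{"memcached-0:11211", "memcached-1:11211", "memcached-2:11211"}
	fmt.Println(groupKeysByServer(servers, []string{"a", "b", "c", "d", "e"}))
}
```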
+func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + for _, def := range s.addrs { + if err := f(def); err != nil { + return err + } + } + return nil +} + +// PickServerForKeys is like PickServer but returns a map of server address +// and corresponding keys. +func (s *MemcachedJumpHashSelector) PickServerForKeys(keys []string) (map[string][]string, error) { + s.mu.RLock() + defer s.mu.RUnlock() // No need of a jump hash in case of 0 or 1 servers. - if len(addrs) == 0 { - addrs = (addrs)[:0] - addrsPool.Put(&addrs) + if len(s.addrs) <= 0 { return nil, memcache.ErrNoServers } - if len(addrs) == 1 { - picked := addrs[0] - addrs = (addrs)[:0] - addrsPool.Put(&addrs) + m := make(map[string][]string, len(keys)) + if len(s.addrs) == 1 { + m[s.addrs[0].String()] = keys + return m, nil + } - return picked, nil + for _, key := range keys { + // Pick a server using the jump hash. + picked := pickServerWithJumpHash(s.addrs, key).String() + m[picked] = append(m[picked], key) } + return m, nil +} + +// pickServerWithJumpHash returns the server address that a given item should be shared onto. +func pickServerWithJumpHash(addrs []net.Addr, key string) net.Addr { // Pick a server using the jump hash. cs := xxhash.Sum64String(key) idx := jumpHash(cs, len(addrs)) picked := (addrs)[idx] + return picked +} - addrs = (addrs)[:0] - addrsPool.Put(&addrs) +// Copied from https://github.com/bradfitz/gomemcache/blob/master/memcache/selector.go#L68. +func parseStaticAddr(server string) (net.Addr, error) { + if strings.Contains(server, "/") { + addr, err := net.ResolveUnixAddr("unix", server) + if err != nil { + return nil, err + } + return newStaticAddr(addr), nil + } + tcpaddr, err := net.ResolveTCPAddr("tcp", server) + if err != nil { + return nil, err + } + return newStaticAddr(tcpaddr), nil +} - return picked, nil +// Copied from https://github.com/bradfitz/gomemcache/blob/master/memcache/selector.go#L45 +// staticAddr caches the Network() and String() values from any net.Addr. +type staticAddr struct { + ntw, str string } -// Each iterates over each server and calls the given function. -// If f returns a non-nil error, iteration will stop and that -// error will be returned. 
-func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { - return s.servers.Each(f) +func newStaticAddr(a net.Addr) net.Addr { + return &staticAddr{ + ntw: a.Network(), + str: a.String(), + } } + +func (s *staticAddr) Network() string { return s.ntw } +func (s *staticAddr) String() string { return s.str } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go index 33fb4732dc..efc2a4b392 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/bucket.go @@ -35,6 +35,7 @@ import ( "github.com/prometheus/prometheus/tsdb/chunks" "github.com/prometheus/prometheus/tsdb/encoding" "github.com/prometheus/prometheus/tsdb/index" + "github.com/prometheus/prometheus/util/zeropool" "github.com/weaveworks/common/httpgrpc" "golang.org/x/sync/errgroup" "google.golang.org/grpc" @@ -125,8 +126,22 @@ const ( var ( errBlockSyncConcurrencyNotValid = errors.New("the block sync concurrency must be equal or greater than 1.") hashPool = sync.Pool{New: func() interface{} { return xxhash.New() }} + postingsPool zeropool.Pool[[]storage.SeriesRef] ) +func getPostingsSlice() []storage.SeriesRef { + if p := postingsPool.Get(); p != nil { + return p + } + + // Pre-allocate slice with initial capacity. + return make([]storage.SeriesRef, 0, 1024) +} + +func putPostingsSlice(p []storage.SeriesRef) { + postingsPool.Put(p[:0]) +} + type bucketStoreMetrics struct { blocksLoaded prometheus.Gauge blockLoads prometheus.Counter @@ -436,6 +451,8 @@ type BucketStore struct { indexHeaderLazyDownloadStrategy indexheader.LazyDownloadIndexHeaderFunc requestLoggerFunc RequestLoggerFunc + + blockLifecycleCallback BlockLifecycleCallback } func (s *BucketStore) validate() error { @@ -583,6 +600,24 @@ func WithIndexHeaderLazyDownloadStrategy(strategy indexheader.LazyDownloadIndexH } } +// BlockLifecycleCallback specifies callbacks that will be called during the lifecycle of a block. +type BlockLifecycleCallback interface { + // PreAdd is called before adding a block to indicate if the block needs to be added. + // A non nil error means the block should not be added. + PreAdd(meta metadata.Meta) error +} + +type noopBlockLifecycleCallback struct{} + +func (c noopBlockLifecycleCallback) PreAdd(meta metadata.Meta) error { return nil } + +// WithBlockLifecycleCallback allows customizing callbacks of block lifecycle. +func WithBlockLifecycleCallback(c BlockLifecycleCallback) BucketStoreOption { + return func(s *BucketStore) { + s.blockLifecycleCallback = c + } +} + // NewBucketStore creates a new bucket backed store that implements the store API against // an object store bucket. It is optimized to work against high latency backends. 
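The `bucket.go` hunk above introduces a `zeropool.Pool` so expanded postings slices are reused instead of reallocated per request: `getPostingsSlice` hands out a pooled (or freshly pre-allocated) slice and `putPostingsSlice` returns it with its length reset. A minimal sketch of the same pattern with an illustrative element type, assuming the `zeropool` API behaves as it is used in the hunk:

```go
package main

import (
	"fmt"

	"github.com/prometheus/prometheus/util/zeropool"
)

// idPool mirrors postingsPool above: a typed pool of slices whose backing
// arrays are reused across requests. The element type is illustrative.
var idPool zeropool.Pool[[]int64]

func getSlice() []int64 {
	if s := idPool.Get(); s != nil {
		return s
	}
	// Pool was empty: pre-allocate a slice with some initial capacity.
	return make([]int64, 0, 1024)
}

func putSlice(s []int64) {
	// Reset length but keep capacity so the next caller reuses the array.
	idPool.Put(s[:0])
}

func main() {
	s := getSlice()
	s = append(s, 1, 2, 3)
	fmt.Println(len(s), cap(s))
	putSlice(s)

	reused := getSlice()
	fmt.Println(len(reused), cap(reused)) // length 0, capacity preserved
}
```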
func NewBucketStore( @@ -628,6 +663,7 @@ func NewBucketStore( sortingStrategy: sortingStrategyStore, indexHeaderLazyDownloadStrategy: indexheader.AlwaysEagerDownloadIndexHeader, requestLoggerFunc: NoopRequestLoggerFunc, + blockLifecycleCallback: &noopBlockLifecycleCallback{}, } for _, option := range options { @@ -683,6 +719,9 @@ func (s *BucketStore) SyncBlocks(ctx context.Context) error { wg.Add(1) go func() { for meta := range blockc { + if preAddErr := s.blockLifecycleCallback.PreAdd(*meta); preAddErr != nil { + continue + } if err := s.addBlock(ctx, meta); err != nil { continue } @@ -2549,6 +2588,10 @@ type bucketIndexReader struct { indexVersion int logger log.Logger + + // Posting slice to return to the postings pool on close. + // A single bucketIndexReader should have at most 1 postings slice to return. + postings []storage.SeriesRef } func newBucketIndexReader(block *bucketBlock, logger log.Logger) *bucketIndexReader { @@ -2678,13 +2721,13 @@ func (r *bucketIndexReader) ExpandedPostings( // ExpandPostingsWithContext returns the postings expanded as a slice and considers context. func ExpandPostingsWithContext(ctx context.Context, p index.Postings) ([]storage.SeriesRef, error) { - res := make([]storage.SeriesRef, 0, 1024) // Pre-allocate slice with initial capacity + res := getPostingsSlice() i := 0 for p.Next() { i++ if i%checkContextEveryNIterations == 0 { if err := ctx.Err(); err != nil { - return nil, err + return res, err } } res = append(res, p.At()) @@ -2978,6 +3021,7 @@ func (r *bucketIndexReader) fetchExpandedPostingsFromCache(ctx context.Context, } ps, err := ExpandPostingsWithContext(ctx, p) + r.postings = ps if err != nil { level.Error(r.logger).Log("msg", "failed to expand cached expanded postings, refetch postings", "id", r.block.meta.ULID.String(), "err", err) return false, nil, nil @@ -3030,12 +3074,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab output := make([]index.Postings, len(keys)) + var size int64 // Fetch postings from the cache with a single call. fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys, tenant) for _, dataFromCache := range fromCache { - if err := bytesLimiter.ReserveWithType(uint64(len(dataFromCache)), PostingsTouched); err != nil { - return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) - } + size += int64(len(dataFromCache)) + } + if err := bytesLimiter.ReserveWithType(uint64(size), PostingsTouched); err != nil { + return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err) } // Iterate over all groups and fetch posting from cache. 
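This hunk and the ones that follow replace per-item `ReserveWithType` calls with a single reservation of the summed size: the sizes are accumulated first and the limiter is consulted once per batch. A self-contained sketch of that batching pattern with a stand-in limiter; the real `BytesLimiter` interface is not reproduced here:

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// limiter is a stand-in for the store's bytes limiter; only the batched
// reservation pattern is being illustrated here.
type limiter struct {
	limit    uint64
	reserved atomic.Uint64
}

func (l *limiter) Reserve(n uint64) error {
	if l.reserved.Add(n) > l.limit {
		return errors.New("exceeded bytes limit")
	}
	return nil
}

func main() {
	l := &limiter{limit: 1 << 20}
	parts := []uint64{4 << 10, 8 << 10, 256 << 10}

	// Sum the sizes first and reserve once, instead of one limiter call
	// (and one possible error return) per part.
	var size uint64
	for _, p := range parts {
		size += p
	}
	if err := l.Reserve(size); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("reserved", size, "bytes")
}
```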
@@ -3086,13 +3132,14 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab return uint64(ptrs[i].ptr.Start), uint64(ptrs[i].ptr.End) }) + size = 0 for _, part := range parts { start := int64(part.Start) length := int64(part.End) - start - - if err := bytesLimiter.ReserveWithType(uint64(length), PostingsFetched); err != nil { - return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err) - } + size += length + } + if err := bytesLimiter.ReserveWithType(uint64(size), PostingsFetched); err != nil { + return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err) } g, ctx := errgroup.WithContext(ctx) @@ -3263,11 +3310,13 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser // Load series from cache, overwriting the list of ids to preload // with the missing ones. fromCache, ids := r.block.indexCache.FetchMultiSeries(ctx, r.block.meta.ULID, ids, tenant) + var size uint64 for id, b := range fromCache { r.loadedSeries[id] = b - if err := bytesLimiter.ReserveWithType(uint64(len(b)), SeriesTouched); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err) - } + size += uint64(len(b)) + } + if err := bytesLimiter.ReserveWithType(size, SeriesTouched); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err) } parts := r.block.partitioner.Partition(len(ids), func(i int) (start, end uint64) { @@ -3414,6 +3463,10 @@ func (r *bucketIndexReader) LoadSeriesForTime(ref storage.SeriesRef, lset *[]sym // Close released the underlying resources of the reader. func (r *bucketIndexReader) Close() error { r.block.pendingReaders.Done() + + if r.postings != nil { + putPostingsSlice(r.postings) + } return nil } @@ -3598,10 +3651,12 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [ return uint64(pIdxs[i].offset), uint64(pIdxs[i].offset) + uint64(r.block.estimatedMaxChunkSize) }) + var size uint64 for _, p := range parts { - if err := bytesLimiter.ReserveWithType(uint64(p.End-p.Start), ChunksFetched); err != nil { - return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) - } + size += p.End - p.Start + } + if err := bytesLimiter.ReserveWithType(size, ChunksFetched); err != nil { + return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err) } for _, p := range parts { diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/cache/matchers_cache.go b/vendor/github.com/thanos-io/thanos/pkg/store/cache/matchers_cache.go new file mode 100644 index 0000000000..02aa06ca65 --- /dev/null +++ b/vendor/github.com/thanos-io/thanos/pkg/store/cache/matchers_cache.go @@ -0,0 +1,161 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package storecache + +import ( + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "golang.org/x/sync/singleflight" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +const DefaultCacheSize = 200 + +type NewItemFunc func() (*labels.Matcher, error) + +type MatchersCache interface { + // GetOrSet retrieves a matcher from cache or creates and stores it if not present. + // If the matcher is not in cache, it uses the provided newItem function to create it. + GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) +} + +// Ensure implementations satisfy the interface. +var ( + _ MatchersCache = (*LruMatchersCache)(nil) + _ MatchersCache = (*NoopMatcherCache)(nil) +) + +// NoopMatcherCache is a no-op implementation of MatchersCache that doesn't cache anything. +type NoopMatcherCache struct{} + +// NewNoopMatcherCache creates a new no-op matcher cache. +func NewNoopMatcherCache() MatchersCache { + return &NoopMatcherCache{} +} + +// GetOrSet implements MatchersCache by always creating a new matcher without caching. +func (n *NoopMatcherCache) GetOrSet(_ string, newItem NewItemFunc) (*labels.Matcher, error) { + return newItem() +} + +// LruMatchersCache implements MatchersCache with an LRU cache and metrics. +type LruMatchersCache struct { + reg prometheus.Registerer + cache *lru.Cache[string, *labels.Matcher] + metrics *matcherCacheMetrics + size int + sf singleflight.Group +} + +type MatcherCacheOption func(*LruMatchersCache) + +func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.reg = reg + } +} + +func WithSize(size int) MatcherCacheOption { + return func(c *LruMatchersCache) { + c.size = size + } +} + +func NewMatchersCache(opts ...MatcherCacheOption) (*LruMatchersCache, error) { + cache := &LruMatchersCache{ + size: DefaultCacheSize, + } + + for _, opt := range opts { + opt(cache) + } + cache.metrics = newMatcherCacheMetrics(cache.reg) + + lruCache, err := lru.NewWithEvict[string, *labels.Matcher](cache.size, cache.onEvict) + if err != nil { + return nil, err + } + cache.cache = lruCache + + return cache, nil +} + +func (c *LruMatchersCache) GetOrSet(key string, newItem NewItemFunc) (*labels.Matcher, error) { + c.metrics.requestsTotal.Inc() + v, err, _ := c.sf.Do(key, func() (interface{}, error) { + if item, ok := c.cache.Get(key); ok { + c.metrics.hitsTotal.Inc() + return item, nil + } + + item, err := newItem() + if err != nil { + return nil, err + } + c.cache.Add(key, item) + c.metrics.numItems.Set(float64(c.cache.Len())) + return item, nil + }) + + if err != nil { + return nil, err + } + return v.(*labels.Matcher), nil +} + +func (c *LruMatchersCache) onEvict(_ string, _ *labels.Matcher) { + c.metrics.evicted.Inc() + c.metrics.numItems.Set(float64(c.cache.Len())) +} + +type matcherCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge + maxItems prometheus.Gauge + evicted prometheus.Counter +} + +func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { + return &matcherCacheMetrics{ + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_requests_total", + Help: "Total number of cache requests for series matchers", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: 
"thanos_matchers_cache_hits_total", + Help: "Total number of cache hits for series matchers", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_items", + Help: "Total number of cached items", + }), + maxItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_max_items", + Help: "Maximum number of items that can be cached", + }), + evicted: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_evicted_total", + Help: "Total number of items evicted from the cache", + }), + } +} + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It allocates memory. +func MatchersToPromMatchersCached(cache MatchersCache, ms ...storepb.LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for i := range ms { + pm, err := cache.GetOrSet(ms[i].String(), func() (*labels.Matcher, error) { return storepb.MatcherToPromMatcher(ms[i]) }) + if err != nil { + return nil, err + } + res = append(res, pm) + } + return res, nil +} diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go index 783e04b31b..57b48cc342 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/lazy_postings.go @@ -310,6 +310,7 @@ func fetchAndExpandPostingGroups(ctx context.Context, r *bucketIndexReader, post return nil, nil, err } ps, err := ExpandPostingsWithContext(ctx, result) + r.postings = ps if err != nil { return nil, nil, errors.Wrap(err, "expand") } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/local.go b/vendor/github.com/thanos-io/thanos/pkg/store/local.go index cb80f8f8cb..5d72ee28af 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/local.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/local.go @@ -21,6 +21,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -130,7 +131,7 @@ func ScanGRPCCurlProtoStreamMessages(data []byte, atEOF bool) (advance int, toke // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. 
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go index 11d7f1ff77..a503d15689 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/prometheus.go @@ -36,6 +36,7 @@ import ( "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/store/storepb/prompb" @@ -125,7 +126,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -488,8 +489,13 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) { - tms, err := storepb.MatchersToPromMatchers(ms...) +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache storecache.MatchersCache) (bool, []*labels.Matcher, error) { + var ( + tms []*labels.Matcher + err error + ) + + tms, err = storecache.MatchersToPromMatchersCached(cache, ms...) 
if err != nil { return false, nil, err } @@ -537,7 +543,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -600,7 +606,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, storecache.NewNoopMatcherCache()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go b/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go index 498c80e2e7..af1ba9dae1 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/proxy.go @@ -25,6 +25,7 @@ import ( "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/info/infopb" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" @@ -89,6 +90,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector + matcherCache storecache.MatchersCache enableDedup bool } @@ -113,7 +115,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. +// ProxyStoreOption are functions that configure the ProxyStore. type ProxyStoreOption func(s *ProxyStore) // WithProxyStoreDebugLogging toggles debug logging. @@ -137,6 +139,13 @@ func WithoutDedup() ProxyStoreOption { } } +// WithMatcherCache sets the matcher cache instance for the proxy. +func WithMatcherCache(cache storecache.MatchersCache) ProxyStoreOption { + return func(s *ProxyStore) { + s.matcherCache = cache + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. // Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( @@ -168,6 +177,7 @@ func NewProxyStore( retrievalStrategy: retrievalStrategy, tsdbSelector: DefaultSelector, enableDedup: true, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -248,7 +258,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. 
reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -353,7 +363,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, originalRequest *storepb.La if s.debugLogging { reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -456,7 +466,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, originalRequest *storepb.L return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty") } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go index d5461a5947..b165d76fcc 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go @@ -385,30 +385,35 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { // NOTE: It allocates memory. func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) - for _, m := range ms { - var t labels.MatchType - - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) - } - m, err := labels.NewMatcher(t, m.Name, m.Value) + for i := range ms { + pm, err := MatcherToPromMatcher(ms[i]) if err != nil { return nil, err } - res = append(res, m) + res = append(res, pm) } return res, nil } +// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + return labels.NewMatcher(t, m.Name, m.Value) +} + // MatchersToString converts label matchers to string format. // String should be parsable as a valid PromQL query metric selector. 
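With the cache threaded through `matchesExternalLabels`, the stores accept it via the new functional options. A sketch of how a caller might wire one shared cache into both stores, using only identifiers introduced in this diff; the import paths refer to the vendored Thanos snapshot, and the surrounding `NewProxyStore`/`NewTSDBStore` construction is omitted:

```go
package main

import (
	"log"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/thanos/pkg/store"
	storecache "github.com/thanos-io/thanos/pkg/store/cache"
)

func main() {
	reg := prometheus.NewRegistry()

	// A bounded matchers cache; WithSize and WithPromRegistry come from the
	// new matchers_cache.go.
	cache, err := storecache.NewMatchersCache(
		storecache.WithSize(1000),
		storecache.WithPromRegistry(reg),
	)
	if err != nil {
		log.Fatal(err)
	}

	// One cache instance can back both stores; these options are passed to
	// NewProxyStore / NewTSDBStore alongside their existing arguments.
	_ = store.WithMatcherCache(cache)         // ProxyStoreOption
	_ = store.WithMatcherCacheInstance(cache) // TSDBStoreOption (added in tsdb.go below)
}
```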
func MatchersToString(ms ...LabelMatcher) string { @@ -439,6 +444,14 @@ func (m *LabelMatcher) PromString() string { return fmt.Sprintf("%s%s%q", m.Name, m.Type.PromString(), m.Value) } +func (m *LabelMatcher) GetName() string { + return m.Name +} + +func (m *LabelMatcher) GetValue() string { + return m.Value +} + func (x LabelMatcher_Type) PromString() string { typeToStr := map[LabelMatcher_Type]string{ LabelMatcher_EQ: "=", diff --git a/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go b/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go index 737fee3bbd..a62481a53f 100644 --- a/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go +++ b/vendor/github.com/thanos-io/thanos/pkg/store/tsdb.go @@ -27,6 +27,7 @@ import ( "github.com/thanos-io/thanos/pkg/filter" "github.com/thanos-io/thanos/pkg/info/infopb" "github.com/thanos-io/thanos/pkg/runutil" + storecache "github.com/thanos-io/thanos/pkg/store/cache" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" ) @@ -53,6 +54,12 @@ func WithCuckooMetricNameStoreFilter() TSDBStoreOption { } } +func WithMatcherCacheInstance(cache storecache.MatchersCache) TSDBStoreOption { + return func(s *TSDBStore) { + s.matcherCache = cache + } +} + // TSDBStore implements the store API against a local TSDB instance. // It attaches the provided external labels to all results. It only responds with raw data // and does not support downsampling. @@ -62,6 +69,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int + matcherCache storecache.MatchersCache extLset labels.Labels startStoreFilterUpdate bool @@ -112,6 +120,7 @@ func NewTSDBStore( b := make([]byte, 0, initialBufSize) return &b }}, + matcherCache: storecache.NewNoopMatcherCache(), } for _, option := range options { @@ -177,13 +186,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } -func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { - labels := p.LabelSet() +func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := s.LabelSet() if len(labels) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := p.TimeRange() + mint, maxt := s.TimeRange() return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ @@ -247,7 +256,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser srv = fs } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -370,7 +379,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -432,7 +441,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } } - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go 
b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 0000000000..4051830982 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,214 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func (p *panicError) Unwrap() error { + err, ok := p.value.(error) + if !ok { + return nil + } + + return err +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. 
+func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + g.mu.Lock() + defer g.mu.Unlock() + c.wg.Done() + if g.m[key] == c { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + delete(g.m, key) + g.mu.Unlock() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 3922b35f95..799da9e20c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -987,7 +987,7 @@ github.com/thanos-io/promql-engine/query github.com/thanos-io/promql-engine/ringbuffer github.com/thanos-io/promql-engine/storage github.com/thanos-io/promql-engine/storage/prometheus -# github.com/thanos-io/thanos v0.37.3-0.20241224143735-2d041dc774da +# github.com/thanos-io/thanos v0.37.3-0.20250106173420-0e95c464dd42 ## explicit; go 1.23.0 github.com/thanos-io/thanos/pkg/api/query/querypb github.com/thanos-io/thanos/pkg/block @@ -1343,6 +1343,7 @@ golang.org/x/oauth2/jwt ## explicit; go 1.18 golang.org/x/sync/errgroup golang.org/x/sync/semaphore +golang.org/x/sync/singleflight # golang.org/x/sys v0.28.0 ## explicit; go 1.18 golang.org/x/sys/cpu
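The newly vendored `golang.org/x/sync/singleflight` package (used by the matchers cache above) suppresses duplicate concurrent calls: every caller of `Do` with the same key waits for a single execution and shares its result. A small usage sketch:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"golang.org/x/sync/singleflight"
)

func main() {
	var (
		g     singleflight.Group
		calls atomic.Int32
		wg    sync.WaitGroup
	)

	// Ten goroutines ask for the same key; concurrent callers share a single
	// execution of the expensive function and receive the same result.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, _ := g.Do("config", func() (interface{}, error) {
				calls.Add(1)
				return "loaded-config", nil
			})
			if err != nil || v.(string) != "loaded-config" {
				panic("unexpected result")
			}
		}()
	}
	wg.Wait()

	// At most 10, typically 1 when the goroutines overlap.
	fmt.Println("underlying executions:", calls.Load())
}
```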