From de96862364385b3d64cde42c058086634952589e Mon Sep 17 00:00:00 2001
From: volokluev <3169433+volokluev@users.noreply.github.com>
Date: Thu, 9 Jan 2025 16:29:38 -0800
Subject: [PATCH] feat(EAP): Trace Item resolvers (#6732)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

### Context

To make room for new kinds of TraceItems (e.g. uptime monitors, errors, logs, etc.), add a layer of indirection to the RPC layer.

Before:

```
RPC Flask Endpoint -> RPCEndpoint.execute() -> Snuba query pipeline
```

After:

```
                                            /-> EAP resolver    -> Snuba query pipeline (or whatever else)
RPC Flask Endpoint -> RPCEndpoint.execute() -> Uptime resolver  -> Snuba query pipeline (or whatever else)
                                            \-> Errors resolver -> Snuba query pipeline (or whatever else)
```

For the rationale, please read [this doc](https://www.notion.so/sentry/Where-does-RPC-Entity-specific-code-live-1238b10e4b5d806bb328d37c5f3bf522).

### How it works

Every EAP endpoint has a `get_resolver` method keyed on the `TraceItemName` passed in the request meta. A resolver has the same inputs and outputs as the RPCEndpoint. For each endpoint in EAP, there is a resolver class:
https://github.com/getsentry/snuba/pull/6732/files#diff-c018ef30d919d555c5e69908ce74e5f9144d613de3588a48d849f8c757b58628R22-R53

For each TraceItem we have, we implement the appropriate resolvers. This PR moves the EndpointTraceItemTable and EndpointTimeSeries implementations for EAP spans into resolver implementations.

Resolvers are picked up automatically if they follow the directory structure:

```
snuba/web/rpc/v1/resolvers
|->R_eap_spans
|->R_uptime
|->R_logs
|->R_etc
```

Implementing resolver classes in these directories will ❇️ just work ❇️ without needing to change endpoint code (a hedged sketch of adding such a resolver follows below).

### Outstanding things

It's not clear *yet* what is really "common" and what is not.
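To make the auto-discovery concrete, here is a minimal, purely illustrative sketch of a resolver for a hypothetical new TraceItem (uptime checks). The `R_uptime` directory, the class name, and the enum value used below are assumptions for illustration; only `ResolverTimeSeries`, `TraceItemName`, and the registration behaviour come from this patch.

```
# Hypothetical file: snuba/web/rpc/v1/resolvers/R_uptime/resolver_time_series.py
from sentry_protos.snuba.v1.endpoint_time_series_pb2 import (
    TimeSeriesRequest,
    TimeSeriesResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemName

from snuba.web.rpc.v1.resolvers import ResolverTimeSeries


class ResolverTimeSeriesUptime(ResolverTimeSeries):
    """Illustrative only: handles EndpointTimeSeries requests for a
    hypothetical uptime TraceItem."""

    @classmethod
    def trace_item_name(cls) -> TraceItemName.ValueType:
        # assumed enum value; use whatever the TraceItemName proto actually defines
        return TraceItemName.TRACE_ITEM_NAME_UPTIME_CHECK

    def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse:
        # build and run whatever query pipeline fits uptime data,
        # then shape the result into a TimeSeriesResponse
        raise NotImplementedError
```

Because the file lives under an `R_`-prefixed directory, `resolvers/__init__.py` imports it automatically, the `RegisteredClass` metaclass registers it under `TimeSeries__<trace_item_name>`, and `EndpointTimeSeries.get_resolver()` can dispatch to it without any endpoint changes. How much of the query-building and result-conversion code such resolvers share from `common` versus keep per-TraceItem is part of the open question above.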
We will figure this out through the course of doing this work --------- Co-authored-by: getsantry[bot] <66042841+getsantry[bot]@users.noreply.github.com> --- snuba/web/rpc/__init__.py | 42 +++ snuba/web/rpc/v1/endpoint_time_series.py | 318 +----------------- snuba/web/rpc/v1/endpoint_trace_item_table.py | 258 +------------- .../rpc/v1/resolvers/R_eap_spans/__init__.py | 0 .../resolvers/R_eap_spans/common/__init__.py | 0 .../R_eap_spans}/common/aggregation.py | 0 .../R_eap_spans/resolver_time_series.py | 317 +++++++++++++++++ .../R_eap_spans/resolver_trace_item_table.py | 257 ++++++++++++++ snuba/web/rpc/v1/resolvers/__init__.py | 67 ++++ tests/web/rpc/test_aggregation.py | 2 +- .../test_endpoint_trace_item_table.py | 15 + 11 files changed, 733 insertions(+), 543 deletions(-) create mode 100644 snuba/web/rpc/v1/resolvers/R_eap_spans/__init__.py create mode 100644 snuba/web/rpc/v1/resolvers/R_eap_spans/common/__init__.py rename snuba/web/rpc/{ => v1/resolvers/R_eap_spans}/common/aggregation.py (100%) create mode 100644 snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py create mode 100644 snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py create mode 100644 snuba/web/rpc/v1/resolvers/__init__.py diff --git a/snuba/web/rpc/__init__.py b/snuba/web/rpc/__init__.py index 13311b53893..56a82d4e548 100644 --- a/snuba/web/rpc/__init__.py +++ b/snuba/web/rpc/__init__.py @@ -5,6 +5,7 @@ from google.protobuf.message import DecodeError from google.protobuf.message import Message as ProtobufMessage from sentry_protos.snuba.v1.error_pb2 import Error as ErrorProto +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemName from snuba import environment from snuba.utils.metrics.backends.abstract import MetricsBackend @@ -25,6 +26,42 @@ Tout = TypeVar("Tout", bound=ProtobufMessage) +class TraceItemDataResolver(Generic[Tin, Tout], metaclass=RegisteredClass): + def __init__( + self, timer: Timer | None = None, metrics_backend: MetricsBackend | None = None + ) -> None: + self._timer = timer or Timer("endpoint_timing") + self._metrics_backend = metrics_backend or environment.metrics + + @classmethod + def config_key(cls) -> str: + return f"{cls.endpoint_name()}__{cls.trace_item_name()}" + + @classmethod + def endpoint_name(cls) -> str: + if cls.__name__ == "TraceItemDataResolver": + return cls.__name__ + raise NotImplementedError + + @classmethod + def trace_item_name(cls) -> TraceItemName.ValueType: + return TraceItemName.TRACE_ITEM_NAME_UNSPECIFIED + + @classmethod + def get_from_trace_item_name( + cls, trace_item_name: TraceItemName.ValueType + ) -> "Type[TraceItemDataResolver[Tin, Tout]]": + return cast( + Type["TraceItemDataResolver[Tin, Tout]"], + getattr(cls, "_registry").get_class_from_name( + f"{cls.endpoint_name()}__{trace_item_name}" + ), + ) + + def resolve(self, in_msg: Tin) -> Tout: + raise NotImplementedError + + class RPCEndpoint(Generic[Tin, Tout], metaclass=RegisteredClass): def __init__(self, metrics_backend: MetricsBackend | None = None) -> None: self._timer = Timer("endpoint_timing") @@ -46,6 +83,11 @@ def version(cls) -> str: def config_key(cls) -> str: return f"{cls.__name__}__{cls.version()}" + def get_resolver( + self, trace_item_name: TraceItemName.ValueType + ) -> TraceItemDataResolver[Tin, Tout]: + raise NotImplementedError + @property def metrics(self) -> MetricsWrapper: return MetricsWrapper( diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py index 78e9848b24c..b1f234c6853 100644 --- 
a/snuba/web/rpc/v1/endpoint_time_series.py +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -1,51 +1,16 @@ import math import uuid -from collections import defaultdict -from datetime import datetime -from typing import Any, Dict, Iterable, Type +from typing import Type -from google.protobuf.json_format import MessageToDict -from google.protobuf.timestamp_pb2 import Timestamp from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( - DataPoint, - TimeSeries, TimeSeriesRequest, TimeSeriesResponse, ) -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemName -from snuba.attribution.appid import AppID -from snuba.attribution.attribution_info import AttributionInfo -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import OrderBy, OrderByDirection, SelectedExpression -from snuba.query.data_source.simple import Entity -from snuba.query.dsl import Functions as f -from snuba.query.dsl import column -from snuba.query.logical import Query -from snuba.query.query_settings import HTTPQuerySettings -from snuba.request import Request as SnubaRequest -from snuba.web.query import run_query -from snuba.web.rpc import RPCEndpoint -from snuba.web.rpc.common.aggregation import ( - ExtrapolationContext, - aggregation_to_expression, - get_average_sample_rate_column, - get_confidence_interval_column, - get_count_column, -) -from snuba.web.rpc.common.common import ( - attribute_key_to_expression, - base_conditions_and, - trace_item_filters_to_expression, - treeify_or_and_conditions, -) -from snuba.web.rpc.common.debug_info import ( - extract_response_meta, - setup_trace_query_settings, -) +from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.v1.resolvers import ResolverTimeSeries _VALID_GRANULARITY_SECS = set( [ @@ -68,249 +33,6 @@ _MAX_BUCKETS_IN_REQUEST = 2016 -def _convert_result_timeseries( - request: TimeSeriesRequest, data: list[Dict[str, Any]] -) -> Iterable[TimeSeries]: - """This function takes the results of the clickhouse query and converts it to a list of TimeSeries objects. It also handles - zerofilling data points where data was not present for a specific bucket. 
- - Example: - data is a list of dictionaries that look like this: - - >>> [ - >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} - >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} - >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} - >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} - >>> # next time bucket starts below - - >>> {"time": "2024-4-20 16:21:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} - >>> # here you can see that not every timeseries had data in every time bucket - >>> {"time": "2024-4-20 16:22:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} - >>> {"time": "2024-4-20 16:23:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} - >>> {"time": "2024-4-20 16:24:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} - - >>> ... - >>> ] - - In this example we have 8 different timeseries and they are all sparse: - - sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} - sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} - sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} - sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} - - - p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} - p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} - p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} - p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} - - Returns: - an Iterable of TimeSeries objects where each possible bucket has a DataPoint with `data_present` set correctly - - """ - - # to convert the results, need to know which were the groupby columns and which ones - # were aggregations - aggregation_labels = set([agg.label for agg in request.aggregations]) - group_by_labels = set([attr.name for attr in request.group_by]) - - # create a mapping with (all the group by attribute key,val pairs as strs, label name) - # In the example in the docstring it would look like: - # { ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): TimeSeries()} - result_timeseries: dict[tuple[str, str], TimeSeries] = {} - - # create a mapping for each timeseries of timestamp: row to fill data points not returned in the query - # { - # ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): { - # time_converted_to_integer_timestamp: row_data_for_that_time_bucket - # } - # } - result_timeseries_timestamp_to_row: defaultdict[ - tuple[str, str], dict[int, Dict[str, Any]] - ] = defaultdict(dict) - - query_duration = ( - request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds - ) - time_buckets = [ - Timestamp(seconds=(request.meta.start_timestamp.seconds) + secs) - for secs in range(0, 
query_duration, request.granularity_secs) - ] - - # this loop fill in our pre-computed dictionaries so that we can zerofill later - for row in data: - group_by_map = {} - - for col_name, col_value in row.items(): - if col_name in group_by_labels: - group_by_map[col_name] = str(col_value) - - group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()]) - for col_name in aggregation_labels: - if not result_timeseries.get((group_by_key, col_name), None): - result_timeseries[(group_by_key, col_name)] = TimeSeries( - group_by_attributes=group_by_map, - label=col_name, - buckets=time_buckets, - ) - result_timeseries_timestamp_to_row[(group_by_key, col_name)][ - int(datetime.fromisoformat(row["time"]).timestamp()) - ] = row - - # Go through every possible time bucket in the query, if there's row data for it, fill in its data - # otherwise put a dummy datapoint in - - for bucket in time_buckets: - for timeseries_key, timeseries in result_timeseries.items(): - row_data = result_timeseries_timestamp_to_row.get(timeseries_key, {}).get( - bucket.seconds - ) - if not row_data: - timeseries.data_points.append(DataPoint(data=0, data_present=False)) - else: - extrapolation_context = ExtrapolationContext.from_row( - timeseries.label, row_data - ) - if extrapolation_context.is_data_present: - timeseries.data_points.append( - DataPoint( - data=row_data[timeseries.label], - data_present=True, - avg_sampling_rate=extrapolation_context.average_sample_rate, - reliability=extrapolation_context.reliability, - ) - ) - else: - timeseries.data_points.append(DataPoint(data=0, data_present=False)) - return result_timeseries.values() - - -def _build_query(request: TimeSeriesRequest) -> Query: - # TODO: This is hardcoded still - entity = Entity( - key=EntityKey("eap_spans"), - schema=get_entity(EntityKey("eap_spans")).get_data_model(), - sample=None, - ) - - aggregation_columns = [ - SelectedExpression( - name=aggregation.label, expression=aggregation_to_expression(aggregation) - ) - for aggregation in request.aggregations - ] - - additional_context_columns = [] - for aggregation in request.aggregations: - if ( - aggregation.extrapolation_mode - == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED - ): - confidence_interval_column = get_confidence_interval_column(aggregation) - if confidence_interval_column is not None: - additional_context_columns.append( - SelectedExpression( - name=confidence_interval_column.alias, - expression=confidence_interval_column, - ) - ) - - average_sample_rate_column = get_average_sample_rate_column(aggregation) - additional_context_columns.append( - SelectedExpression( - name=average_sample_rate_column.alias, - expression=average_sample_rate_column, - ) - ) - - count_column = get_count_column(aggregation) - additional_context_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) - - groupby_columns = [ - SelectedExpression( - name=attr_key.name, expression=attribute_key_to_expression(attr_key) - ) - for attr_key in request.group_by - ] - - res = Query( - from_clause=entity, - selected_columns=[ - # buckets time by granularity according to the start time of the request. 
- # time_slot = start_time + (((timestamp - start_time) // granularity) * granularity) - # Example: - # start_time = 1001 - # end_time = 1901 - # granularity = 300 - # timestamps = [1201, 1002, 1302, 1400, 1700] - # buckets = [1001, 1301, 1601] # end time not included because it would be filtered out by the request - SelectedExpression( - name="time", - expression=f.toDateTime( - f.plus( - request.meta.start_timestamp.seconds, - f.multiply( - f.intDiv( - f.minus( - f.toUnixTimestamp(column("timestamp")), - request.meta.start_timestamp.seconds, - ), - request.granularity_secs, - ), - request.granularity_secs, - ), - ), - alias="time_slot", - ), - ), - *aggregation_columns, - *groupby_columns, - *additional_context_columns, - ], - granularity=request.granularity_secs, - condition=base_conditions_and( - request.meta, trace_item_filters_to_expression(request.filter) - ), - groupby=[ - column("time_slot"), - *[attribute_key_to_expression(attr_key) for attr_key in request.group_by], - ], - order_by=[ - OrderBy(expression=column("time_slot"), direction=OrderByDirection.ASC) - ], - ) - treeify_or_and_conditions(res) - return res - - -def _build_snuba_request(request: TimeSeriesRequest) -> SnubaRequest: - query_settings = ( - setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() - ) - - return SnubaRequest( - id=uuid.UUID(request.meta.request_id), - original_body=MessageToDict(request), - query=_build_query(request), - query_settings=query_settings, - attribution_info=AttributionInfo( - referrer=request.meta.referrer, - team="eap", - feature="eap", - tenant_ids={ - "organization_id": request.meta.organization_id, - "referrer": request.meta.referrer, - }, - app_id=AppID("eap"), - parent_api="eap_span_samples", - ), - ) - - def _enforce_no_duplicate_labels(request: TimeSeriesRequest) -> None: labels = set() @@ -364,6 +86,13 @@ def request_class(cls) -> Type[TimeSeriesRequest]: def response_class(cls) -> Type[TimeSeriesResponse]: return TimeSeriesResponse + def get_resolver( + self, trace_item_name: TraceItemName.ValueType + ) -> TraceItemDataResolver[TimeSeriesRequest, TimeSeriesResponse]: + return ResolverTimeSeries.get_from_trace_item_name(trace_item_name)( + timer=self._timer, metrics_backend=self._metrics_backend + ) + def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: # TODO: Move this to base in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( @@ -371,22 +100,9 @@ def _execute(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: ) _enforce_no_duplicate_labels(in_msg) _validate_time_buckets(in_msg) - snuba_request = _build_snuba_request(in_msg) - res = run_query( - dataset=PluggableDataset(name="eap", all_entities=[]), - request=snuba_request, - timer=self._timer, - ) - response_meta = extract_response_meta( - in_msg.meta.request_id, - in_msg.meta.debug, - [res], - [self._timer], - ) - - return TimeSeriesResponse( - result_timeseries=list( - _convert_result_timeseries(in_msg, res.result.get("data", [])) - ), - meta=response_meta, - ) + # NOTE: EAP spans was the first TraceItem, we didn't enforce a trace item name originally so we default to it + # for backwards compatibility + if in_msg.meta.trace_item_name == TraceItemName.TRACE_ITEM_NAME_UNSPECIFIED: + in_msg.meta.trace_item_name = TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + resolver = self.get_resolver(in_msg.meta.trace_item_name) + return resolver.resolve(in_msg) diff --git a/snuba/web/rpc/v1/endpoint_trace_item_table.py b/snuba/web/rpc/v1/endpoint_trace_item_table.py index 
8b887e135cc..7328f6ff277 100644 --- a/snuba/web/rpc/v1/endpoint_trace_item_table.py +++ b/snuba/web/rpc/v1/endpoint_trace_item_table.py @@ -1,239 +1,20 @@ import uuid -from collections import defaultdict -from dataclasses import replace -from typing import Any, Callable, Dict, Iterable, Sequence, Type +from typing import Type -from google.protobuf.json_format import MessageToDict from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( Column, - TraceItemColumnValues, TraceItemTableRequest, TraceItemTableResponse, ) -from sentry_protos.snuba.v1.request_common_pb2 import PageToken -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( - AttributeKey, - AttributeValue, - ExtrapolationMode, -) +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemName -from snuba.attribution.appid import AppID -from snuba.attribution.attribution_info import AttributionInfo -from snuba.datasets.entities.entity_key import EntityKey -from snuba.datasets.entities.factory import get_entity -from snuba.datasets.pluggable_dataset import PluggableDataset -from snuba.query import OrderBy, OrderByDirection, SelectedExpression -from snuba.query.data_source.simple import Entity -from snuba.query.logical import Query -from snuba.query.query_settings import HTTPQuerySettings -from snuba.request import Request as SnubaRequest -from snuba.web.query import run_query -from snuba.web.rpc import RPCEndpoint -from snuba.web.rpc.common.aggregation import ( - ExtrapolationContext, - aggregation_to_expression, - get_average_sample_rate_column, - get_confidence_interval_column, - get_count_column, -) -from snuba.web.rpc.common.common import ( - apply_virtual_columns, - attribute_key_to_expression, - base_conditions_and, - trace_item_filters_to_expression, - treeify_or_and_conditions, -) -from snuba.web.rpc.common.debug_info import ( - extract_response_meta, - setup_trace_query_settings, -) +from snuba.web.rpc import RPCEndpoint, TraceItemDataResolver from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException - -_DEFAULT_ROW_LIMIT = 10_000 +from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable _GROUP_BY_DISALLOWED_COLUMNS = ["timestamp"] -def _convert_order_by( - order_by: Sequence[TraceItemTableRequest.OrderBy], -) -> Sequence[OrderBy]: - res: list[OrderBy] = [] - for x in order_by: - direction = OrderByDirection.DESC if x.descending else OrderByDirection.ASC - if x.column.HasField("key"): - res.append( - OrderBy( - direction=direction, - expression=attribute_key_to_expression(x.column.key), - ) - ) - elif x.column.HasField("aggregation"): - res.append( - OrderBy( - direction=direction, - expression=aggregation_to_expression(x.column.aggregation), - ) - ) - return res - - -def _build_query(request: TraceItemTableRequest) -> Query: - # TODO: This is hardcoded still - entity = Entity( - key=EntityKey("eap_spans"), - schema=get_entity(EntityKey("eap_spans")).get_data_model(), - sample=None, - ) - - selected_columns = [] - for column in request.columns: - if column.HasField("key"): - key_col = attribute_key_to_expression(column.key) - # The key_col expression alias may differ from the column label. 
That is okay - # the attribute key name is used in the groupby, the column label is just the name of - # the returned attribute value - selected_columns.append( - SelectedExpression(name=column.label, expression=key_col) - ) - elif column.HasField("aggregation"): - function_expr = aggregation_to_expression(column.aggregation) - # aggregation label may not be set and the column label takes priority anyways. - function_expr = replace(function_expr, alias=column.label) - selected_columns.append( - SelectedExpression(name=column.label, expression=function_expr) - ) - - if ( - column.aggregation.extrapolation_mode - == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED - ): - confidence_interval_column = get_confidence_interval_column( - column.aggregation - ) - if confidence_interval_column is not None: - selected_columns.append( - SelectedExpression( - name=confidence_interval_column.alias, - expression=confidence_interval_column, - ) - ) - - average_sample_rate_column = get_average_sample_rate_column( - column.aggregation - ) - count_column = get_count_column(column.aggregation) - selected_columns.append( - SelectedExpression( - name=average_sample_rate_column.alias, - expression=average_sample_rate_column, - ) - ) - selected_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) - else: - raise BadSnubaRPCRequestException( - "Column is neither an aggregate or an attribute" - ) - - res = Query( - from_clause=entity, - selected_columns=selected_columns, - condition=base_conditions_and( - request.meta, - trace_item_filters_to_expression(request.filter), - ), - order_by=_convert_order_by(request.order_by), - groupby=[ - attribute_key_to_expression(attr_key) for attr_key in request.group_by - ], - # protobuf sets limit to 0 by default if it is not set, - # give it a default value that will actually return data - limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, - ) - treeify_or_and_conditions(res) - apply_virtual_columns(res, request.virtual_column_contexts) - return res - - -def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest: - query_settings = ( - setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() - ) - - return SnubaRequest( - id=uuid.UUID(request.meta.request_id), - original_body=MessageToDict(request), - query=_build_query(request), - query_settings=query_settings, - attribution_info=AttributionInfo( - referrer=request.meta.referrer, - team="eap", - feature="eap", - tenant_ids={ - "organization_id": request.meta.organization_id, - "referrer": request.meta.referrer, - }, - app_id=AppID("eap"), - parent_api="eap_span_samples", - ), - ) - - -def _convert_results( - request: TraceItemTableRequest, data: Iterable[Dict[str, Any]] -) -> list[TraceItemColumnValues]: - - converters: Dict[str, Callable[[Any], AttributeValue]] = {} - - for column in request.columns: - if column.HasField("key"): - if column.key.type == AttributeKey.TYPE_BOOLEAN: - converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) - elif column.key.type == AttributeKey.TYPE_STRING: - converters[column.label] = lambda x: AttributeValue(val_str=str(x)) - elif column.key.type == AttributeKey.TYPE_INT: - converters[column.label] = lambda x: AttributeValue(val_int=int(x)) - elif column.key.type == AttributeKey.TYPE_FLOAT: - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - elif column.HasField("aggregation"): - converters[column.label] = lambda x: AttributeValue(val_float=float(x)) - else: - raise 
BadSnubaRPCRequestException( - "column is neither an attribute or aggregation" - ) - - res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) - for row in data: - for column_name, value in row.items(): - if column_name in converters.keys(): - res[column_name].results.append(converters[column_name](value)) - res[column_name].attribute_name = column_name - extrapolation_context = ExtrapolationContext.from_row(column_name, row) - if extrapolation_context.is_extrapolated: - res[column_name].reliabilities.append( - extrapolation_context.reliability - ) - - column_ordering = {column.label: i for i, column in enumerate(request.columns)} - - return list( - # we return the columns in the order they were requested - sorted( - res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name) - ) - ) - - -def _get_page_token( - request: TraceItemTableRequest, response: list[TraceItemColumnValues] -) -> PageToken: - if not response: - return PageToken(offset=0) - num_rows = len(response[0].results) - return PageToken(offset=request.page_token.offset + num_rows) - - def _apply_labels_to_columns(in_msg: TraceItemTableRequest) -> TraceItemTableRequest: def _apply_label_to_column(column: Column) -> None: if column.label != "" and column.label is not None: @@ -299,6 +80,13 @@ def version(cls) -> str: def request_class(cls) -> Type[TraceItemTableRequest]: return TraceItemTableRequest + def get_resolver( + self, trace_item_name: TraceItemName.ValueType + ) -> TraceItemDataResolver[TraceItemTableRequest, TraceItemTableResponse]: + return ResolverTraceItemTable.get_from_trace_item_name(trace_item_name)( + timer=self._timer, metrics_backend=self._metrics_backend + ) + @classmethod def response_class(cls) -> Type[TraceItemTableResponse]: return TraceItemTableResponse @@ -311,21 +99,9 @@ def _execute(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: in_msg.meta.request_id = getattr(in_msg.meta, "request_id", None) or str( uuid.uuid4() ) - snuba_request = _build_snuba_request(in_msg) - res = run_query( - dataset=PluggableDataset(name="eap", all_entities=[]), - request=snuba_request, - timer=self._timer, - ) - column_values = _convert_results(in_msg, res.result.get("data", [])) - response_meta = extract_response_meta( - in_msg.meta.request_id, - in_msg.meta.debug, - [res], - [self._timer], - ) - return TraceItemTableResponse( - column_values=column_values, - page_token=_get_page_token(in_msg, column_values), - meta=response_meta, - ) + # NOTE: EAP spans was the first TraceItem, we didn't enforce a trace item name originally so we default to it + # for backwards compatibility + if in_msg.meta.trace_item_name == TraceItemName.TRACE_ITEM_NAME_UNSPECIFIED: + in_msg.meta.trace_item_name = TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + resolver = self.get_resolver(in_msg.meta.trace_item_name) + return resolver.resolve(in_msg) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/__init__.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/common/__init__.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/snuba/web/rpc/common/aggregation.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py similarity index 100% rename from snuba/web/rpc/common/aggregation.py rename to snuba/web/rpc/v1/resolvers/R_eap_spans/common/aggregation.py diff --git 
a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py new file mode 100644 index 00000000000..5e9a14b52bc --- /dev/null +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_time_series.py @@ -0,0 +1,317 @@ +import uuid +from collections import defaultdict +from datetime import datetime +from typing import Any, Dict, Iterable + +from google.protobuf.json_format import MessageToDict +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + DataPoint, + TimeSeries, + TimeSeriesRequest, + TimeSeriesResponse, +) +from sentry_protos.snuba.v1.request_common_pb2 import TraceItemName +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.dsl import Functions as f +from snuba.query.dsl import column +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.web.query import run_query +from snuba.web.rpc.common.common import ( + attribute_key_to_expression, + base_conditions_and, + trace_item_filters_to_expression, + treeify_or_and_conditions, +) +from snuba.web.rpc.common.debug_info import ( + extract_response_meta, + setup_trace_query_settings, +) +from snuba.web.rpc.v1.resolvers import ResolverTimeSeries +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( + ExtrapolationContext, + aggregation_to_expression, + get_average_sample_rate_column, + get_confidence_interval_column, + get_count_column, +) + + +def _convert_result_timeseries( + request: TimeSeriesRequest, data: list[Dict[str, Any]] +) -> Iterable[TimeSeries]: + """This function takes the results of the clickhouse query and converts it to a list of TimeSeries objects. It also handles + zerofilling data points where data was not present for a specific bucket. 
+ + Example: + data is a list of dictionaries that look like this: + + >>> [ + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:20:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} + >>> # next time bucket starts below + + >>> {"time": "2024-4-20 16:21:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a1"} + >>> # here you can see that not every timeseries had data in every time bucket + >>> {"time": "2024-4-20 16:22:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g1", "group_by_attr_2": "a2"} + >>> {"time": "2024-4-20 16:23:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a1"} + >>> {"time": "2024-4-20 16:24:00", "sum(sentry.duration)": 1235, "p95(sentry.duration)": 123456, "group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + >>> ... + >>> ] + + In this example we have 8 different timeseries and they are all sparse: + + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} + sum(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a1"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g1", "group_by_attr_2": "a2"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a1"} + p95(sentry.duration), group_by_attributes = {"group_by_attr_1": "g2", "group_by_attr_2": "a2"} + + Returns: + an Iterable of TimeSeries objects where each possible bucket has a DataPoint with `data_present` set correctly + + """ + + # to convert the results, need to know which were the groupby columns and which ones + # were aggregations + aggregation_labels = set([agg.label for agg in request.aggregations]) + group_by_labels = set([attr.name for attr in request.group_by]) + + # create a mapping with (all the group by attribute key,val pairs as strs, label name) + # In the example in the docstring it would look like: + # { ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): TimeSeries()} + result_timeseries: dict[tuple[str, str], TimeSeries] = {} + + # create a mapping for each timeseries of timestamp: row to fill data points not returned in the query + # { + # ("group_by_attr_1,g1|group_by_attr_2,g2", "sum(sentry.duration"): { + # time_converted_to_integer_timestamp: row_data_for_that_time_bucket + # } + # } + result_timeseries_timestamp_to_row: defaultdict[ + tuple[str, str], dict[int, Dict[str, Any]] + ] = defaultdict(dict) + + query_duration = ( + request.meta.end_timestamp.seconds - request.meta.start_timestamp.seconds + ) + time_buckets = [ + Timestamp(seconds=(request.meta.start_timestamp.seconds) + secs) + for secs in range(0, 
query_duration, request.granularity_secs) + ] + + # this loop fill in our pre-computed dictionaries so that we can zerofill later + for row in data: + group_by_map = {} + + for col_name, col_value in row.items(): + if col_name in group_by_labels: + group_by_map[col_name] = str(col_value) + + group_by_key = "|".join([f"{k},{v}" for k, v in group_by_map.items()]) + for col_name in aggregation_labels: + if not result_timeseries.get((group_by_key, col_name), None): + result_timeseries[(group_by_key, col_name)] = TimeSeries( + group_by_attributes=group_by_map, + label=col_name, + buckets=time_buckets, + ) + result_timeseries_timestamp_to_row[(group_by_key, col_name)][ + int(datetime.fromisoformat(row["time"]).timestamp()) + ] = row + + # Go through every possible time bucket in the query, if there's row data for it, fill in its data + # otherwise put a dummy datapoint in + + for bucket in time_buckets: + for timeseries_key, timeseries in result_timeseries.items(): + row_data = result_timeseries_timestamp_to_row.get(timeseries_key, {}).get( + bucket.seconds + ) + if not row_data: + timeseries.data_points.append(DataPoint(data=0, data_present=False)) + else: + extrapolation_context = ExtrapolationContext.from_row( + timeseries.label, row_data + ) + if extrapolation_context.is_data_present: + timeseries.data_points.append( + DataPoint( + data=row_data[timeseries.label], + data_present=True, + avg_sampling_rate=extrapolation_context.average_sample_rate, + reliability=extrapolation_context.reliability, + ) + ) + else: + timeseries.data_points.append(DataPoint(data=0, data_present=False)) + return result_timeseries.values() + + +def _build_query(request: TimeSeriesRequest) -> Query: + # TODO: This is hardcoded still + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + + aggregation_columns = [ + SelectedExpression( + name=aggregation.label, expression=aggregation_to_expression(aggregation) + ) + for aggregation in request.aggregations + ] + + additional_context_columns = [] + for aggregation in request.aggregations: + if ( + aggregation.extrapolation_mode + == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED + ): + confidence_interval_column = get_confidence_interval_column(aggregation) + if confidence_interval_column is not None: + additional_context_columns.append( + SelectedExpression( + name=confidence_interval_column.alias, + expression=confidence_interval_column, + ) + ) + + average_sample_rate_column = get_average_sample_rate_column(aggregation) + additional_context_columns.append( + SelectedExpression( + name=average_sample_rate_column.alias, + expression=average_sample_rate_column, + ) + ) + + count_column = get_count_column(aggregation) + additional_context_columns.append( + SelectedExpression(name=count_column.alias, expression=count_column) + ) + + groupby_columns = [ + SelectedExpression( + name=attr_key.name, expression=attribute_key_to_expression(attr_key) + ) + for attr_key in request.group_by + ] + + res = Query( + from_clause=entity, + selected_columns=[ + # buckets time by granularity according to the start time of the request. 
+ # time_slot = start_time + (((timestamp - start_time) // granularity) * granularity) + # Example: + # start_time = 1001 + # end_time = 1901 + # granularity = 300 + # timestamps = [1201, 1002, 1302, 1400, 1700] + # buckets = [1001, 1301, 1601] # end time not included because it would be filtered out by the request + SelectedExpression( + name="time", + expression=f.toDateTime( + f.plus( + request.meta.start_timestamp.seconds, + f.multiply( + f.intDiv( + f.minus( + f.toUnixTimestamp(column("timestamp")), + request.meta.start_timestamp.seconds, + ), + request.granularity_secs, + ), + request.granularity_secs, + ), + ), + alias="time_slot", + ), + ), + *aggregation_columns, + *groupby_columns, + *additional_context_columns, + ], + granularity=request.granularity_secs, + condition=base_conditions_and( + request.meta, trace_item_filters_to_expression(request.filter) + ), + groupby=[ + column("time_slot"), + *[attribute_key_to_expression(attr_key) for attr_key in request.group_by], + ], + order_by=[ + OrderBy(expression=column("time_slot"), direction=OrderByDirection.ASC) + ], + ) + treeify_or_and_conditions(res) + return res + + +def _build_snuba_request(request: TimeSeriesRequest) -> SnubaRequest: + query_settings = ( + setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() + ) + + return SnubaRequest( + id=uuid.UUID(request.meta.request_id), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=query_settings, + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="eap_span_samples", + ), + ) + + +class ResolverTimeSeriesEAPSpans(ResolverTimeSeries): + @classmethod + def trace_item_name(cls) -> TraceItemName.ValueType: + return TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + + def resolve(self, in_msg: TimeSeriesRequest) -> TimeSeriesResponse: + snuba_request = _build_snuba_request(in_msg) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=self._timer, + ) + response_meta = extract_response_meta( + in_msg.meta.request_id, + in_msg.meta.debug, + [res], + [self._timer], + ) + + return TimeSeriesResponse( + result_timeseries=list( + _convert_result_timeseries(in_msg, res.result.get("data", [])) + ), + meta=response_meta, + ) diff --git a/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py new file mode 100644 index 00000000000..9710e0c6eec --- /dev/null +++ b/snuba/web/rpc/v1/resolvers/R_eap_spans/resolver_trace_item_table.py @@ -0,0 +1,257 @@ +import uuid +from collections import defaultdict +from dataclasses import replace +from typing import Any, Callable, Dict, Iterable, Sequence + +from google.protobuf.json_format import MessageToDict +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + TraceItemColumnValues, + TraceItemTableRequest, + TraceItemTableResponse, +) +from sentry_protos.snuba.v1.request_common_pb2 import PageToken, TraceItemName +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeKey, + AttributeValue, + ExtrapolationMode, +) + +from snuba.attribution.appid import AppID +from snuba.attribution.attribution_info import AttributionInfo +from snuba.datasets.entities.entity_key import EntityKey +from snuba.datasets.entities.factory import get_entity +from 
snuba.datasets.pluggable_dataset import PluggableDataset +from snuba.query import OrderBy, OrderByDirection, SelectedExpression +from snuba.query.data_source.simple import Entity +from snuba.query.logical import Query +from snuba.query.query_settings import HTTPQuerySettings +from snuba.request import Request as SnubaRequest +from snuba.web.query import run_query +from snuba.web.rpc.common.common import ( + apply_virtual_columns, + attribute_key_to_expression, + base_conditions_and, + trace_item_filters_to_expression, + treeify_or_and_conditions, +) +from snuba.web.rpc.common.debug_info import ( + extract_response_meta, + setup_trace_query_settings, +) +from snuba.web.rpc.common.exceptions import BadSnubaRPCRequestException +from snuba.web.rpc.v1.resolvers import ResolverTraceItemTable +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( + ExtrapolationContext, + aggregation_to_expression, + get_average_sample_rate_column, + get_confidence_interval_column, + get_count_column, +) + +_DEFAULT_ROW_LIMIT = 10_000 + + +def _convert_order_by( + order_by: Sequence[TraceItemTableRequest.OrderBy], +) -> Sequence[OrderBy]: + res: list[OrderBy] = [] + for x in order_by: + direction = OrderByDirection.DESC if x.descending else OrderByDirection.ASC + if x.column.HasField("key"): + res.append( + OrderBy( + direction=direction, + expression=attribute_key_to_expression(x.column.key), + ) + ) + elif x.column.HasField("aggregation"): + res.append( + OrderBy( + direction=direction, + expression=aggregation_to_expression(x.column.aggregation), + ) + ) + return res + + +def _build_query(request: TraceItemTableRequest) -> Query: + # TODO: This is hardcoded still + entity = Entity( + key=EntityKey("eap_spans"), + schema=get_entity(EntityKey("eap_spans")).get_data_model(), + sample=None, + ) + + selected_columns = [] + for column in request.columns: + if column.HasField("key"): + key_col = attribute_key_to_expression(column.key) + # The key_col expression alias may differ from the column label. That is okay + # the attribute key name is used in the groupby, the column label is just the name of + # the returned attribute value + selected_columns.append( + SelectedExpression(name=column.label, expression=key_col) + ) + elif column.HasField("aggregation"): + function_expr = aggregation_to_expression(column.aggregation) + # aggregation label may not be set and the column label takes priority anyways. 
+ function_expr = replace(function_expr, alias=column.label) + selected_columns.append( + SelectedExpression(name=column.label, expression=function_expr) + ) + + if ( + column.aggregation.extrapolation_mode + == ExtrapolationMode.EXTRAPOLATION_MODE_SAMPLE_WEIGHTED + ): + confidence_interval_column = get_confidence_interval_column( + column.aggregation + ) + if confidence_interval_column is not None: + selected_columns.append( + SelectedExpression( + name=confidence_interval_column.alias, + expression=confidence_interval_column, + ) + ) + + average_sample_rate_column = get_average_sample_rate_column( + column.aggregation + ) + count_column = get_count_column(column.aggregation) + selected_columns.append( + SelectedExpression( + name=average_sample_rate_column.alias, + expression=average_sample_rate_column, + ) + ) + selected_columns.append( + SelectedExpression(name=count_column.alias, expression=count_column) + ) + else: + raise BadSnubaRPCRequestException( + "Column is neither an aggregate or an attribute" + ) + + res = Query( + from_clause=entity, + selected_columns=selected_columns, + condition=base_conditions_and( + request.meta, + trace_item_filters_to_expression(request.filter), + ), + order_by=_convert_order_by(request.order_by), + groupby=[ + attribute_key_to_expression(attr_key) for attr_key in request.group_by + ], + # protobuf sets limit to 0 by default if it is not set, + # give it a default value that will actually return data + limit=request.limit if request.limit > 0 else _DEFAULT_ROW_LIMIT, + ) + treeify_or_and_conditions(res) + apply_virtual_columns(res, request.virtual_column_contexts) + return res + + +def _build_snuba_request(request: TraceItemTableRequest) -> SnubaRequest: + query_settings = ( + setup_trace_query_settings() if request.meta.debug else HTTPQuerySettings() + ) + + return SnubaRequest( + id=uuid.UUID(request.meta.request_id), + original_body=MessageToDict(request), + query=_build_query(request), + query_settings=query_settings, + attribution_info=AttributionInfo( + referrer=request.meta.referrer, + team="eap", + feature="eap", + tenant_ids={ + "organization_id": request.meta.organization_id, + "referrer": request.meta.referrer, + }, + app_id=AppID("eap"), + parent_api="eap_span_samples", + ), + ) + + +def _convert_results( + request: TraceItemTableRequest, data: Iterable[Dict[str, Any]] +) -> list[TraceItemColumnValues]: + + converters: Dict[str, Callable[[Any], AttributeValue]] = {} + + for column in request.columns: + if column.HasField("key"): + if column.key.type == AttributeKey.TYPE_BOOLEAN: + converters[column.label] = lambda x: AttributeValue(val_bool=bool(x)) + elif column.key.type == AttributeKey.TYPE_STRING: + converters[column.label] = lambda x: AttributeValue(val_str=str(x)) + elif column.key.type == AttributeKey.TYPE_INT: + converters[column.label] = lambda x: AttributeValue(val_int=int(x)) + elif column.key.type == AttributeKey.TYPE_FLOAT: + converters[column.label] = lambda x: AttributeValue(val_float=float(x)) + elif column.HasField("aggregation"): + converters[column.label] = lambda x: AttributeValue(val_float=float(x)) + else: + raise BadSnubaRPCRequestException( + "column is neither an attribute or aggregation" + ) + + res: defaultdict[str, TraceItemColumnValues] = defaultdict(TraceItemColumnValues) + for row in data: + for column_name, value in row.items(): + if column_name in converters.keys(): + res[column_name].results.append(converters[column_name](value)) + res[column_name].attribute_name = column_name + extrapolation_context = 
ExtrapolationContext.from_row(column_name, row) + if extrapolation_context.is_extrapolated: + res[column_name].reliabilities.append( + extrapolation_context.reliability + ) + + column_ordering = {column.label: i for i, column in enumerate(request.columns)} + + return list( + # we return the columns in the order they were requested + sorted( + res.values(), key=lambda c: column_ordering.__getitem__(c.attribute_name) + ) + ) + + +def _get_page_token( + request: TraceItemTableRequest, response: list[TraceItemColumnValues] +) -> PageToken: + if not response: + return PageToken(offset=0) + num_rows = len(response[0].results) + return PageToken(offset=request.page_token.offset + num_rows) + + +class ResolverTraceItemTableEAPSpans(ResolverTraceItemTable): + @classmethod + def trace_item_name(cls) -> TraceItemName.ValueType: + return TraceItemName.TRACE_ITEM_NAME_EAP_SPANS + + def resolve(self, in_msg: TraceItemTableRequest) -> TraceItemTableResponse: + snuba_request = _build_snuba_request(in_msg) + res = run_query( + dataset=PluggableDataset(name="eap", all_entities=[]), + request=snuba_request, + timer=self._timer, + ) + column_values = _convert_results(in_msg, res.result.get("data", [])) + response_meta = extract_response_meta( + in_msg.meta.request_id, + in_msg.meta.debug, + [res], + [self._timer], + ) + return TraceItemTableResponse( + column_values=column_values, + page_token=_get_page_token(in_msg, column_values), + meta=response_meta, + ) diff --git a/snuba/web/rpc/v1/resolvers/__init__.py b/snuba/web/rpc/v1/resolvers/__init__.py new file mode 100644 index 00000000000..56e99044608 --- /dev/null +++ b/snuba/web/rpc/v1/resolvers/__init__.py @@ -0,0 +1,67 @@ +import os + +from sentry_protos.snuba.v1.endpoint_time_series_pb2 import ( + TimeSeriesRequest, + TimeSeriesResponse, +) +from sentry_protos.snuba.v1.endpoint_trace_item_attributes_pb2 import ( + TraceItemAttributeNamesRequest, + TraceItemAttributeNamesResponse, + TraceItemAttributeValuesRequest, + TraceItemAttributeValuesResponse, +) +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import ( + TraceItemTableRequest, + TraceItemTableResponse, +) + +from snuba.utils.registered_class import import_submodules_in_directory +from snuba.web.rpc import TraceItemDataResolver + + +class ResolverTraceItemTable( + TraceItemDataResolver[TraceItemTableRequest, TraceItemTableResponse] +): + @classmethod + def endpoint_name(cls) -> str: + return "TraceItemTable" + + +class ResolverTimeSeries(TraceItemDataResolver[TimeSeriesRequest, TimeSeriesResponse]): + @classmethod + def endpoint_name(cls) -> str: + return "TimeSeries" + + +class ResolverAttributeNames( + TraceItemDataResolver[ + TraceItemAttributeNamesRequest, TraceItemAttributeNamesResponse + ] +): + @classmethod + def endpoint_name(cls) -> str: + return "AttributeNames" + + +class ResolverAttributeValues( + TraceItemDataResolver[ + TraceItemAttributeValuesRequest, TraceItemAttributeValuesResponse + ] +): + @classmethod + def endpoint_name(cls) -> str: + return "AttributeValues" + + +# TODO: Traces, subscriptions + + +_TO_IMPORT = {} + +for f in os.listdir(os.path.dirname(os.path.realpath(__file__))): + if f.startswith("R_"): + _TO_IMPORT[f] = os.path.join(os.path.dirname(os.path.realpath(__file__)), f) + + +for v, module_path in _TO_IMPORT.items(): + import_submodules_in_directory(module_path, f"snuba.web.rpc.v1.resolvers.{v}") diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py index 48bba0fca6d..fd4efa4f386 100644 --- a/tests/web/rpc/test_aggregation.py 
+++ b/tests/web/rpc/test_aggregation.py @@ -9,7 +9,7 @@ Reliability, ) -from snuba.web.rpc.common.aggregation import ( +from snuba.web.rpc.v1.resolvers.R_eap_spans.common.aggregation import ( CUSTOM_COLUMN_PREFIX, CustomColumnInformation, ExtrapolationContext, diff --git a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py index e0bbd44fec9..17d56ae5579 100644 --- a/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py +++ b/tests/web/rpc/v1/test_endpoint_trace_item_table/test_endpoint_trace_item_table.py @@ -17,6 +17,7 @@ PageToken, RequestMeta, ResponseMeta, + TraceItemName, ) from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( AttributeAggregation, @@ -153,6 +154,7 @@ def test_basic(self) -> None: referrer="something", start_timestamp=ts, end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -191,6 +193,7 @@ def test_with_data(self, setup_teardown: Any) -> None: start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -238,6 +241,7 @@ def test_booleans_and_number_compares(self, setup_teardown: Any) -> None: start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( or_filter=OrFilter( @@ -320,6 +324,7 @@ def test_with_virtual_columns(self, setup_teardown: Any) -> None: start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480", + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -415,6 +420,7 @@ def test_order_by_virtual_columns(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -463,6 +469,7 @@ def test_table_with_aggregates(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -545,6 +552,7 @@ def test_table_with_columns_not_in_groupby(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), columns=[ Column( @@ -585,6 +593,7 @@ def test_order_by_non_selected(self) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -639,6 +648,7 @@ def test_order_by_aggregation(self, setup_teardown: Any) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), filter=TraceItemFilter( exists_filter=ExistsFilter( @@ -710,6 +720,7 @@ def test_aggregation_on_attribute_column(self) -> None: referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + 
trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), columns=[ Column( @@ -741,6 +752,7 @@ def test_different_column_label_and_attr_name(self, setup_teardown: Any) -> None end_timestamp=ts, referrer="something", cogs_category="something", + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), columns=[ Column( @@ -782,6 +794,7 @@ def test_cast_bug(self, setup_teardown: Any) -> None: "projectIds": ["1"], "startTimestamp": hour_ago.ToJsonString(), "endTimestamp": ts.ToJsonString(), + "traceItemName": "TRACE_ITEM_NAME_EAP_SPANS", }, "columns": [ { @@ -972,6 +985,7 @@ def test_table_with_disallowed_group_by_columns(self, setup_teardown: Any) -> No referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), columns=[ Column( @@ -1004,6 +1018,7 @@ def test_table_with_group_by_columns_without_aggregation( referrer="something", start_timestamp=Timestamp(seconds=hour_ago), end_timestamp=ts, + trace_item_name=TraceItemName.TRACE_ITEM_NAME_EAP_SPANS, ), columns=[ Column(