From ea7332158520b8e31696f63fbcb9d9a90893658a Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 8 Jan 2025 17:51:38 +0100 Subject: [PATCH 01/10] fix(ci): Load snuba-ci image again (#6725) followup to #6721 It seems that by using `outputs:` in the previous step, we don't load the image into docker but _only_ into the local filesystem. this breaks docker push on master. --- .github/workflows/ci.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c8fcc14642..dc0929854e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -170,6 +170,11 @@ jobs: # cache-to is re-implemented using the next step, to run conditionally for contributors outputs: type=docker,dest=/tmp/snuba-ci.tar + - name: Load snuba-ci image + run: | + docker load --input /tmp/snuba-ci.tar + docker image ls -a + - name: Publish snuba-ci image to registry if: steps.branch.outputs.branch == 'master' || github.event.pull_request.head.repo.full_name == github.repository # outside contributors won't be able to push to the docker registry From 06a6865bf42b134f8f9d3a4b95b023c3d6c38f75 Mon Sep 17 00:00:00 2001 From: Lyn Nagara <1779792+lynnagara@users.noreply.github.com> Date: Wed, 8 Jan 2025 08:54:15 -0800 Subject: [PATCH 02/10] ref: Remove python replays processor (#5672) This is rust now --------- Co-authored-by: Markus Unterwaditzer Co-authored-by: Colton Allen --- .../datasets/processors/replays_processor.py | 233 ------- tests/datasets/test_replays_processor.py | 635 ------------------ 2 files changed, 868 deletions(-) delete mode 100644 tests/datasets/test_replays_processor.py diff --git a/snuba/datasets/processors/replays_processor.py b/snuba/datasets/processors/replays_processor.py index 04e8bb2c6a..a3c5a68815 100644 --- a/snuba/datasets/processors/replays_processor.py +++ b/snuba/datasets/processors/replays_processor.py @@ -1,239 +1,6 @@ -from __future__ import annotations - -import dataclasses -import logging -import uuid -from datetime import datetime, timezone -from hashlib import md5 -from typing import Any, Callable, Mapping, TypeVar - -import rapidjson - -from snuba import environment from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor -from snuba.processor import _collapse_uint16, _collapse_uint32 -from snuba.util import force_bytes -from snuba.utils.metrics.wrapper import MetricsWrapper - -logger = logging.getLogger(__name__) - -metrics = MetricsWrapper(environment.metrics, "replays.processor") - -ReplayEventDict = Mapping[Any, Any] -RetentionDays = int - -# Limit for error_ids / trace_ids / urls array elements -LIST_ELEMENT_LIMIT = 1000 -MAX_CLICK_EVENTS = 20 - -USER_FIELDS_PRECEDENCE = ("user_id", "username", "email", "ip_address") -LOG_LEVELS = ["fatal", "error", "warning", "info", "debug"] class ReplaysProcessor(RustCompatProcessor): def __init__(self) -> None: super().__init__("ReplaysProcessor") - - -T = TypeVar("T") -U = TypeVar("U") - - -def segment_id_to_event_hash(segment_id: int | None) -> str: - if segment_id is None: - # Rows with null segment_id fields are considered "out of band" meaning they do not - # originate from the SDK and do not relate to a specific segment. - # - # For example: archive requests. 
- return str(uuid.uuid4()) - else: - segment_id_bytes = force_bytes(str(segment_id)) - segment_hash = md5(segment_id_bytes).hexdigest() - return to_uuid(segment_hash) - - -def default(default: Callable[[], T], value: T | None) -> T: - """Return a default value only if the given value was null. - - Falsey types such as 0, "", False, [], {} are returned. - """ - return default() if value is None else value - - -def maybe(into: Callable[[T], U], value: T | None) -> U | None: - """Optionally return a processed value.""" - return None if value is None else into(value) - - -def to_datetime(value: Any) -> datetime: - """Return a datetime instance or err. - - Datetimes for the replays schema standardize on 32 bit dates. - """ - return _timestamp_to_datetime(_collapse_or_err(_collapse_uint32, int(value))) - - -def to_uint32(value: Any) -> int: - return _collapse_or_err(_collapse_uint32, int(value)) - - -def to_uint1(value: Any) -> int: - int_value = int(value) - if int_value == 0 or int_value == 1: - return int_value - else: - raise ValueError("Value must be 0 or 1") - - -def to_uint16(value: Any) -> int: - return _collapse_or_err(_collapse_uint16, int(value)) - - -def to_string(value: Any) -> str: - """Return a string or err. - - This function follows the lead of "snuba.processors._unicodify" and enforces UTF-8 - encoding. - """ - if isinstance(value, (bool, dict, list)): - result: str = rapidjson.dumps(value) - return _encode_utf8(result) - elif value is None: - return "" - else: - return _encode_utf8(str(value)) - - -def to_capped_string(capacity: int, value: Any) -> str: - """Return a capped string.""" - return to_string(value)[:capacity] - - -def to_enum(enumeration: list[str]) -> Callable[[Any], str | None]: - def inline(value: Any) -> str | None: - for enum in enumeration: - if value == enum: - return enum - return None - - return inline - - -def to_capped_list(metric_name: str, value: Any) -> list[Any]: - """Return a list of values capped to the maximum allowable limit.""" - return _capped_list(metric_name, default(list, maybe(_is_list, value))) - - -def to_typed_list(callable: Callable[[Any], T], values: list[Any]) -> list[T]: - return list(map(callable, filter(lambda value: value is not None, values))) - - -def to_uuid(value: Any) -> str: - """Return a stringified uuid or err.""" - return str(uuid.UUID(str(value))) - - -def raise_on_null(field: str, value: Any) -> Any: - if value is None: - raise ValueError(f"Missing data for required field: {field}") - return value - - -def _is_list(value: Any) -> list[Any]: - if isinstance(value, list): - return value - else: - raise TypeError( - f"Invalid type specified. Expected list; received {type(value)} with a value of " - f"{value}" - ) - - -def _encode_utf8(value: str) -> str: - """Return a utf-8 encoded string.""" - return value.encode("utf8", errors="backslashreplace").decode("utf8") - - -def _capped_list(metric_name: str, value: list[Any]) -> list[Any]: - """Return a list with a maximum configured length.""" - if len(value) > LIST_ELEMENT_LIMIT: - metrics.increment(f'"{metric_name}" exceeded maximum length.') - - return value[:LIST_ELEMENT_LIMIT] - - -def _collapse_or_err(callable: Callable[[int], int | None], value: int) -> int: - """Return the integer or error if it overflows.""" - if callable(value) is None: - # This exception can only be triggered through abuse. We choose not to suppress these - # exceptions in favor of identifying the origin. 
- raise ValueError(f'Integer "{value}" overflowed.') - else: - return value - - -def _timestamp_to_datetime(timestamp: int) -> datetime: - """Convert an integer timestamp to a timezone-aware utc datetime instance.""" - return datetime.fromtimestamp(timestamp, tz=timezone.utc) - - -# Tags processor. - - -@dataclasses.dataclass -class Tag: - keys: list[str] - values: list[str] - transaction: str | None - - @classmethod - def empty_set(cls) -> Tag: - return cls([], [], None) - - -def process_tags_object(value: Any) -> Tag: - if value is None: - return Tag.empty_set() - - # Excess tags are trimmed. - tags = _capped_list("tags", normalize_tags(value)) - - keys = [] - values = [] - transaction = None - - for key, value in tags: - # Keys and values are stored as optional strings regardless of their input type. - parsed_key, parsed_value = to_string(key), maybe(to_string, value) - - if key == "transaction": - transaction = parsed_value - elif parsed_value is not None: - keys.append(parsed_key) - values.append(parsed_value) - - return Tag(keys=keys, values=values, transaction=transaction) - - -def normalize_tags(value: Any) -> list[tuple[str, str]]: - """Normalize tags payload to a single format.""" - if isinstance(value, dict): - return _coerce_tags_dictionary(value) - elif isinstance(value, list): - return _coerce_tags_tuple_list(value) - else: - raise TypeError(f'Invalid tags type specified: "{type(value)}"') - - -def _coerce_tags_dictionary(tags: dict[str, Any]) -> list[tuple[str, str]]: - """Return a list of tag tuples from an unspecified dictionary.""" - return [(key, value) for key, value in tags.items() if isinstance(key, str)] - - -def _coerce_tags_tuple_list(tags_list: list[Any]) -> list[tuple[str, str]]: - """Return a list of tag tuples from an unspecified list of values.""" - return [ - (item[0], item[1]) - for item in tags_list - if (isinstance(item, (list, tuple)) and len(item) == 2) - ] diff --git a/tests/datasets/test_replays_processor.py b/tests/datasets/test_replays_processor.py deleted file mode 100644 index ad9be0aa2c..0000000000 --- a/tests/datasets/test_replays_processor.py +++ /dev/null @@ -1,635 +0,0 @@ -from __future__ import annotations - -import json -import uuid -from dataclasses import dataclass -from datetime import datetime, timezone -from typing import Any, Mapping, Optional - -import pytest - -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.processors.replays_processor import ( - ReplaysProcessor, - maybe, - segment_id_to_event_hash, - to_capped_list, - to_uuid, -) -from snuba.processor import InsertBatch - -LOG_LEVELS = ["fatal", "error", "warning", "info", "debug"] - - -@dataclass -class ReplayEvent: - replay_id: str - replay_type: str | None - event_hash: str | None - error_sample_rate: float | None - session_sample_rate: float | None - segment_id: Any - trace_ids: Any - error_ids: Any - urls: Any - is_archived: int | None - timestamp: Any - replay_start_timestamp: Any - platform: Any - environment: Any - release: Any - dist: Any - ipv4: str | None - ipv6: str | None - user_name: Any - user_id: Any - user_email: Any - os_name: Any - os_version: Any - browser_name: Any - browser_version: Any - device_name: Any - device_brand: Any - device_family: Any - device_model: Any - sdk_name: Any - sdk_version: Any - title: str | None - - @classmethod - def empty_set(cls) -> ReplayEvent: - return cls( - replay_id="e5e062bf2e1d4afd96fd2f90b6770431", - timestamp=int(datetime.now(timezone.utc).timestamp()), - segment_id=None, - replay_type=None, - 
event_hash=None, - error_sample_rate=None, - session_sample_rate=None, - title=None, - error_ids=[], - trace_ids=[], - replay_start_timestamp=None, - platform=None, - dist=None, - urls=[], - is_archived=None, - os_name=None, - os_version=None, - browser_name=None, - browser_version=None, - device_name=None, - device_brand=None, - device_family=None, - device_model=None, - user_name=None, - user_id=None, - user_email=None, - ipv4=None, - ipv6=None, - environment=None, - release=None, - sdk_name=None, - sdk_version=None, - ) - - def serialize( - self, header_overrides: Optional[dict[Any, Any]] = None - ) -> Mapping[Any, Any]: - replay_event: Any = { - "type": "replay_event", - "replay_id": self.replay_id, - "replay_type": self.replay_type, - "segment_id": self.segment_id, - "event_hash": self.event_hash, - "tags": [["customtag", "is_set"], ["transaction", self.title]], - "urls": self.urls, - "is_archived": self.is_archived, - "trace_ids": self.trace_ids, - "error_ids": self.error_ids, - "dist": self.dist, - "platform": self.platform, - "timestamp": _datetime_to_timestamp(self.timestamp), - "replay_start_timestamp": _datetime_to_timestamp( - self.replay_start_timestamp - ), - "environment": self.environment, - "release": self.release, - "user": { - "id": self.user_id, - "username": self.user_name, - "email": self.user_email, - "ip_address": self.ipv4, - }, - "sdk": { - "name": self.sdk_name, - "version": self.sdk_version, - }, - "contexts": { - "trace": { - "op": "pageload", - "span_id": "affa5649681a1eeb", - "trace_id": "23eda6cd4b174ef8a51f0096df3bfdd1", - }, - "replay": { - "error_sample_rate": self.error_sample_rate, - "session_sample_rate": self.session_sample_rate, - }, - "os": { - "name": self.os_name, - "version": self.os_version, - }, - "browser": { - "name": self.browser_name, - "version": self.browser_version, - }, - "device": { - "name": self.device_name, - "brand": self.device_brand, - "family": self.device_family, - "model": self.device_model, - }, - }, - "request": { - "url": "Doesn't matter not ingested.", - "headers": {"User-Agent": "not used"}, - }, - "extra": {}, - } - - headers = { - "start_time": datetime.now().timestamp(), - "type": "replay_event", - "replay_id": self.replay_id, - "project_id": 1, - "retention_days": 30, - } - headers.update(header_overrides or {}) - return { - **headers, - **{"payload": list(bytes(json.dumps(replay_event).encode()))}, - } - - def _user_field(self) -> Any | None: - user_fields = [ - self.user_id, - self.user_name, - self.user_email, - self.ipv4, - self.ipv6, - ] - for f in user_fields: - if f is not None: - return f - return None - - def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]: - event_hash = segment_id_to_event_hash(self.segment_id) - - ret = { - "project_id": 1, - "replay_id": str(uuid.UUID(self.replay_id)), - "replay_type": self.replay_type, - "error_sample_rate": self.error_sample_rate, - "session_sample_rate": self.session_sample_rate, - "event_hash": self.event_hash or event_hash, - "segment_id": self.segment_id, - "trace_ids": list( - map(to_uuid, to_capped_list("trace_ids", self.trace_ids)) - ), - "error_ids": list( - map(to_uuid, to_capped_list("trace_ids", self.error_ids)) - ), - "timestamp": maybe(int, self.timestamp), - "replay_start_timestamp": maybe(int, self.replay_start_timestamp), - "platform": self.platform, - "environment": self.environment, - "release": self.release, - "dist": self.dist, - "urls": to_capped_list("urls", self.urls), - "is_archived": 1 if self.is_archived is True else None, - 
"user": (self.user_name or self.user_email or self.ipv4 or self.ipv6 or ""), - "user_id": self.user_id, - "user_name": self.user_name, - "user_email": self.user_email, - "os_name": self.os_name, - "os_version": self.os_version, - "browser_name": self.browser_name, - "browser_version": self.browser_version, - "device_name": self.device_name, - "device_brand": self.device_brand, - "device_family": self.device_family, - "device_model": self.device_model, - "tags.key": ["customtag"], - "tags.value": ["is_set"], - "title": self.title, - "sdk_name": "sentry.python", - "sdk_version": "0.9.0", - "retention_days": 30, - "offset": meta.offset, - "partition": meta.partition, - } - user = self._user_field() - - if user: - ret["user"] = user - - if self.ipv4: - ret["ip_address_v4"] = self.ipv4 - elif self.ipv6: - ret["ip_address_v6"] = self.ipv6 - return ret - - -class TestReplaysProcessor: - def test_process_message(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - header_overrides = { - "start_time": int(datetime.now(tz=timezone.utc).timestamp()) - } - message = ReplayEvent( - replay_id="e5e062bf2e1d4afd96fd2f90b6770431", - replay_type="session", - event_hash=None, - error_sample_rate=0.5, - session_sample_rate=0.5, - title="/organizations/:orgId/issues/", - error_ids=["36e980a9c6024cde9f5d089f15b83b5f"], - trace_ids=[ - "36e980a9c6024cde9f5d089f15b83b5f", - "8bea4461d8b944f393c15a3cb1c4169a", - ], - segment_id=0, - timestamp=int(datetime.now(tz=timezone.utc).timestamp()), - replay_start_timestamp=int(datetime.now(tz=timezone.utc).timestamp()), - platform="python", - dist="", - urls=["http://127.0.0.1:8001"], - is_archived=True, - user_name="me", - user_id="232", - user_email="test@test.com", - os_name="iOS", - os_version="16.2", - browser_name="Chrome", - browser_version="103.0.38", - device_name="iPhone 11", - device_brand="Apple", - device_family="iPhone", - device_model="iPhone", - ipv4="127.0.0.1", - ipv6=None, - environment="prod", - release="34a554c14b68285d8a8eb6c5c4c56dfc1db9a83a", - sdk_name="sentry.python", - sdk_version="0.9.0", - ) - - expected = InsertBatch( - [message.build_result(meta)], - datetime.utcfromtimestamp(header_overrides["start_time"]), - ).rows[0] - - response = ( - ReplaysProcessor() - .process_message(message.serialize(header_overrides), meta) - .rows[0] - ) - - assert response | expected == response - - def test_process_message_minimal_payload_segment_id(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - minimal_payload = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": str(uuid.uuid4()), - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_event", - "replay_id": str(uuid.uuid4()), - "timestamp": int(datetime.now().timestamp()), - "segment_id": 0, - "platform": "internal", - } - ).encode() - ) - ), - } - - # Asserting that the minimal payload was successfully processed. 
- result = ReplaysProcessor().process_message(minimal_payload, meta) - assert isinstance(result, InsertBatch) - assert len(result.rows) == 1 - assert len(result.rows[0]["event_hash"]) == 36 - - def test_process_message_minimal_payload_event_hash(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - event_hash = uuid.uuid4().hex - minimal_payload = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": str(uuid.uuid4()), - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_event", - "replay_id": str(uuid.uuid4()), - "timestamp": int(datetime.now().timestamp()), - "event_hash": event_hash, - "platform": "internal", - } - ).encode() - ) - ), - } - - # Asserting that the minimal payload was successfully processed. - result = ReplaysProcessor().process_message(minimal_payload, meta) - assert isinstance(result, InsertBatch) - assert len(result.rows) == 1 - assert result.rows[0]["event_hash"] == str(uuid.UUID(event_hash)) - - def test_process_message_nulls(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - processed_batch = ReplaysProcessor().process_message(message.serialize(), meta) - assert isinstance(processed_batch, InsertBatch) # required for type checker - received = processed_batch.rows[0] - assert isinstance(received, dict) # required for type checker - received_event_hash = received.pop("event_hash") - - expected = message.build_result(meta) - assert isinstance(expected, dict) # required for type checker - assert received_event_hash != expected.pop("event_hash") # hash is random - - # Sample rates default to -1.0 which is an impossible state for the field. 
- assert received["error_sample_rate"] == -1.0 - assert received["session_sample_rate"] == -1.0 - - assert received["platform"] == "" - assert received["replay_type"] == "" - assert received["dist"] == "" - assert received["user_name"] == "" - assert received["user_id"] == "" - assert received["user_email"] == "" - assert received["os_name"] == "" - assert received["os_version"] == "" - assert received["browser_name"] == "" - assert received["browser_version"] == "" - assert received["device_name"] == "" - assert received["device_brand"] == "" - assert received["device_family"] == "" - assert received["device_model"] == "" - assert received["environment"] == "" - assert received["release"] == "" - assert received["sdk_name"] == "" - assert received["sdk_version"] == "" - assert received["replay_start_timestamp"] is None - - def test_process_message_invalid_segment_id(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.segment_id = "a" - ReplaysProcessor().process_message(message.serialize(), meta) - - with pytest.raises(Exception): - message.segment_id = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - with pytest.raises(Exception): - message.segment_id = 2**16 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.segment_id = 2**16 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - def test_process_message_invalid_timestamp(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.timestamp = f"{2**32 - 1}" - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = 2**32 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = 2**32 - ReplaysProcessor().process_message(message.serialize(), meta) - - def test_process_message_invalid_replay_start_timestamp(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.replay_start_timestamp = f"{2**32 - 1}" - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = 2**32 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = 2**32 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - -class TestReplaysActionProcessor: - def test_replay_actions(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - now = datetime.now(tz=timezone.utc).replace(microsecond=0) - - message = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": "bb570198b8f04f8bbe87077668530da7", - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_actions", - "replay_id": "bb570198b8f04f8bbe87077668530da7", - "clicks": [ - { - "node_id": 59, - "tag": "div", - "id": "id", - "class": ["class1", "class2"], - "role": "button", - "aria_label": "test", - "alt": "", - "testid": "", - "title": "", - "text": "text", - "component_name": 
"SignUpButton", - "timestamp": int(now.timestamp()), - "event_hash": "df3c3aa2daae465e89f1169e49139827", - "is_dead": 0, - "is_rage": 1, - } - ], - } - ).encode() - ) - ), - } - - result = ReplaysProcessor().process_message(message, meta) - assert isinstance(result, InsertBatch) - rows = result.rows - assert len(rows) == 1 - - row = rows[0] - assert row["project_id"] == 1 - assert row["timestamp"] == int(now.timestamp()) - assert row["replay_id"] == str(uuid.UUID("bb570198b8f04f8bbe87077668530da7")) - assert row["event_hash"] == str(uuid.UUID("df3c3aa2daae465e89f1169e49139827")) - assert row["segment_id"] is None - assert row["trace_ids"] == [] - assert row["error_ids"] == [] - assert row["urls"] == [] - assert row["platform"] == "" - assert row["user"] == "" - assert row["sdk_name"] == "" - assert row["sdk_version"] == "" - assert row["retention_days"] == 30 - assert row["partition"] == 0 - assert row["offset"] == 0 - assert row["click_node_id"] == 59 - assert row["click_tag"] == "div" - assert row["click_id"] == "id" - assert row["click_class"] == ["class1", "class2"] - assert row["click_aria_label"] == "test" - assert row["click_role"] == "button" - assert row["click_text"] == "text" - assert row["click_alt"] == "" - assert row["click_testid"] == "" - assert row["click_title"] == "" - assert row["click_component_name"] == "SignUpButton" - assert row["click_is_dead"] == 0 - assert row["click_is_rage"] == 1 - - -from hashlib import md5 - - -def make_payload_for_event_link(severity: str) -> tuple[dict[str, Any], str, str]: - now = datetime.now(tz=timezone.utc).replace(microsecond=0) - replay_id = "bb570198b8f04f8bbe87077668530da7" - event_id = uuid.uuid4().hex - message = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": replay_id, - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "event_link", - "replay_id": replay_id, - severity + "_id": event_id, - "timestamp": int(now.timestamp()), - "event_hash": md5( - (replay_id + event_id).encode("utf-8") - ).hexdigest(), - } - ).encode() - ) - ), - } - return (message, event_id, severity) - - -@pytest.mark.parametrize( - "event_link_message", - [pytest.param(make_payload_for_event_link(log_level)) for log_level in LOG_LEVELS], -) -def test_replay_event_links( - event_link_message: tuple[dict[str, Any], str, str] -) -> None: - message, event_id, severity = event_link_message - - meta = KafkaMessageMetadata(offset=0, partition=0, timestamp=datetime(1970, 1, 1)) - - result = ReplaysProcessor().process_message(message, meta) - assert isinstance(result, InsertBatch) - rows = result.rows - assert len(rows) == 1 - - row = rows[0] - assert row["project_id"] == 1 - assert "timestamp" in row - assert row[severity + "_id"] == str(uuid.UUID(event_id)) - assert row["replay_id"] == str(uuid.UUID(message["replay_id"])) - assert row["event_hash"] == str( - uuid.UUID(md5((message["replay_id"] + event_id).encode("utf-8")).hexdigest()) - ) - assert row["segment_id"] is None - assert row["retention_days"] == 30 - assert row["partition"] == 0 - assert row["offset"] == 0 - - -@pytest.mark.parametrize( - "event_link_message", - [pytest.param(make_payload_for_event_link("not_valid"))], -) -def test_replay_event_links_invalid_severity( - event_link_message: tuple[dict[str, Any], str, str] -) -> None: - message, _, _ = event_link_message - - meta = KafkaMessageMetadata(offset=0, partition=0, timestamp=datetime(1970, 1, 1)) - - with pytest.raises(Exception): - 
ReplaysProcessor().process_message(message, meta) - - -def _datetime_to_timestamp(value: Any) -> Any: - if isinstance(value, datetime): - return value.timestamp() - else: - return value From e3cb9aad051acfe1bfa34c5a77e0558b235f6353 Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 8 Jan 2025 14:16:59 -0500 Subject: [PATCH 03/10] feat(uptime): storage configuration and and consumer (#6697) --- rust_snuba/src/processors/mod.rs | 3 +- ...snuba-uptime-results__1__failure.json.snap | 25 +++ ...nuba-uptime-results__1__succeess.json.snap | 25 +++ ...snuba-uptime-results__1__timeout.json.snap | 25 +++ .../src/processors/uptime_monitor_checks.rs | 158 ++++++++++++++++++ scripts/load_uptime_checks.py | 67 ++++++++ .../storages/uptime_monitor_checks.yaml | 61 +++++++ .../processors/uptime_monitors_processor.py | 14 ++ snuba/datasets/storages/validator.py | 5 +- snuba/utils/streams/topics.py | 1 + 10 files changed, 382 insertions(+), 2 deletions(-) create mode 100644 rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap create mode 100644 rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap create mode 100644 rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap create mode 100644 rust_snuba/src/processors/uptime_monitor_checks.rs create mode 100644 scripts/load_uptime_checks.py create mode 100644 snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml create mode 100644 snuba/datasets/processors/uptime_monitors_processor.py diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index b4667db3d4..8ad75fe7b7 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -9,8 +9,8 @@ mod querylog; mod release_health_metrics; mod replays; mod spans; +mod uptime_monitor_checks; mod utils; - use crate::config::ProcessorConfig; use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata}; use sentry_arroyo::backends::kafka::types::KafkaPayload; @@ -54,6 +54,7 @@ define_processing_functions! 
{ ("ProfilesMessageProcessor", "processed-profiles", ProcessingFunctionType::ProcessingFunction(profiles::process_message)), ("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)), ("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)), + ("UptimeMonitorChecksProcessor", "snuba-uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)), ("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)), ("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)), ("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)), diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap new file mode 100644 index 0000000000..b4fce9485a --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"dns_error\",\n \"description\": \"Unable to resolve hostname example.xyz\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 500\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "failure", + "check_status_reason": "dns_error", + "duration": 100, + "environment": null, + "http_status_code": 500, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap new file mode 100644 index 0000000000..38bb4035fc --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"success\",\n \"status_reason\": null,\n 
\"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 50,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 200\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "success", + "check_status_reason": null, + "duration": 50, + "environment": null, + "http_status_code": 200, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap new file mode 100644 index 0000000000..cf453b837a --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"timeout\",\n \"description\": \"Check timed out\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": null\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "failure", + "check_status_reason": "timeout", + "duration": 100, + "environment": null, + "http_status_code": null, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/uptime_monitor_checks.rs b/rust_snuba/src/processors/uptime_monitor_checks.rs new file mode 100644 index 0000000000..0066e701f4 --- /dev/null +++ b/rust_snuba/src/processors/uptime_monitor_checks.rs @@ -0,0 +1,158 @@ +use crate::config::ProcessorConfig; +use anyhow::Context; +use chrono::DateTime; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::types::{InsertBatch, KafkaMessageMetadata}; + +pub fn process_message( + payload: KafkaPayload, + metadata: KafkaMessageMetadata, + _config: &ProcessorConfig, +) -> anyhow::Result { + let payload_bytes = payload.payload().context("Expected 
payload")?; + let (rows, origin_timestamp) = + deserialize_message(payload_bytes, metadata.partition, metadata.offset)?; + + InsertBatch::from_rows(rows, DateTime::from_timestamp(origin_timestamp as i64, 0)) +} + +pub fn deserialize_message( + payload: &[u8], + partition: u16, + offset: u64, +) -> anyhow::Result<(Vec, f64)> { + let monitor_message: UptimeMonitorCheckMessage = serde_json::from_slice(payload)?; + + let rows = vec![UptimeMonitorCheckRow { + organization_id: monitor_message.organization_id, + project_id: monitor_message.project_id, + environment: monitor_message.environment, + uptime_subscription_id: monitor_message.subscription_id, + uptime_check_id: monitor_message.guid, + scheduled_check_time: monitor_message.scheduled_check_time_ms, + timestamp: monitor_message.actual_check_time_ms, + duration: monitor_message.duration_ms, + region_slug: monitor_message.region_slug.unwrap_or_default(), + check_status: monitor_message.status, + check_status_reason: monitor_message.status_reason.map(|r| r.ty), + http_status_code: monitor_message + .request_info + .unwrap_or_default() + .http_status_code, + trace_id: monitor_message.trace_id, + retention_days: monitor_message.retention_days, + partition, + offset, + }]; + + Ok((rows, monitor_message.actual_check_time_ms)) +} + +#[derive(Debug, Deserialize)] +struct UptimeMonitorCheckMessage<'a> { + // TODO: add these to the message + organization_id: u64, + project_id: u64, + retention_days: u16, + region_slug: Option<&'a str>, + environment: Option<&'a str>, + subscription_id: Uuid, + guid: Uuid, + scheduled_check_time_ms: f64, + actual_check_time_ms: f64, + duration_ms: u64, + status: &'a str, + status_reason: Option>, + trace_id: Uuid, + request_info: Option, +} +#[derive(Debug, Deserialize, Default)] +pub struct RequestInfo { + pub http_status_code: Option, +} +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub struct CheckStatusReason<'a> { + /// The type of the status reason + #[serde(rename = "type")] + pub ty: &'a str, + + /// A human readable description of the status reason + pub description: String, +} + +#[derive(Debug, Default, Serialize)] +pub struct UptimeMonitorCheckRow<'a> { + organization_id: u64, + project_id: u64, + environment: Option<&'a str>, + uptime_subscription_id: Uuid, + uptime_check_id: Uuid, + scheduled_check_time: f64, + timestamp: f64, + duration: u64, + region_slug: &'a str, + check_status: &'a str, + check_status_reason: Option<&'a str>, + http_status_code: Option, + trace_id: Uuid, + retention_days: u16, + partition: u16, + offset: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_monitor_checkin() { + let data = r#"{ + "organization_id": 1, + "project_id": 1, + "retention_days": 30, + "region_slug": "global", + "environment": "prod", + "subscription_id": "123e4567-e89b-12d3-a456-426614174000", + "guid": "550e8400-e29b-41d4-a716-446655440000", + "scheduled_check_time_ms": 1702659277, + "actual_check_time_ms": 1702659277, + "duration_ms": 100, + "status": "ok", + "status_reason": { + "type": "Request successful", + "description": "Request successful" + }, + "http_status_code": 200, + "trace_id": "550e8400-e29b-41d4-a716-446655440000", + "request_info": { + "request_type": "GET", + "http_status_code": 200 + } + }"#; + + let (rows, timestamp) = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let monitor_row = rows.first().unwrap(); + + assert_eq!(monitor_row.organization_id, 1); + assert_eq!(monitor_row.project_id, 1); + 
assert_eq!(monitor_row.environment, Some("prod")); + assert_eq!( + monitor_row.uptime_subscription_id, + Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap() + ); + assert_eq!(monitor_row.duration, 100); + assert_eq!(monitor_row.timestamp, 1702659277.0); + assert_eq!(monitor_row.region_slug, "global".to_string()); + assert_eq!(monitor_row.check_status, "ok"); + assert_eq!(monitor_row.check_status_reason, Some("Request successful")); + assert_eq!(monitor_row.http_status_code, Some(200)); + assert_eq!(monitor_row.retention_days, 30); + assert_eq!(monitor_row.partition, 0); + assert_eq!(monitor_row.offset, 0); + assert_eq!(timestamp, 1702659277.0); + } +} diff --git a/scripts/load_uptime_checks.py b/scripts/load_uptime_checks.py new file mode 100644 index 0000000000..8795f0bf53 --- /dev/null +++ b/scripts/load_uptime_checks.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +import datetime +import json +import random +import uuid + +import requests + +# Generate and insert data for uptime checks for each project +base_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + +query = """ +INSERT INTO default.uptime_monitor_checks_local ( + organization_id, project_id, environment, uptime_subscription_id, uptime_check_id, + scheduled_check_time, timestamp, duration, region_id, check_status, + check_status_reason, http_status_code, trace_id, retention_days +) FORMAT JSONEachRow +""" + +total_records = 0 + +for project_id in range(1, 2): + project_data = [] + for minute in range(24 * 60 * 90): # 24 hours * 60 minutes * 90 days + timestamp = base_time + datetime.timedelta(minutes=minute) + scheduled_time = timestamp - datetime.timedelta(seconds=random.randint(1, 30)) + http_status = ( + 500 if minute % 100 == 0 else 200 + ) # Every 100th record gets status 500 + check_status = "failure" if http_status == 500 else "success" + project_data.append( + { + "organization_id": 1, + "project_id": project_id, + "environment": "production", + "uptime_subscription_id": random.randint(1, 3) * project_id, + "uptime_check_id": str(uuid.uuid4()), + "scheduled_check_time": scheduled_time.strftime("%Y-%m-%d %H:%M:%S"), + "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"), + "duration": random.randint(1, 1000), + "region_id": random.randint(1, 3), + "check_status": check_status, + "check_status_reason": "Timeout error" + if check_status == "failure" + else None, + "http_status_code": http_status, + "trace_id": str(uuid.uuid4()), + "retention_days": 30, + } + ) + + response = requests.post( + "http://localhost:8123", + params={"query": query}, + data="\n".join(json.dumps(row) for row in project_data), + ) + + if response.status_code == 200: + total_records += len(project_data) + print( + f"Successfully inserted {len(project_data)} records for project {project_id}" + ) + else: + print(f"Error inserting data for project {project_id}: {response.text}") + +print(f"Total records inserted: {total_records}") diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml new file mode 100644 index 0000000000..31c9ed6716 --- /dev/null +++ b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml @@ -0,0 +1,61 @@ +version: v1 +kind: writable_storage +name: uptime_monitor_checks +storage: + key: events_analytics_platform + set_key: events_analytics_platform +readiness_state: limited +schema: + columns: + [ + { name: 
organization_id, type: UInt, args: { size: 64 } }, + { name: project_id, type: UInt, args: { size: 64 } }, + { name: environment, type: String, args: { schema_modifiers: [nullable, low_cardinality] } }, + { name: uptime_subscription_id, type: UUID }, + { name: uptime_check_id, type: UUID }, + { name: scheduled_check_time, type: DateTime64, args: { precision: 3 } }, + { name: timestamp, type: DateTime64, args: { precision: 3 } }, + { name: duration_ms, type: UInt, args: { size: 64 } }, + { name: region_slug, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: check_status, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: check_status_reason, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: http_status_code, type: UInt, args: { size: 16, schema_modifiers: [nullable] } }, + { name: trace_id, type: UUID }, + { name: retention_days, type: UInt, args: { size: 16 } }, + ] + local_table_name: uptime_monitor_checks_local + dist_table_name: uptime_monitor_checks_dist + + +mandatory_condition_checkers: + - condition: ProjectIdEnforcer + +stream_loader: + processor: UptimeMonitorChecksProcessor + default_topic: snuba-uptime-results + +allocation_policies: + - name: ConcurrentRateLimitAllocationPolicy + args: + required_tenant_types: + - organization_id + - referrer + - project_id + default_config_overrides: + is_enforced: 0 + - name: ReferrerGuardRailPolicy + args: + required_tenant_types: + - referrer + default_config_overrides: + is_enforced: 0 + is_active: 0 + - name: BytesScannedRejectingPolicy + args: + required_tenant_types: + - organization_id + - project_id + - referrer + default_config_overrides: + is_active: 0 + is_enforced: 0 diff --git a/snuba/datasets/processors/uptime_monitors_processor.py b/snuba/datasets/processors/uptime_monitors_processor.py new file mode 100644 index 0000000000..7bbc9dabb5 --- /dev/null +++ b/snuba/datasets/processors/uptime_monitors_processor.py @@ -0,0 +1,14 @@ +import logging + +from snuba import environment +from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor +from snuba.utils.metrics.wrapper import MetricsWrapper + +logger = logging.getLogger(__name__) + +metrics = MetricsWrapper(environment.metrics, "uptime_monitor_checks.processor") + + +class UptimeMonitorChecksProcessor(RustCompatProcessor): + def __init__(self) -> None: + super().__init__("UptimeMonitorChecksProcessor") diff --git a/snuba/datasets/storages/validator.py b/snuba/datasets/storages/validator.py index b3cd319248..ed32bb4fce 100644 --- a/snuba/datasets/storages/validator.py +++ b/snuba/datasets/storages/validator.py @@ -29,7 +29,10 @@ def check_readiness_state_level_with_migration_group(self) -> None: ) if storage_readiness_state > group_readiness_state: raise IncompatibleReadinessStates( - f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state." 
+ ( + f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state.", + f"storage_readiness_state={storage_readiness_state}, group_readiness_state={group_readiness_state}", + ) ) diff --git a/snuba/utils/streams/topics.py b/snuba/utils/streams/topics.py index 0cf1692be2..fada1887a2 100644 --- a/snuba/utils/streams/topics.py +++ b/snuba/utils/streams/topics.py @@ -45,6 +45,7 @@ class Topic(Enum): PROFILE_CHUNKS = "snuba-profile-chunks" REPLAYEVENTS = "ingest-replay-events" + UPTIME_RESULTS = "snuba-uptime-results" GENERIC_METRICS = "snuba-generic-metrics" GENERIC_METRICS_SETS_COMMIT_LOG = "snuba-generic-metrics-sets-commit-log" GENERIC_METRICS_DISTRIBUTIONS_COMMIT_LOG = ( From 0932e899639e6ebcf3d7a7969e3da52cf5d6c40f Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 8 Jan 2025 14:37:59 -0500 Subject: [PATCH 04/10] fix(uptime): rename snuba migration to correct number (#6728) renames the migration as 19 was reverted in the EAP dataset. https://github.com/getsentry/snuba/pull/6697#discussion_r1906309935 --- ...{0020_uptime_monitors_init.py => 0019_uptime_monitors_init.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename snuba/snuba_migrations/events_analytics_platform/{0020_uptime_monitors_init.py => 0019_uptime_monitors_init.py} (100%) diff --git a/snuba/snuba_migrations/events_analytics_platform/0020_uptime_monitors_init.py b/snuba/snuba_migrations/events_analytics_platform/0019_uptime_monitors_init.py similarity index 100% rename from snuba/snuba_migrations/events_analytics_platform/0020_uptime_monitors_init.py rename to snuba/snuba_migrations/events_analytics_platform/0019_uptime_monitors_init.py From 23cef28bc4fdbe8e6f6613c709775f497388e865 Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 8 Jan 2025 15:56:42 -0500 Subject: [PATCH 05/10] chore(uptime): set readiness state to experimental (#6729) set this to experimental so we can test on s4s --- .../storages/uptime_monitor_checks.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml index 31c9ed6716..a64a927748 100644 --- a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml +++ b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml @@ -4,7 +4,7 @@ name: uptime_monitor_checks storage: key: events_analytics_platform set_key: events_analytics_platform -readiness_state: limited +readiness_state: experimental schema: columns: [ From 6c5213ce4cbc4eb1ee1fa9ca33f3fac4f8893b27 Mon Sep 17 00:00:00 2001 From: Colin <161344340+colin-sentry@users.noreply.github.com> Date: Wed, 8 Jan 2025 16:45:53 -0500 Subject: [PATCH 06/10] feat: add a MergeTree table for our structured logs product (#6727) We want to store logs in GCS, but we still need a table for environments that don't have GCS configured. 
This table will be used in dev and CI, for example --- .../0020_ourlogs_init.py | 90 +++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py diff --git a/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py b/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py new file mode 100644 index 0000000000..36b15aaac0 --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py @@ -0,0 +1,90 @@ +from typing import List, Sequence + +from snuba.clickhouse.columns import UUID, Column, String, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations, table_engines +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import DateTime64, Float, Map + +storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +local_table_name = "ourlogs_local" +dist_table_name = "ourlogs_dist" +num_attr_buckets = 20 + +columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("trace_id", UUID()), # optional + Column("span_id", UInt(64)), # optional + Column("severity_text", String()), + Column("severity_number", UInt(8)), + Column("retention_days", UInt(16)), + Column("timestamp", DateTime64(9)), # nanosecond precision + Column("body", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))), +] + +columns.extend( + [ + Column( + f"attr_str_{i}", + Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ] +) + +columns.extend( + [ + Column( + f"attr_num_{i}", + Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ] +) + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + return [ + operations.CreateTable( + storage_set=storage_set, + table_name=local_table_name, + columns=columns, + engine=table_engines.MergeTree( + order_by="(organization_id, project_id, toDateTime(timestamp), trace_id)", + partition_by="(retention_days, toMonday(timestamp))", + settings={"index_granularity": "8192"}, + storage_set=storage_set, + ttl="toDateTime(timestamp) + toIntervalDay(retention_days)", + ), + target=OperationTarget.LOCAL, + ), + operations.CreateTable( + storage_set=storage_set, + table_name=dist_table_name, + columns=columns, + engine=table_engines.Distributed( + local_table_name=local_table_name, + sharding_key="rand()", + ), + target=OperationTarget.DISTRIBUTED, + ), + ] + + def backwards_ops(self) -> Sequence[SqlOperation]: + return [ + operations.DropTable( + storage_set=storage_set, + table_name=dist_table_name, + target=OperationTarget.DISTRIBUTED, + ), + operations.DropTable( + storage_set=storage_set, + table_name=local_table_name, + target=OperationTarget.LOCAL, + ), + ] From 33453a0625cdeb26cb637e6ce5215bd30e15e18c Mon Sep 17 00:00:00 2001 From: davidtsuk <132949946+davidtsuk@users.noreply.github.com> Date: Wed, 8 Jan 2025 13:53:35 -0800 Subject: [PATCH 07/10] fix(eap): fix bug with data_present (#6726) Fixes https://github.com/getsentry/eap-planning/issues/144 ## Additional Context When we perform aggregations over attributes, the function is converted into function_nameIf (e.g. countIf) which returns a default value if no rows match the condition. 
This means that there is no way to distinguish between an aggregate result that is 0 because it's for example a sum of values that add up to 0, and a result that is 0 because no values matched the given condition. To deal with this, we compute the number of events being aggregated even when we aren't extrapolating so we can determine if data was present or not. --- snuba/web/rpc/common/aggregation.py | 22 ++------- snuba/web/rpc/v1/endpoint_time_series.py | 26 ++++------ tests/web/rpc/test_aggregation.py | 8 ++-- .../test_endpoint_time_series.py | 48 +++++++++++++++++++ 4 files changed, 67 insertions(+), 37 deletions(-) diff --git a/snuba/web/rpc/common/aggregation.py b/snuba/web/rpc/common/aggregation.py index 313038c453..593318fbe4 100644 --- a/snuba/web/rpc/common/aggregation.py +++ b/snuba/web/rpc/common/aggregation.py @@ -46,7 +46,7 @@ class ExtrapolationContext(ABC): sample_count: int @property - def extrapolated_data_present(self) -> bool: + def is_data_present(self) -> bool: return self.sample_count > 0 @property @@ -67,8 +67,8 @@ def from_row( value = row_data[column_label] confidence_interval = None - average_sample_rate = None - sample_count = None + average_sample_rate = 0 + sample_count = 0 percentile = 0.0 granularity = 0.0 @@ -108,18 +108,6 @@ def from_row( elif custom_column_information.custom_column_id == "count": sample_count = col_value - if ( - confidence_interval is None - or average_sample_rate is None - or sample_count is None - ): - return GenericExtrapolationContext( - value=value, - confidence_interval=None, - average_sample_rate=0, - sample_count=0, - ) - if is_percentile: return PercentileExtrapolationContext( value=value, @@ -150,7 +138,7 @@ def is_extrapolated(self) -> bool: @cached_property def reliability(self) -> Reliability.ValueType: - if not self.is_extrapolated or not self.extrapolated_data_present: + if not self.is_extrapolated or not self.is_data_present: return Reliability.RELIABILITY_UNSPECIFIED relative_confidence = ( @@ -183,7 +171,7 @@ def is_extrapolated(self) -> bool: @cached_property def reliability(self) -> Reliability.ValueType: - if not self.is_extrapolated or not self.extrapolated_data_present: + if not self.is_extrapolated or not self.is_data_present: return Reliability.RELIABILITY_UNSPECIFIED lower_bound, upper_bound = _calculate_approximate_ci_percentile_levels( diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py index 04495bb8f7..78e9848b24 100644 --- a/snuba/web/rpc/v1/endpoint_time_series.py +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -173,14 +173,7 @@ def _convert_result_timeseries( extrapolation_context = ExtrapolationContext.from_row( timeseries.label, row_data ) - if ( - # This isn't quite right but all non extrapolated aggregates - # are assumed to be present. 
- not extrapolation_context.is_extrapolated - # This checks if the extrapolated aggregate is present by - # inspecting the sample count - or extrapolation_context.extrapolated_data_present - ): + if extrapolation_context.is_data_present: timeseries.data_points.append( DataPoint( data=row_data[timeseries.label], @@ -209,7 +202,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: for aggregation in request.aggregations ] - extrapolation_columns = [] + additional_context_columns = [] for aggregation in request.aggregations: if ( aggregation.extrapolation_mode @@ -217,7 +210,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: ): confidence_interval_column = get_confidence_interval_column(aggregation) if confidence_interval_column is not None: - extrapolation_columns.append( + additional_context_columns.append( SelectedExpression( name=confidence_interval_column.alias, expression=confidence_interval_column, @@ -225,16 +218,17 @@ def _build_query(request: TimeSeriesRequest) -> Query: ) average_sample_rate_column = get_average_sample_rate_column(aggregation) - count_column = get_count_column(aggregation) - extrapolation_columns.append( + additional_context_columns.append( SelectedExpression( name=average_sample_rate_column.alias, expression=average_sample_rate_column, ) ) - extrapolation_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) + + count_column = get_count_column(aggregation) + additional_context_columns.append( + SelectedExpression(name=count_column.alias, expression=count_column) + ) groupby_columns = [ SelectedExpression( @@ -275,7 +269,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: ), *aggregation_columns, *groupby_columns, - *extrapolation_columns, + *additional_context_columns, ], granularity=request.granularity_secs, condition=base_conditions_and( diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py index c7db14b9f9..48bba0fca6 100644 --- a/tests/web/rpc/test_aggregation.py +++ b/tests/web/rpc/test_aggregation.py @@ -87,7 +87,7 @@ def test_get_confidence_interval_column_for_non_extrapolatable_column() -> None: "column_name", "average_sample_rate", "reliability", - "extrapolated_data_present", + "is_data_present", "is_extrapolated", ), [ @@ -180,18 +180,18 @@ def test_get_confidence_interval_column_for_non_extrapolatable_column() -> None: ), ], ) -def test_get_extrapolation_meta( +def test_get_extrapolation_context( row_data: dict[str, Any], column_name: str, average_sample_rate: float, reliability: Reliability.ValueType, - extrapolated_data_present: bool, + is_data_present: bool, is_extrapolated: bool, ) -> None: extrapolation_context = ExtrapolationContext.from_row(column_name, row_data) assert extrapolation_context.average_sample_rate == average_sample_rate assert extrapolation_context.reliability == reliability - assert extrapolation_context.extrapolated_data_present == extrapolated_data_present + assert extrapolation_context.is_data_present == is_data_present assert extrapolation_context.is_extrapolated == is_extrapolated diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py index 8a5c5ef46d..6aa7fb189f 100644 --- a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py +++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py @@ -626,6 +626,54 @@ def test_start_time_not_divisible_by_time_buckets_returns_valid_data(self) -> No assert 
len(ts.data_points) == 1 assert ts.data_points[0].data == 300 + def test_with_non_existent_attribute(self) -> None: + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[ + DummyMetric("test_metric", get_value=lambda x: 1), + ], + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="non_existent_metric" + ), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=300, + ) + + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, 60 * 30, 300) + ] + + assert response.result_timeseries == [ + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data_present=False) for _ in range(len(expected_buckets)) + ], + ) + ] + class TestUtils: def test_no_duplicate_labels(self) -> None: From b45cc6a2e39089c24e6b874683cfe82a7fd897f2 Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 8 Jan 2025 17:18:00 -0500 Subject: [PATCH 08/10] feat(uptime): Add uptime monitor checks consumer to deploy script (#6731) Add uptime monitor checks consumer to the deployment script to ensure it gets deployed along with other Snuba consumers. --- gocd/templates/bash/deploy.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/gocd/templates/bash/deploy.sh b/gocd/templates/bash/deploy.sh index 77f3ab24b3..e6f08ee513 100644 --- a/gocd/templates/bash/deploy.sh +++ b/gocd/templates/bash/deploy.sh @@ -54,6 +54,7 @@ eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}" --container-name="eap-spans-profiled-consumer" \ --container-name="eap-spans-subscriptions-scheduler" \ --container-name="eap-spans-subscriptions-executor" \ + --container-name="uptime-results-consumer" \ --container-name="lw-deletions-search-issues-consumer" \ && /devinfra/scripts/k8s/k8s-deploy.py \ --label-selector="${LABEL_SELECTOR}" \ From 64987614601f00100c209bf1033c954d5c4083ab Mon Sep 17 00:00:00 2001 From: xurui-c <159840875+xurui-c@users.noreply.github.com> Date: Wed, 8 Jan 2025 14:59:48 -0800 Subject: [PATCH 09/10] feat(job-runner): allow `BaseException`s to terminate the program (#6730) Catching BaseException without re-raising the exception is not a good idea. this will catch KeyboardInterrupt, which in turn makes it not possible to use Ctrl-C to kill the web server. Let's change that Co-authored-by: Rachel Chen --- snuba/admin/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/snuba/admin/views.py b/snuba/admin/views.py index e771304f48..d8ee8a04ad 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -1372,7 +1372,7 @@ def execute_job(job_id: str) -> Response: job_status = None try: job_status = run_job(job_specs[job_id]) - except BaseException as e: + except Exception as e: return make_response( jsonify( { From bd84663732f5cc411d44d5e1f7f491d213a3572c Mon Sep 17 00:00:00 2001 From: Josh Ferge Date: Wed, 8 Jan 2025 19:16:21 -0500 Subject: [PATCH 10/10] fix(storage): correctly name storage key (#6733) should be `uptime_monitor_checks` for the key. aligns with how `eap_spans` works. 
This fixes the s4s deployment error here: https://console.cloud.google.com/logs/query;query=resource.type%3D%22k8s_container%22%0Aresource.labels.project_id%3D%22mattrobenolt-kube%22%0Aresource.labels.location%3D%22us-west1-c%22%0Aresource.labels.cluster_name%3D%22primary%22%0Aresource.labels.namespace_name%3D%22default%22%0Aresource.labels.pod_name%3D%22snuba-uptime-results-consumer-67d755db8c-wz6c8%22%20severity%3E%3DDEFAULT;storageScope=project;pinnedLogId=2025-01-08T23:34:40.996910078Z%2Fgl78637r4sgdqea5;cursorTimestamp=2025-01-08T23:34:40.996894468Z;aroundTime=2025-01-08T23:34:40.996910078Z;duration=PT1H?hl=en&inv=1&invt=AbmVdg&project=mattrobenolt-kube
---
 .../storages/uptime_monitor_checks.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml
index a64a927748..6ac2ce94f9 100644
--- a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml
+++ b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml
@@ -2,7 +2,7 @@ version: v1
 kind: writable_storage
 name: uptime_monitor_checks
 storage:
-  key: events_analytics_platform
+  key: uptime_monitor_checks
   set_key: events_analytics_platform
 readiness_state: experimental
 schema: