diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c8fcc14642d..dc0929854ed 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -170,6 +170,11 @@ jobs: # cache-to is re-implemented using the next step, to run conditionally for contributors outputs: type=docker,dest=/tmp/snuba-ci.tar + - name: Load snuba-ci image + run: | + docker load --input /tmp/snuba-ci.tar + docker image ls -a + - name: Publish snuba-ci image to registry if: steps.branch.outputs.branch == 'master' || github.event.pull_request.head.repo.full_name == github.repository # outside contributors won't be able to push to the docker registry diff --git a/gocd/templates/bash/deploy.sh b/gocd/templates/bash/deploy.sh index 77f3ab24b39..e6f08ee513a 100644 --- a/gocd/templates/bash/deploy.sh +++ b/gocd/templates/bash/deploy.sh @@ -54,6 +54,7 @@ eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}" --container-name="eap-spans-profiled-consumer" \ --container-name="eap-spans-subscriptions-scheduler" \ --container-name="eap-spans-subscriptions-executor" \ + --container-name="uptime-results-consumer" \ --container-name="lw-deletions-search-issues-consumer" \ && /devinfra/scripts/k8s/k8s-deploy.py \ --label-selector="${LABEL_SELECTOR}" \ diff --git a/rust_snuba/src/processors/mod.rs b/rust_snuba/src/processors/mod.rs index b4667db3d4a..8ad75fe7b7a 100644 --- a/rust_snuba/src/processors/mod.rs +++ b/rust_snuba/src/processors/mod.rs @@ -9,8 +9,8 @@ mod querylog; mod release_health_metrics; mod replays; mod spans; +mod uptime_monitor_checks; mod utils; - use crate::config::ProcessorConfig; use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata}; use sentry_arroyo::backends::kafka::types::KafkaPayload; @@ -54,6 +54,7 @@ define_processing_functions! 
{ ("ProfilesMessageProcessor", "processed-profiles", ProcessingFunctionType::ProcessingFunction(profiles::process_message)), ("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)), ("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)), + ("UptimeMonitorChecksProcessor", "snuba-uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)), ("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)), ("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)), ("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)), diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap new file mode 100644 index 00000000000..b4fce9485ac --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__failure.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"dns_error\",\n \"description\": \"Unable to resolve hostname example.xyz\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 500\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "failure", + "check_status_reason": "dns_error", + "duration": 100, + "environment": null, + "http_status_code": 500, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap new file mode 100644 index 00000000000..38bb4035fce --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__succeess.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"success\",\n \"status_reason\": 
null,\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 50,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 200\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "success", + "check_status_reason": null, + "duration": 50, + "environment": null, + "http_status_code": 200, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap new file mode 100644 index 00000000000..cf453b837af --- /dev/null +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@snuba-uptime-results-UptimeMonitorChecksProcessor-snuba-uptime-results__1__timeout.json.snap @@ -0,0 +1,25 @@ +--- +source: src/processors/mod.rs +description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"timeout\",\n \"description\": \"Check timed out\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": null\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n" +expression: snapshot_payload +--- +[ + { + "check_status": "failure", + "check_status_reason": "timeout", + "duration": 100, + "environment": null, + "http_status_code": null, + "offset": 1, + "organization_id": 123, + "partition": 0, + "project_id": 456, + "region_slug": "us-east-1", + "retention_days": 90, + "scheduled_check_time": 1717614062978.0, + "timestamp": 1717614068008.0, + "trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94", + "uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1", + "uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42" + } +] diff --git a/rust_snuba/src/processors/uptime_monitor_checks.rs b/rust_snuba/src/processors/uptime_monitor_checks.rs new file mode 100644 index 00000000000..0066e701f45 --- /dev/null +++ b/rust_snuba/src/processors/uptime_monitor_checks.rs @@ -0,0 +1,158 @@ +use crate::config::ProcessorConfig; +use anyhow::Context; +use chrono::DateTime; +use sentry_arroyo::backends::kafka::types::KafkaPayload; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::types::{InsertBatch, KafkaMessageMetadata}; + +pub fn process_message( + payload: KafkaPayload, + metadata: KafkaMessageMetadata, + _config: &ProcessorConfig, +) -> anyhow::Result { + let payload_bytes = 
payload.payload().context("Expected payload")?; + let (rows, origin_timestamp) = + deserialize_message(payload_bytes, metadata.partition, metadata.offset)?; + + InsertBatch::from_rows(rows, DateTime::from_timestamp(origin_timestamp as i64, 0)) +} + +pub fn deserialize_message( + payload: &[u8], + partition: u16, + offset: u64, +) -> anyhow::Result<(Vec<UptimeMonitorCheckRow>, f64)> { + let monitor_message: UptimeMonitorCheckMessage = serde_json::from_slice(payload)?; + + let rows = vec![UptimeMonitorCheckRow { + organization_id: monitor_message.organization_id, + project_id: monitor_message.project_id, + environment: monitor_message.environment, + uptime_subscription_id: monitor_message.subscription_id, + uptime_check_id: monitor_message.guid, + scheduled_check_time: monitor_message.scheduled_check_time_ms, + timestamp: monitor_message.actual_check_time_ms, + duration: monitor_message.duration_ms, + region_slug: monitor_message.region_slug.unwrap_or_default(), + check_status: monitor_message.status, + check_status_reason: monitor_message.status_reason.map(|r| r.ty), + http_status_code: monitor_message + .request_info + .unwrap_or_default() + .http_status_code, + trace_id: monitor_message.trace_id, + retention_days: monitor_message.retention_days, + partition, + offset, + }]; + + Ok((rows, monitor_message.actual_check_time_ms)) +} + +#[derive(Debug, Deserialize)] +struct UptimeMonitorCheckMessage<'a> { + // TODO: add these to the message + organization_id: u64, + project_id: u64, + retention_days: u16, + region_slug: Option<&'a str>, + environment: Option<&'a str>, + subscription_id: Uuid, + guid: Uuid, + scheduled_check_time_ms: f64, + actual_check_time_ms: f64, + duration_ms: u64, + status: &'a str, + status_reason: Option<CheckStatusReason<'a>>, + trace_id: Uuid, + request_info: Option<RequestInfo>, +} +#[derive(Debug, Deserialize, Default)] +pub struct RequestInfo { + pub http_status_code: Option<u16>, +} +#[derive(Debug, PartialEq, Deserialize, Serialize)] +#[serde(rename_all = "snake_case")] +pub struct CheckStatusReason<'a> { + /// The type of the status reason + #[serde(rename = "type")] + pub ty: &'a str, + + /// A human readable description of the status reason + pub description: String, +} + +#[derive(Debug, Default, Serialize)] +pub struct UptimeMonitorCheckRow<'a> { + organization_id: u64, + project_id: u64, + environment: Option<&'a str>, + uptime_subscription_id: Uuid, + uptime_check_id: Uuid, + scheduled_check_time: f64, + timestamp: f64, + duration: u64, + region_slug: &'a str, + check_status: &'a str, + check_status_reason: Option<&'a str>, + http_status_code: Option<u16>, + trace_id: Uuid, + retention_days: u16, + partition: u16, + offset: u64, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_monitor_checkin() { + let data = r#"{ + "organization_id": 1, + "project_id": 1, + "retention_days": 30, + "region_slug": "global", + "environment": "prod", + "subscription_id": "123e4567-e89b-12d3-a456-426614174000", + "guid": "550e8400-e29b-41d4-a716-446655440000", + "scheduled_check_time_ms": 1702659277, + "actual_check_time_ms": 1702659277, + "duration_ms": 100, + "status": "ok", + "status_reason": { + "type": "Request successful", + "description": "Request successful" + }, + "http_status_code": 200, + "trace_id": "550e8400-e29b-41d4-a716-446655440000", + "request_info": { + "request_type": "GET", + "http_status_code": 200 + } + }"#; + + let (rows, timestamp) = deserialize_message(data.as_bytes(), 0, 0).unwrap(); + let monitor_row = rows.first().unwrap(); + + assert_eq!(monitor_row.organization_id, 1);
assert_eq!(monitor_row.project_id, 1); + assert_eq!(monitor_row.environment, Some("prod")); + assert_eq!( + monitor_row.uptime_subscription_id, + Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap() + ); + assert_eq!(monitor_row.duration, 100); + assert_eq!(monitor_row.timestamp, 1702659277.0); + assert_eq!(monitor_row.region_slug, "global".to_string()); + assert_eq!(monitor_row.check_status, "ok"); + assert_eq!(monitor_row.check_status_reason, Some("Request successful")); + assert_eq!(monitor_row.http_status_code, Some(200)); + assert_eq!(monitor_row.retention_days, 30); + assert_eq!(monitor_row.partition, 0); + assert_eq!(monitor_row.offset, 0); + assert_eq!(timestamp, 1702659277.0); + } +} diff --git a/scripts/load_uptime_checks.py b/scripts/load_uptime_checks.py new file mode 100644 index 00000000000..8795f0bf53b --- /dev/null +++ b/scripts/load_uptime_checks.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 + +import datetime +import json +import random +import uuid + +import requests + +# Generate and insert data for uptime checks for each project +base_time = datetime.datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) + +query = """ +INSERT INTO default.uptime_monitor_checks_local ( + organization_id, project_id, environment, uptime_subscription_id, uptime_check_id, + scheduled_check_time, timestamp, duration, region_id, check_status, + check_status_reason, http_status_code, trace_id, retention_days +) FORMAT JSONEachRow +""" + +total_records = 0 + +for project_id in range(1, 2): + project_data = [] + for minute in range(24 * 60 * 90): # 24 hours * 60 minutes * 90 days + timestamp = base_time + datetime.timedelta(minutes=minute) + scheduled_time = timestamp - datetime.timedelta(seconds=random.randint(1, 30)) + http_status = ( + 500 if minute % 100 == 0 else 200 + ) # Every 100th record gets status 500 + check_status = "failure" if http_status == 500 else "success" + project_data.append( + { + "organization_id": 1, + "project_id": project_id, + "environment": "production", + "uptime_subscription_id": random.randint(1, 3) * project_id, + "uptime_check_id": str(uuid.uuid4()), + "scheduled_check_time": scheduled_time.strftime("%Y-%m-%d %H:%M:%S"), + "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"), + "duration": random.randint(1, 1000), + "region_id": random.randint(1, 3), + "check_status": check_status, + "check_status_reason": "Timeout error" + if check_status == "failure" + else None, + "http_status_code": http_status, + "trace_id": str(uuid.uuid4()), + "retention_days": 30, + } + ) + + response = requests.post( + "http://localhost:8123", + params={"query": query}, + data="\n".join(json.dumps(row) for row in project_data), + ) + + if response.status_code == 200: + total_records += len(project_data) + print( + f"Successfully inserted {len(project_data)} records for project {project_id}" + ) + else: + print(f"Error inserting data for project {project_id}: {response.text}") + +print(f"Total records inserted: {total_records}") diff --git a/snuba/admin/views.py b/snuba/admin/views.py index e771304f48b..d8ee8a04ad0 100644 --- a/snuba/admin/views.py +++ b/snuba/admin/views.py @@ -1372,7 +1372,7 @@ def execute_job(job_id: str) -> Response: job_status = None try: job_status = run_job(job_specs[job_id]) - except BaseException as e: + except Exception as e: return make_response( jsonify( { diff --git a/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml 
b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml new file mode 100644 index 00000000000..6ac2ce94f92 --- /dev/null +++ b/snuba/datasets/configuration/events_analytics_platform/storages/uptime_monitor_checks.yaml @@ -0,0 +1,61 @@ +version: v1 +kind: writable_storage +name: uptime_monitor_checks +storage: + key: uptime_monitor_checks + set_key: events_analytics_platform +readiness_state: experimental +schema: + columns: + [ + { name: organization_id, type: UInt, args: { size: 64 } }, + { name: project_id, type: UInt, args: { size: 64 } }, + { name: environment, type: String, args: { schema_modifiers: [nullable, low_cardinality] } }, + { name: uptime_subscription_id, type: UUID }, + { name: uptime_check_id, type: UUID }, + { name: scheduled_check_time, type: DateTime64, args: { precision: 3 } }, + { name: timestamp, type: DateTime64, args: { precision: 3 } }, + { name: duration_ms, type: UInt, args: { size: 64 } }, + { name: region_slug, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: check_status, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: check_status_reason, type: String, args: { schema_modifiers: [low_cardinality] } }, + { name: http_status_code, type: UInt, args: { size: 16, schema_modifiers: [nullable] } }, + { name: trace_id, type: UUID }, + { name: retention_days, type: UInt, args: { size: 16 } }, + ] + local_table_name: uptime_monitor_checks_local + dist_table_name: uptime_monitor_checks_dist + + +mandatory_condition_checkers: + - condition: ProjectIdEnforcer + +stream_loader: + processor: UptimeMonitorChecksProcessor + default_topic: snuba-uptime-results + +allocation_policies: + - name: ConcurrentRateLimitAllocationPolicy + args: + required_tenant_types: + - organization_id + - referrer + - project_id + default_config_overrides: + is_enforced: 0 + - name: ReferrerGuardRailPolicy + args: + required_tenant_types: + - referrer + default_config_overrides: + is_enforced: 0 + is_active: 0 + - name: BytesScannedRejectingPolicy + args: + required_tenant_types: + - organization_id + - project_id + - referrer + default_config_overrides: + is_active: 0 + is_enforced: 0 diff --git a/snuba/datasets/processors/replays_processor.py b/snuba/datasets/processors/replays_processor.py index 04e8bb2c6ad..a3c5a688159 100644 --- a/snuba/datasets/processors/replays_processor.py +++ b/snuba/datasets/processors/replays_processor.py @@ -1,239 +1,6 @@ -from __future__ import annotations - -import dataclasses -import logging -import uuid -from datetime import datetime, timezone -from hashlib import md5 -from typing import Any, Callable, Mapping, TypeVar - -import rapidjson - -from snuba import environment from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor -from snuba.processor import _collapse_uint16, _collapse_uint32 -from snuba.util import force_bytes -from snuba.utils.metrics.wrapper import MetricsWrapper - -logger = logging.getLogger(__name__) - -metrics = MetricsWrapper(environment.metrics, "replays.processor") - -ReplayEventDict = Mapping[Any, Any] -RetentionDays = int - -# Limit for error_ids / trace_ids / urls array elements -LIST_ELEMENT_LIMIT = 1000 -MAX_CLICK_EVENTS = 20 - -USER_FIELDS_PRECEDENCE = ("user_id", "username", "email", "ip_address") -LOG_LEVELS = ["fatal", "error", "warning", "info", "debug"] class ReplaysProcessor(RustCompatProcessor): def __init__(self) -> None: super().__init__("ReplaysProcessor") - - -T = TypeVar("T") -U = TypeVar("U") - - -def 
segment_id_to_event_hash(segment_id: int | None) -> str: - if segment_id is None: - # Rows with null segment_id fields are considered "out of band" meaning they do not - # originate from the SDK and do not relate to a specific segment. - # - # For example: archive requests. - return str(uuid.uuid4()) - else: - segment_id_bytes = force_bytes(str(segment_id)) - segment_hash = md5(segment_id_bytes).hexdigest() - return to_uuid(segment_hash) - - -def default(default: Callable[[], T], value: T | None) -> T: - """Return a default value only if the given value was null. - - Falsey types such as 0, "", False, [], {} are returned. - """ - return default() if value is None else value - - -def maybe(into: Callable[[T], U], value: T | None) -> U | None: - """Optionally return a processed value.""" - return None if value is None else into(value) - - -def to_datetime(value: Any) -> datetime: - """Return a datetime instance or err. - - Datetimes for the replays schema standardize on 32 bit dates. - """ - return _timestamp_to_datetime(_collapse_or_err(_collapse_uint32, int(value))) - - -def to_uint32(value: Any) -> int: - return _collapse_or_err(_collapse_uint32, int(value)) - - -def to_uint1(value: Any) -> int: - int_value = int(value) - if int_value == 0 or int_value == 1: - return int_value - else: - raise ValueError("Value must be 0 or 1") - - -def to_uint16(value: Any) -> int: - return _collapse_or_err(_collapse_uint16, int(value)) - - -def to_string(value: Any) -> str: - """Return a string or err. - - This function follows the lead of "snuba.processors._unicodify" and enforces UTF-8 - encoding. - """ - if isinstance(value, (bool, dict, list)): - result: str = rapidjson.dumps(value) - return _encode_utf8(result) - elif value is None: - return "" - else: - return _encode_utf8(str(value)) - - -def to_capped_string(capacity: int, value: Any) -> str: - """Return a capped string.""" - return to_string(value)[:capacity] - - -def to_enum(enumeration: list[str]) -> Callable[[Any], str | None]: - def inline(value: Any) -> str | None: - for enum in enumeration: - if value == enum: - return enum - return None - - return inline - - -def to_capped_list(metric_name: str, value: Any) -> list[Any]: - """Return a list of values capped to the maximum allowable limit.""" - return _capped_list(metric_name, default(list, maybe(_is_list, value))) - - -def to_typed_list(callable: Callable[[Any], T], values: list[Any]) -> list[T]: - return list(map(callable, filter(lambda value: value is not None, values))) - - -def to_uuid(value: Any) -> str: - """Return a stringified uuid or err.""" - return str(uuid.UUID(str(value))) - - -def raise_on_null(field: str, value: Any) -> Any: - if value is None: - raise ValueError(f"Missing data for required field: {field}") - return value - - -def _is_list(value: Any) -> list[Any]: - if isinstance(value, list): - return value - else: - raise TypeError( - f"Invalid type specified. 
Expected list; received {type(value)} with a value of " - f"{value}" - ) - - -def _encode_utf8(value: str) -> str: - """Return a utf-8 encoded string.""" - return value.encode("utf8", errors="backslashreplace").decode("utf8") - - -def _capped_list(metric_name: str, value: list[Any]) -> list[Any]: - """Return a list with a maximum configured length.""" - if len(value) > LIST_ELEMENT_LIMIT: - metrics.increment(f'"{metric_name}" exceeded maximum length.') - - return value[:LIST_ELEMENT_LIMIT] - - -def _collapse_or_err(callable: Callable[[int], int | None], value: int) -> int: - """Return the integer or error if it overflows.""" - if callable(value) is None: - # This exception can only be triggered through abuse. We choose not to suppress these - # exceptions in favor of identifying the origin. - raise ValueError(f'Integer "{value}" overflowed.') - else: - return value - - -def _timestamp_to_datetime(timestamp: int) -> datetime: - """Convert an integer timestamp to a timezone-aware utc datetime instance.""" - return datetime.fromtimestamp(timestamp, tz=timezone.utc) - - -# Tags processor. - - -@dataclasses.dataclass -class Tag: - keys: list[str] - values: list[str] - transaction: str | None - - @classmethod - def empty_set(cls) -> Tag: - return cls([], [], None) - - -def process_tags_object(value: Any) -> Tag: - if value is None: - return Tag.empty_set() - - # Excess tags are trimmed. - tags = _capped_list("tags", normalize_tags(value)) - - keys = [] - values = [] - transaction = None - - for key, value in tags: - # Keys and values are stored as optional strings regardless of their input type. - parsed_key, parsed_value = to_string(key), maybe(to_string, value) - - if key == "transaction": - transaction = parsed_value - elif parsed_value is not None: - keys.append(parsed_key) - values.append(parsed_value) - - return Tag(keys=keys, values=values, transaction=transaction) - - -def normalize_tags(value: Any) -> list[tuple[str, str]]: - """Normalize tags payload to a single format.""" - if isinstance(value, dict): - return _coerce_tags_dictionary(value) - elif isinstance(value, list): - return _coerce_tags_tuple_list(value) - else: - raise TypeError(f'Invalid tags type specified: "{type(value)}"') - - -def _coerce_tags_dictionary(tags: dict[str, Any]) -> list[tuple[str, str]]: - """Return a list of tag tuples from an unspecified dictionary.""" - return [(key, value) for key, value in tags.items() if isinstance(key, str)] - - -def _coerce_tags_tuple_list(tags_list: list[Any]) -> list[tuple[str, str]]: - """Return a list of tag tuples from an unspecified list of values.""" - return [ - (item[0], item[1]) - for item in tags_list - if (isinstance(item, (list, tuple)) and len(item) == 2) - ] diff --git a/snuba/datasets/processors/uptime_monitors_processor.py b/snuba/datasets/processors/uptime_monitors_processor.py new file mode 100644 index 00000000000..7bbc9dabb59 --- /dev/null +++ b/snuba/datasets/processors/uptime_monitors_processor.py @@ -0,0 +1,14 @@ +import logging + +from snuba import environment +from snuba.datasets.processors.rust_compat_processor import RustCompatProcessor +from snuba.utils.metrics.wrapper import MetricsWrapper + +logger = logging.getLogger(__name__) + +metrics = MetricsWrapper(environment.metrics, "uptime_monitor_checks.processor") + + +class UptimeMonitorChecksProcessor(RustCompatProcessor): + def __init__(self) -> None: + super().__init__("UptimeMonitorChecksProcessor") diff --git a/snuba/datasets/storages/validator.py b/snuba/datasets/storages/validator.py index 
b3cd3192484..ed32bb4fce5 100644 --- a/snuba/datasets/storages/validator.py +++ b/snuba/datasets/storages/validator.py @@ -29,7 +29,10 @@ def check_readiness_state_level_with_migration_group(self) -> None: ) if storage_readiness_state > group_readiness_state: raise IncompatibleReadinessStates( - f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state." + ( + f"The storage={storage_set_key} readiness state is greater than the corresponding migration group's readiness state.", + f"storage_readiness_state={storage_readiness_state}, group_readiness_state={group_readiness_state}", + ) ) diff --git a/snuba/snuba_migrations/events_analytics_platform/0020_uptime_monitors_init.py b/snuba/snuba_migrations/events_analytics_platform/0019_uptime_monitors_init.py similarity index 100% rename from snuba/snuba_migrations/events_analytics_platform/0020_uptime_monitors_init.py rename to snuba/snuba_migrations/events_analytics_platform/0019_uptime_monitors_init.py diff --git a/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py b/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py new file mode 100644 index 00000000000..36b15aaac0f --- /dev/null +++ b/snuba/snuba_migrations/events_analytics_platform/0020_ourlogs_init.py @@ -0,0 +1,90 @@ +from typing import List, Sequence + +from snuba.clickhouse.columns import UUID, Column, String, UInt +from snuba.clusters.storage_sets import StorageSetKey +from snuba.migrations import migration, operations, table_engines +from snuba.migrations.columns import MigrationModifiers as Modifiers +from snuba.migrations.operations import OperationTarget, SqlOperation +from snuba.utils.schemas import DateTime64, Float, Map + +storage_set = StorageSetKey.EVENTS_ANALYTICS_PLATFORM +local_table_name = "ourlogs_local" +dist_table_name = "ourlogs_dist" +num_attr_buckets = 20 + +columns: List[Column[Modifiers]] = [ + Column("organization_id", UInt(64)), + Column("project_id", UInt(64)), + Column("trace_id", UUID()), # optional + Column("span_id", UInt(64)), # optional + Column("severity_text", String()), + Column("severity_number", UInt(8)), + Column("retention_days", UInt(16)), + Column("timestamp", DateTime64(9)), # nanosecond precision + Column("body", String(modifiers=Modifiers(codecs=["ZSTD(1)"]))), +] + +columns.extend( + [ + Column( + f"attr_str_{i}", + Map(String(), String(), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ] +) + +columns.extend( + [ + Column( + f"attr_num_{i}", + Map(String(), Float(64), modifiers=Modifiers(codecs=["ZSTD(1)"])), + ) + for i in range(num_attr_buckets) + ] +) + + +class Migration(migration.ClickhouseNodeMigration): + blocking = False + + def forwards_ops(self) -> Sequence[SqlOperation]: + return [ + operations.CreateTable( + storage_set=storage_set, + table_name=local_table_name, + columns=columns, + engine=table_engines.MergeTree( + order_by="(organization_id, project_id, toDateTime(timestamp), trace_id)", + partition_by="(retention_days, toMonday(timestamp))", + settings={"index_granularity": "8192"}, + storage_set=storage_set, + ttl="toDateTime(timestamp) + toIntervalDay(retention_days)", + ), + target=OperationTarget.LOCAL, + ), + operations.CreateTable( + storage_set=storage_set, + table_name=dist_table_name, + columns=columns, + engine=table_engines.Distributed( + local_table_name=local_table_name, + sharding_key="rand()", + ), + target=OperationTarget.DISTRIBUTED, + ), + ] + + def backwards_ops(self) -> 
Sequence[SqlOperation]: + return [ + operations.DropTable( + storage_set=storage_set, + table_name=dist_table_name, + target=OperationTarget.DISTRIBUTED, + ), + operations.DropTable( + storage_set=storage_set, + table_name=local_table_name, + target=OperationTarget.LOCAL, + ), + ] diff --git a/snuba/utils/streams/topics.py b/snuba/utils/streams/topics.py index 0cf1692be20..fada1887a2d 100644 --- a/snuba/utils/streams/topics.py +++ b/snuba/utils/streams/topics.py @@ -45,6 +45,7 @@ class Topic(Enum): PROFILE_CHUNKS = "snuba-profile-chunks" REPLAYEVENTS = "ingest-replay-events" + UPTIME_RESULTS = "snuba-uptime-results" GENERIC_METRICS = "snuba-generic-metrics" GENERIC_METRICS_SETS_COMMIT_LOG = "snuba-generic-metrics-sets-commit-log" GENERIC_METRICS_DISTRIBUTIONS_COMMIT_LOG = ( diff --git a/snuba/web/rpc/common/aggregation.py b/snuba/web/rpc/common/aggregation.py index 313038c4531..593318fbe43 100644 --- a/snuba/web/rpc/common/aggregation.py +++ b/snuba/web/rpc/common/aggregation.py @@ -46,7 +46,7 @@ class ExtrapolationContext(ABC): sample_count: int @property - def extrapolated_data_present(self) -> bool: + def is_data_present(self) -> bool: return self.sample_count > 0 @property @@ -67,8 +67,8 @@ def from_row( value = row_data[column_label] confidence_interval = None - average_sample_rate = None - sample_count = None + average_sample_rate = 0 + sample_count = 0 percentile = 0.0 granularity = 0.0 @@ -108,18 +108,6 @@ def from_row( elif custom_column_information.custom_column_id == "count": sample_count = col_value - if ( - confidence_interval is None - or average_sample_rate is None - or sample_count is None - ): - return GenericExtrapolationContext( - value=value, - confidence_interval=None, - average_sample_rate=0, - sample_count=0, - ) - if is_percentile: return PercentileExtrapolationContext( value=value, @@ -150,7 +138,7 @@ def is_extrapolated(self) -> bool: @cached_property def reliability(self) -> Reliability.ValueType: - if not self.is_extrapolated or not self.extrapolated_data_present: + if not self.is_extrapolated or not self.is_data_present: return Reliability.RELIABILITY_UNSPECIFIED relative_confidence = ( @@ -183,7 +171,7 @@ def is_extrapolated(self) -> bool: @cached_property def reliability(self) -> Reliability.ValueType: - if not self.is_extrapolated or not self.extrapolated_data_present: + if not self.is_extrapolated or not self.is_data_present: return Reliability.RELIABILITY_UNSPECIFIED lower_bound, upper_bound = _calculate_approximate_ci_percentile_levels( diff --git a/snuba/web/rpc/v1/endpoint_time_series.py b/snuba/web/rpc/v1/endpoint_time_series.py index 04495bb8f75..78e9848b24c 100644 --- a/snuba/web/rpc/v1/endpoint_time_series.py +++ b/snuba/web/rpc/v1/endpoint_time_series.py @@ -173,14 +173,7 @@ def _convert_result_timeseries( extrapolation_context = ExtrapolationContext.from_row( timeseries.label, row_data ) - if ( - # This isn't quite right but all non extrapolated aggregates - # are assumed to be present. 
- not extrapolation_context.is_extrapolated - # This checks if the extrapolated aggregate is present by - # inspecting the sample count - or extrapolation_context.extrapolated_data_present - ): + if extrapolation_context.is_data_present: timeseries.data_points.append( DataPoint( data=row_data[timeseries.label], @@ -209,7 +202,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: for aggregation in request.aggregations ] - extrapolation_columns = [] + additional_context_columns = [] for aggregation in request.aggregations: if ( aggregation.extrapolation_mode @@ -217,7 +210,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: ): confidence_interval_column = get_confidence_interval_column(aggregation) if confidence_interval_column is not None: - extrapolation_columns.append( + additional_context_columns.append( SelectedExpression( name=confidence_interval_column.alias, expression=confidence_interval_column, @@ -225,16 +218,17 @@ def _build_query(request: TimeSeriesRequest) -> Query: ) average_sample_rate_column = get_average_sample_rate_column(aggregation) - count_column = get_count_column(aggregation) - extrapolation_columns.append( + additional_context_columns.append( SelectedExpression( name=average_sample_rate_column.alias, expression=average_sample_rate_column, ) ) - extrapolation_columns.append( - SelectedExpression(name=count_column.alias, expression=count_column) - ) + + count_column = get_count_column(aggregation) + additional_context_columns.append( + SelectedExpression(name=count_column.alias, expression=count_column) + ) groupby_columns = [ SelectedExpression( @@ -275,7 +269,7 @@ def _build_query(request: TimeSeriesRequest) -> Query: ), *aggregation_columns, *groupby_columns, - *extrapolation_columns, + *additional_context_columns, ], granularity=request.granularity_secs, condition=base_conditions_and( diff --git a/tests/datasets/test_replays_processor.py b/tests/datasets/test_replays_processor.py deleted file mode 100644 index ad9be0aa2cd..00000000000 --- a/tests/datasets/test_replays_processor.py +++ /dev/null @@ -1,635 +0,0 @@ -from __future__ import annotations - -import json -import uuid -from dataclasses import dataclass -from datetime import datetime, timezone -from typing import Any, Mapping, Optional - -import pytest - -from snuba.consumers.types import KafkaMessageMetadata -from snuba.datasets.processors.replays_processor import ( - ReplaysProcessor, - maybe, - segment_id_to_event_hash, - to_capped_list, - to_uuid, -) -from snuba.processor import InsertBatch - -LOG_LEVELS = ["fatal", "error", "warning", "info", "debug"] - - -@dataclass -class ReplayEvent: - replay_id: str - replay_type: str | None - event_hash: str | None - error_sample_rate: float | None - session_sample_rate: float | None - segment_id: Any - trace_ids: Any - error_ids: Any - urls: Any - is_archived: int | None - timestamp: Any - replay_start_timestamp: Any - platform: Any - environment: Any - release: Any - dist: Any - ipv4: str | None - ipv6: str | None - user_name: Any - user_id: Any - user_email: Any - os_name: Any - os_version: Any - browser_name: Any - browser_version: Any - device_name: Any - device_brand: Any - device_family: Any - device_model: Any - sdk_name: Any - sdk_version: Any - title: str | None - - @classmethod - def empty_set(cls) -> ReplayEvent: - return cls( - replay_id="e5e062bf2e1d4afd96fd2f90b6770431", - timestamp=int(datetime.now(timezone.utc).timestamp()), - segment_id=None, - replay_type=None, - event_hash=None, - error_sample_rate=None, - session_sample_rate=None, 
- title=None, - error_ids=[], - trace_ids=[], - replay_start_timestamp=None, - platform=None, - dist=None, - urls=[], - is_archived=None, - os_name=None, - os_version=None, - browser_name=None, - browser_version=None, - device_name=None, - device_brand=None, - device_family=None, - device_model=None, - user_name=None, - user_id=None, - user_email=None, - ipv4=None, - ipv6=None, - environment=None, - release=None, - sdk_name=None, - sdk_version=None, - ) - - def serialize( - self, header_overrides: Optional[dict[Any, Any]] = None - ) -> Mapping[Any, Any]: - replay_event: Any = { - "type": "replay_event", - "replay_id": self.replay_id, - "replay_type": self.replay_type, - "segment_id": self.segment_id, - "event_hash": self.event_hash, - "tags": [["customtag", "is_set"], ["transaction", self.title]], - "urls": self.urls, - "is_archived": self.is_archived, - "trace_ids": self.trace_ids, - "error_ids": self.error_ids, - "dist": self.dist, - "platform": self.platform, - "timestamp": _datetime_to_timestamp(self.timestamp), - "replay_start_timestamp": _datetime_to_timestamp( - self.replay_start_timestamp - ), - "environment": self.environment, - "release": self.release, - "user": { - "id": self.user_id, - "username": self.user_name, - "email": self.user_email, - "ip_address": self.ipv4, - }, - "sdk": { - "name": self.sdk_name, - "version": self.sdk_version, - }, - "contexts": { - "trace": { - "op": "pageload", - "span_id": "affa5649681a1eeb", - "trace_id": "23eda6cd4b174ef8a51f0096df3bfdd1", - }, - "replay": { - "error_sample_rate": self.error_sample_rate, - "session_sample_rate": self.session_sample_rate, - }, - "os": { - "name": self.os_name, - "version": self.os_version, - }, - "browser": { - "name": self.browser_name, - "version": self.browser_version, - }, - "device": { - "name": self.device_name, - "brand": self.device_brand, - "family": self.device_family, - "model": self.device_model, - }, - }, - "request": { - "url": "Doesn't matter not ingested.", - "headers": {"User-Agent": "not used"}, - }, - "extra": {}, - } - - headers = { - "start_time": datetime.now().timestamp(), - "type": "replay_event", - "replay_id": self.replay_id, - "project_id": 1, - "retention_days": 30, - } - headers.update(header_overrides or {}) - return { - **headers, - **{"payload": list(bytes(json.dumps(replay_event).encode()))}, - } - - def _user_field(self) -> Any | None: - user_fields = [ - self.user_id, - self.user_name, - self.user_email, - self.ipv4, - self.ipv6, - ] - for f in user_fields: - if f is not None: - return f - return None - - def build_result(self, meta: KafkaMessageMetadata) -> Mapping[str, Any]: - event_hash = segment_id_to_event_hash(self.segment_id) - - ret = { - "project_id": 1, - "replay_id": str(uuid.UUID(self.replay_id)), - "replay_type": self.replay_type, - "error_sample_rate": self.error_sample_rate, - "session_sample_rate": self.session_sample_rate, - "event_hash": self.event_hash or event_hash, - "segment_id": self.segment_id, - "trace_ids": list( - map(to_uuid, to_capped_list("trace_ids", self.trace_ids)) - ), - "error_ids": list( - map(to_uuid, to_capped_list("trace_ids", self.error_ids)) - ), - "timestamp": maybe(int, self.timestamp), - "replay_start_timestamp": maybe(int, self.replay_start_timestamp), - "platform": self.platform, - "environment": self.environment, - "release": self.release, - "dist": self.dist, - "urls": to_capped_list("urls", self.urls), - "is_archived": 1 if self.is_archived is True else None, - "user": (self.user_name or self.user_email or self.ipv4 or self.ipv6 
or ""), - "user_id": self.user_id, - "user_name": self.user_name, - "user_email": self.user_email, - "os_name": self.os_name, - "os_version": self.os_version, - "browser_name": self.browser_name, - "browser_version": self.browser_version, - "device_name": self.device_name, - "device_brand": self.device_brand, - "device_family": self.device_family, - "device_model": self.device_model, - "tags.key": ["customtag"], - "tags.value": ["is_set"], - "title": self.title, - "sdk_name": "sentry.python", - "sdk_version": "0.9.0", - "retention_days": 30, - "offset": meta.offset, - "partition": meta.partition, - } - user = self._user_field() - - if user: - ret["user"] = user - - if self.ipv4: - ret["ip_address_v4"] = self.ipv4 - elif self.ipv6: - ret["ip_address_v6"] = self.ipv6 - return ret - - -class TestReplaysProcessor: - def test_process_message(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - header_overrides = { - "start_time": int(datetime.now(tz=timezone.utc).timestamp()) - } - message = ReplayEvent( - replay_id="e5e062bf2e1d4afd96fd2f90b6770431", - replay_type="session", - event_hash=None, - error_sample_rate=0.5, - session_sample_rate=0.5, - title="/organizations/:orgId/issues/", - error_ids=["36e980a9c6024cde9f5d089f15b83b5f"], - trace_ids=[ - "36e980a9c6024cde9f5d089f15b83b5f", - "8bea4461d8b944f393c15a3cb1c4169a", - ], - segment_id=0, - timestamp=int(datetime.now(tz=timezone.utc).timestamp()), - replay_start_timestamp=int(datetime.now(tz=timezone.utc).timestamp()), - platform="python", - dist="", - urls=["http://127.0.0.1:8001"], - is_archived=True, - user_name="me", - user_id="232", - user_email="test@test.com", - os_name="iOS", - os_version="16.2", - browser_name="Chrome", - browser_version="103.0.38", - device_name="iPhone 11", - device_brand="Apple", - device_family="iPhone", - device_model="iPhone", - ipv4="127.0.0.1", - ipv6=None, - environment="prod", - release="34a554c14b68285d8a8eb6c5c4c56dfc1db9a83a", - sdk_name="sentry.python", - sdk_version="0.9.0", - ) - - expected = InsertBatch( - [message.build_result(meta)], - datetime.utcfromtimestamp(header_overrides["start_time"]), - ).rows[0] - - response = ( - ReplaysProcessor() - .process_message(message.serialize(header_overrides), meta) - .rows[0] - ) - - assert response | expected == response - - def test_process_message_minimal_payload_segment_id(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - minimal_payload = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": str(uuid.uuid4()), - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_event", - "replay_id": str(uuid.uuid4()), - "timestamp": int(datetime.now().timestamp()), - "segment_id": 0, - "platform": "internal", - } - ).encode() - ) - ), - } - - # Asserting that the minimal payload was successfully processed. 
- result = ReplaysProcessor().process_message(minimal_payload, meta) - assert isinstance(result, InsertBatch) - assert len(result.rows) == 1 - assert len(result.rows[0]["event_hash"]) == 36 - - def test_process_message_minimal_payload_event_hash(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - event_hash = uuid.uuid4().hex - minimal_payload = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": str(uuid.uuid4()), - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_event", - "replay_id": str(uuid.uuid4()), - "timestamp": int(datetime.now().timestamp()), - "event_hash": event_hash, - "platform": "internal", - } - ).encode() - ) - ), - } - - # Asserting that the minimal payload was successfully processed. - result = ReplaysProcessor().process_message(minimal_payload, meta) - assert isinstance(result, InsertBatch) - assert len(result.rows) == 1 - assert result.rows[0]["event_hash"] == str(uuid.UUID(event_hash)) - - def test_process_message_nulls(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - processed_batch = ReplaysProcessor().process_message(message.serialize(), meta) - assert isinstance(processed_batch, InsertBatch) # required for type checker - received = processed_batch.rows[0] - assert isinstance(received, dict) # required for type checker - received_event_hash = received.pop("event_hash") - - expected = message.build_result(meta) - assert isinstance(expected, dict) # required for type checker - assert received_event_hash != expected.pop("event_hash") # hash is random - - # Sample rates default to -1.0 which is an impossible state for the field. 
- assert received["error_sample_rate"] == -1.0 - assert received["session_sample_rate"] == -1.0 - - assert received["platform"] == "" - assert received["replay_type"] == "" - assert received["dist"] == "" - assert received["user_name"] == "" - assert received["user_id"] == "" - assert received["user_email"] == "" - assert received["os_name"] == "" - assert received["os_version"] == "" - assert received["browser_name"] == "" - assert received["browser_version"] == "" - assert received["device_name"] == "" - assert received["device_brand"] == "" - assert received["device_family"] == "" - assert received["device_model"] == "" - assert received["environment"] == "" - assert received["release"] == "" - assert received["sdk_name"] == "" - assert received["sdk_version"] == "" - assert received["replay_start_timestamp"] is None - - def test_process_message_invalid_segment_id(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.segment_id = "a" - ReplaysProcessor().process_message(message.serialize(), meta) - - with pytest.raises(Exception): - message.segment_id = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - with pytest.raises(Exception): - message.segment_id = 2**16 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.segment_id = 2**16 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - def test_process_message_invalid_timestamp(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.timestamp = f"{2**32 - 1}" - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = 2**32 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.timestamp = 2**32 - ReplaysProcessor().process_message(message.serialize(), meta) - - def test_process_message_invalid_replay_start_timestamp(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - message = ReplayEvent.empty_set() - - with pytest.raises(Exception): - message.replay_start_timestamp = f"{2**32 - 1}" - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = -1 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = 2**32 - ReplaysProcessor().process_message(message.serialize(), meta) - - message.replay_start_timestamp = 2**32 - 1 - ReplaysProcessor().process_message(message.serialize(), meta) - - -class TestReplaysActionProcessor: - def test_replay_actions(self) -> None: - meta = KafkaMessageMetadata( - offset=0, partition=0, timestamp=datetime(1970, 1, 1) - ) - - now = datetime.now(tz=timezone.utc).replace(microsecond=0) - - message = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": "bb570198b8f04f8bbe87077668530da7", - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "replay_actions", - "replay_id": "bb570198b8f04f8bbe87077668530da7", - "clicks": [ - { - "node_id": 59, - "tag": "div", - "id": "id", - "class": ["class1", "class2"], - "role": "button", - "aria_label": "test", - "alt": "", - "testid": "", - "title": "", - "text": "text", - "component_name": 
"SignUpButton", - "timestamp": int(now.timestamp()), - "event_hash": "df3c3aa2daae465e89f1169e49139827", - "is_dead": 0, - "is_rage": 1, - } - ], - } - ).encode() - ) - ), - } - - result = ReplaysProcessor().process_message(message, meta) - assert isinstance(result, InsertBatch) - rows = result.rows - assert len(rows) == 1 - - row = rows[0] - assert row["project_id"] == 1 - assert row["timestamp"] == int(now.timestamp()) - assert row["replay_id"] == str(uuid.UUID("bb570198b8f04f8bbe87077668530da7")) - assert row["event_hash"] == str(uuid.UUID("df3c3aa2daae465e89f1169e49139827")) - assert row["segment_id"] is None - assert row["trace_ids"] == [] - assert row["error_ids"] == [] - assert row["urls"] == [] - assert row["platform"] == "" - assert row["user"] == "" - assert row["sdk_name"] == "" - assert row["sdk_version"] == "" - assert row["retention_days"] == 30 - assert row["partition"] == 0 - assert row["offset"] == 0 - assert row["click_node_id"] == 59 - assert row["click_tag"] == "div" - assert row["click_id"] == "id" - assert row["click_class"] == ["class1", "class2"] - assert row["click_aria_label"] == "test" - assert row["click_role"] == "button" - assert row["click_text"] == "text" - assert row["click_alt"] == "" - assert row["click_testid"] == "" - assert row["click_title"] == "" - assert row["click_component_name"] == "SignUpButton" - assert row["click_is_dead"] == 0 - assert row["click_is_rage"] == 1 - - -from hashlib import md5 - - -def make_payload_for_event_link(severity: str) -> tuple[dict[str, Any], str, str]: - now = datetime.now(tz=timezone.utc).replace(microsecond=0) - replay_id = "bb570198b8f04f8bbe87077668530da7" - event_id = uuid.uuid4().hex - message = { - "type": "replay_event", - "start_time": datetime.now().timestamp(), - "replay_id": replay_id, - "project_id": 1, - "retention_days": 30, - "payload": list( - bytes( - json.dumps( - { - "type": "event_link", - "replay_id": replay_id, - severity + "_id": event_id, - "timestamp": int(now.timestamp()), - "event_hash": md5( - (replay_id + event_id).encode("utf-8") - ).hexdigest(), - } - ).encode() - ) - ), - } - return (message, event_id, severity) - - -@pytest.mark.parametrize( - "event_link_message", - [pytest.param(make_payload_for_event_link(log_level)) for log_level in LOG_LEVELS], -) -def test_replay_event_links( - event_link_message: tuple[dict[str, Any], str, str] -) -> None: - message, event_id, severity = event_link_message - - meta = KafkaMessageMetadata(offset=0, partition=0, timestamp=datetime(1970, 1, 1)) - - result = ReplaysProcessor().process_message(message, meta) - assert isinstance(result, InsertBatch) - rows = result.rows - assert len(rows) == 1 - - row = rows[0] - assert row["project_id"] == 1 - assert "timestamp" in row - assert row[severity + "_id"] == str(uuid.UUID(event_id)) - assert row["replay_id"] == str(uuid.UUID(message["replay_id"])) - assert row["event_hash"] == str( - uuid.UUID(md5((message["replay_id"] + event_id).encode("utf-8")).hexdigest()) - ) - assert row["segment_id"] is None - assert row["retention_days"] == 30 - assert row["partition"] == 0 - assert row["offset"] == 0 - - -@pytest.mark.parametrize( - "event_link_message", - [pytest.param(make_payload_for_event_link("not_valid"))], -) -def test_replay_event_links_invalid_severity( - event_link_message: tuple[dict[str, Any], str, str] -) -> None: - message, _, _ = event_link_message - - meta = KafkaMessageMetadata(offset=0, partition=0, timestamp=datetime(1970, 1, 1)) - - with pytest.raises(Exception): - 
ReplaysProcessor().process_message(message, meta) - - -def _datetime_to_timestamp(value: Any) -> Any: - if isinstance(value, datetime): - return value.timestamp() - else: - return value diff --git a/tests/web/rpc/test_aggregation.py b/tests/web/rpc/test_aggregation.py index c7db14b9f91..48bba0fca6d 100644 --- a/tests/web/rpc/test_aggregation.py +++ b/tests/web/rpc/test_aggregation.py @@ -87,7 +87,7 @@ def test_get_confidence_interval_column_for_non_extrapolatable_column() -> None: "column_name", "average_sample_rate", "reliability", - "extrapolated_data_present", + "is_data_present", "is_extrapolated", ), [ @@ -180,18 +180,18 @@ def test_get_confidence_interval_column_for_non_extrapolatable_column() -> None: ), ], ) -def test_get_extrapolation_meta( +def test_get_extrapolation_context( row_data: dict[str, Any], column_name: str, average_sample_rate: float, reliability: Reliability.ValueType, - extrapolated_data_present: bool, + is_data_present: bool, is_extrapolated: bool, ) -> None: extrapolation_context = ExtrapolationContext.from_row(column_name, row_data) assert extrapolation_context.average_sample_rate == average_sample_rate assert extrapolation_context.reliability == reliability - assert extrapolation_context.extrapolated_data_present == extrapolated_data_present + assert extrapolation_context.is_data_present == is_data_present assert extrapolation_context.is_extrapolated == is_extrapolated diff --git a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py index 8a5c5ef46d4..6aa7fb189f6 100644 --- a/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py +++ b/tests/web/rpc/v1/test_endpoint_time_series/test_endpoint_time_series.py @@ -626,6 +626,54 @@ def test_start_time_not_divisible_by_time_buckets_returns_valid_data(self) -> No assert len(ts.data_points) == 1 assert ts.data_points[0].data == 300 + def test_with_non_existent_attribute(self) -> None: + store_timeseries( + BASE_TIME, + 1, + 3600, + metrics=[ + DummyMetric("test_metric", get_value=lambda x: 1), + ], + ) + + message = TimeSeriesRequest( + meta=RequestMeta( + project_ids=[1, 2, 3], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp() + 60 * 30)), + ), + aggregations=[ + AttributeAggregation( + aggregate=Function.FUNCTION_SUM, + key=AttributeKey( + type=AttributeKey.TYPE_FLOAT, name="non_existent_metric" + ), + label="sum", + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_NONE, + ), + ], + granularity_secs=300, + ) + + response = EndpointTimeSeries().execute(message) + expected_buckets = [ + Timestamp(seconds=int(BASE_TIME.timestamp()) + secs) + for secs in range(0, 60 * 30, 300) + ] + + assert response.result_timeseries == [ + TimeSeries( + label="sum", + buckets=expected_buckets, + data_points=[ + DataPoint(data_present=False) for _ in range(len(expected_buckets)) + ], + ) + ] + class TestUtils: def test_no_duplicate_labels(self) -> None:
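As a quick local check after running scripts/load_uptime_checks.py from this patch, the sketch below reads the inserted rows back over the same ClickHouse HTTP interface the loader uses. It is a minimal sketch, not part of the patch, and assumes ClickHouse is reachable at localhost:8123 and that the default.uptime_monitor_checks_local table from the uptime monitors migration exists; adjust host and table name for other setups.

#!/usr/bin/env python3
# Sanity check: count the rows loaded by scripts/load_uptime_checks.py per check status.
# Assumes a local ClickHouse at localhost:8123, matching the loader script in this patch.
import requests

query = """
SELECT check_status, count() AS row_count
FROM default.uptime_monitor_checks_local
GROUP BY check_status
FORMAT JSONEachRow
"""

response = requests.post("http://localhost:8123", params={"query": query})
response.raise_for_status()
print(response.text)

With the loader's defaults (every 100th check gets HTTP 500), failures should be roughly 1% of the total rows.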