Skip to content

Commit

Permalink
Merge branch 'master' into secure-ClickHouse-connections
Browse files Browse the repository at this point in the history
  • Loading branch information
patsevanton authored Jan 9, 2025
2 parents 4f7f7a7 + bd84663 commit be3d961
Show file tree
Hide file tree
Showing 21 changed files with 546 additions and 908 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ jobs:
# cache-to is re-implemented using the next step, to run conditionally for contributors
outputs: type=docker,dest=/tmp/snuba-ci.tar

- name: Load snuba-ci image
run: |
docker load --input /tmp/snuba-ci.tar
docker image ls -a
- name: Publish snuba-ci image to registry
if: steps.branch.outputs.branch == 'master' || github.event.pull_request.head.repo.full_name == github.repository
# outside contributors won't be able to push to the docker registry
Expand Down
1 change: 1 addition & 0 deletions gocd/templates/bash/deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ eval $(/devinfra/scripts/regions/project_env_vars.py --region="${SENTRY_REGION}"
--container-name="eap-spans-profiled-consumer" \
--container-name="eap-spans-subscriptions-scheduler" \
--container-name="eap-spans-subscriptions-executor" \
--container-name="uptime-results-consumer" \
--container-name="lw-deletions-search-issues-consumer" \
&& /devinfra/scripts/k8s/k8s-deploy.py \
--label-selector="${LABEL_SELECTOR}" \
Expand Down
3 changes: 2 additions & 1 deletion rust_snuba/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ mod querylog;
mod release_health_metrics;
mod replays;
mod spans;
mod uptime_monitor_checks;
mod utils;

use crate::config::ProcessorConfig;
use crate::types::{InsertBatch, InsertOrReplacement, KafkaMessageMetadata};
use sentry_arroyo::backends::kafka::types::KafkaPayload;
Expand Down Expand Up @@ -54,6 +54,7 @@ define_processing_functions! {
("ProfilesMessageProcessor", "processed-profiles", ProcessingFunctionType::ProcessingFunction(profiles::process_message)),
("QuerylogProcessor", "snuba-queries", ProcessingFunctionType::ProcessingFunction(querylog::process_message)),
("ReplaysProcessor", "ingest-replay-events", ProcessingFunctionType::ProcessingFunction(replays::process_message)),
("UptimeMonitorChecksProcessor", "snuba-uptime-results", ProcessingFunctionType::ProcessingFunction(uptime_monitor_checks::process_message)),
("SpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(spans::process_message)),
("EAPSpansMessageProcessor", "snuba-spans", ProcessingFunctionType::ProcessingFunction(eap_spans::process_message)),
("OutcomesProcessor", "outcomes", ProcessingFunctionType::ProcessingFunction(outcomes::process_message)),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"dns_error\",\n \"description\": \"Unable to resolve hostname example.xyz\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 500\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "failure",
"check_status_reason": "dns_error",
"duration": 100,
"environment": null,
"http_status_code": 500,
"offset": 1,
"organization_id": 123,
"partition": 0,
"project_id": 456,
"region_slug": "us-east-1",
"retention_days": 90,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"success\",\n \"status_reason\": null,\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 50,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": 200\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "success",
"check_status_reason": null,
"duration": 50,
"environment": null,
"http_status_code": 200,
"offset": 1,
"organization_id": 123,
"partition": 0,
"project_id": 456,
"region_slug": "us-east-1",
"retention_days": 90,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
source: src/processors/mod.rs
description: "{\n \"guid\": \"54afc7ed9c53491481919c931f75bae1\",\n \"subscription_id\": \"5421b5df80744113a6b57014f01a3a42\",\n \"status\": \"failure\",\n \"status_reason\": {\n \"type\": \"timeout\",\n \"description\": \"Check timed out\"\n },\n \"trace_id\": \"947efba02dac463b9c1d886a44bafc94\",\n \"span_id\": \"58e84098e63f42e1\",\n \"scheduled_check_time_ms\": 1717614062978,\n \"actual_check_time_ms\": 1717614068008,\n \"duration_ms\": 100,\n \"request_info\": {\n \"request_type\": \"HEAD\",\n \"http_status_code\": null\n },\n \"organization_id\": 123,\n \"project_id\": 456,\n \"retention_days\": 90,\n \"region_slug\": \"us-east-1\"\n}\n"
expression: snapshot_payload
---
[
{
"check_status": "failure",
"check_status_reason": "timeout",
"duration": 100,
"environment": null,
"http_status_code": null,
"offset": 1,
"organization_id": 123,
"partition": 0,
"project_id": 456,
"region_slug": "us-east-1",
"retention_days": 90,
"scheduled_check_time": 1717614062978.0,
"timestamp": 1717614068008.0,
"trace_id": "947efba0-2dac-463b-9c1d-886a44bafc94",
"uptime_check_id": "54afc7ed-9c53-4914-8191-9c931f75bae1",
"uptime_subscription_id": "5421b5df-8074-4113-a6b5-7014f01a3a42"
}
]
158 changes: 158 additions & 0 deletions rust_snuba/src/processors/uptime_monitor_checks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::config::ProcessorConfig;
use anyhow::Context;
use chrono::DateTime;
use sentry_arroyo::backends::kafka::types::KafkaPayload;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

use crate::types::{InsertBatch, KafkaMessageMetadata};

/// Kafka processing entry point for `snuba-uptime-results` messages.
///
/// Deserializes the raw payload into ClickHouse rows and wraps them in an
/// `InsertBatch`, tagging the batch with the message's origin timestamp.
///
/// Returns an error if the payload is missing or fails to deserialize.
pub fn process_message(
    payload: KafkaPayload,
    metadata: KafkaMessageMetadata,
    _config: &ProcessorConfig,
) -> anyhow::Result<InsertBatch> {
    let payload_bytes = payload.payload().context("Expected payload")?;
    let (rows, origin_timestamp) =
        deserialize_message(payload_bytes, metadata.partition, metadata.offset)?;

    // `origin_timestamp` is `actual_check_time_ms` — epoch *milliseconds*
    // (see the `_ms` field suffix). The previous code passed it to
    // `DateTime::from_timestamp`, which expects epoch seconds, yielding a
    // far-future origin timestamp; use the millisecond constructor instead.
    InsertBatch::from_rows(
        rows,
        DateTime::from_timestamp_millis(origin_timestamp as i64),
    )
}

/// Parse a raw `snuba-uptime-results` JSON payload into ClickHouse rows.
///
/// Returns the rows together with the message's `actual_check_time_ms`
/// (epoch milliseconds), which the caller uses as the batch origin
/// timestamp. `partition` and `offset` identify the Kafka coordinates of
/// the message and are copied into every row.
pub fn deserialize_message(
    payload: &[u8],
    partition: u16,
    offset: u64,
) -> anyhow::Result<(Vec<UptimeMonitorCheckRow>, f64)> {
    let msg: UptimeMonitorCheckMessage = serde_json::from_slice(payload)?;

    let origin_timestamp = msg.actual_check_time_ms;
    // A missing `request_info` means no HTTP response details; the derived
    // Default gives `http_status_code: None`.
    let http_status_code = msg.request_info.unwrap_or_default().http_status_code;
    // Only the machine-readable reason type is stored, not the description.
    let check_status_reason = msg.status_reason.map(|reason| reason.ty);

    let row = UptimeMonitorCheckRow {
        organization_id: msg.organization_id,
        project_id: msg.project_id,
        environment: msg.environment,
        uptime_subscription_id: msg.subscription_id,
        uptime_check_id: msg.guid,
        scheduled_check_time: msg.scheduled_check_time_ms,
        timestamp: msg.actual_check_time_ms,
        duration: msg.duration_ms,
        region_slug: msg.region_slug.unwrap_or_default(),
        check_status: msg.status,
        check_status_reason,
        http_status_code,
        trace_id: msg.trace_id,
        retention_days: msg.retention_days,
        partition,
        offset,
    };

    Ok((vec![row], origin_timestamp))
}

/// Wire format of a single uptime check result as produced to the
/// `snuba-uptime-results` topic. String fields borrow from the raw Kafka
/// payload (`&'a str`) so deserialization avoids copies.
#[derive(Debug, Deserialize)]
struct UptimeMonitorCheckMessage<'a> {
    // TODO: add these to the message
    organization_id: u64,
    project_id: u64,
    retention_days: u16,
    region_slug: Option<&'a str>,
    environment: Option<&'a str>,
    /// Identifies the uptime subscription this check belongs to.
    subscription_id: Uuid,
    /// Unique id of this individual check execution (becomes `uptime_check_id`).
    guid: Uuid,
    /// Epoch milliseconds the check was scheduled to run.
    scheduled_check_time_ms: f64,
    /// Epoch milliseconds the check actually ran.
    actual_check_time_ms: f64,
    duration_ms: u64,
    /// Check outcome, e.g. "success" / "failure" (see snapshot fixtures).
    status: &'a str,
    /// Present only when the check did not succeed.
    status_reason: Option<CheckStatusReason<'a>>,
    trace_id: Uuid,
    /// HTTP response details; absent when no response was received.
    request_info: Option<RequestInfo>,
}
/// HTTP response details of a check. `Default` is derived so a missing
/// `request_info` in the message yields `http_status_code: None`.
#[derive(Debug, Deserialize, Default)]
pub struct RequestInfo {
    /// Status code returned by the checked endpoint, if a response arrived.
    pub http_status_code: Option<u16>,
}
/// Why a check failed, e.g. `{"type": "dns_error", "description": "..."}`.
/// Only `ty` is persisted to ClickHouse; `description` is informational.
#[derive(Debug, PartialEq, Deserialize, Serialize)]
#[serde(rename_all = "snake_case")]
pub struct CheckStatusReason<'a> {
    /// The type of the status reason
    #[serde(rename = "type")]
    pub ty: &'a str,

    /// A human readable description of the status reason
    pub description: String,
}

/// One row inserted into ClickHouse for a single uptime check result.
/// NOTE(review): field names appear to map 1:1 onto the target table's
/// columns via `InsertBatch::from_rows` serialization — confirm against the
/// table schema before renaming anything.
#[derive(Debug, Default, Serialize)]
pub struct UptimeMonitorCheckRow<'a> {
    organization_id: u64,
    project_id: u64,
    environment: Option<&'a str>,
    uptime_subscription_id: Uuid,
    uptime_check_id: Uuid,
    // Epoch milliseconds (copied from `scheduled_check_time_ms`).
    scheduled_check_time: f64,
    // Epoch milliseconds (copied from `actual_check_time_ms`).
    timestamp: f64,
    duration: u64,
    // Empty string when the message carried no region slug.
    region_slug: &'a str,
    check_status: &'a str,
    // The `type` of the status reason, when the check did not succeed.
    check_status_reason: Option<&'a str>,
    http_status_code: Option<u16>,
    trace_id: Uuid,
    retention_days: u16,
    // Kafka coordinates of the source message.
    partition: u16,
    offset: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A representative payload should deserialize into exactly one row
    /// with every column populated from the corresponding JSON field.
    #[test]
    fn test_parse_monitor_checkin() {
        let payload = r#"{
            "organization_id": 1,
            "project_id": 1,
            "retention_days": 30,
            "region_slug": "global",
            "environment": "prod",
            "subscription_id": "123e4567-e89b-12d3-a456-426614174000",
            "guid": "550e8400-e29b-41d4-a716-446655440000",
            "scheduled_check_time_ms": 1702659277,
            "actual_check_time_ms": 1702659277,
            "duration_ms": 100,
            "status": "ok",
            "status_reason": {
                "type": "Request successful",
                "description": "Request successful"
            },
            "http_status_code": 200,
            "trace_id": "550e8400-e29b-41d4-a716-446655440000",
            "request_info": {
                "request_type": "GET",
                "http_status_code": 200
            }
        }"#;

        let (rows, origin_ts) = deserialize_message(payload.as_bytes(), 0, 0).unwrap();
        let row = rows.first().unwrap();

        // Identity / tenancy columns.
        assert_eq!(row.organization_id, 1);
        assert_eq!(row.project_id, 1);
        assert_eq!(row.environment, Some("prod"));
        assert_eq!(
            row.uptime_subscription_id,
            Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap()
        );

        // Check result columns.
        assert_eq!(row.timestamp, 1702659277.0);
        assert_eq!(row.duration, 100);
        assert_eq!(row.region_slug, "global");
        assert_eq!(row.check_status, "ok");
        assert_eq!(row.check_status_reason, Some("Request successful"));
        assert_eq!(row.http_status_code, Some(200));
        assert_eq!(row.retention_days, 30);

        // Kafka coordinates and the returned origin timestamp.
        assert_eq!(row.partition, 0);
        assert_eq!(row.offset, 0);
        assert_eq!(origin_ts, 1702659277.0);
    }
}
67 changes: 67 additions & 0 deletions scripts/load_uptime_checks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#!/usr/bin/env python3
"""Seed the local ClickHouse uptime checks table with synthetic data.

Generates 90 days of per-minute uptime check results per project and inserts
them through ClickHouse's HTTP interface using the JSONEachRow format.
Intended for local development / load testing only.
"""

import datetime
import json
import random
import uuid

CLICKHOUSE_URL = "http://localhost:8123"

# Column list must match the keys of the rows built by generate_project_data.
query = """
INSERT INTO default.uptime_monitor_checks_local (
    organization_id, project_id, environment, uptime_subscription_id, uptime_check_id,
    scheduled_check_time, timestamp, duration, region_id, check_status,
    check_status_reason, http_status_code, trace_id, retention_days
) FORMAT JSONEachRow
"""


def generate_project_data(project_id, base_time, minutes=24 * 60 * 90):
    """Build one synthetic uptime-check row per minute for *project_id*.

    Every 100th check simulates a failure (HTTP 500 with reason
    "Timeout error"); all other checks succeed with HTTP 200.

    Args:
        project_id: Project the rows belong to.
        base_time: Timestamp of the first check; each subsequent row
            advances by one minute.
        minutes: Number of rows to generate (default: 90 days of minutes).

    Returns:
        A list of dicts matching the JSONEachRow column set of ``query``.
    """
    rows = []
    for minute in range(minutes):
        timestamp = base_time + datetime.timedelta(minutes=minute)
        # Scheduled time is slightly before the actual check time to mimic
        # real scheduling jitter.
        scheduled_time = timestamp - datetime.timedelta(seconds=random.randint(1, 30))
        http_status = (
            500 if minute % 100 == 0 else 200
        )  # Every 100th record gets status 500
        check_status = "failure" if http_status == 500 else "success"
        rows.append(
            {
                "organization_id": 1,
                "project_id": project_id,
                "environment": "production",
                "uptime_subscription_id": random.randint(1, 3) * project_id,
                "uptime_check_id": str(uuid.uuid4()),
                "scheduled_check_time": scheduled_time.strftime("%Y-%m-%d %H:%M:%S"),
                "timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
                "duration": random.randint(1, 1000),
                "region_id": random.randint(1, 3),
                "check_status": check_status,
                "check_status_reason": "Timeout error"
                if check_status == "failure"
                else None,
                "http_status_code": http_status,
                "trace_id": str(uuid.uuid4()),
                "retention_days": 30,
            }
        )
    return rows


def main():
    """Generate and insert data for uptime checks for each project."""
    # Imported lazily so the row generator above stays importable (and
    # testable) without the third-party `requests` dependency installed.
    import requests

    base_time = datetime.datetime.now().replace(
        hour=0, minute=0, second=0, microsecond=0
    )
    total_records = 0

    for project_id in range(1, 2):
        project_data = generate_project_data(project_id, base_time)
        response = requests.post(
            CLICKHOUSE_URL,
            params={"query": query},
            data="\n".join(json.dumps(row) for row in project_data),
            # Fail fast instead of hanging forever if ClickHouse is unreachable.
            timeout=60,
        )

        if response.status_code == 200:
            total_records += len(project_data)
            print(
                f"Successfully inserted {len(project_data)} records for project {project_id}"
            )
        else:
            print(f"Error inserting data for project {project_id}: {response.text}")

    print(f"Total records inserted: {total_records}")


if __name__ == "__main__":
    main()
2 changes: 1 addition & 1 deletion snuba/admin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1372,7 +1372,7 @@ def execute_job(job_id: str) -> Response:
job_status = None
try:
job_status = run_job(job_specs[job_id])
except BaseException as e:
except Exception as e:
return make_response(
jsonify(
{
Expand Down
Loading

0 comments on commit be3d961

Please sign in to comment.