feat(inc-984): Cleanup EAP spans #6676

Merged (8 commits) on Dec 16, 2024
4 changes: 2 additions & 2 deletions snuba/manual_jobs/runner.py
@@ -134,9 +134,9 @@ def run_job(job_spec: JobSpec) -> JobStatus:
if not job_spec.is_async:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED)
job_logger.info("[runner] job execution finished")
-except BaseException:
+except BaseException as e:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
job_logger.error("[runner] job execution failed")
job_logger.error(f"[runner] job execution failed {e}")
job_logger.info(f"[runner] exception {traceback.format_exc()}")
finally:
_release_job_lock(job_spec.job_id)
49 changes: 49 additions & 0 deletions snuba/manual_jobs/scrub_ips_from_eap_spans.py
@@ -0,0 +1,49 @@
from datetime import datetime
from typing import Any, Mapping, Optional

from snuba.clusters.cluster import ClickhouseClientSettings, get_cluster
from snuba.clusters.storage_sets import StorageSetKey
from snuba.manual_jobs import Job, JobLogger, JobSpec


class ScrubIpFromEAPSpans(Job):
def __init__(self, job_spec: JobSpec) -> None:
self.__validate_job_params(job_spec.params)
super().__init__(job_spec)

def __validate_job_params(self, params: Optional[Mapping[Any, Any]]) -> None:
assert params
assert isinstance(params["organization_ids"], list)
assert all([isinstance(p, int) for p in params["organization_ids"]])
self._organization_ids = params["organization_ids"]
self._start_datetime = datetime.fromisoformat(params["start_datetime"])
self._end_datetime = datetime.fromisoformat(params["end_datetime"])

def _get_query(self, cluster_name: str | None) -> str:
organization_ids = ",".join([str(p) for p in self._organization_ids])
start_datetime = self._start_datetime.isoformat()
end_datetime = self._end_datetime.isoformat()
on_cluster = f"ON CLUSTER '{cluster_name}'" if cluster_name else ""
return f"""ALTER TABLE eap_spans_2_local
{on_cluster}
UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
Member Author: I added the column name attr_str_1 at the end because of mapApply's argument order (the map is its last argument): https://clickhouse.com/docs/en/sql-reference/functions/tuple-map-functions#mapapply (see the mapApply sketch after this file's diff).

WHERE organization_id IN [{organization_ids}]
AND _sort_timestamp >= toDateTime('{start_datetime}')
AND _sort_timestamp < toDateTime('{end_datetime}')"""

def execute(self, logger: JobLogger) -> None:
cluster = get_cluster(StorageSetKey.EVENTS_ANALYTICS_PLATFORM)
storage_node = cluster.get_local_nodes()[0]
connection = cluster.get_node_connection(
ClickhouseClientSettings.CLEANUP, storage_node
)
if not cluster.is_single_node():
cluster_name = cluster.get_clickhouse_cluster_name()
else:
cluster_name = None
query = self._get_query(cluster_name)
logger.info("Executing query: {query}")
result = connection.execute(query=query, settings={"mutations_sync": 0})

logger.info("complete")
logger.info(repr(result))
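
For reference, a minimal sketch of what the mapApply call in the UPDATE above does (illustrative values only, not data from this PR). mapApply takes the lambda first and the map as its last argument, which is why attr_str_1 sits at the end of the call; the lambda returns the replacement (key, value) pair for each entry:

SELECT mapApply(
    (k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)),
    map('user.ip', '10.0.0.1', 'color', 'red')
) AS attrs
-- attrs: {'user.ip':'scrubbed','color':'red'}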
1 change: 0 additions & 1 deletion snuba/web/db_query.py
@@ -302,7 +302,6 @@ def execute_query_with_readthrough_caching(
clickhouse_query_settings["query_id"] = query_id
if span:
span.set_data("query_id", query_id)

return execute_query(
clickhouse_query,
query_settings,
290 changes: 290 additions & 0 deletions tests/manual_jobs/test_scrub_ips_from_eap_spans.py
@@ -0,0 +1,290 @@
import random
import uuid
from datetime import datetime, timedelta
from typing import Any, Mapping

import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import (
Column,
TraceItemColumnValues,
TraceItemTableRequest,
TraceItemTableResponse,
)
from sentry_protos.snuba.v1.request_common_pb2 import (
PageToken,
RequestMeta,
ResponseMeta,
)
from sentry_protos.snuba.v1.trace_item_attribute_pb2 import AttributeKey, AttributeValue
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_status import JobStatus
from snuba.manual_jobs.runner import get_job_status, run_job
from snuba.manual_jobs.scrub_ips_from_eap_spans import ScrubIpFromEAPSpans
from snuba.web.rpc.v1.endpoint_trace_item_table import EndpointTraceItemTable
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
_USER_IP = "192.168.0.45"


@pytest.mark.redis_db
@pytest.mark.clickhouse_db
def test_basic() -> None:
job_id = "abc"
run_job(
JobSpec(
job_id,
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)

assert get_job_status(job_id) == JobStatus.FINISHED


@pytest.mark.parametrize(
("jobspec"),
[
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, "b"],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:0",
"end_datetime": "2024-12-10 00:00:00",
},
),
JobSpec(
"abc",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 2],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:0",
},
),
],
)
@pytest.mark.redis_db
def test_fail_validation(jobspec: JobSpec) -> None:
with pytest.raises(Exception):
run_job(jobspec)


@pytest.mark.redis_db
def test_generate_query() -> None:
job = ScrubIpFromEAPSpans(
JobSpec(
"bassa",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1, 3, 5, 6],
"start_datetime": "2024-12-01 00:00:00",
"end_datetime": "2024-12-10 00:00:00",
},
)
)
assert (
job._get_query(None)
== """ALTER TABLE eap_spans_2_local

UPDATE `attr_str_1` = mapApply((k, v) -> (k, if(k = 'user.ip', 'scrubbed', v)), `attr_str_1`)
WHERE organization_id IN [1,3,5,6]
AND _sort_timestamp >= toDateTime('2024-12-01T00:00:00')
AND _sort_timestamp < toDateTime('2024-12-10T00:00:00')"""
)


def _gen_message(
dt: datetime,
measurements: dict[str, dict[str, float]] | None = None,
tags: dict[str, str] | None = None,
) -> Mapping[str, Any]:
measurements = measurements or {}
tags = tags or {}
return {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
"event_id": "d826225de75d42d6b2f01b957d51f18f",
"exclusive_time_ms": 0.228,
"is_segment": True,
"data": {
"sentry.environment": "development",
"sentry.release": _RELEASE_TAG,
"thread.name": "uWSGIWorker1Core0",
"thread.id": "8522009600",
"sentry.segment.name": "/api/0/relays/projectconfigs/",
"sentry.sdk.name": "sentry.python.django",
"sentry.sdk.version": "2.7.0",
"my.float.field": 101.2,
"my.int.field": 2000,
"my.neg.field": -100,
"my.neg.float.field": -101.2,
"my.true.bool.field": True,
"my.false.bool.field": False,
},
"measurements": {
"num_of_spans": {"value": 50.0},
"eap.measurement": {"value": random.choice([1, 100, 1000])},
**measurements,
},
"organization_id": 1,
"origin": "auto.http.django",
"project_id": 1,
"received": 1721319572.877828,
"retention_days": 90,
"segment_id": "8873a98879faf06d",
"sentry_tags": {
"category": "http",
"environment": "development",
"op": "http.server",
"platform": "python",
"release": _RELEASE_TAG,
"sdk.name": "sentry.python.django",
"sdk.version": "2.7.0",
"status": "ok",
"status_code": "200",
"thread.id": "8522009600",
"thread.name": "uWSGIWorker1Core0",
"trace.status": "ok",
"transaction": "/api/0/relays/projectconfigs/",
"transaction.method": "POST",
"transaction.op": "http.server",
"user": "ip:127.0.0.1",
},
"span_id": "123456781234567D",
"tags": {
"http.status_code": "200",
"relay_endpoint_version": "3",
"relay_id": "88888888-4444-4444-8444-cccccccccccc",
"relay_no_cache": "False",
"relay_protocol_version": "3",
"relay_use_post_or_schedule": "True",
"relay_use_post_or_schedule_rejected": "version",
"user.ip": _USER_IP,
"spans_over_limit": "False",
"server_name": "blah",
"color": random.choice(["red", "green", "blue"]),
"location": random.choice(["mobile", "frontend", "backend"]),
**tags,
},
"trace_id": uuid.uuid4().hex,
"start_timestamp_ms": int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200)),
"start_timestamp_precise": dt.timestamp(),
"end_timestamp_precise": dt.timestamp() + 1,
}


def _generate_request(
ts: Any, hour_ago: int, project_ids: list[int]
) -> TraceItemTableRequest:
# project_ids is added as an argument to avoid this query getting cached
return TraceItemTableRequest(
meta=RequestMeta(
project_ids=project_ids,
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=hour_ago),
end_timestamp=ts,
request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480",
),
filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="color")
)
),
columns=[
Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip"))
],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip")
)
)
],
)


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
def test_span_is_scrubbed() -> None:
Member Author: this is the extra test I added.

BASE_TIME = datetime.utcnow().replace(
minute=0, second=0, microsecond=0
) - timedelta(minutes=180)
spans_storage = get_storage(StorageKey("eap_spans"))
messages = [_gen_message(BASE_TIME - timedelta(minutes=i)) for i in range(60)]
write_raw_unprocessed_events(spans_storage, messages) # type: ignore

ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, [1, 2, 3])
)
expected_response = TraceItemTableResponse(
column_values=[
TraceItemColumnValues(
attribute_name="user.ip",
results=[AttributeValue(val_str=_USER_IP) for _ in range(60)],
)
],
page_token=PageToken(offset=60),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)
assert response == expected_response

# next we make sure the written spans are SCRUBBED

start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds)
end_datetime = datetime.utcfromtimestamp(ts.seconds)

run_job(
JobSpec(
"plswork",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1],
"start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
"end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
},
)
)

response = EndpointTraceItemTable().execute(
_generate_request(ts, hour_ago, [3, 2, 1])
)
assert response == TraceItemTableResponse(
column_values=[
TraceItemColumnValues(
attribute_name="user.ip",
results=[AttributeValue(val_str="scrubbed") for _ in range(60)],
)
],
page_token=PageToken(offset=60),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)