Skip to content

Commit

Permalink
SCRUB
Browse files Browse the repository at this point in the history
  • Loading branch information
Rachel Chen authored and Rachel Chen committed Dec 14, 2024
1 parent 2292603 commit b44316a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 19 deletions.
7 changes: 7 additions & 0 deletions snuba/manual_jobs/scrub_ips_from_eap_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,12 @@ def execute(self, logger: JobLogger) -> None:
print("queryyyyy", query)
logger.info("Executing query: {query}")
result = connection.execute(query=query, settings={"mutations_sync": 0})
print(
"selectresult",
connection.execute(
query="SELECT `attr_str_1` from eap_spans_2_local LIMIT 1",
settings={"mutations_sync": 0},
),
)
logger.info("complete")
logger.info(repr(result))
41 changes: 22 additions & 19 deletions tests/manual_jobs/test_scrub_ips_from_eap_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from tests.helpers import write_raw_unprocessed_events

_RELEASE_TAG = "[email protected]+c45b49caed1e5fcbf70097ab3f434b487c359b6b"
_SERVER_NAME = "D23CXQ4GK2.local"
_USER_IP = "192.168.0.45"


@pytest.mark.redis_db
Expand Down Expand Up @@ -125,13 +125,6 @@ def _gen_message(
) -> Mapping[str, Any]:
measurements = measurements or {}
tags = tags or {}
start_timestamp_ms = int(dt.timestamp()) * 1000 - int(random.gauss(1000, 200))
start_timestamp_ms_str = datetime.utcfromtimestamp(start_timestamp_ms / 1000)
print(
"start_timestamp_ms inserted into eap",
start_timestamp_ms,
start_timestamp_ms_str,
)
return {
"description": "/api/0/relays/projectconfigs/",
"duration_ms": 152,
Expand Down Expand Up @@ -191,8 +184,9 @@ def _gen_message(
"relay_protocol_version": "3",
"relay_use_post_or_schedule": "True",
"relay_use_post_or_schedule_rejected": "version",
"server_name": _SERVER_NAME,
"user.ip": _USER_IP,
"spans_over_limit": "False",
"server_name": "blah",
"color": random.choice(["red", "green", "blue"]),
"location": random.choice(["mobile", "frontend", "backend"]),
**tags,
Expand Down Expand Up @@ -221,12 +215,12 @@ def _generate_request(ts: Any, hour_ago: int) -> TraceItemTableRequest:
)
),
columns=[
Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="server_name"))
Column(key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip"))
],
order_by=[
TraceItemTableRequest.OrderBy(
column=Column(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="server_name")
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="user.ip")
)
)
],
Expand All @@ -240,40 +234,49 @@ def test_span_is_scrubbed() -> None:
minute=0, second=0, microsecond=0
) - timedelta(minutes=180)
spans_storage = get_storage(StorageKey("eap_spans"))
messages = [_gen_message(BASE_TIME - timedelta(minutes=i)) for i in range(120)]
write_raw_unprocessed_events(spans_storage, messages)
messages = [_gen_message(BASE_TIME - timedelta(minutes=i)) for i in range(60)]
write_raw_unprocessed_events(spans_storage, messages) # type: ignore

ts = Timestamp(seconds=int(BASE_TIME.timestamp()))
hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp())
message = _generate_request(ts, hour_ago)
response = EndpointTraceItemTable().execute(message)

expected_response = TraceItemTableResponse(
column_values=[
TraceItemColumnValues(
attribute_name="server_name",
results=[AttributeValue(val_str=_SERVER_NAME) for _ in range(60)],
attribute_name="user.ip",
results=[AttributeValue(val_str=_USER_IP) for _ in range(60)],
)
],
page_token=PageToken(offset=60),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)
assert response == expected_response

start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds)
end_datetime = datetime.utcfromtimestamp(ts.seconds)

run_job(
JobSpec(
"plswork",
"ScrubIpFromEAPSpans",
False,
{
"organization_ids": [1],
"start_datetime": str(BASE_TIME - timedelta(hours=1)),
"end_datetime": str(BASE_TIME),
"start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"),
"end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"),
},
)
)

print(response)
assert response == TraceItemTableResponse(
page_token=PageToken(offset=0),
column_values=[
TraceItemColumnValues(
attribute_name="user.ip",
results=[AttributeValue(val_str="scrubbed") for _ in range(60)],
)
],
page_token=PageToken(offset=60),
meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"),
)

0 comments on commit b44316a

Please sign in to comment.