diff --git a/tests/manual_jobs/test_scrub_ips_from_eap_spans.py b/tests/manual_jobs/test_scrub_ips_from_eap_spans.py index 0d7df75c18..a3914b8961 100644 --- a/tests/manual_jobs/test_scrub_ips_from_eap_spans.py +++ b/tests/manual_jobs/test_scrub_ips_from_eap_spans.py @@ -120,6 +120,7 @@ def test_generate_query() -> None: def _gen_message( dt: datetime, + organization_id: int, measurements: dict[str, dict[str, float]] | None = None, tags: dict[str, str] | None = None, ) -> Mapping[str, Any]: @@ -151,7 +152,7 @@ def _gen_message( "eap.measurement": {"value": random.choice([1, 100, 1000])}, **measurements, }, - "organization_id": 1, + "organization_id": organization_id, "origin": "auto.http.django", "project_id": 1, "received": 1721319572.877828, @@ -199,13 +200,13 @@ def _gen_message( def _generate_request( - ts: Any, hour_ago: int, project_ids: list[int] + ts: Any, hour_ago: int, organization_id: int, project_ids: list[int] ) -> TraceItemTableRequest: # project_ids is added as an argument to avoid this query getting cached return TraceItemTableRequest( meta=RequestMeta( project_ids=project_ids, - organization_id=1, + organization_id=organization_id, cogs_category="something", referrer="something", start_timestamp=Timestamp(seconds=hour_ago), @@ -230,35 +231,45 @@ def _generate_request( ) +def _generate_expected_response(ip: str) -> TraceItemTableResponse: + return TraceItemTableResponse( + column_values=[ + TraceItemColumnValues( + attribute_name="user.ip", + results=[AttributeValue(val_str=ip) for _ in range(20)], + ) + ], + page_token=PageToken(offset=20), + meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + ) + + @pytest.mark.clickhouse_db @pytest.mark.redis_db def test_span_is_scrubbed() -> None: BASE_TIME = datetime.utcnow().replace( minute=0, second=0, microsecond=0 ) - timedelta(minutes=180) + organization_ids = [0, 1] spans_storage = get_storage(StorageKey("eap_spans")) - messages = [_gen_message(BASE_TIME - timedelta(minutes=i)) for i in range(60)] + messages = [ + _gen_message(BASE_TIME - timedelta(minutes=i), organization_id) + for organization_id in organization_ids + for i in range(20) + ] write_raw_unprocessed_events(spans_storage, messages) # type: ignore + # we inserted spans for organizations 0, 1, 2, and we make sure they look as expected ts = Timestamp(seconds=int(BASE_TIME.timestamp())) hour_ago = int((BASE_TIME - timedelta(hours=1)).timestamp()) - response = EndpointTraceItemTable().execute( - _generate_request(ts, hour_ago, [1, 2, 3]) - ) - expected_response = TraceItemTableResponse( - column_values=[ - TraceItemColumnValues( - attribute_name="user.ip", - results=[AttributeValue(val_str=_USER_IP) for _ in range(60)], - ) - ], - page_token=PageToken(offset=60), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), - ) - assert response == expected_response + for organization_id in organization_ids: - # next we make sure the written spans are SCRUBBED + response = EndpointTraceItemTable().execute( + _generate_request(ts, hour_ago, organization_id, [1, 2, 3]) + ) + assert response == _generate_expected_response(_USER_IP) + # next we scrub organizations 0 start_datetime = datetime.utcfromtimestamp(Timestamp(seconds=hour_ago).seconds) end_datetime = datetime.utcfromtimestamp(ts.seconds) @@ -268,7 +279,7 @@ def test_span_is_scrubbed() -> None: "ScrubIpFromEAPSpans", False, { - "organization_ids": [1], + "organization_ids": [organization_ids[0]], "start_datetime": start_datetime.strftime("%Y-%m-%d %H:%M:%S"), "end_datetime": end_datetime.strftime("%Y-%m-%d %H:%M:%S"), }, @@ -276,15 +287,12 @@ def test_span_is_scrubbed() -> None: ) response = EndpointTraceItemTable().execute( - _generate_request(ts, hour_ago, [3, 2, 1]) + _generate_request(ts, hour_ago, organization_ids[0], [3, 2, 1]) ) - assert response == TraceItemTableResponse( - column_values=[ - TraceItemColumnValues( - attribute_name="user.ip", - results=[AttributeValue(val_str="scrubbed") for _ in range(60)], - ) - ], - page_token=PageToken(offset=60), - meta=ResponseMeta(request_id="be3123b3-2e5d-4eb9-bb48-f38eaa9e8480"), + assert response == _generate_expected_response("scrubbed") + + # then we make sure organization 1 is NOT SCRUBBED + response = EndpointTraceItemTable().execute( + _generate_request(ts, hour_ago, organization_ids[1], [3, 2, 1]) ) + assert response == _generate_expected_response(_USER_IP)