Skip to content

Commit

Permalink
fix: correctly transform query job timeout configuration and exceptions (#492)
Browse files Browse the repository at this point in the history

* fix: correctly transform query job timeout configuration and exceptions

* unit tests for configuration transformations

* add unit tests for timeout

* link todo to relevant issue

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* use correct template for freezegun deps

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* decrease tick time in case multiple time calls are made

* try no tick

* add freezegun to conda deps

* typo

Co-authored-by: Bu Sun Kim <[email protected]>

* typo

Co-authored-by: Anthonios Partheniou <[email protected]>

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Bu Sun Kim <[email protected]>
Co-authored-by: Anthonios Partheniou <[email protected]>
  • Loading branch information
4 people authored Mar 8, 2022
1 parent 13010dd commit d8c3900
Show file tree
Hide file tree
Showing 7 changed files with 141 additions and 36 deletions.
1 change: 1 addition & 0 deletions ci/requirements-3.7-0.24.2.conda
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ coverage
db-dtypes==0.3.1
fastavro
flake8
freezegun
numpy==1.16.6
google-cloud-bigquery==1.27.2
google-cloud-bigquery-storage==1.1.0
Expand Down
1 change: 1 addition & 0 deletions ci/requirements-3.9-1.3.4.conda
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ coverage
db-dtypes
fastavro
flake8
freezegun
google-cloud-bigquery
google-cloud-bigquery-storage
numpy
Expand Down
1 change: 1 addition & 0 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ def default(session):
"-c",
constraints_path,
)
session.install("freezegun", "-c", constraints_path)

if session.python == "3.9":
extras = ""
Expand Down
1 change: 1 addition & 0 deletions owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
cov_level=96,
unit_test_external_dependencies=["freezegun"],
unit_test_extras=extras,
unit_test_extras_by_python=extras_by_python,
system_test_extras=extras,
Expand Down
91 changes: 68 additions & 23 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import copy
import concurrent.futures
from datetime import datetime
import logging
import re
Expand Down Expand Up @@ -378,6 +380,9 @@ def process_http_error(ex):
# See `BigQuery Troubleshooting Errors
# <https://cloud.google.com/bigquery/troubleshooting-errors>`__

if "cancelled" in ex.message:
raise QueryTimeout("Reason: {0}".format(ex))

raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
Expand Down Expand Up @@ -406,8 +411,41 @@ def download_table(
user_dtypes=dtypes,
)

def _wait_for_query_job(self, query_reply, timeout_ms):
"""Wait for query to complete, pausing occasionally to update progress.
Args:
query_reply (QueryJob):
A query job which has started.
timeout_ms (Optional[int]):
How long to wait before cancelling the query.
"""
# Wait at most 10 seconds so we can show progress.
# TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
# Include a tqdm progress bar here instead of a stream of log messages.
timeout_sec = 10.0
if timeout_ms:
timeout_sec = min(timeout_sec, timeout_ms / 1000.0)

while query_reply.state != "DONE":
self.log_elapsed_seconds(" Elapsed", "s. Waiting...")

if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
self.client.cancel_job(
query_reply.job_id, location=query_reply.location
)
raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

try:
query_reply.result(timeout=timeout_sec)
except concurrent.futures.TimeoutError:
# Use our own timeout logic
pass
except self.http_error as ex:
self.process_http_error(ex)

def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
from google.cloud import bigquery
import pandas
Expand Down Expand Up @@ -449,28 +487,11 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
job_id = query_reply.job_id
logger.debug("Job ID: %s" % job_id)

while query_reply.state != "DONE":
self.log_elapsed_seconds(" Elapsed", "s. Waiting...")

timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
"timeoutMs"
)
timeout_ms = int(timeout_ms) if timeout_ms else None
if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

timeout_sec = 1.0
if timeout_ms:
# Wait at most 1 second so we can show progress bar
timeout_sec = min(1.0, timeout_ms / 1000.0)

try:
query_reply.result(timeout=timeout_sec)
except TimeoutError:
# Use our own timeout logic
pass
except self.http_error as ex:
self.process_http_error(ex)
timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
"timeoutMs"
)
timeout_ms = int(timeout_ms) if timeout_ms else None
self._wait_for_query_job(query_reply, timeout_ms)

if query_reply.cache_hit:
logger.debug("Query done.\nCache hit.\n")
Expand Down Expand Up @@ -673,6 +694,28 @@ def _finalize_dtypes(
return df


def _transform_read_gbq_configuration(configuration):
"""
For backwards-compatibility, convert any previously client-side only
parameters such as timeoutMs to the property name expected by the REST API.
Makes a copy of configuration if changes are needed.
"""

if configuration is None:
return None

timeout_ms = configuration.get("query", {}).get("timeoutMs")
if timeout_ms is not None:
# Transform timeoutMs to an actual server-side configuration.
# https://github.com/googleapis/python-bigquery-pandas/issues/479
configuration = copy.deepcopy(configuration)
del configuration["query"]["timeoutMs"]
configuration["jobTimeoutMs"] = timeout_ms

return configuration


def read_gbq(
query_or_table,
project_id=None,
Expand Down Expand Up @@ -847,6 +890,8 @@ def read_gbq(
if dialect not in ("legacy", "standard"):
raise ValueError("'{0}' is not valid for dialect".format(dialect))

configuration = _transform_read_gbq_configuration(configuration)

if configuration and "query" in configuration and "query" in configuration["query"]:
if query_or_table is not None:
raise ValueError(
Expand Down
17 changes: 4 additions & 13 deletions tests/system/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,22 +473,13 @@ def test_configuration_raises_value_error_with_multiple_config(self, project_id)

def test_timeout_configuration(self, project_id):
sql_statement = """
SELECT
SUM(bottles_sold) total_bottles,
UPPER(category_name) category_name,
magnitude,
liquor.zip_code zip_code
FROM `bigquery-public-data.iowa_liquor_sales.sales` liquor
JOIN `bigquery-public-data.geo_us_boundaries.zip_codes` zip_codes
ON liquor.zip_code = zip_codes.zip_code
JOIN `bigquery-public-data.noaa_historic_severe_storms.tornado_paths` tornados
ON liquor.date = tornados.storm_date
WHERE ST_INTERSECTS(tornado_path_geom, zip_code_geom)
GROUP BY category_name, magnitude, zip_code
ORDER BY magnitude ASC, total_bottles DESC
select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 10000))
"""
configs = [
# pandas-gbq timeout configuration. Transformed to REST API compatible version.
{"query": {"useQueryCache": False, "timeoutMs": 1}},
# REST API job timeout. See:
# https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms
{"query": {"useQueryCache": False}, "jobTimeoutMs": 1},
]
for config in configs:
Expand Down
65 changes: 65 additions & 0 deletions tests/unit/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

# -*- coding: utf-8 -*-

import concurrent.futures
import copy
import datetime
from unittest import mock

import freezegun
import google.api_core.exceptions
import numpy
import pandas
Expand Down Expand Up @@ -114,6 +116,61 @@ def test__is_query(query_or_table, expected):
assert result == expected


@pytest.mark.parametrize(
    ["original", "expected"],
    [
        (None, None),
        ({}, {}),
        ({"query": {"useQueryCache": False}}, {"query": {"useQueryCache": False}}),
        ({"jobTimeoutMs": "1234"}, {"jobTimeoutMs": "1234"}),
        ({"query": {"timeoutMs": "1234"}}, {"query": {}, "jobTimeoutMs": "1234"}),
    ],
)
def test__transform_read_gbq_configuration_makes_copy(original, expected):
    """The transform yields the expected config without mutating its input.

    The previous variable names (``should_change`` / ``did_change``) held
    True when the config should *not* change, so they are renamed to say
    what they actually mean.
    """
    expect_unchanged = original == expected
    got = gbq._transform_read_gbq_configuration(original)
    assert got == expected
    # Catch if we accidentally modified the original: when a rewrite was
    # required, the input must still differ from the returned value.
    is_unchanged = original == got
    assert is_unchanged == expect_unchanged


def test__wait_for_query_job_exits_when_done(mock_bigquery_client):
    """A job that reaches DONE returns without the job being cancelled."""
    connector = _make_connector()
    connector.client = mock_bigquery_client
    # Fix the connector's start time to match the frozen clock below so no
    # elapsed time accrues during the wait.
    connector.start = datetime.datetime(2020, 1, 1).timestamp()

    mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob)
    # First poll sees RUNNING, second sees DONE, ending the wait loop.
    type(mock_query).state = mock.PropertyMock(side_effect=("RUNNING", "DONE"))
    # result() timing out must not abort the wait; the loop keeps polling.
    mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout")

    # tick=False: the clock never advances, so the 60 ms deadline is never hit.
    with freezegun.freeze_time("2020-01-01 00:00:00", tick=False):
        connector._wait_for_query_job(mock_query, 60)

    mock_bigquery_client.cancel_job.assert_not_called()


def test__wait_for_query_job_cancels_after_timeout(mock_bigquery_client):
    """Exceeding the client-side deadline cancels the job and raises QueryTimeout."""
    connector = _make_connector()
    connector.client = mock_bigquery_client
    # Align the connector's start time with the frozen clock below.
    connector.start = datetime.datetime(2020, 1, 1).timestamp()

    mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob)
    mock_query.job_id = "a-random-id"
    mock_query.location = "job-location"
    # The job never finishes, so only the timeout can end the wait.
    mock_query.state = "RUNNING"
    mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout")

    # auto_tick_seconds=15: each clock read advances 15 s, blowing past the
    # 60 ms deadline on the first elapsed-time check.
    with freezegun.freeze_time(
        "2020-01-01 00:00:00", auto_tick_seconds=15
    ), pytest.raises(gbq.QueryTimeout):
        connector._wait_for_query_job(mock_query, 60)

    # The server-side job must have been cancelled with its id and location.
    mock_bigquery_client.cancel_job.assert_called_with(
        "a-random-id", location="job-location"
    )


def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
gbq._test_google_api_imports()
pytest.importorskip("google.api_core.client_info")
Expand All @@ -125,6 +182,14 @@ def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
assert kwargs["client_info"].user_agent == "pandas-{}".format(pandas.__version__)


def test_GbqConnector_process_http_error_transforms_timeout():
    """An API error whose message mentions cancellation maps to QueryTimeout."""
    cancelled_error = google.api_core.exceptions.GoogleAPICallError(
        "Job execution was cancelled: Job timed out after 0s"
    )
    with pytest.raises(gbq.QueryTimeout):
        gbq.GbqConnector.process_http_error(cancelled_error)


def test_to_gbq_should_fail_if_invalid_table_name_passed():
    # A destination without a "dataset.table" qualifier raises
    # NotFoundException before any upload is attempted.
    with pytest.raises(gbq.NotFoundException):
        gbq.to_gbq(DataFrame([[1]]), "invalid_table_name", project_id="1234")
Expand Down

0 comments on commit d8c3900

Please sign in to comment.