diff --git a/ci/requirements-3.7-0.24.2.conda b/ci/requirements-3.7-0.24.2.conda
index a99bd59e..2facfb2c 100644
--- a/ci/requirements-3.7-0.24.2.conda
+++ b/ci/requirements-3.7-0.24.2.conda
@@ -3,6 +3,7 @@ coverage
 db-dtypes==0.3.1
 fastavro
 flake8
+freezegun
 numpy==1.16.6
 google-cloud-bigquery==1.27.2
 google-cloud-bigquery-storage==1.1.0
diff --git a/ci/requirements-3.9-1.3.4.conda b/ci/requirements-3.9-1.3.4.conda
index 73595253..1411fe5b 100644
--- a/ci/requirements-3.9-1.3.4.conda
+++ b/ci/requirements-3.9-1.3.4.conda
@@ -3,6 +3,7 @@ coverage
 db-dtypes
 fastavro
 flake8
+freezegun
 google-cloud-bigquery
 google-cloud-bigquery-storage
 numpy
diff --git a/noxfile.py b/noxfile.py
index 1b719448..209ec3ae 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -94,6 +94,7 @@ def default(session):
         "-c",
         constraints_path,
     )
+    session.install("freezegun", "-c", constraints_path)
 
     if session.python == "3.9":
         extras = ""
diff --git a/owlbot.py b/owlbot.py
index 5cf11d2e..02eeb069 100644
--- a/owlbot.py
+++ b/owlbot.py
@@ -38,6 +38,7 @@
     unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
     cov_level=96,
+    unit_test_external_dependencies=["freezegun"],
     unit_test_extras=extras,
     unit_test_extras_by_python=extras_by_python,
     system_test_extras=extras,
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 6c9b6804..6d06d3d6 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -2,6 +2,8 @@
 # Use of this source code is governed by a BSD-style
 # license that can be found in the LICENSE file.
 
+import copy
+import concurrent.futures
 from datetime import datetime
 import logging
 import re
@@ -378,6 +380,9 @@ def process_http_error(ex):
         # See `BigQuery Troubleshooting Errors
         # <https://cloud.google.com/bigquery/troubleshooting-errors>`__
 
+        if "cancelled" in ex.message:
+            raise QueryTimeout("Reason: {0}".format(ex))
+
         raise GenericGBQException("Reason: {0}".format(ex))
 
     def download_table(
@@ -406,8 +411,41 @@ def download_table(
             user_dtypes=dtypes,
         )
 
+    def _wait_for_query_job(self, query_reply, timeout_ms):
+        """Wait for query to complete, pausing occasionally to update progress.
+
+        Args:
+            query_reply (QueryJob):
+                A query job which has started.
+
+            timeout_ms (Optional[int]):
+                How long to wait before cancelling the query.
+        """
+        # Wait at most 10 seconds so we can show progress.
+        # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
+        # Include a tqdm progress bar here instead of a stream of log messages.
+        timeout_sec = 10.0
+        if timeout_ms:
+            timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
+
+        while query_reply.state != "DONE":
+            self.log_elapsed_seconds("  Elapsed", "s. Waiting...")
+
+            if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
+                self.client.cancel_job(
+                    query_reply.job_id, location=query_reply.location
+                )
+                raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))
+
+            try:
+                query_reply.result(timeout=timeout_sec)
+            except concurrent.futures.TimeoutError:
+                # Use our own timeout logic
+                pass
+            except self.http_error as ex:
+                self.process_http_error(ex)
+
     def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
-        from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
         from google.cloud import bigquery
         import pandas
@@ -449,28 +487,11 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         job_id = query_reply.job_id
         logger.debug("Job ID: %s" % job_id)
 
-        while query_reply.state != "DONE":
-            self.log_elapsed_seconds("  Elapsed", "s. Waiting...")
-
-            timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
-                "timeoutMs"
-            )
-            timeout_ms = int(timeout_ms) if timeout_ms else None
-            if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
-                raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))
-
-            timeout_sec = 1.0
-            if timeout_ms:
-                # Wait at most 1 second so we can show progress bar
-                timeout_sec = min(1.0, timeout_ms / 1000.0)
-
-            try:
-                query_reply.result(timeout=timeout_sec)
-            except TimeoutError:
-                # Use our own timeout logic
-                pass
-            except self.http_error as ex:
-                self.process_http_error(ex)
+        timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
+            "timeoutMs"
+        )
+        timeout_ms = int(timeout_ms) if timeout_ms else None
+        self._wait_for_query_job(query_reply, timeout_ms)
 
         if query_reply.cache_hit:
             logger.debug("Query done.\nCache hit.\n")
@@ -673,6 +694,28 @@ def _finalize_dtypes(
     return df
 
 
+def _transform_read_gbq_configuration(configuration):
+    """
+    For backwards-compatibility, convert any previously client-side only
+    parameters such as timeoutMs to the property name expected by the REST API.
+
+    Makes a copy of configuration if changes are needed.
+    """
+
+    if configuration is None:
+        return None
+
+    timeout_ms = configuration.get("query", {}).get("timeoutMs")
+    if timeout_ms is not None:
+        # Transform timeoutMs to an actual server-side configuration.
+        # https://github.com/googleapis/python-bigquery-pandas/issues/479
+        configuration = copy.deepcopy(configuration)
+        del configuration["query"]["timeoutMs"]
+        configuration["jobTimeoutMs"] = timeout_ms
+
+    return configuration
+
+
 def read_gbq(
     query_or_table,
     project_id=None,
@@ -847,6 +890,8 @@ def read_gbq(
     if dialect not in ("legacy", "standard"):
         raise ValueError("'{0}' is not valid for dialect".format(dialect))
 
+    configuration = _transform_read_gbq_configuration(configuration)
+
     if configuration and "query" in configuration and "query" in configuration["query"]:
         if query_or_table is not None:
             raise ValueError(
diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py
index 214b1f74..2290744c 100644
--- a/tests/system/test_gbq.py
+++ b/tests/system/test_gbq.py
@@ -473,22 +473,13 @@ def test_configuration_raises_value_error_with_multiple_config(self, project_id)
 
     def test_timeout_configuration(self, project_id):
         sql_statement = """
-        SELECT
-            SUM(bottles_sold) total_bottles,
-            UPPER(category_name) category_name,
-            magnitude,
-            liquor.zip_code zip_code
-        FROM `bigquery-public-data.iowa_liquor_sales.sales` liquor
-        JOIN `bigquery-public-data.geo_us_boundaries.zip_codes` zip_codes
-        ON liquor.zip_code = zip_codes.zip_code
-        JOIN `bigquery-public-data.noaa_historic_severe_storms.tornado_paths` tornados
-        ON liquor.date = tornados.storm_date
-        WHERE ST_INTERSECTS(tornado_path_geom, zip_code_geom)
-        GROUP BY category_name, magnitude, zip_code
-        ORDER BY magnitude ASC, total_bottles DESC
+        select count(*) from unnest(generate_array(1,1000000)), unnest(generate_array(1, 10000))
         """
         configs = [
+            # pandas-gbq timeout configuration. Transformed to REST API compatible version.
             {"query": {"useQueryCache": False, "timeoutMs": 1}},
+            # REST API job timeout. See:
+            # https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.job_timeout_ms
             {"query": {"useQueryCache": False}, "jobTimeoutMs": 1},
         ]
         for config in configs:
diff --git a/tests/unit/test_gbq.py b/tests/unit/test_gbq.py
index 511e68d6..457d356b 100644
--- a/tests/unit/test_gbq.py
+++ b/tests/unit/test_gbq.py
@@ -4,10 +4,12 @@
 
 # -*- coding: utf-8 -*-
 
+import concurrent.futures
 import copy
 import datetime
 from unittest import mock
 
+import freezegun
 import google.api_core.exceptions
 import numpy
 import pandas
@@ -114,6 +116,61 @@ def test__is_query(query_or_table, expected):
     assert result == expected
 
 
+@pytest.mark.parametrize(
+    ["original", "expected"],
+    [
+        (None, None),
+        ({}, {}),
+        ({"query": {"useQueryCache": False}}, {"query": {"useQueryCache": False}}),
+        ({"jobTimeoutMs": "1234"}, {"jobTimeoutMs": "1234"}),
+        ({"query": {"timeoutMs": "1234"}}, {"query": {}, "jobTimeoutMs": "1234"}),
+    ],
+)
+def test__transform_read_gbq_configuration_makes_copy(original, expected):
+    should_change = original == expected
+    got = gbq._transform_read_gbq_configuration(original)
+    assert got == expected
+    # Catch if we accidentally modified the original.
+    did_change = original == got
+    assert did_change == should_change
+
+
+def test__wait_for_query_job_exits_when_done(mock_bigquery_client):
+    connector = _make_connector()
+    connector.client = mock_bigquery_client
+    connector.start = datetime.datetime(2020, 1, 1).timestamp()
+
+    mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob)
+    type(mock_query).state = mock.PropertyMock(side_effect=("RUNNING", "DONE"))
+    mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout")
+
+    with freezegun.freeze_time("2020-01-01 00:00:00", tick=False):
+        connector._wait_for_query_job(mock_query, 60)
+
+    mock_bigquery_client.cancel_job.assert_not_called()
+
+
+def test__wait_for_query_job_cancels_after_timeout(mock_bigquery_client):
+    connector = _make_connector()
+    connector.client = mock_bigquery_client
+    connector.start = datetime.datetime(2020, 1, 1).timestamp()
+
+    mock_query = mock.create_autospec(google.cloud.bigquery.QueryJob)
+    mock_query.job_id = "a-random-id"
+    mock_query.location = "job-location"
+    mock_query.state = "RUNNING"
+    mock_query.result.side_effect = concurrent.futures.TimeoutError("fake timeout")
+
+    with freezegun.freeze_time(
+        "2020-01-01 00:00:00", auto_tick_seconds=15
+    ), pytest.raises(gbq.QueryTimeout):
+        connector._wait_for_query_job(mock_query, 60)
+
+    mock_bigquery_client.cancel_job.assert_called_with(
+        "a-random-id", location="job-location"
+    )
+
+
 def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
     gbq._test_google_api_imports()
     pytest.importorskip("google.api_core.client_info")
@@ -125,6 +182,14 @@ def test_GbqConnector_get_client_w_new_bq(mock_bigquery_client):
     assert kwargs["client_info"].user_agent == "pandas-{}".format(pandas.__version__)
 
 
+def test_GbqConnector_process_http_error_transforms_timeout():
+    original = google.api_core.exceptions.GoogleAPICallError(
+        "Job execution was cancelled: Job timed out after 0s"
+    )
+    with pytest.raises(gbq.QueryTimeout):
+        gbq.GbqConnector.process_http_error(original)
+
+
 def test_to_gbq_should_fail_if_invalid_table_name_passed():
     with pytest.raises(gbq.NotFoundException):
         gbq.to_gbq(DataFrame([[1]]), "invalid_table_name", project_id="1234")
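For reviewers, a minimal sketch (not part of the diff) of how the reworked timeout path is exercised from caller code. The project ID and query below are placeholders and the snippet assumes valid credentials; it only illustrates that the legacy client-side `timeoutMs` key is still accepted and, per `_transform_read_gbq_configuration`, rewritten to the REST API's `jobTimeoutMs` before the job is created.

```python
# Sketch only: "my-project" and the SQL are placeholders; requires valid credentials.
import pandas_gbq
from pandas_gbq import gbq

sql = "SELECT COUNT(*) FROM UNNEST(GENERATE_ARRAY(1, 1000000))"

try:
    # The client-side-only "timeoutMs" key keeps working for backwards
    # compatibility; this PR copies it into the job-level "jobTimeoutMs"
    # REST property so the timeout is also enforced server-side.
    pandas_gbq.read_gbq(
        sql,
        project_id="my-project",  # placeholder
        configuration={"query": {"useQueryCache": False, "timeoutMs": 1}},
    )
except gbq.QueryTimeout as exc:
    # Raised either by _wait_for_query_job when the client cancels the job,
    # or by process_http_error when the backend reports a cancelled job.
    print(exc)
```

With either configuration shape, the expectation after this change is that the job is cancelled (client- or server-side) rather than merely abandoned, and the caller sees `QueryTimeout` in both cases.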