From 6294f7a6e4fc9ec63fce49077d146caba96387c3 Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Tue, 3 Apr 2018 13:27:52 -0700 Subject: [PATCH] BUG: Adapt to breaking change in google-cloud-bigquery 0.32.0.dev1 (#152) * BUG: Update pandas-gbq to latest version of google-cloud-bigquery There was a breaking change in 0.32.0.dev1 which changed the way configuration for the query job gets loaded. Also, it added the 'description' field to the schema resource, so this change updates the schema comparison logic to account for that. Detect google-cloud-bigquery version for backwards compatibility * DOC: Add verbose deprecation to changelog. * TST: MASTER in CI also builds with g-c-bigquery at MASTER. --- .travis.yml | 3 ++ ci/requirements-3.6-MASTER.pip | 1 - docs/source/changelog.rst | 5 +- pandas_gbq/_query.py | 25 +++++++++ pandas_gbq/gbq.py | 93 +++++++++++++++------------------ pandas_gbq/tests/test__query.py | 57 ++++++++++++++++++++ pandas_gbq/tests/test_gbq.py | 38 +++++++++++--- setup.py | 1 + 8 files changed, 163 insertions(+), 60 deletions(-) create mode 100644 pandas_gbq/_query.py create mode 100644 pandas_gbq/tests/test__query.py diff --git a/.travis.yml b/.travis.yml index 92129cc6..42378680 100644 --- a/.travis.yml +++ b/.travis.yml @@ -28,6 +28,9 @@ install: conda install -q numpy pytz python-dateutil; PRE_WHEELS="https://7933911d6844c6c53a7d-47bd50c35cd79bd838daf386af554a83.ssl.cf2.rackcdn.com"; pip install --pre --upgrade --timeout=60 -f $PRE_WHEELS pandas; + pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=api_core'; + pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=core'; + pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=bigquery'; else conda install -q pandas=$PANDAS; fi diff --git a/ci/requirements-3.6-MASTER.pip 
b/ci/requirements-3.6-MASTER.pip index b52f2aeb..78f6834f 100644 --- a/ci/requirements-3.6-MASTER.pip +++ b/ci/requirements-3.6-MASTER.pip @@ -1,4 +1,3 @@ google-auth google-auth-oauthlib mock -google-cloud-bigquery diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst index eca6f9ce..94f2c424 100644 --- a/docs/source/changelog.rst +++ b/docs/source/changelog.rst @@ -1,9 +1,12 @@ Changelog ========= -0.3.2 / [TBD] +0.4.0 / [TBD] ------------------ - Fix bug with querying for an array of floats (:issue:`123`) +- Fix bug with integer columns on Windows. Explicitly use 64bit integers when converting from BQ types. (:issue:`119`) +- Fix bug caused by breaking change in the way ``google-cloud-python`` version 0.32.0+ handles additional configuration argument to ``read_gbq``. (:issue:`152`) +- **Deprecates** the ``verbose`` parameter. Messages use the logging module instead of printing progress directly to standard output. (:issue:`12`) 0.3.1 / 2018-02-13 ------------------ diff --git a/pandas_gbq/_query.py b/pandas_gbq/_query.py new file mode 100644 index 00000000..864bbb37 --- /dev/null +++ b/pandas_gbq/_query.py @@ -0,0 +1,25 @@ + +import pkg_resources +from google.cloud import bigquery + + +# Version with query config breaking change. +BIGQUERY_CONFIG_VERSION = pkg_resources.parse_version('0.32.0.dev1') + + +def query_config_old_version(resource): + # Verify that we got a query resource. In newer versions of + # google-cloud-bigquery enough of the configuration is passed on to the + # backend that we can expect a backend validation error instead.
+ if len(resource) != 1: + raise ValueError("Only one job type must be specified, but " + "given {}".format(','.join(resource.keys()))) + if 'query' not in resource: + raise ValueError("Only 'query' job type is supported") + return bigquery.QueryJobConfig.from_api_repr(resource['query']) + + +def query_config(resource, installed_version): + if installed_version < BIGQUERY_CONFIG_VERSION: + return query_config_old_version(resource) + return bigquery.QueryJobConfig.from_api_repr(resource) diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py index 6d5aacf8..cca6d211 100644 --- a/pandas_gbq/gbq.py +++ b/pandas_gbq/gbq.py @@ -4,7 +4,6 @@ import time import warnings from datetime import datetime -from distutils.version import StrictVersion from time import sleep import numpy as np @@ -14,7 +13,11 @@ logger = logging.getLogger(__name__) +BIGQUERY_INSTALLED_VERSION = None + + def _check_google_client_version(): + global BIGQUERY_INSTALLED_VERSION try: import pkg_resources @@ -23,17 +26,15 @@ def _check_google_client_version(): raise ImportError('Could not import pkg_resources (setuptools).') # https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md - bigquery_client_minimum_version = '0.29.0' - - _BIGQUERY_CLIENT_VERSION = pkg_resources.get_distribution( - 'google-cloud-bigquery').version + bigquery_minimum_version = pkg_resources.parse_version('0.29.0') + BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution( + 'google-cloud-bigquery').parsed_version - if (StrictVersion(_BIGQUERY_CLIENT_VERSION) < - StrictVersion(bigquery_client_minimum_version)): - raise ImportError('pandas-gbq requires google-cloud-bigquery >= {0}, ' - 'current version {1}' - .format(bigquery_client_minimum_version, - _BIGQUERY_CLIENT_VERSION)) + if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version: + raise ImportError( + 'pandas-gbq requires google-cloud-bigquery >= {0}, ' + 'current version {1}'.format( + bigquery_minimum_version, 
BIGQUERY_INSTALLED_VERSION)) def _test_google_api_imports(): @@ -447,8 +448,8 @@ def process_http_error(ex): def run_query(self, query, **kwargs): from google.auth.exceptions import RefreshError - from google.cloud.bigquery import QueryJobConfig from concurrent.futures import TimeoutError + from pandas_gbq import _query job_config = { 'query': { @@ -459,29 +460,23 @@ def run_query(self, query, **kwargs): } config = kwargs.get('configuration') if config is not None: - if len(config) != 1: - raise ValueError("Only one job type must be specified, but " - "given {}".format(','.join(config.keys()))) - if 'query' in config: - if 'query' in config['query']: - if query is not None: - raise ValueError("Query statement can't be specified " - "inside config while it is specified " - "as parameter") - query = config['query']['query'] - del config['query']['query'] - - job_config['query'].update(config['query']) - else: - raise ValueError("Only 'query' job type is supported") + job_config.update(config) + + if 'query' in config and 'query' in config['query']: + if query is not None: + raise ValueError("Query statement can't be specified " + "inside config while it is specified " + "as parameter") + query = config['query'].pop('query') self._start_timer() - try: + try: logger.info('Requesting query... ') query_reply = self.client.query( query, - job_config=QueryJobConfig.from_api_repr(job_config['query'])) + job_config=_query.query_config( + job_config, BIGQUERY_INSTALLED_VERSION)) logger.info('ok.\nQuery running...') except (RefreshError, ValueError): if self.private_key: @@ -598,6 +593,15 @@ def schema(self, dataset_id, table_id): except self.http_error as ex: self.process_http_error(ex) + def _clean_schema_fields(self, fields): + """Return a sanitized version of the schema for comparisons.""" + fields_sorted = sorted(fields, key=lambda field: field['name']) + # Ignore mode and description when comparing schemas. 
+ return [ + {'name': field['name'], 'type': field['type']} + for field in fields_sorted + ] + def verify_schema(self, dataset_id, table_id, schema): """Indicate whether schemas match exactly @@ -621,17 +625,9 @@ def verify_schema(self, dataset_id, table_id, schema): Whether the schemas match """ - fields_remote = sorted(self.schema(dataset_id, table_id), - key=lambda x: x['name']) - fields_local = sorted(schema['fields'], key=lambda x: x['name']) - - # Ignore mode when comparing schemas. - for field in fields_local: - if 'mode' in field: - del field['mode'] - for field in fields_remote: - if 'mode' in field: - del field['mode'] + fields_remote = self._clean_schema_fields( + self.schema(dataset_id, table_id)) + fields_local = self._clean_schema_fields(schema['fields']) return fields_remote == fields_local @@ -658,16 +654,9 @@ def schema_is_subset(self, dataset_id, table_id, schema): Whether the passed schema is a subset """ - fields_remote = self.schema(dataset_id, table_id) - fields_local = schema['fields'] - - # Ignore mode when comparing schemas. 
- for field in fields_local: - if 'mode' in field: - del field['mode'] - for field in fields_remote: - if 'mode' in field: - del field['mode'] + fields_remote = self._clean_schema_fields( + self.schema(dataset_id, table_id)) + fields_local = self._clean_schema_fields(schema['fields']) return all(field in fields_remote for field in fields_local) @@ -709,7 +698,7 @@ def _parse_data(schema, rows): col_names = [str(field['name']) for field in fields] col_dtypes = [ dtype_map.get(field['type'].upper(), object) - if field['mode'] != 'repeated' + if field['mode'].lower() != 'repeated' else object for field in fields ] @@ -847,7 +836,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None, for field in schema['fields']: if field['type'].upper() in type_map and \ final_df[field['name']].notnull().all() and \ - field['mode'] != 'repeated': + field['mode'].lower() != 'repeated': final_df[field['name']] = \ final_df[field['name']].astype(type_map[field['type'].upper()]) diff --git a/pandas_gbq/tests/test__query.py b/pandas_gbq/tests/test__query.py new file mode 100644 index 00000000..43ab00f3 --- /dev/null +++ b/pandas_gbq/tests/test__query.py @@ -0,0 +1,57 @@ + +import pkg_resources + +import mock + + +@mock.patch('google.cloud.bigquery.QueryJobConfig') +def test_query_config_w_old_bq_version(mock_config): + from pandas_gbq._query import query_config + + old_version = pkg_resources.parse_version('0.29.0') + query_config({'query': {'useLegacySql': False}}, old_version) + mock_config.from_api_repr.assert_called_once_with({'useLegacySql': False}) + + +@mock.patch('google.cloud.bigquery.QueryJobConfig') +def test_query_config_w_dev_bq_version(mock_config): + from pandas_gbq._query import query_config + + dev_version = pkg_resources.parse_version('0.32.0.dev1') + query_config( + { + 'query': { + 'useLegacySql': False, + }, + 'labels': {'key': 'value'}, + }, + dev_version) + mock_config.from_api_repr.assert_called_once_with( + { + 'query': { + 'useLegacySql': 
False, + }, + 'labels': {'key': 'value'}, + }) + + +@mock.patch('google.cloud.bigquery.QueryJobConfig') +def test_query_config_w_new_bq_version(mock_config): + from pandas_gbq._query import query_config + + dev_version = pkg_resources.parse_version('1.0.0') + query_config( + { + 'query': { + 'useLegacySql': False, + }, + 'labels': {'key': 'value'}, + }, + dev_version) + mock_config.from_api_repr.assert_called_once_with( + { + 'query': { + 'useLegacySql': False, + }, + 'labels': {'key': 'value'}, + }) diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py index 2df1b9bd..abb670ef 100644 --- a/pandas_gbq/tests/test_gbq.py +++ b/pandas_gbq/tests/test_gbq.py @@ -1266,16 +1266,42 @@ def test_retrieve_schema(self): test_id = "15" test_schema = { 'fields': [ - {'name': 'A', 'type': 'FLOAT', 'mode': 'NULLABLE'}, - {'name': 'B', 'type': 'FLOAT', 'mode': 'NULLABLE'}, - {'name': 'C', 'type': 'STRING', 'mode': 'NULLABLE'}, - {'name': 'D', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'} + { + 'name': 'A', + 'type': 'FLOAT', + 'mode': 'NULLABLE', + 'description': None, + }, + { + 'name': 'B', + 'type': 'FLOAT', + 'mode': 'NULLABLE', + 'description': None, + }, + { + 'name': 'C', + 'type': 'STRING', + 'mode': 'NULLABLE', + 'description': None, + }, + { + 'name': 'D', + 'type': 'TIMESTAMP', + 'mode': 'NULLABLE', + 'description': None, + }, ] } self.table.create(TABLE_ID + test_id, test_schema) - actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id) - expected = test_schema['fields'] + actual = self.sut._clean_schema_fields( + self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)) + expected = [ + {'name': 'A', 'type': 'FLOAT'}, + {'name': 'B', 'type': 'FLOAT'}, + {'name': 'C', 'type': 'STRING'}, + {'name': 'D', 'type': 'TIMESTAMP'}, + ] assert expected == actual, 'Expected schema used to create table' def test_schema_is_subset_passes_if_subset(self): diff --git a/setup.py b/setup.py index d3c35311..ebe147c3 100644 --- a/setup.py +++ b/setup.py 
@@ -17,6 +17,7 @@ def readme(): INSTALL_REQUIRES = [ + 'setuptools', 'pandas', 'google-auth', 'google-auth-oauthlib',