Skip to content

Commit

Permalink
BUG: Adapt to breaking change in google-cloud-bigquery 0.32.0.dev1 (#152
Browse files Browse the repository at this point in the history
)

* BUG: Update pandas-gbq to latest version of google-cloud-bigquery

There was a breaking change in 0.32.0.dev1 which changed the way
configuration for the query job gets loaded. Also, it added the
'description' field to the schema resource, so this change updates the
schema comparison logic to account for that.

Detect google-cloud-bigquery version for backwards compatibility

* DOC: Add verbose deprecation to changelog.

* TST: MASTER in CI also builds with g-c-bigquery at MASTER.
  • Loading branch information
tswast authored Apr 3, 2018
1 parent 9666965 commit 6294f7a
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 60 deletions.
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ install:
conda install -q numpy pytz python-dateutil;
PRE_WHEELS="https://7933911d6844c6c53a7d-47bd50c35cd79bd838daf386af554a83.ssl.cf2.rackcdn.com";
pip install --pre --upgrade --timeout=60 -f $PRE_WHEELS pandas;
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=api_core';
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=core';
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=bigquery';
else
conda install -q pandas=$PANDAS;
fi
Expand Down
1 change: 0 additions & 1 deletion ci/requirements-3.6-MASTER.pip
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
google-auth
google-auth-oauthlib
mock
google-cloud-bigquery
5 changes: 4 additions & 1 deletion docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
Changelog
=========

0.3.2 / [TBD]
0.4.0 / [TBD]
------------------
- Fix bug with querying for an array of floats (:issue:`123`)
- Fix bug with integer columns on Windows. Explicitly use 64bit integers when converting from BQ types. (:issue:`119`)
- Fix bug caused by breaking change the way ``google-cloud-python`` version 0.32.0+ handles additional configuration argument to ``read_gbq``. (:issue:`152`)
- **Deprecates** the ``verbose`` parameter. Messages use the logging module instead of printing progress directly to standard output. (:issue:`12`)

0.3.1 / 2018-02-13
------------------
Expand Down
25 changes: 25 additions & 0 deletions pandas_gbq/_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

import pkg_resources
from google.cloud import bigquery


# Version with query config breaking change.
BIGQUERY_CONFIG_VERSION = pkg_resources.parse_version('0.32.0.dev1')


def query_config_old_version(resource):
# Verify that we got a query resource. In newer versions of
# google-cloud-bigquery enough of the configuration is passed on to the
# backend that we can expect a backend validation error instead.
if len(resource) != 1:
raise ValueError("Only one job type must be specified, but "
"given {}".format(','.join(resource.keys())))
if 'query' not in resource:
raise ValueError("Only 'query' job type is supported")
return bigquery.QueryJobConfig.from_api_repr(resource['query'])


def query_config(resource, installed_version):
if installed_version < BIGQUERY_CONFIG_VERSION:
return query_config_old_version(resource)
return bigquery.QueryJobConfig.from_api_repr(resource)
93 changes: 41 additions & 52 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
import warnings
from datetime import datetime
from distutils.version import StrictVersion
from time import sleep

import numpy as np
Expand All @@ -14,7 +13,11 @@
logger = logging.getLogger(__name__)


BIGQUERY_INSTALLED_VERSION = None


def _check_google_client_version():
global BIGQUERY_INSTALLED_VERSION

try:
import pkg_resources
Expand All @@ -23,17 +26,15 @@ def _check_google_client_version():
raise ImportError('Could not import pkg_resources (setuptools).')

# https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
bigquery_client_minimum_version = '0.29.0'

_BIGQUERY_CLIENT_VERSION = pkg_resources.get_distribution(
'google-cloud-bigquery').version
bigquery_minimum_version = pkg_resources.parse_version('0.29.0')
BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
'google-cloud-bigquery').parsed_version

if (StrictVersion(_BIGQUERY_CLIENT_VERSION) <
StrictVersion(bigquery_client_minimum_version)):
raise ImportError('pandas-gbq requires google-cloud-bigquery >= {0}, '
'current version {1}'
.format(bigquery_client_minimum_version,
_BIGQUERY_CLIENT_VERSION))
if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
raise ImportError(
'pandas-gbq requires google-cloud-bigquery >= {0}, '
'current version {1}'.format(
bigquery_minimum_version, BIGQUERY_INSTALLED_VERSION))


def _test_google_api_imports():
Expand Down Expand Up @@ -447,8 +448,8 @@ def process_http_error(ex):

def run_query(self, query, **kwargs):
from google.auth.exceptions import RefreshError
from google.cloud.bigquery import QueryJobConfig
from concurrent.futures import TimeoutError
from pandas_gbq import _query

job_config = {
'query': {
Expand All @@ -459,29 +460,23 @@ def run_query(self, query, **kwargs):
}
config = kwargs.get('configuration')
if config is not None:
if len(config) != 1:
raise ValueError("Only one job type must be specified, but "
"given {}".format(','.join(config.keys())))
if 'query' in config:
if 'query' in config['query']:
if query is not None:
raise ValueError("Query statement can't be specified "
"inside config while it is specified "
"as parameter")
query = config['query']['query']
del config['query']['query']

job_config['query'].update(config['query'])
else:
raise ValueError("Only 'query' job type is supported")
job_config.update(config)

if 'query' in config and 'query' in config['query']:
if query is not None:
raise ValueError("Query statement can't be specified "
"inside config while it is specified "
"as parameter")
query = config['query'].pop('query')

self._start_timer()
try:

try:
logger.info('Requesting query... ')
query_reply = self.client.query(
query,
job_config=QueryJobConfig.from_api_repr(job_config['query']))
job_config=_query.query_config(
job_config, BIGQUERY_INSTALLED_VERSION))
logger.info('ok.\nQuery running...')
except (RefreshError, ValueError):
if self.private_key:
Expand Down Expand Up @@ -598,6 +593,15 @@ def schema(self, dataset_id, table_id):
except self.http_error as ex:
self.process_http_error(ex)

def _clean_schema_fields(self, fields):
"""Return a sanitized version of the schema for comparisons."""
fields_sorted = sorted(fields, key=lambda field: field['name'])
# Ignore mode and description when comparing schemas.
return [
{'name': field['name'], 'type': field['type']}
for field in fields_sorted
]

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly
Expand All @@ -621,17 +625,9 @@ def verify_schema(self, dataset_id, table_id, schema):
Whether the schemas match
"""

fields_remote = sorted(self.schema(dataset_id, table_id),
key=lambda x: x['name'])
fields_local = sorted(schema['fields'], key=lambda x: x['name'])

# Ignore mode when comparing schemas.
for field in fields_local:
if 'mode' in field:
del field['mode']
for field in fields_remote:
if 'mode' in field:
del field['mode']
fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id))
fields_local = self._clean_schema_fields(schema['fields'])

return fields_remote == fields_local

Expand All @@ -658,16 +654,9 @@ def schema_is_subset(self, dataset_id, table_id, schema):
Whether the passed schema is a subset
"""

fields_remote = self.schema(dataset_id, table_id)
fields_local = schema['fields']

# Ignore mode when comparing schemas.
for field in fields_local:
if 'mode' in field:
del field['mode']
for field in fields_remote:
if 'mode' in field:
del field['mode']
fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id))
fields_local = self._clean_schema_fields(schema['fields'])

return all(field in fields_remote for field in fields_local)

Expand Down Expand Up @@ -709,7 +698,7 @@ def _parse_data(schema, rows):
col_names = [str(field['name']) for field in fields]
col_dtypes = [
dtype_map.get(field['type'].upper(), object)
if field['mode'] != 'repeated'
if field['mode'].lower() != 'repeated'
else object
for field in fields
]
Expand Down Expand Up @@ -847,7 +836,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
for field in schema['fields']:
if field['type'].upper() in type_map and \
final_df[field['name']].notnull().all() and \
field['mode'] != 'repeated':
field['mode'].lower() != 'repeated':
final_df[field['name']] = \
final_df[field['name']].astype(type_map[field['type'].upper()])

Expand Down
57 changes: 57 additions & 0 deletions pandas_gbq/tests/test__query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

import pkg_resources

import mock


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_old_bq_version(mock_config):
from pandas_gbq._query import query_config

old_version = pkg_resources.parse_version('0.29.0')
query_config({'query': {'useLegacySql': False}}, old_version)
mock_config.from_api_repr.assert_called_once_with({'useLegacySql': False})


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_dev_bq_version(mock_config):
from pandas_gbq._query import query_config

dev_version = pkg_resources.parse_version('0.32.0.dev1')
query_config(
{
'query': {
'useLegacySql': False,
},
'labels': {'key': 'value'},
},
dev_version)
mock_config.from_api_repr.assert_called_once_with(
{
'query': {
'useLegacySql': False,
},
'labels': {'key': 'value'},
})


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_new_bq_version(mock_config):
from pandas_gbq._query import query_config

dev_version = pkg_resources.parse_version('1.0.0')
query_config(
{
'query': {
'useLegacySql': False,
},
'labels': {'key': 'value'},
},
dev_version)
mock_config.from_api_repr.assert_called_once_with(
{
'query': {
'useLegacySql': False,
},
'labels': {'key': 'value'},
})
38 changes: 32 additions & 6 deletions pandas_gbq/tests/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,16 +1266,42 @@ def test_retrieve_schema(self):
test_id = "15"
test_schema = {
'fields': [
{'name': 'A', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'B', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'C', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'D', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
{
'name': 'A',
'type': 'FLOAT',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'B',
'type': 'FLOAT',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'C',
'type': 'STRING',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'D',
'type': 'TIMESTAMP',
'mode': 'NULLABLE',
'description': None,
},
]
}

self.table.create(TABLE_ID + test_id, test_schema)
actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
expected = test_schema['fields']
actual = self.sut._clean_schema_fields(
self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id))
expected = [
{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'},
]
assert expected == actual, 'Expected schema used to create table'

def test_schema_is_subset_passes_if_subset(self):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def readme():


INSTALL_REQUIRES = [
'setuptools',
'pandas',
'google-auth',
'google-auth-oauthlib',
Expand Down

0 comments on commit 6294f7a

Please sign in to comment.