Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BUG: Adapt to breaking change in google-cloud-bigquery 0.32.0.dev1 #152

Merged
merged 8 commits into from
Apr 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ install:
conda install -q numpy pytz python-dateutil;
PRE_WHEELS="https://7933911d6844c6c53a7d-47bd50c35cd79bd838daf386af554a83.ssl.cf2.rackcdn.com";
pip install --pre --upgrade --timeout=60 -f $PRE_WHEELS pandas;
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=api_core';
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=core';
pip install -e 'git+https://github.com/GoogleCloudPlatform/google-cloud-python.git#egg=version_subpkg&subdirectory=bigquery';
else
conda install -q pandas=$PANDAS;
fi
Expand Down
1 change: 0 additions & 1 deletion ci/requirements-3.6-MASTER.pip
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
google-auth
google-auth-oauthlib
mock
google-cloud-bigquery
5 changes: 4 additions & 1 deletion docs/source/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
Changelog
=========

0.3.2 / [TBD]
0.4.0 / [TBD]
------------------
- Fix bug with querying for an array of floats (:issue:`123`)
- Fix bug with integer columns on Windows. Explicitly use 64bit integers when converting from BQ types. (:issue:`119`)
- Fix bug caused by a breaking change in the way ``google-cloud-python`` version 0.32.0+ handles the additional configuration argument to ``read_gbq``. (:issue:`152`)
- **Deprecates** the ``verbose`` parameter. Messages use the logging module instead of printing progress directly to standard output. (:issue:`12`)

0.3.1 / 2018-02-13
------------------
Expand Down
25 changes: 25 additions & 0 deletions pandas_gbq/_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

import pkg_resources
from google.cloud import bigquery


# First google-cloud-bigquery version whose QueryJobConfig.from_api_repr
# expects the full job configuration resource rather than only the 'query'
# sub-resource (breaking change; see query_config below).
BIGQUERY_CONFIG_VERSION = pkg_resources.parse_version('0.32.0.dev1')


def query_config_old_version(resource):
    """Build a QueryJobConfig from a job resource for pre-0.32.0 clients.

    Older google-cloud-bigquery releases accept only the ``query``
    sub-resource in ``from_api_repr``, so validate the resource shape
    client-side. Newer versions pass enough of the configuration through
    to the backend that a backend validation error is raised instead.

    :raises ValueError: if *resource* holds more than one job type, or a
        job type other than ``query``.
    """
    job_types = list(resource)
    if len(job_types) != 1:
        raise ValueError("Only one job type must be specified, but "
                         "given {}".format(','.join(resource.keys())))
    if 'query' not in resource:
        raise ValueError("Only 'query' job type is supported")
    # Only the 'query' sub-resource is understood by old clients.
    return bigquery.QueryJobConfig.from_api_repr(resource['query'])


def query_config(resource, installed_version):
    """Create a QueryJobConfig appropriate for *installed_version*.

    google-cloud-bigquery 0.32.0.dev1 changed ``from_api_repr`` to take
    the full job configuration resource; earlier releases expect only the
    ``query`` sub-resource and need client-side validation.
    """
    if installed_version >= BIGQUERY_CONFIG_VERSION:
        return bigquery.QueryJobConfig.from_api_repr(resource)
    return query_config_old_version(resource)
93 changes: 41 additions & 52 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import time
import warnings
from datetime import datetime
from distutils.version import StrictVersion
from time import sleep

import numpy as np
Expand All @@ -14,7 +13,11 @@
logger = logging.getLogger(__name__)


BIGQUERY_INSTALLED_VERSION = None


def _check_google_client_version():
global BIGQUERY_INSTALLED_VERSION

try:
import pkg_resources
Expand All @@ -23,17 +26,15 @@ def _check_google_client_version():
raise ImportError('Could not import pkg_resources (setuptools).')

# https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
bigquery_client_minimum_version = '0.29.0'

_BIGQUERY_CLIENT_VERSION = pkg_resources.get_distribution(
'google-cloud-bigquery').version
bigquery_minimum_version = pkg_resources.parse_version('0.29.0')
BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
'google-cloud-bigquery').parsed_version

if (StrictVersion(_BIGQUERY_CLIENT_VERSION) <
StrictVersion(bigquery_client_minimum_version)):
raise ImportError('pandas-gbq requires google-cloud-bigquery >= {0}, '
'current version {1}'
.format(bigquery_client_minimum_version,
_BIGQUERY_CLIENT_VERSION))
if BIGQUERY_INSTALLED_VERSION < bigquery_minimum_version:
raise ImportError(
'pandas-gbq requires google-cloud-bigquery >= {0}, '
'current version {1}'.format(
bigquery_minimum_version, BIGQUERY_INSTALLED_VERSION))


def _test_google_api_imports():
Expand Down Expand Up @@ -447,8 +448,8 @@ def process_http_error(ex):

def run_query(self, query, **kwargs):
from google.auth.exceptions import RefreshError
from google.cloud.bigquery import QueryJobConfig
from concurrent.futures import TimeoutError
from pandas_gbq import _query

job_config = {
'query': {
Expand All @@ -459,29 +460,23 @@ def run_query(self, query, **kwargs):
}
config = kwargs.get('configuration')
if config is not None:
if len(config) != 1:
raise ValueError("Only one job type must be specified, but "
"given {}".format(','.join(config.keys())))
if 'query' in config:
if 'query' in config['query']:
if query is not None:
raise ValueError("Query statement can't be specified "
"inside config while it is specified "
"as parameter")
query = config['query']['query']
del config['query']['query']

job_config['query'].update(config['query'])
else:
raise ValueError("Only 'query' job type is supported")
job_config.update(config)

if 'query' in config and 'query' in config['query']:
if query is not None:
raise ValueError("Query statement can't be specified "
"inside config while it is specified "
"as parameter")
query = config['query'].pop('query')

self._start_timer()
try:

try:
logger.info('Requesting query... ')
query_reply = self.client.query(
query,
job_config=QueryJobConfig.from_api_repr(job_config['query']))
job_config=_query.query_config(
job_config, BIGQUERY_INSTALLED_VERSION))
logger.info('ok.\nQuery running...')
except (RefreshError, ValueError):
if self.private_key:
Expand Down Expand Up @@ -598,6 +593,15 @@ def schema(self, dataset_id, table_id):
except self.http_error as ex:
self.process_http_error(ex)

def _clean_schema_fields(self, fields):
"""Return a sanitized version of the schema for comparisons."""
fields_sorted = sorted(fields, key=lambda field: field['name'])
# Ignore mode and description when comparing schemas.
return [
{'name': field['name'], 'type': field['type']}
for field in fields_sorted
]

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly

Expand All @@ -621,17 +625,9 @@ def verify_schema(self, dataset_id, table_id, schema):
Whether the schemas match
"""

fields_remote = sorted(self.schema(dataset_id, table_id),
key=lambda x: x['name'])
fields_local = sorted(schema['fields'], key=lambda x: x['name'])

# Ignore mode when comparing schemas.
for field in fields_local:
if 'mode' in field:
del field['mode']
for field in fields_remote:
if 'mode' in field:
del field['mode']
fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id))
fields_local = self._clean_schema_fields(schema['fields'])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much nicer


return fields_remote == fields_local

Expand All @@ -658,16 +654,9 @@ def schema_is_subset(self, dataset_id, table_id, schema):
Whether the passed schema is a subset
"""

fields_remote = self.schema(dataset_id, table_id)
fields_local = schema['fields']

# Ignore mode when comparing schemas.
for field in fields_local:
if 'mode' in field:
del field['mode']
for field in fields_remote:
if 'mode' in field:
del field['mode']
fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id))
fields_local = self._clean_schema_fields(schema['fields'])

return all(field in fields_remote for field in fields_local)

Expand Down Expand Up @@ -709,7 +698,7 @@ def _parse_data(schema, rows):
col_names = [str(field['name']) for field in fields]
col_dtypes = [
dtype_map.get(field['type'].upper(), object)
if field['mode'] != 'repeated'
if field['mode'].lower() != 'repeated'
else object
for field in fields
]
Expand Down Expand Up @@ -847,7 +836,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
for field in schema['fields']:
if field['type'].upper() in type_map and \
final_df[field['name']].notnull().all() and \
field['mode'] != 'repeated':
field['mode'].lower() != 'repeated':
final_df[field['name']] = \
final_df[field['name']].astype(type_map[field['type'].upper()])

Expand Down
57 changes: 57 additions & 0 deletions pandas_gbq/tests/test__query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@

import pkg_resources

import mock


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_old_bq_version(mock_config):
    # Clients older than 0.32.0.dev1 get only the 'query' sub-resource.
    from pandas_gbq._query import query_config

    installed = pkg_resources.parse_version('0.29.0')
    resource = {'query': {'useLegacySql': False}}
    query_config(resource, installed)
    mock_config.from_api_repr.assert_called_once_with({'useLegacySql': False})


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_dev_bq_version(mock_config):
    # At exactly 0.32.0.dev1 the whole resource is forwarded untouched.
    from pandas_gbq._query import query_config

    resource = {
        'query': {
            'useLegacySql': False,
        },
        'labels': {'key': 'value'},
    }
    installed = pkg_resources.parse_version('0.32.0.dev1')
    query_config(resource, installed)
    mock_config.from_api_repr.assert_called_once_with(resource)


@mock.patch('google.cloud.bigquery.QueryJobConfig')
def test_query_config_w_new_bq_version(mock_config):
    # Any release newer than 0.32.0.dev1 also gets the full resource.
    from pandas_gbq._query import query_config

    resource = {
        'query': {
            'useLegacySql': False,
        },
        'labels': {'key': 'value'},
    }
    installed = pkg_resources.parse_version('1.0.0')
    query_config(resource, installed)
    mock_config.from_api_repr.assert_called_once_with(resource)
38 changes: 32 additions & 6 deletions pandas_gbq/tests/test_gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -1266,16 +1266,42 @@ def test_retrieve_schema(self):
test_id = "15"
test_schema = {
'fields': [
{'name': 'A', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'B', 'type': 'FLOAT', 'mode': 'NULLABLE'},
{'name': 'C', 'type': 'STRING', 'mode': 'NULLABLE'},
{'name': 'D', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'}
{
'name': 'A',
'type': 'FLOAT',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'B',
'type': 'FLOAT',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'C',
'type': 'STRING',
'mode': 'NULLABLE',
'description': None,
},
{
'name': 'D',
'type': 'TIMESTAMP',
'mode': 'NULLABLE',
'description': None,
},
]
}

self.table.create(TABLE_ID + test_id, test_schema)
actual = self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id)
expected = test_schema['fields']
actual = self.sut._clean_schema_fields(
self.sut.schema(self.dataset_prefix + "1", TABLE_ID + test_id))
expected = [
{'name': 'A', 'type': 'FLOAT'},
{'name': 'B', 'type': 'FLOAT'},
{'name': 'C', 'type': 'STRING'},
{'name': 'D', 'type': 'TIMESTAMP'},
]
assert expected == actual, 'Expected schema used to create table'

def test_schema_is_subset_passes_if_subset(self):
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def readme():


INSTALL_REQUIRES = [
'setuptools',
'pandas',
'google-auth',
'google-auth-oauthlib',
Expand Down