From d8c11aab3699b528bceafdb119ac59936d1449d2 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Wed, 29 Apr 2020 12:51:20 -0500
Subject: [PATCH] cln: refactor `to_gbq` to avoid unnecessary extra table GET
 HTTP calls

pandas-gbq already gets the table metadata when checking if a table
exists. This refactoring avoids extra calls to get the table metadata
when checking the schema.

Also, fix a bug where update_schema appends columns that aren't in the
dataframe to the schema sent in the API request.
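As an illustrative sketch only (the `fetch_table` helper below is
hypothetical, not code from this patch), the pattern the refactoring
moves to is a single tables.get call that answers both "does the table
exist?" and "what is its schema?":

    # Hypothetical sketch of the one-call pattern; not a pandas-gbq API.
    from google.api_core import exceptions as google_exceptions
    from google.cloud import bigquery

    def fetch_table(client, destination_table):
        try:
            # One HTTP GET returns existence *and* metadata, schema included.
            return client.get_table(destination_table)
        except google_exceptions.NotFound:
            # Missing table: the caller creates it instead of issuing
            # a second GET to double-check.
            return None

    client = bigquery.Client()  # application default credentials
    table = fetch_table(client, "some_dataset.some_table")  # name is made up

Previously, to_gbq called table.exists() and then fetched the table
metadata again for the schema comparison, costing an extra GET per call.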
---
 pandas_gbq/gbq.py         | 121 ++++++++++----------------
 pandas_gbq/schema.py      |  56 +++++++++++++++++-
 tests/system/test_gbq.py  |  52 +---
 tests/unit/test_schema.py |  63 ++++++++++++++++----
 4 files changed, 139 insertions(+), 153 deletions(-)

diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 192cca8f..d063501c 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -5,6 +5,15 @@
 
 import numpy as np
 
+# Required dependencies, but treat as optional so that _test_google_api_imports
+# can provide a better error message.
+try:
+    from google.api_core import exceptions as google_exceptions
+    from google.cloud import bigquery
+except ImportError:  # pragma: NO COVER
+    bigquery = None
+    google_exceptions = None
+
 try:
     # The BigQuery Storage API client is an optional dependency. It is only
     # required when use_bqstorage_api=True.
@@ -388,7 +397,6 @@ def sizeof_fmt(num, suffix="B"):
         return fmt % (num, "Y", suffix)
 
     def get_client(self):
-        from google.cloud import bigquery
        import pandas

         try:
@@ -429,7 +437,6 @@ def run_query(
     ):
         from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
-        from google.cloud import bigquery
 
         job_config = {
             "query": {
@@ -640,22 +647,11 @@ def schema(self, dataset_id, table_id):
         except self.http_error as ex:
             self.process_http_error(ex)
 
-    def _clean_schema_fields(self, fields):
-        """Return a sanitized version of the schema for comparisons."""
-        fields_sorted = sorted(fields, key=lambda field: field["name"])
-        # Ignore mode and description when comparing schemas.
-        return [
-            {"name": field["name"], "type": field["type"]}
-            for field in fields_sorted
-        ]
-
     def verify_schema(self, dataset_id, table_id, schema):
         """Indicate whether schemas match exactly
-
         Compare the BigQuery table identified in the parameters with the
         schema passed in and indicate whether all fields in the former
         are present in the latter. Order is not considered.
-
         Parameters
         ----------
         dataset_id : str
             Name of the BigQuery dataset for the table
@@ -665,50 +661,18 @@ def verify_schema(self, dataset_id, table_id, schema):
         table_id : str
             Name of the BigQuery table
         schema : list(dict)
             Schema for comparison. Each item should have
             a 'name' and a 'type'
-
         Returns
         -------
         bool
             Whether the schemas match
         """
-        fields_remote = self._clean_schema_fields(
+        fields_remote = pandas_gbq.schema._clean_schema_fields(
             self.schema(dataset_id, table_id)
         )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
+        fields_local = pandas_gbq.schema._clean_schema_fields(schema["fields"])
         return fields_remote == fields_local
 
-    def schema_is_subset(self, dataset_id, table_id, schema):
-        """Indicate whether the schema to be uploaded is a subset
-
-        Compare the BigQuery table identified in the parameters with
-        the schema passed in and indicate whether a subset of the fields in
-        the former are present in the latter. Order is not considered.
-
-        Parameters
-        ----------
-        dataset_id : str
-            Name of the BigQuery dataset for the table
-        table_id : str
-            Name of the BigQuery table
-        schema : list(dict)
-            Schema for comparison. Each item should have
-            a 'name' and a 'type'
-
-        Returns
-        -------
-        bool
-            Whether the passed schema is a subset
-        """
-
-        fields_remote = self._clean_schema_fields(
-            self.schema(dataset_id, table_id)
-        )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
-        return all(field in fields_remote for field in fields_local)
-
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         table = _Table(
             self.project_id, dataset_id, credentials=self.credentials
         )
@@ -1141,7 +1105,6 @@ def to_gbq(
     """
 
     _test_google_api_imports()
-    from pandas_gbq import schema
 
     if verbose is not None and SHOW_VERBOSE_DEPRECATION:
         warnings.warn(
@@ -1168,25 +1131,31 @@ def to_gbq(
         credentials=credentials,
         private_key=private_key,
     )
+    bqclient = connector.client
     dataset_id, table_id = destination_table.rsplit(".", 1)
-    table = _Table(
-        project_id,
-        dataset_id,
-        location=location,
-        credentials=connector.credentials,
-    )
-
     default_schema = _generate_bq_schema(dataframe)
     if not table_schema:
         table_schema = default_schema
     else:
-        table_schema = schema.update_schema(
+        table_schema = pandas_gbq.schema.update_schema(
             default_schema, dict(fields=table_schema)
         )
 
     # If table exists, check if_exists parameter
-    if table.exists(table_id):
+    try:
+        table = bqclient.get_table(destination_table)
+    except google_exceptions.NotFound:
+        table_connector = _Table(
+            project_id,
+            dataset_id,
+            location=location,
+            credentials=connector.credentials,
+        )
+        table_connector.create(table_id, table_schema)
+    else:
+        original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)
+
         if if_exists == "fail":
             raise TableCreationError(
                 "Could not create the table because it "
@@ -1199,8 +1168,8 @@ def to_gbq(
                 dataset_id, table_id, table_schema
             )
         elif if_exists == "append":
-            if not connector.schema_is_subset(
-                dataset_id, table_id, table_schema
+            if not pandas_gbq.schema.schema_is_subset(
+                original_schema, table_schema
             ):
                 raise InvalidSchema(
                     "Please verify that the structure and "
@@ -1208,13 +1177,11 @@ def to_gbq(
                     "data types of the DataFrame match the "
                     "schema of the destination table."
                 )
 
-            # Fetch original schema, and update the local `table_schema`
-            original_schema = table.schema(table_id)
-            table_schema = schema.update_schema(
-                table_schema, dict(fields=original_schema)
+            # Update the local `table_schema` so mode matches.
+            # See: https://github.com/pydata/pandas-gbq/issues/315
+            table_schema = pandas_gbq.schema.update_schema(
+                table_schema, original_schema
             )
-    else:
-        table.create(table_id, table_schema)
 
     if dataframe.empty:
         # Create the table (if needed), but don't try to run a load job with an
@@ -1373,30 +1340,6 @@ def delete(self, table_id):
         except self.http_error as ex:
             self.process_http_error(ex)
 
-    def schema(self, table_id):
-        """Retrieve the schema of the table
-
-        Obtain from BigQuery the field names and field types
-        for the table defined by the parameters
-
-        Parameters
-        ----------
-        table_id : str
-            Name of table whose schema is to be fetched
-
-        Returns
-        -------
-        list of dicts
-            Fields representing the schema
-        """
-        if not self.exists(table_id):
-            raise NotFoundException(
-                "Table {0} does not exist".format(table_id)
-            )
-
-        original_schema = super(_Table, self).schema(self.dataset_id, table_id)
-        return original_schema
-
 
 class _Dataset(GbqConnector):
     def __init__(
diff --git a/pandas_gbq/schema.py b/pandas_gbq/schema.py
index bb18fabc..4154044b 100644
--- a/pandas_gbq/schema.py
+++ b/pandas_gbq/schema.py
@@ -3,6 +3,59 @@
 import copy
 
 
+def to_pandas_gbq(client_schema):
+    """Given a sequence of :class:`google.cloud.bigquery.schema.SchemaField`,
+    return a schema in pandas-gbq API format.
+    """
+    remote_fields = [
+        field_remote.to_api_repr() for field_remote in client_schema
+    ]
+    for field in remote_fields:
+        field["type"] = field["type"].upper()
+        field["mode"] = field["mode"].upper()
+
+    return {"fields": remote_fields}
+
+
+def _clean_schema_fields(fields):
+    """Return a sanitized version of the schema for comparisons.
+
+    The ``mode`` and ``description`` properties are ignored because they
+    are not generated by :func:`pandas_gbq.schema.generate_bq_schema`.
+    """
+    fields_sorted = sorted(fields, key=lambda field: field["name"])
+    return [
+        {"name": field["name"], "type": field["type"]}
+        for field in fields_sorted
+    ]
+
+
+def schema_is_subset(schema_remote, schema_local):
+    """Indicate whether the schema to be uploaded is a subset
+
+    Compare the BigQuery table identified in the parameters with
+    the schema passed in and indicate whether a subset of the fields in
+    the former are present in the latter. Order is not considered.
+
+    Parameters
+    ----------
+    schema_remote : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+    schema_local : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+
+    Returns
+    -------
+    bool
+        Whether the passed schema is a subset
+    """
+    fields_remote = _clean_schema_fields(schema_remote.get("fields", []))
+    fields_local = _clean_schema_fields(schema_local.get("fields", []))
+    return all(field in fields_remote for field in fields_local)
+
+
 def generate_bq_schema(dataframe, default_type="STRING"):
     """Given a passed dataframe, generate the associated Google BigQuery
     schema.
@@ -59,9 +112,6 @@ def update_schema(schema_old, schema_new):
         if name in field_indices:
             # replace old field with new field of same name
             output_fields[field_indices[name]] = field
-        else:
-            # add new field
-            output_fields.append(field)
 
     return {"fields": output_fields}
diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py
index 3c85ec2b..f28bb3ec 100644
--- a/tests/system/test_gbq.py
+++ b/tests/system/test_gbq.py
@@ -12,6 +12,7 @@
 import pytz
 
 from pandas_gbq import gbq
+import pandas_gbq.schema
 
 TABLE_ID = "new_test"
 
@@ -1637,7 +1638,7 @@ def test_retrieve_schema(gbq_table, gbq_connector):
     }
 
     gbq_table.create(table_id, test_schema)
-    actual = gbq_connector._clean_schema_fields(
+    actual = pandas_gbq.schema._clean_schema_fields(
         gbq_connector.schema(gbq_table.dataset_id, table_id)
     )
     expected = [
@@ -1649,53 +1650,6 @@ def test_retrieve_schema(gbq_table, gbq_connector):
     assert expected == actual, "Expected schema used to create table"
 
 
-def test_schema_is_subset_passes_if_subset(gbq_table, gbq_connector):
-    # Issue #24 schema_is_subset indicates whether the schema of the
-    # dataframe is a subset of the schema of the bigquery table
-    table_id = "test_schema_is_subset_passes_if_subset"
-    table_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-        ]
-    }
-
-    gbq_table.create(table_id, table_schema)
-    assert gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
-    )
-
-
-def test_schema_is_subset_fails_if_not_subset(gbq_table, gbq_connector):
-    # For pull request #24
-    table_id = "test_schema_is_subset_fails_if_not_subset"
-    table_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "C", "type": "FLOAT"},
-        ]
-    }
-
-    gbq_table.create(table_id, table_schema)
-    assert not gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
-    )
-
-
 def test_schema_is_not_overwritten(gbq_table, gbq_connector):
     # Fixes bug #315
     table_id = "test_schema_is_not_overwritten_for_existing_table"
@@ -1730,5 +1684,5 @@ def test_schema_is_not_overwritten(gbq_table, gbq_connector):
         if_exists="append",
     )
 
-    actual = gbq_table.schema(table_id)
+    actual = gbq_connector.schema(gbq_table.dataset_id, table_id)
     assert table_schema["fields"] == actual
diff --git a/tests/unit/test_schema.py b/tests/unit/test_schema.py
index af3b2043..2b732428 100644
--- a/tests/unit/test_schema.py
+++ b/tests/unit/test_schema.py
@@ -3,7 +3,48 @@
 import pandas
 import pytest
 
-import pandas_gbq.schema
+
+@pytest.fixture
+def module_under_test():
+    import pandas_gbq.schema
+
+    return pandas_gbq.schema
+
+
+def test_schema_is_subset_passes_if_subset(module_under_test):
+    # Issue #24 schema_is_subset indicates whether the schema of the
+    # dataframe is a subset of the schema of the bigquery table
+    table_schema = {
+        "fields": [
+            {"name": "A", "type": "FLOAT"},
+            {"name": "B", "type": "FLOAT"},
+            {"name": "C", "type": "STRING"},
+        ]
+    }
+    tested_schema = {
+        "fields": [
+            {"name": "A", "type": "FLOAT"},
+            {"name": "B", "type": "FLOAT"},
+        ]
+    }
+    assert module_under_test.schema_is_subset(table_schema, tested_schema)
+
+
+def test_schema_is_subset_fails_if_not_subset(module_under_test):
+    table_schema = {
+        "fields": [
+            {"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"}, + {"name": "C", "type": "STRING"}, + ] + } + tested_schema = { + "fields": [ + {"name": "A", "type": "FLOAT"}, + {"name": "C", "type": "FLOAT"}, + ] + } + assert not module_under_test.schema_is_subset(table_schema, tested_schema) @pytest.mark.parametrize( @@ -51,8 +92,8 @@ ), ], ) -def test_generate_bq_schema(dataframe, expected_schema): - schema = pandas_gbq.schema.generate_bq_schema(dataframe) +def test_generate_bq_schema(module_under_test, dataframe, expected_schema): + schema = module_under_test.generate_bq_schema(dataframe) assert schema == expected_schema @@ -62,16 +103,13 @@ def test_generate_bq_schema(dataframe, expected_schema): ( {"fields": [{"name": "col1", "type": "INTEGER"}]}, {"fields": [{"name": "col2", "type": "TIMESTAMP"}]}, - { - "fields": [ - {"name": "col1", "type": "INTEGER"}, - {"name": "col2", "type": "TIMESTAMP"}, - ] - }, + # Ignore fields that aren't in the DataFrame. + {"fields": [{"name": "col1", "type": "INTEGER"}]}, ), ( {"fields": [{"name": "col1", "type": "INTEGER"}]}, {"fields": [{"name": "col1", "type": "BOOLEAN"}]}, + # Update type for fields that are in the DataFrame. {"fields": [{"name": "col1", "type": "BOOLEAN"}]}, ), ( @@ -91,12 +129,13 @@ def test_generate_bq_schema(dataframe, expected_schema): "fields": [ {"name": "col1", "type": "INTEGER"}, {"name": "col2", "type": "BOOLEAN"}, - {"name": "col3", "type": "FLOAT"}, ] }, ), ], ) -def test_update_schema(schema_old, schema_new, expected_output): - output = pandas_gbq.schema.update_schema(schema_old, schema_new) +def test_update_schema( + module_under_test, schema_old, schema_new, expected_output +): + output = module_under_test.update_schema(schema_old, schema_new) assert output == expected_output