BUG: use original schema when appending (#318)
* BUG: use original schema when appending

Don't overwrite table schema when appending to an existing table

* Python 3.5 doesn't support f-strings

* cln: refactor `to_gbq` to avoid unnecessary extra table GET HTTP calls

pandas-gbq already fetches the table metadata when checking whether a table
exists. This refactoring reuses that metadata instead of issuing extra GET
calls when checking the schema.

Also fix a bug where `update_schema` appended columns that aren't in the
DataFrame to the schema sent in the API request.
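
To illustrate the `update_schema` fix, a minimal sketch (the field names here are hypothetical, not from this repository):

from pandas_gbq.schema import update_schema

# Hypothetical schemas: "age" exists in the remote table but not in the
# local DataFrame schema.
local = {"fields": [{"name": "name", "type": "STRING"}]}
remote = {
    "fields": [
        {"name": "name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "type": "INTEGER"},
    ]
}

# Before this fix, update_schema(local, remote) also appended "age", so the
# schema sent with the load job listed a column the DataFrame doesn't have.
# After the fix, only fields already present locally are overridden:
print(update_schema(local, remote))
# {'fields': [{'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'}]}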

* doc: add fix to changelog

* doc: revert accidental whitespace change

Co-authored-by: Tim Swast <[email protected]>
ShantanuKumar and tswast authored Apr 29, 2020
1 parent 97e9a9e commit 3114e24
Showing 6 changed files with 180 additions and 111 deletions.
4 changes: 4 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,4 @@
+- [ ] closes #xxxx
+- [ ] tests added / passed
+- [ ] passes `nox -s blacken lint`
+- [ ] `docs/source/changelog.rst` entry
8 changes: 8 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,14 @@
 Changelog
 =========
 
+.. _changelog-0.13.2:
+
+0.13.2 / TBD
+------------
+
+- Fix ``Provided Schema does not match Table`` error when the existing table
+  contains required fields. (:issue:`315`)
+
 .. _changelog-0.13.1:
 
 0.13.1 / 2020-02-13
92 changes: 34 additions & 58 deletions pandas_gbq/gbq.py
@@ -5,6 +5,15 @@
 
 import numpy as np
 
+# Required dependencies, but treat as optional so that _test_google_api_imports
+# can provide a better error message.
+try:
+    from google.api_core import exceptions as google_exceptions
+    from google.cloud import bigquery
+except ImportError:  # pragma: NO COVER
+    bigquery = None
+    google_exceptions = None
+
 try:
     # The BigQuery Storage API client is an optional dependency. It is only
     # required when use_bqstorage_api=True.
@@ -388,7 +397,6 @@ def sizeof_fmt(num, suffix="B"):
         return fmt % (num, "Y", suffix)
 
     def get_client(self):
-        from google.cloud import bigquery
         import pandas
 
         try:
@@ -429,7 +437,6 @@ def run_query(
     ):
         from concurrent.futures import TimeoutError
         from google.auth.exceptions import RefreshError
-        from google.cloud import bigquery
 
         job_config = {
             "query": {
@@ -640,15 +647,6 @@ def schema(self, dataset_id, table_id):
         except self.http_error as ex:
             self.process_http_error(ex)
 
-    def _clean_schema_fields(self, fields):
-        """Return a sanitized version of the schema for comparisons."""
-        fields_sorted = sorted(fields, key=lambda field: field["name"])
-        # Ignore mode and description when comparing schemas.
-        return [
-            {"name": field["name"], "type": field["type"]}
-            for field in fields_sorted
-        ]
-
     def verify_schema(self, dataset_id, table_id, schema):
         """Indicate whether schemas match exactly
@@ -672,43 +670,12 @@ def verify_schema(self, dataset_id, table_id, schema):
             Whether the schemas match
         """
 
-        fields_remote = self._clean_schema_fields(
+        fields_remote = pandas_gbq.schema._clean_schema_fields(
             self.schema(dataset_id, table_id)
         )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
+        fields_local = pandas_gbq.schema._clean_schema_fields(schema["fields"])
         return fields_remote == fields_local
 
-    def schema_is_subset(self, dataset_id, table_id, schema):
-        """Indicate whether the schema to be uploaded is a subset
-
-        Compare the BigQuery table identified in the parameters with
-        the schema passed in and indicate whether a subset of the fields in
-        the former are present in the latter. Order is not considered.
-
-        Parameters
-        ----------
-        dataset_id : str
-            Name of the BigQuery dataset for the table
-        table_id : str
-            Name of the BigQuery table
-        schema : list(dict)
-            Schema for comparison. Each item should have
-            a 'name' and a 'type'
-
-        Returns
-        -------
-        bool
-            Whether the passed schema is a subset
-        """
-
-        fields_remote = self._clean_schema_fields(
-            self.schema(dataset_id, table_id)
-        )
-        fields_local = self._clean_schema_fields(schema["fields"])
-
-        return all(field in fields_remote for field in fields_local)
-
     def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
         table = _Table(
             self.project_id, dataset_id, credentials=self.credentials
@@ -1141,7 +1108,6 @@ def to_gbq(
     """
 
     _test_google_api_imports()
-    from pandas_gbq import schema
 
     if verbose is not None and SHOW_VERBOSE_DEPRECATION:
         warnings.warn(
@@ -1168,25 +1134,31 @@
         credentials=credentials,
         private_key=private_key,
     )
+    bqclient = connector.client
     dataset_id, table_id = destination_table.rsplit(".", 1)
 
-    table = _Table(
-        project_id,
-        dataset_id,
-        location=location,
-        credentials=connector.credentials,
-    )
-
     default_schema = _generate_bq_schema(dataframe)
     if not table_schema:
         table_schema = default_schema
     else:
-        table_schema = schema.update_schema(
+        table_schema = pandas_gbq.schema.update_schema(
             default_schema, dict(fields=table_schema)
         )
 
     # If table exists, check if_exists parameter
-    if table.exists(table_id):
+    try:
+        table = bqclient.get_table(destination_table)
+    except google_exceptions.NotFound:
+        table_connector = _Table(
+            project_id,
+            dataset_id,
+            location=location,
+            credentials=connector.credentials,
+        )
+        table_connector.create(table_id, table_schema)
+    else:
+        original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)
+
         if if_exists == "fail":
             raise TableCreationError(
                 "Could not create the table because it "
@@ -1199,16 +1171,20 @@
                 dataset_id, table_id, table_schema
             )
         elif if_exists == "append":
-            if not connector.schema_is_subset(
-                dataset_id, table_id, table_schema
+            if not pandas_gbq.schema.schema_is_subset(
+                original_schema, table_schema
             ):
                 raise InvalidSchema(
                     "Please verify that the structure and "
                     "data types in the DataFrame match the "
                     "schema of the destination table."
                 )
-    else:
-        table.create(table_id, table_schema)
+
+            # Update the local `table_schema` so mode matches.
+            # See: https://github.com/pydata/pandas-gbq/issues/315
+            table_schema = pandas_gbq.schema.update_schema(
+                table_schema, original_schema
+            )
 
     if dataframe.empty:
         # Create the table (if needed), but don't try to run a load job with an
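
The refactored append path above boils down to the following sketch: a single `get_table()` call doubles as the existence check and the source of the original schema. The destination string and schema below are placeholders, and the error handling is simplified; this is not the exact `to_gbq` body.

from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery

import pandas_gbq.schema

bqclient = bigquery.Client()
destination_table = "my-project.my_dataset.my_table"  # placeholder
table_schema = {"fields": [{"name": "name", "type": "STRING"}]}

try:
    # One GET request: raises NotFound instead of a separate exists() call.
    table = bqclient.get_table(destination_table)
except google_exceptions.NotFound:
    print("table does not exist; to_gbq would create it here")
else:
    original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)
    if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
        raise ValueError("DataFrame schema does not match the table")
    # Adopt the table's modes (e.g. REQUIRED) so the schema sent with the
    # load job matches the existing table.
    table_schema = pandas_gbq.schema.update_schema(
        table_schema, original_schema
    )
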
56 changes: 53 additions & 3 deletions pandas_gbq/schema.py
@@ -3,6 +3,59 @@
 import copy
 
 
+def to_pandas_gbq(client_schema):
+    """Given a sequence of :class:`google.cloud.bigquery.schema.SchemaField`,
+    return a schema in pandas-gbq API format.
+    """
+    remote_fields = [
+        field_remote.to_api_repr() for field_remote in client_schema
+    ]
+    for field in remote_fields:
+        field["type"] = field["type"].upper()
+        field["mode"] = field["mode"].upper()
+
+    return {"fields": remote_fields}
+
+
+def _clean_schema_fields(fields):
+    """Return a sanitized version of the schema for comparisons.
+
+    The ``mode`` and ``description`` properties are ignored because they
+    are not generated by :func:`pandas_gbq.schema.generate_bq_schema`.
+    """
+    fields_sorted = sorted(fields, key=lambda field: field["name"])
+    return [
+        {"name": field["name"], "type": field["type"]}
+        for field in fields_sorted
+    ]
+
+
+def schema_is_subset(schema_remote, schema_local):
+    """Indicate whether the schema to be uploaded is a subset
+
+    Compare the BigQuery table identified in the parameters with
+    the schema passed in and indicate whether a subset of the fields in
+    the former are present in the latter. Order is not considered.
+
+    Parameters
+    ----------
+    schema_remote : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+    schema_local : dict
+        Schema for comparison. Each item of ``fields`` should have a 'name'
+        and a 'type'
+
+    Returns
+    -------
+    bool
+        Whether the passed schema is a subset
+    """
+    fields_remote = _clean_schema_fields(schema_remote.get("fields", []))
+    fields_local = _clean_schema_fields(schema_local.get("fields", []))
+    return all(field in fields_remote for field in fields_local)
+
+
 def generate_bq_schema(dataframe, default_type="STRING"):
     """Given a passed dataframe, generate the associated Google BigQuery schema.
@@ -59,9 +112,6 @@ def update_schema(schema_old, schema_new):
         if name in field_indices:
             # replace old field with new field of same name
             output_fields[field_indices[name]] = field
-        else:
-            # add new field
-            output_fields.append(field)
 
     return {"fields": output_fields}
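
A quick, hypothetical round trip through the new helpers, assuming google-cloud-bigquery is installed:

from google.cloud import bigquery

import pandas_gbq.schema

client_schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER"),
]

# Convert the client library's SchemaField objects to pandas-gbq's
# dict-based format.
remote = pandas_gbq.schema.to_pandas_gbq(client_schema)

local = {
    "fields": [
        {"name": "name", "type": "STRING"},
        {"name": "age", "type": "INTEGER"},
    ]
}

# mode and description are ignored by the comparison, so the local schema
# still counts as a subset of the remote one.
assert pandas_gbq.schema.schema_is_subset(remote, local)
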
68 changes: 30 additions & 38 deletions tests/system/test_gbq.py
@@ -12,6 +12,7 @@
 import pytz
 
 from pandas_gbq import gbq
+import pandas_gbq.schema
 
 
 TABLE_ID = "new_test"
@@ -1637,7 +1638,7 @@ def test_retrieve_schema(gbq_table, gbq_connector):
     }
 
     gbq_table.create(table_id, test_schema)
-    actual = gbq_connector._clean_schema_fields(
+    actual = pandas_gbq.schema._clean_schema_fields(
         gbq_connector.schema(gbq_table.dataset_id, table_id)
     )
     expected = [
@@ -1649,48 +1650,39 @@
     assert expected == actual, "Expected schema used to create table"
 
 
-def test_schema_is_subset_passes_if_subset(gbq_table, gbq_connector):
-    # Issue #24 schema_is_subset indicates whether the schema of the
-    # dataframe is a subset of the schema of the bigquery table
-    table_id = "test_schema_is_subset_passes_if_subset"
+def test_to_gbq_does_not_override_mode(gbq_table, gbq_connector):
+    # See: https://github.com/pydata/pandas-gbq/issues/315
+    table_id = "test_to_gbq_does_not_override_mode"
     table_schema = {
         "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
+            {
+                "mode": "REQUIRED",
+                "name": "A",
+                "type": "FLOAT",
+                "description": "A",
+            },
+            {
+                "mode": "NULLABLE",
+                "name": "B",
+                "type": "FLOAT",
+                "description": "B",
+            },
+            {
+                "mode": "NULLABLE",
+                "name": "C",
+                "type": "STRING",
+                "description": "C",
+            },
         ]
     }
 
     gbq_table.create(table_id, table_schema)
-    assert gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
+    gbq.to_gbq(
+        pandas.DataFrame({"A": [1.0], "B": [2.0], "C": ["a"]}),
+        "{0}.{1}".format(gbq_table.dataset_id, table_id),
+        project_id=gbq_connector.project_id,
+        if_exists="append",
     )
-
-
-def test_schema_is_subset_fails_if_not_subset(gbq_table, gbq_connector):
-    # For pull request #24
-    table_id = "test_schema_is_subset_fails_if_not_subset"
-    table_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "B", "type": "FLOAT"},
-            {"name": "C", "type": "STRING"},
-        ]
-    }
-    tested_schema = {
-        "fields": [
-            {"name": "A", "type": "FLOAT"},
-            {"name": "C", "type": "FLOAT"},
-        ]
-    }
-
-    gbq_table.create(table_id, table_schema)
-    assert not gbq_connector.schema_is_subset(
-        gbq_table.dataset_id, table_id, tested_schema
-    )
+
+    actual = gbq_connector.schema(gbq_table.dataset_id, table_id)
+    assert table_schema["fields"] == actual