BUG: use original schema when appending #318

Merged 5 commits on Apr 29, 2020
4 changes: 4 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,4 @@
- [ ] closes #xxxx
- [ ] tests added / passed
- [ ] passes `nox -s blacken lint`
- [ ] `docs/source/changelog.rst` entry
8 changes: 8 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,14 @@
Changelog
=========

.. _changelog-0.13.2:

0.13.2 / TBD
------------

- Fix ``Provided Schema does not match Table`` error when the existing table
contains required fields. (:issue:`315`)

.. _changelog-0.13.1:

0.13.1 / 2020-02-13
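The changelog entry above corresponds to the append scenario sketched below. This is a minimal, hypothetical example of the previously failing call, assuming a table ``my_dataset.my_table`` that already exists with REQUIRED columns (the project, dataset, and table names are placeholders):

# Hypothetical example of the case fixed by this change (issue 315).
# Before the fix, pandas-gbq generated a NULLABLE schema from the DataFrame,
# which conflicted with the table's REQUIRED fields and raised
# "Provided Schema does not match Table".
import pandas
import pandas_gbq

df = pandas.DataFrame({"id": [1, 2], "name": ["a", "b"]})

pandas_gbq.to_gbq(
    df,
    "my_dataset.my_table",    # existing table with REQUIRED columns
    project_id="my-project",  # hypothetical project
    if_exists="append",       # now reuses the table's original schema
)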
92 changes: 34 additions & 58 deletions pandas_gbq/gbq.py
Expand Up @@ -5,6 +5,15 @@

import numpy as np

# Required dependencies, but treat as optional so that _test_google_api_imports
# can provide a better error message.
try:
from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery
except ImportError: # pragma: NO COVER
bigquery = None
google_exceptions = None

try:
# The BigQuery Storage API client is an optional dependency. It is only
# required when use_bqstorage_api=True.
@@ -388,7 +397,6 @@ def sizeof_fmt(num, suffix="B"):
return fmt % (num, "Y", suffix)

def get_client(self):
from google.cloud import bigquery
import pandas

try:
@@ -429,7 +437,6 @@ def run_query(
):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
from google.cloud import bigquery

job_config = {
"query": {
@@ -640,15 +647,6 @@ def schema(self, dataset_id, table_id):
except self.http_error as ex:
self.process_http_error(ex)

def _clean_schema_fields(self, fields):
"""Return a sanitized version of the schema for comparisons."""
fields_sorted = sorted(fields, key=lambda field: field["name"])
# Ignore mode and description when comparing schemas.
return [
{"name": field["name"], "type": field["type"]}
for field in fields_sorted
]

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly

@@ -672,43 +670,12 @@ def verify_schema(self, dataset_id, table_id, schema):
Whether the schemas match
"""

fields_remote = self._clean_schema_fields(
fields_remote = pandas_gbq.schema._clean_schema_fields(
self.schema(dataset_id, table_id)
)
fields_local = self._clean_schema_fields(schema["fields"])

fields_local = pandas_gbq.schema._clean_schema_fields(schema["fields"])
return fields_remote == fields_local

def schema_is_subset(self, dataset_id, table_id, schema):
"""Indicate whether the schema to be uploaded is a subset

Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether a subset of the fields in
the former are present in the latter. Order is not considered.

Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table
schema : list(dict)
Schema for comparison. Each item should have
a 'name' and a 'type'

Returns
-------
bool
Whether the passed schema is a subset
"""

fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id)
)
fields_local = self._clean_schema_fields(schema["fields"])

return all(field in fields_remote for field in fields_local)

def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
table = _Table(
self.project_id, dataset_id, credentials=self.credentials
@@ -1141,7 +1108,6 @@ def to_gbq(
"""

_test_google_api_imports()
from pandas_gbq import schema

if verbose is not None and SHOW_VERBOSE_DEPRECATION:
warnings.warn(
@@ -1168,25 +1134,31 @@
credentials=credentials,
private_key=private_key,
)
bqclient = connector.client
dataset_id, table_id = destination_table.rsplit(".", 1)

table = _Table(
project_id,
dataset_id,
location=location,
credentials=connector.credentials,
)

default_schema = _generate_bq_schema(dataframe)
if not table_schema:
table_schema = default_schema
else:
table_schema = schema.update_schema(
table_schema = pandas_gbq.schema.update_schema(
default_schema, dict(fields=table_schema)
)

# If table exists, check if_exists parameter
if table.exists(table_id):
try:
table = bqclient.get_table(destination_table)
except google_exceptions.NotFound:
table_connector = _Table(
project_id,
dataset_id,
location=location,
credentials=connector.credentials,
)
table_connector.create(table_id, table_schema)
else:
original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

if if_exists == "fail":
raise TableCreationError(
"Could not create the table because it "
@@ -1199,16 +1171,20 @@
dataset_id, table_id, table_schema
)
elif if_exists == "append":
if not connector.schema_is_subset(
dataset_id, table_id, table_schema
if not pandas_gbq.schema.schema_is_subset(
original_schema, table_schema
):
raise InvalidSchema(
"Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table."
)
else:
table.create(table_id, table_schema)

# Update the local `table_schema` so mode matches.
# See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(
table_schema, original_schema
)

if dataframe.empty:
# Create the table (if needed), but don't try to run a load job with an
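Taken together, the ``to_gbq`` changes above amount to the following append flow: look up the destination table with the BigQuery client, convert its schema to pandas-gbq format, verify that the local schema is a subset, then merge the remote field definitions (including ``mode``) back into the local schema. The sketch below is a rough, standalone restatement of that flow under those assumptions — the helper names mirror the diff, but the wrapper function, ``bqclient``, and ``destination_table`` are illustrative rather than the library's internals verbatim:

# Rough sketch of the new append path; not the exact library code.
from google.api_core import exceptions as google_exceptions

import pandas_gbq.schema


def resolve_append_schema(bqclient, destination_table, table_schema):
    """Return the schema to use when appending, falling back to the
    locally generated schema if the destination table does not exist."""
    try:
        table = bqclient.get_table(destination_table)
    except google_exceptions.NotFound:
        # Table does not exist yet; the caller creates it with the
        # locally generated schema.
        return table_schema

    # Convert google-cloud-bigquery SchemaField objects to pandas-gbq format.
    original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

    # Refuse to append when the DataFrame's fields are not a subset of the
    # destination table's fields (name/type comparison only).
    if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
        raise ValueError(
            "DataFrame schema does not match the destination table schema."
        )

    # Copy the remote definitions (including mode=REQUIRED) over the local
    # ones so the load job's schema matches the existing table (issue 315).
    return pandas_gbq.schema.update_schema(table_schema, original_schema)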
56 changes: 53 additions & 3 deletions pandas_gbq/schema.py
Expand Up @@ -3,6 +3,59 @@
import copy


def to_pandas_gbq(client_schema):
"""Given a sequence of :class:`google.cloud.bigquery.schema.SchemaField`,
return a schema in pandas-gbq API format.
"""
remote_fields = [
field_remote.to_api_repr() for field_remote in client_schema
]
for field in remote_fields:
field["type"] = field["type"].upper()
field["mode"] = field["mode"].upper()

return {"fields": remote_fields}


def _clean_schema_fields(fields):
"""Return a sanitized version of the schema for comparisons.

The ``mode`` and ``description`` properties are ignored because they
are not generated by :func:`pandas_gbq.schema.generate_bq_schema`.
"""
fields_sorted = sorted(fields, key=lambda field: field["name"])
return [
{"name": field["name"], "type": field["type"]}
for field in fields_sorted
]


def schema_is_subset(schema_remote, schema_local):
"""Indicate whether the schema to be uploaded is a subset

Compare the remote schema with the local schema passed in and indicate
whether all of the fields in the latter are present in the former.
Order is not considered.

Parameters
----------
schema_remote : dict
Schema for comparison. Each item of ``fields`` should have a 'name'
and a 'type'
schema_local : dict
Schema for comparison. Each item of ``fields`` should have a 'name'
and a 'type'

Returns
-------
bool
Whether the passed schema is a subset
"""
fields_remote = _clean_schema_fields(schema_remote.get("fields", []))
fields_local = _clean_schema_fields(schema_local.get("fields", []))
return all(field in fields_remote for field in fields_local)


def generate_bq_schema(dataframe, default_type="STRING"):
"""Given a passed dataframe, generate the associated Google BigQuery schema.

@@ -59,9 +112,6 @@ def update_schema(schema_old, schema_new):
if name in field_indices:
# replace old field with new field of same name
output_fields[field_indices[name]] = field
else:
# add new field
output_fields.append(field)

return {"fields": output_fields}

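A short, hypothetical round trip through the helpers above: ``to_pandas_gbq`` converts client-side ``SchemaField`` objects, ``schema_is_subset`` ignores ``mode`` and ``description`` when comparing, and ``update_schema`` copies the remote definitions (and their modes) over the locally generated fields.

# Hypothetical values exercising the helpers in pandas_gbq/schema.py.
from google.cloud import bigquery

import pandas_gbq.schema

client_schema = [
    bigquery.SchemaField("A", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("B", "STRING", mode="NULLABLE"),
]

remote = pandas_gbq.schema.to_pandas_gbq(client_schema)

# A locally generated schema has no mode/description, only name and type.
local = {
    "fields": [
        {"name": "A", "type": "FLOAT"},
        {"name": "B", "type": "STRING"},
    ]
}

# Comparison is by name and type only, so the missing modes do not matter.
assert pandas_gbq.schema.schema_is_subset(remote, local)

# Merging keeps the remote mode, so "A" stays REQUIRED in the load job.
merged = pandas_gbq.schema.update_schema(local, remote)
assert merged["fields"][0]["mode"] == "REQUIRED"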
68 changes: 30 additions & 38 deletions tests/system/test_gbq.py
Expand Up @@ -12,6 +12,7 @@
import pytz

from pandas_gbq import gbq
import pandas_gbq.schema


TABLE_ID = "new_test"
@@ -1637,7 +1638,7 @@ def test_retrieve_schema(gbq_table, gbq_connector):
}

gbq_table.create(table_id, test_schema)
actual = gbq_connector._clean_schema_fields(
actual = pandas_gbq.schema._clean_schema_fields(
gbq_connector.schema(gbq_table.dataset_id, table_id)
)
expected = [
@@ -1649,48 +1650,39 @@
assert expected == actual, "Expected schema used to create table"


def test_schema_is_subset_passes_if_subset(gbq_table, gbq_connector):
# Issue #24 schema_is_subset indicates whether the schema of the
# dataframe is a subset of the schema of the bigquery table
table_id = "test_schema_is_subset_passes_if_subset"
def test_to_gbq_does_not_override_mode(gbq_table, gbq_connector):
# See: https://github.com/pydata/pandas-gbq/issues/315
table_id = "test_to_gbq_does_not_override_mode"
table_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
{"name": "C", "type": "STRING"},
]
}
tested_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
{
"mode": "REQUIRED",
"name": "A",
"type": "FLOAT",
"description": "A",
},
{
"mode": "NULLABLE",
"name": "B",
"type": "FLOAT",
"description": "B",
},
{
"mode": "NULLABLE",
"name": "C",
"type": "STRING",
"description": "C",
},
]
}

gbq_table.create(table_id, table_schema)
assert gbq_connector.schema_is_subset(
gbq_table.dataset_id, table_id, tested_schema
gbq.to_gbq(
pandas.DataFrame({"A": [1.0], "B": [2.0], "C": ["a"]}),
"{0}.{1}".format(gbq_table.dataset_id, table_id),
project_id=gbq_connector.project_id,
if_exists="append",
)


def test_schema_is_subset_fails_if_not_subset(gbq_table, gbq_connector):
# For pull request #24
table_id = "test_schema_is_subset_fails_if_not_subset"
table_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
{"name": "C", "type": "STRING"},
]
}
tested_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "C", "type": "FLOAT"},
]
}

gbq_table.create(table_id, table_schema)
assert not gbq_connector.schema_is_subset(
gbq_table.dataset_id, table_id, tested_schema
)
actual = gbq_connector.schema(gbq_table.dataset_id, table_id)
assert table_schema["fields"] == actual