cln: refactor to_gbq to avoid unnecessary extra table GET HTTP calls
pandas-gbq already gets the table metadata when checking if a table
exists. This refactoring avoids extra calls to get the table metadata
when checking the schema.

Also, fix a bug where update_schema appends columns that aren't in the
dataframe to the schema sent in the API request.
tswast committed Apr 29, 2020
1 parent 52e9c47 commit d8c11aa
Showing 4 changed files with 139 additions and 153 deletions.
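To make the intent of the refactoring concrete, here is a minimal sketch (not the library's exact code) of the control flow this commit moves to: fetch the table metadata once with Client.get_table() and reuse it for the if_exists checks, instead of issuing separate exists() and schema() GET calls. The create_table and delete_and_recreate callables are hypothetical placeholders for the _Table helpers used in the real module, and the generic exceptions stand in for pandas-gbq's TableCreationError and InvalidSchema.

# Sketch only; assumes the pandas_gbq.schema helpers added in this commit.
from google.api_core import exceptions as google_exceptions

import pandas_gbq.schema


def upload_with_single_get(
    bqclient, destination_table, table_schema, if_exists, create_table, delete_and_recreate
):
    try:
        table = bqclient.get_table(destination_table)  # one GET HTTP call
    except google_exceptions.NotFound:
        # Table does not exist yet: create it from the local schema.
        create_table(destination_table, table_schema)
    else:
        # Table exists: compare against the schema we already fetched.
        original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)
        if if_exists == "fail":
            raise RuntimeError("Table already exists.")
        elif if_exists == "replace":
            delete_and_recreate(destination_table, table_schema)
        elif if_exists == "append":
            if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
                raise ValueError("DataFrame schema does not match the destination table.")
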
121 changes: 32 additions & 89 deletions pandas_gbq/gbq.py
@@ -5,6 +5,15 @@

import numpy as np

# Required dependencies, but treat as optional so that _test_google_api_imports
# can provide a better error message.
try:
from google.api_core import exceptions as google_exceptions
from google.cloud import bigquery
except ImportError: # pragma: NO COVER
bigquery = None
google_exceptions = None

try:
# The BigQuery Storage API client is an optional dependency. It is only
# required when use_bqstorage_api=True.
@@ -388,7 +397,6 @@ def sizeof_fmt(num, suffix="B"):
return fmt % (num, "Y", suffix)

def get_client(self):
from google.cloud import bigquery
import pandas

try:
@@ -429,7 +437,6 @@ def run_query(
):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
from google.cloud import bigquery

job_config = {
"query": {
@@ -640,22 +647,11 @@ def schema(self, dataset_id, table_id):
except self.http_error as ex:
self.process_http_error(ex)

def _clean_schema_fields(self, fields):
"""Return a sanitized version of the schema for comparisons."""
fields_sorted = sorted(fields, key=lambda field: field["name"])
# Ignore mode and description when comparing schemas.
return [
{"name": field["name"], "type": field["type"]}
for field in fields_sorted
]

def verify_schema(self, dataset_id, table_id, schema):
"""Indicate whether schemas match exactly
Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether all fields in the former
are present in the latter. Order is not considered.
Parameters
----------
dataset_id :str
@@ -665,50 +661,18 @@ def verify_schema(self, dataset_id, table_id, schema):
schema : list(dict)
Schema for comparison. Each item should have
a 'name' and a 'type'
Returns
-------
bool
Whether the schemas match
"""

fields_remote = self._clean_schema_fields(
fields_remote = pandas_gbq.schema._clean_schema_fields(
self.schema(dataset_id, table_id)
)
fields_local = self._clean_schema_fields(schema["fields"])

fields_local = pandas_gbq.schema._clean_schema_fields(schema["fields"])
return fields_remote == fields_local

def schema_is_subset(self, dataset_id, table_id, schema):
"""Indicate whether the schema to be uploaded is a subset
Compare the BigQuery table identified in the parameters with
the schema passed in and indicate whether a subset of the fields in
the former are present in the latter. Order is not considered.
Parameters
----------
dataset_id : str
Name of the BigQuery dataset for the table
table_id : str
Name of the BigQuery table
schema : list(dict)
Schema for comparison. Each item should have
a 'name' and a 'type'
Returns
-------
bool
Whether the passed schema is a subset
"""

fields_remote = self._clean_schema_fields(
self.schema(dataset_id, table_id)
)
fields_local = self._clean_schema_fields(schema["fields"])

return all(field in fields_remote for field in fields_local)

def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
table = _Table(
self.project_id, dataset_id, credentials=self.credentials
@@ -1141,7 +1105,6 @@ def to_gbq(
"""

_test_google_api_imports()
from pandas_gbq import schema

if verbose is not None and SHOW_VERBOSE_DEPRECATION:
warnings.warn(
@@ -1168,25 +1131,31 @@
credentials=credentials,
private_key=private_key,
)
bqclient = connector.client
dataset_id, table_id = destination_table.rsplit(".", 1)

table = _Table(
project_id,
dataset_id,
location=location,
credentials=connector.credentials,
)

default_schema = _generate_bq_schema(dataframe)
if not table_schema:
table_schema = default_schema
else:
table_schema = schema.update_schema(
table_schema = pandas_gbq.schema.update_schema(
default_schema, dict(fields=table_schema)
)

# If table exists, check if_exists parameter
if table.exists(table_id):
try:
table = bqclient.get_table(destination_table)
except google_exceptions.NotFound:
table_connector = _Table(
project_id,
dataset_id,
location=location,
credentials=connector.credentials,
)
table_connector.create(table_id, table_schema)
else:
original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

if if_exists == "fail":
raise TableCreationError(
"Could not create the table because it "
@@ -1199,22 +1168,20 @@
dataset_id, table_id, table_schema
)
elif if_exists == "append":
if not connector.schema_is_subset(
dataset_id, table_id, table_schema
if not pandas_gbq.schema.schema_is_subset(
original_schema, table_schema
):
raise InvalidSchema(
"Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table."
)

# Fetch original schema, and update the local `table_schema`
original_schema = table.schema(table_id)
table_schema = schema.update_schema(
table_schema, dict(fields=original_schema)
# Update the local `table_schema` so mode matches.
# See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(
table_schema, original_schema
)
else:
table.create(table_id, table_schema)

if dataframe.empty:
# Create the table (if needed), but don't try to run a load job with an
@@ -1373,30 +1340,6 @@ def delete(self, table_id):
except self.http_error as ex:
self.process_http_error(ex)

def schema(self, table_id):
"""Retrieve the schema of the table
Obtain from BigQuery the field names and field types
for the table defined by the parameters
Parameters
----------
table_id : str
Name of table whose schema is to be fetched
Returns
-------
list of dicts
Fields representing the schema
"""
if not self.exists(table_id):
raise NotFoundException(
"Table {0} does not exist".format(table_id)
)

original_schema = super(_Table, self).schema(self.dataset_id, table_id)
return original_schema


class _Dataset(GbqConnector):
def __init__(
56 changes: 53 additions & 3 deletions pandas_gbq/schema.py
@@ -3,6 +3,59 @@
import copy


def to_pandas_gbq(client_schema):
"""Given a sequence of :class:`google.cloud.bigquery.schema.SchemaField`,
return a schema in pandas-gbq API format.
"""
remote_fields = [
field_remote.to_api_repr() for field_remote in client_schema
]
for field in remote_fields:
field["type"] = field["type"].upper()
field["mode"] = field["mode"].upper()

return {"fields": remote_fields}
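For illustration, a hedged example of the conversion this helper performs (the exact keys in the output depend on what google-cloud-bigquery's SchemaField.to_api_repr() returns):

# Sketch only: convert a google-cloud-bigquery schema to the pandas-gbq format.
from google.cloud import bigquery

import pandas_gbq.schema

client_schema = [
    bigquery.SchemaField("name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
]
converted = pandas_gbq.schema.to_pandas_gbq(client_schema)
# Each field dict now carries upper-cased "type" and "mode" values, e.g.
# converted["fields"][0]["name"] == "name"
# converted["fields"][0]["type"] == "STRING"
# converted["fields"][1]["mode"] == "REQUIRED"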


def _clean_schema_fields(fields):
"""Return a sanitized version of the schema for comparisons.
The ``mode`` and ``description`` properties are ignored because they
are not generated by :func:`pandas_gbq.schema.generate_bq_schema`.
"""
fields_sorted = sorted(fields, key=lambda field: field["name"])
return [
{"name": field["name"], "type": field["type"]}
for field in fields_sorted
]


def schema_is_subset(schema_remote, schema_local):
"""Indicate whether the schema to be uploaded is a subset
Compare the remote table schema with the local schema passed in and
indicate whether all fields in the latter are present in the former.
Order is not considered.
Parameters
----------
schema_remote : dict
Schema for comparison. Each item of ``fields`` should have a 'name'
and a 'type'
schema_local : dict
Schema for comparison. Each item of ``fields`` should have a 'name'
and a 'type'
Returns
-------
bool
Whether the passed schema is a subset
"""
fields_remote = _clean_schema_fields(schema_remote.get("fields", []))
fields_local = _clean_schema_fields(schema_local.get("fields", []))
return all(field in fields_remote for field in fields_local)
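A small usage example, mirroring the system tests removed later in this diff: schema_is_subset compares on field name and type only (via _clean_schema_fields above) and ignores ordering.

# Sketch only; uses the dict-based schema format shown above.
import pandas_gbq.schema

remote = {
    "fields": [
        {"name": "A", "type": "FLOAT"},
        {"name": "B", "type": "FLOAT"},
        {"name": "C", "type": "STRING"},
    ]
}
subset = {"fields": [{"name": "B", "type": "FLOAT"}, {"name": "A", "type": "FLOAT"}]}
not_subset = {"fields": [{"name": "C", "type": "FLOAT"}]}  # type mismatch on C

assert pandas_gbq.schema.schema_is_subset(remote, subset)
assert not pandas_gbq.schema.schema_is_subset(remote, not_subset)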


def generate_bq_schema(dataframe, default_type="STRING"):
"""Given a passed dataframe, generate the associated Google BigQuery schema.
@@ -59,9 +112,6 @@ def update_schema(schema_old, schema_new):
if name in field_indices:
# replace old field with new field of same name
output_fields[field_indices[name]] = field
else:
# add new field
output_fields.append(field)

return {"fields": output_fields}
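With the else branch removed, fields that appear only in schema_new are no longer appended; update_schema now only overrides matching columns of schema_old. A hedged example of the corrected behaviour, assuming the rest of the update_schema implementation above:

# Sketch only.
import pandas_gbq.schema

dataframe_schema = {"fields": [{"name": "A", "type": "FLOAT"}]}
user_schema = {
    "fields": [
        {"name": "A", "type": "FLOAT", "mode": "REQUIRED"},
        {"name": "B", "type": "STRING"},  # not present in the DataFrame
    ]
}

merged = pandas_gbq.schema.update_schema(dataframe_schema, user_schema)
# merged == {"fields": [{"name": "A", "type": "FLOAT", "mode": "REQUIRED"}]}
# Before this fix, the extra column "B" would also have been appended.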

52 changes: 3 additions & 49 deletions tests/system/test_gbq.py
@@ -12,6 +12,7 @@
import pytz

from pandas_gbq import gbq
import pandas_gbq.schema


TABLE_ID = "new_test"
@@ -1637,7 +1638,7 @@ def test_retrieve_schema(gbq_table, gbq_connector):
}

gbq_table.create(table_id, test_schema)
actual = gbq_connector._clean_schema_fields(
actual = pandas_gbq.schema._clean_schema_fields(
gbq_connector.schema(gbq_table.dataset_id, table_id)
)
expected = [
@@ -1649,53 +1650,6 @@
assert expected == actual, "Expected schema used to create table"


def test_schema_is_subset_passes_if_subset(gbq_table, gbq_connector):
# Issue #24 schema_is_subset indicates whether the schema of the
# dataframe is a subset of the schema of the bigquery table
table_id = "test_schema_is_subset_passes_if_subset"
table_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
{"name": "C", "type": "STRING"},
]
}
tested_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
]
}

gbq_table.create(table_id, table_schema)
assert gbq_connector.schema_is_subset(
gbq_table.dataset_id, table_id, tested_schema
)


def test_schema_is_subset_fails_if_not_subset(gbq_table, gbq_connector):
# For pull request #24
table_id = "test_schema_is_subset_fails_if_not_subset"
table_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "B", "type": "FLOAT"},
{"name": "C", "type": "STRING"},
]
}
tested_schema = {
"fields": [
{"name": "A", "type": "FLOAT"},
{"name": "C", "type": "FLOAT"},
]
}

gbq_table.create(table_id, table_schema)
assert not gbq_connector.schema_is_subset(
gbq_table.dataset_id, table_id, tested_schema
)


def test_schema_is_not_overwritten(gbq_table, gbq_connector):
# Fixes bug #315
table_id = "test_schema_is_not_overwritten_for_existing_table"
Expand Down Expand Up @@ -1730,5 +1684,5 @@ def test_schema_is_not_overwritten(gbq_table, gbq_connector):
if_exists="append",
)

actual = gbq_table.schema(table_id)
actual = gbq_connector.schema(gbq_table.dataset_id, table_id)
assert table_schema["fields"] == actual