Skip to content

Commit

Permalink
feat: map "if_exists" value to LoadJobConfig.WriteDisposition (#583)
Browse files Browse the repository at this point in the history
* feat: map "if_exists" value to LoadJobConfig.WriteDisposition

This uses LoadJobConfig.WriteDisposition to replace if_exists='fail'/'replace'/'append' behavior in to_gbq()

### Dependency updates

- Update the minimum version of `db-dtypes` to 1.0.4
- Update the minimum version of `google-api-core` to 2.10.2
- Update the minimum version of `google-auth` to 2.13.0
- Update the minimum version of `google-auth-oauthlib` to 0.7.0
- Update the minimum version of `google-cloud-bigquery` to 3.3.5
- Update the minimum version of `google-cloud-bigquery-storage` to 2.16.2
- Update the minimum version of `pandas` to 1.1.4
- Update the minimum version of `pydata-google-auth` to 1.4.0
  • Loading branch information
aribray authored Nov 7, 2022
1 parent afd6e21 commit 7389cd2
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 121 deletions.
15 changes: 9 additions & 6 deletions ci/requirements-3.7-0.24.2.conda
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
codecov
coverage
db-dtypes==0.3.1
db-dtypes
fastavro
flake8
freezegun
numpy==1.16.6
google-cloud-bigquery==1.27.2
google-cloud-bigquery-storage==1.1.0
pyarrow==3.0.0
numpy
google-api-core
google-auth
google-cloud-bigquery
google-cloud-bigquery-storage
pyarrow
pydata-google-auth
pytest
pytest-cov
tqdm==4.23.0
requests-oauthlib
tqdm
99 changes: 41 additions & 58 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

from pandas_gbq.exceptions import (
AccessDenied,
GenericGBQException,
)
from pandas_gbq.exceptions import AccessDenied, GenericGBQException
from pandas_gbq.features import FEATURES
import pandas_gbq.schema
import pandas_gbq.timestamp
Expand Down Expand Up @@ -116,20 +113,12 @@ class InvalidSchema(ValueError):
table in BigQuery.
"""

def __init__(
self, message: str, local_schema: Dict[str, Any], remote_schema: Dict[str, Any]
):
super().__init__(message)
self._local_schema = local_schema
self._remote_schema = remote_schema

@property
def local_schema(self) -> Dict[str, Any]:
return self._local_schema
def __init__(self, message: str):
self._message = message

@property
def remote_schema(self) -> Dict[str, Any]:
return self._remote_schema
def message(self) -> str:
return self._message


class NotFoundException(ValueError):
Expand All @@ -155,7 +144,12 @@ class TableCreationError(ValueError):
Raised when the create table method fails
"""

pass
def __init__(self, message: str):
self._message = message

@property
def message(self) -> str:
return self._message


class Context(object):
Expand Down Expand Up @@ -382,8 +376,14 @@ def process_http_error(ex):

if "cancelled" in ex.message:
raise QueryTimeout("Reason: {0}".format(ex))

raise GenericGBQException("Reason: {0}".format(ex))
elif "Provided Schema does not match" in ex.message:
error_message = ex.errors[0]["message"]
raise InvalidSchema(f"Reason: {error_message}")
elif "Already Exists: Table" in ex.message:
error_message = ex.errors[0]["message"]
raise TableCreationError(f"Reason: {error_message}")
else:
raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
self,
Expand Down Expand Up @@ -577,6 +577,7 @@ def load_data(
self,
dataframe,
destination_table_ref,
write_disposition,
chunksize=None,
schema=None,
progress_bar=True,
Expand All @@ -596,6 +597,7 @@ def load_data(
schema=schema,
location=self.location,
api_method=api_method,
write_disposition=write_disposition,
billing_project=billing_project,
)
if progress_bar and tqdm:
Expand All @@ -609,11 +611,6 @@ def load_data(
except self.http_error as ex:
self.process_http_error(ex)

def delete_and_recreate_table(self, project_id, dataset_id, table_id, table_schema):
table = _Table(project_id, dataset_id, credentials=self.credentials)
table.delete(table_id)
table.create(table_id, table_schema)


def _bqschema_to_nullsafe_dtypes(schema_fields):
"""Specify explicit dtypes based on BigQuery schema.
Expand Down Expand Up @@ -975,11 +972,9 @@ def to_gbq(
):
"""Write a DataFrame to a Google BigQuery table.
The main method a user calls to export pandas DataFrame contents to
Google BigQuery table.
The main method a user calls to export pandas DataFrame contents to Google BigQuery table.
This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
This method uses the Google Cloud client library to make requests to Google BigQuery, documented `here
<https://googleapis.dev/python/bigquery/latest/index.html>`__.
See the :ref:`How to authenticate with Google BigQuery <authentication>`
Expand Down Expand Up @@ -1114,15 +1109,21 @@ def to_gbq(
stacklevel=2,
)

if if_exists not in ("fail", "replace", "append"):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

if "." not in destination_table:
raise NotFoundException(
"Invalid Table Name. Should be of the form 'datasetId.tableId' or "
"'projectId.datasetId.tableId'"
)

if if_exists not in ("fail", "replace", "append"):
raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

if_exists_list = ["fail", "replace", "append"]
dispositions = ["WRITE_EMPTY", "WRITE_TRUNCATE", "WRITE_APPEND"]
dispositions_dict = dict(zip(if_exists_list, dispositions))

write_disposition = dispositions_dict[if_exists]

connector = GbqConnector(
project_id,
reauth=reauth,
Expand All @@ -1142,17 +1143,20 @@ def to_gbq(
table_id = destination_table_ref.table_id

default_schema = _generate_bq_schema(dataframe)
# If table_schema isn't provided, we'll create one for you
if not table_schema:
table_schema = default_schema
# It table_schema is provided, we'll update the default_schema to the provided table_schema
else:
table_schema = pandas_gbq.schema.update_schema(
default_schema, dict(fields=table_schema)
)

# If table exists, check if_exists parameter
try:
# Try to get the table
table = bqclient.get_table(destination_table_ref)
except google_exceptions.NotFound:
# If the table doesn't already exist, create it
table_connector = _Table(
project_id_table,
dataset_id,
Expand All @@ -1161,34 +1165,12 @@ def to_gbq(
)
table_connector.create(table_id, table_schema)
else:
# Convert original schema (the schema that already exists) to pandas-gbq API format
original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

if if_exists == "fail":
raise TableCreationError(
"Could not create the table because it "
"already exists. "
"Change the if_exists parameter to "
"'append' or 'replace' data."
)
elif if_exists == "replace":
connector.delete_and_recreate_table(
project_id_table, dataset_id, table_id, table_schema
)
else:
if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
raise InvalidSchema(
"Please verify that the structure and "
"data types in the DataFrame match the "
"schema of the destination table.",
table_schema,
original_schema,
)

# Update the local `table_schema` so mode (NULLABLE/REQUIRED)
# matches. See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(
table_schema, original_schema
)
# Update the local `table_schema` so mode (NULLABLE/REQUIRED)
# matches. See: https://github.com/pydata/pandas-gbq/issues/315
table_schema = pandas_gbq.schema.update_schema(table_schema, original_schema)

if dataframe.empty:
# Create the table (if needed), but don't try to run a load job with an
Expand All @@ -1198,6 +1180,7 @@ def to_gbq(
connector.load_data(
dataframe,
destination_table_ref,
write_disposition=write_disposition,
chunksize=chunksize,
schema=table_schema,
progress_bar=progress_bar,
Expand Down
18 changes: 12 additions & 6 deletions pandas_gbq/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,13 @@ def load_parquet(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
schema: Optional[Dict[str, Any]],
billing_project: Optional[str] = None,
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.create_disposition = "CREATE_NEVER"
job_config.write_disposition = write_disposition
job_config.source_format = "PARQUET"

if schema is not None:
Expand All @@ -143,13 +143,13 @@ def load_parquet(

def load_csv(
dataframe: pandas.DataFrame,
write_disposition: str,
chunksize: Optional[int],
bq_schema: Optional[List[bigquery.SchemaField]],
load_chunk: Callable,
):
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = "WRITE_APPEND"
job_config.create_disposition = "CREATE_NEVER"
job_config.write_disposition = write_disposition
job_config.source_format = "CSV"
job_config.allow_quoted_newlines = True

Expand All @@ -167,6 +167,7 @@ def load_csv_from_dataframe(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
Expand All @@ -187,13 +188,14 @@ def load_chunk(chunk, job_config):
project=billing_project,
).result()

return load_csv(dataframe, chunksize, bq_schema, load_chunk)
return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


def load_csv_from_file(
client: bigquery.Client,
dataframe: pandas.DataFrame,
destination_table_ref: bigquery.TableReference,
write_disposition: str,
location: Optional[str],
chunksize: Optional[int],
schema: Optional[Dict[str, Any]],
Expand Down Expand Up @@ -223,7 +225,7 @@ def load_chunk(chunk, job_config):
finally:
chunk_buffer.close()

return load_csv(dataframe, chunksize, bq_schema, load_chunk)
return load_csv(dataframe, write_disposition, chunksize, bq_schema, load_chunk)


def load_chunks(
Expand All @@ -234,13 +236,15 @@ def load_chunks(
schema=None,
location=None,
api_method="load_parquet",
write_disposition="WRITE_EMPTY",
billing_project: Optional[str] = None,
):
if api_method == "load_parquet":
load_parquet(
client,
dataframe,
destination_table_ref,
write_disposition,
location,
schema,
billing_project=billing_project,
Expand All @@ -253,6 +257,7 @@ def load_chunks(
client,
dataframe,
destination_table_ref,
write_disposition,
location,
chunksize,
schema,
Expand All @@ -263,6 +268,7 @@ def load_chunks(
client,
dataframe,
destination_table_ref,
write_disposition,
location,
chunksize,
schema,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pandas
google-auth
google-auth-oauthlib
google-cloud-bigquery
tqdm
tqdm
16 changes: 8 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,24 @@
release_status = "Development Status :: 4 - Beta"
dependencies = [
"setuptools",
"db-dtypes >=0.3.1,<2.0.0",
"db-dtypes >=1.0.4,<2.0.0",
"numpy >=1.16.6",
"pandas >=0.24.2",
"pandas >=1.1.4",
"pyarrow >=3.0.0, <10.0dev",
"pydata-google-auth",
"pydata-google-auth >=1.4.0",
# Note: google-api-core and google-auth are also included via transitive
# dependency on google-cloud-bigquery, but this library also uses them
# directly.
"google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0",
"google-auth >=1.25.0",
"google-auth-oauthlib >=0.0.1",
"google-api-core >= 2.10.2, <3.0.0dev",
"google-auth >=2.13.0",
"google-auth-oauthlib >=0.7.0",
# Require 1.27.* because it has a fix for out-of-bounds timestamps. See:
# https://github.com/googleapis/python-bigquery/pull/209 and
# https://github.com/googleapis/python-bigquery-pandas/issues/365
# Exclude 2.4.* because it has a bug where waiting for the query can hang
# indefinitely. https://github.com/pydata/pandas-gbq/issues/343
"google-cloud-bigquery >=1.27.2,<4.0.0dev,!=2.4.*",
"google-cloud-bigquery-storage >=1.1.0,<3.0.0dev",
"google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
"google-cloud-bigquery-storage >=2.16.2,<3.0.0dev",
]
extras = {
"tqdm": "tqdm>=4.23.0",
Expand Down
16 changes: 8 additions & 8 deletions testing/constraints-3.7.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@
#
# e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
# Then this file should have foo==1.14.0
db-dtypes==0.3.1
google-api-core==1.31.5
google-auth==1.25.0
google-auth-oauthlib==0.0.1
google-cloud-bigquery==1.27.2
google-cloud-bigquery-storage==1.1.0
db-dtypes==1.0.4
google-api-core==2.10.2
google-auth==2.13.0
google-auth-oauthlib==0.7.0
google-cloud-bigquery==3.3.5
google-cloud-bigquery-storage==2.16.2
numpy==1.16.6
pandas==0.24.2
pandas==1.1.4
pyarrow==3.0.0
pydata-google-auth==0.1.2
pydata-google-auth==1.4.0
tqdm==4.23.0
protobuf==3.19.5
Loading

0 comments on commit 7389cd2

Please sign in to comment.