ENH: Added support for partitioned table creation and data insertion (g…
paoloburelli committed Aug 25, 2017
1 parent 79c9067 commit 2871690
Showing 2 changed files with 317 additions and 21 deletions.
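In practice, the change below lets a DataFrame be written straight to a single day partition by addressing it with BigQuery's `$YYYYMMDD` partition decorator. A minimal usage sketch against the to_gbq signature shown in this diff; the project, dataset, and table names are hypothetical:

    import pandas as pd
    from pandas_gbq import gbq

    df = pd.DataFrame({'user': ['a', 'b'], 'score': [1, 2]})

    # Append rows to the 2017-08-25 partition of an existing
    # day-partitioned table.
    gbq.to_gbq(df, 'my_dataset.events$20170825',
               project_id='my-project', if_exists='append')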
159 changes: 138 additions & 21 deletions pandas_gbq/gbq.py
@@ -5,6 +5,7 @@
 import uuid
 import time
 import sys
+from random import randint
 
 import numpy as np

@@ -674,6 +675,38 @@ def load_data(self, dataframe, dataset_id, table_id, chunksize):
 
         self._print("\n")
 
+    def resource(self, dataset_id, table_id):
+        """Retrieve the resource describing a table
+
+        Obtain from BigQuery the complete description
+        of the table defined by the parameters.
+
+        Parameters
+        ----------
+        dataset_id : str
+            Name of the BigQuery dataset for the table
+        table_id : str
+            Name of the BigQuery table
+
+        Returns
+        -------
+        object
+            Table resource
+        """
+
+        try:
+            from googleapiclient.errors import HttpError
+        except ImportError:
+            from apiclient.errors import HttpError
+
+        try:
+            return self.service.tables().get(
+                projectId=self.project_id,
+                datasetId=dataset_id,
+                tableId=table_id).execute()
+        except HttpError as ex:
+            self.process_http_error(ex)
+
     def schema(self, dataset_id, table_id):
         """Retrieve the schema of the table
@@ -1046,27 +1079,89 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
 
     table_schema = _generate_bq_schema(dataframe)
 
-    # If table exists, check if_exists parameter
-    if table.exists(table_id):
-        if if_exists == 'fail':
-            raise TableCreationError("Could not create the table because it "
-                                     "already exists. "
-                                     "Change the if_exists parameter to "
-                                     "append or replace data.")
-        elif if_exists == 'replace':
-            connector.delete_and_recreate_table(
-                dataset_id, table_id, table_schema)
-        elif if_exists == 'append':
-            if not connector.schema_is_subset(dataset_id,
-                                              table_id,
-                                              table_schema):
-                raise InvalidSchema("Please verify that the structure and "
-                                    "data types in the DataFrame match the "
-                                    "schema of the destination table.")
+    if _Table.contains_partition_decorator(table_id):
+        root_table_id, partition_id = table_id.rsplit('$', 1)
+
+        if not table.exists(root_table_id):
+            raise NotFoundException("Could not write to the partition because "
+                                    "the table does not exist.")
+
+        table_resource = table.resource(dataset_id, root_table_id)
+
+        if 'timePartitioning' not in table_resource:
+            raise InvalidSchema("Could not write to the partition because "
+                                "the table is not partitioned.")
+
+        partition_exists = read_gbq("SELECT COUNT(*) AS num_rows FROM {0}"
+                                    .format(destination_table),
+                                    project_id=project_id,
+                                    private_key=private_key)['num_rows'][0] > 0
+
+        if partition_exists:
+            if if_exists == 'fail':
+                raise TableCreationError("Could not create the partition "
+                                         "because it already exists. "
+                                         "Change the if_exists parameter to "
+                                         "append or replace data.")
+            elif if_exists == 'append':
+                if not connector.schema_is_subset(dataset_id,
+                                                  root_table_id,
+                                                  table_schema):
+                    raise InvalidSchema("Please verify that the structure and "
+                                        "data types in the DataFrame match the "
+                                        "schema of the destination table.")
+                connector.load_data(dataframe, dataset_id, table_id, chunksize)
+
+            elif if_exists == 'replace':
+                if not connector.schema_is_subset(dataset_id,
+                                                  root_table_id,
+                                                  table_schema):
+                    raise InvalidSchema("Please verify that the structure and "
+                                        "data types in the DataFrame match the "
+                                        "schema of the destination table.")
+
+                temporary_table_id = '_'.join(
+                    [root_table_id + '_' + partition_id,
+                     str(randint(1, 100000))])
+                table.create(temporary_table_id, table_schema)
+                connector.load_data(dataframe, dataset_id,
+                                    temporary_table_id, chunksize)
+                time.sleep(30)  # wait for the loaded rows to become queryable
+                connector.run_query(
+                    'SELECT * FROM {0}.{1}'.format(dataset_id,
+                                                   temporary_table_id),
+                    configuration={
+                        'query': {
+                            'destinationTable': {
+                                'projectId': project_id,
+                                'datasetId': dataset_id,
+                                'tableId': table_id
+                            },
+                            'createDisposition': 'CREATE_IF_NEEDED',
+                            'writeDisposition': 'WRITE_TRUNCATE',
+                            'allowLargeResults': True
+                        }
+                    })
+                table.delete(temporary_table_id)
+
+        else:
+            connector.load_data(dataframe, dataset_id, table_id, chunksize)
+
     else:
-        table.create(table_id, table_schema)
+        if table.exists(table_id):
+            if if_exists == 'fail':
+                raise TableCreationError("Could not create the table because "
+                                         "it already exists. "
+                                         "Change the if_exists parameter to "
+                                         "append or replace data.")
+            elif if_exists == 'replace':
+                connector.delete_and_recreate_table(
+                    dataset_id, table_id, table_schema)
+            elif if_exists == 'append':
+                if not connector.schema_is_subset(dataset_id,
+                                                  table_id,
+                                                  table_schema):
+                    raise InvalidSchema("Please verify that the structure and "
+                                        "data types in the DataFrame match the "
+                                        "schema of the destination table.")
+        else:
+            table.create(table_id, table_schema)
 
-    connector.load_data(dataframe, dataset_id, table_id, chunksize)
+        connector.load_data(dataframe, dataset_id, table_id, chunksize)
 
 
 def generate_bq_schema(df, default_type='STRING'):
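In the 'replace' branch above, the new rows are first staged in a randomly suffixed temporary table, then written into the target partition by a query job with writeDisposition WRITE_TRUNCATE, and the temporary table is deleted afterwards; the time.sleep(30) gives the freshly loaded rows time to become queryable before that copy runs. A sketch of the call that exercises this path, with the same hypothetical names as above:

    # Overwrite the 2017-08-24 partition with a corrected DataFrame.
    gbq.to_gbq(corrected_df, 'my_dataset.events$20170824',
               project_id='my-project', if_exists='replace')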
@@ -1132,19 +1227,24 @@ def exists(self, table_id):
             true if table exists, otherwise false
         """
 
+        if _Table.contains_partition_decorator(table_id):
+            root_table_id, partition_id = table_id.rsplit('$', 1)
+        else:
+            root_table_id = table_id
+
         try:
             self.service.tables().get(
                 projectId=self.project_id,
                 datasetId=self.dataset_id,
-                tableId=table_id).execute()
+                tableId=root_table_id).execute()
             return True
         except self.http_error as ex:
             if ex.resp.status == 404:
                 return False
             else:
                 self.process_http_error(ex)
 
-    def create(self, table_id, schema):
+    def create(self, table_id, schema, **kwargs):
         """ Create a table in Google BigQuery given a table and schema
 
         Parameters
@@ -1154,6 +1254,15 @@ def create(self, table_id, schema):
         schema : str
             Use the generate_bq_schema to generate your table schema from a
             dataframe.
+        **kwargs : Arbitrary keyword arguments
+            body (dict): table creation extra parameters
+                For example:
+
+                    body = {'timePartitioning': {'type': 'DAY'}}
+
+                For more information see `BigQuery REST API Reference
+                <https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource>`__
+
         """
 
         if self.exists(table_id):
@@ -1174,6 +1283,10 @@ def create(self, table_id, schema):
             }
         }
 
+        config = kwargs.get('body')
+        if config is not None:
+            body.update(config)
+
         try:
             self.service.tables().insert(
                 projectId=self.project_id,
@@ -1204,6 +1317,10 @@ def delete(self, table_id):
         if ex.resp.status != 404:
             self.process_http_error(ex)
 
+    @staticmethod
+    def contains_partition_decorator(table_id):
+        return "$" in table_id


class _Dataset(GbqConnector):

