BUG: Fix uploading of dataframes containing int64 and float64 columns (#117)

* BUG: Fix uploading of dataframes containing int64 and float64 columns

Fixes #116 and #96 by loading data in CSV chunks.

* ENH: allow chunksize=None to disable chunking in to_gbq()

Also fixes lint errors.

* TST: update minimum google-cloud-bigquery version to 0.29.0 in CI

* BUG: pass schema to load job for to_gbq

* Generate schema if needed for table creation.

* Restore _generate_bq_schema, as it is used in tests.

* Add fixes to changelog.
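
For context, a minimal usage sketch of the new chunksize behaviour in to_gbq; the project and table names below are hypothetical and credentials are assumed to be configured:

    import pandas as pd
    import pandas_gbq

    df = pd.DataFrame({'int_col': [1, 2, 3], 'float_col': [0.5, 1.5, 2.5]})

    # chunksize=None (the new default) uploads the dataframe as a single CSV
    # load job; pass an integer to split the upload into chunks of that many
    # rows instead.
    pandas_gbq.to_gbq(df, 'my_dataset.my_table', project_id='my-project',
                      chunksize=None, if_exists='append')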
tswast authored Feb 12, 2018
1 parent f040c18 commit 62ec85b
Showing 8 changed files with 256 additions and 65 deletions.
2 changes: 1 addition & 1 deletion ci/requirements-3.5-0.18.1.pip
@@ -1,4 +1,4 @@
google-auth==1.0.0
google-auth-oauthlib==0.0.1
mock
google-cloud-bigquery==0.28.0
google-cloud-bigquery==0.29.0
3 changes: 2 additions & 1 deletion docs/source/changelog.rst
@@ -7,7 +7,8 @@ Changelog

- Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)
- Add support for a passed schema in :func:`to_gbq` instead of inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)

- Fix an issue where a dataframe containing both integer and floating point columns could not be uploaded with ``to_gbq`` (:issue:`116`)
- ``to_gbq`` now uses ``to_csv`` to avoid manually looping over rows in a dataframe (should result in faster table uploads) (:issue:`96`)

0.3.0 / 2018-01-03
------------------
74 changes: 74 additions & 0 deletions pandas_gbq/_load.py
@@ -0,0 +1,74 @@
"""Helper methods for loading data into BigQuery"""

from google.cloud import bigquery
import six

from pandas_gbq import _schema


def encode_chunk(dataframe):
"""Return a file-like object of CSV-encoded rows.
Args:
dataframe (pandas.DataFrame): A chunk of a dataframe to encode
"""
csv_buffer = six.StringIO()
dataframe.to_csv(
csv_buffer, index=False, header=False, encoding='utf-8',
date_format='%Y-%m-%d %H:%M')

# Convert to a BytesIO buffer so that unicode text is properly handled.
# See: https://github.com/pydata/pandas-gbq/issues/106
body = csv_buffer.getvalue()
if isinstance(body, bytes):
body = body.decode('utf-8')
body = body.encode('utf-8')
return six.BytesIO(body)


def encode_chunks(dataframe, chunksize=None):
dataframe = dataframe.reset_index(drop=True)
if chunksize is None:
yield 0, encode_chunk(dataframe)
return

remaining_rows = len(dataframe)
total_rows = remaining_rows
start_index = 0
while start_index < total_rows:
end_index = start_index + chunksize
chunk_buffer = encode_chunk(dataframe[start_index:end_index])
start_index += chunksize
remaining_rows = max(0, remaining_rows - chunksize)
yield remaining_rows, chunk_buffer


def load_chunks(
client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
destination_table = client.dataset(dataset_id).table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.write_disposition = 'WRITE_APPEND'
job_config.source_format = 'CSV'

if schema is None:
schema = _schema.generate_bq_schema(dataframe)

# Manually create the schema objects, adding NULLABLE mode
# as a workaround for
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
for field in schema['fields']:
if 'mode' not in field:
field['mode'] = 'NULLABLE'

job_config.schema = [
bigquery.SchemaField.from_api_repr(field)
for field in schema['fields']
]

chunks = encode_chunks(dataframe, chunksize=chunksize)
for remaining_rows, chunk_buffer in chunks:
yield remaining_rows
client.load_table_from_file(
chunk_buffer,
destination_table,
job_config=job_config).result()
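
A rough sketch of how a caller can drive load_chunks and report progress, mirroring the load_data change in gbq.py below; the client, dataset, and table names here are hypothetical and assume valid BigQuery credentials:

    from google.cloud import bigquery
    import pandas

    from pandas_gbq import _load

    client = bigquery.Client(project='my-project')
    df = pandas.DataFrame({'int_col': [1, 2, 3], 'float_col': [0.5, 1.5, 2.5]})

    total_rows = len(df)
    # load_chunks yields the number of rows left to upload before each
    # chunk's load job runs.
    for remaining_rows in _load.load_chunks(
            client, df, 'my_dataset', 'my_table', chunksize=2):
        done = total_rows - remaining_rows
        print('Load is {0}% complete'.format(done * 100 // total_rows))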
29 changes: 29 additions & 0 deletions pandas_gbq/_schema.py
@@ -0,0 +1,29 @@
"""Helper methods for BigQuery schemas"""


def generate_bq_schema(dataframe, default_type='STRING'):
"""Given a passed dataframe, generate the associated Google BigQuery schema.
Arguments:
dataframe (pandas.DataFrame): D
default_type : string
The default big query type in case the type of the column
does not exist in the schema.
"""

    type_mapping = {
        'i': 'INTEGER',
        'b': 'BOOLEAN',
        'f': 'FLOAT',
        'O': 'STRING',
        'S': 'STRING',
        'U': 'STRING',
        'M': 'TIMESTAMP'
    }

    fields = []
    for column_name, dtype in dataframe.dtypes.iteritems():
        fields.append({'name': column_name,
                       'type': type_mapping.get(dtype.kind, default_type)})

    return {'fields': fields}
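
A quick illustration (not part of the commit) of the schema this helper produces for a dataframe mixing int64 and float64 columns, the case from issue #116:

    import pandas

    from pandas_gbq import _schema

    df = pandas.DataFrame({'a': [1, 2], 'b': [0.5, 1.5]})
    # With a pandas version contemporary to this commit, this prints roughly:
    # {'fields': [{'name': 'a', 'type': 'INTEGER'},
    #             {'name': 'b', 'type': 'FLOAT'}]}
    print(_schema.generate_bq_schema(df))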
95 changes: 32 additions & 63 deletions pandas_gbq/gbq.py
@@ -556,45 +556,22 @@ def run_query(self, query, **kwargs):

        return schema, result_rows

    def load_data(self, dataframe, dataset_id, table_id, chunksize):
        from google.cloud.bigquery import LoadJobConfig
        from six import BytesIO

        destination_table = self.client.dataset(dataset_id).table(table_id)
        job_config = LoadJobConfig()
        job_config.write_disposition = 'WRITE_APPEND'
        job_config.source_format = 'NEWLINE_DELIMITED_JSON'
        rows = []
        remaining_rows = len(dataframe)

        total_rows = remaining_rows
        self._print("\n\n")
    def load_data(
            self, dataframe, dataset_id, table_id, chunksize=None,
            schema=None):
        from pandas_gbq import _load

        for index, row in dataframe.reset_index(drop=True).iterrows():
            row_json = row.to_json(
                force_ascii=False, date_unit='s', date_format='iso')
            rows.append(row_json)
            remaining_rows -= 1
        total_rows = len(dataframe)
        self._print("\n\n")

            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
        try:
            for remaining_rows in _load.load_chunks(
                    self.client, dataframe, dataset_id, table_id,
                    chunksize=chunksize):
                self._print("\rLoad is {0}% Complete".format(
                    ((total_rows - remaining_rows) * 100) / total_rows))

                body = '{}\n'.format('\n'.join(rows))
                if isinstance(body, bytes):
                    body = body.decode('utf-8')
                body = body.encode('utf-8')
                body = BytesIO(body)

                try:
                    self.client.load_table_from_file(
                        body,
                        destination_table,
                        job_config=job_config).result()
                except self.http_error as ex:
                    self.process_http_error(ex)

                rows = []
        except self.http_error as ex:
            self.process_http_error(ex)

        self._print("\n")

@@ -888,7 +865,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
    return final_df


def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
def to_gbq(dataframe, destination_table, project_id, chunksize=None,
           verbose=True, reauth=False, if_exists='fail', private_key=None,
           auth_local_webserver=False, table_schema=None):
    """Write a DataFrame to a Google BigQuery table.
@@ -922,8 +899,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
        Name of table to be written, in the form 'dataset.tablename'
    project_id : str
        Google BigQuery Account project ID.
    chunksize : int (default 10000)
        Number of rows to be inserted in each chunk from the dataframe.
    chunksize : int (default None)
        Number of rows to be inserted in each chunk from the dataframe. Use
        ``None`` to load the dataframe in a single chunk.
    verbose : boolean (default True)
        Show percentage complete
    reauth : boolean (default False)
@@ -985,7 +963,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
            raise TableCreationError("Could not create the table because it "
                                     "already exists. "
                                     "Change the if_exists parameter to "
                                     "append or replace data.")
                                     "'append' or 'replace' data.")
        elif if_exists == 'replace':
            connector.delete_and_recreate_table(
                dataset_id, table_id, table_schema)
@@ -999,19 +977,14 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
    else:
        table.create(table_id, table_schema)

    connector.load_data(dataframe, dataset_id, table_id, chunksize)
    connector.load_data(
        dataframe, dataset_id, table_id, chunksize=chunksize,
        schema=table_schema)


def generate_bq_schema(df, default_type='STRING'):
    # deprecation TimeSeries, #11121
    warnings.warn("generate_bq_schema is deprecated and will be removed in "
                  "a future version", FutureWarning, stacklevel=2)

    return _generate_bq_schema(df, default_type=default_type)


def _generate_bq_schema(df, default_type='STRING'):
    """ Given a passed df, generate the associated Google BigQuery schema.
    """DEPRECATED: Given a passed df, generate the associated Google BigQuery
    schema.

    Parameters
    ----------
Expand All @@ -1020,23 +993,16 @@ def _generate_bq_schema(df, default_type='STRING'):
The default big query type in case the type of the column
does not exist in the schema.
"""
# deprecation TimeSeries, #11121
warnings.warn("generate_bq_schema is deprecated and will be removed in "
"a future version", FutureWarning, stacklevel=2)

type_mapping = {
'i': 'INTEGER',
'b': 'BOOLEAN',
'f': 'FLOAT',
'O': 'STRING',
'S': 'STRING',
'U': 'STRING',
'M': 'TIMESTAMP'
}
return _generate_bq_schema(df, default_type=default_type)

fields = []
for column_name, dtype in df.dtypes.iteritems():
fields.append({'name': column_name,
'type': type_mapping.get(dtype.kind, default_type)})

return {'fields': fields}
def _generate_bq_schema(df, default_type='STRING'):
from pandas_gbq import _schema
return _schema.generate_bq_schema(df, default_type=default_type)


class _Table(GbqConnector):
@@ -1096,6 +1062,9 @@ def create(self, table_id, schema):
        table_ref = self.client.dataset(self.dataset_id).table(table_id)
        table = Table(table_ref)

        # Manually create the schema objects, adding NULLABLE mode
        # as a workaround for
        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
        for field in schema['fields']:
            if 'mode' not in field:
                field['mode'] = 'NULLABLE'
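
A small sketch of the NULLABLE-mode workaround that now appears in both _load.load_chunks and _Table.create; the field names here are made up for illustration:

    from google.cloud import bigquery

    schema = {'fields': [{'name': 'int_col', 'type': 'INTEGER'},
                         {'name': 'float_col', 'type': 'FLOAT'}]}

    # Default any field without an explicit mode to NULLABLE before building
    # SchemaField objects, working around
    # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
    for field in schema['fields']:
        if 'mode' not in field:
            field['mode'] = 'NULLABLE'

    bq_fields = [bigquery.SchemaField.from_api_repr(field)
                 for field in schema['fields']]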
40 changes: 40 additions & 0 deletions pandas_gbq/tests/test__load.py
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-

import numpy
import pandas


def test_encode_chunk_with_unicode():
    """Test that a dataframe containing unicode can be encoded as a file.

    See: https://github.com/pydata/pandas-gbq/issues/106
    """
    from pandas_gbq._load import encode_chunk

    df = pandas.DataFrame(
        numpy.random.randn(6, 4), index=range(6), columns=list('ABCD'))
    df['s'] = u'信用卡'
    csv_buffer = encode_chunk(df)
    csv_bytes = csv_buffer.read()
    csv_string = csv_bytes.decode('utf-8')
    assert u'信用卡' in csv_string


def test_encode_chunks_splits_dataframe():
    from pandas_gbq._load import encode_chunks
    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
    chunks = list(encode_chunks(df, chunksize=2))
    assert len(chunks) == 3
    remaining, buffer = chunks[0]
    assert remaining == 4
    assert len(buffer.readlines()) == 2


def test_encode_chunks_with_chunksize_none():
    from pandas_gbq._load import encode_chunks
    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
    chunks = list(encode_chunks(df))
    assert len(chunks) == 1
    remaining, buffer = chunks[0]
    assert remaining == 0
    assert len(buffer.readlines()) == 6
55 changes: 55 additions & 0 deletions pandas_gbq/tests/test__schema.py
@@ -0,0 +1,55 @@

import datetime

import pandas
import pytest

from pandas_gbq import _schema


@pytest.mark.parametrize(
    'dataframe,expected_schema',
    [
        (
            pandas.DataFrame(data={'col1': [1, 2, 3]}),
            {'fields': [{'name': 'col1', 'type': 'INTEGER'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [True, False]}),
            {'fields': [{'name': 'col1', 'type': 'BOOLEAN'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [1.0, 3.14]}),
            {'fields': [{'name': 'col1', 'type': 'FLOAT'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [u'hello', u'world']}),
            {'fields': [{'name': 'col1', 'type': 'STRING'}]},
        ),
        (
            pandas.DataFrame(data={'col1': [datetime.datetime.now()]}),
            {'fields': [{'name': 'col1', 'type': 'TIMESTAMP'}]},
        ),
        (
            pandas.DataFrame(
                data={
                    'col1': [datetime.datetime.now()],
                    'col2': [u'hello'],
                    'col3': [3.14],
                    'col4': [True],
                    'col5': [4],
                }),
            {
                'fields': [
                    {'name': 'col1', 'type': 'TIMESTAMP'},
                    {'name': 'col2', 'type': 'STRING'},
                    {'name': 'col3', 'type': 'FLOAT'},
                    {'name': 'col4', 'type': 'BOOLEAN'},
                    {'name': 'col5', 'type': 'INTEGER'},
                ],
            },
        ),
    ])
def test_generate_bq_schema(dataframe, expected_schema):
    schema = _schema.generate_bq_schema(dataframe)
    assert schema == expected_schema