From 62ec85b4002ecd25c50ad0f647bdfdfd9db62fd7 Mon Sep 17 00:00:00 2001
From: Tim Swast
Date: Mon, 12 Feb 2018 11:30:18 -0800
Subject: [PATCH] BUG: Fix uploading of dataframes containing int64 and float64 columns (#117)

* BUG: Fix uploading of dataframes containing int64 and float64 columns

Fixes #116 and #96 by loading data in CSV chunks.

* ENH: allow chunksize=None to disable chunking in to_gbq()

Also, fixes lint errors.

* TST: update min g-c-bq lib to 0.29.0 in CI

* BUG: pass schema to load job for to_gbq

* Generate schema if needed for table creation.

* Restore _generate_bq_schema, as it is used in tests.

* Add fixes to changelog.
---
 ci/requirements-3.5-0.18.1.pip   |  2 +-
 docs/source/changelog.rst        |  3 +-
 pandas_gbq/_load.py              | 74 +++++++++++++++++++++++++
 pandas_gbq/_schema.py            | 29 ++++++++++
 pandas_gbq/gbq.py                | 95 +++++++++++---------------------
 pandas_gbq/tests/test__load.py   | 40 ++++++++++++++
 pandas_gbq/tests/test__schema.py | 55 ++++++++++++++++++
 pandas_gbq/tests/test_gbq.py     | 23 ++++++++
 8 files changed, 256 insertions(+), 65 deletions(-)
 create mode 100644 pandas_gbq/_load.py
 create mode 100644 pandas_gbq/_schema.py
 create mode 100644 pandas_gbq/tests/test__load.py
 create mode 100644 pandas_gbq/tests/test__schema.py

diff --git a/ci/requirements-3.5-0.18.1.pip b/ci/requirements-3.5-0.18.1.pip
index 18369345..ec27d3cc 100644
--- a/ci/requirements-3.5-0.18.1.pip
+++ b/ci/requirements-3.5-0.18.1.pip
@@ -1,4 +1,4 @@
 google-auth==1.0.0
 google-auth-oauthlib==0.0.1
 mock
-google-cloud-bigquery==0.28.0
+google-cloud-bigquery==0.29.0
diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index c1e09e9d..c3d0aa74 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -7,7 +7,8 @@ Changelog
 - Fix an issue where Unicode couldn't be uploaded in Python 2 (:issue:`93`)
 - Add support for a passed schema in :func:``to_gbq`` instead inferring the schema from the passed ``DataFrame`` with ``DataFrame.dtypes`` (:issue:`46`)
-
+- Fix an issue where a dataframe containing both integer and floating point columns could not be uploaded with ``to_gbq`` (:issue:`116`)
+- ``to_gbq`` now uses ``to_csv`` to avoid manually looping over rows in a dataframe (should result in faster table uploads) (:issue:`96`)
 
 0.3.0 / 2018-01-03
 ------------------
diff --git a/pandas_gbq/_load.py b/pandas_gbq/_load.py
new file mode 100644
index 00000000..45dfec58
--- /dev/null
+++ b/pandas_gbq/_load.py
@@ -0,0 +1,74 @@
+"""Helper methods for loading data into BigQuery"""
+
+from google.cloud import bigquery
+import six
+
+from pandas_gbq import _schema
+
+
+def encode_chunk(dataframe):
+    """Return a file-like object of CSV-encoded rows.
+
+    Args:
+        dataframe (pandas.DataFrame): A chunk of a dataframe to encode
+    """
+    csv_buffer = six.StringIO()
+    dataframe.to_csv(
+        csv_buffer, index=False, header=False, encoding='utf-8',
+        date_format='%Y-%m-%d %H:%M')
+
+    # Convert to a BytesIO buffer so that unicode text is properly handled.
+    # See: https://github.com/pydata/pandas-gbq/issues/106
+    body = csv_buffer.getvalue()
+    if isinstance(body, bytes):
+        body = body.decode('utf-8')
+    body = body.encode('utf-8')
+    return six.BytesIO(body)
+
+
+def encode_chunks(dataframe, chunksize=None):
+    dataframe = dataframe.reset_index(drop=True)
+    if chunksize is None:
+        yield 0, encode_chunk(dataframe)
+        return
+
+    remaining_rows = len(dataframe)
+    total_rows = remaining_rows
+    start_index = 0
+    while start_index < total_rows:
+        end_index = start_index + chunksize
+        chunk_buffer = encode_chunk(dataframe[start_index:end_index])
+        start_index += chunksize
+        remaining_rows = max(0, remaining_rows - chunksize)
+        yield remaining_rows, chunk_buffer
+
+
+def load_chunks(
+        client, dataframe, dataset_id, table_id, chunksize=None, schema=None):
+    destination_table = client.dataset(dataset_id).table(table_id)
+    job_config = bigquery.LoadJobConfig()
+    job_config.write_disposition = 'WRITE_APPEND'
+    job_config.source_format = 'CSV'
+
+    if schema is None:
+        schema = _schema.generate_bq_schema(dataframe)
+
+    # Manually create the schema objects, adding NULLABLE mode
+    # as a workaround for
+    # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
+    for field in schema['fields']:
+        if 'mode' not in field:
+            field['mode'] = 'NULLABLE'
+
+    job_config.schema = [
+        bigquery.SchemaField.from_api_repr(field)
+        for field in schema['fields']
+    ]
+
+    chunks = encode_chunks(dataframe, chunksize=chunksize)
+    for remaining_rows, chunk_buffer in chunks:
+        yield remaining_rows
+        client.load_table_from_file(
+            chunk_buffer,
+            destination_table,
+            job_config=job_config).result()
diff --git a/pandas_gbq/_schema.py b/pandas_gbq/_schema.py
new file mode 100644
index 00000000..25e3ca9b
--- /dev/null
+++ b/pandas_gbq/_schema.py
@@ -0,0 +1,29 @@
+"""Helper methods for BigQuery schemas"""
+
+
+def generate_bq_schema(dataframe, default_type='STRING'):
+    """Given a passed dataframe, generate the associated Google BigQuery schema.
+
+    Arguments:
+        dataframe (pandas.DataFrame): The DataFrame from which to generate the schema.
+        default_type : string
+            The default big query type in case the type of the column
+            does not exist in the schema.
+    """
+
+    type_mapping = {
+        'i': 'INTEGER',
+        'b': 'BOOLEAN',
+        'f': 'FLOAT',
+        'O': 'STRING',
+        'S': 'STRING',
+        'U': 'STRING',
+        'M': 'TIMESTAMP'
+    }
+
+    fields = []
+    for column_name, dtype in dataframe.dtypes.iteritems():
+        fields.append({'name': column_name,
+                       'type': type_mapping.get(dtype.kind, default_type)})
+
+    return {'fields': fields}
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index 69f9f29f..8ccd2511 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -556,45 +556,22 @@ def run_query(self, query, **kwargs):
 
         return schema, result_rows
 
-    def load_data(self, dataframe, dataset_id, table_id, chunksize):
-        from google.cloud.bigquery import LoadJobConfig
-        from six import BytesIO
-
-        destination_table = self.client.dataset(dataset_id).table(table_id)
-        job_config = LoadJobConfig()
-        job_config.write_disposition = 'WRITE_APPEND'
-        job_config.source_format = 'NEWLINE_DELIMITED_JSON'
-        rows = []
-        remaining_rows = len(dataframe)
-
-        total_rows = remaining_rows
-        self._print("\n\n")
+    def load_data(
+            self, dataframe, dataset_id, table_id, chunksize=None,
+            schema=None):
+        from pandas_gbq import _load
 
-        for index, row in dataframe.reset_index(drop=True).iterrows():
-            row_json = row.to_json(
-                force_ascii=False, date_unit='s', date_format='iso')
-            rows.append(row_json)
-            remaining_rows -= 1
+        total_rows = len(dataframe)
+        self._print("\n\n")
 
-            if (len(rows) % chunksize == 0) or (remaining_rows == 0):
+        try:
+            for remaining_rows in _load.load_chunks(
+                    self.client, dataframe, dataset_id, table_id,
+                    chunksize=chunksize):
                 self._print("\rLoad is {0}% Complete".format(
                     ((total_rows - remaining_rows) * 100) / total_rows))
-
-                body = '{}\n'.format('\n'.join(rows))
-                if isinstance(body, bytes):
-                    body = body.decode('utf-8')
-                body = body.encode('utf-8')
-                body = BytesIO(body)
-
-                try:
-                    self.client.load_table_from_file(
-                        body,
-                        destination_table,
-                        job_config=job_config).result()
-                except self.http_error as ex:
-                    self.process_http_error(ex)
-
-                rows = []
+        except self.http_error as ex:
+            self.process_http_error(ex)
 
         self._print("\n")
@@ -888,7 +865,7 @@ def read_gbq(query, project_id=None, index_col=None, col_order=None,
     return final_df
 
 
-def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
+def to_gbq(dataframe, destination_table, project_id, chunksize=None,
            verbose=True, reauth=False, if_exists='fail', private_key=None,
            auth_local_webserver=False, table_schema=None):
     """Write a DataFrame to a Google BigQuery table.
@@ -922,8 +899,9 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         Name of table to be written, in the form 'dataset.tablename'
     project_id : str
         Google BigQuery Account project ID.
-    chunksize : int (default 10000)
-        Number of rows to be inserted in each chunk from the dataframe.
+    chunksize : int (default None)
+        Number of rows to be inserted in each chunk from the dataframe. Use
+        ``None`` to load the dataframe in a single chunk.
     verbose : boolean (default True)
        Show percentage complete
     reauth : boolean (default False)
@@ -985,7 +963,7 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
             raise TableCreationError("Could not create the table because it "
                                      "already exists. "
                                      "Change the if_exists parameter to "
-                                     "append or replace data.")
+                                     "'append' or 'replace' data.")
         elif if_exists == 'replace':
             connector.delete_and_recreate_table(
                 dataset_id, table_id, table_schema)
@@ -999,19 +977,14 @@ def to_gbq(dataframe, destination_table, project_id, chunksize=10000,
         else:
             table.create(table_id, table_schema)
 
-    connector.load_data(dataframe, dataset_id, table_id, chunksize)
+    connector.load_data(
+        dataframe, dataset_id, table_id, chunksize=chunksize,
+        schema=table_schema)
 
 
 def generate_bq_schema(df, default_type='STRING'):
-    # deprecation TimeSeries, #11121
-    warnings.warn("generate_bq_schema is deprecated and will be removed in "
-                  "a future version", FutureWarning, stacklevel=2)
-
-    return _generate_bq_schema(df, default_type=default_type)
-
-
-def _generate_bq_schema(df, default_type='STRING'):
-    """ Given a passed df, generate the associated Google BigQuery schema.
+    """DEPRECATED: Given a passed df, generate the associated Google BigQuery
+    schema.
 
     Parameters
     ----------
@@ -1020,23 +993,16 @@ def _generate_bq_schema(df, default_type='STRING'):
         The default big query type in case the type of the column
         does not exist in the schema.
     """
+    # deprecation TimeSeries, #11121
+    warnings.warn("generate_bq_schema is deprecated and will be removed in "
+                  "a future version", FutureWarning, stacklevel=2)
 
-    type_mapping = {
-        'i': 'INTEGER',
-        'b': 'BOOLEAN',
-        'f': 'FLOAT',
-        'O': 'STRING',
-        'S': 'STRING',
-        'U': 'STRING',
-        'M': 'TIMESTAMP'
-    }
+    return _generate_bq_schema(df, default_type=default_type)
 
-    fields = []
-    for column_name, dtype in df.dtypes.iteritems():
-        fields.append({'name': column_name,
-                       'type': type_mapping.get(dtype.kind, default_type)})
 
-    return {'fields': fields}
+def _generate_bq_schema(df, default_type='STRING'):
+    from pandas_gbq import _schema
+    return _schema.generate_bq_schema(df, default_type=default_type)
 
 
 class _Table(GbqConnector):
@@ -1096,6 +1062,9 @@ def create(self, table_id, schema):
         table_ref = self.client.dataset(self.dataset_id).table(table_id)
         table = Table(table_ref)
 
+        # Manually create the schema objects, adding NULLABLE mode
+        # as a workaround for
+        # https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4456
         for field in schema['fields']:
             if 'mode' not in field:
                 field['mode'] = 'NULLABLE'
diff --git a/pandas_gbq/tests/test__load.py b/pandas_gbq/tests/test__load.py
new file mode 100644
index 00000000..d63638e2
--- /dev/null
+++ b/pandas_gbq/tests/test__load.py
@@ -0,0 +1,40 @@
+# -*- coding: utf-8 -*-
+
+import numpy
+import pandas
+
+
+def test_encode_chunk_with_unicode():
+    """Test that a dataframe containing unicode can be encoded as a file.
+
+    See: https://github.com/pydata/pandas-gbq/issues/106
+    """
+    from pandas_gbq._load import encode_chunk
+
+    df = pandas.DataFrame(
+        numpy.random.randn(6, 4), index=range(6), columns=list('ABCD'))
+    df['s'] = u'信用卡'
+    csv_buffer = encode_chunk(df)
+    csv_bytes = csv_buffer.read()
+    csv_string = csv_bytes.decode('utf-8')
+    assert u'信用卡' in csv_string
+
+
+def test_encode_chunks_splits_dataframe():
+    from pandas_gbq._load import encode_chunks
+    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
+    chunks = list(encode_chunks(df, chunksize=2))
+    assert len(chunks) == 3
+    remaining, buffer = chunks[0]
+    assert remaining == 4
+    assert len(buffer.readlines()) == 2
+
+
+def test_encode_chunks_with_chunksize_none():
+    from pandas_gbq._load import encode_chunks
+    df = pandas.DataFrame(numpy.random.randn(6, 4), index=range(6))
+    chunks = list(encode_chunks(df))
+    assert len(chunks) == 1
+    remaining, buffer = chunks[0]
+    assert remaining == 0
+    assert len(buffer.readlines()) == 6
diff --git a/pandas_gbq/tests/test__schema.py b/pandas_gbq/tests/test__schema.py
new file mode 100644
index 00000000..5c7fffc1
--- /dev/null
+++ b/pandas_gbq/tests/test__schema.py
@@ -0,0 +1,55 @@
+
+import datetime
+
+import pandas
+import pytest
+
+from pandas_gbq import _schema
+
+
+@pytest.mark.parametrize(
+    'dataframe,expected_schema',
+    [
+        (
+            pandas.DataFrame(data={'col1': [1, 2, 3]}),
+            {'fields': [{'name': 'col1', 'type': 'INTEGER'}]},
+        ),
+        (
+            pandas.DataFrame(data={'col1': [True, False]}),
+            {'fields': [{'name': 'col1', 'type': 'BOOLEAN'}]},
+        ),
+        (
+            pandas.DataFrame(data={'col1': [1.0, 3.14]}),
+            {'fields': [{'name': 'col1', 'type': 'FLOAT'}]},
+        ),
+        (
+            pandas.DataFrame(data={'col1': [u'hello', u'world']}),
+            {'fields': [{'name': 'col1', 'type': 'STRING'}]},
+        ),
+        (
+            pandas.DataFrame(data={'col1': [datetime.datetime.now()]}),
+            {'fields': [{'name': 'col1', 'type': 'TIMESTAMP'}]},
+        ),
+        (
+            pandas.DataFrame(
+                data={
+                    'col1': [datetime.datetime.now()],
+                    'col2': [u'hello'],
+                    'col3': [3.14],
+                    'col4': [True],
+                    'col5': [4],
+                }),
+            {
+                'fields': [
+                    {'name': 'col1', 'type': 'TIMESTAMP'},
+                    {'name': 'col2', 'type': 'STRING'},
+                    {'name': 'col3', 'type': 'FLOAT'},
+                    {'name': 'col4', 'type': 'BOOLEAN'},
+                    {'name': 'col5', 'type': 'INTEGER'},
+                ],
+            },
+        ),
+    ])
+def test_generate_bq_schema(dataframe, expected_schema):
+    schema = _schema.generate_bq_schema(dataframe)
+    assert schema == expected_schema
diff --git a/pandas_gbq/tests/test_gbq.py b/pandas_gbq/tests/test_gbq.py
index f4f731b1..97f4729e 100644
--- a/pandas_gbq/tests/test_gbq.py
+++ b/pandas_gbq/tests/test_gbq.py
@@ -1218,6 +1218,29 @@ def test_upload_other_unicode_data(self):
 
         tm.assert_numpy_array_equal(expected.values, result.values)
 
+    def test_upload_mixed_float_and_int(self):
+        """Test that we can upload a dataframe containing an int64 and float64 column.
+        See: https://github.com/pydata/pandas-gbq/issues/116
+        """
+        test_id = "mixed_float_and_int"
+        test_size = 2
+        df = DataFrame(
+            [[1, 1.1], [2, 2.2]],
+            index=['row 1', 'row 2'],
+            columns=['intColumn', 'floatColumn'])
+
+        gbq.to_gbq(
+            df, self.destination_table + test_id,
+            _get_project_id(),
+            private_key=_get_private_key_path())
+
+        result_df = gbq.read_gbq("SELECT * FROM {0}".format(
+            self.destination_table + test_id),
+            project_id=_get_project_id(),
+            private_key=_get_private_key_path())
+
+        assert len(result_df) == test_size
+
     def test_generate_schema(self):
         df = tm.makeMixedDataFrame()
         schema = gbq._generate_bq_schema(df)
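
Usage note: a minimal sketch (not part of the patch itself) of how the new chunked CSV load path is exercised once this change is applied. The dataset, table, and project names below are hypothetical, and the example assumes google-cloud-bigquery 0.29.0 or later, as pinned in the CI requirements above.

    import pandas as pd

    from pandas_gbq.gbq import to_gbq

    # A frame mixing int64 and float64 columns -- the case from issue #116.
    df = pd.DataFrame({'intColumn': [1, 2], 'floatColumn': [1.1, 2.2]})

    # chunksize=None (the new default) encodes the whole frame as a single CSV
    # load job; pass an integer to split the upload into several smaller jobs.
    to_gbq(df, 'my_dataset.my_table', 'my-project-id',
           chunksize=None, if_exists='append')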