diff --git a/docs/source/changelog.rst b/docs/source/changelog.rst
index d710b37f..f18b20d9 100644
--- a/docs/source/changelog.rst
+++ b/docs/source/changelog.rst
@@ -41,6 +41,10 @@ Enhancements
   available (contributed by @daureg)
 - ``read_gbq`` uses the timezone-aware ``DatetimeTZDtype(unit='ns',
   tz='UTC')`` dtype for BigQuery ``TIMESTAMP`` columns. (:issue:`269`)
+- Add ``use_bqstorage_api`` to :func:`read_gbq`. The BigQuery Storage API can
+  be used to download large query results (>125 MB) more quickly. If the BQ
+  Storage API can't be used, the BigQuery API is used instead. (:issue:`133`,
+  :issue:`270`)

 .. _changelog-0.9.0:

diff --git a/docs/source/reading.rst b/docs/source/reading.rst
index 4a7b9d66..e10f533c 100644
--- a/docs/source/reading.rst
+++ b/docs/source/reading.rst
@@ -84,3 +84,41 @@ DATETIME              datetime64[ns]
 TIME                  datetime64[ns]
 DATE                  datetime64[ns]
 ==================    =========================
+
+.. _reading-bqstorage-api:
+
+Using the BigQuery Storage API
+------------------------------
+
+Use the BigQuery Storage API to download large (>125 MB) query results more
+quickly (but at an `increased cost `__) by setting ``use_bqstorage_api`` to
+``True``.
+
+1. Enable the BigQuery Storage API on the project you are using to run
+   queries. `Enable the API `__.
+2. Ensure you have the `bigquery.readsessions.create permission `__ to
+   create BigQuery Storage API read sessions. This permission is provided
+   by the `bigquery.user role `__.
+3. Install the ``google-cloud-bigquery-storage``, ``fastavro``, and
+   ``python-snappy`` packages.
+
+   With pip:
+
+   .. code-block:: sh
+
+      pip install --upgrade google-cloud-bigquery-storage fastavro python-snappy
+
+   With conda:
+
+   .. code-block:: sh
+
+      conda install -c conda-forge google-cloud-bigquery-storage fastavro python-snappy
+
+4. Set ``use_bqstorage_api`` to ``True`` when calling the
+   :func:`~pandas_gbq.read_gbq` function. With version 1.11.1 or later of
+   the ``google-cloud-bigquery`` package, the function falls back to the
+   BigQuery API if the BigQuery Storage API cannot be used, such as with
+   small query results.
diff --git a/pandas_gbq/gbq.py b/pandas_gbq/gbq.py
index b9978887..9c02538d 100644
--- a/pandas_gbq/gbq.py
+++ b/pandas_gbq/gbq.py
@@ -5,6 +5,13 @@

 import numpy as np

+try:
+    # The BigQuery Storage API client is an optional dependency. It is only
+    # required when use_bqstorage_api=True.
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
+
 from pandas_gbq.exceptions import AccessDenied

 logger = logging.getLogger(__name__)
@@ -302,6 +309,7 @@ def __init__(
         dialect="standard",
         location=None,
         credentials=None,
+        use_bqstorage_api=False,
     ):
         global context
         from google.api_core.exceptions import GoogleAPIError
@@ -352,6 +360,9 @@
         context.project = self.project_id

         self.client = self.get_client()
+        self.bqstorage_client = _make_bqstorage_client(
+            use_bqstorage_api, self.credentials
+        )

         # BQ Queries costs $5 per TB. First 1 TB per month is free
         # see here for more: https://cloud.google.com/bigquery/pricing
@@ -489,7 +500,9 @@ def run_query(self, query, **kwargs):

         schema_fields = [field.to_api_repr() for field in rows_iter.schema]
         nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
-        df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
+        df = rows_iter.to_dataframe(
+            dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client
+        )

         if df.empty:
             df = _cast_empty_df_dtypes(schema_fields, df)
@@ -727,6 +740,21 @@ def _localize_df(schema_fields, df):
     return df


+def _make_bqstorage_client(use_bqstorage_api, credentials):
+    if not use_bqstorage_api:
+        return None
+
+    if bigquery_storage_v1beta1 is None:
+        raise ImportError(
+            "Install the google-cloud-bigquery-storage and fastavro packages "
+            "to use the BigQuery Storage API."
+        )
+
+    return bigquery_storage_v1beta1.BigQueryStorageClient(
+        credentials=credentials
+    )
+
+
 def read_gbq(
     query,
     project_id=None,
@@ -738,6 +766,7 @@
     location=None,
     configuration=None,
     credentials=None,
+    use_bqstorage_api=False,
     verbose=None,
     private_key=None,
 ):
@@ -815,6 +844,27 @@
         :class:`google.oauth2.service_account.Credentials` directly.

         .. versionadded:: 0.8.0
+    use_bqstorage_api : bool, default False
+        Use the `BigQuery Storage API `__ to download query results quickly,
+        but at an increased cost. To use this API, first `enable it in the
+        Cloud Console `__. You must also have the
+        `bigquery.readsessions.create `__ permission on the project you are
+        billing queries to.
+
+        **Note:** Due to a `known issue in the google-cloud-bigquery package
+        `__ (fixed in version 1.11.0), you must write your query results to
+        a destination table. To do this with ``read_gbq``, supply a
+        ``configuration`` dictionary.
+
+        This feature requires the ``google-cloud-bigquery-storage`` and
+        ``fastavro`` packages.
+
+        .. versionadded:: 0.10.0
     verbose : None, deprecated
         Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
         to adjust verbosity instead
@@ -871,6 +921,7 @@
         location=location,
         credentials=credentials,
         private_key=private_key,
+        use_bqstorage_api=use_bqstorage_api,
     )

     final_df = connector.run_query(query, configuration=configuration)
diff --git a/tests/system/test_gbq.py b/tests/system/test_gbq.py
index 6c876068..5c1e6db2 100644
--- a/tests/system/test_gbq.py
+++ b/tests/system/test_gbq.py
@@ -895,10 +895,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
             location="asia-northeast1",
             private_key=private_key_path,
         )
-        print(df)
         assert df["max_year"][0] >= 2000


+@pytest.mark.skip(reason="large query for BQ Storage API tests")
+def test_read_gbq_w_bqstorage_api(credentials):
+    df = gbq.read_gbq(
+        """
+        SELECT
+          dependency_name,
+          dependency_platform,
+          project_name,
+          project_id,
+          version_number,
+          version_id,
+          dependency_kind,
+          optional_dependency,
+          dependency_requirements,
+          dependency_project_id
+        FROM
+          `bigquery-public-data.libraries_io.dependencies`
+        WHERE
+          LOWER(dependency_platform) = 'npm'
+        LIMIT 2500000
+        """,
+        use_bqstorage_api=True,
+        credentials=credentials,
+    )
+    assert len(df) == 2500000
+
+
 class TestToGBQIntegration(object):
     @pytest.fixture(autouse=True, scope="function")
     def setup(self, project, credentials, random_dataset_id):
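
For reference, a minimal usage sketch of the new ``use_bqstorage_api`` flag, assuming the BigQuery Storage API is enabled on the billing project and the optional ``google-cloud-bigquery-storage`` and ``fastavro`` packages are installed; the project ID and query below are placeholders, not part of the patch:

    import pandas_gbq

    # Pull a public-dataset query result through the BigQuery Storage API.
    # With google-cloud-bigquery 1.11.1 or later, pandas-gbq falls back to
    # the regular BigQuery API when the Storage API cannot be used (for
    # example, with small query results).
    df = pandas_gbq.read_gbq(
        """
        SELECT name, SUM(number) AS total
        FROM `bigquery-public-data.usa_names.usa_1910_current`
        GROUP BY name
        """,
        project_id="my-billing-project",  # placeholder billing project
        dialect="standard",
        use_bqstorage_api=True,
    )
    print(df.head())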