
ENH: Add use_bqstorage_api option to read_gbq #270

Merged · 3 commits, merged Apr 5, 2019
4 changes: 4 additions & 0 deletions docs/source/changelog.rst
@@ -41,6 +41,10 @@ Enhancements
available (contributed by @daureg)
- ``read_gbq`` uses the timezone-aware ``DatetimeTZDtype(unit='ns',
tz='UTC')`` dtype for BigQuery ``TIMESTAMP`` columns. (:issue:`269`)
- Add ``use_bqstorage_api`` to :func:`read_gbq`. The BigQuery Storage API can
be used to download large query results (>125 MB) more quickly. If the BQ
Storage API can't be used, the BigQuery API is used instead. (:issue:`133`,
:issue:`270`)

.. _changelog-0.9.0:

38 changes: 38 additions & 0 deletions docs/source/reading.rst
@@ -84,3 +84,41 @@ DATETIME datetime64[ns]
TIME datetime64[ns]
DATE datetime64[ns]
================== =========================

.. _reading-bqstorage-api:

Using the BigQuery Storage API
------------------------------

Use the BigQuery Storage API to download large (>125 MB) query results more
quickly (but at an `increased cost
<https://cloud.google.com/bigquery/pricing#storage-api>`__) by setting
``use_bqstorage_api`` to ``True``.

1. Enable the BigQuery Storage API on the project you are using to run
queries.

`Enable the API
<https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
2. Ensure you have the `bigquery.readsessions.create permission
   <https://cloud.google.com/bigquery/docs/access-control#bq-permissions>`__ to
   create BigQuery Storage API read sessions. This permission is provided by
   the `bigquery.user role
   <https://cloud.google.com/bigquery/docs/access-control#roles>`__.
3. Install the ``google-cloud-bigquery-storage``, ``fastavro``, and
   ``python-snappy`` packages.

   With pip:

   .. code-block:: sh

      pip install --upgrade google-cloud-bigquery-storage fastavro python-snappy

   With conda:

   .. code-block:: sh

      conda install -c conda-forge google-cloud-bigquery-storage fastavro python-snappy
4. Set ``use_bqstorage_api`` to ``True`` when calling the
   :func:`~pandas_gbq.read_gbq` function. With version 1.11.1 or later of the
   ``google-cloud-bigquery`` package, the function falls back to the BigQuery
   API if the BigQuery Storage API cannot be used, for example with small
   query results. See the sketch after this list.
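
A usage sketch (the query and project ID below are placeholders, not part of this change):

.. code-block:: python

   import pandas_gbq

   # With use_bqstorage_api=True, results download via the BigQuery Storage
   # API when possible; otherwise (e.g. small results, with
   # google-cloud-bigquery 1.11.1 or later) the regular BigQuery API is used.
   df = pandas_gbq.read_gbq(
       """
       SELECT name, SUM(number) AS total
       FROM `bigquery-public-data.usa_names.usa_1910_2013`
       GROUP BY name
       """,
       project_id="your-project-id",  # placeholder
       use_bqstorage_api=True,
   )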
53 changes: 52 additions & 1 deletion pandas_gbq/gbq.py
@@ -5,6 +5,13 @@

import numpy as np

try:
# The BigQuery Storage API client is an optional dependency. It is only
# required when use_bqstorage_api=True.
from google.cloud import bigquery_storage_v1beta1
except ImportError: # pragma: NO COVER
bigquery_storage_v1beta1 = None

from pandas_gbq.exceptions import AccessDenied

logger = logging.getLogger(__name__)
@@ -302,6 +309,7 @@ def __init__(
dialect="standard",
location=None,
credentials=None,
use_bqstorage_api=False,
):
global context
from google.api_core.exceptions import GoogleAPIError
@@ -352,6 +360,9 @@ def __init__(
context.project = self.project_id

self.client = self.get_client()
self.bqstorage_client = _make_bqstorage_client(
use_bqstorage_api, self.credentials
)

# BQ Queries costs $5 per TB. First 1 TB per month is free
# see here for more: https://cloud.google.com/bigquery/pricing
@@ -489,7 +500,9 @@ def run_query(self, query, **kwargs):

schema_fields = [field.to_api_repr() for field in rows_iter.schema]
nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)
df = rows_iter.to_dataframe(
dtypes=nullsafe_dtypes, bqstorage_client=self.bqstorage_client
)

if df.empty:
df = _cast_empty_df_dtypes(schema_fields, df)
@@ -727,6 +740,21 @@ def _localize_df(schema_fields, df):
return df


def _make_bqstorage_client(use_bqstorage_api, credentials):
Contributor: Should this take a project too?

Collaborator (author): The BigQuery Storage API client doesn't actually take a project. The project is set when ``to_dataframe`` creates a BQ Storage API read session, based on the BQ client's project.

Aside: the BQ Storage API client is partially auto-generated, and the client generator doesn't support a default project ID. Since I consider the BQ Storage API a lower-level client that the normal BQ client uses from ``to_dataframe``, I felt it wasn't worth adding that kind of handwritten helper to the auto-generated layer.

if not use_bqstorage_api:
return None

if bigquery_storage_v1beta1 is None:
raise ImportError(
"Install the google-cloud-bigquery-storage and fastavro packages "
"to use the BigQuery Storage API."
)

return bigquery_storage_v1beta1.BigQueryStorageClient(
credentials=credentials
)
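
For context, a minimal sketch of how a client built this way is consumed downstream. The query is a placeholder, and credentials come from ``google.auth.default()`` rather than pandas-gbq's own auth flow:

import google.auth
from google.cloud import bigquery

# Assumed setup: application-default credentials (placeholder, not the
# pandas-gbq credential flow).
credentials, project_id = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
bigquery_client = bigquery.Client(credentials=credentials, project=project_id)

# The helper returns a BigQueryStorageClient (or None). to_dataframe uses it
# to open a read session against the query's destination table and, with
# google-cloud-bigquery 1.11.1+, falls back to the BigQuery API when it can't.
bqstorage_client = _make_bqstorage_client(True, credentials)
rows_iter = bigquery_client.query("SELECT 1 AS x").result()  # placeholder query
df = rows_iter.to_dataframe(bqstorage_client=bqstorage_client)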


def read_gbq(
query,
project_id=None,
@@ -738,6 +766,7 @@ def read_gbq(
location=None,
configuration=None,
credentials=None,
use_bqstorage_api=False,
verbose=None,
private_key=None,
):
@@ -815,6 +844,27 @@
:class:`google.oauth2.service_account.Credentials` directly.

.. versionadded:: 0.8.0
use_bqstorage_api : bool, default False
Use the `BigQuery Storage API
<https://cloud.google.com/bigquery/docs/reference/storage/>`__ to
download query results quickly, but at an increased cost. To use this
API, first `enable it in the Cloud Console
<https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
You must also have the `bigquery.readsessions.create
<https://cloud.google.com/bigquery/docs/access-control#roles>`__
permission on the project you are billing queries to.

**Note:** Due to a `known issue in the ``google-cloud-bigquery``
package
<https://github.com/googleapis/google-cloud-python/pull/7633>`__
(fixed in version 1.11.0), you must write your query results to a
destination table. To do this with ``read_gbq``, supply a
``configuration`` dictionary.
Contributor: We could only allow importing this for versions >1.11.0.

Collaborator (author): A few thoughts on this:

  • I'd be more comfortable bumping the required version once a few more versions of google-cloud-bigquery have been released past 1.11. I like people to have a few options in case there's a problematic regression in 1.11.1.
  • The failure on small results will be quite obvious (an InternalError stacktrace) if someone tries this with an older version. They'll find out quickly why they should upgrade, or they really know what they are doing and know their results will always be large enough for the BigQuery Storage API to read from.
  • Someday we might want to make use_bqstorage_api=True the default (maybe after the BQ Storage API goes GA?). It'd definitely be worth bumping the minimum version at that time.

Contributor: Yes, that sounds very reasonable. Agree re making the default True when it's GA (or at least soliciting opinions).

To be clear, I was suggesting the 1.11 minimum only for using bqstorage (probably implemented above, when attempting the import). Other versions would still run everything else.
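
To make the ``configuration`` workaround above concrete, a hedged sketch (the project, dataset, and table names are placeholders):

import pandas_gbq

# Placeholder names; any writable dataset works. Writing results to a
# destination table lets older google-cloud-bigquery versions (< 1.11.0)
# read them through the BigQuery Storage API.
config = {
    "query": {
        "destinationTable": {
            "projectId": "your-project-id",
            "datasetId": "your_dataset",
            "tableId": "bqstorage_results",
        },
        "writeDisposition": "WRITE_TRUNCATE",
    }
}
df = pandas_gbq.read_gbq(
    "SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`",
    project_id="your-project-id",
    configuration=config,
    use_bqstorage_api=True,
)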


This feature requires the ``google-cloud-bigquery-storage`` and
``fastavro`` packages.

.. versionadded:: 0.10.0
verbose : None, deprecated
Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
to adjust verbosity instead
@@ -871,6 +921,7 @@ def read_gbq(
location=location,
credentials=credentials,
private_key=private_key,
use_bqstorage_api=use_bqstorage_api,
)

final_df = connector.run_query(query, configuration=configuration)
28 changes: 27 additions & 1 deletion tests/system/test_gbq.py
@@ -895,10 +895,36 @@ def test_tokyo(self, tokyo_dataset, tokyo_table, private_key_path):
location="asia-northeast1",
private_key=private_key_path,
)
print(df)
assert df["max_year"][0] >= 2000


@pytest.mark.skip(reason="large query for BQ Storage API tests")
def test_read_gbq_w_bqstorage_api(credentials):
df = gbq.read_gbq(
"""
SELECT
dependency_name,
dependency_platform,
project_name,
project_id,
version_number,
version_id,
dependency_kind,
optional_dependency,
dependency_requirements,
dependency_project_id
FROM
`bigquery-public-data.libraries_io.dependencies`
WHERE
LOWER(dependency_platform) = 'npm'
LIMIT 2500000
""",
use_bqstorage_api=True,
credentials=credentials,
)
assert len(df) == 2500000


class TestToGBQIntegration(object):
@pytest.fixture(autouse=True, scope="function")
def setup(self, project, credentials, random_dataset_id):