Skip to content

Commit

Permalink
feat: accepts a table ID, which downloads the table without a query (#…
Browse files Browse the repository at this point in the history
…443)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery-pandas/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #266  🦕
  • Loading branch information
tswast authored Dec 22, 2021
1 parent c0f9da0 commit bf0e863
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 73 deletions.
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ omit =
google/cloud/__init__.py

[report]
fail_under = 88
fail_under = 89
show_missing = True
exclude_lines =
# Re-enable the standard pragma
Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def cover(session):
test runs (not system test runs), and then erases coverage data.
"""
session.install("coverage", "pytest-cov")
session.run("coverage", "report", "--show-missing", "--fail-under=88")
session.run("coverage", "report", "--show-missing", "--fail-under=89")

session.run("coverage", "erase")

Expand Down
2 changes: 1 addition & 1 deletion owlbot.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
templated_files = common.py_library(
unit_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
system_test_python_versions=["3.7", "3.8", "3.9", "3.10"],
cov_level=88,
cov_level=89,
unit_test_extras=extras,
system_test_extras=extras,
intersphinx_dependencies={
Expand Down
148 changes: 96 additions & 52 deletions pandas_gbq/gbq.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,21 @@
# license that can be found in the LICENSE file.

import logging
import re
import time
import warnings
from datetime import datetime
import typing
from typing import Any, Dict, Optional, Union

import numpy as np

# Only import at module-level at type checking time to avoid circular
# dependencies in the pandas package, which has an optional dependency on
# pandas-gbq.
if typing.TYPE_CHECKING: # pragma: NO COVER
import pandas

# Required dependencies, but treat as optional so that _test_google_api_imports
# can provide a better error message.
try:
Expand Down Expand Up @@ -64,6 +73,10 @@ def _test_google_api_imports():
raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex


def _is_query(query_or_table: str) -> bool:
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None


class DatasetCreationError(ValueError):
"""
Raised when the create dataset method fails
Expand Down Expand Up @@ -374,6 +387,30 @@ def process_http_error(ex):

raise GenericGBQException("Reason: {0}".format(ex))

def download_table(
self,
table_id: str,
max_results: Optional[int] = None,
progress_bar_type: Optional[str] = None,
dtypes: Optional[Dict[str, Union[str, Any]]] = None,
) -> "pandas.DataFrame":
self._start_timer()

try:
table_ref = bigquery.TableReference.from_string(
table_id, default_project=self.project_id
)
rows_iter = self.client.list_rows(table_ref, max_results=max_results)
except self.http_error as ex:
self.process_http_error(ex)

return self._download_results(
rows_iter,
max_results=max_results,
progress_bar_type=progress_bar_type,
user_dtypes=dtypes,
)

def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
from concurrent.futures import TimeoutError
from google.auth.exceptions import RefreshError
Expand All @@ -390,15 +427,6 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
if config is not None:
job_config.update(config)

if "query" in config and "query" in config["query"]:
if query is not None:
raise ValueError(
"Query statement can't be specified "
"inside config while it is specified "
"as parameter"
)
query = config["query"].pop("query")

self._start_timer()

try:
Expand Down Expand Up @@ -464,15 +492,25 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
)

dtypes = kwargs.get("dtypes")

# Ensure destination is populated.
try:
query_reply.result()
except self.http_error as ex:
self.process_http_error(ex)

rows_iter = self.client.list_rows(
query_reply.destination, max_results=max_results
)
return self._download_results(
query_reply,
rows_iter,
max_results=max_results,
progress_bar_type=progress_bar_type,
user_dtypes=dtypes,
)

def _download_results(
self, query_job, max_results=None, progress_bar_type=None, user_dtypes=None,
self, rows_iter, max_results=None, progress_bar_type=None, user_dtypes=None,
):
# No results are desired, so don't bother downloading anything.
if max_results == 0:
Expand Down Expand Up @@ -504,11 +542,6 @@ def _download_results(
to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client

try:
query_job.result()
# Get the table schema, so that we can list rows.
destination = self.client.get_table(query_job.destination)
rows_iter = self.client.list_rows(destination, max_results=max_results)

schema_fields = [field.to_api_repr() for field in rows_iter.schema]
conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
conversion_dtypes.update(user_dtypes)
Expand Down Expand Up @@ -644,7 +677,7 @@ def _cast_empty_df_dtypes(schema_fields, df):


def read_gbq(
query,
query_or_table,
project_id=None,
index_col=None,
col_order=None,
Expand All @@ -668,17 +701,18 @@ def read_gbq(
This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
<https://googleapis.dev/python/bigquery/latest/index.html>`__.
See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.
Parameters
----------
query : str
SQL-Like Query to return data values.
query_or_table : str
SQL query to return data values. If the string is a table ID, fetch the
rows directly from the table without running a query.
project_id : str, optional
Google BigQuery Account project ID. Optional when available from
Google Cloud Platform project ID. Optional when available from
the environment.
index_col : str, optional
Name of result column to use for index in results DataFrame.
Expand All @@ -688,14 +722,14 @@ def read_gbq(
reauth : boolean, default False
Force Google BigQuery to re-authenticate the user. This is useful
if multiple accounts are used.
auth_local_webserver : boolean, default False
Use the `local webserver flow`_ instead of the `console flow`_
when getting user credentials.
.. _local webserver flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
.. _console flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
auth_local_webserver : bool, default False
Use the `local webserver flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
instead of the `console flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
when getting user credentials. Your code must run on the same machine
as your web browser and your web browser can access your application
via ``localhost:808X``.
.. versionadded:: 0.2.0
dialect : str, default 'standard'
Expand Down Expand Up @@ -745,13 +779,6 @@ def read_gbq(
<https://cloud.google.com/bigquery/docs/access-control#roles>`__
permission on the project you are billing queries to.
**Note:** Due to a `known issue in the ``google-cloud-bigquery``
package
<https://github.com/googleapis/google-cloud-python/pull/7633>`__
(fixed in version 1.11.0), you must write your query results to a
destination table. To do this with ``read_gbq``, supply a
``configuration`` dictionary.
This feature requires the ``google-cloud-bigquery-storage`` and
``pyarrow`` packages.
Expand Down Expand Up @@ -823,6 +850,15 @@ def read_gbq(
if dialect not in ("legacy", "standard"):
raise ValueError("'{0}' is not valid for dialect".format(dialect))

if configuration and "query" in configuration and "query" in configuration["query"]:
if query_or_table is not None:
raise ValueError(
"Query statement can't be specified "
"inside config while it is specified "
"as parameter"
)
query_or_table = configuration["query"].pop("query")

connector = GbqConnector(
project_id,
reauth=reauth,
Expand All @@ -834,13 +870,21 @@ def read_gbq(
use_bqstorage_api=use_bqstorage_api,
)

final_df = connector.run_query(
query,
configuration=configuration,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)
if _is_query(query_or_table):
final_df = connector.run_query(
query_or_table,
configuration=configuration,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)
else:
final_df = connector.download_table(
query_or_table,
max_results=max_results,
progress_bar_type=progress_bar_type,
dtypes=dtypes,
)

# Reindex the DataFrame on the provided column
if index_col is not None:
Expand Down Expand Up @@ -889,7 +933,7 @@ def to_gbq(
This method uses the Google Cloud client library to make requests to
Google BigQuery, documented `here
<https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.
<https://googleapis.dev/python/bigquery/latest/index.html>`__.
See the :ref:`How to authenticate with Google BigQuery <authentication>`
guide for authentication instructions.
Expand All @@ -902,7 +946,7 @@ def to_gbq(
Name of table to be written, in the form ``dataset.tablename`` or
``project.dataset.tablename``.
project_id : str, optional
Google BigQuery Account project ID. Optional when available from
Google Cloud Platform project ID. Optional when available from
the environment.
chunksize : int, optional
Number of rows to be inserted in each chunk from the dataframe.
Expand All @@ -920,13 +964,13 @@ def to_gbq(
``'append'``
If table exists, insert data. Create if does not exist.
auth_local_webserver : bool, default False
Use the `local webserver flow`_ instead of the `console flow`_
when getting user credentials.
.. _local webserver flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
.. _console flow:
http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console
Use the `local webserver flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server>`_
instead of the `console flow
<https://googleapis.dev/python/google-auth-oauthlib/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console>`_
when getting user credentials. Your code must run on the same machine
as your web browser and your web browser can access your application
via ``localhost:808X``.
.. versionadded:: 0.2.0
table_schema : list of dicts, optional
Expand Down
8 changes: 7 additions & 1 deletion pandas_gbq/timestamp.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
Private module.
"""

import pandas.api.types


def localize_df(df, schema_fields):
"""Localize any TIMESTAMP columns to tz-aware type.
Expand Down Expand Up @@ -38,7 +40,11 @@ def localize_df(df, schema_fields):
if "mode" in field and field["mode"].upper() == "REPEATED":
continue

if field["type"].upper() == "TIMESTAMP" and df[column].dt.tz is None:
if (
field["type"].upper() == "TIMESTAMP"
and pandas.api.types.is_datetime64_ns_dtype(df.dtypes[column])
and df[column].dt.tz is None
):
df[column] = df[column].dt.tz_localize("UTC")

return df
19 changes: 19 additions & 0 deletions tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# license that can be found in the LICENSE file.

import os
import functools
import pathlib

from google.cloud import bigquery
Expand Down Expand Up @@ -56,6 +57,24 @@ def project(project_id):
return project_id


@pytest.fixture
def to_gbq(credentials, project_id):
import pandas_gbq

return functools.partial(
pandas_gbq.to_gbq, project_id=project_id, credentials=credentials
)


@pytest.fixture
def read_gbq(credentials, project_id):
import pandas_gbq

return functools.partial(
pandas_gbq.read_gbq, project_id=project_id, credentials=credentials
)


@pytest.fixture()
def random_dataset_id(bigquery_client: bigquery.Client, project_id: str):
dataset_id = prefixer.create_prefix()
Expand Down
Loading

0 comments on commit bf0e863

Please sign in to comment.