refactor: move query and wait logic to separate module
This prepares the way for using the `query_and_wait` method built into
the client library when available.
tswast committed Jan 9, 2024
1 parent 78c58cc commit d3600c2
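Note: the upstream method this refactor targets is `Client.query_and_wait`, added to google-cloud-bigquery in the 3.14 series; it starts the query and blocks until rows are ready. A minimal sketch of that upstream API, assuming a recent enough client library (illustrative, not part of this commit):

from google.cloud import bigquery

client = bigquery.Client()
# Starts the job and waits for completion in a single call; returns a RowIterator.
rows = client.query_and_wait(
    "SELECT 1 AS x",
    wait_timeout=60.0,  # seconds to wait before giving up
)
for row in rows:
    print(row["x"])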
Showing 6 changed files with 248 additions and 192 deletions.
9 changes: 9 additions & 0 deletions pandas_gbq/exceptions.py
@@ -35,3 +35,12 @@ class PerformanceWarning(RuntimeWarning):
     Such warnings can occur when dependencies for the requested feature
     aren't up-to-date.
     """
+
+
+class QueryTimeout(ValueError):
+    """
+    Raised when the query request exceeds the timeoutMs value specified in the
+    BigQuery configuration.
+    """
+
+    pass
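With `QueryTimeout` now defined in `pandas_gbq.exceptions`, callers can import it without reaching into `gbq.py`. A minimal sketch of guarding a long query, assuming a hypothetical table name; the `configuration` dict follows the BigQuery REST job representation:

import pandas_gbq
from pandas_gbq.exceptions import QueryTimeout

try:
    df = pandas_gbq.read_gbq(
        "SELECT * FROM `my-project.my_dataset.big_table`",  # hypothetical table
        configuration={"query": {"timeoutMs": 5000}},  # cancel after 5 seconds
    )
except QueryTimeout:
    print("Query exceeded timeoutMs and was cancelled.")

Because `gbq.py` re-imports the class (see the import hunk below), existing `from pandas_gbq.gbq import QueryTimeout` call sites keep working.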
144 changes: 19 additions & 125 deletions pandas_gbq/gbq.py
@@ -3,7 +3,6 @@
 # license that can be found in the LICENSE file.
 
 import copy
-import concurrent.futures
 from datetime import datetime
 import logging
 import re
@@ -20,8 +19,9 @@
 if typing.TYPE_CHECKING:  # pragma: NO COVER
     import pandas
 
-from pandas_gbq.exceptions import AccessDenied, GenericGBQException
+from pandas_gbq.exceptions import GenericGBQException, QueryTimeout
 from pandas_gbq.features import FEATURES
+import pandas_gbq.query
 import pandas_gbq.schema
 import pandas_gbq.timestamp
 
@@ -130,15 +130,6 @@ class NotFoundException(ValueError):
     pass
 
 
-class QueryTimeout(ValueError):
-    """
-    Raised when the query request exceeds the timeoutMs value specified in the
-    BigQuery configuration.
-    """
-
-    pass
-
-
 class TableCreationError(ValueError):
     """
     Raised when the create table method fails
@@ -340,10 +331,6 @@ def __init__(
         self.client = self.get_client()
         self.use_bqstorage_api = use_bqstorage_api
 
-        # BQ Queries costs $5 per TB. First 1 TB per month is free
-        # see here for more: https://cloud.google.com/bigquery/pricing
-        self.query_price_for_TB = 5.0 / 2**40  # USD/TB
-
     def _start_timer(self):
         self.start = time.time()
 
@@ -355,16 +342,6 @@ def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6):
         if sec > overlong:
             logger.info("{} {} {}".format(prefix, sec, postfix))
 
-    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
-    @staticmethod
-    def sizeof_fmt(num, suffix="B"):
-        fmt = "%3.1f %s%s"
-        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
-            if abs(num) < 1024.0:
-                return fmt % (num, unit, suffix)
-            num /= 1024.0
-        return fmt % (num, "Y", suffix)
-
     def get_client(self):
         import google.api_core.client_info
         from google.cloud import bigquery
@@ -421,46 +398,10 @@ def download_table(
             user_dtypes=dtypes,
         )
 
-    def _wait_for_query_job(self, query_reply, timeout_ms):
-        """Wait for query to complete, pausing occasionally to update progress.
-
-        Args:
-            query_reply (QueryJob):
-                A query job which has started.
-
-            timeout_ms (Optional[int]):
-                How long to wait before cancelling the query.
-        """
-        # Wait at most 10 seconds so we can show progress.
-        # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
-        # Include a tqdm progress bar here instead of a stream of log messages.
-        timeout_sec = 10.0
-        if timeout_ms:
-            timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
-
-        while query_reply.state != "DONE":
-            self.log_elapsed_seconds(" Elapsed", "s. Waiting...")
-
-            if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
-                self.client.cancel_job(
-                    query_reply.job_id, location=query_reply.location
-                )
-                raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))
-
-            try:
-                query_reply.result(timeout=timeout_sec)
-            except concurrent.futures.TimeoutError:
-                # Use our own timeout logic
-                pass
-            except self.http_error as ex:
-                self.process_http_error(ex)
-
     def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
-        from google.auth.exceptions import RefreshError
         from google.cloud import bigquery
-        import pandas
 
-        job_config = {
+        job_config_dict = {
             "query": {
                 "useLegacySql": self.dialect
                 == "legacy"
@@ -470,74 +411,27 @@ def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
         }
         config = kwargs.get("configuration")
         if config is not None:
-            job_config.update(config)
+            job_config_dict.update(config)
 
-        self._start_timer()
-
-        try:
-            logger.debug("Requesting query... ")
-            query_reply = self.client.query(
-                query,
-                job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
-                location=self.location,
-                project=self.project_id,
-            )
-            logger.debug("Query running...")
-        except (RefreshError, ValueError) as ex:
-            if self.private_key:
-                raise AccessDenied(
-                    f"The service account credentials are not valid: {ex}"
-                )
-            else:
-                raise AccessDenied(
-                    "The credentials have been revoked or expired, "
-                    f"please re-run the application to re-authorize: {ex}"
-                )
-        except self.http_error as ex:
-            self.process_http_error(ex)
-
-        job_id = query_reply.job_id
-        logger.debug("Job ID: %s" % job_id)
-
-        timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
-            "timeoutMs"
-        )
+        timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict[
+            "query"
+        ].get("timeoutMs")
         timeout_ms = int(timeout_ms) if timeout_ms else None
-        self._wait_for_query_job(query_reply, timeout_ms)
 
-        if query_reply.cache_hit:
-            logger.debug("Query done.\nCache hit.\n")
-        else:
-            bytes_processed = query_reply.total_bytes_processed or 0
-            bytes_billed = query_reply.total_bytes_billed or 0
-            logger.debug(
-                "Query done.\nProcessed: {} Billed: {}".format(
-                    self.sizeof_fmt(bytes_processed),
-                    self.sizeof_fmt(bytes_billed),
-                )
-            )
-            logger.debug(
-                "Standard price: ${:,.2f} USD\n".format(
-                    bytes_billed * self.query_price_for_TB
-                )
-            )
+        self._start_timer()
+        job_config = bigquery.QueryJobConfig.from_api_repr(job_config_dict)
+        rows_iter = pandas_gbq.query.query_and_wait(
+            self,
+            self.client,
+            query,
+            location=self.location,
+            project_id=self.project_id,
+            job_config=job_config,
+            max_results=max_results,
+            timeout_ms=timeout_ms,
+        )
 
         dtypes = kwargs.get("dtypes")
-
-        # Ensure destination is populated.
-        try:
-            query_reply.result()
-        except self.http_error as ex:
-            self.process_http_error(ex)
-
-        # Avoid attempting to download results from DML queries, which have no
-        # destination.
-        if query_reply.destination is None:
-            return pandas.DataFrame()
-
-        rows_iter = self.client.list_rows(
-            query_reply.destination, max_results=max_results
-        )
         return self._download_results(
             rows_iter,
             max_results=max_results,
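One detail of the rewritten run_query worth calling out: a job-level jobTimeoutMs takes precedence over the legacy query.timeoutMs key when both appear in the user-supplied configuration. The extraction logic above, as a small worked example:

job_config_dict = {"jobTimeoutMs": "30000", "query": {"timeoutMs": "10000"}}
timeout_ms = job_config_dict.get("jobTimeoutMs") or job_config_dict["query"].get("timeoutMs")
timeout_ms = int(timeout_ms) if timeout_ms else None
assert timeout_ms == 30000  # the job-level value wins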
135 changes: 135 additions & 0 deletions pandas_gbq/query.py
@@ -0,0 +1,135 @@
+# Copyright (c) 2017 pandas-gbq Authors All rights reserved.
+# Use of this source code is governed by a BSD-style
+# license that can be found in the LICENSE file.
+
+from __future__ import annotations
+
+import concurrent.futures
+import logging
+from typing import Optional
+
+from google.cloud import bigquery
+
+import pandas_gbq.exceptions
+
+
+logger = logging.getLogger(__name__)
+
+
+# On-demand BQ Queries costs $6.25 per TB. First 1 TB per month is free
+# see here for more: https://cloud.google.com/bigquery/pricing
+QUERY_PRICE_FOR_TB = 6.25 / 2**40  # USD/TB
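A quick sanity check on the constant (the comment says "per TB" while the divisor 2**40 is the binary tebibyte): a query billing 500 GiB comes to about $3.05.

bytes_billed = 500 * 2**30  # 500 GiB
print("${:,.2f}".format(bytes_billed * QUERY_PRICE_FOR_TB))  # prints $3.05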


+# http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
+def sizeof_fmt(num, suffix="B"):
+    fmt = "%3.1f %s%s"
+    for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
+        if abs(num) < 1024.0:
+            return fmt % (num, unit, suffix)
+        num /= 1024.0
+    return fmt % (num, "Y", suffix)
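Expected behavior of the helper, doctest-style:

>>> sizeof_fmt(512)
'512.0 B'
>>> sizeof_fmt(1234567)
'1.2 MB'
>>> sizeof_fmt(5 * 2**40)
'5.0 TB'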


+def _wait_for_query_job(
+    connector,
+    client: bigquery.Client,
+    query_reply: bigquery.QueryJob,
+    timeout_ms: Optional[float],
+):
+    """Wait for query to complete, pausing occasionally to update progress.
+
+    Args:
+        query_reply (QueryJob):
+            A query job which has started.
+
+        timeout_ms (Optional[int]):
+            How long to wait before cancelling the query.
+    """
+    # Wait at most 10 seconds so we can show progress.
+    # TODO(https://github.com/googleapis/python-bigquery-pandas/issues/327):
+    # Include a tqdm progress bar here instead of a stream of log messages.
+    timeout_sec = 10.0
+    if timeout_ms:
+        timeout_sec = min(timeout_sec, timeout_ms / 1000.0)
+
+    while query_reply.state != "DONE":
+        connector.log_elapsed_seconds(" Elapsed", "s. Waiting...")
+
+        if timeout_ms and timeout_ms < connector.get_elapsed_seconds() * 1000:
+            client.cancel_job(query_reply.job_id, location=query_reply.location)
+            raise pandas_gbq.exceptions.QueryTimeout(
+                "Query timeout: {} ms".format(timeout_ms)
+            )
+
+        try:
+            query_reply.result(timeout=timeout_sec)
+        except concurrent.futures.TimeoutError:
+            # Use our own timeout logic
+            pass
+        except connector.http_error as ex:
+            connector.process_http_error(ex)
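The waiting strategy: each result() call blocks for at most ten seconds so progress can be logged between slices, while the overall deadline is checked against the connector's wall-clock timer. The slice arithmetic, as a sketch:

timeout_ms = 2500
timeout_sec = min(10.0, timeout_ms / 1000.0)
assert timeout_sec == 2.5  # each poll slice waits at most 2.5 s
# Once connector.get_elapsed_seconds() * 1000 exceeds timeout_ms, the job
# is cancelled and pandas_gbq.exceptions.QueryTimeout is raised.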


+def query_and_wait(
+    connector,
+    client: bigquery.Client,
+    query: str,
+    *,
+    job_config: bigquery.QueryJobConfig,
+    location: Optional[str],
+    project_id: Optional[str],
+    max_results: Optional[int],
+    timeout_ms: Optional[int],
+):
+    from google.auth.exceptions import RefreshError
+
+    try:
+        logger.debug("Requesting query... ")
+        query_reply = client.query(
+            query,
+            job_config=job_config,
+            location=location,
+            project=project_id,
+        )
+        logger.debug("Query running...")
+    except (RefreshError, ValueError) as ex:
+        if connector.private_key:
+            raise pandas_gbq.exceptions.AccessDenied(
+                f"The service account credentials are not valid: {ex}"
+            )
+        else:
+            raise pandas_gbq.exceptions.AccessDenied(
+                "The credentials have been revoked or expired, "
+                f"please re-run the application to re-authorize: {ex}"
+            )
+    except connector.http_error as ex:
+        connector.process_http_error(ex)
+
+    job_id = query_reply.job_id
+    logger.debug("Job ID: %s" % job_id)
+
+    _wait_for_query_job(connector, connector.client, query_reply, timeout_ms)
+
+    if query_reply.cache_hit:
+        logger.debug("Query done.\nCache hit.\n")
+    else:
+        bytes_processed = query_reply.total_bytes_processed or 0
+        bytes_billed = query_reply.total_bytes_billed or 0
+        logger.debug(
+            "Query done.\nProcessed: {} Billed: {}".format(
+                sizeof_fmt(bytes_processed),
+                sizeof_fmt(bytes_billed),
+            )
+        )
+        logger.debug(
+            "Standard price: ${:,.2f} USD\n".format(bytes_billed * QUERY_PRICE_FOR_TB)
+        )
+
+    # As of google-cloud-bigquery 2.3.0, QueryJob.result() uses
+    # getQueryResults() instead of tabledata.list, which returns the correct
+    # response with DML/DDL queries.
+    try:
+        return query_reply.result(max_results=max_results)
+    except connector.http_error as ex:
+        connector.process_http_error(ex)
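The closing comment is what allows this module to drop the old DML special case from gbq.py (returning an empty DataFrame when the job had no destination table): with google-cloud-bigquery >= 2.3.0, result() works for DML/DDL and simply yields no rows. A hedged usage sketch, where connector stands in for an initialized pandas_gbq.gbq.GbqConnector and the table is hypothetical:

from google.cloud import bigquery

import pandas_gbq.query

rows = pandas_gbq.query.query_and_wait(
    connector,  # assumed: an initialized GbqConnector
    connector.client,
    "DELETE FROM `my-project.my_dataset.tmp` WHERE TRUE",  # hypothetical DML
    job_config=bigquery.QueryJobConfig(),
    location=None,
    project_id=None,
    max_results=None,
    timeout_ms=None,
)
print(list(rows))  # [] for DML; no destination-table special case needed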
5 changes: 0 additions & 5 deletions setup.py
@@ -34,11 +34,6 @@
     "google-api-core >= 2.10.2, <3.0.0dev",
     "google-auth >=2.13.0",
     "google-auth-oauthlib >=0.7.0",
-    # Require 1.27.* because it has a fix for out-of-bounds timestamps. See:
-    # https://github.com/googleapis/python-bigquery/pull/209 and
-    # https://github.com/googleapis/python-bigquery-pandas/issues/365
-    # Exclude 2.4.* because it has a bug where waiting for the query can hang
-    # indefinitely. https://github.com/pydata/pandas-gbq/issues/343
     "google-cloud-bigquery >=3.3.5,<4.0.0dev,!=2.4.*",
     "google-cloud-bigquery-storage >=2.16.2,<3.0.0dev",
     "packaging >=20.0.0",