-
Notifications
You must be signed in to change notification settings - Fork 43
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add filters and columns arguments to read_gbq for enhanced data querying #198
Changes from 11 commits
7a006b0
37794a3
499bdcd
300263e
fdc539d
6ed4194
ad6d37f
3473780
276bfd0
8a4e940
b29e9b7
c00a05e
dd94369
54ca688
95e318b
ced491f
0f2840d
771d093
82f74fd
1c038b5
354fd8e
434c559
c17b815
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -284,9 +284,13 @@ def read_gbq( | |||||
index_col: Iterable[str] | str = (), | ||||||
col_order: Iterable[str] = (), | ||||||
max_results: Optional[int] = None, | ||||||
columns: Iterable[str] = (), | ||||||
filters: third_party_pandas_gbq.FiltersType = (), | ||||||
# Add a verify index argument that fails if the index is not unique. | ||||||
) -> dataframe.DataFrame: | ||||||
# TODO(b/281571214): Generate prompt to show the progress of read_gbq. | ||||||
query_or_table = self._filters_to_query(query_or_table, columns, filters) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, changed in read_gbq. |
||||||
|
||||||
if _is_query(query_or_table): | ||||||
return self._read_gbq_query( | ||||||
query_or_table, | ||||||
|
@@ -307,6 +311,80 @@ def read_gbq( | |||||
api_name="read_gbq", | ||||||
) | ||||||
|
||||||
def _filters_to_query(self, query_or_table, columns, filters): | ||||||
"""Convert filters to query""" | ||||||
if len(filters) == 0 and len(columns) == 0: | ||||||
return query_or_table | ||||||
|
||||||
sub_query = ( | ||||||
f"({query_or_table})" if _is_query(query_or_table) else query_or_table | ||||||
) | ||||||
|
||||||
select_clause = "SELECT " + ( | ||||||
", ".join(f"`{column}`" for column in columns) if columns else "*" | ||||||
) | ||||||
|
||||||
where_clause = "" | ||||||
if filters: | ||||||
valid_operators = { | ||||||
"in": "IN", | ||||||
"not in": "NOT IN", | ||||||
"==": "=", | ||||||
">": ">", | ||||||
"<": "<", | ||||||
">=": ">=", | ||||||
"<=": "<=", | ||||||
"!=": "!=", | ||||||
} | ||||||
|
||||||
if ( | ||||||
isinstance(filters, Iterable) | ||||||
and isinstance(filters[0], Tuple) | ||||||
and (len(filters[0]) == 0 or not isinstance(filters[0][0], Tuple)) | ||||||
): | ||||||
filters = [filters] | ||||||
|
||||||
or_expressions = [] | ||||||
for group in filters: | ||||||
if not isinstance(group, Iterable): | ||||||
raise ValueError( | ||||||
f"Filter group should be a iterable, {group} is not valid." | ||||||
) | ||||||
|
||||||
and_expressions = [] | ||||||
for filter_item in group: | ||||||
if not isinstance(filter_item, tuple) or (len(filter_item) != 3): | ||||||
raise ValueError( | ||||||
f"Filter condition should be a tuple of length 3, {filter_item} is not valid." | ||||||
) | ||||||
|
||||||
column, operator, value = filter_item | ||||||
|
||||||
if not isinstance(column, str): | ||||||
raise ValueError( | ||||||
f"Column name should be a string, but received '{column}' of type {type(column).__name__}." | ||||||
) | ||||||
|
||||||
if operator not in valid_operators: | ||||||
raise ValueError(f"Operator {operator} is not valid.") | ||||||
|
||||||
operator = valid_operators[operator] | ||||||
|
||||||
if operator in ["IN", "NOT IN"]: | ||||||
value_list = ", ".join([repr(v) for v in value]) | ||||||
expression = f"`{column}` {operator} ({value_list})" | ||||||
else: | ||||||
expression = f"`{column}` {operator} {repr(value)}" | ||||||
and_expressions.append(expression) | ||||||
|
||||||
or_expressions.append(" AND ".join(and_expressions)) | ||||||
|
||||||
if or_expressions: | ||||||
where_clause = " WHERE " + " OR ".join(or_expressions) | ||||||
|
||||||
full_query = f"{select_clause} FROM {sub_query} AS sub{where_clause}" | ||||||
return full_query | ||||||
|
||||||
def _query_to_destination( | ||||||
self, | ||||||
query: str, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -309,6 +309,64 @@ def test_read_gbq_w_script_no_select(session, dataset_id: str): | |
assert df["statement_type"][0] == "SCRIPT" | ||
|
||
|
||
@pytest.mark.parametrize(
    ("query_or_table", "filters", "validator"),
    [
        pytest.param(
            """SELECT
                rowindex,
                string_col,
            FROM `{scalars_table_id}` AS t
            """,
            [("rowindex", "<", 4), ("string_col", "==", "Hello, World!")],
            lambda row: row["rowindex"] < 4 and row["string_col"] == "Hello, World!",
            id="query_input",
        ),
        pytest.param(
            "{scalars_table_id}",
            [("date_col", ">", "2022-10-20")],
            lambda row: pd.to_datetime(row["date_col"]) > pd.to_datetime("2022-10-20"),
            id="table_input",
        ),
        pytest.param(
            "{scalars_table_id}",
            # Nested groups so the two predicates are combined with OR; a
            # flat list would combine them with AND and never exercise the
            # OR code path that this case (and its validator) targets.
            [
                [("rowindex", "not in", [0, 6])],
                [("string_col", "in", ["Hello, World!", "こんにちは"])],
            ],
            lambda row: row["rowindex"] not in [0, 6]
            or row["string_col"] in ["Hello, World!", "こんにちは"],
            id="or_operation",
        ),
        pytest.param(
            "{scalars_table_id}",
            # A bare list of scalars is not a valid filter tuple and must
            # be rejected.
            ["date_col", ">", "2022-10-20"],
            None,
            marks=pytest.mark.xfail(
                raises=ValueError,
            ),
            id="raise_error",
        ),
    ],
)
def test_read_gbq_with_filters(
    session, scalars_table_id: str, query_or_table, filters, validator
):
    """read_gbq with ``filters`` returns only rows matching the predicate."""
    df = session.read_gbq(
        query_or_table.format(scalars_table_id=scalars_table_id),
        filters=filters,
    )

    # Every returned row must satisfy the same predicate expressed in Python.
    for _, row in df.iterrows():
        assert validator(row)
|
||
|
||
def test_read_gbq_with_columns_filter(session, scalars_table_id: str):
    """Only the requested columns come back, in the requested order."""
    requested_columns = ["int64_too", "string_col", "date_col"]
    result = session.read_gbq(scalars_table_id, columns=requested_columns)
    assert list(result.columns) == requested_columns
|
||
|
||
def test_read_gbq_model(session, penguins_linear_model_name):
    """A stored BQML model loads as a bigframes LinearRegression wrapper."""
    loaded = session.read_gbq_model(penguins_linear_model_name)
    assert isinstance(loaded, bigframes.ml.linear_model.LinearRegression)
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,10 +3,25 @@ | |
|
||
from __future__ import annotations | ||
|
||
from typing import Iterable, Optional | ||
from typing import Any, Iterable, Literal, Optional, Tuple, Union | ||
|
||
from bigframes import constants | ||
|
||
# A single filter predicate: (column name, operator, value).
FilterType = Tuple[
    str,
    Literal["in", "not in", "<", "<=", "==", "!=", ">=", ">"],
    Any,
]

# Either a flat iterable of predicates (combined with AND), or an iterable
# of predicate groups, where the groups are combined with OR.
FiltersType = Iterable[Union[FilterType, Iterable[FilterType]]]
|
||
|
||
class GBQIOMixin: | ||
def read_gbq( | ||
|
@@ -16,6 +31,8 @@ def read_gbq( | |
index_col: Iterable[str] | str = (), | ||
col_order: Iterable[str] = (), | ||
max_results: Optional[int] = None, | ||
columns: Iterable[str] = (), | ||
filters: FiltersType = (), | ||
): | ||
"""Loads a DataFrame from BigQuery. | ||
|
||
|
@@ -83,6 +100,15 @@ def read_gbq( | |
max_results (Optional[int], default None): | ||
If set, limit the maximum number of rows to fetch from the | ||
query results. | ||
columns (Iterable[str], default ()): If not empty, only these columns | ||
will be read from the table. | ||
filters (List[Tuple], default ()): Filters to apply to the data. Filter | ||
syntax: [[(column, op, val), …], …] where op is one of [==, >, >=, <, | ||
<=, !=, in, not in]. The innermost tuples are combined into a set of | ||
filters applied through an AND operation. The outer list combines these | ||
sets of filters through an OR operation. A single list of tuples can | ||
also be used, meaning that no OR operation between the sets of filters | ||
is to be conducted. | ||
|
||
Returns: | ||
bigframes.dataframe.DataFrame: A DataFrame representing results of the query or table. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please pull the
columns
change out into a separate PR to be reviewed outside of the filters change. IMO,columns
andcol_order
are redundant and both should select a subset of columns. We need to keep both for compatibility see: googleapis/python-bigquery-pandas#701There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, removed.