fix: Improve BQ point-in-time joining scalability #3429
/assign @mavysavydav
@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
,created_timestamp
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
Haven't taken a close look, but it seems you removed this line even though there's a loop. Was that intended?
We don't need the comma when using temp tables. Each temp table is created by its own statement in a single script, and the statements are separated by semicolons instead.
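To illustrate the reply above: with CTEs, every subquery is an item in one `WITH` list and needs a comma after it unless it is the last one; with temp tables, every `CREATE TEMP TABLE` is its own statement in a multi-statement script and ends with a semicolon. A minimal sketch, assuming hypothetical helper names and placeholder subqueries (this is not Feast's actual Jinja template):

```python
from typing import Dict


def render_cte_query(subqueries: Dict[str, str], final_select: str) -> str:
    """Render one statement: WITH a AS (...), b AS (...) SELECT ..."""
    # Commas separate the CTEs; the last one gets no trailing comma.
    ctes = ",\n".join(f"{name} AS (\n{sql}\n)" for name, sql in subqueries.items())
    return f"WITH {ctes}\n{final_select}"


def render_temp_table_script(subqueries: Dict[str, str], final_select: str) -> str:
    """Render a script: one CREATE TEMP TABLE statement per subquery, then the final SELECT."""
    # Every statement is terminated by a semicolon, so no "last item" special case is needed.
    statements = [
        f"CREATE TEMP TABLE {name} AS (\n{sql}\n);" for name, sql in subqueries.items()
    ]
    statements.append(f"{final_select};")
    return "\n".join(statements)


# Placeholder subqueries, for illustration only.
subqueries = {
    "entity_dataframe": "SELECT 1 AS driver_id",
    "driver_stats__cleaned": "SELECT 1 AS driver_id, 0.5 AS conv_rate",
}
final_select = "SELECT * FROM entity_dataframe JOIN driver_stats__cleaned USING (driver_id)"

cte_sql = render_cte_query(subqueries, final_select)
script_sql = render_temp_table_script(subqueries, final_select)
```

The script form is what makes the `{% if loop.last %}` comma handling unnecessary: the loop body is identical for every feature view.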
@@ -488,10 +488,24 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
self._execute_query(query, job_config, timeout)
dest = job_config.destination
# because setting destination for scripts is not valid
was this just not working before? Seems like it was using the job_config.destination before?
When we run a script made of multiple statements separated by semicolons, BigQuery doesn't accept an explicitly passed destination and throws this error:
google.api_core.exceptions.BadRequest: 400 configuration.query.destinationTable cannot be set for scripts
That's why I added a workaround here.
We simply retrieve the BQ-generated temporary table after the script has been executed (it should look like <project_id>._08dc845331ibb01824ea0f3c3e079e93b3a076.anon933abd7cb6950c3c488af61fe94ad1713b6820cd1115fddd0c09cde32ec12823),
then persist it under our naming convention with the prefix historical_.
temp_dest_table = f"{tmp_dest['projectId']}.{tmp_dest['datasetId']}.{tmp_dest['tableId']}"

# persist temp table
sql = f"CREATE TABLE {dest} AS SELECT * FROM {temp_dest_table}"
similarly, confused why there is a new need to do this when it seemed like maybe it worked before?
@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
CREATE TEMP TABLE entity_dataframe AS (
is the main change here to create temp tables for everything? (and everything else got indented)?
Yes, it creates a temp table for the entity dataframe and for every feature view being joined.
and the temp tables only exist within the current execution session
Are you able to confirm what the scope of the session is? Does each invocation of self._execute_query create a new session? At a cursory glance, I see that we're not explicitly passing create_session=True in the QueryJobConfig instance. I think it would be very helpful to have a confirmation that there isn't going to be any cross contamination between concurrent historical fetches.
Does each invocation of self._execute_query create a new session?
yes it does
I see that we're not explicitly passing create_session=True in the QueryJobConfig instance.
We only need to set this config when we need to reuse the session in a different client code scope. In this case, we gather all of the feature views into a single script that is executed once. I can confirm that after the script is executed, the temp tables do not exist anywhere else.
@@ -488,10 +488,24 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
self._execute_query(query, job_config, timeout)
dest = job_config.destination
Conceptually, the PR seems reasonable (once any doubts about temp table cross-contamination are put to rest). But in terms of code readability, I can't help but notice that if this method is called without job_config, we first create it containing only the destination at the start of the method, then we remove the destination here. Perhaps this could use a bit of a cleanup where the job_config management is all in one spot?
Strongly agree! But in this PR I'm trying not to break any other flows. Some users may prefer passing their own job configuration to the to_bigquery method, which is why I only deal with the destination attribute.
I believe we shouldn't allow users to pass the destination. That would need some changes, but let's deal with them in a separate PR.
FYI: the Spark offline store also follows a similar concept.
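One way the cleanup suggested in this thread could look, as a sketch only: resolve the destination and the script-safe job config in a single helper. `JobConfig` here is a hypothetical stand-in dataclass, not the BigQuery `QueryJobConfig` class, and `split_destination` and the table names are assumed for illustration.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class JobConfig:
    """Hypothetical stand-in for a query job configuration."""

    destination: Optional[str] = None
    dry_run: bool = False


def split_destination(
    job_config: Optional[JobConfig], default_destination: str
) -> Tuple[JobConfig, str]:
    """Return (config that is safe for scripts, destination to persist results into)."""
    config = job_config or JobConfig()
    dest = config.destination or default_destination
    # Scripts reject an explicit destination, so strip it but keep the other settings.
    return replace(config, destination=None), dest


script_config, dest = split_destination(
    JobConfig(destination="proj.ds.my_table", dry_run=True),
    "proj.ds.historical_default",
)
```

The caller's other settings (here only `dry_run`) pass through unchanged, which matches the stated goal of not breaking flows where users supply their own job configuration.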
5ea5698 to 676395b
676395b to 48b0865
I apologize for not getting back to this PR sooner. I think I need to fix some github notification settings so it doesn't happen again.
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: chhabrakadabra, sudohainguyen
48b0865 to c813305
Signed-off-by: Danny Chiao <[email protected]>
c813305 to 3c233d9
/lgtm
# [0.30.0](v0.29.0...v0.30.0) (2023-03-17)

### Bug Fixes

* Add description attribute to the Field.from_proto method ([#3469](#3469)) ([473f8d9](473f8d9))
* Add filesystem kwargs when read prev_table on FileRetrievalJob (… ([#3491](#3491)) ([dca4745](dca4745)), closes [#3490](#3490)
* Feature view `entities` from_proto type ([#3524](#3524)) ([57bbb61](57bbb61))
* Fix missing requests requirement after GCP requirement removed. Make BigQuerySource not require gcp extra ([2c85421](2c85421))
* Fix SQL Registry cache miss ([#3482](#3482)) ([3249b97](3249b97))
* Fixed path inside quickstart notebook ([#3456](#3456)) ([66edc32](66edc32))
* Improve BQ point-in-time joining scalability ([#3429](#3429)) ([ff66784](ff66784))
* Pin typeguard to 2.13.3 which is what we are currently using. ([#3542](#3542)) ([61f6fb0](61f6fb0))
* Protobuf lower bound to 3.20 to alert that Feast is incompatible with tensorflow ([#3476](#3476)) ([9ca59e3](9ca59e3))
* Spark kafka processor sorting ([#3479](#3479)) ([f2cbf43](f2cbf43))
* UI working behind base url ([#3514](#3514)) ([9a3fd98](9a3fd98))
* Update go dependencies ([#3512](#3512)) ([bada97c](bada97c))

### Features

* Add Rockset as an OnlineStore ([#3405](#3405)) ([fd91cda](fd91cda))
* Add Snowflake Registry ([#3363](#3363)) ([ec1e61d](ec1e61d))
* Adding query timeout to `to_df` and `to_arrow` retrieval methods ([#3505](#3505)) ([bab6644](bab6644))
* adds k8s config options to Bytewax materialization engine ([#3518](#3518)) ([1883f55](1883f55))
# [0.30.0](v0.29.0...v0.30.0) (2023-03-24)

### Bug Fixes

* Add description attribute to the Field.from_proto method ([#3469](#3469)) ([473f8d9](473f8d9))
* Add filesystem kwargs when read prev_table on FileRetrievalJob (… ([#3491](#3491)) ([dca4745](dca4745)), closes [#3490](#3490)
* Bytewax image pull secret config ([#3547](#3547)) ([d2d13b1](d2d13b1))
* Clean up Rockset Online Store for use ([#3549](#3549)) ([a76c6d0](a76c6d0))
* Feature view `entities` from_proto type ([#3524](#3524)) ([57bbb61](57bbb61))
* Fix missing requests requirement after GCP requirement removed. Make BigQuerySource not require gcp extra ([2c85421](2c85421))
* Fix SQL Registry cache miss ([#3482](#3482)) ([3249b97](3249b97))
* Fixed path inside quickstart notebook ([#3456](#3456)) ([66edc32](66edc32))
* Improve BQ point-in-time joining scalability ([#3429](#3429)) ([ff66784](ff66784))
* Pin typeguard to 2.13.3 which is what we are currently using. ([#3542](#3542)) ([61f6fb0](61f6fb0))
* Protobuf lower bound to 3.20 to alert that Feast is incompatible with tensorflow ([#3476](#3476)) ([9ca59e3](9ca59e3))
* Spark kafka processor sorting ([#3479](#3479)) ([f2cbf43](f2cbf43))
* UI working behind base url ([#3514](#3514)) ([9a3fd98](9a3fd98))
* Update go dependencies ([#3512](#3512)) ([bada97c](bada97c))

### Features

* Add Rockset as an OnlineStore ([#3405](#3405)) ([fd91cda](fd91cda))
* Add Snowflake Registry ([#3363](#3363)) ([ec1e61d](ec1e61d))
* Added SnowflakeConnection caching ([#3531](#3531)) ([f9f8df2](f9f8df2))
* Adding query timeout to `to_df` and `to_arrow` retrieval methods ([#3505](#3505)) ([bab6644](bab6644))
* adds k8s config options to Bytewax materialization engine ([#3518](#3518)) ([1883f55](1883f55))
Hey @adchia, @sudohainguyen! I started having problems with BQ offline queries after I bumped the feast version from 0.28.0 to 0.30.2. I think it has something to do with the temp table mechanism; likely the temp tables are not properly re-created, replaced, or removed. I am not sure how to solve the problem; maybe adding a random suffix to the temp table name on each new request could help avoid collisions.
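A sketch of the random-suffix idea floated above, using a hypothetical helper name. Whether name collisions are actually the cause of the reported behavior is not established in this thread, so this only shows how such names could be generated.

```python
import uuid


def unique_temp_table_name(base: str) -> str:
    """Append a random hex suffix so each request uses distinct temp table names."""
    # 12 hex characters (48 random bits) keeps names short while making
    # accidental collisions between requests very unlikely.
    suffix = uuid.uuid4().hex[:12]
    return f"{base}_{suffix}"


first = unique_temp_table_name("entity_dataframe")
second = unique_temp_table_name("entity_dataframe")
```

Every reference to the table inside the generated script would have to use the same suffixed name, so the name should be generated once per request and passed to the template.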
Hmm, did you check your data? If your feature view TTL is 1 and there were no records for that entity on 2022-05-26 but there were on the 25th, the results are expected to be the same. FYI: a temp table is only reused when the query is exactly the same.
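The TTL reasoning above can be sketched in plain Python. This is an illustrative model of a point-in-time lookup, not Feast's implementation: for an entity timestamp it returns the latest feature value whose event timestamp falls within `[entity_ts - ttl, entity_ts]`. The function name, the sample data, and the reading of "TTL is 1" as one day are assumptions.

```python
from datetime import datetime, timedelta
from typing import List, Optional, Tuple

FeatureRow = Tuple[datetime, float]  # (event_timestamp, value)


def point_in_time_value(
    rows: List[FeatureRow], entity_ts: datetime, ttl: timedelta
) -> Optional[float]:
    """Return the most recent value with entity_ts - ttl <= event_timestamp <= entity_ts."""
    eligible = [(ts, value) for ts, value in rows if entity_ts - ttl <= ts <= entity_ts]
    if not eligible:
        return None
    # The latest eligible row wins, which is what makes the join "point in time".
    return max(eligible, key=lambda row: row[0])[1]


# One record on the 25th and none on the 26th (hypothetical data).
rows = [(datetime(2022, 5, 25, 12), 0.7)]
ttl = timedelta(days=1)

on_25th = point_in_time_value(rows, datetime(2022, 5, 25, 18), ttl)
on_26th = point_in_time_value(rows, datetime(2022, 5, 26, 6), ttl)
on_27th = point_in_time_value(rows, datetime(2022, 5, 27, 18), ttl)
```

In this model the lookups on the 25th and on the morning of the 26th return the same value because the record from the 25th is still inside the TTL window, while the lookup on the 27th finds nothing.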
@sudohainguyen , the problem has gone when I downgraded feast to TTL = 0. so here is the data-snippet I spot the problem on:
here is the test code:
it passed from the first attempt, but when I ran it the second time -> it failed, with all entries for the I am not sure where the problem lies, but it occurs only in feast Do you have any ideas how to fix the problem? |
weird, I didn't experience the same, my TTL is 1 btw
I conducted retrieval with 5 entities and 2 different timestamps (2023/03/24 and 2023/03/25) in the same notebook session and they are not identical :(
Signed-off-by: Hai Nguyen [email protected]
What this PR does / why we need it: point-in-time joining in BQ offline store can be more scalable
Which issue(s) this PR fixes:
Fixes #3426
Fixes #3003