Skip to content

Commit

Permalink
fix: Enable Spark materialization on Yarn (#3370)
Browse files Browse the repository at this point in the history
* fix: Fix Spark materialization engine to work on Yarn

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* fix

Signed-off-by: Danny Chiao <[email protected]>

* add docs

Signed-off-by: Danny Chiao <[email protected]>

Signed-off-by: Danny Chiao <[email protected]>
  • Loading branch information
adchia authored Dec 1, 2022
1 parent 00fa21f commit 0c20a4e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
34 changes: 34 additions & 0 deletions docs/reference/batch-materialization/spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,37 @@ batch_engine:
partitions: [optional number of partitions to use when writing to the online store]
```
{% endcode %}
## Example in Python
{% code title="feature_store.py" %}
```python
from feast import FeatureStore, RepoConfig
# NOTE(review): RegistryConfig is imported but not referenced in this example.
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.offline_stores.contrib.spark_offline_store.spark import SparkOfflineStoreConfig

# Assemble the repo configuration in Python: Spark offline store, Spark batch
# materialization engine, and DynamoDB online store.
repo_config = RepoConfig(
    registry="s3://[YOUR_BUCKET]/feast-registry.db",
    project="feast_repo",
    provider="aws",
    offline_store=SparkOfflineStoreConfig(
        # Spark session settings passed through to the offline store.
        spark_conf={
            "spark.ui.enabled": "false",
            "spark.eventLog.enabled": "false",
            "spark.sql.catalogImplementation": "hive",
            "spark.sql.parser.quotedRegexColumnNames": "true",
            "spark.sql.session.timeZone": "UTC",
        },
    ),
    # Select the Spark materialization engine; "partitions" is the optional
    # number of partitions used when writing to the online store.
    batch_engine={
        "type": "spark.engine",
        "partitions": 10,
    },
    online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
    entity_key_serialization_version=2,
)

# Create the feature store from the in-code configuration.
store = FeatureStore(config=repo_config)
```
{% endcode %}
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import tempfile
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, List, Literal, Optional, Sequence, Union
Expand Down Expand Up @@ -196,7 +195,7 @@ class _SparkSerializedArtifacts:
"""Class to assist with serializing unpicklable artifacts to the spark workers"""

feature_view_proto: str
repo_config_file: str
repo_config_byte: str

@classmethod
def serialize(cls, feature_view, repo_config):
Expand All @@ -205,12 +204,10 @@ def serialize(cls, feature_view, repo_config):
feature_view_proto = feature_view.to_proto().SerializeToString()

# serialize repo_config to bytes so it can be sent to the spark workers. Will be used to instantiate the online store
repo_config_file = tempfile.NamedTemporaryFile(delete=False).name
with open(repo_config_file, "wb") as f:
dill.dump(repo_config, f)
repo_config_byte = dill.dumps(repo_config)

return _SparkSerializedArtifacts(
feature_view_proto=feature_view_proto, repo_config_file=repo_config_file
feature_view_proto=feature_view_proto, repo_config_byte=repo_config_byte
)

def unserialize(self):
Expand All @@ -220,8 +217,7 @@ def unserialize(self):
feature_view = FeatureView.from_proto(proto)

# load
with open(self.repo_config_file, "rb") as f:
repo_config = dill.load(f)
repo_config = dill.loads(self.repo_config_byte)

provider = PassthroughProvider(repo_config)
online_store = provider.online_store
Expand Down

0 comments on commit 0c20a4e

Please sign in to comment.