Skip to content

Commit

Permalink
feat: Adding get_online_features_async to feature store sdk (#4172)
Browse files Browse the repository at this point in the history
* feat: Adding get_online_features_async to feature store sdk

Signed-off-by: Breno Costa <[email protected]>

* add more unit tests

Signed-off-by: Breno Costa <[email protected]>

* fix redis key generation

Signed-off-by: Breno Costa <[email protected]>

* fix unit tests

Signed-off-by: Breno Costa <[email protected]>

---------

Signed-off-by: Breno Costa <[email protected]>
  • Loading branch information
breno-costa authored May 9, 2024
1 parent 369ca98 commit 311efc5
Show file tree
Hide file tree
Showing 9 changed files with 522 additions and 56 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<img src="docs/assets/feast_logo.png" width="550">
</a>
</p>
<br />
<br />

[![unit-tests](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/unit_tests.yml)
[![integration-tests-and-build](https://github.com/feast-dev/feast/actions/workflows/master_only.yml/badge.svg?branch=master&event=push)](https://github.com/feast-dev/feast/actions/workflows/master_only.yml)
Expand Down
233 changes: 206 additions & 27 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1550,6 +1550,54 @@ def get_online_features(
native_entity_values=True,
)

@log_exceptions_and_usage
async def get_online_features_async(
    self,
    features: Union[List[str], FeatureService],
    entity_rows: List[Dict[str, Any]],
    full_feature_names: bool = False,
) -> OnlineResponse:
    """
    [Alpha] Retrieves the latest online feature data asynchronously.

    Note: This method will download the full feature registry the first time it is run. If you are using a
    remote registry like GCS or S3 then that may take a few seconds. The registry remains cached up to a TTL
    duration (which can be set to infinity). If the cached registry is stale (more time than the TTL has
    passed), then a new registry will be downloaded synchronously by this method. This download may
    introduce latency to online feature retrieval. In order to avoid synchronous downloads, please call
    refresh_registry() prior to the TTL being reached. Remember it is possible to set the cache TTL to
    infinity (cache forever).

    Args:
        features: The list of features that should be retrieved from the online store. These features can be
            specified either as a list of string feature references or as a feature service. String feature
            references must have format "feature_view:feature", e.g. "customer_fv:daily_transactions".
        entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair.
            Every row must carry exactly the same set of keys as the first row.
        full_feature_names: If True, feature names will be prefixed with the corresponding feature view name,
            changing them from the format "feature" to "feature_view__feature" (e.g. "daily_transactions"
            changes to "customer_fv__daily_transactions").

    Returns:
        OnlineResponse containing the feature data in records.

    Raises:
        ValueError: A row's keys differ from those of the first row (extra or missing keys).
        IndexError: entity_rows is empty, so there is no first row to take the keys from.
        Exception: No entity with the specified name exists.
    """
    # Pivot the row-oriented input into one list of values per entity key.
    columnar: Dict[str, List[Any]] = {k: [] for k in entity_rows[0].keys()}
    expected_keys = columnar.keys()
    for entity_row in entity_rows:
        # Key views compare like sets, so this rejects rows with extra keys and
        # rows with missing keys; the latter would otherwise leave some columns
        # shorter than others.
        if entity_row.keys() != expected_keys:
            raise ValueError("All entity_rows must have the same keys.")
        for key, value in entity_row.items():
            columnar[key].append(value)

    return await self._get_online_features_async(
        features=features,
        entity_values=columnar,
        full_feature_names=full_feature_names,
        native_entity_values=True,
    )

def _get_online_request_context(
self, features: Union[List[str], FeatureService], full_feature_names: bool
):
Expand Down Expand Up @@ -1609,7 +1657,7 @@ def _get_online_request_context(
entityless_case,
)

def _get_online_features(
def _prepare_entities_to_read_from_online_store(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
Expand All @@ -1619,7 +1667,7 @@ def _get_online_features(
native_entity_values: bool = True,
):
(
_feature_refs,
feature_refs,
requested_on_demand_feature_views,
entity_name_to_join_key_map,
entity_type_map,
Expand Down Expand Up @@ -1694,6 +1742,40 @@ def _get_online_features(
[DUMMY_ENTITY_VAL] * num_rows, DUMMY_ENTITY.value_type
)

return (
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
)

def _get_online_features(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
],
full_feature_names: bool = False,
native_entity_values: bool = True,
):
(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = self._prepare_entities_to_read_from_online_store(
features=features,
entity_values=entity_values,
full_feature_names=full_feature_names,
native_entity_values=native_entity_values,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
Expand Down Expand Up @@ -1724,7 +1806,71 @@ def _get_online_features(
if requested_on_demand_feature_views:
self._augment_response_with_on_demand_transforms(
online_features_response,
_feature_refs,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)

self._drop_unneeded_columns(
online_features_response, requested_result_row_names
)
return OnlineResponse(online_features_response)

async def _get_online_features_async(
self,
features: Union[List[str], FeatureService],
entity_values: Mapping[
str, Union[Sequence[Any], Sequence[Value], RepeatedValue]
],
full_feature_names: bool = False,
native_entity_values: bool = True,
):
(
join_key_values,
grouped_refs,
entity_name_to_join_key_map,
requested_on_demand_feature_views,
feature_refs,
requested_result_row_names,
online_features_response,
) = self._prepare_entities_to_read_from_online_store(
features=features,
entity_values=entity_values,
full_feature_names=full_feature_names,
native_entity_values=native_entity_values,
)

provider = self._get_provider()
for table, requested_features in grouped_refs:
# Get the correct set of entity values with the correct join keys.
table_entity_values, idxs = self._get_unique_entities(
table,
join_key_values,
entity_name_to_join_key_map,
)

# Fetch feature data for the minimum set of Entities.
feature_data = await self._read_from_online_store_async(
table_entity_values,
provider,
requested_features,
table,
)

# Populate the result_rows with the Features from the OnlineStore inplace.
self._populate_response_from_feature_data(
feature_data,
idxs,
online_features_response,
full_feature_names,
requested_features,
table,
)

if requested_on_demand_feature_views:
self._augment_response_with_on_demand_transforms(
online_features_response,
feature_refs,
requested_on_demand_feature_views,
full_feature_names,
)
Expand Down Expand Up @@ -1965,38 +2111,24 @@ def _get_unique_entities(
)
return unique_entities, indexes

def _read_from_online_store(
def _get_entity_key_protos(
self,
entity_rows: Iterable[Mapping[str, Value]],
provider: Provider,
requested_features: List[str],
table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
"""Read and process data from the OnlineStore for a given FeatureView.
This method guarantees that the order of the data in each element of the
List returned is the same as the order of `requested_features`.
This method assumes that `provider.online_read` returns data for each
combination of Entities in `entity_rows` in the same order as they
are provided.
"""
) -> List[EntityKeyProto]:
# Instantiate one EntityKeyProto per Entity.
entity_key_protos = [
EntityKeyProto(join_keys=row.keys(), entity_values=row.values())
for row in entity_rows
]
return entity_key_protos

# Fetch data for Entities.
read_rows = provider.online_read(
config=self.config,
table=table,
entity_keys=entity_key_protos,
requested_features=requested_features,
)

# Each row is a set of features for a given entity key. We only need to convert
# the data to Protobuf once.
def _convert_rows_to_protobuf(
self,
requested_features: List[str],
read_rows: List[Tuple[Optional[datetime], Optional[Dict[str, Value]]]],
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
# Each row is a set of features for a given entity key.
# We only need to convert the data to Protobuf once.
null_value = Value()
read_row_protos = []
for read_row in read_rows:
Expand All @@ -2023,6 +2155,53 @@ def _read_from_online_store(
read_row_protos.append((event_timestamps, statuses, values))
return read_row_protos

def _read_from_online_store(
    self,
    entity_rows: Iterable[Mapping[str, Value]],
    provider: Provider,
    requested_features: List[str],
    table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """Fetch and post-process OnlineStore data for a single FeatureView.

    Within each element of the returned list, the data follows the order of
    `requested_features`.

    It is assumed that `provider.online_read` yields one result per entity
    combination in `entity_rows`, in the same order in which they were supplied.
    """
    # One entity key proto per requested entity row.
    keys = self._get_entity_key_protos(entity_rows)

    # Blocking read through the provider.
    rows = provider.online_read(
        config=self.config,
        table=table,
        entity_keys=keys,
        requested_features=requested_features,
    )

    # Convert the raw rows to their protobuf representation.
    return self._convert_rows_to_protobuf(requested_features, rows)

async def _read_from_online_store_async(
    self,
    entity_rows: Iterable[Mapping[str, Value]],
    provider: Provider,
    requested_features: List[str],
    table: FeatureView,
) -> List[Tuple[List[Timestamp], List["FieldStatus.ValueType"], List[Value]]]:
    """Asynchronously fetch and post-process OnlineStore data for a single FeatureView.

    Builds the entity key protos for `entity_rows`, awaits
    `provider.online_read_async` for them, and converts the returned rows
    with `_convert_rows_to_protobuf`.
    """
    # One entity key proto per requested entity row.
    keys = self._get_entity_key_protos(entity_rows)

    # Awaited read through the provider.
    rows = await provider.online_read_async(
        config=self.config,
        table=table,
        entity_keys=keys,
        requested_features=requested_features,
    )

    # Convert the raw rows to their protobuf representation.
    return self._convert_rows_to_protobuf(requested_features, rows)

def _retrieve_from_online_store(
self,
provider: Provider,
Expand Down
25 changes: 25 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,31 @@ def online_read(
"""
pass

async def online_read_async(
    self,
    config: RepoConfig,
    table: FeatureView,
    entity_keys: List[EntityKeyProto],
    requested_features: Optional[List[str]] = None,
) -> List[Tuple[Optional[datetime], Optional[Dict[str, ValueProto]]]]:
    """
    Reads feature values for the given entity keys asynchronously.

    This implementation performs no read and always raises NotImplementedError.

    Args:
        config: The config for the current feature store.
        table: The feature view whose feature values should be read.
        entity_keys: The list of entity keys for which feature values should be read.
        requested_features: The list of features that should be read.

    Returns:
        A list of the same length as entity_keys. Each item in the list is a tuple where the first
        item is the event timestamp for the row, and the second item is a dict mapping feature names
        to values, which are returned in proto format.

    Raises:
        NotImplementedError: Always, naming the concrete class in the message.
    """
    message = (
        f"Online store {self.__class__.__name__} does not support online read async"
    )
    raise NotImplementedError(message)

@abstractmethod
def update(
self,
Expand Down
Loading

0 comments on commit 311efc5

Please sign in to comment.