Skip to content

Commit

Permalink
feat: Stream Feature View FCOS (#2750)
Browse files Browse the repository at this point in the history
* Fix working version

Signed-off-by: Kevin Zhang <[email protected]>

* Working commit

Signed-off-by: Kevin Zhang <[email protected]>

* Fixes

Signed-off-by: Kevin Zhang <[email protected]>

* Fix stuffs

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix things

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lihnt

Signed-off-by: Kevin Zhang <[email protected]>

* Fix stuff

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix unit tests

Signed-off-by: Kevin Zhang <[email protected]>

* Address review comments

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fixed

Signed-off-by: Kevin Zhang <[email protected]>

* Unsaved changes

Signed-off-by: Kevin Zhang <[email protected]>
  • Loading branch information
kevjumba authored Jun 1, 2022
1 parent 5a7fcef commit 0cf3c92
Show file tree
Hide file tree
Showing 18 changed files with 761 additions and 36 deletions.
14 changes: 14 additions & 0 deletions protos/feast/core/Aggregation.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "AggregationProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";

message Aggregation {
string column = 1;
string function = 2;
google.protobuf.Duration time_window = 3;
}
9 changes: 7 additions & 2 deletions protos/feast/core/DataFormat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ option java_package = "feast.proto.core";
message FileFormat {
// Defines options for the Parquet data format
message ParquetFormat {}

oneof format {
ParquetFormat parquet_format = 1;
}
Expand All @@ -40,17 +40,22 @@ message StreamFormat {
// Feature data from the obtained stream message
string class_path = 1;
}

// Defines options for the avro data format
message AvroFormat {
// Optional if used in a File DataSource as schema is embedded in avro file.
// Specifies the schema of the Avro message as JSON string.
string schema_json = 1;
}

message JsonFormat {
string schema_json = 1;
}

// Specifies the data format and format specific options
oneof format {
AvroFormat avro_format = 1;
ProtoFormat proto_format = 2;
JsonFormat json_format = 3;
}
}
1 change: 0 additions & 1 deletion protos/feast/core/FeatureService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "FeatureServiceProto";
option java_package = "feast.proto.core";

import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/FeatureViewProjection.proto";

Expand Down
4 changes: 3 additions & 1 deletion protos/feast/core/Registry.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,21 @@ import "feast/core/FeatureView.proto";
import "feast/core/InfraObject.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/RequestFeatureView.proto";
import "feast/core/StreamFeatureView.proto";
import "feast/core/DataSource.proto";
import "feast/core/SavedDataset.proto";
import "feast/core/ValidationProfile.proto";
import "google/protobuf/timestamp.proto";

// Next id: 14
// Next id: 15
message Registry {
repeated Entity entities = 1;
repeated FeatureTable feature_tables = 2;
repeated FeatureView feature_views = 6;
repeated DataSource data_sources = 12;
repeated OnDemandFeatureView on_demand_feature_views = 8;
repeated RequestFeatureView request_feature_views = 9;
repeated StreamFeatureView stream_feature_views = 14;
repeated FeatureService feature_services = 7;
repeated SavedDataset saved_datasets = 11;
repeated ValidationReference validation_references = 13;
Expand Down
98 changes: 98 additions & 0 deletions protos/feast/core/StreamFeatureView.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
//
// Copyright 2020 The Feast Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//


syntax = "proto3";
package feast.core;

option go_package = "github.com/feast-dev/feast/go/protos/feast/core";
option java_outer_classname = "StreamFeatureViewProto";
option java_package = "feast.proto.core";


import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "feast/core/OnDemandFeatureView.proto";
import "feast/core/Feature.proto";
import "feast/core/DataSource.proto";
import "feast/core/Aggregation.proto";

message StreamFeatureView {
// User-specified specifications of this feature view.
StreamFeatureViewSpec spec = 1;
StreamFeatureViewMeta meta = 2;
}

// Next available id: 17
message StreamFeatureViewSpec {
// Name of the feature view. Must be unique. Not updated.
string name = 1;

// Name of Feast project that this feature view belongs to.
string project = 2;

// List of names of entities associated with this feature view.
repeated string entities = 3;

// List of specifications for each feature defined as part of this feature view.
repeated FeatureSpecV2 features = 4;

// List of specifications for each entity defined as part of this feature view.
repeated FeatureSpecV2 entity_columns = 5;

// Description of the feature view.
string description = 6;

// User defined metadata
map<string,string> tags = 7;

// Owner of the feature view.
string owner = 8;

// Features in this feature view can only be retrieved from online serving
// younger than ttl. Ttl is measured as the duration of time between
// the feature's event timestamp and when the feature is retrieved
// Feature values outside ttl will be returned as unset values and indicated to end user
google.protobuf.Duration ttl = 9;

// Batch/Offline DataSource where this view can retrieve offline feature data.
DataSource batch_source = 10;
// Streaming DataSource from where this view can consume "online" feature data.
DataSource stream_source = 11;

// Whether these features should be served online or not
bool online = 12;

// Serialized function that is encoded in the streamfeatureview
UserDefinedFunction user_defined_function = 13;

// Mode of execution
string mode = 14;

// Aggregation definitions
repeated Aggregation aggregations = 15;

// Timestamp field for aggregation
string timestamp_field = 16;
}

message StreamFeatureViewMeta {
// Time where this Feature View is created
google.protobuf.Timestamp created_timestamp = 1;

// Time where this Feature View is last updated
google.protobuf.Timestamp last_updated_timestamp = 2;
}
2 changes: 0 additions & 2 deletions protos/feast/core/ValidationProfile.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ option java_package = "feast.proto.core";
option java_outer_classname = "ValidationProfile";
option go_package = "github.com/feast-dev/feast/go/protos/feast/core";

import "feast/core/SavedDataset.proto";

message GEValidationProfiler {
message UserDefinedProfiler {
// The python-syntax function body (serialized by dill)
Expand Down
69 changes: 69 additions & 0 deletions sdk/python/feast/aggregation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
from datetime import timedelta
from typing import Optional

from google.protobuf.duration_pb2 import Duration

from feast.protos.feast.core.Aggregation_pb2 import Aggregation as AggregationProto


class Aggregation:
"""
NOTE: Feast-handled aggregations are not yet supported. This class provides a way to register user-defined aggregations.
Attributes:
column: str # Column name of the feature we are aggregating.
function: str # Provided built in aggregations sum, max, min, count mean
time_window: timedelta # The time window for this aggregation.
"""

column: str
function: str
time_window: Optional[timedelta]

def __init__(
self,
column: Optional[str] = "",
function: Optional[str] = "",
time_window: Optional[timedelta] = None,
):
self.column = column or ""
self.function = function or ""
self.time_window = time_window

def to_proto(self) -> AggregationProto:
window_duration = None
if self.time_window is not None:
window_duration = Duration()
window_duration.FromTimedelta(self.time_window)

return AggregationProto(
column=self.column, function=self.function, time_window=window_duration
)

@classmethod
def from_proto(cls, agg_proto: AggregationProto):
time_window = (
timedelta(days=0)
if agg_proto.time_window.ToNanoseconds() == 0
else agg_proto.time_window.ToTimedelta()
)

aggregation = cls(
column=agg_proto.column,
function=agg_proto.function,
time_window=time_window,
)
return aggregation

def __eq__(self, other):
if not isinstance(other, Aggregation):
raise TypeError("Comparisons should only involve Aggregations.")

if (
self.column != other.column
or self.function != other.function
or self.time_window != other.time_window
):
return False

return True
24 changes: 24 additions & 0 deletions sdk/python/feast/data_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ def from_proto(cls, proto):
fmt = proto.WhichOneof("format")
if fmt == "avro_format":
return AvroFormat(schema_json=proto.avro_format.schema_json)
if fmt == "json_format":
return JsonFormat(schema_json=proto.json_format.schema_json)
if fmt == "proto_format":
return ProtoFormat(class_path=proto.proto_format.class_path)
raise NotImplementedError(f"StreamFormat is unsupported: {fmt}")
Expand All @@ -113,6 +115,28 @@ def to_proto(self):
return StreamFormatProto(avro_format=proto)


class JsonFormat(StreamFormat):
"""
Defines the Json streaming data format that encodes data in Json format
"""

def __init__(self, schema_json: str):
"""
Construct a new Json data format.
For spark, uses pyspark ddl string format. Example shown here:
https://vincent.doba.fr/posts/20211004_spark_data_description_language_for_defining_spark_schema/
Args:
schema_json: Json schema definition
"""
self.schema_json = schema_json

def to_proto(self):
proto = StreamFormatProto.JsonFormat(schema_json=self.schema_json)
return StreamFormatProto(json_format=proto)


class ProtoFormat(StreamFormat):
"""
Defines the Protobuf data format
Expand Down
3 changes: 3 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ def __init__(
if _message_format is None:
raise ValueError("Message format must be specified for Kafka source")

if not timestamp_field and not _event_timestamp_column:
raise ValueError("Timestamp field must be specified for Kafka source")

super().__init__(
event_timestamp_column=_event_timestamp_column,
created_timestamp_column=created_timestamp_column,
Expand Down
Loading

0 comments on commit 0cf3c92

Please sign in to comment.