Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update DFP integrated pipeline to use MRC Router node #2050

Merged

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ Subclass of `DateTimeColumn`, counts the unique occurrences of a value in `group
![Input Stages](img/dfp_input_config.png)

#### Source Stage (`MultiFileSource`)
The `MultiFileSource` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/multi_file_source.py`) receives a path or list of paths (`filenames`), and will collectively be emitted into the pipeline as an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (ex: `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition to this paths can be cached locally by prefixing them with `filecache::` (ex: `filecache::s3://bucket-name/key-name`).
The `MultiFileSource` (`python/morpheus/morpheus/modules/input/multi_file_source.py`) receives a path or list of paths (`filenames`), and will collectively be emitted into the pipeline as an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) object. The paths may include wildcards `*` as well as URLs (ex: `s3://path`) to remote storage providers such as S3, FTP, GCP, Azure, Databricks and others as defined by [`fsspec`](https://filesystem-spec.readthedocs.io/en/latest/api.html?highlight=open_files#fsspec.open_files). In addition to this paths can be cached locally by prefixing them with `filecache::` (ex: `filecache::s3://bucket-name/key-name`).

> **Note:** This stage does not actually download the data files, allowing the file list to be filtered and batched prior to being downloaded.

Expand All @@ -187,7 +187,7 @@ The `MultiFileSource` (`examples/digital_fingerprinting/production/morpheus/dfp/


#### File Batcher Stage (`DFPFileBatcherStage`)
The `DFPFileBatcherStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` in batches of a time period (per day default), and optionally filtering incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. This stage assumes that the date of the logs can be easily inferred such as encoding the creation time in the file name (for example, `AUTH_LOG-2022-08-21T22.05.23Z.json`), or using the modification time as reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).
The `DFPFileBatcherStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_batcher_stage.py`) groups data in the incoming `DataFrame` in batches of a time period (per day default), and optionally filtering incoming data to a specific time window. This stage can potentially improve performance by combining multiple small files into a single batch. This stage assumes that the date of the logs can be easily inferred such as encoding the creation time in the file name (for example, `AUTH_LOG-2022-08-21T22.05.23Z.json`), or using the modification time as reported by the file system. The actual method for extracting the date is encoded in a user-supplied `date_conversion_func` function (more on this later).

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand Down Expand Up @@ -219,7 +219,7 @@ pipeline.add_stage(
> **Note:** If `date_conversion_func` returns time-zone aware timestamps, then `start_time` and `end_time` if not `None` need to also be timezone aware `datetime` objects.

#### File to DataFrame Stage (`DFPFileToDataFrameStage`)
The `DFPFileToDataFrameStage` (`examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_file_to_df.py`) stage receives a `list` of an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) and loads them into a single `DataFrame` which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage` each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource` the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough such that each batch can fit into memory.
The `DFPFileToDataFrameStage` (`python/morpheus_dfp/morpheus_dfp/stages/dfp_file_to_df.py`) stage receives a `list` of an [`fsspec.core.OpenFiles`](https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.core.OpenFiles) and loads them into a single `DataFrame` which is then emitted into the pipeline. When the parent stage is `DFPFileBatcherStage` each batch (typically one day) is concatenated into a single `DataFrame`. If the parent was `MultiFileSource` the entire dataset is loaded into a single `DataFrame`. Because of this, it is important to choose a `period` argument for `DFPFileBatcherStage` small enough such that each batch can fit into memory.

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand Down Expand Up @@ -251,7 +251,7 @@ This final stage will write all received messages to a single output file in eit
| `overwrite` | `bool` | Optional, defaults to `False`. If the file specified in `filename` already exists, it will be overwritten if this option is set to `True` |

#### Write to S3 Stage (`WriteToS3Stage`)
The {py:obj}`~dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3 specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.
The {py:obj}`~morpheus_dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resulting anomaly detections to S3. The `WriteToS3Stage` decouples the S3 specific operations from the Morpheus stage, and as such receives an `s3_writer` argument.

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand All @@ -262,7 +262,7 @@ The {py:obj}`~dfp.stages.write_to_s3_stage.WriteToS3Stage` stage writes the resu
These stages are common to both the training and inference pipelines, unlike the input and output stages these are specific to the DFP pipeline and intended to be configured but not replaceable.

#### Split Users Stage (`DFPSplitUsersStage`)
The {py:obj}`~dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta` where each `DFPMessageMeta` represents the records associated for a given user. This allows for downstream stages to perform all necessary operations on a per user basis.
The {py:obj}`~morpheus_dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receives an incoming `DataFrame` and emits a `list` of `DFPMessageMeta` where each `DFPMessageMeta` represents the records associated for a given user. This allows for downstream stages to perform all necessary operations on a per user basis.

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand All @@ -273,7 +273,7 @@ The {py:obj}`~dfp.stages.dfp_split_users_stage.DFPSplitUsersStage` stage receive
| `only_users` | `List[str]` or `None` | Limit records to a specific list of users, when `include_generic` is `True` the generic user's records will also be limited to the users in this list. Mutually exclusive with `skip_users`. |

#### Rolling Window Stage (`DFPRollingWindowStage`)
The {py:obj}`~dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
The {py:obj}`~morpheus_dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage performs several key pieces of functionality for DFP.
<!-- Work-around for https://github.com/errata-ai/vale/issues/874 -->
<!-- vale off -->
1. This stage keeps a moving window of logs on a per user basis
Expand All @@ -299,7 +299,7 @@ The {py:obj}`~dfp.stages.dfp_rolling_window_stage.DFPRollingWindowStage` stage p
> **Note:** this stage computes a row hash for the first and last rows of the incoming `DataFrame` as such all data contained must be hashable, any non-hashable values such as `lists` should be dropped or converted into hashable types in the `DFPFileToDataFrameStage`.

#### Preprocessing Stage (`DFPPreprocessingStage`)
The {py:obj}`~dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual logic of preprocessing is defined in the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages all records in the incoming `DataFrame` correspond to only a single user within a specific time period allowing for columns to be computer on a per-user per-time period basis such as the `logcount` and `locincrement` features mentioned above. Making the type of processing performed in this stage different from those performed in the `DFPFileToDataFrameStage`.
The {py:obj}`~morpheus_dfp.stages.dfp_preprocessing_stage.DFPPreprocessingStage` stage, the actual logic of preprocessing is defined in the `input_schema` argument. Since this stage occurs in the pipeline after the `DFPFileBatcherStage` and `DFPSplitUsersStage` stages all records in the incoming `DataFrame` correspond to only a single user within a specific time period allowing for columns to be computer on a per-user per-time period basis such as the `logcount` and `locincrement` features mentioned above. Making the type of processing performed in this stage different from those performed in the `DFPFileToDataFrameStage`.

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand All @@ -316,7 +316,7 @@ After training the generic model, individual user models can be trained. Individ
### Training Stages

#### Training Stage (`DFPTraining`)
The {py:obj}`~dfp.stages.dfp_training.DFPTraining` trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.
The {py:obj}`~morpheus_dfp.stages.dfp_training.DFPTraining` trains a model for each incoming `DataFrame` and emits an instance of `morpheus.messages.ControlMessage` containing the trained model.

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand All @@ -326,7 +326,7 @@ The {py:obj}`~dfp.stages.dfp_training.DFPTraining` trains a model for each incom
| `validation_size` | `float` | Proportion of the input dataset to use for training validation. Should be between 0.0 and 1.0. Default is 0.0.|

#### MLflow Model Writer Stage (`DFPMLFlowModelWriterStage`)
The {py:obj}`~dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models into MLflow, skipping any model which lacked sufficient training data (current required minimum is 300 log records).
The {py:obj}`~morpheus_dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stage publishes trained models into MLflow, skipping any model which lacked sufficient training data (current required minimum is 300 log records).

| Argument | Type | Description |
| -------- | ---- | ----------- |
Expand All @@ -343,7 +343,7 @@ The {py:obj}`~dfp.stages.dfp_mlflow_model_writer.DFPMLFlowModelWriterStage` stag
### Inference Stages

#### Inference Stage (`DFPInferenceStage`)
The {py:obj}`~dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inferences against those models. This stage emits a message containing the original `DataFrame` along with new columns containing the z score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
The {py:obj}`~morpheus_dfp.stages.dfp_inference_stage.DFPInferenceStage` stage loads models from MLflow and performs inferences against those models. This stage emits a message containing the original `DataFrame` along with new columns containing the z score (`mean_abs_z`), as well as the name and version of the model that generated that score (`model_version`). For each feature in the model, three additional columns will also be added:
* `<feature name>_loss` : The loss
* `<feature name>_z_loss` : The loss z-score
* `<feature name>_pred` : The predicted value
Expand All @@ -370,4 +370,4 @@ The {py:obj}`~morpheus.stages.postprocess.filter_detections_stage.FilterDetectio
| `field_name` | `str` | `probs` | Name of the tensor (`filter_source=FilterSource.TENSOR`) or DataFrame column (`filter_source=FilterSource.DATAFRAME`) to use as the filter criteria. |

#### Post Processing Stage (`DFPPostprocessingStage`)
The {py:obj}`~dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time which Morpheus detected the anomalous messages, and replaces any `NAN` values with the a string value of `'NaN'`.
The {py:obj}`~morpheus_dfp.stages.dfp_postprocessing_stage.DFPPostprocessingStage` stage adds a new `event_time` column to the DataFrame indicating the time which Morpheus detected the anomalous messages, and replaces any `NAN` values with the a string value of `'NaN'`.
2 changes: 1 addition & 1 deletion docs/source/extra_info/troubleshooting.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ Error trying to get model

Traceback (most recent call last):

File "/workspace/examples/digital_fingerprinting/production/morpheus/dfp/stages/dfp_inference_stage.py", line 101, in on_data
File "/workspace/python/morpheus_dfp/morpheus_dfp/stages/dfp_inference_stage.py", line 101, in on_data

loaded_model = model_cache.load_model(self._client)
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
# limitations under the License.

import logging
import typing
from datetime import datetime

import click
Expand Down Expand Up @@ -120,17 +119,17 @@
@click.option('--silence_monitors', flag_value=True, help='Controls whether monitors will be verbose.')
def run_pipeline(source: str,
train_users: str,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
start_time: datetime,
skip_user: tuple[str],
only_user: tuple[str],
start_time: datetime | None,
duration: str,
cache_dir: str,
log_level: int,
sample_rate_s: int,
tracking_uri,
silence_monitors,
mlflow_experiment_name_template,
mlflow_model_name_template,
tracking_uri: str,
silence_monitors: bool,
mlflow_experiment_name_template: str | None,
mlflow_model_name_template: str | None,
**kwargs):
if (skip_user and only_user):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Expand Down
23 changes: 16 additions & 7 deletions python/morpheus_dfp/morpheus_dfp/modules/dfp_deployment.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import logging

import mrc
from mrc.core.node import Broadcast
from mrc.core.node import Router

from morpheus.messages import ControlMessage
from morpheus.utils.loader_ids import FSSPEC_LOADER
from morpheus.utils.module_ids import DATA_LOADER
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
Expand Down Expand Up @@ -165,7 +166,7 @@ def dfp_deployment(builder: mrc.Builder):
# |
# v
# +-------------------------------------+
# | broadcast |
# | router |
# +-------------------------------------+
# / \
# / \
Expand Down Expand Up @@ -205,13 +206,21 @@ def dfp_deployment(builder: mrc.Builder):
"dfp_inference_pipe",
dfp_inference_pipe_conf)

# Create broadcast node to fork the pipeline.
broadcast = Broadcast(builder, "broadcast")
def router_key_fn(cm: ControlMessage) -> str:
if cm.has_task("training"):
return "training"
if cm.has_task("inference"):
return "inference"

raise ValueError("Control message does not have a valid task.")

# Create router node to fork the pipeline.
router = Router(builder, "router", router_keys=["training", "inference"], key_fn=router_key_fn)

# Make an edge between modules
builder.make_edge(fsspec_dataloader_module.output_port("output"), broadcast)
builder.make_edge(broadcast, dfp_training_pipe_module.input_port("input"))
builder.make_edge(broadcast, dfp_inference_pipe_module.input_port("input"))
builder.make_edge(fsspec_dataloader_module.output_port("output"), router)
builder.make_edge(router.get_source("training"), dfp_training_pipe_module.input_port("input"))
builder.make_edge(router.get_source("inference"), dfp_inference_pipe_module.input_port("input"))

out_nodes = [dfp_training_pipe_module.output_port("output"), dfp_inference_pipe_module.output_port("output")]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,14 @@ def dfp_inference_pipe(builder: mrc.Builder):
ts_column_name = config.get("timestamp_column_name")
monitor_options = config.get("monitor_options", {})

preproc_monitor_options = monitor_options.copy()
if "name_postfix" not in preproc_monitor_options:
preproc_monitor_options["name_postfix"] = "[inference_pipe]"

preproc_options = {
"batching_options": config.get("batching_options", {}),
"cache_dir": cache_dir,
"monitor_options": monitor_options,
"pre_filter_options": {
"enable_task_filtering": True, "filter_task_type": "inference"
},
"monitor_options": preproc_monitor_options,
"timestamp_column_name": ts_column_name,
"user_splitting_options": config.get("user_splitting_options", {}),
}
Expand Down
Loading
Loading