Skip to content

Commit

Permalink
Finalize removing MultiMessage from Morpheus (#1886)
Browse files Browse the repository at this point in the history
Should be merged after merging #1803.

The Final step of completely remove `MultiMessage` from the whole Morpheus repo.

This PR removes `MultiMessage` and its sub-classes. All occurrences of `MultiMessage` and its sub-classes in examples and docs are updated to `ControlMessage`.

Closes #1802 

## By Submitting this PR I confirm:
- I am familiar with the [Contributing Guidelines](https://github.com/nv-morpheus/Morpheus/blob/main/docs/source/developer_guide/contributing.md).
- When the PR is ready for review, new or existing tests cover these changes.
- When the PR is ready for review, the documentation is up to date with these changes.

Authors:
  - Yuchen Zhang (https://github.com/yczhang-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)
  - Michael Demoret (https://github.com/mdemoret-nv)

URL: #1886
  • Loading branch information
yczhang-nv authored Sep 12, 2024
1 parent 381097e commit 64482ee
Show file tree
Hide file tree
Showing 93 changed files with 545 additions and 7,371 deletions.
26 changes: 10 additions & 16 deletions docs/source/basics/building_a_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ Configuring Pipeline via CLI
Starting pipeline via CLI... Ctrl+C to Quit
Config:
{
"_model_max_batch_size": 8,
"_pipeline_batch_size": 256,
"ae": null,
"class_labels": [],
"debug": false,
Expand All @@ -75,31 +77,23 @@ Config:
"log_config_file": null,
"log_level": 10,
"mode": "OTHER",
"model_max_batch_size": 8,
"num_threads": 64,
"pipeline_batch_size": 256,
"plugins": []
}
CPP Enabled: True
====Registering Pipeline====
====Building Pipeline====
====Building Pipeline Complete!====
Starting! Time: 1689786614.4988477
====Registering Pipeline Complete!====
====Starting Pipeline====
====Pipeline Started====
====Building Segment: linear_segment_0====
Added source: <from-file-0; FileSourceStage(filename=examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True)>
Added source: <from-file-0; FileSourceStage(filename=examples/data/pcap_dump.jsonlines, iterative=False, file_type=FileTypes.Auto, repeat=1, filter_null=True, filter_null_columns=(), parser_kwargs={})>
└─> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True)>
└─ morpheus.MessageMeta -> morpheus.MultiMessage
Added stage: <serialize-2; SerializeStage(include=(), exclude=('^ID$', '^_ts_'), fixed_columns=True)>
└─ morpheus.MultiMessage -> morpheus.MessageMeta
Added stage: <deserialize-1; DeserializeStage(ensure_sliceable_index=True, task_type=None, task_payload=None)>
└─ morpheus.MessageMeta -> morpheus.ControlMessage
Added stage: <serialize-2; SerializeStage(include=(), exclude=(), fixed_columns=True)>
└─ morpheus.ControlMessage -> morpheus.MessageMeta
Added stage: <to-file-3; WriteToFileStage(filename=.tmp/temp_out.json, overwrite=True, file_type=FileTypes.Auto, include_index_col=True, flush=False)>
└─ morpheus.MessageMeta -> morpheus.MessageMeta
====Building Segment Complete!====
====Pipeline Started====
====Pipeline Complete====
Pipeline visualization saved to .tmp/simple_identity.png
```

### Pipeline Build Checks
Expand All @@ -113,10 +107,10 @@ morpheus --log_level=DEBUG run pipeline-other \

Then the following error displays:
```
RuntimeError: The to-file stage cannot handle input of <class 'morpheus.messages.multi_message.MultiMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)
RuntimeError: The to-file stage cannot handle input of <class 'morpheus._lib.messages.ControlMessage'>. Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)
```

This indicates that the ``to-file`` stage cannot accept the input type of `morpheus.messages.multi_message.MultiMessage`. This is because the ``to-file`` stage has no idea how to write that class to a file; it only knows how to write instances of `morpheus.messages.message_meta.MessageMeta`. To ensure you have a valid pipeline, examine the `Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)` portion of the message. This indicates you need a stage that converts from the output type of the `deserialize` stage, `MultiMessage`, to `MessageMeta`, which is exactly what the `serialize` stage does.
This indicates that the ``to-file`` stage cannot accept the input type of `morpheus.messages.ControlMessage`. This is because the ``to-file`` stage has no idea how to write that class to a file; it only knows how to write instances of `morpheus.messages.message_meta.MessageMeta`. To ensure you have a valid pipeline, examine the `Accepted input types: (<class 'morpheus.messages.message_meta.MessageMeta'>,)` portion of the message. This indicates you need a stage that converts from the output type of the `deserialize` stage, `ControlMessage`, to `MessageMeta`, which is exactly what the `serialize` stage does.

### Kafka Source Example
The above example essentially just copies a file. However, it is an important to note that most Morpheus pipelines are similar in structure, in that they begin with a source stage (`from-file`) followed by a `deserialize` stage, end with a `serialize` stage followed by a sink stage (`to-file`), with the actual training or inference logic occurring in between.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ Source: `morpheus/modules/mlflow_model_writer.py`

The `mlflow_model_writer` module is responsible for uploading trained models to the MLflow server.

For each `MultiAEMessage` received, containing a trained model, the function uploads the model to MLflow along with associated metadata such as experiment name, run name, parameters, metrics, and the model signature. If the MLflow server is running on Databricks, the function also applies the required permissions to the registered model.
For each `ControlMessage` received, containing a trained model, the function uploads the model to MLflow along with associated metadata such as experiment name, run name, parameters, metrics, and the model signature. If the MLflow server is running on Databricks, the function also applies the required permissions to the registered model.

For a complete reference, refer to: [MLflow Model Writer](../../modules/core/mlflow_model_writer.md)

Expand Down Expand Up @@ -460,9 +460,9 @@ For a complete reference, refer to: [DFP Post Processing](../../modules/examples

Source: `morpheus/modules/serialize.py`

The serialize module function is responsible for filtering columns from a `MultiMessage` object and emitting a `MessageMeta` object.
The serialize module function is responsible for filtering columns from a `ControlMessage` object and emitting a `MessageMeta` object.

The `convert_to_df` function converts a DataFrame to JSON lines. It takes a `MultiMessage` instance, `include_columns` (a pattern for columns to include), `exclude_columns` (a list of patterns for columns to exclude), and `columns` (a list of columns to include). The function filters the columns of the input DataFrame based on the include and exclude patterns and retrieves the metadata of the filtered columns.
The `convert_to_df` function converts a DataFrame to JSON lines. It takes a `ControlMessage` instance, `include_columns` (a pattern for columns to include), `exclude_columns` (a list of patterns for columns to exclude), and `columns` (a list of columns to include). The function filters the columns of the input DataFrame based on the include and exclude patterns and retrieves the metadata of the filtered columns.

The module function compiles the include and exclude patterns into regular expressions. It then creates a node using the `convert_to_df` function with the compiled include and exclude patterns and the specified columns.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ There are four methods that need to be defined in our new subclass to implement
return "pass-thru"
```

The `accepted_types` method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as `MessageMeta` or `MultiMessage` (refer to the message classes defined in `morpheus.pipeline.messages` for a complete list).
The `accepted_types` method returns a tuple of message classes that this stage is able to accept as input. Morpheus uses this to validate that the parent of this stage emits a message that this stage can accept. Since our stage is a pass through, we will declare that we can accept any incoming message type. Note that production stages will often declare only a single Morpheus message class such as `MessageMeta` or `ControlMessage` (refer to the message classes defined in `morpheus.messages` for a complete list).
```python
def accepted_types(self) -> tuple:
return (typing.Any,)
Expand Down
4 changes: 2 additions & 2 deletions docs/source/developer_guide/guides/2_real_world_phishing.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ pipeline.add_stage(AddScoresStage(config, labels=["is_phishing"]))

Lastly, we will save our results to disk. For this purpose, we are using two stages that are often used in conjunction with each other: `SerializeStage` and `WriteToFileStage`.

The `SerializeStage` is used to include and exclude columns as desired in the output. Importantly, it also handles conversion from the `MultiMessage`-derived output type to the `MessageMeta` class that is expected as input by the `WriteToFileStage`.
The `SerializeStage` is used to include and exclude columns as desired in the output. Importantly, it also handles conversion from `ControlMessage` output type to the `MessageMeta` class that is expected as input by the `WriteToFileStage`.

The `WriteToFileStage` will append message data to the output file as messages are received. Note however that for performance reasons the `WriteToFileStage` does not flush its contents out to disk every time a message is received. Instead, it relies on the underlying [buffered output stream](https://gcc.gnu.org/onlinedocs/libstdc++/manual/streambufs.html) to flush as needed, and then will close the file handle on shutdown.

Expand Down Expand Up @@ -889,7 +889,7 @@ class RabbitMQSourceStage(PreallocatorMixin, SingleOutputSource):
```

### Function Based Approach
Similar to the `stage` decorator used in previous examples Morpheus provides a `source` decorator which wraps a generator function to be used as a source stage. In the class based approach we explicitly added the `PreallocatorMixin`, when using the `source` decorator the return type annotation will be inspected and a stage will be created with the `PreallocatorMixin` if the return type is a `DataFrame` type or a message which contains a `DataFrame` (`MessageMeta` and `MultiMessage`).
Similar to the `stage` decorator used in previous examples Morpheus provides a `source` decorator which wraps a generator function to be used as a source stage. In the class based approach we explicitly added the `PreallocatorMixin`, when using the `source` decorator the return type annotation will be inspected and a stage will be created with the `PreallocatorMixin` if the return type is a `DataFrame` type or a message which contains a `DataFrame` (`MessageMeta` and `ControlMessage`).

The code for the function will first perform the same setup as was used in the class constructor, then entering a nearly identical loop as that in the `source_generator` method.

Expand Down
Loading

0 comments on commit 64482ee

Please sign in to comment.