Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement MultiProcessingStage #1878

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
7951324
initial commit for shared_process_pool
yczhang-nv Aug 20, 2024
f8037af
fix CI
yczhang-nv Aug 20, 2024
6a48cf3
Merge remote-tracking branch 'upstream/branch-24.10' into impl-multi-…
yczhang-nv Aug 21, 2024
1443d11
Add performance verification tests to shared_process_pool
yczhang-nv Aug 22, 2024
86cbed8
Add tests and fix shutdown of shared_process_pool
yczhang-nv Aug 22, 2024
21fd8e8
Initial impl of multi_processing_stage
yczhang-nv Aug 23, 2024
46865ae
initial test for test_multi_processing_stage
yczhang-nv Aug 24, 2024
406dbeb
fix unit test for multi_processing_stage
yczhang-nv Aug 27, 2024
b073b7d
Merge remote-tracking branch 'upstream/branch-24.10' into impl-multi-…
yczhang-nv Aug 29, 2024
f7b7280
add unit tests for shared_process_pool
yczhang-nv Aug 29, 2024
40234e2
Fix SharedProcessPool unit test
yczhang-nv Aug 30, 2024
d92ef9c
fix multi_processing_stage unit test
yczhang-nv Sep 5, 2024
26f8b0f
add dataclass
yczhang-nv Sep 9, 2024
1c3666c
add result_collection_loop
yczhang-nv Sep 10, 2024
5fca573
add threading, need debug
yczhang-nv Sep 10, 2024
8108fd4
Revise shared_process_pool design
yczhang-nv Sep 16, 2024
31e2247
Add pool.status, need debug on stop elegantly
yczhang-nv Sep 16, 2024
6a7cf6e
ut passed
yczhang-nv Sep 17, 2024
fb2e877
Improved and passed unit tests
yczhang-nv Sep 18, 2024
be6eff1
Merge remote-tracking branch 'upstream/branch-24.10' into impl-multi-…
yczhang-nv Sep 18, 2024
dcda484
Fix formats
yczhang-nv Sep 18, 2024
16d46ba
fix format
yczhang-nv Sep 18, 2024
41de9df
update multi_processing_stage
yczhang-nv Sep 18, 2024
5ae1fbd
fix comments
yczhang-nv Sep 19, 2024
546efdf
Fix pool status
yczhang-nv Sep 20, 2024
506c0cf
fix test
yczhang-nv Sep 20, 2024
4c97e74
fix format
yczhang-nv Sep 20, 2024
a3e74f4
Merge branch 'branch-24.10' of github.com:nv-morpheus/Morpheus into i…
yczhang-nv Sep 20, 2024
ee8633e
fix CI
yczhang-nv Sep 20, 2024
63675d7
fix comments and CI
yczhang-nv Sep 20, 2024
c37aa39
fix type annotation
yczhang-nv Sep 20, 2024
65f9942
Merge branch 'branch-24.10' into impl-multi-process-stage
yczhang-nv Sep 24, 2024
51acd87
Merge remote-tracking branch 'upstream/branch-24.10' into impl-multi-…
yczhang-nv Sep 25, 2024
8896230
fix comments
yczhang-nv Sep 25, 2024
790701e
fix test
yczhang-nv Sep 25, 2024
5e21087
fix format
yczhang-nv Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 200 additions & 0 deletions python/morpheus/morpheus/stages/general/multi_processing_stage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import functools
import inspect
import typing
from abc import abstractmethod

import mrc
import mrc.core.operators as ops

from morpheus.config import Config
from morpheus.pipeline.single_port_stage import SinglePortStage
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.shared_process_pool import SharedProcessPool

InputT = typing.TypeVar('InputT')
OutputT = typing.TypeVar('OutputT')


class MultiProcessingBaseStage(SinglePortStage, typing.Generic[InputT, OutputT]):

def __init__(self, *, c: Config, process_pool_usage: float, max_in_flight_messages: int = None):
super().__init__(c=c)

if not 0 <= process_pool_usage <= 1:
raise ValueError("process_pool_usage must be between 0 and 1.")
self._process_pool_usage = process_pool_usage
yczhang-nv marked this conversation as resolved.
Show resolved Hide resolved

self._shared_process_pool = SharedProcessPool()
self._shared_process_pool.wait_until_ready()

if max_in_flight_messages is None:
# set the multiplier to 1.5 to keep the workers busy
self._max_in_flight_messages = int(self._shared_process_pool.total_max_workers * 1.5)
else:
self._max_in_flight_messages = max_in_flight_messages

def accepted_types(self) -> typing.Tuple:
"""
There are two approaches to inherit from this class:
- With generic types: MultiProcessingDerivedStage(MultiProcessingBaseStage[InputT, OutputT])
- With concrete types: MultiProcessingDerivedStage(MultiProcessingBaseStage[int, str])

When inheriting with generic types, the derived class can be instantiated like this:

stage = MultiProcessingDerivedStage[int, str]()

In this case, typing.Generic stores the stage type in stage.__orig_class__, the concrete types can be accessed
as below:

input_type = typing.get_args(stage.__orig_class__)[0] # int
output_type = typing.get_args(stage.__orig_class__)[1] # str

However, when instantiating a stage which inherits with concrete types:

stage = MultiProcessingDerivedStage()

The stage instance does not have __orig_class__ attribute (since it is not a generic type). Thus, the concrete
types need be retrieved from its base class (which is a generic type):

input_type = typing.get_args(stage.__orig_bases__[0])[0] # int
output_type = typing.get_args(stage.__orig_bases__[0])[1] # str

Raises:
RuntimeError: if the accepted cannot be deducted from either __orig_class__ or __orig_bases__

Returns:
typing.Tuple: accepted input types
"""
if hasattr(self, "__orig_class__"):
# inherited with generic types
input_type = typing.get_args(self.__orig_class__)[0] # pylint: disable=no-member

elif hasattr(self, "__orig_bases__"):
# inherited with concrete types
input_type = typing.get_args(self.__orig_bases__[0])[0] # pylint: disable=no-member

else:
raise RuntimeError("Could not deduct input type")

return (input_type, )

def compute_schema(self, schema: StageSchema):
"""
See the comment on `accepted_types` for more information on accessing the input and output types.

Args:
schema (StageSchema): StageSchema

Raises:
RuntimeError: if the output type cannot be deducted from either __orig_class__ or __orig_bases__
"""
if hasattr(self, "__orig_class__"):
# inherited with abstract types
output_type = typing.get_args(self.__orig_class__)[1] # pylint: disable=no-member

elif hasattr(self, "__orig_bases__"):
# inherited with concrete types
output_type = typing.get_args(self.__orig_bases__[0])[1]

else:
raise RuntimeError("Could not deduct output type")

schema.output_schema.set_type(output_type)

def supports_cpp_node(self):
return False

@abstractmethod
def _on_data(self, data: InputT) -> OutputT:
pass

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.name, ops.map(self._on_data))
node.launch_options.pe_count = self._max_in_flight_messages

builder.make_edge(input_node, node)

return node


def _get_func_signature(func: typing.Callable[[InputT], OutputT]) -> tuple[type, type]:
signature = inspect.signature(func)

if isinstance(func, functools.partial):
# If the function is a partial, find the type of the first unbound argument
bound_args = func.keywords
input_arg = None

for param in signature.parameters.values():
if param.name not in bound_args:
if input_arg is not None:
raise ValueError("Found more than one unbound arguments in partial function")
input_arg = param

if input_arg is None:
raise ValueError("Cannot find unbound argument in partial function")
input_t = input_arg.annotation

else:
if len(signature.parameters) != 1:
raise ValueError("Function must have exactly one argument")

input_t = next(iter(signature.parameters.values())).annotation

output_t = signature.return_annotation

return (input_t, output_t)


class MultiProcessingStage(MultiProcessingBaseStage[InputT, OutputT]):

def __init__(self,
*,
c: Config,
unique_name: str,
process_fn: typing.Callable[[InputT], OutputT],
process_pool_usage: float,
max_in_flight_messages: int = None):
super().__init__(c=c, process_pool_usage=process_pool_usage, max_in_flight_messages=max_in_flight_messages)

self._name = unique_name
self._process_fn = process_fn
self._shared_process_pool.set_usage(self.name, self._process_pool_usage)

@property
def name(self) -> str:
return self._name

def _on_data(self, data: InputT) -> OutputT:
task = self._shared_process_pool.submit_task(self.name, self._process_fn, data)
result = task.result()

return result

@staticmethod
def create(*,
c: Config,
unique_name: str,
process_fn: typing.Callable[[InputT], OutputT],
process_pool_usage: float):

input_t, output_t = _get_func_signature(process_fn)
return MultiProcessingStage[input_t, output_t](c=c,
unique_name=unique_name,
process_pool_usage=process_pool_usage,
process_fn=process_fn)
Loading
Loading