diff --git a/morpheus/pipeline/pipeline.py b/morpheus/pipeline/pipeline.py index 180a9a2188..6f719e4d54 100644 --- a/morpheus/pipeline/pipeline.py +++ b/morpheus/pipeline/pipeline.py @@ -458,7 +458,7 @@ async def build_and_start(self): self.build() except Exception: logger.exception("Error occurred during Pipeline.build(). Exiting.", exc_info=True) - return + raise await self._start() diff --git a/tests/test_multi_port_pipeline.py b/tests/test_multi_port_pipeline.py index 7b3d8bf8e4..cac48b0bd0 100755 --- a/tests/test_multi_port_pipeline.py +++ b/tests/test_multi_port_pipeline.py @@ -16,27 +16,45 @@ import pytest +import cudf + # When segment modules are imported, they're added to the module registry. # To avoid flake8 warnings about unused code, the noqa flag is used during import. import modules.multiplexer # noqa: F401 # pylint: disable=unused-import from _utils.dataset_manager import DatasetManager +from morpheus.config import Config from morpheus.pipeline.pipeline import Pipeline from morpheus.stages.general.multi_port_modules_stage import MultiPortModulesStage from morpheus.stages.input.in_memory_source_stage import InMemorySourceStage from morpheus.stages.output.in_memory_sink_stage import InMemorySinkStage +def _run_pipeline(config: Config, source_df: cudf.DataFrame, module_conf: dict, + stage_input_ports: list[str]) -> InMemorySinkStage: + pipe = Pipeline(config) + + mux_stage = pipe.add_stage( + MultiPortModulesStage(config, module_conf, input_ports=stage_input_ports, output_ports=["output"])) + + for x in range(len(stage_input_ports)): + source_stage = pipe.add_stage(InMemorySourceStage(config, [source_df.copy(deep=True)])) + pipe.add_edge(source_stage, mux_stage.input_ports[x]) + + sink_stage = pipe.add_stage(InMemorySinkStage(config)) + + pipe.add_edge(mux_stage, sink_stage) + + pipe.run() + + return sink_stage + + @pytest.mark.parametrize("source_count, expected_count", [(1, 1), (2, 2), (3, 3)]) -def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, expected_count): +def test_multi_port_pipeline(config: Config, dataset_cudf: DatasetManager, source_count, expected_count): filter_probs_df = dataset_cudf["filter_probs.csv"] - pipe = Pipeline(config) - - input_ports = [] - for x in range(source_count): - input_port = f"input_{x}" - input_ports.append(input_port) + input_ports = [f"input_{x}" for x in range(source_count)] multiplexer_module_conf = { "module_id": "multiplexer", @@ -46,17 +64,29 @@ def test_multi_port_pipeline(config, dataset_cudf: DatasetManager, source_count, "output_port": "output" } - mux_stage = pipe.add_stage( - MultiPortModulesStage(config, multiplexer_module_conf, input_ports=input_ports, output_ports=["output"])) + sink_stage = _run_pipeline(config=config, + source_df=filter_probs_df, + module_conf=multiplexer_module_conf, + stage_input_ports=input_ports) + assert len(sink_stage.get_messages()) == expected_count - for x in range(source_count): - source_stage = pipe.add_stage(InMemorySourceStage(config, [filter_probs_df])) - pipe.add_edge(source_stage, mux_stage.input_ports[x]) - sink_stage = pipe.add_stage(InMemorySinkStage(config)) +def test_multi_port_pipeline_mis_config(config: Config, dataset_cudf: DatasetManager): + config_input_ports = ["input_0", "input_1"] + stage_input_ports = ["input_0", "input_1", "input_2"] - pipe.add_edge(mux_stage, sink_stage) + filter_probs_df = dataset_cudf["filter_probs.csv"] - pipe.run() + multiplexer_module_conf = { + "module_id": "multiplexer", + "namespace": "morpheus_test", + "module_name": "multiplexer", + "input_ports": config_input_ports, + "output_port": "output" + } - assert len(sink_stage.get_messages()) == expected_count + with pytest.raises(ValueError): + _run_pipeline(config=config, + source_df=filter_probs_df, + module_conf=multiplexer_module_conf, + stage_input_ports=stage_input_ports) diff --git a/tests/test_multi_segment.py b/tests/test_multi_segment.py index 1d79990586..21ccd970a8 100644 --- a/tests/test_multi_segment.py +++ b/tests/test_multi_segment.py @@ -36,7 +36,6 @@ def test_linear_boundary_stages(config, filter_probs_df): assert_results(comp_stage.get_results()) -@pytest.mark.skip(reason="Skipping due to MRC issue #360") @pytest.mark.use_cudf def test_multi_segment_bad_data_type(config, filter_probs_df): with pytest.raises(RuntimeError):