From 08ed2a05f5e3dbcc3285afaf32800a58664b5abe Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 22 Feb 2024 12:34:12 -0800 Subject: [PATCH 1/5] Add test param for #1530 --- tests/test_file_in_out.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/tests/test_file_in_out.py b/tests/test_file_in_out.py index 7100aea514..bd0d06d6c2 100755 --- a/tests/test_file_in_out.py +++ b/tests/test_file_in_out.py @@ -16,6 +16,8 @@ import filecmp import os +import pathlib +import typing import numpy as np import pytest @@ -24,6 +26,7 @@ from _utils import assert_path_exists from _utils.dataset_manager import DatasetManager from morpheus.common import FileTypes +from morpheus.config import Config from morpheus.config import CppConfig from morpheus.io.deserializers import read_file_to_df from morpheus.io.serializers import write_df_to_file @@ -39,11 +42,22 @@ @pytest.mark.slow @pytest.mark.parametrize("input_type", ["csv", "jsonlines", "parquet"]) +@pytest.mark.parametrize("use_pathlib", [False, True]) @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) @pytest.mark.parametrize("flush", [False, True], ids=["no_flush", "flush"]) @pytest.mark.parametrize("repeat", [1, 2, 5], ids=["repeat1", "repeat2", "repeat5"]) -def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: int): +def test_file_rw_pipe(tmp_path: pathlib.Path, + config: Config, + input_type: str, + use_pathlib: bool, + output_type: str, + flush: bool, + repeat: int): input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}') + + if use_pathlib: + input_file = pathlib.Path(input_file) + validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, f'results.{output_type}') @@ -77,7 +91,7 @@ def test_file_rw_pipe(tmp_path, config, input_type, output_type, flush, repeat: assert output_data.tolist() == validation_data.tolist() -def test_file_read_json(config): +def test_file_read_json(config: Config): src_file = os.path.join(TEST_DIRS.tests_data_dir, "simple.json") pipe = LinearPipeline(config) @@ -98,7 +112,7 @@ def test_file_read_json(config): @pytest.mark.slow @pytest.mark.use_python @pytest.mark.usefixtures("chdir_tmpdir") -def test_to_file_no_path(tmp_path, config): +def test_to_file_no_path(tmp_path: pathlib.Path, config: Config): """ Test to ensure issue #48 is fixed """ @@ -119,7 +133,7 @@ def test_to_file_no_path(tmp_path, config): @pytest.mark.slow @pytest.mark.parametrize("input_type", ["csv", "jsonlines", "parquet"]) @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) -def test_file_rw_multi_segment_pipe(tmp_path, config, input_type, output_type): +def test_file_rw_multi_segment_pipe(tmp_path: pathlib.Path, config: Config, input_type: str, output_type: str): input_file = os.path.join(TEST_DIRS.tests_data_dir, f'filter_probs.{input_type}') validation_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, f'results.{output_type}') @@ -156,7 +170,7 @@ def test_file_rw_multi_segment_pipe(tmp_path, config, input_type, output_type): os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv"), os.path.join(TEST_DIRS.tests_data_dir, "filter_probs_w_id_col.csv") ]) -def test_file_rw_index_pipe(tmp_path, config, input_file): +def test_file_rw_index_pipe(tmp_path: pathlib.Path, config: Config, input_file: str): out_file = os.path.join(tmp_path, 'results.csv') pipe = LinearPipeline(config) @@ -183,7 +197,7 @@ def test_file_rw_index_pipe(tmp_path, config, input_file): }), (os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.jsonlines"), {})], ids=["CSV", "CSV_ID", "JSON"]) @pytest.mark.usefixtures("use_cpp") -def test_file_roundtrip(tmp_path, input_file, extra_kwargs): +def test_file_roundtrip(tmp_path: pathlib.Path, input_file: str, extra_kwargs: dict[str, typing.Any]): # Output file should be same type as input out_file = os.path.join(tmp_path, f'results{os.path.splitext(input_file)[1]}') @@ -222,7 +236,7 @@ def test_read_cpp_compare(input_file: str): @pytest.mark.slow @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) -def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type): +def test_file_rw_serialize_deserialize_pipe(tmp_path: pathlib.Path, config: Config, output_type: str): input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, f'results.{output_type}') @@ -252,7 +266,7 @@ def test_file_rw_serialize_deserialize_pipe(tmp_path, config, output_type): @pytest.mark.slow @pytest.mark.parametrize("output_type", ["csv", "json", "jsonlines"]) -def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path, config, output_type): +def test_file_rw_serialize_deserialize_multi_segment_pipe(tmp_path: pathlib.Path, config: Config, output_type: str): input_file = os.path.join(TEST_DIRS.tests_data_dir, "filter_probs.csv") out_file = os.path.join(tmp_path, f'results.{output_type}') From a1c5d00a6b3b6e5b67f0c1a1e8b65e7aa8aa8e52 Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 22 Feb 2024 12:46:24 -0800 Subject: [PATCH 2/5] Add test for determine_file_type --- tests/common/test_determine_file_type.py | 32 ++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 tests/common/test_determine_file_type.py diff --git a/tests/common/test_determine_file_type.py b/tests/common/test_determine_file_type.py new file mode 100644 index 0000000000..d3e71af72b --- /dev/null +++ b/tests/common/test_determine_file_type.py @@ -0,0 +1,32 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pathlib + +import pytest + +from morpheus.common import FileTypes +from morpheus.common import determine_file_type + + +@pytest.mark.parametrize("use_pathlib", [False, True]) +@pytest.mark.parametrize("ext, expected_result", + [("csv", FileTypes.CSV), ("json", FileTypes.JSON), ("jsonlines", FileTypes.JSON), + ("parquet", FileTypes.PARQUET)]) +def test_determine_file_type(ext: str, expected_result: FileTypes, use_pathlib: bool): + file_path = f"test.{ext}" + if use_pathlib: + file_path = pathlib.Path(file_path) + + assert determine_file_type(file_path) == expected_result From 53b77497bfd3810afa3ce5429bf422bceba494fd Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 22 Feb 2024 14:21:49 -0800 Subject: [PATCH 3/5] Add std::filesystem::path overrides for determine_file_type and the python bindings for the file source stage --- morpheus/_lib/common/__init__.pyi | 5 +++ morpheus/_lib/common/module.cpp | 38 +++++++++++++++---- .../include/morpheus/objects/file_types.hpp | 10 +++++ .../include/morpheus/stages/file_source.hpp | 6 +++ morpheus/_lib/src/objects/file_types.cpp | 5 +++ morpheus/_lib/src/stages/file_source.cpp | 11 ++++++ morpheus/_lib/stages/__init__.pyi | 4 ++ morpheus/_lib/stages/module.cpp | 22 ++++++++--- 8 files changed, 89 insertions(+), 12 deletions(-) diff --git a/morpheus/_lib/common/__init__.pyi b/morpheus/_lib/common/__init__.pyi index f6e94c7468..3a27d05002 100644 --- a/morpheus/_lib/common/__init__.pyi +++ b/morpheus/_lib/common/__init__.pyi @@ -7,6 +7,7 @@ from __future__ import annotations import morpheus._lib.common import typing +import os __all__ = [ "FiberQueue", @@ -190,6 +191,10 @@ class TypeId(): UINT8: morpheus._lib.common.TypeId # value = __members__: dict # value = {'EMPTY': , 'INT8': , 'INT16': , 'INT32': , 'INT64': , 'UINT8': , 'UINT16': , 'UINT32': , 'UINT64': , 'FLOAT32': , 'FLOAT64': , 'BOOL8': , 'STRING': } pass +@typing.overload +def determine_file_type(filename: os.PathLike) -> FileTypes: + pass +@typing.overload def determine_file_type(filename: str) -> FileTypes: pass def read_file_to_df(filename: str, file_type: FileTypes = FileTypes.Auto) -> object: diff --git a/morpheus/_lib/common/module.cpp b/morpheus/_lib/common/module.cpp index 397fec2341..444b35ad57 100644 --- a/morpheus/_lib/common/module.cpp +++ b/morpheus/_lib/common/module.cpp @@ -36,8 +36,10 @@ #include #include #include -#include // for return_value_policy::reference +#include // for return_value_policy::reference +#include // for pathlib.Path -> std::filesystem::path conversions +#include // for std::filesystem::path #include #include #include @@ -58,13 +60,29 @@ PYBIND11_MODULE(common, _module) CudfHelper::load(); LoaderRegistry::register_factory_fn( - "file", [](nlohmann::json config) { return std::make_unique(config); }, false); + "file", + [](nlohmann::json config) { + return std::make_unique(config); + }, + false); LoaderRegistry::register_factory_fn( - "grpc", [](nlohmann::json config) { return std::make_unique(config); }, false); + "grpc", + [](nlohmann::json config) { + return std::make_unique(config); + }, + false); LoaderRegistry::register_factory_fn( - "payload", [](nlohmann::json config) { return std::make_unique(config); }, false); + "payload", + [](nlohmann::json config) { + return std::make_unique(config); + }, + false); LoaderRegistry::register_factory_fn( - "rest", [](nlohmann::json config) { return std::make_unique(config); }, false); + "rest", + [](nlohmann::json config) { + return std::make_unique(config); + }, + false); py::class_(_module, "Tensor") .def_property_readonly("__cuda_array_interface__", &TensorObjectInterfaceProxy::cuda_array_interface) @@ -106,9 +124,15 @@ PYBIND11_MODULE(common, _module) .value("CSV", FileTypes::CSV) .value("PARQUET", FileTypes::PARQUET); - _module.def("typeid_to_numpy_str", [](TypeId tid) { return DType(tid).type_str(); }); + _module.def("typeid_to_numpy_str", [](TypeId tid) { + return DType(tid).type_str(); + }); - _module.def("determine_file_type", &determine_file_type, py::arg("filename")); + _module.def( + "determine_file_type", py::overload_cast(&determine_file_type), py::arg("filename")); + _module.def("determine_file_type", + py::overload_cast(&determine_file_type), + py::arg("filename")); _module.def("read_file_to_df", &read_file_to_df, py::arg("filename"), py::arg("file_type") = FileTypes::Auto); _module.def("write_df_to_file", &SerializersProxy::write_df_to_file, diff --git a/morpheus/_lib/include/morpheus/objects/file_types.hpp b/morpheus/_lib/include/morpheus/objects/file_types.hpp index 54b59a5b35..329ade91ef 100644 --- a/morpheus/_lib/include/morpheus/objects/file_types.hpp +++ b/morpheus/_lib/include/morpheus/objects/file_types.hpp @@ -18,6 +18,7 @@ #pragma once #include +#include // for path #include #include #include @@ -84,6 +85,15 @@ static inline std::ostream& operator<<(std::ostream& os, const FileTypes& f) */ FileTypes determine_file_type(const std::string& filename); +/** + * @brief Determines the file type from a filename based on extension. For example, my_file.json would return + * `FileTypes::JSON`. + * + * @param filename path to a file. Does not need to exist + * @return FileTypes + */ +FileTypes determine_file_type(const std::filesystem::path& filename); + #pragma GCC visibility pop /** @} */ // end of group diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index 4818f2e0ef..a7f1a2f2ef 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -31,6 +31,7 @@ #include #include // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity +#include // for path #include #include #include @@ -98,6 +99,11 @@ struct FileSourceStageInterfaceProxy std::string filename, int repeat = 1, pybind11::dict parser_kwargs = pybind11::dict()); + static std::shared_ptr> init(mrc::segment::Builder& builder, + const std::string& name, + std::filesystem::path filename, + int repeat = 1, + pybind11::dict parser_kwargs = pybind11::dict()); }; #pragma GCC visibility pop /** @} */ // end of group diff --git a/morpheus/_lib/src/objects/file_types.cpp b/morpheus/_lib/src/objects/file_types.cpp index b573842e62..20f8ca956e 100644 --- a/morpheus/_lib/src/objects/file_types.cpp +++ b/morpheus/_lib/src/objects/file_types.cpp @@ -49,4 +49,9 @@ FileTypes determine_file_type(const std::string& filename) } } +FileTypes determine_file_type(const std::filesystem::path& filename) +{ + return determine_file_type(filename.string()); +} + } // namespace morpheus diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index bae39fc37c..ffcd976d4d 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -37,6 +37,7 @@ #include // for str_attr_accessor #include // for pybind11::int_ +#include #include #include #include @@ -133,4 +134,14 @@ std::shared_ptr> FileSourceStageInterfaceP return stage; } + +std::shared_ptr> FileSourceStageInterfaceProxy::init( + mrc::segment::Builder& builder, + const std::string& name, + std::filesystem::path filename, + int repeat, + pybind11::dict parser_kwargs) +{ + return init(builder, name, filename.string(), repeat, std::move(parser_kwargs)); +} } // namespace morpheus diff --git a/morpheus/_lib/stages/__init__.pyi b/morpheus/_lib/stages/__init__.pyi index 580a7a8357..2b40565087 100644 --- a/morpheus/_lib/stages/__init__.pyi +++ b/morpheus/_lib/stages/__init__.pyi @@ -11,6 +11,7 @@ import typing from morpheus._lib.common import FilterSource import morpheus._lib.common import mrc.core.segment +import os __all__ = [ "AddClassificationsStage", @@ -45,6 +46,9 @@ class DeserializeMultiMessageStage(mrc.core.segment.SegmentObject): def __init__(self, builder: mrc.core.segment.Builder, name: str, batch_size: int, ensure_sliceable_index: bool = True) -> None: ... pass class FileSourceStage(mrc.core.segment.SegmentObject): + @typing.overload + def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: os.PathLike, repeat: int, parser_kwargs: dict) -> None: ... + @typing.overload def __init__(self, builder: mrc.core.segment.Builder, name: str, filename: str, repeat: int, parser_kwargs: dict) -> None: ... pass class FilterDetectionsStage(mrc.core.segment.SegmentObject): diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index d5f8b6bad0..61dd63a7d0 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -38,12 +38,14 @@ #include #include -#include // for multiple_inheritance -#include // for arg, init, class_, module_, str_attr_accessor, PYBIND11_MODULE, pybind11 -#include // for dict, sequence -#include // for pymrc::import +#include // for multiple_inheritance +#include // for arg, init, class_, module_, str_attr_accessor, PYBIND11_MODULE, pybind11 +#include // for dict, sequence +#include // for pathlib.Path -> std::filesystem::path conversions +#include // for pymrc::import #include +#include // for std::filesystem::path #include #include @@ -107,11 +109,21 @@ PYBIND11_MODULE(stages, _module) py::arg("task_type") = py::none(), py::arg("task_payload") = py::none()); + // py::overload_cast py::class_, mrc::segment::ObjectProperties, std::shared_ptr>>( _module, "FileSourceStage", py::multiple_inheritance()) - .def(py::init<>(&FileSourceStageInterfaceProxy::init), + .def(py::init(py::overload_cast( + &FileSourceStageInterfaceProxy::init)), + py::arg("builder"), + py::arg("name"), + py::arg("filename"), + py::arg("repeat"), + py::arg("parser_kwargs")) + .def(py::init( + py::overload_cast( + &FileSourceStageInterfaceProxy::init)), py::arg("builder"), py::arg("name"), py::arg("filename"), From d92cc762b703d45781b3039feb8c9ac791f863ca Mon Sep 17 00:00:00 2001 From: David Gardner Date: Thu, 22 Feb 2024 14:41:26 -0800 Subject: [PATCH 4/5] IWYU fixes --- morpheus/_lib/common/module.cpp | 5 +++-- morpheus/_lib/include/morpheus/stages/file_source.hpp | 7 ------- morpheus/_lib/src/stages/file_source.cpp | 7 +------ morpheus/_lib/stages/module.cpp | 11 +++++++---- 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/morpheus/_lib/common/module.cpp b/morpheus/_lib/common/module.cpp index 444b35ad57..0c2ae40914 100644 --- a/morpheus/_lib/common/module.cpp +++ b/morpheus/_lib/common/module.cpp @@ -36,8 +36,9 @@ #include #include #include -#include // for return_value_policy::reference -#include // for pathlib.Path -> std::filesystem::path conversions +#include // for return_value_policy::reference +// for pathlib.Path -> std::filesystem::path conversions +#include // IWYU pragma: keep #include // for std::filesystem::path #include diff --git a/morpheus/_lib/include/morpheus/stages/file_source.hpp b/morpheus/_lib/include/morpheus/stages/file_source.hpp index a7f1a2f2ef..6ed1ea4852 100644 --- a/morpheus/_lib/include/morpheus/stages/file_source.hpp +++ b/morpheus/_lib/include/morpheus/stages/file_source.hpp @@ -20,24 +20,17 @@ #include "morpheus/messages/meta.hpp" #include -#include -#include -#include -#include #include #include -#include #include #include #include // for apply, make_subscriber, observable_member, is_on_error<>::not_void, is_on_next_of<>::not_void, trace_activity #include // for path -#include #include #include #include #include -#include // for vector namespace morpheus { /****** Component public implementations *******************/ diff --git a/morpheus/_lib/src/stages/file_source.cpp b/morpheus/_lib/src/stages/file_source.cpp index ffcd976d4d..84a59f5f12 100644 --- a/morpheus/_lib/src/stages/file_source.cpp +++ b/morpheus/_lib/src/stages/file_source.cpp @@ -17,11 +17,7 @@ #include "morpheus/stages/file_source.hpp" -#include "mrc/node/rx_sink_base.hpp" -#include "mrc/node/rx_source_base.hpp" -#include "mrc/node/source_properties.hpp" #include "mrc/segment/object.hpp" -#include "mrc/types.hpp" #include "pymrc/node.hpp" #include "morpheus/io/deserializers.hpp" @@ -32,13 +28,12 @@ #include #include #include -#include +#include // IWYU pragma: keep #include #include // for str_attr_accessor #include // for pybind11::int_ #include -#include #include #include #include diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 61dd63a7d0..01f4cf6aae 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -36,18 +36,21 @@ #include "morpheus/utilities/http_server.hpp" // for DefaultMaxPayloadSize #include "morpheus/version.hpp" +#include // for Builder #include #include -#include // for multiple_inheritance -#include // for arg, init, class_, module_, str_attr_accessor, PYBIND11_MODULE, pybind11 -#include // for dict, sequence -#include // for pathlib.Path -> std::filesystem::path conversions +#include // for multiple_inheritance +#include // for arg, init, class_, module_, str_attr_accessor, PYBIND11_MODULE, pybind11 +#include // for dict, sequence +// for pathlib.Path -> std::filesystem::path conversions +#include // IWYU pragma: keep #include // for pymrc::import #include #include // for std::filesystem::path #include #include +#include namespace morpheus { namespace py = pybind11; From 89b45f983a84edbe05f7cdc30e05d1ac7bab4fde Mon Sep 17 00:00:00 2001 From: David Gardner Date: Wed, 28 Feb 2024 12:45:37 -0800 Subject: [PATCH 5/5] Remove old comment --- morpheus/_lib/stages/module.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/morpheus/_lib/stages/module.cpp b/morpheus/_lib/stages/module.cpp index 01f4cf6aae..0fc47034d6 100644 --- a/morpheus/_lib/stages/module.cpp +++ b/morpheus/_lib/stages/module.cpp @@ -112,7 +112,6 @@ PYBIND11_MODULE(stages, _module) py::arg("task_type") = py::none(), py::arg("task_payload") = py::none()); - // py::overload_cast py::class_, mrc::segment::ObjectProperties, std::shared_ptr>>(