From 62a81b903f319803724c2cf98a62e89cf62ba2db Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 24 Jul 2024 15:49:10 +0000 Subject: [PATCH 1/4] rm nvtabular from dependencies --- ci/conda/recipes/morpheus/meta.yaml | 1 - conda/environments/all_cuda-121_arch-x86_64.yaml | 1 - conda/environments/dev_cuda-121_arch-x86_64.yaml | 1 - conda/environments/examples_cuda-121_arch-x86_64.yaml | 1 - conda/environments/runtime_cuda-121_arch-x86_64.yaml | 1 - dependencies.yaml | 1 - docs/source/conf.py | 1 - examples/digital_fingerprinting/production/conda_env.yml | 1 - 8 files changed, 8 deletions(-) diff --git a/ci/conda/recipes/morpheus/meta.yaml b/ci/conda/recipes/morpheus/meta.yaml index 2d3921bbe9..fc55e54f30 100644 --- a/ci/conda/recipes/morpheus/meta.yaml +++ b/ci/conda/recipes/morpheus/meta.yaml @@ -101,7 +101,6 @@ outputs: - mrc - networkx=2.8.8 - numpydoc =1.5.* - - nvtabular =23.08.00 - pydantic - pluggy =1.3.* - python diff --git a/conda/environments/all_cuda-121_arch-x86_64.yaml b/conda/environments/all_cuda-121_arch-x86_64.yaml index 6ee1a32335..f8bb9b9529 100644 --- a/conda/environments/all_cuda-121_arch-x86_64.yaml +++ b/conda/environments/all_cuda-121_arch-x86_64.yaml @@ -71,7 +71,6 @@ dependencies: - nodejs=18.* - numexpr - numpydoc=1.5 -- nvtabular=23.08.00 - onnx=1.15 - openai=1.13 - papermill=2.4.0 diff --git a/conda/environments/dev_cuda-121_arch-x86_64.yaml b/conda/environments/dev_cuda-121_arch-x86_64.yaml index 01095d7e85..e0d60b211e 100644 --- a/conda/environments/dev_cuda-121_arch-x86_64.yaml +++ b/conda/environments/dev_cuda-121_arch-x86_64.yaml @@ -59,7 +59,6 @@ dependencies: - nlohmann_json=3.11 - nodejs=18.* - numpydoc=1.5 -- nvtabular=23.08.00 - pip - pkg-config=0.29 - pluggy=1.3 diff --git a/conda/environments/examples_cuda-121_arch-x86_64.yaml b/conda/environments/examples_cuda-121_arch-x86_64.yaml index dc14b6dd15..e79948e636 100644 --- a/conda/environments/examples_cuda-121_arch-x86_64.yaml +++ b/conda/environments/examples_cuda-121_arch-x86_64.yaml @@ -36,7 +36,6 @@ dependencies: - nodejs=18.* - numexpr - numpydoc=1.5 -- nvtabular=23.08.00 - onnx=1.15 - openai=1.13 - papermill=2.4.0 diff --git a/conda/environments/runtime_cuda-121_arch-x86_64.yaml b/conda/environments/runtime_cuda-121_arch-x86_64.yaml index 6944df559d..ea6a442b3a 100644 --- a/conda/environments/runtime_cuda-121_arch-x86_64.yaml +++ b/conda/environments/runtime_cuda-121_arch-x86_64.yaml @@ -30,7 +30,6 @@ dependencies: - mrc=24.10 - networkx=2.8.8 - numpydoc=1.5 -- nvtabular=23.08.00 - pip - pluggy=1.3 - pydantic diff --git a/dependencies.yaml b/dependencies.yaml index 9f671f08d5..84b58f0e11 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -334,7 +334,6 @@ dependencies: - mrc=24.10 - networkx=2.8.8 - numpydoc=1.5 - - nvtabular=23.08.00 - pydantic # - python ## - python-confluent-kafka>=1.9.2,<1.10.0a0 diff --git a/docs/source/conf.py b/docs/source/conf.py index 709e8230e0..70f5e51eb9 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -175,7 +175,6 @@ "langchain_core", "merlin", "morpheus.cli.commands", # Dont document the CLI in Sphinx - "nvtabular", "pandas", "pydantic", "pymilvus", diff --git a/examples/digital_fingerprinting/production/conda_env.yml b/examples/digital_fingerprinting/production/conda_env.yml index 36ebd7d448..0eb8d3b2db 100644 --- a/examples/digital_fingerprinting/production/conda_env.yml +++ b/examples/digital_fingerprinting/production/conda_env.yml @@ -29,7 +29,6 @@ dependencies: - librdkafka - mlflow>=2.10.0,<3 - nodejs=18.* - - nvtabular=23.06 - 
papermill - s3fs>=2023.6 From 70948e13739d4c42d4aaffc118c94284529ef651 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Fri, 26 Jul 2024 21:38:02 +0000 Subject: [PATCH 2/4] rm nvtabular --- morpheus/utils/column_info.py | 9 - morpheus/utils/downloader.py | 43 +- morpheus/utils/nvt/__init__.py | 13 - morpheus/utils/nvt/decorators.py | 123 ---- morpheus/utils/nvt/extensions/__init__.py | 17 - morpheus/utils/nvt/extensions/morpheus_ext.py | 27 - morpheus/utils/nvt/mutate.py | 195 ------ morpheus/utils/nvt/patches/__init__.py | 15 - morpheus/utils/nvt/patches/merlin_patches.py | 30 - morpheus/utils/nvt/schema_converters.py | 642 ----------------- morpheus/utils/nvt/transforms.py | 64 -- morpheus/utils/schema_transforms.py | 107 +-- .../test_dfp_file_to_df.py | 24 +- tests/test_column_info.py | 63 -- tests/test_downloader.py | 19 +- tests/utils/nvt/__init__.py | 13 - tests/utils/nvt/integration/__init__.py | 13 - tests/utils/nvt/integration/test_mutate_op.py | 65 -- .../utils/nvt/test_json_flatten_transform.py | 68 -- tests/utils/nvt/test_mutate_op.py | 120 ---- tests/utils/nvt/test_schema_converters.py | 661 ------------------ tests/utils/nvt/test_transforms.py | 52 -- 22 files changed, 58 insertions(+), 2325 deletions(-) delete mode 100644 morpheus/utils/nvt/__init__.py delete mode 100644 morpheus/utils/nvt/decorators.py delete mode 100644 morpheus/utils/nvt/extensions/__init__.py delete mode 100644 morpheus/utils/nvt/extensions/morpheus_ext.py delete mode 100644 morpheus/utils/nvt/mutate.py delete mode 100644 morpheus/utils/nvt/patches/__init__.py delete mode 100644 morpheus/utils/nvt/patches/merlin_patches.py delete mode 100644 morpheus/utils/nvt/schema_converters.py delete mode 100644 morpheus/utils/nvt/transforms.py delete mode 100644 tests/utils/nvt/__init__.py delete mode 100644 tests/utils/nvt/integration/__init__.py delete mode 100644 tests/utils/nvt/integration/test_mutate_op.py delete mode 100644 tests/utils/nvt/test_json_flatten_transform.py delete mode 100644 tests/utils/nvt/test_mutate_op.py delete mode 100644 tests/utils/nvt/test_schema_converters.py delete mode 100644 tests/utils/nvt/test_transforms.py diff --git a/morpheus/utils/column_info.py b/morpheus/utils/column_info.py index a5e892a8bb..9ebba46f2b 100644 --- a/morpheus/utils/column_info.py +++ b/morpheus/utils/column_info.py @@ -17,7 +17,6 @@ import logging import re import typing -import warnings from datetime import datetime from functools import partial @@ -25,12 +24,6 @@ import cudf -if (typing.TYPE_CHECKING): - with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - import nvtabular as nvt - logger = logging.getLogger(f"morpheus.{__name__}") DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00' @@ -749,7 +742,6 @@ class DataFrameInputSchema: input_columns: typing.Dict[str, str] = dataclasses.field(init=False, repr=False) output_columns: typing.List[tuple[str, str]] = dataclasses.field(init=False, repr=False) - nvt_workflow: "nvt.Workflow" = dataclasses.field(init=False, repr=False) prep_dataframe: typing.Callable[[pd.DataFrame], typing.List[str]] = dataclasses.field(init=False, repr=False) def __post_init__(self): @@ -797,4 +789,3 @@ def __post_init__(self): json_cols=self.json_columns, preserve_re=self.preserve_columns) - self.nvt_workflow = None diff --git a/morpheus/utils/downloader.py b/morpheus/utils/downloader.py index 0a68ae6e14..0cbd17f56a 100644 --- 
a/morpheus/utils/downloader.py +++ b/morpheus/utils/downloader.py @@ -26,7 +26,6 @@ import fsspec import pandas as pd -from merlin.core.utils import Distributed logger = logging.getLogger(__name__) @@ -69,7 +68,6 @@ def __init__(self, download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD, dask_heartbeat_interval: str = "30s"): - self._merlin_distributed = None self._dask_heartbeat_interval = dask_heartbeat_interval download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method) @@ -99,20 +97,25 @@ def get_dask_cluster(self): Returns ------- - dask_cuda.LocalCUDACluster + dask.distributed.LocalCluster """ with Downloader._mutex: if Downloader._dask_cluster is None: + import dask + import dask.distributed import dask_cuda.utils logger.debug("Creating dask cluster...") - n_workers = dask_cuda.utils.get_n_gpus() - threads_per_worker = mp.cpu_count() // n_workers + Downloader._dask_cluster = dask.distributed.LocalCluster(start=True, + processes=self.download_method != "dask_thread") - Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, - threads_per_worker=threads_per_worker) + # n_workers = dask_cuda.utils.get_n_gpus() + # threads_per_worker = mp.cpu_count() // n_workers + + # Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, + # threads_per_worker=threads_per_worker) logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link) @@ -127,24 +130,18 @@ def get_dask_client(self): dask.distributed.Client """ import dask.distributed + return dask.distributed.Client(self.get_dask_cluster()) - # Up the heartbeat interval which can get violated with long download times - dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval}) + def close(self): + """Close the dask cluster if it exists.""" + if (self._dask_cluster is not None): + logger.debug("Stopping dask cluster...") - if (self._merlin_distributed is None): - with warnings.catch_warnings(): - # Merlin.Distributed will warn if a client already exists, the client in question is the one created - # and are explicitly passing to it in the constructor. - warnings.filterwarnings("ignore", - message="Existing Dask-client object detected in the current context.*", - category=UserWarning) - self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster())) + self._dask_cluster.close() - return self._merlin_distributed + self._dask_cluster = None - def close(self): - """Cluster management is handled by Merlin.Distributed""" - pass + logger.debug("Stopping dask cluster... Done.") def download(self, download_buckets: fsspec.core.OpenFiles, @@ -169,8 +166,8 @@ def download(self, if (self._download_method.startswith("dask")): # Create the client each time to ensure all connections to the cluster are closed (they can time out) with self.get_dask_client() as dist: - dfs = dist.client.map(download_fn, download_buckets) - dfs = dist.client.gather(dfs) + dfs = dist.map(download_fn, download_buckets) + dfs = dist.gather(dfs) else: # Simply loop diff --git a/morpheus/utils/nvt/__init__.py b/morpheus/utils/nvt/__init__.py deleted file mode 100644 index 66061e580b..0000000000 --- a/morpheus/utils/nvt/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/morpheus/utils/nvt/decorators.py b/morpheus/utils/nvt/decorators.py deleted file mode 100644 index 6d13dfa444..0000000000 --- a/morpheus/utils/nvt/decorators.py +++ /dev/null @@ -1,123 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import functools -import inspect -import os -import typing - -import pandas as pd - -import cudf - - -def sync_df_as_pandas(df_arg_name='df'): - """ - This function serves as a decorator that synchronizes cudf.DataFrame to pandas.DataFrame before applying the - function. - - Parameters - ---------- - df_arg_name : str - The name of the DataFrame parameter in the decorated function. - - Returns - ------- - Callable - The decorator. - """ - - # pylint: disable=pointless-string-statement - x_data_frame = typing.TypeVar("x_data_frame", pd.DataFrame, cudf.DataFrame) - """ - Represents a DataFrame that can be either a pandas or cudf DataFrame - """ - - # pylint: disable=pointless-string-statement - _sync_pandas_args = typing.ParamSpec('_sync_pandas_args') - """ - Represents the remaining arguments to the function after the first argument (the DataFrame) - """ - - def decorator(func: typing.Callable[typing.Concatenate[pd.DataFrame, _sync_pandas_args], pd.DataFrame]) -> \ - typing.Callable[typing.Concatenate[x_data_frame, _sync_pandas_args], x_data_frame]: - """ - The actual decorator that wraps the function. - - Parameters - ---------- - func : Callable - The function to apply to the DataFrame. - - Returns - ------- - Callable - The wrapped function. 
- """ - - def wrapper(*args, **kwargs) -> typing.Union[pd.DataFrame, cudf.DataFrame]: - is_arg = False - arg_index = 0 - df_arg = kwargs.get(df_arg_name) - if df_arg is None: - # try to get DataFrame argument from positional arguments - func_args = inspect.signature(func).parameters - for i, arg in enumerate(func_args): - if arg == df_arg_name: - is_arg = True - arg_index = i - df_arg = args[i] - break - - convert_to_cudf = False - if isinstance(df_arg, cudf.DataFrame): - convert_to_cudf = True - if (is_arg): - args = list(args) - args[arg_index] = df_arg.to_pandas() - args = tuple(args) - else: - kwargs[df_arg_name] = df_arg.to_pandas() - - result = func(*args, **kwargs) - - if convert_to_cudf: - result = cudf.from_pandas(result) - - return result - - return wrapper - - return decorator - - -# Avoid using the annotate decorator in sphinx builds, instead define a simple pass-through decorator -if os.environ.get("MORPHEUS_IN_SPHINX_BUILD") is None: - from merlin.core.dispatch import annotate # pylint: disable=unused-import -else: - - def annotate(*args, **kwargs): # pylint: disable=unused-argument - """ - `merlin.core.dispatch.annotate` - """ - - def decorator(func): - - @functools.wraps(func) - def wrappper(*args, **kwargs): - return func(*args, **kwargs) - - return wrappper - - return decorator diff --git a/morpheus/utils/nvt/extensions/__init__.py b/morpheus/utils/nvt/extensions/__init__.py deleted file mode 100644 index 57d1384dac..0000000000 --- a/morpheus/utils/nvt/extensions/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .morpheus_ext import register_morpheus_extensions - -__all__ = ["register_morpheus_extensions"] diff --git a/morpheus/utils/nvt/extensions/morpheus_ext.py b/morpheus/utils/nvt/extensions/morpheus_ext.py deleted file mode 100644 index b85a3bb68d..0000000000 --- a/morpheus/utils/nvt/extensions/morpheus_ext.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -def register_morpheus_extensions(): - from datetime import datetime - - import merlin.dtypes.aliases as mn - from merlin.dtypes import register - from merlin.dtypes.mapping import DTypeMapping - - morpheus_extension = DTypeMapping(mapping={ - mn.datetime64: [datetime], - }, ) - - register("morpheus_ext", morpheus_extension) diff --git a/morpheus/utils/nvt/mutate.py b/morpheus/utils/nvt/mutate.py deleted file mode 100644 index c9228f1310..0000000000 --- a/morpheus/utils/nvt/mutate.py +++ /dev/null @@ -1,195 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import typing -from inspect import getsourcelines - -import numpy as np -from merlin.core.dispatch import DataFrameType -from merlin.schema import ColumnSchema -from merlin.schema import Schema -from nvtabular.ops.operator import ColumnSelector -from nvtabular.ops.operator import Operator - -from morpheus.utils.nvt.decorators import annotate - - -class MutateOp(Operator): - - def __init__(self, - func: typing.Callable, - output_columns: typing.Optional[typing.List] = None, - dependencies: typing.Optional[typing.List] = None, - label: typing.Optional[str] = None): - """ - Initialize MutateOp class. - - Parameters - ---------- - func : Callable - Function to perform mutation operation. - output_columns : Optional[List], optional - List of output columns, by default None. - dependencies : Optional[List], optional - List of dependencies, by default None. - label : Optional[str], optional - Label for MutateOp, by default None. - """ - - super().__init__() - - self._dependencies = dependencies or [] - self._func = func - self._label = label - self._output_columns = output_columns or [] - - def _remove_deps(self, column_selector: ColumnSelector): - """ - Remove dependencies from column selector. - - Parameters - ---------- - column_selector : ColumnSelector - Instance of ColumnSelector from which dependencies will be removed. - - Returns - ------- - ColumnSelector - Updated instance of ColumnSelector. - """ - - to_skip = ColumnSelector( - [dep if isinstance(dep, str) else dep.output_schema.column_names for dep in self._dependencies]) - - return column_selector.filter_columns(to_skip) - - @property - def label(self): - """ - Get the label of the MutateOp instance. - - Returns - ------- - str - The label of the MutateOp instance. - """ - - if (self._label is not None): - return self._label - - # if we have a named function (not a lambda) return the function name - name = self._func.__name__.split(".")[-1] - if name != "": - return f"MutateOp: {name}" - - try: - # otherwise get the lambda source code from the inspect module if possible - source = getsourcelines(self.f)[0][0] # pylint: disable=no-member - lambdas = [op.strip() for op in source.split(">>") if "lambda " in op] - if len(lambdas) == 1 and lambdas[0].count("lambda") == 1: - return lambdas[0] - except Exception: # pylint: disable=broad-except - # we can fail to load the source in distributed environments. 
Since the - # label is mainly used for diagnostics, don't worry about the error here and - # fallback to the default labelling - pass - - # Failed to figure out the source - return "MutateOp" - - # pylint: disable=arguments-renamed - @annotate("MutateOp", color="darkgreen", domain="nvt_python") - def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - """ - Apply the transformation function on the dataframe. - - Parameters - ---------- - col_selector : ColumnSelector - Instance of ColumnSelector. - df : DataFrameType - Input dataframe. - - Returns - ------- - DataFrameType - Transformed dataframe. - """ - - df = self._func(col_selector, df) - - # If our dataframe doesn't contain the expected output columns, even after processing, we add dummy columns. - # This could occur if our JSON data doesn't always contain columns we expect to be expanded. - df_cols_set = set(df.columns) - new_cols = { - col[0]: np.zeros(df.shape[0], dtype=col[1]) - for col in self._output_columns if col[0] not in df_cols_set - } - - df = df.assign(**new_cols) - - return df - - def column_mapping(self, col_selector: ColumnSelector) -> typing.Dict[str, str]: - """ - Generate a column mapping. - - Parameters - ---------- - col_selector : ColumnSelector - Instance of ColumnSelector. - - Returns - ------- - Dict[str, str] - Dictionary of column mappings. - """ - - column_mapping = {} - - for col_name, _ in self._output_columns: - column_mapping[col_name] = col_selector.names - - return column_mapping - - def compute_output_schema( - self, - input_schema: Schema, - col_selector: ColumnSelector, - prev_output_schema: typing.Optional[Schema] = None, - ) -> Schema: - """ - Compute the output schema. - - Parameters - ---------- - input_schema : Schema - The input schema. - col_selector : ColumnSelector - Instance of ColumnSelector. - prev_output_schema : Optional[Schema], optional - Previous output schema, by default None. - - Returns - ------- - Schema - The output schema. - """ - output_schema = super().compute_output_schema(input_schema, col_selector, prev_output_schema) - - # Add new columns to the output schema - for col, dtype in self._output_columns: - output_schema += Schema([ColumnSchema(col, dtype=dtype)]) - - return output_schema diff --git a/morpheus/utils/nvt/patches/__init__.py b/morpheus/utils/nvt/patches/__init__.py deleted file mode 100644 index 03a1b3bc36..0000000000 --- a/morpheus/utils/nvt/patches/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .merlin_patches import patch_numpy_dtype_registry diff --git a/morpheus/utils/nvt/patches/merlin_patches.py b/morpheus/utils/nvt/patches/merlin_patches.py deleted file mode 100644 index 9d9d82e81d..0000000000 --- a/morpheus/utils/nvt/patches/merlin_patches.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. 
-# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -def patch_numpy_dtype_registry() -> None: - """ - Patches the Merlin dtypes registry to support conversion from Merlin 'struct' dtypes to the equivalent numpy object. - - This is necessary to support pandas conversion of input dataframes containing 'struct' dtypes within an NVT - operator. Until this is fixed upstream, with the mappings added to `merlin/dtypes/mappings/numpy.py`, this patch - should be used. The function is idempotent, and should be called before any NVT operators are used. - """ - import merlin.dtypes.aliases as mn - import numpy as np - from merlin.dtypes import _dtype_registry - - numpy_dtypes = _dtype_registry.mappings["numpy"].from_merlin_ - if (mn.struct not in numpy_dtypes.keys()): - numpy_dtypes[mn.struct] = [np.dtype("O"), object] diff --git a/morpheus/utils/nvt/schema_converters.py b/morpheus/utils/nvt/schema_converters.py deleted file mode 100644 index 44249618b7..0000000000 --- a/morpheus/utils/nvt/schema_converters.py +++ /dev/null @@ -1,642 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import dataclasses -import os -import typing -from functools import partial - -import networkx as nx -import nvtabular as nvt -import pandas as pd -from merlin.core.dispatch import DataFrameType -from merlin.core.dispatch import annotate -from merlin.core.dispatch import is_dataframe_object -from merlin.core.dispatch import is_series_object -from merlin.dag import ColumnSelector -from nvtabular.ops import Filter -from nvtabular.ops import LambdaOp -from nvtabular.ops import Rename - -import cudf - -from morpheus.utils.column_info import BoolColumn -from morpheus.utils.column_info import ColumnInfo -from morpheus.utils.column_info import CustomColumn -from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import DateTimeColumn -from morpheus.utils.column_info import DistinctIncrementColumn -from morpheus.utils.column_info import IncrementColumn -from morpheus.utils.column_info import RenameColumn -from morpheus.utils.column_info import StringCatColumn -from morpheus.utils.column_info import StringJoinColumn -from morpheus.utils.column_info import create_increment_col -from morpheus.utils.nvt.decorators import sync_df_as_pandas -from morpheus.utils.nvt.mutate import MutateOp -from morpheus.utils.nvt.transforms import json_flatten - - -@dataclasses.dataclass -class JSONFlattenInfo(ColumnInfo): - """ - Subclass of `ColumnInfo`. 
Makes it easier to generate a graph of the column dependencies. - - Attributes - ---------- - input_col_names : list - List of input column names. - output_col_names : list - List of output column names. - """ - - input_col_names: list - output_col_names: list - - def get_input_column_types(self) -> dict[str, str]: - """ - Return a dictionary of input column names and types needed for processing. This is used for schema - validation and should be overridden by subclasses. - """ - return {name: ColumnInfo.convert_pandas_dtype(str) for name in self.input_col_names} - - -# Same in every way to the base, except we don't drop the index -class _MorpheusFilter(Filter): - - @annotate("Filter_op", color="darkgreen", domain="nvt_python") - def transform(self, col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - filtered = self.f(df) - if is_dataframe_object(filtered): - new_df = filtered - elif is_series_object(filtered) and filtered.dtype == bool: - new_df = df[filtered] - else: - raise ValueError(f"Invalid output from filter op: f{filtered.__class__}") - - # new_df.reset_index(drop=True, inplace=True) - return new_df - - -def _get_ci_column_selector(col_info) -> typing.Union[str, typing.List[str]]: - """ - Return a column selector based on a ColumnInfo object. - - Parameters - ---------- - col_info : ColumnInfo - The ColumnInfo object. - - Returns - ------- - Union[str, list of str] - A column selector. - - Raises - ------ - TypeError - If the input `ci` is not an instance of ColumnInfo. - Exception - If the type of ColumnInfo is unknown. - """ - - if (not isinstance(col_info, ColumnInfo)): - raise TypeError - - selected_cols = col_info.get_input_column_types() - - if (len(selected_cols) == 0 and isinstance(col_info, CustomColumn)): - return ["*"] - - return list(selected_cols.keys()) - - -def _json_flatten_from_input_schema(json_input_cols: typing.List[str], - json_output_cols: typing.List[typing.Tuple[str, str]]) -> MutateOp: - """ - Return a JSON flatten operation from an input schema. - - Parameters - ---------- - json_input_cols : list of str - A list of JSON input columns. - json_output_cols : list of tuple - A list of JSON output columns. - - Returns - ------- - MutateOp - A MutateOp object that represents the JSON flatten operation. - """ - - json_flatten_op = MutateOp(json_flatten, dependencies=json_input_cols, output_columns=json_output_cols) - - return json_flatten_op - - -@sync_df_as_pandas() -def _string_cat_col(df: pd.DataFrame, output_column: str, sep: str) -> pd.DataFrame: - """ - Concatenate the string representation of all supplied columns in a DataFrame. - - Parameters - ---------- - df : pandas.DataFrame - The input DataFrame. - output_column : str - The name of the output column. - sep : str - The separator to use when concatenating the strings. - - Returns - ------- - pandas.DataFrame - The resulting DataFrame. - """ - - cat_col = df.apply(lambda row: sep.join(row.values.astype(str)), axis=1) - - return pd.DataFrame({output_column: cat_col}, index=cat_col.index) - - -# pylint -def _nvt_string_cat_col( - column_selector: ColumnSelector, # pylint: disable=unused-argument - df: typing.Union[pd.DataFrame, cudf.DataFrame], - output_column: str, - input_columns: typing.List[str], - sep: str = ', '): - """ - Concatenates the string representation of the specified columns in a DataFrame. - - Parameters - ---------- - column_selector : ColumnSelector - A ColumnSelector object. - df : Union[pandas.DataFrame, cudf.DataFrame] - The input DataFrame. 
- output_column : str - The name of the output column. - input_columns : list of str - The input columns to concatenate. - sep : str, default is ', ' - The separator to use when concatenating the strings. - - Returns - ------- - Union[pandas.DataFrame, cudf.DataFrame] - The resulting DataFrame. - """ - - return _string_cat_col(df[input_columns], output_column=output_column, sep=sep) - - -@sync_df_as_pandas() -def _increment_column(df: pd.DataFrame, - output_column: str, - input_column: str, - groupby_column: str, - period: str = 'D') -> pd.DataFrame: - """ - Crete an increment a column in a DataFrame. - - Parameters - ---------- - df : pandas.DataFrame - The input DataFrame. - output_column : str - The name of the output column. - input_column : str - The name of the input column. - period : str, default is 'D' - The period to increment by. - - Returns - ------- - pandas.DataFrame - The resulting DataFrame. - """ - - period_index = pd.to_datetime(df[input_column]).dt.to_period(period) - groupby_col = df.groupby([groupby_column, period_index]).cumcount() - - return pd.DataFrame({output_column: groupby_col}, index=groupby_col.index) - - -def _nvt_increment_column( - column_selector: ColumnSelector, # pylint: disable=unused-argument - df: typing.Union[pd.DataFrame, cudf.DataFrame], - output_column: str, - input_column: str, - groupby_column: str, - period: str = 'D') -> typing.Union[pd.DataFrame, cudf.DataFrame]: - """ - Increment a column in a DataFrame. - - Parameters - ---------- - column_selector : ColumnSelector - A ColumnSelector object. Unused. - df : Union[pandas.DataFrame, cudf.DataFrame] - The input DataFrame. - output_column : str - The name of the output column. - input_column : str - The name of the input column. - groupby_column : str - Name of the column to groupby after creating the increment - period : str, default is 'D' - The period to increment by. - - Returns - ------- - Union[pandas.DataFrame, cudf.DataFrame] - The resulting DataFrame. 
- """ - - return _increment_column(df, output_column, input_column, groupby_column, period) - - -@sync_df_as_pandas() -def _distinct_increment_column(df: pd.DataFrame, - output_column: str, - input_column: str, - groupby_column: str = "username", - period: str = 'D', - timestamp_column: str = "timestamp") -> pd.DataFrame: - - output_series = create_increment_col(df=df, - column_name=input_column, - groupby_column=groupby_column, - period=period, - timestamp_column=timestamp_column) - - return pd.DataFrame({output_column: output_series}, index=output_series.index) - - -def _nvt_distinct_increment_column(_: ColumnSelector, - df: typing.Union[pd.DataFrame, cudf.DataFrame], - output_column: str, - input_column: str, - groupby_column: str = "username", - period: str = 'D', - timestamp_column: str = "timestamp") -> typing.Union[pd.DataFrame, cudf.DataFrame]: - - return _distinct_increment_column(df, output_column, input_column, groupby_column, period, timestamp_column) - - -@sync_df_as_pandas() -def _nvt_try_rename(df: pd.DataFrame, input_col_name: str, output_col_name: str, dtype: None) -> pd.Series: - if (input_col_name in df.columns): - return df.rename(columns={input_col_name: output_col_name}) - - return pd.Series(None, index=df.index, dtype=dtype) - - -# Mappings from ColumnInfo types to functions that create the corresponding NVT operator -ColumnInfoProcessingMap = { - BoolColumn: - lambda ci, - deps: [ - LambdaOp( - lambda series: series.map(ci.value_map).astype(bool), dtype="bool", label=f"[BoolColumn] '{ci.name}'") - ], - ColumnInfo: - lambda ci, - deps: [ - MutateOp(lambda _, - df: df.assign(**{ci.name: df[ci.name].astype(ci.get_pandas_dtype())}) if (ci.name in df.columns) - else df.assign(**{ci.name: pd.Series(None, index=df.index, dtype=ci.get_pandas_dtype())}), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[ColumnInfo] '{ci.name}'") - ], - # Note(Devin): Custom columns are, potentially, very inefficient, because we have to run the custom function on the - # entire dataset this is because NVT requires the input column be available, but CustomColumn is a generic - # transform taking df->series(ci.name) - CustomColumn: - lambda ci, - deps: [ - MutateOp(lambda _, - df: cudf.DataFrame({ci.name: ci.process_column_fn(df)}, index=df.index), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[CustomColumn] '{ci.name}'") - ], - DateTimeColumn: - lambda ci, - deps: [ - Rename(f=lambda name: ci.name if name == ci.input_name else name), - LambdaOp(lambda series: series.astype(ci.dtype), dtype=ci.dtype, label=f"[DateTimeColumn] '{ci.name}'") - ], - IncrementColumn: - lambda ci, - deps: [ - MutateOp(partial(_nvt_increment_column, - output_column=ci.name, - input_column=ci.input_name, - groupby_column=ci.groupby_column, - period=ci.period), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[IncrementColumn] '{ci.input_name}.{ci.groupby_column}' => '{ci.name}'") - ], - DistinctIncrementColumn: - lambda ci, - deps: [ - MutateOp(partial(_nvt_distinct_increment_column, - output_column=ci.name, - input_column=ci.input_name, - groupby_column=ci.groupby_column, - period=ci.period, - timestamp_column=ci.timestamp_column), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=(f"[DistinctIncrementColumn] " - f"'{ci.input_name}.{ci.groupby_column}.{ci.timestamp_column}' => '{ci.name}'")) - ], - RenameColumn: - lambda ci, - deps: [ - MutateOp(lambda selector, - df: _nvt_try_rename(df, ci.input_name, ci.name, ci.dtype), - 
dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[RenameColumn] '{ci.input_name}' => '{ci.name}'") - ], - StringCatColumn: - lambda ci, - deps: [ - MutateOp(partial(_nvt_string_cat_col, output_column=ci.name, input_columns=ci.input_columns, sep=ci.sep), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[StringCatColumn] '{','.join(ci.input_columns)}' => '{ci.name}'") - ], - StringJoinColumn: - lambda ci, - deps: [ - MutateOp(partial( - _nvt_string_cat_col, output_column=ci.name, input_columns=[ci.name, ci.input_name], sep=ci.sep), - dependencies=deps, - output_columns=[(ci.name, ci.dtype)], - label=f"[StringJoinColumn] '{ci.input_name}' => '{ci.name}'") - ], - JSONFlattenInfo: - lambda ci, - deps: [_json_flatten_from_input_schema(ci.input_col_names, ci.output_col_names)] -} - - -def _build_nx_dependency_graph(column_info_objects: typing.List[ColumnInfo]) -> nx.DiGraph: - """ - Build a networkx directed graph for dependencies among columns. - - Parameters - ---------- - column_info_objects : list of ColumnInfo - List of column information objects. - - Returns - ------- - nx.DiGraph - A networkx DiGraph where nodes represent columns and edges represent dependencies between columns. - - """ - graph = nx.DiGraph() - - def _find_dependent_column(name, current_name): - for col_info in column_info_objects: - if col_info.name == current_name: - continue - - # pylint: disable=no-else-return - if col_info.name == name: - return col_info - elif col_info.__class__ == JSONFlattenInfo: - if name in [c for c, _ in col_info.output_col_names]: - return col_info - - return None - - # For each column, determine the inputs for that column and add edges to the graph - # Some columns will use simple strings - for col_info in column_info_objects: - graph.add_node(col_info.name) - - for input_col_name in col_info.get_input_column_types().keys(): - dep_col_info = _find_dependent_column(input_col_name, col_info.name) - if (dep_col_info): - graph.add_edge(dep_col_info.name, col_info.name) - - return graph - - -def _bfs_traversal_with_op_map(graph: nx.Graph, - ci_map: typing.Dict[str, ColumnInfo], - root_nodes: typing.List[typing.Any]): - """ - Perform Breadth-First Search (BFS) on a given graph. - - Parameters - ---------- - graph : nx.Graph - The graph on which BFS needs to be performed. - ci_map : dict - The dictionary mapping column info. - root_nodes : list - List of root nodes where BFS should start. - - Returns - ------- - tuple - Tuple containing the visited nodes and node-operation mapping. - """ - - visited = set() - queue = list(root_nodes) - node_op_map = {} - - while queue: - node = queue.pop(0) - if node not in visited: - visited.add(node) - - # We need to start an operator chain with a column selector, so root nodes need to prepend a parent - # column selection operator - parent_input = _get_ci_column_selector(ci_map[node]) - - parents = list(graph.predecessors(node)) - - # Thin the parent_input by any actual parents - parent_input = [x for x in parent_input if x not in parents] - - # If we are a single element list, just use the element - if (len(parent_input) == 1): - parent_input = parent_input[0] - - if len(parents) > 0: - # Not a root node, so we need to gather the parent operators, and collect them up. 
- for parent in parents: - if isinstance(parent_input, list) and len(parent_input) == 0: - parent_input = node_op_map[parent] - else: - parent_input = parent_input + node_op_map[parent] - - # Map the column info object to its NVT operator implementation - nvt_ops = ColumnInfoProcessingMap[type(ci_map[node])](ci_map[node], deps=[]) - - # Chain ops together into a compound op - node_op = parent_input - for nvt_op in nvt_ops: - node_op = node_op >> nvt_op - - # Set the op for this node to the compound operator - node_op_map[node] = node_op - - # Add our neighbors to the queue - neighbors = list(graph.neighbors(node)) - for neighbor in neighbors: - queue.append(neighbor) - - return visited, node_op_map - - -def _coalesce_leaf_nodes(node_op_map: typing.Dict[typing.Any, typing.Any], - column_info_objects: list[ColumnInfo]) -> typing.Any: - """ - Coalesce (combine) operations for the leaf nodes of a graph. - - Parameters - ---------- - node_op_map : dict - Dictionary mapping nodes to operations. - graph : nx.Graph - The graph to be processed. - preserve_re : regex - Regular expression for nodes to be preserved. - - Returns - ------- - obj - Coalesced workflow for leaf nodes. - """ - coalesced_workflow = None - - for column_info in column_info_objects: - - nvt_op = node_op_map[column_info.name] - - if coalesced_workflow is None: - coalesced_workflow = nvt_op - else: - coalesced_workflow = coalesced_workflow + nvt_op - - return coalesced_workflow - - -def _coalesce_ops(graph: nx.Graph, column_info_objects: list[ColumnInfo]) -> typing.Any: - """ - Coalesce (combine) operations for a graph. - - Parameters - ---------- - graph : nx.Graph - The graph to be processed. - ci_map : dict - The dictionary mapping column info. - preserve_re : regex, optional - Regular expression for nodes to be preserved. - - Returns - ------- - obj - Coalesced workflow for the graph. - """ - - ci_map = {ci.name: ci for ci in column_info_objects} - - root_nodes = [node for node, in_degree in graph.in_degree() if in_degree == 0] - - _, node_op_map = _bfs_traversal_with_op_map(graph, ci_map, root_nodes) - - coalesced_workflow = _coalesce_leaf_nodes(node_op_map, column_info_objects) - - return coalesced_workflow - - -def create_and_attach_nvt_workflow(input_schema: DataFrameInputSchema, - visualize: typing.Optional[bool] = False) -> DataFrameInputSchema: - """ - Converts an `input_schema` to a `nvt.Workflow` object. - - Parameters - ---------- - input_schema : DataFrameInputSchema - Input schema which specifies how the DataFrame should be processed. - visualize : bool, optional - If True, the resulting workflow graph will be visualized. - Default is False. - - Returns - ------- - nvt.Workflow - A nvt.Workflow object representing the steps specified in the input schema. - - Raises - ------ - ValueError - If the input schema is empty. - - Notes - ----- - First we aggregate all preprocessing steps, which we assume are independent of each other - and can be run in parallel. - - Next we aggregate all column operations, which we assume are independent of each other and - can be run in parallel and pass them the updated schema from the preprocessing steps. - """ - - if (input_schema is None): - input_schema = DataFrameInputSchema() - return input_schema - if (len(input_schema.column_info) == 0): - input_schema.nvt_workflow = None - return input_schema - - # Note(Devin): soft locking problem with nvt operators, skip for now. 
- # column_info_objects.append( - # JSONFlattenInfo(input_col_names=list(json_cols), - # output_col_names=json_output_cols, - # dtype="str", - # name="json_info")) - - graph = _build_nx_dependency_graph(input_schema.column_info) - - if os.getenv('MORPHEUS_NVT_VIS_DEBUG') is not None: - from matplotlib import pyplot as plt - from networkx.drawing.nx_pydot import graphviz_layout - pos = graphviz_layout(graph, prog='neato') - nx.draw(graph, pos, with_labels=True, font_weight='bold') - plt.show() - - coalesced_workflow = _coalesce_ops(graph, input_schema.column_info) - if (input_schema.row_filter is not None): - # Use our own filter here to preserve any index from the DataFrame - coalesced_workflow = coalesced_workflow >> _MorpheusFilter(f=input_schema.row_filter) - - if (visualize): - coalesced_workflow.graph.render(view=True, format='svg') - - input_schema.nvt_workflow = nvt.Workflow(coalesced_workflow) - - return input_schema diff --git a/morpheus/utils/nvt/transforms.py b/morpheus/utils/nvt/transforms.py deleted file mode 100644 index c8aab33b81..0000000000 --- a/morpheus/utils/nvt/transforms.py +++ /dev/null @@ -1,64 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import typing - -import pandas as pd -from nvtabular import ColumnSelector - -import cudf - - -def json_flatten(col_selector: ColumnSelector, - df: typing.Union[pd.DataFrame, cudf.DataFrame]) -> typing.Union[pd.DataFrame, cudf.DataFrame]: - """ - Flattens JSON columns in the given DataFrame and concatenates them into a single DataFrame. - - Parameters - ---------- - col_selector : ColumnSelector - An instance of ColumnSelector that contains the names of the columns to flatten. - df : Union[pd.DataFrame, cudf.DataFrame] - The input DataFrame that contains the JSON columns to flatten. - - Returns - ------- - Union[pd.DataFrame, cudf.DataFrame] - A new DataFrame with flattened JSON columns. If 'df' was a cudf.DataFrame, - the return type is cudf.DataFrame. Otherwise, it is pd.DataFrame. - """ - convert_to_cudf = False - if isinstance(df, cudf.DataFrame): - convert_to_cudf = True - - # Normalize JSON columns and accumulate into a single dataframe - df_normalized = None - for col in col_selector.names: - pd_series = df[col] if not convert_to_cudf else df[col].to_pandas() - pd_series = pd_series.apply(lambda x: x if isinstance(x, dict) else json.loads(x)) - pdf_norm = pd.json_normalize(pd_series) - pdf_norm.rename(columns=lambda x, col=col: col + "." 
+ x, inplace=True) - pdf_norm.reset_index(drop=True, inplace=True) - - if (df_normalized is None): - df_normalized = pdf_norm - else: - df_normalized = pd.concat([df_normalized, pdf_norm], axis=1) - - # Convert back to cudf if necessary - if convert_to_cudf: - df_normalized = cudf.from_pandas(df_normalized) - - return df_normalized diff --git a/morpheus/utils/schema_transforms.py b/morpheus/utils/schema_transforms.py index c0203d4453..1cf8b65183 100644 --- a/morpheus/utils/schema_transforms.py +++ b/morpheus/utils/schema_transforms.py @@ -13,36 +13,13 @@ # limitations under the License. import logging -import os import typing -import warnings import pandas as pd import cudf from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import PreparedDFInfo -from morpheus.utils.nvt import patches -from morpheus.utils.nvt.extensions import morpheus_ext -from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow - -with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - import nvtabular as nvt - - if os.environ.get("MORPHEUS_IN_SPHINX_BUILD") is None: - # Apply patches to NVT - # TODO(Devin): Can be removed, once numpy mappings are updated in Merlin - # ======================================================================== - patches.patch_numpy_dtype_registry() - # ======================================================================== - - # Add morpheus conversion mappings - # ======================================================================== - morpheus_ext.register_morpheus_extensions() - # ========================================================================= logger = logging.getLogger(__name__) @@ -50,7 +27,7 @@ @typing.overload def process_dataframe( df_in: pd.DataFrame, - input_schema: typing.Union[nvt.Workflow, DataFrameInputSchema], + input_schema: DataFrameInputSchema, ) -> pd.DataFrame: ... @@ -58,14 +35,14 @@ def process_dataframe( @typing.overload def process_dataframe( df_in: cudf.DataFrame, - input_schema: typing.Union[nvt.Workflow, DataFrameInputSchema], + input_schema: DataFrameInputSchema, ) -> cudf.DataFrame: ... def process_dataframe( df_in: typing.Union[pd.DataFrame, cudf.DataFrame], - input_schema: typing.Union[nvt.Workflow, DataFrameInputSchema], + input_schema: DataFrameInputSchema, ) -> typing.Union[pd.DataFrame, cudf.DataFrame]: """ Applies column transformations to the input dataframe as defined by the `input_schema`. @@ -77,10 +54,8 @@ def process_dataframe( ---------- df_in : Union[pd.DataFrame, cudf.DataFrame] The input DataFrame to process. - input_schema : Union[nvt.Workflow, DataFrameInputSchema] + input_schema : Union[DataFrameInputSchema] Defines the transformations to apply to 'df_in'. - If an instance of nvt.Workflow, it is directly used to transform the dataframe. - If an instance of DataFrameInputSchema, it is first converted to an nvt.Workflow, with JSON columns preprocessed if 'json_preproc' attribute is present. Returns @@ -95,63 +70,31 @@ def process_dataframe( If 'df_in' is a pandas DataFrame, it is temporarily converted into a cudf DataFrame for the transformation. """ - convert_to_pd = False - if (isinstance(df_in, pd.DataFrame)): - convert_to_pd = True + output_df = pd.DataFrame() - # If we're given a nvt_schema, we just use it. 
- nvt_workflow = input_schema - if (isinstance(input_schema, DataFrameInputSchema)): - if (input_schema.nvt_workflow is None): - input_schema = create_and_attach_nvt_workflow(input_schema) + convert_to_cudf = False + if (isinstance(df_in, cudf.DataFrame)): + df_in = df_in.to_pandas() + convert_to_cudf = True - # Note(Devin): pre-flatten to avoid Dask hang when calling json_normalize within an NVT operator - if (input_schema.prep_dataframe is not None): - prepared_df_info: PreparedDFInfo = input_schema.prep_dataframe(df_in) + # Iterate over the column info + for ci in input_schema.column_info: + try: + output_df[ci.name] = ci._process_column(df_in) + except Exception: + logger.exception("Failed to process column '%s'. Dataframe: \n%s", ci.name, df_in, exc_info=True) + raise - nvt_workflow = input_schema.nvt_workflow + if (input_schema.preserve_columns is not None): + # Get the list of remaining columns not already added + df_in_columns = set(df_in.columns) - set(output_df.columns) - preserve_df = None + # Finally, keep any columns that match the preserve filters + match_columns = [y for y in df_in_columns if input_schema.preserve_columns.match(y)] - if prepared_df_info is not None: - df_in = prepared_df_info.df + output_df[match_columns] = df_in[match_columns] - if prepared_df_info.columns_to_preserve: - preserve_df = df_in[prepared_df_info.columns_to_preserve] + if (convert_to_cudf): + return cudf.from_pandas(output_df) - if (convert_to_pd): - df_in = cudf.DataFrame(df_in) - - # NVT will always reset the index, so we need to save it and restore it after the transformation - saved_index = df_in.index - df_in.reset_index(drop=True, inplace=True) - - dataset = nvt.Dataset(df_in) - - if (nvt_workflow is not None): - df_result = nvt_workflow.fit_transform(dataset).to_ddf().compute() - else: - df_result = df_in - - # Now reset the index - if (len(df_result) == len(saved_index)): - df_result.set_index(saved_index, inplace=True) - else: - # Must have done some filtering. 
Use the new index to index into the old index - df_result.set_index(saved_index.take(df_result.index), inplace=True) - - if (convert_to_pd): - df_result = df_result.to_pandas() - - # Restore preserved columns - if (preserve_df is not None): - # Ensure there is no overlap with columns to preserve - columns_to_merge = set(preserve_df.columns) - set(df_result.columns) - columns_to_merge = list(columns_to_merge) - if (columns_to_merge): - if (convert_to_pd): - df_result = pd.concat([df_result, preserve_df[columns_to_merge]], axis=1) - else: - df_result = cudf.concat([df_result, preserve_df[columns_to_merge]], axis=1) - - return df_result + return output_df diff --git a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py index 7540836f1e..19fa6add61 100644 --- a/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py +++ b/tests/examples/digital_fingerprinting/test_dfp_file_to_df.py @@ -105,12 +105,10 @@ def test_constructor(config: Config): @pytest.mark.parametrize('dl_type', ["single_thread", "dask", "dask_thread"]) @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) @mock.patch('dask.distributed.Client') -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') @mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') -@mock.patch('morpheus.utils.downloader.Distributed') @mock.patch('morpheus.controllers.file_to_df_controller.process_dataframe') def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicMock, - mock_distributed: mock.MagicMock, mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, @@ -127,12 +125,6 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM mock_dask_client.__enter__.return_value = mock_dask_client mock_dask_client.__exit__.return_value = False - mock_dist_client = mock.MagicMock() - mock_distributed.return_value = mock_distributed - mock_distributed.client = mock_dist_client - mock_distributed.__enter__.return_value = mock_distributed - mock_distributed.__exit__.return_value = False - expected_hash = hashlib.md5(json.dumps([{ 'ukey': single_file_obj.fs.ukey(single_file_obj.path) }]).encode()).hexdigest() @@ -149,8 +141,7 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM returned_df = dataset_pandas['filter_probs.csv'] mock_proc_df.return_value = returned_df if dl_type.startswith('dask'): - mock_dist_client.map.return_value = [returned_df] - mock_dist_client.gather.return_value = [returned_df] + mock_dask_client.gather.return_value = [returned_df] else: mock_obf_to_df.return_value = returned_df @@ -174,12 +165,11 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM if dl_type.startswith('dask'): mock_dask_client.assert_called_once_with(mock_dask_cluster) - mock_dist_client.map.assert_called_once() - mock_dist_client.gather.assert_called_once() + mock_dask_client.map.assert_called_once() + mock_dask_client.gather.assert_called_once() else: mock_dask_cluster.assert_not_called() - mock_dist_client.map.assert_not_called() - mock_dist_client.gather.assert_not_called() + mock_dask_client.assert_not_called() dataset_pandas.assert_df_equal(output_df, expected_df) @@ -194,7 +184,7 @@ def test_get_or_create_dataframe_from_batch_cache_miss(mock_proc_df: mock.MagicM @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) @mock.patch('dask.config') 
@mock.patch('dask.distributed.Client') -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') @mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, @@ -253,7 +243,7 @@ def test_get_or_create_dataframe_from_batch_cache_hit(mock_obf_to_df: mock.Magic @pytest.mark.parametrize('use_convert_to_dataframe', [True, False]) @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') @mock.patch('morpheus.controllers.file_to_df_controller.single_object_to_dataframe') def test_get_or_create_dataframe_from_batch_none_noop(mock_obf_to_df: mock.MagicMock, mock_dask_cluster: mock.MagicMock, diff --git a/tests/test_column_info.py b/tests/test_column_info.py index 4cd71a9804..c40e7854ac 100644 --- a/tests/test_column_info.py +++ b/tests/test_column_info.py @@ -35,7 +35,6 @@ from morpheus.utils.column_info import RenameColumn from morpheus.utils.column_info import StringCatColumn from morpheus.utils.column_info import StringJoinColumn -from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow from morpheus.utils.schema_transforms import process_dataframe @@ -60,68 +59,6 @@ def azure_ad_logs_cdf_fixture(_azure_ad_logs_pdf: pd.DataFrame): yield cudf.from_pandas(_azure_ad_logs_pdf) -@pytest.mark.use_python -def test_dataframe_input_schema_with_json_cols(azure_ad_logs_cdf: cudf.DataFrame): - raw_data_columns = [ - 'time', - 'resourceId', - 'operationName', - 'operationVersion', - 'category', - 'tenantId', - 'resultType', - 'resultSignature', - 'resultDescription', - 'durationMs', - 'callerIpAddress', - 'correlationId', - 'identity', - 'Level', - 'location', - 'properties' - ] - - assert len(azure_ad_logs_cdf.columns) == 16 - assert list(azure_ad_logs_cdf.columns) == raw_data_columns - - column_info = [ - DateTimeColumn(name="timestamp", dtype='datetime64[ns]', input_name="time"), - RenameColumn(name="userId", dtype='str', input_name="properties.userPrincipalName"), - RenameColumn(name="appDisplayName", dtype='str', input_name="properties.appDisplayName"), - ColumnInfo(name="category", dtype='str'), - RenameColumn(name="clientAppUsed", dtype='str', input_name="properties.clientAppUsed"), - RenameColumn(name="deviceDetailbrowser", dtype='str', input_name="properties.deviceDetail.browser"), - RenameColumn(name="deviceDetaildisplayName", dtype='str', input_name="properties.deviceDetail.displayName"), - RenameColumn(name="deviceDetailoperatingSystem", - dtype='str', - input_name="properties.deviceDetail.operatingSystem"), - StringCatColumn(name="location", - dtype='str', - input_columns=[ - "properties.location.city", - "properties.location.countryOrRegion", - ], - sep=", "), - RenameColumn(name="statusfailureReason", dtype='str', input_name="properties.status.failureReason"), - ] - - schema = DataFrameInputSchema(json_columns=["properties"], column_info=column_info) - - df_processed_schema = process_dataframe(azure_ad_logs_cdf, schema) - processed_df_cols = df_processed_schema.columns - - assert len(azure_ad_logs_cdf) == len(df_processed_schema) - assert len(processed_df_cols) == len(column_info) - assert "timestamp" in processed_df_cols - assert "userId" in processed_df_cols - assert "time" not in processed_df_cols - assert "properties.userPrincipalName" not in processed_df_cols - - nvt_workflow = 
create_and_attach_nvt_workflow(schema) - df_processed_workflow = process_dataframe(azure_ad_logs_cdf, nvt_workflow) - assert df_processed_schema.equals(df_processed_workflow) - - @pytest.mark.use_python def test_dataframe_input_schema_without_json_cols(azure_ad_logs_pdf: pd.DataFrame): assert len(azure_ad_logs_pdf.columns) == 16 diff --git a/tests/test_downloader.py b/tests/test_downloader.py index 4534b88cbb..451c6cde64 100644 --- a/tests/test_downloader.py +++ b/tests/test_downloader.py @@ -38,14 +38,6 @@ def dask_distributed(fail_missing: bool): fail_missing=fail_missing) -@pytest.fixture(autouse=True, scope='session') -def dask_cuda(fail_missing: bool): - """ - Mark tests requiring dask_cuda - """ - yield import_or_skip("dask_cuda", reason="Downloader requires dask_cuda", fail_missing=fail_missing) - - @pytest.mark.usefixtures("restore_environ") @pytest.mark.parametrize('use_env', [True, False]) @pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"]) @@ -90,7 +82,7 @@ def test_constructor_invalid_dltype(use_env: bool): @pytest.mark.reload_modules(morpheus.utils.downloader) @pytest.mark.parametrize("dl_method", ["dask", "dask_thread"]) @pytest.mark.usefixtures("reload_modules") -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster downloader1 = Downloader(download_method=dl_method) @@ -107,7 +99,7 @@ def test_get_dask_cluster(mock_dask_cluster: mock.MagicMock, dl_method: str): @pytest.mark.reload_modules(morpheus.utils.downloader) @pytest.mark.parametrize('dl_method', ["dask", "dask_thread"]) @pytest.mark.usefixtures("reload_modules") -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster downloader = Downloader(download_method=dl_method) @@ -117,7 +109,7 @@ def test_close(mock_dask_cluster: mock.MagicMock, dl_method: str): downloader.close() -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') @pytest.mark.parametrize('dl_method', ["single_thread"]) def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): mock_dask_cluster.return_value = mock_dask_cluster @@ -135,7 +127,7 @@ def test_close_noop(mock_dask_cluster: mock.MagicMock, dl_method: str): @pytest.mark.parametrize('dl_method', ["single_thread", "dask", "dask_thread"]) @mock.patch('dask.config') @mock.patch('dask.distributed.Client') -@mock.patch('dask_cuda.LocalCUDACluster') +@mock.patch('dask.distributed.LocalCluster') def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_client: mock.MagicMock, mock_dask_config: mock.MagicMock, @@ -161,7 +153,6 @@ def test_download(mock_dask_cluster: mock.MagicMock, downloader = Downloader(download_method=dl_method) results = downloader.download(download_buckets, download_fn) - assert results == [returnd_df for _ in range(num_buckets)] if dl_method == "single_thread": download_fn.assert_has_calls([mock.call(bucket) for bucket in download_buckets]) @@ -177,6 +168,8 @@ def test_download(mock_dask_cluster: mock.MagicMock, mock_dask_client.assert_not_called() mock_dask_config.assert_not_called() + assert results == [returnd_df for _ in range(num_buckets)] + @pytest.mark.usefixtures("restore_environ") @pytest.mark.parametrize('use_env', [True, False]) diff --git a/tests/utils/nvt/__init__.py 
b/tests/utils/nvt/__init__.py deleted file mode 100644 index 66061e580b..0000000000 --- a/tests/utils/nvt/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/utils/nvt/integration/__init__.py b/tests/utils/nvt/integration/__init__.py deleted file mode 100644 index 66061e580b..0000000000 --- a/tests/utils/nvt/integration/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/tests/utils/nvt/integration/test_mutate_op.py b/tests/utils/nvt/integration/test_mutate_op.py deleted file mode 100644 index ce0808681b..0000000000 --- a/tests/utils/nvt/integration/test_mutate_op.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import typing - -import pandas as pd -import pytest -from merlin.dag import ColumnSelector - -import cudf - -from morpheus.utils.nvt.mutate import MutateOp -from morpheus.utils.nvt.transforms import json_flatten - - -@pytest.fixture(name="json_data") -def json_data_fixture(): - yield [ - '{"key1": "value1", "key2": {"subkey1": "subvalue1", "subkey2": "subvalue2"}}', - '{"key1": "value2", "key2": {"subkey1": "subvalue3", "subkey2": "subvalue4"}}', - '{"key1": "value3", "key2": {"subkey1": "subvalue5", "subkey2": "subvalue6"}}' - ] - - -@pytest.fixture(name="expected_pdf") -def expected_pdf_fixture(): - yield pd.DataFrame({ - 'col1.key1': ['value1', 'value2', 'value3'], - 'col1.key2.subkey1': ['subvalue1', 'subvalue3', 'subvalue5'], - 'col1.key2.subkey2': ['subvalue2', 'subvalue4', 'subvalue6'] - }) - - -def test_integration_pandas(json_data: typing.List[str], expected_pdf: pd.DataFrame): - pdf = pd.DataFrame({'col1': json_data}) - col_selector = ColumnSelector(['col1']) - - nvt_op = MutateOp(json_flatten, [("col1.key1", "object"), ("col1.key2.subkey1", "object"), - ("col1.key2.subkey2", "object")]) - result_pdf = nvt_op.transform(col_selector, pdf) - - assert result_pdf.equals(expected_pdf), "Integration test with pandas DataFrame failed" - - -def test_integration_cudf(json_data: typing.List[str], expected_pdf: pd.DataFrame): - cdf = cudf.DataFrame({'col1': json_data}) - col_selector = ColumnSelector(['col1']) - - nvt_op = MutateOp(json_flatten, [("col1.key1", "object"), ("col1.key2.subkey1", "object"), - ("col1.key2.subkey2", "object")]) - result_cdf = nvt_op.transform(col_selector, cdf) - result_pdf = result_cdf.to_pandas() - - assert result_pdf.equals(expected_pdf), "Integration test with cuDF DataFrame failed" diff --git a/tests/utils/nvt/test_json_flatten_transform.py b/tests/utils/nvt/test_json_flatten_transform.py deleted file mode 100644 index e0657925f5..0000000000 --- a/tests/utils/nvt/test_json_flatten_transform.py +++ /dev/null @@ -1,68 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import warnings - -import pandas as pd -import pytest - -with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - from nvtabular.ops.operator import ColumnSelector - -import cudf - -from morpheus.utils.nvt.transforms import json_flatten - - -@pytest.fixture(name="data") -def data_fixture(): - yield { - "id": [1, 2], - "info": [ - '{"name": "John", "age": 30, "city": "New York"}', '{"name": "Jane", "age": 28, "city": "San Francisco"}' - ] - } - - -def test_json_flatten_pandas(data: dict): - df = pd.DataFrame(data) - col_selector = ColumnSelector(["info"]) - result = json_flatten(col_selector, df) - - expected_data = {"info.name": ["John", "Jane"], "info.age": [30, 28], "info.city": ["New York", "San Francisco"]} - expected_df = pd.DataFrame(expected_data) - - pd.testing.assert_frame_equal(result, expected_df) - - -def test_json_flatten_cudf(data: dict): - df = cudf.DataFrame(data) - col_selector = ColumnSelector(["info"]) - result = json_flatten(col_selector, df) - - expected_data = { - "id": [1, 2], "info.name": ["John", "Jane"], "info.age": [30, 28], "info.city": ["New York", "San Francisco"] - } - expected_df = cudf.DataFrame(expected_data) - - assert_frame_equal(result, expected_df) - - -def assert_frame_equal(df1, df2): - assert len(df1) == len(df2), "DataFrames have different lengths" - for col in df1.columns: - assert col in df2, f"Column {col} not found in the second DataFrame" - assert (df1[col] == df2[col]).all(), f"Column {col} values do not match" diff --git a/tests/utils/nvt/test_mutate_op.py b/tests/utils/nvt/test_mutate_op.py deleted file mode 100644 index 3023d9701e..0000000000 --- a/tests/utils/nvt/test_mutate_op.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import warnings - -import numpy as np -import pandas as pd -import pytest - -with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - from merlin.core.dispatch import DataFrameType - from merlin.schema import ColumnSchema - from merlin.schema import Schema - from nvtabular.ops.operator import ColumnSelector - -from morpheus.utils.nvt.mutate import MutateOp - - -@pytest.fixture(name="df") -def df_fixture(): - yield pd.DataFrame({'A': [1, 2, 3], 'B': [4, 5, 6], 'C': [7, 8, 9]}) - - -def example_transform(col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - selected_columns = col_selector.names - for col in selected_columns: - df[col + '_new'] = df[col] * 2 - return df - - -def test_transform(df: DataFrameType): - nvt_op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))]) - col_selector = ColumnSelector(['A', 'B']) - transformed_df = nvt_op.transform(col_selector, df) - - expected_df = df.copy() - expected_df['A_new'] = df['A'] * 2 - expected_df['B_new'] = df['B'] * 2 - - assert transformed_df.equals(expected_df), "Test transform failed" - - -# Test for lambda function transformation -def test_transform_lambda(df: DataFrameType): - nvt_op = MutateOp(lambda col_selector, - df: df.assign(**{f"{col}_new": df[col] * 2 - for col in col_selector.names}), - output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))]) - col_selector = ColumnSelector(['A', 'B']) - transformed_df = nvt_op.transform(col_selector, df) - - expected_df = df.copy() - expected_df['A_new'] = df['A'] * 2 - expected_df['B_new'] = df['B'] * 2 - - assert transformed_df.equals(expected_df), "Test transform with lambda failed" - - -def test_transform_additional_columns(df: DataFrameType): - - def additional_transform(col_selector: ColumnSelector, df: DataFrameType) -> DataFrameType: - selected_columns = col_selector.names - for col in selected_columns: - df[col + '_new'] = df[col] * 2 - df['D'] = df['A'] + df['B'] - return df - - nvt_op = MutateOp(additional_transform, - output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64')), - ('D', np.dtype('int64'))]) - col_selector = ColumnSelector(['A', 'B']) - transformed_df = nvt_op.transform(col_selector, df) - - expected_df = df.copy() - expected_df['A_new'] = df['A'] * 2 - expected_df['B_new'] = df['B'] * 2 - expected_df['D'] = df['A'] + df['B'] - - assert transformed_df.equals(expected_df), "Test transform with additional columns failed" - - -def test_column_mapping(): - nvt_op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))]) - col_selector = ColumnSelector(['A', 'B']) - column_mapping = nvt_op.column_mapping(col_selector) - - expected_mapping = {'A_new': ['A', 'B'], 'B_new': ['A', 'B']} - - assert column_mapping == expected_mapping, "Test column mapping failed" - - -def test_compute_output_schema(): - nvt_op = MutateOp(example_transform, output_columns=[('A_new', np.dtype('int64')), ('B_new', np.dtype('int64'))]) - col_selector = ColumnSelector(['A', 'B']) - - input_schema = Schema([ - ColumnSchema('A', dtype=np.dtype('int64')), - ColumnSchema('B', dtype=np.dtype('int64')), - ColumnSchema('C', dtype=np.dtype('int64')) - ]) - - output_schema = nvt_op.compute_output_schema(input_schema, col_selector) - - expected_schema = Schema( - [ColumnSchema('A_new', dtype=np.dtype('int64')), 
ColumnSchema('B_new', dtype=np.dtype('int64'))]) - - assert str(output_schema) == str(expected_schema), "Test compute output schema failed" diff --git a/tests/utils/nvt/test_schema_converters.py b/tests/utils/nvt/test_schema_converters.py deleted file mode 100644 index 9b00440d1a..0000000000 --- a/tests/utils/nvt/test_schema_converters.py +++ /dev/null @@ -1,661 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import json -import warnings - -with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - import nvtabular as nvt - -import pandas as pd -import pytest - -import cudf - -from morpheus.utils.column_info import BoolColumn -from morpheus.utils.column_info import ColumnInfo -from morpheus.utils.column_info import DataFrameInputSchema -from morpheus.utils.column_info import DateTimeColumn -from morpheus.utils.column_info import DistinctIncrementColumn -from morpheus.utils.column_info import IncrementColumn -from morpheus.utils.column_info import PreparedDFInfo -from morpheus.utils.column_info import RenameColumn -from morpheus.utils.column_info import StringCatColumn -from morpheus.utils.column_info import StringJoinColumn -from morpheus.utils.column_info import _resolve_json_output_columns -from morpheus.utils.nvt.schema_converters import JSONFlattenInfo -from morpheus.utils.nvt.schema_converters import _bfs_traversal_with_op_map -from morpheus.utils.nvt.schema_converters import _build_nx_dependency_graph -from morpheus.utils.nvt.schema_converters import _coalesce_leaf_nodes -from morpheus.utils.nvt.schema_converters import _get_ci_column_selector -from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow -from morpheus.utils.nvt.schema_converters import sync_df_as_pandas -from morpheus.utils.schema_transforms import process_dataframe - -source_column_info = [ - BoolColumn(name="result", - dtype="bool", - input_name="result", - true_values=["success", "SUCCESS"], - false_values=["denied", "Denied", "DENIED", "FRAUD"]), - ColumnInfo(name="reason", dtype=str), - DateTimeColumn(name="timestamp", dtype="datetime64[us]", input_name="timestamp"), - StringCatColumn( - name="location", - dtype="str", - input_columns=["access_device.location.city", "access_device.location.state", "access_device.location.country"], - sep=", "), - RenameColumn(name="authdevicename", dtype="str", input_name="auth_device.name"), - RenameColumn(name="username", dtype="str", input_name="user.name"), - RenameColumn(name="accessdevicebrowser", dtype="str", input_name="access_device.browser"), - RenameColumn(name="accessdeviceos", dtype="str", input_name="access_device.os"), -] - - -def create_test_dataframe(): - return pd.DataFrame({ - "access_device": [ - '{"browser": "Firefox", "os": "Linux", "location": ' - '{"city": "San Francisco", "state": "CA", "country": "USA"}}' - ], - "application": ['{"name": 
"AnotherApp"}'], - "auth_device": ['{"name": "Device2"}'], - "user": ['{"name": "Jane Smith"}'], - "timestamp": [pd.Timestamp("2021-02-02 12:00:00")], - "result": ["denied"], - "reason": ["Denied"] - }) - - -def test_sync_df_as_pandas_pd_dataframe(): - - @sync_df_as_pandas() - def test_func(df: pd.DataFrame, value: int) -> pd.DataFrame: - df['test_col'] = df['test_col'] * value - return df - - df = pd.DataFrame({'test_col': [1, 2, 3]}) - result = test_func(df, value=2) - expected = pd.DataFrame({'test_col': [2, 4, 6]}) - pd.testing.assert_frame_equal(result, expected) - - -def test_sync_df_as_pandas_cudf_dataframe(): - - @sync_df_as_pandas() - def test_func(df: pd.DataFrame, value: int) -> pd.DataFrame: - df['test_col'] = df['test_col'] * value - return df - - df = cudf.DataFrame({'test_col': [1, 2, 3]}) - result = test_func(df, value=2) - expected = cudf.DataFrame({'test_col': [2, 4, 6]}) - cudf.testing.assert_frame_equal(result, expected) - - -def test_json_flatten_info_init(): - col_info = JSONFlattenInfo(name="json_info", - dtype="str", - input_col_names=["json_col1.a", "json_col2.b"], - output_col_names=["json_output_col1", "json_output_col2"]) - assert col_info.name == "json_info" - assert col_info.dtype == "str" - assert col_info.input_col_names == ["json_col1.a", "json_col2.b"] - assert col_info.output_col_names == ["json_output_col1", "json_output_col2"] - - -def test_json_flatten_info_init_missing_input_col_names(): - with pytest.raises(TypeError): - # pylint: disable=no-value-for-parameter - # pylint: disable=unused-variable - col_info = JSONFlattenInfo( # noqa F841 - name="json_info", dtype="str", output_col_names=["json_output_col1", "json_output_col2"]) - - -def test_json_flatten_info_init_missing_output_col_names(): - with pytest.raises(TypeError): - # pylint: disable=no-value-for-parameter - # pylint: disable=unused-variable - col_info = JSONFlattenInfo( # noqa F841 - name="json_info", dtype="str", input_col_names=["json_col1.a", "json_col2.b"]) - - -def test_get_ci_column_selector_rename_column(): - col_info = RenameColumn(input_name="original_name", name="new_name", dtype="str") - result = _get_ci_column_selector(col_info) - assert result == ["original_name"] - - -def test_get_ci_column_selector_bool_column(): - col_info = BoolColumn(input_name="original_name", - name="new_name", - dtype="bool", - true_values=["True"], - false_values=["False"]) - result = _get_ci_column_selector(col_info) - assert result == ["original_name"] - - -def test_get_ci_column_selector_datetime_column(): - col_info = DateTimeColumn(input_name="original_name", name="new_name", dtype="datetime64[ns]") - result = _get_ci_column_selector(col_info) - assert result == ["original_name"] - - -def test_get_ci_column_selector_string_join_column(): - col_info = StringJoinColumn(input_name="original_name", name="new_name", dtype="str", sep=",") - result = _get_ci_column_selector(col_info) - assert result == ["original_name"] - - -def test_get_ci_column_selector_increment_column(): - col_info = IncrementColumn(input_name="original_name", - name="new_name", - dtype="datetime64[ns]", - groupby_column="groupby_col") - result = _get_ci_column_selector(col_info) - assert result == ["original_name", "groupby_col"] - - -def test_get_ci_column_selector_distinct_increment_column(): - col_info = DistinctIncrementColumn(input_name="original_name", - name="new_name", - dtype="datetime64[ns]", - groupby_column="groupby_col", - timestamp_column="timestamp_col") - result = _get_ci_column_selector(col_info) - assert result 
== ["original_name", "groupby_col", "timestamp_col"] - - -def test_get_ci_column_selector_string_cat_column(): - col_info = StringCatColumn(name="new_name", dtype="str", input_columns=["col1", "col2"], sep=", ") - result = _get_ci_column_selector(col_info) - assert result == ["col1", "col2"] - - -def test_get_ci_column_selector_json_flatten_info(): - col_info = JSONFlattenInfo(name="json_info", - dtype="str", - input_col_names=["json_col1.a", "json_col2.b"], - output_col_names=["json_col1_a", "json_col2_b"]) - result = _get_ci_column_selector(col_info) - assert result == ["json_col1.a", "json_col2.b"] - - -def test_resolve_json_output_columns(): - input_schema = DataFrameInputSchema(json_columns=["json_col"], - column_info=[ - BoolColumn(input_name="bool_col", - name="bool_col", - dtype="bool", - true_values=["True"], - false_values=["False"]), - DateTimeColumn(input_name="datetime_col", - name="datetime_col", - dtype="datetime64[ns]"), - RenameColumn(input_name="json_col.a", name="new_rename_col", dtype="str"), - StringCatColumn(name="new_str_cat_col", - dtype="str", - input_columns=["A", "B"], - sep=", "), - ]) - - output_cols = _resolve_json_output_columns(input_schema.json_columns, input_schema.input_columns) - expected_output_cols = [ - ("json_col.a", "str"), - ] - assert output_cols == expected_output_cols - - -def test_resolve_json_output_columns_empty_input_schema(): - input_schema = DataFrameInputSchema() - output_cols = _resolve_json_output_columns(input_schema.json_columns, input_schema.input_columns) - assert not output_cols - - -def test_resolve_json_output_columns_no_json_columns(): - input_schema = DataFrameInputSchema( - column_info=[ColumnInfo(name="column1", dtype="int"), ColumnInfo(name="column2", dtype="str")]) - output_cols = _resolve_json_output_columns(input_schema.json_columns, input_schema.input_columns) - assert not output_cols - - -def test_resolve_json_output_columns_with_json_columns(): - input_schema = DataFrameInputSchema(json_columns=["json_col"], - column_info=[ - ColumnInfo(name="json_col.a", dtype="str"), - ColumnInfo(name="json_col.b", dtype="int"), - ColumnInfo(name="column3", dtype="float") - ]) - output_cols = _resolve_json_output_columns(input_schema.json_columns, input_schema.input_columns) - assert output_cols == [("json_col.a", "str"), ("json_col.b", "int")] - - -def test_resolve_json_output_columns_with_complex_schema(): - input_schema = DataFrameInputSchema(json_columns=["json_col"], - column_info=[ - ColumnInfo(name="json_col.a", dtype="str"), - ColumnInfo(name="json_col.b", dtype="int"), - ColumnInfo(name="column3", dtype="float"), - RenameColumn(name="new_column", dtype="str", input_name="column4") - ]) - output_cols = _resolve_json_output_columns(input_schema.json_columns, input_schema.input_columns) - assert output_cols == [("json_col.a", "str"), ("json_col.b", "int")] - - -def test_bfs_traversal_with_op_map(): - input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info) - - column_info_objects = list(input_schema.column_info) - column_info_map = {col_info.name: col_info for col_info in column_info_objects} - graph = _build_nx_dependency_graph(column_info_objects) - root_nodes = [node for node, in_degree in graph.in_degree() if in_degree == 0] - visited, node_op_map = _bfs_traversal_with_op_map(graph, column_info_map, root_nodes) - - # Check if all nodes have been visited - assert len(visited) == len(column_info_map) - - # Check if node_op_map is constructed 
for all nodes - assert len(node_op_map) == len(column_info_map) - - -def test_coalesce_leaf_nodes(): - input_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info) - - column_info_objects = list(input_schema.column_info) - column_info_map = {col_info.name: col_info for col_info in column_info_objects} - graph = _build_nx_dependency_graph(column_info_objects) - root_nodes = [node for node, in_degree in graph.in_degree() if in_degree == 0] - - # Call bfs_traversal_with_op_map() and coalesce_leaf_nodes() - _, node_op_map = _bfs_traversal_with_op_map(graph, column_info_map, root_nodes) - coalesced_workflow = _coalesce_leaf_nodes(node_op_map, column_info_objects) - - # Check if the coalesced workflow is not None - assert coalesced_workflow is not None - - # Extract the leaf nodes from the coalesced workflow - leaf_nodes = [] - for node, _ in node_op_map.items(): - neighbors = list(graph.neighbors(node)) - if len(neighbors) == 0: - leaf_nodes.append(node) - - # Define the expected leaf node names - expected_leaf_node_names = [ - "result", - "reason", - "timestamp", - "location", - "authdevicename", - "username", - "accessdevicebrowser", - "accessdeviceos", - ] - - # Compare the expected leaf node names with the actual leaf node names - assert set(leaf_nodes) == set(expected_leaf_node_names) - - -def test_input_schema_conversion_empty_schema(): - empty_schema = DataFrameInputSchema() - - # pylint: disable=unused-variable - empty_schema = create_and_attach_nvt_workflow(empty_schema) # noqa - - -def test_input_schema_conversion_additional_column(): - additional_columns = [ - RenameColumn(name="appname", dtype="str", input_name="application.name"), - ] - - modified_source_column_info = source_column_info + additional_columns - - modified_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=modified_source_column_info) - test_df = create_test_dataframe() - - output_df = process_dataframe(test_df, modified_schema) - - expected_df = pd.DataFrame({ - "result": [False], - "reason": ["Denied"], - "timestamp": [pd.Timestamp("2021-02-02 12:00:00")], - "location": ["San Francisco, CA, USA"], - "authdevicename": ["Device2"], - "username": ["Jane Smith"], - "accessdevicebrowser": ["Firefox"], - "accessdeviceos": ["Linux"], - "appname": ["AnotherApp"] - }) - - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_interdependent_columns(): - additional_column_1 = StringCatColumn(name="fullname", - dtype="str", - input_columns=["user.firstname", "user.lastname"], - sep=" ") - additional_column_2 = StringCatColumn(name="appinfo", - dtype="str", - input_columns=["application.name", "application.version"], - sep="-") - - modified_source_column_info = source_column_info + [additional_column_1, additional_column_2] - - modified_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=modified_source_column_info) - - test_df = create_test_dataframe() - test_df["user"] = ['{"firstname": "Jane", "lastname": "Smith", "name": "Jane Smith"}'] - test_df["application"] = ['{"name": "AnotherApp", "version": "1.0"}'] - - modified_schema = create_and_attach_nvt_workflow(modified_schema) - prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(prepared_df_info.df) - output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() 
- - expected_df = pd.DataFrame({ - "result": [False], - "reason": ["Denied"], - "timestamp": [pd.Timestamp("2021-02-02 12:00:00")], - "location": ["San Francisco, CA, USA"], - "authdevicename": ["Device2"], - "username": ["Jane Smith"], - "accessdevicebrowser": ["Firefox"], - "accessdeviceos": ["Linux"], - "fullname": ["Jane Smith"], - "appinfo": ["AnotherApp-1.0"] - }) - - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_nested_operations(): - app_column = ColumnInfo(name="application.name", dtype="str") - additional_column = StringCatColumn(name="appname", - dtype="str", - input_columns=["application.name", "appsuffix"], - sep="") - modified_source_column_info = source_column_info + [additional_column, app_column] - - modified_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=modified_source_column_info) - - test_df = create_test_dataframe() - test_df["appsuffix"] = ["_v1"] - - # Add the 'appsuffix' column to the schema - modified_schema.column_info.append(ColumnInfo(name="appsuffix", dtype="str")) - - modified_schema = create_and_attach_nvt_workflow(modified_schema) - prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(prepared_df_info.df) - output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() - - expected_df = pd.DataFrame({ - "result": [False], - "reason": ["Denied"], - "timestamp": [pd.Timestamp("2021-02-02 12:00:00")], - "location": ["San Francisco, CA, USA"], - "authdevicename": ["Device2"], - "username": ["Jane Smith"], - "accessdevicebrowser": ["Firefox"], - "accessdeviceos": ["Linux"], - "appname": ["AnotherApp_v1"], - "application.name": ["AnotherApp"], - "appsuffix": ["_v1"] - }) - - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_root_schema_parent_schema_mix_operations(): - additional_column_1 = StringCatColumn(name="rootcat", - dtype="str", - input_columns=["lhs_top_level", "rhs_top_level"], - sep="-") - additional_column_2 = RenameColumn(name="rhs_top_level", dtype="str", input_name="rhs_top_level_pre") - additional_column_3 = ColumnInfo(name="lhs_top_level", dtype="str") - modified_source_column_info = [additional_column_1, additional_column_2, additional_column_3] - - modified_schema = DataFrameInputSchema(json_columns=[], column_info=modified_source_column_info) - - test_df = create_test_dataframe() - test_df["lhs_top_level"] = ["lhs"] - test_df["rhs_top_level_pre"] = ["rhs"] - - modified_schema = create_and_attach_nvt_workflow(modified_schema) - dataset = nvt.Dataset(test_df) - output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() - - expected_df = pd.DataFrame({ - "rootcat": ["lhs-rhs"], - "rhs_top_level": ["rhs"], - "lhs_top_level": ["lhs"], - }) - - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_preserve_column(): - additional_column_1 = StringCatColumn(name="rootcat", - dtype="str", - input_columns=["lhs_top_level", "rhs_top_level"], - sep="-") - additional_column_2 = RenameColumn(name="rhs_top_level", dtype="str", input_name="rhs_top_level_pre") - additional_column_3 = ColumnInfo(name="lhs_top_level", dtype="str") - modified_source_column_info = [additional_column_1, additional_column_2, additional_column_3] - - modified_schema = DataFrameInputSchema(json_columns=[], - column_info=modified_source_column_info, - preserve_columns=["to_preserve"]) - - 
test_df = create_test_dataframe() - test_df["lhs_top_level"] = ["lhs"] - test_df["rhs_top_level_pre"] = ["rhs"] - test_df["to_preserve"] = ["preserve me"] - - modified_schema = create_and_attach_nvt_workflow(modified_schema) - dataset = nvt.Dataset(test_df) - output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() - - # See issue #1074. This should include the `to_preserve` column, but it doesn't. - expected_df = pd.DataFrame({ - "rootcat": ["lhs-rhs"], - "rhs_top_level": ["rhs"], - "lhs_top_level": ["lhs"], # "to_preserve": ["preserve me"], - }) - - pd.testing.assert_frame_equal(output_df, expected_df) - - -# Test the conversion of a DataFrameInputSchema to an nvt.Workflow -def test_input_schema_conversion(): - # Create a DataFrameInputSchema instance with the example schema provided - example_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info) - - # Create a test dataframe with data according to the schema - test_df = pd.DataFrame({ - "access_device": [ - '{"browser": "Chrome", "os": "Windows", "location": {"city": "New York", "state": "NY", "country": "USA"}}' - ], - "application": ['{"name": "TestApp"}'], - "auth_device": ['{"name": "Device1"}'], - "user": ['{"name": "John Doe"}'], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00")], - "result": ["SUCCESS"], - "reason": ["Authorized"] - }) - - # Call `input_schema_to_nvt_workflow` with the created instance - modified_schema = create_and_attach_nvt_workflow(example_schema) - - # Apply the returned nvt.Workflow to the test dataframe - prepared_df_info: PreparedDFInfo = modified_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(prepared_df_info.df) - output_df = modified_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() - - # Check if the output dataframe has the expected schema and values - expected_df = pd.DataFrame({ - "result": [True], - "reason": ["Authorized"], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00")], - "location": ["New York, NY, USA"], - "authdevicename": ["Device1"], - "username": ["John Doe"], - "accessdevicebrowser": ["Chrome"], - "accessdeviceos": ["Windows"], - }) - - pd.set_option('display.max_columns', None) - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_with_trivial_filter(): - # Create a DataFrameInputSchema instance with the example schema provided - example_schema = DataFrameInputSchema(json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info, - row_filter=lambda df: df) - - # Create a test dataframe with data according to the schema - test_df = pd.DataFrame({ - "access_device": [ - '{"browser": "Chrome", "os": "Windows", "location": {"city": "New York", "state": "NY", "country": "USA"}}' - ], - "application": ['{"name": "TestApp"}'], - "auth_device": ['{"name": "Device1"}'], - "user": ['{"name": "John Doe"}'], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00")], - "result": ["SUCCESS"], - "reason": ["Authorized"] - }) - - output_df = process_dataframe(test_df, example_schema) - - # Check if the output dataframe has the expected schema and values - expected_df = pd.DataFrame({ - "result": [True], - "reason": ["Authorized"], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00")], - "location": ["New York, NY, USA"], - "authdevicename": ["Device1"], - "username": ["John Doe"], - "accessdevicebrowser": ["Chrome"], - "accessdeviceos": ["Windows"], - }) - - 
pd.set_option('display.max_columns', None) - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_with_functional_filter(): - # Create a DataFrameInputSchema instance with the example schema provided - example_schema = DataFrameInputSchema( - json_columns=["access_device", "application", "auth_device", "user"], - column_info=source_column_info, - # pylint: disable=singleton-comparison - row_filter=lambda df: df[df["result"] == True]) # noqa E712 - - # Create a test dataframe with data according to the schema - test_df = pd.DataFrame({ - "access_device": [ - '{"browser": "Chrome", "os": "Windows", "location": {"city": "New York", "state": "NY", "country": "USA"}}', - '{"browser": "Firefox", "os": "Linux", "location": ' - '{"city": "San Francisco", "state": "CA", "country": "USA"}}' - ], - "application": ['{"name": "TestApp"}', '{"name": "AnotherApp"}'], - "auth_device": ['{"name": "Device1"}', '{"name": "Device2"}'], - "user": ['{"name": "John Doe"}', '{"name": "Jane Smith"}'], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00"), pd.Timestamp("2021-02-02 12:00:00")], - "result": ["SUCCESS", "FAILURE"], - "reason": ["Authorized", "Unauthorized"] - }) - - # Call `input_schema_to_nvt_workflow` with the created instance - example_schema = create_and_attach_nvt_workflow(example_schema) - - # Apply the returned nvt.Workflow to the test dataframe - prepared_df_info: PreparedDFInfo = example_schema.prep_dataframe(test_df) - dataset = nvt.Dataset(prepared_df_info.df) - output_df = example_schema.nvt_workflow.transform(dataset).to_ddf().compute().to_pandas() - - # Check if the output dataframe has the expected schema and values - expected_df = pd.DataFrame({ - "result": [True], - "reason": ["Authorized"], - "timestamp": [pd.Timestamp("2021-01-01 00:00:00")], - "location": ["New York, NY, USA"], - "authdevicename": ["Device1"], - "username": ["John Doe"], - "accessdevicebrowser": ["Chrome"], - "accessdeviceos": ["Windows"], - }) - - pd.set_option('display.max_columns', None) - pd.testing.assert_frame_equal(output_df, expected_df) - - -def test_input_schema_conversion_with_filter_and_index(): - # Create a DataFrameInputSchema instance with the example schema provided - example_schema = DataFrameInputSchema( - json_columns=["access_device"], - column_info=[ - BoolColumn(name="result", - dtype="bool", - input_name="result", - true_values=["success", "SUCCESS"], - false_values=["denied", "Denied", "DENIED", "FRAUD"]), - RenameColumn(name="accessdeviceos", dtype="str", input_name="access_device.os"), - ], - # pylint: disable=singleton-comparison - row_filter=lambda df: df[df["result"] == True]) # noqa E712 - - # Create a test dataframe with data according to the schema - test_df = pd.DataFrame({ - "access_device": [ - '{"browser": "Chrome", "os": "Windows", "location": {"city": "New York", "state": "NY", "country": "USA"}}', - '{"browser": "Firefox", "os": "Linux", "location": ' - '{"city": "San Francisco", "state": "CA", "country": "USA"}}', - '{"browser": "Chrome", "os": "Windows", "location": {"city": "New York", "state": "NY", "country": "USA"}}', - '{"browser": "Firefox", "os": "Linux", "location": ' - '{"city": "San Francisco", "state": "CA", "country": "USA"}}', - ], - "result": ["SUCCESS", "FAILURE", "FAILURE", "SUCCESS"], - }) - - # Offset the index - test_df.index += 5 - - # Apply the returned nvt.Workflow to the test dataframe - output_df = process_dataframe(test_df, example_schema) - - # Check if the output dataframe has the expected schema and values - 
expected_df = test_df.copy() - - # Filter the rows - expected_df = expected_df[expected_df["result"] == "SUCCESS"] - - expected_df["result"] = expected_df["result"] == "SUCCESS" - expected_df["accessdeviceos"] = expected_df["access_device"].apply(lambda x: json.loads(x)["os"]) - expected_df = expected_df[["result", "accessdeviceos"]] - - pd.set_option('display.max_columns', None) - pd.testing.assert_frame_equal(output_df, expected_df) diff --git a/tests/utils/nvt/test_transforms.py b/tests/utils/nvt/test_transforms.py deleted file mode 100644 index 96df15447c..0000000000 --- a/tests/utils/nvt/test_transforms.py +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (c) 2022-2024, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import warnings - -import pandas as pd -import pytest - -with warnings.catch_warnings(): - # Ignore warning regarding tensorflow not being installed - warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning) - from nvtabular.ops.operator import ColumnSelector - -from _utils.dataset_manager import DatasetManager -from morpheus.utils.nvt.transforms import json_flatten -from morpheus.utils.type_aliases import DataFrameType - - -@pytest.fixture(name="data") -def data_fixture(): - yield { - "id": [1, 2], - "info": [ - '{"name": "John", "age": 30, "city": "New York"}', '{"name": "Jane", "age": 28, "city": "San Francisco"}' - ] - } - - -@pytest.fixture(name="df") -def df_fixture(dataset: DatasetManager, data: dict): - yield dataset.df_class(data) - - -def test_json_flatten(df: DataFrameType): - col_selector = ColumnSelector(["info"]) - result = json_flatten(col_selector, df) - - expected_data = {"info.name": ["John", "Jane"], "info.age": [30, 28], "info.city": ["New York", "San Francisco"]} - expected_df = pd.DataFrame(expected_data) - - DatasetManager.assert_df_equal(result, expected_df) From 76bb5898c5a503bf3d16754502941410acf66097 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Mon, 29 Jul 2024 20:41:42 +0000 Subject: [PATCH 3/4] fix styles --- morpheus/utils/column_info.py | 1 - morpheus/utils/downloader.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/morpheus/utils/column_info.py b/morpheus/utils/column_info.py index 9ebba46f2b..eaef229666 100644 --- a/morpheus/utils/column_info.py +++ b/morpheus/utils/column_info.py @@ -788,4 +788,3 @@ def __post_init__(self): input_columns=self.input_columns, json_cols=self.json_columns, preserve_re=self.preserve_columns) - diff --git a/morpheus/utils/downloader.py b/morpheus/utils/downloader.py index 0cbd17f56a..d9d2c2bd2e 100644 --- a/morpheus/utils/downloader.py +++ b/morpheus/utils/downloader.py @@ -109,7 +109,8 @@ def get_dask_cluster(self): logger.debug("Creating dask cluster...") Downloader._dask_cluster = dask.distributed.LocalCluster(start=True, - processes=self.download_method != "dask_thread") + processes=self.download_method + != "dask_thread") # n_workers = dask_cuda.utils.get_n_gpus() # threads_per_worker = mp.cpu_count() // n_workers From 
fce9db75a47379a6c19873660cafcdaffd1e1150 Mon Sep 17 00:00:00 2001 From: Christopher Harris Date: Wed, 31 Jul 2024 16:41:29 +0000 Subject: [PATCH 4/4] fix styles --- morpheus/utils/downloader.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/morpheus/utils/downloader.py b/morpheus/utils/downloader.py index d9d2c2bd2e..a43ac6ff51 100644 --- a/morpheus/utils/downloader.py +++ b/morpheus/utils/downloader.py @@ -17,11 +17,9 @@ """ import logging -import multiprocessing as mp import os import threading import typing -import warnings from enum import Enum import fsspec @@ -104,7 +102,6 @@ def get_dask_cluster(self): if Downloader._dask_cluster is None: import dask import dask.distributed - import dask_cuda.utils logger.debug("Creating dask cluster...") @@ -112,12 +109,6 @@ def get_dask_cluster(self): processes=self.download_method != "dask_thread") - # n_workers = dask_cuda.utils.get_n_gpus() - # threads_per_worker = mp.cpu_count() // n_workers - - # Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, - # threads_per_worker=threads_per_worker) - logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link) return Downloader._dask_cluster
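
For reviewers trying the series locally, the net effect of the downloader changes is that anything which previously patched dask_cuda.LocalCUDACluster now patches dask.distributed.LocalCluster. Below is a minimal, self-contained sketch of that pattern; it is not part of the patch. The check_dask_thread_download() helper and the bucket/result placeholders are hypothetical, the Downloader calls mirror tests/test_downloader.py, and the assumption that client.gather()'s return value flows back out of Downloader.download() is taken from the updated tests.

# Sketch only: exercises the post-series Downloader with dask fully mocked,
# following the mocking pattern used by the updated tests.
from unittest import mock

from morpheus.utils.downloader import Downloader


@mock.patch('dask.config')
@mock.patch('dask.distributed.Client')
@mock.patch('dask.distributed.LocalCluster')
def check_dask_thread_download(mock_cluster: mock.MagicMock,
                               mock_client: mock.MagicMock,
                               mock_config: mock.MagicMock):
    # dask.distributed.LocalCluster replaces dask_cuda.LocalCUDACluster as the patch target.
    mock_cluster.return_value = mock_cluster
    mock_client.return_value = mock_client
    mock_client.__enter__.return_value = mock_client
    mock_client.__exit__.return_value = False

    expected = ['frame-0', 'frame-1']           # hypothetical per-bucket results
    mock_client.gather.return_value = expected  # download() gathers results via the client

    downloader = Downloader(download_method='dask_thread')
    results = downloader.download(['bucket-0', 'bucket-1'], lambda bucket: bucket)

    assert results == expected
    downloader.close()


check_dask_thread_download()

Because mock.patch decorators supply their mocks bottom-up, the helper takes the cluster mock first and can be called with no arguments; this is the same decorator ordering the tests in this series use.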