Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Remove NVTabular #1825

Merged
merged 4 commits from the source branch into the base branch on
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ci/conda/recipes/morpheus/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ outputs:
- mrc
- networkx=2.8.8
- numpydoc =1.5.*
- nvtabular =23.08.00
- pydantic
- pluggy =1.3.*
- python
Expand Down
1 change: 0 additions & 1 deletion conda/environments/all_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ dependencies:
- nodejs=18.*
- numexpr
- numpydoc=1.5
- nvtabular=23.08.00
- onnx=1.15
- openai=1.13
- papermill=2.4.0
Expand Down
1 change: 0 additions & 1 deletion conda/environments/dev_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ dependencies:
- nlohmann_json=3.11
- nodejs=18.*
- numpydoc=1.5
- nvtabular=23.08.00
- pip
- pkg-config=0.29
- pluggy=1.3
Expand Down
1 change: 0 additions & 1 deletion conda/environments/examples_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ dependencies:
- nodejs=18.*
- numexpr
- numpydoc=1.5
- nvtabular=23.08.00
- onnx=1.15
- openai=1.13
- papermill=2.4.0
Expand Down
1 change: 0 additions & 1 deletion conda/environments/runtime_cuda-121_arch-x86_64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ dependencies:
- mrc=24.10
- networkx=2.8.8
- numpydoc=1.5
- nvtabular=23.08.00
- pip
- pluggy=1.3
- pydantic
Expand Down
1 change: 0 additions & 1 deletion dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ dependencies:
- mrc=24.10
- networkx=2.8.8
- numpydoc=1.5
- nvtabular=23.08.00
- pydantic
# - python ##
- python-confluent-kafka>=1.9.2,<1.10.0a0
Expand Down
1 change: 0 additions & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@
"langchain_core",
"merlin",
"morpheus.cli.commands", # Dont document the CLI in Sphinx
"nvtabular",
"pandas",
"pydantic",
"pymilvus",
Expand Down
1 change: 0 additions & 1 deletion examples/digital_fingerprinting/production/conda_env.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ dependencies:
- librdkafka
- mlflow>=2.10.0,<3
- nodejs=18.*
- nvtabular=23.06
- papermill
- s3fs>=2023.6

Expand Down
10 changes: 0 additions & 10 deletions morpheus/utils/column_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,13 @@
import logging
import re
import typing
import warnings
from datetime import datetime
from functools import partial

import pandas as pd

import cudf

if (typing.TYPE_CHECKING):
with warnings.catch_warnings():
# Ignore warning regarding tensorflow not being installed
warnings.filterwarnings("ignore", message=".*No module named 'tensorflow'", category=UserWarning)
import nvtabular as nvt

logger = logging.getLogger(f"morpheus.{__name__}")

DEFAULT_DATE = '1970-01-01T00:00:00.000000+00:00'
Expand Down Expand Up @@ -749,7 +742,6 @@ class DataFrameInputSchema:
input_columns: typing.Dict[str, str] = dataclasses.field(init=False, repr=False)
output_columns: typing.List[tuple[str, str]] = dataclasses.field(init=False, repr=False)

nvt_workflow: "nvt.Workflow" = dataclasses.field(init=False, repr=False)
prep_dataframe: typing.Callable[[pd.DataFrame], typing.List[str]] = dataclasses.field(init=False, repr=False)

def __post_init__(self):
Expand Down Expand Up @@ -796,5 +788,3 @@ def __post_init__(self):
input_columns=self.input_columns,
json_cols=self.json_columns,
preserve_re=self.preserve_columns)

self.nvt_workflow = None
43 changes: 16 additions & 27 deletions morpheus/utils/downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@
"""

import logging
import multiprocessing as mp
import os
import threading
import typing
import warnings
from enum import Enum

import fsspec
import pandas as pd
from merlin.core.utils import Distributed

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -69,7 +66,6 @@ def __init__(self,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD,
dask_heartbeat_interval: str = "30s"):

self._merlin_distributed = None
self._dask_heartbeat_interval = dask_heartbeat_interval

download_method = os.environ.get("MORPHEUS_FILE_DOWNLOAD_TYPE", download_method)
Expand Down Expand Up @@ -99,20 +95,19 @@ def get_dask_cluster(self):

Returns
-------
dask_cuda.LocalCUDACluster
dask.distributed.LocalCluster
"""

with Downloader._mutex:
if Downloader._dask_cluster is None:
import dask_cuda.utils
import dask
import dask.distributed

logger.debug("Creating dask cluster...")

n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers

Downloader._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers,
threads_per_worker=threads_per_worker)
Downloader._dask_cluster = dask.distributed.LocalCluster(start=True,
processes=self.download_method
!= "dask_thread")

logger.debug("Creating dask cluster... Done. Dashboard: %s", Downloader._dask_cluster.dashboard_link)

Expand All @@ -127,24 +122,18 @@ def get_dask_client(self):
dask.distributed.Client
"""
import dask.distributed
return dask.distributed.Client(self.get_dask_cluster())

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})
def close(self):
"""Close the dask cluster if it exists."""
if (self._dask_cluster is not None):
logger.debug("Stopping dask cluster...")

if (self._merlin_distributed is None):
with warnings.catch_warnings():
# Merlin.Distributed will warn if a client already exists, the client in question is the one created
# and are explicitly passing to it in the constructor.
warnings.filterwarnings("ignore",
message="Existing Dask-client object detected in the current context.*",
category=UserWarning)
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))
self._dask_cluster.close()

return self._merlin_distributed
self._dask_cluster = None

def close(self):
"""Cluster management is handled by Merlin.Distributed"""
pass
logger.debug("Stopping dask cluster... Done.")

def download(self,
download_buckets: fsspec.core.OpenFiles,
Expand All @@ -169,8 +158,8 @@ def download(self,
if (self._download_method.startswith("dask")):
# Create the client each time to ensure all connections to the cluster are closed (they can time out)
with self.get_dask_client() as dist:
dfs = dist.client.map(download_fn, download_buckets)
dfs = dist.client.gather(dfs)
dfs = dist.map(download_fn, download_buckets)
dfs = dist.gather(dfs)

else:
# Simply loop
Expand Down
13 changes: 0 additions & 13 deletions morpheus/utils/nvt/__init__.py

This file was deleted.

123 changes: 0 additions & 123 deletions morpheus/utils/nvt/decorators.py

This file was deleted.

17 changes: 0 additions & 17 deletions morpheus/utils/nvt/extensions/__init__.py

This file was deleted.

27 changes: 0 additions & 27 deletions morpheus/utils/nvt/extensions/morpheus_ext.py

This file was deleted.

Loading
Loading