Merge pull request #6 from drobison00/devin-dfp-fixes
Update downloader
cwharris authored Jul 18, 2023
2 parents 8a23a30 + cfc985d commit 2244639
Showing 12 changed files with 154 additions and 86 deletions.
@@ -61,12 +61,6 @@ def _single_object_to_dataframe(file_object: fsspec.core.OpenFile,
logger.warning(f"Error fetching {file_object}: {e}\nRetrying...")
retries += 1

# Run the pre-processing before returning
if (s3_df is None):
return s3_df

s3_df = process_dataframe(df_in=s3_df, input_schema=schema)

return s3_df


@@ -123,7 +117,7 @@ def supports_cpp_node(self):

def accepted_types(self) -> typing.Tuple:
"""Accepted input types."""
return (typing.Any,)
return (typing.Any, )

def _get_or_create_dataframe_from_s3_batch(
self, file_object_batch: typing.Tuple[fsspec.core.OpenFiles, int]) -> typing.Tuple[pd.DataFrame, bool]:
@@ -137,7 +131,7 @@ def _get_or_create_dataframe_from_s3_batch(
fs: fsspec.AbstractFileSystem = file_list.fs

# Create a list of dictionaries that only contains the information we are interested in hashing. `ukey` just
# hashes all of the output of `info()` which is perfect
# hashes all the output of `info()` which is perfect
hash_data = [{"ukey": fs.ukey(file_object.path)} for file_object in file_list]

# Convert to base 64 encoding to remove - values
@@ -174,6 +168,7 @@ def _get_or_create_dataframe_from_s3_batch(
return None, False

output_df: pd.DataFrame = pd.concat(dfs)
output_df = process_dataframe(df_in=output_df, input_schema=self._schema)

# Finally sort by timestamp and then reset the index
output_df.sort_values(by=[self._config.ae.timestamp_column_name], inplace=True)
@@ -206,11 +201,11 @@ def convert_to_dataframe(self, s3_object_batch: typing.Tuple[fsspec.core.OpenFil

duration = (time.time() - start_time) * 1000.0

logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms",
logger.debug("S3 objects to DF complete. Rows: %s, Cache: %s, Duration: %s ms, Rate: %s rows/s",
len(output_df),
"hit" if cache_hit else "miss",
duration)

duration,
len(output_df) / (duration / 1000.0))
return output_df
except Exception:
logger.exception("Error while converting S3 buckets to DF.")
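
The hunks above move the schema transform out of `_single_object_to_dataframe` and apply it once to the concatenated batch, and the debug log gains a rows/s figure. A minimal sketch of that batch-level pattern, with a hypothetical helper name and simplified arguments (not the stage's actual code):

import logging
import time

import pandas as pd

from morpheus.utils.schema_transforms import process_dataframe

logger = logging.getLogger(__name__)


def combine_and_process(dfs: list, schema, timestamp_col: str) -> pd.DataFrame:
    """Concatenate per-file frames, then run the schema transform a single time."""
    start_time = time.time()

    output_df = pd.concat(dfs)
    output_df = process_dataframe(df_in=output_df, input_schema=schema)  # one pass for the whole batch

    # Sort by timestamp and reset the index, as the stage does
    output_df.sort_values(by=[timestamp_col], inplace=True)
    output_df.reset_index(drop=True, inplace=True)

    duration = (time.time() - start_time) * 1000.0  # milliseconds
    logger.debug("Rows: %s, Duration: %s ms, Rate: %s rows/s",
                 len(output_df),
                 duration,
                 len(output_df) / (duration / 1000.0))

    return output_df
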
@@ -20,8 +20,8 @@
from datetime import timedelta
from datetime import timezone

import pandas as pd
import numpy as np
import pandas as pd


@dataclasses.dataclass
@@ -61,6 +61,7 @@
from morpheus.utils.column_info import create_increment_col
from morpheus.utils.file_utils import date_extractor
from morpheus.utils.logger import configure_logging
from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow


@click.command()
@@ -231,6 +232,7 @@ def run_pipeline(train_users,
]

source_schema = DataFrameInputSchema(json_columns=["properties"], column_info=source_column_info)
source_schema = create_and_attach_nvt_workflow(source_schema)

# Preprocessing schema
preprocess_column_info = [
@@ -263,6 +265,7 @@
]

preprocess_schema = DataFrameInputSchema(column_info=preprocess_column_info, preserve_columns=["_batch_id"])
preprocess_schema = create_and_attach_nvt_workflow(preprocess_schema)

# Create a linear pipeline object
pipeline = LinearPipeline(config)
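
Both schemas in this example script are now passed through `create_and_attach_nvt_workflow()` before the pipeline is built. A hedged sketch of that wrapping step in isolation (the column list below is a placeholder, not the script's full schema):

from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow

# Placeholder schema: one raw JSON column plus a single typed column.
source_schema = DataFrameInputSchema(json_columns=["properties"],
                                     column_info=[ColumnInfo(name="username", dtype=str)])

# Builds an nvt.Workflow from the schema and attaches it (along with the JSON
# pre-processor) to the schema object, which is returned for reassignment.
source_schema = create_and_attach_nvt_workflow(source_schema)
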
12 changes: 12 additions & 0 deletions morpheus/utils/column_info.py
@@ -18,6 +18,7 @@
import typing
from datetime import datetime

import nvtabular as nvt
import pandas as pd

import cudf
@@ -494,6 +495,17 @@ class DataFrameInputSchema:
column_info: typing.List[ColumnInfo] = dataclasses.field(default_factory=list)
preserve_columns: typing.List[str] = dataclasses.field(default_factory=list)
row_filter: typing.Callable[[pd.DataFrame], pd.DataFrame] = None
json_output_columns: typing.List[str] = None
_nvt_workflow: nvt.Workflow = None
_json_preproc: typing.Callable[[pd.DataFrame], typing.List[str]] = None

@property
def nvt_workflow(self):
return self._nvt_workflow

@property
def json_preproc(self):
return self._json_preproc

def __post_init__(self):
"""
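
The new `DataFrameInputSchema` fields are private with read-only properties, and stay `None` until `create_and_attach_nvt_workflow()` populates them. A small illustrative check under that assumption:

from morpheus.utils.column_info import DataFrameInputSchema

schema = DataFrameInputSchema(json_columns=["properties"])

# Unset until create_and_attach_nvt_workflow() has been called on the schema.
assert schema.nvt_workflow is None
assert schema.json_preproc is None
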
33 changes: 18 additions & 15 deletions morpheus/utils/downloader.py
@@ -24,6 +24,7 @@

import fsspec
import pandas as pd
from merlin.core.utils import Distributed

logger = logging.getLogger(__name__)

@@ -64,6 +65,8 @@ class Downloader:
def __init__(self,
download_method: typing.Union[DownloadMethods, str] = DownloadMethods.DASK_THREAD,
dask_heartbeat_interval: str = "30s"):

self._merlin_distributed = None
self._dask_cluster = None
self._dask_heartbeat_interval = dask_heartbeat_interval

@@ -92,16 +95,20 @@ def get_dask_cluster(self):
-------
dask_cuda.LocalCUDACluster
"""

if self._dask_cluster is None:
import dask
import dask.distributed
import dask_cuda.utils

logger.debug("Creating dask cluster...")

# Up the heartbeat interval which can get violated with long download times
dask.config.set({"distributed.client.heartbeat": self._dask_heartbeat_interval})
n_workers = dask_cuda.utils.get_n_gpus()
threads_per_worker = mp.cpu_count() // n_workers

import dask_cuda
self._dask_cluster = dask_cuda.LocalCUDACluster()
self._dask_cluster = dask_cuda.LocalCUDACluster(n_workers=n_workers, threads_per_worker=threads_per_worker)

logger.debug("Creating dask cluster... Done. Dashboard: %s", self._dask_cluster.dashboard_link)

@@ -116,18 +123,15 @@ def get_dask_client(self):
dask.distributed.Client
"""
import dask.distributed
return dask.distributed.Client(self.get_dask_cluster())

def close(self):
"""Close the dask cluster if it exists."""
if (self._dask_cluster is not None):
logger.debug("Stopping dask cluster...")
if (self._merlin_distributed is None):
self._merlin_distributed = Distributed(client=dask.distributed.Client(self.get_dask_cluster()))

self._dask_cluster.close()
return self._merlin_distributed

self._dask_cluster = None

logger.debug("Stopping dask cluster... Done.")
def close(self):
"""Cluster management is handled by Merlin.Distributed"""
pass

def download(self,
download_buckets: fsspec.core.OpenFiles,
@@ -151,10 +155,9 @@ def download(self,
dfs = []
if (self._download_method.startswith("dask")):
# Create the client each time to ensure all connections to the cluster are closed (they can time out)
with self.get_dask_client() as client:
dfs = client.map(download_fn, download_buckets)

dfs = client.gather(dfs)
with self.get_dask_client() as dist:
dfs = dist.client.map(download_fn, download_buckets)
dfs = dist.client.gather(dfs)

elif (self._download_method in ("multiprocess", "multiprocessing")):
# Use multiprocessing here since parallel downloads are a pain
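
With this change `get_dask_client()` returns a cached `merlin.core.utils.Distributed` wrapper rather than a bare `dask.distributed.Client`, so callers reach the Dask client through its `.client` attribute, and `close()` becomes a no-op because Merlin now owns the cluster lifecycle. A hedged usage sketch (the S3 glob and loader are placeholders, and a CUDA-capable environment is assumed since the cluster is a `LocalCUDACluster`):

import fsspec
import pandas as pd

from morpheus.utils.downloader import Downloader

downloader = Downloader()  # defaults to the dask_thread download method


def load_one(file_object: fsspec.core.OpenFile) -> pd.DataFrame:
    # Placeholder loader: read a single JSON-lines object into a DataFrame.
    with file_object as fh:
        return pd.read_json(fh, lines=True)


files = fsspec.open_files("s3://example-bucket/logs/*.json")  # placeholder URI

with downloader.get_dask_client() as dist:
    futures = dist.client.map(load_one, files)  # the Distributed wrapper exposes .client
    dfs = dist.client.gather(futures)

downloader.close()  # no-op now; Merlin's Distributed manages the cluster
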
2 changes: 1 addition & 1 deletion morpheus/utils/nvt/mutate.py
@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
import functools
import os
import typing
from inspect import getsourcelines

import numpy as np
from merlin.core.dispatch import DataFrameType
from merlin.schema import ColumnSchema
from merlin.schema import Schema
57 changes: 47 additions & 10 deletions morpheus/utils/nvt/schema_converters.py
@@ -13,6 +13,7 @@
# limitations under the License.

import dataclasses
import json
import os
import re
import typing
@@ -564,8 +565,37 @@ def _coalesce_ops(graph: nx.Graph,
return coalesced_workflow


def dataframe_input_schema_to_nvt_workflow(input_schema: DataFrameInputSchema,
visualize: typing.Optional[bool] = False) -> nvt.Workflow:
def _json_flatten(df_input: typing.Union[pd.DataFrame, cudf.DataFrame], json_cols, preserve_re=None):
convert_to_cudf = False
if (isinstance(df_input, cudf.DataFrame)):
convert_to_cudf = True
df_input = df_input.to_pandas()

json_normalized = []
cols_to_keep = list(df_input.columns)
for col in json_cols:
pd_series = df_input[col]
pd_series = pd_series.apply(lambda x: x if isinstance(x, dict) else json.loads(x))

pdf_norm = pd.json_normalize(pd_series)
pdf_norm.rename(columns=lambda x, col=col: col + "." + x, inplace=True)
pdf_norm.reset_index(drop=True, inplace=True)

json_normalized.append(pdf_norm)
if (preserve_re is not None and not preserve_re.match(col)):
cols_to_keep.remove(col)

df_input.reset_index(drop=True, inplace=True)
df_normalized = pd.concat([df_input[cols_to_keep]] + json_normalized, axis=1)

if (convert_to_cudf):
df_normalized = cudf.from_pandas(df_normalized).reset_index(drop=True)

return df_normalized


def create_and_attach_nvt_workflow(input_schema: DataFrameInputSchema,
visualize: typing.Optional[bool] = False) -> DataFrameInputSchema:
"""
Converts an `input_schema` to a `nvt.Workflow` object.
@@ -600,17 +630,22 @@ def dataframe_input_schema_to_nvt_workflow(input_schema: DataFrameInputSchema,
raise ValueError("Input schema is empty")

# Try to guess which output columns we'll produce
json_output_cols = _resolve_json_output_columns(input_schema)

json_cols = input_schema.json_columns
column_info_objects = list(input_schema.column_info)
if (json_cols is not None and len(json_cols) > 0):
column_info_objects.append(
JSONFlattenInfo(input_col_names=list(json_cols),
output_col_names=json_output_cols,
dtype="str",
name="json_info"))
input_schema.json_output_columns = _resolve_json_output_columns(input_schema)
input_schema._json_preproc = partial(_json_flatten,
json_cols=json_cols,
preserve_re=input_schema.preserve_columns)

# Note(Devin): soft locking problem with nvt operators, skip for now.
# column_info_objects.append(
# JSONFlattenInfo(input_col_names=list(json_cols),
# output_col_names=json_output_cols,
# dtype="str",
# name="json_info"))

column_info_objects = list(input_schema.column_info)
column_info_map = {ci.name: ci for ci in column_info_objects}

graph = _build_nx_dependency_graph(column_info_objects)
@@ -629,4 +664,6 @@ def dataframe_input_schema_to_nvt_workflow(input_schema: DataFrameInputSchema,
if (visualize):
coalesced_workflow.graph.render(view=True, format='svg')

return nvt.Workflow(coalesced_workflow)
input_schema._nvt_workflow = nvt.Workflow(coalesced_workflow)

return input_schema
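
`_json_flatten` does the JSON explosion in plain pandas before NVT ever sees the frame (sidestepping the Dask hang noted in the code comment), prefixing each flattened field with its source column name. A standalone illustration of that flattening with made-up data; the real helper also honors `preserve_re` to keep matching source columns:

import json

import pandas as pd

df = pd.DataFrame({
    "timestamp": [1, 2],
    "properties": ['{"user": "alice", "status": 200}', '{"user": "bob", "status": 404}'],
})

# Parse each JSON string, normalize to columns, and prefix with the source column name.
records = df["properties"].apply(lambda x: x if isinstance(x, dict) else json.loads(x))
flattened = pd.json_normalize(records.to_list())
flattened.rename(columns=lambda c: "properties." + c, inplace=True)

result = pd.concat([df.drop(columns=["properties"]).reset_index(drop=True), flattened], axis=1)
print(list(result.columns))  # ['timestamp', 'properties.user', 'properties.status']
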
47 changes: 32 additions & 15 deletions morpheus/utils/schema_transforms.py
@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import os
import typing
@@ -24,7 +25,7 @@
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.nvt import register_morpheus_extensions
from morpheus.utils.nvt.patches import patch_numpy_dtype_registry
from morpheus.utils.nvt.schema_converters import dataframe_input_schema_to_nvt_workflow
from morpheus.utils.nvt.schema_converters import create_and_attach_nvt_workflow

if os.environ.get("MORPHEUS_IN_SPHINX_BUILD") is None:
# Apply patches to NVT
@@ -64,41 +65,57 @@ def process_dataframe(
"""
Applies column transformations to the input dataframe as defined by the `input_schema`.
If `input_schema` is an instance of `DataFrameInputSchema`, and it has a 'json_preproc' attribute,
the function will first flatten the JSON columns and concatenate the results with the original DataFrame.
Parameters
----------
df_in : Union[pd.DataFrame, cudf.DataFrame]
The input DataFrame to process.
input_schema : Union[nvt.Workflow, DataFrameInputSchema]
Defines the transformations to apply to 'df_in'.
If an instance of nvt.Workflow, it is directly used to transform the dataframe.
If an instance of DataFrameInputSchema, it is converted to a nvt.Workflow before being used.
If an instance of DataFrameInputSchema, it is first converted to an nvt.Workflow,
with JSON columns preprocessed if 'json_preproc' attribute is present.
Returns
-------
Union[pd.DataFrame, cudf.DataFrame]
The processed DataFrame. If 'df_in' was a pd.DataFrame, the return type is pd.DataFrame.
Otherwise, it is cudf.DataFrame.
"""
The processed DataFrame. If 'df_in' was a pd.DataFrame, the return type is also pd.DataFrame,
otherwise, it is cudf.DataFrame.
workflow = input_schema
if (isinstance(input_schema, DataFrameInputSchema)):
workflow = dataframe_input_schema_to_nvt_workflow(input_schema)
Note
----
Any transformation that needs to be performed should be defined in 'input_schema'.
If 'df_in' is a pandas DataFrame, it is temporarily converted into a cudf DataFrame for the transformation.
"""

convert_to_pd = False
if (isinstance(df_in, pd.DataFrame)):
convert_to_pd = True

# for col in df_in.columns:
# print(df_in[col].dtype)
# if df_in[col].dtype == "datetime":
# df_in[col].dt.tz_localize(None)
# If we're given an nvt_schema, we just use it.
nvt_workflow = input_schema
if (isinstance(input_schema, DataFrameInputSchema)):
if (input_schema.nvt_workflow is None):
input_schema = create_and_attach_nvt_workflow(input_schema)

# Note(Devin): pre-flatten to avoid Dask hang when calling json_normalize within an NVT operator
if (input_schema.json_preproc is not None):
df_in = input_schema.json_preproc(df_in)

input_schema.json_columns = None

nvt_workflow = input_schema.nvt_workflow

if (convert_to_pd):
df_in = cudf.DataFrame(df_in)

dataset = nvt.Dataset(df_in)

result = workflow.fit_transform(dataset).to_ddf().compute()
df_result = nvt_workflow.fit_transform(dataset).to_ddf().compute()

if (convert_to_pd):
return result.to_pandas()
return df_result.to_pandas()

return result
return df_result
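
End to end, `process_dataframe()` now attaches the workflow lazily, pre-flattens any JSON columns, and preserves the caller's frame type. A hedged usage sketch (column names are illustrative, and a GPU environment is assumed since the transform runs through cudf/NVT):

import pandas as pd

from morpheus.utils.column_info import ColumnInfo
from morpheus.utils.column_info import DataFrameInputSchema
from morpheus.utils.schema_transforms import process_dataframe

schema = DataFrameInputSchema(json_columns=["properties"],
                              column_info=[ColumnInfo(name="properties.user", dtype=str)])

df_in = pd.DataFrame({"properties": ['{"user": "alice"}', '{"user": "bob"}']})

# The first call attaches the nvt.Workflow to `schema`; JSON columns are
# flattened up front, then the workflow runs. The result comes back as pandas
# because the input was pandas.
df_out = process_dataframe(df_in=df_in, input_schema=schema)
print(list(df_out.columns))  # expected to include 'properties.user'
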
8 changes: 4 additions & 4 deletions tests/examples/digital_fingerprinting/test_dfp_file_to_df.py
@@ -142,13 +142,13 @@ def test_get_or_create_dataframe_from_s3_batch_cache_miss(mock_obf_to_df: mock.M
# We're going to feed the function a file object pointing to a different file than the one we are going to return
# from our mocked fetch function. This way we will be able to easily tell if our mocks are working. Mostly we just
# want to make sure that we aren't accidentally spinning up dask clusters or process pools in CI
returnd_df = dataset_pandas['filter_probs.csv']
returned_df = dataset_pandas['filter_probs.csv']
if dl_type.startswith('dask'):
mock_dask_client.gather.return_value = [returnd_df]
mock_dask_client.gather.return_value = [returned_df]
elif dl_type in ("multiprocess", "multiprocessing"):
mock_mp_pool.map.return_value = [returnd_df]
mock_mp_pool.map.return_value = [returned_df]
else:
mock_obf_to_df.return_value = returnd_df
mock_obf_to_df.return_value = returned_df

os.environ['MORPHEUS_FILE_DOWNLOAD_TYPE'] = dl_type
stage = DFPFileToDataFrameStage(config, DataFrameInputSchema(), cache_dir=tmp_path)