diff --git a/.vscode/launch.json b/.vscode/launch.json
index 3f61a33f8f..ebac50d5f7 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -111,6 +111,7 @@
                 // "--viz_file=.tmp/nlp-viz.png",
                 "from-file",
                 "--filename=data/pcap_dump.jsonlines",
+                // "dropna",
                 // "from-kafka",
                 // "monitor",
                 // "--description",
@@ -195,6 +196,7 @@
                 // "monitor",
                 // "--description",
                 // "Input Message Rate",
+                // "dropna",
                 // "buffer",
                 "deserialize",
                 // "buffer",
@@ -356,6 +358,7 @@
                 // "monitor",
                 // "--description",
                 // "Input Message Rate",
+                // "dropna",
                 "buffer",
                 "deserialize",
                 // "buffer",
diff --git a/README.md b/README.md
index 76a9782c45..1556fe098d 100644
--- a/README.md
+++ b/README.md
@@ -293,6 +293,7 @@ Commands:
   buffer       Buffer results
   delay        Delay results
   deserialize  Deserialize source data from JSON
+  dropna       Drop null data entries
   filter       Filter message by a classification threshold
   from-file    Load messages from a file
   from-kafka   Load messages from a Kafka cluster
@@ -317,6 +318,7 @@ Commands:
   buffer       Buffer results
   delay        Delay results
   deserialize  Deserialize source data from JSON
+  dropna       Drop null data entries
   filter       Filter message by a classification threshold
   from-file    Load messages from a file
   from-kafka   Load messages from a Kafka cluster
diff --git a/morpheus/cli.py b/morpheus/cli.py
index b621656026..67a0c9fa75 100644
--- a/morpheus/cli.py
+++ b/morpheus/cli.py
@@ -415,6 +415,11 @@ def post_pipeline(ctx: click.Context, stages, **kwargs):
               default=1,
               type=click.IntRange(min=1),
               help=("Repeats the input dataset multiple times. Useful to extend small datasets for debugging."))
+@click.option('--filter_null',
+              default=True,
+              type=bool,
+              help=("Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can "
+                    "cause issues downstream during processing. Setting this to True is recommended."))
 @prepare_command(False)
 def from_file(ctx: click.Context, **kwargs):
 
@@ -510,6 +515,24 @@ def buffer(ctx: click.Context, **kwargs):
     return stage
 
 
+@click.command(short_help="Drop null data entries from a DataFrame", **command_kwargs)
+@click.option('--column', type=str, default="data", help="Which column to use when searching for null values.")
+@prepare_command(False)
+def dropna(ctx: click.Context, **kwargs):
+
+    from morpheus.pipeline import LinearPipeline
+
+    p: LinearPipeline = ctx.ensure_object(LinearPipeline)
+
+    from morpheus.pipeline.preprocessing import DropNullStage
+
+    stage = DropNullStage(Config.get(), **kwargs)
+
+    p.add_stage(stage)
+
+    return stage
+
+
 @click.command(short_help="Delay results", **command_kwargs)
 @click.option('--duration', type=str, help="Time to delay messages in the pipeline. Follows the pandas interval format")
 @prepare_command(False)
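For orientation, here is a minimal sketch of what the new `dropna` command wires together when it runs, written as straight Python instead of a click callback. The `morpheus.config.Config` import path and the `LinearPipeline(config)` constructor call are assumptions for illustration; only `Config.get()`, `DropNullStage(..., column=...)`, and `p.add_stage(...)` appear in this diff.

```python
from morpheus.config import Config  # assumed import path; cli.py calls Config.get()
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.preprocessing import DropNullStage

# Equivalent of `dropna --column=data` on the CLI: build the stage from the
# shared config and append it to the linear pipeline, as the command body does.
config = Config.get()
p = LinearPipeline(config)  # assumption: the pipeline is constructed from the config
p.add_stage(DropNullStage(config, column="data"))
```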
@@ -884,6 +907,7 @@ def gen_viz(ctx: click.Context, **kwargs):
 pipeline_nlp.add_command(buffer)
 pipeline_nlp.add_command(delay)
 pipeline_nlp.add_command(deserialize)
+pipeline_nlp.add_command(dropna)
 pipeline_nlp.add_command(filter)
 pipeline_nlp.add_command(from_file)
 pipeline_nlp.add_command(from_kafka)
@@ -903,6 +927,7 @@ def gen_viz(ctx: click.Context, **kwargs):
 pipeline_fil.add_command(buffer)
 pipeline_fil.add_command(delay)
 pipeline_fil.add_command(deserialize)
+pipeline_fil.add_command(dropna)
 pipeline_fil.add_command(filter)
 pipeline_fil.add_command(from_file)
 pipeline_fil.add_command(from_kafka)
diff --git a/morpheus/pipeline/input/from_file.py b/morpheus/pipeline/input/from_file.py
index 34612a956e..500ec84576 100644
--- a/morpheus/pipeline/input/from_file.py
+++ b/morpheus/pipeline/input/from_file.py
@@ -101,13 +101,16 @@ async def _run(self):
         self.stopped = True
 
 
-def df_onread_cleanup(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
+def filter_null_data(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
+    return x[~x['data'].isna()]
+
+
+def cudf_json_onread_cleanup(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
     """
     Fixes parsing issues when reading from a file. When loading a JSON file, cuDF converts ``\\n`` to ``\\\\n``
     for some reason
     """
-
-    if ("data" in x):
+    if ("data" in x and not x.empty):
         x["data"] = x["data"].str.replace('\\n', '\n', regex=False)
 
     return x
@@ -132,6 +135,9 @@ class FileSourceStage(SingleOutputSource):
         Supported extensions: 'json', 'csv'
     repeat: int, default = 1
         Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
+    filter_null: bool, default = True
+        Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues
+        downstream during processing. Setting this to True is recommended.
     cudf_kwargs: dict, default=None
         keyword args passed to underlying cuDF I/O function. See the cuDF documentation for `cudf.read_csv()` and
         `cudf.read_json()` for the available options. With `file_type` == 'json', this defaults to ``{ "lines": True }``
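A toy illustration of the two helpers that replace `df_onread_cleanup`: `filter_null_data` masks out rows whose (hardcoded) 'data' column is null, and `cudf_json_onread_cleanup` then repairs the literal `\n` sequences cuDF produces when reading JSON. The sample strings are invented and a working cuDF install is assumed.

```python
import cudf

from morpheus.pipeline.input.from_file import cudf_json_onread_cleanup
from morpheus.pipeline.input.from_file import filter_null_data

df = cudf.DataFrame({"data": ["GET /index\\n", None, "POST /login\\n"]})

df = filter_null_data(df)          # drops the middle row: its 'data' value is null
df = cudf_json_onread_cleanup(df)  # turns the two-char '\n' literals into real newlines

print(len(df))  # 2
```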
@@ -143,6 +149,7 @@ def __init__(self,
                  iterative: bool = None,
                  file_type: FileSourceTypes = FileSourceTypes.Auto,
                  repeat: int = 1,
+                 filter_null: bool = True,
                  cudf_kwargs: dict = None):
         super().__init__(c)
 
@@ -152,6 +159,7 @@ def __init__(self,
         self._filename = filename
         self._iterative = iterative if iterative is not None else not c.use_dask
         self._file_type = file_type
+        self._filter_null = filter_null
         self._cudf_kwargs = {} if cudf_kwargs is None else cudf_kwargs
 
         self._input_count = None
@@ -198,10 +206,18 @@ def _read_file(self) -> cudf.DataFrame:
 
         if (mode == FileSourceTypes.Json):
             df = cudf.read_json(self._filename, **cudf_args)
-            df = df_onread_cleanup(df)
+
+            if (self._filter_null):
+                df = filter_null_data(df)
+
+            df = cudf_json_onread_cleanup(df)
             return df
         elif (mode == FileSourceTypes.Csv):
             df = cudf.read_csv(self._filename, **cudf_args)
+
+            if (self._filter_null):
+                df = filter_null_data(df)
+
             return df
         else:
             assert False, "Unsupported file type mode: {}".format(mode)
diff --git a/morpheus/pipeline/preprocessing.py b/morpheus/pipeline/preprocessing.py
index 08fd12faae..e44824d6fd 100644
--- a/morpheus/pipeline/preprocessing.py
+++ b/morpheus/pipeline/preprocessing.py
@@ -21,7 +21,6 @@
 import cupy as cp
 import numpy as np
-import streamz
 import typing_utils
 
 import cudf
 
@@ -36,9 +35,9 @@
 from morpheus.pipeline.messages import MultiInferenceNLPMessage
 from morpheus.pipeline.messages import MultiMessage
 from morpheus.pipeline.pipeline import MultiMessageStage
+from morpheus.pipeline.pipeline import SinglePortStage
 from morpheus.pipeline.pipeline import StreamFuture
 from morpheus.pipeline.pipeline import StreamPair
-from morpheus.utils.cudf_subword_helper import create_tokenizer
 from morpheus.utils.cudf_subword_helper import tokenize_text_series
 
 
@@ -127,6 +126,63 @@ def _build_single(self, input_stream: StreamPair) -> StreamPair:
         return stream, out_type
 
 
+class DropNullStage(SinglePortStage):
+    """
+    Drop null/empty data input entries.
+
+    Parameters
+    ----------
+    c : morpheus.config.Config
+        Pipeline configuration instance
+    column : str
+        Column name on which to perform the null check.
+
+    """
+    def __init__(self, c: Config, column: str):
+        super().__init__(c)
+
+        self._use_dask = c.use_dask
+        self._column = column
+
+        # Mark these stages to log timestamps if requested
+        self._should_log_timestamps = True
+
+    @property
+    def name(self) -> str:
+        return "dropna"
+
+    def accepted_types(self) -> typing.Tuple:
+        """
+        Accepted input types for this stage are returned.
+
+        Returns
+        -------
+        typing.Tuple
+            Accepted input types
+
+        """
+        return (cudf.DataFrame, StreamFuture[cudf.DataFrame])
+
+    @staticmethod
+    def filter(x: cudf.DataFrame, column: str):
+        x = x[~x[column].isna()]
+
+        return x
+
+    def _build_single(self, input_stream: StreamPair) -> StreamPair:
+        stream = input_stream[0]
+
+        if (typing_utils.issubtype(input_stream[1], StreamFuture)):
+            stream = stream.map(DropNullStage.filter, column=self._column)
+        else:
+            stream = stream.async_map(DropNullStage.filter, executor=self._pipeline.thread_pool, column=self._column)
+
+        # Filter out empty message groups
+        stream = stream.filter(lambda x: not x.empty)
+
+        return stream, input_stream[1]
+
+
 class PreprocessBaseStage(MultiMessageStage):
     """
     This is a base pre-processing class holding general functionality for all preprocessing stages.
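`_build_single` above maps the null filter over each batch and then drops any batch the filter emptied. Below is a standalone sketch of that map-then-filter pattern using plain `streamz` with pandas stand-ins; in the real pipeline the stream carries cuDF DataFrames (or Dask futures of them on the `StreamFuture` branch), and `async_map` runs on the pipeline's thread pool.

```python
import pandas as pd
from streamz import Stream

source = Stream()
cleaned = (
    source
    .map(lambda df: df[~df["data"].isna()])  # same boolean mask as DropNullStage.filter
    .filter(lambda df: not df.empty)         # discard batches left empty by the mask
)
cleaned.sink(lambda df: print(len(df)))

source.emit(pd.DataFrame({"data": ["a", None, "b"]}))  # prints 2
source.emit(pd.DataFrame({"data": [None]}))            # fully-null batch: nothing printed
```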