Merge branch 'verify-pipeline-input-data' into 'branch-0.2.1-EA'
Add drop `nulls` stage

Closes nv-morpheus#17

See merge request morpheus/morpheus!44
mdemoret-nv committed Nov 10, 2021
2 parents 561e96d + 9507525 commit 8aec53e
Showing 5 changed files with 108 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .vscode/launch.json
@@ -111,6 +111,7 @@
// "--viz_file=.tmp/nlp-viz.png",
"from-file",
"--filename=data/pcap_dump.jsonlines",
// "dropna",
// "from-kafka",
// "monitor",
// "--description",
@@ -195,6 +196,7 @@
// "monitor",
// "--description",
// "Input Message Rate",
// "dropna",
// "buffer",
"deserialize",
// "buffer",
@@ -356,6 +358,7 @@
// "monitor",
// "--description",
// "Input Message Rate",
// "dropna",
"buffer",
"deserialize",
// "buffer",
2 changes: 2 additions & 0 deletions README.md
@@ -293,6 +293,7 @@ Commands:
buffer Buffer results
delay Delay results
deserialize Deserialize source data from JSON
dropna Drop null data entries
filter Filter message by a classification threshold
from-file Load messages from a file
from-kafka Load messages from a Kafka cluster
@@ -317,6 +318,7 @@ Commands:
buffer Buffer results
delay Delay results
deserialize Deserialize source data from JSON
dropna Drop null data entries
filter Filter message by a classification threshold
from-file Load messages from a file
from-kafka Load messages from a Kafka cluster
25 changes: 25 additions & 0 deletions morpheus/cli.py
@@ -415,6 +415,11 @@ def post_pipeline(ctx: click.Context, stages, **kwargs):
default=1,
type=click.IntRange(min=1),
help=("Repeats the input dataset multiple times. Useful to extend small datasets for debugging."))
@click.option('--filter_null',
default=True,
type=bool,
help=("Whether or not to filter rows with null 'data' column. Null values in the 'data' column can "
"cause issues down the line with processing. Setting this to True is recommended."))
@prepare_command(False)
def from_file(ctx: click.Context, **kwargs):

@@ -510,6 +515,24 @@ def buffer(ctx: click.Context, **kwargs):
return stage


@click.command(short_help="Drop null data entries from a DataFrame", **command_kwargs)
@click.option('--column', type=str, default="data", help="Which column to use when searching for null values.")
@prepare_command(False)
def dropna(ctx: click.Context, **kwargs):

    from morpheus.pipeline import LinearPipeline

    p: LinearPipeline = ctx.ensure_object(LinearPipeline)

    from morpheus.pipeline.preprocessing import DropNullStage

    stage = DropNullStage(Config.get(), **kwargs)

    p.add_stage(stage)

    return stage


@click.command(short_help="Delay results", **command_kwargs)
@click.option('--duration', type=str, help="Time to delay messages in the pipeline. Follows the pandas interval format")
@prepare_command(False)
@@ -884,6 +907,7 @@ def gen_viz(ctx: click.Context, **kwargs):
pipeline_nlp.add_command(buffer)
pipeline_nlp.add_command(delay)
pipeline_nlp.add_command(deserialize)
pipeline_nlp.add_command(dropna)
pipeline_nlp.add_command(filter)
pipeline_nlp.add_command(from_file)
pipeline_nlp.add_command(from_kafka)
@@ -903,6 +927,7 @@ def gen_viz(ctx: click.Context, **kwargs):
pipeline_fil.add_command(buffer)
pipeline_fil.add_command(delay)
pipeline_fil.add_command(deserialize)
pipeline_fil.add_command(dropna)
pipeline_fil.add_command(filter)
pipeline_fil.add_command(from_file)
pipeline_fil.add_command(from_kafka)
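For reference, the `dropna` command above only does two things: it resolves the shared `LinearPipeline` from the click context and appends a `DropNullStage` built from the global config, with `--column` passed through as the stage's `column` keyword. Below is a minimal sketch of the same wiring outside the CLI, using only the calls visible in this diff; the `add_dropna` helper and its `config` parameter are hypothetical.

```python
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.preprocessing import DropNullStage


def add_dropna(pipeline: LinearPipeline, config, column: str = "data") -> DropNullStage:
    """Hypothetical helper mirroring the body of the `dropna` CLI command."""
    # Build the stage from the shared pipeline config; '--column' on the CLI
    # becomes the 'column' keyword argument here.
    stage = DropNullStage(config, column=column)

    # Append it to the linear pipeline, exactly as p.add_stage(stage) does above.
    pipeline.add_stage(stage)

    return stage
```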
24 changes: 20 additions & 4 deletions morpheus/pipeline/input/from_file.py
@@ -101,13 +101,16 @@ async def _run(self):
self.stopped = True


def df_onread_cleanup(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
def filter_null_data(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
return x[~x['data'].isna()]


def cudf_json_onread_cleanup(x: typing.Union[cudf.DataFrame, pd.DataFrame]):
"""
Fixes parsing issues when reading from a file. When loading a JSON file, cuDF converts ``\\n`` to
``\\\\n``; this restores the original newlines.
"""

if ("data" in x):
if ("data" in x and not x.empty):
x["data"] = x["data"].str.replace('\\n', '\n', regex=False)

return x
@@ -132,6 +135,9 @@ class FileSourceStage(SingleOutputSource):
Supported extensions: 'json', 'csv'
repeat: int, default = 1
Repeats the input dataset multiple times. Useful to extend small datasets for debugging.
filter_null: bool, default = True
Whether or not to filter rows with a null 'data' column. Null values in the 'data' column can cause issues down
the line with processing. Setting this to True is recommended.
cudf_kwargs: dict, default=None
keyword args passed to underlying cuDF I/O function. See the cuDF documentation for `cudf.read_csv()` and
`cudf.read_json()` for the available options. With `file_type` == 'json', this defaults to ``{ "lines": True }``
@@ -143,6 +149,7 @@ def __init__(self,
iterative: bool = None,
file_type: FileSourceTypes = FileSourceTypes.Auto,
repeat: int = 1,
filter_null: bool = True,
cudf_kwargs: dict = None):
super().__init__(c)

@@ -152,6 +159,7 @@ def __init__(self,
self._filename = filename
self._iterative = iterative if iterative is not None else not c.use_dask
self._file_type = file_type
self._filter_null = filter_null
self._cudf_kwargs = {} if cudf_kwargs is None else cudf_kwargs

self._input_count = None
@@ -198,10 +206,18 @@ def _read_file(self) -> cudf.DataFrame:

if (mode == FileSourceTypes.Json):
df = cudf.read_json(self._filename, **cudf_args)
df = df_onread_cleanup(df)

if (self._filter_null):
df = filter_null_data(df)

df = cudf_json_onread_cleanup(df)
return df
elif (mode == FileSourceTypes.Csv):
df = cudf.read_csv(self._filename, **cudf_args)

if (self._filter_null):
df = filter_null_data(df)

return df
else:
assert False, "Unsupported file type mode: {}".format(mode)
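The new `filter_null_data` helper is just a boolean mask over the 'data' column; with `filter_null=True` (the default) it runs right after `cudf.read_json`/`cudf.read_csv` and before the JSON newline cleanup. A minimal sketch of its effect, assuming pandas semantics (the type hint accepts either `cudf.DataFrame` or `pd.DataFrame`) and a module path inferred from the file path above; the sample rows are made up for illustration.

```python
import pandas as pd

# Module path inferred from morpheus/pipeline/input/from_file.py in this diff.
from morpheus.pipeline.input.from_file import filter_null_data

# Toy frame with one null 'data' entry.
df = pd.DataFrame({"data": ['{"ts": 1}', None, '{"ts": 2}']})

clean = filter_null_data(df)  # equivalent to df[~df['data'].isna()]

print(len(df), len(clean))  # 3 2 -- the row with a null 'data' value is dropped
```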
60 changes: 58 additions & 2 deletions morpheus/pipeline/preprocessing.py
@@ -21,7 +21,6 @@

import cupy as cp
import numpy as np
import streamz
import typing_utils

import cudf
@@ -36,9 +35,9 @@
from morpheus.pipeline.messages import MultiInferenceNLPMessage
from morpheus.pipeline.messages import MultiMessage
from morpheus.pipeline.pipeline import MultiMessageStage
from morpheus.pipeline.pipeline import SinglePortStage
from morpheus.pipeline.pipeline import StreamFuture
from morpheus.pipeline.pipeline import StreamPair
from morpheus.utils.cudf_subword_helper import create_tokenizer
from morpheus.utils.cudf_subword_helper import tokenize_text_series


@@ -127,6 +126,63 @@ def _build_single(self, input_stream: StreamPair) -> StreamPair:
return stream, out_type


class DropNullStage(SinglePortStage):
    """
    Drop null/empty data input entries.

    Parameters
    ----------
    c : morpheus.config.Config
        Pipeline configuration instance
    column : str
        Column name to perform null check.
    """
    def __init__(self, c: Config, column: str):
        super().__init__(c)

        self._use_dask = c.use_dask
        self._column = column

        # Mark these stages to log timestamps if requested
        self._should_log_timestamps = True

    @property
    def name(self) -> str:
        return "dropna"

    def accepted_types(self) -> typing.Tuple:
        """
        Accepted input types for this stage are returned.

        Returns
        -------
        typing.Tuple
            Accepted input types
        """
        return (cudf.DataFrame, StreamFuture[cudf.DataFrame])

    @staticmethod
    def filter(x: cudf.DataFrame, column: str):
        x = x[~x[column].isna()]

        return x

    def _build_single(self, input_stream: StreamPair) -> StreamPair:
        stream = input_stream[0]

        if (typing_utils.issubtype(input_stream[1], StreamFuture)):
            stream = stream.map(DropNullStage.filter, column=self._column)
        else:
            stream = stream.async_map(DropNullStage.filter, executor=self._pipeline.thread_pool, column=self._column)

        # Filter out empty message groups
        stream = stream.filter(lambda x: not x.empty)

        return stream, input_stream[1]


class PreprocessBaseStage(MultiMessageStage):
"""
This is a base pre-processing class holding general functionality for all preprocessing stages.
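Because `DropNullStage.filter` is a static method, the drop logic can be exercised outside a running pipeline; inside `_build_single` it is applied through `map`/`async_map` and followed by a `filter` that discards DataFrames left empty after the drop. A small sketch of the static helper on a toy cuDF frame, illustrative only and assuming the import works as shown in the diff:

```python
import cudf

from morpheus.pipeline.preprocessing import DropNullStage

# Toy frame: the second row has a null 'data' value.
df = cudf.DataFrame({"id": [1, 2, 3], "data": ["a", None, "b"]})

filtered = DropNullStage.filter(df, column="data")  # masks rows where 'data' is null

assert len(filtered) == 2
assert not filtered.empty  # non-empty frames pass the stage's final empty-check
```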
