Final state before meeting #1. PyTorch model is working well.
mdemoret-nv committed Apr 8, 2021
1 parent a59d635 commit e29962c
Showing 7 changed files with 1,193 additions and 32 deletions.
46 changes: 36 additions & 10 deletions .vscode/launch.json
@@ -4,7 +4,37 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [

{
"name": "Python: Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}"
},
{
"name": "Python: Run Network Generator",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/.tmp/kafka-producer/main.py",
"console": "integratedTerminal",
"cwd": "${workspaceFolder}/.tmp/kafka-producer",
"args": [
"-m",
"produce",
"-n",
"./network",
"-i",
"/home/mdemoret/Repos/rapids/cyber-dev/.tmp/dataset2/pcap_dump_2_pre_class-model2.json",
"-j",
"20",
"-t",
"test_pcap"
],
"env": {
"KAFKA_BROKER_SERVERS": "172.17.0.1:49161,172.17.0.1:49160,172.17.0.1:49159",
}
},
{
"name": "Python: Run Pipeline",
"type": "python",
@@ -57,11 +87,10 @@
"env": {
"CLX_INFERENCE_PIPELINE": "pytorch",
"CLX_VOCAB_HASH_FILE": "bert-base-cased-hash.txt",
"CLX_MODEL_SEQ_LENGTH": "512",
"CLX_MODEL_SEQ_LENGTH": "256",
"CLX_MODEL_MAX_BATCH_SIZE": "64",
},
"args": [
]
"args": []
},
{
"name": "Python: Run Triton (TRT)",
@@ -78,8 +107,7 @@
"CLX_MODEL_MAX_BATCH_SIZE": "8",
// "PYTHONASYNCIODEBUG": "1",
},
"args": [
]
"args": []
},
{
"name": "Python: Run Triton (ONNX)",
@@ -96,8 +124,7 @@
"CLX_MODEL_MAX_BATCH_SIZE": "8",
// "PYTHONASYNCIODEBUG": "1",
},
"args": [
]
"args": []
},
{
"name": "Python: Run TensorRT",
@@ -113,8 +140,7 @@
"CLX_MODEL_SEQ_LENGTH": "128",
"CLX_MODEL_MAX_BATCH_SIZE": "8",
},
"args": [
]
"args": []
},
{
"name": "Python: Run Preprocessing",
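Note on the launch.json change: the PyTorch configuration now passes a 256-token sequence length to match the new model, and the CLX_* environment variables are how each configuration selects the backend and model parameters. A minimal sketch of reading them on the pipeline side, assuming plain os.environ lookups with the defaults shown above (the actual config loader is not part of this diff):

import os

# Hedged sketch: read the same CLX_* variables the launch configurations set.
# Variable names and defaults mirror the values above; the real loader is not shown here.
inference_pipeline = os.environ.get("CLX_INFERENCE_PIPELINE", "pytorch")
vocab_hash_file = os.environ.get("CLX_VOCAB_HASH_FILE", "bert-base-cased-hash.txt")
seq_length = int(os.environ.get("CLX_MODEL_SEQ_LENGTH", "256"))
max_batch_size = int(os.environ.get("CLX_MODEL_MAX_BATCH_SIZE", "64"))

print(inference_pipeline, vocab_hash_file, seq_length, max_batch_size)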
2 changes: 1 addition & 1 deletion cudf_subword_helper.py
@@ -57,7 +57,7 @@ def tokenize_text_series(text_ser, seq_len, stride, vocab_hash_file):

tokens, attention_masks, metadata = text_ser.str.subword_tokenize(
vocab_hash_file,
do_lower=True,
do_lower=False,
max_rows_tensor=max_rows_tensor,
stride=stride,
max_length=max_length,
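Setting do_lower=False keeps the input casing, which is what the cased vocabulary (bert-base-cased-hash.txt) in the launch config expects; lower-casing text against a cased hash table would produce mismatched token ids. A minimal usage sketch of the helper with the new behavior — the stride value, sample text, and return-value handling are illustrative assumptions, not values from the repo:

import cudf

from cudf_subword_helper import tokenize_text_series

# Illustrative input; the real data comes from the pcap JSON stream.
text_ser = cudf.Series(['{"data": "example packet payload"}'])

# seq_len should match CLX_MODEL_SEQ_LENGTH (256), and the vocab must be the cased
# hash file now that the tokenizer no longer lower-cases its input. stride=192 is an assumed value.
features = tokenize_text_series(text_ser,
                                seq_len=256,
                                stride=192,
                                vocab_hash_file="bert-base-cased-hash.txt")
# The exact structure of the returned features is defined in cudf_subword_helper.py.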
75 changes: 55 additions & 20 deletions grpc_pipeline.py
@@ -150,21 +150,26 @@ def queue_batch(messages: MultiRequest):

inf_queue.put(messages)

def build_source():
def build_source() -> typing.Tuple[streamz.Source, int]:

source: Stream = None
max_itr: int = None # Maximum number of iterations for progress reporting. None = Unknown/Unlimited

use_kafka = True
use_kafka = False

if (not use_kafka):

def json_to_cudf(in_str: str):
df = cudf.io.read_json(io.StringIO("".join(in_str)), engine="cudf", lines=True)
return df

source: Stream = Stream.from_textfile(
".tmp/pcap_dump_nonull.json", asynchronous=True, loop=IOLoop.current()).rate_limit(
1 / 10000).timed_window(0.1).filter(lambda x: len(x) > 0).map(json_to_cudf).buffer(1000)
input_file = ".tmp/kafka-producer/pcap_out.json"

# Read the number of lines for progress reporting
max_itr = sum(1 for i in open(input_file, 'rb'))

source: Stream = Stream.from_textfile(input_file, asynchronous=True, loop=IOLoop.current()).rate_limit(
1 / 10000).timed_window(0.1).filter(lambda x: len(x) > 0).map(json_to_cudf).buffer(1000)
else:

# Kafka consumer configuration
@@ -182,22 +187,11 @@ def json_to_cudf(in_str: str):
loop=IOLoop.current(),
max_batch_size=config.model.max_batch_size)

return source
return source, max_itr

source: Source = build_source()
source, max_iterations = build_source()

def setup():
# # Preprocess each batch
# stream_df = source.map(filter_empty_data) \
# .filter(lambda x: len(x) > 0) \
# .map(process_batch) \
# .partition_batch(config.model.max_batch_size, timeout=1)

# # source.sink(lambda x: queue_progress.update())

# # Queue the inference work
# stream_df.sink(queue_batch)

# turn-on the worker thread
if (config.general.pipeline == "triton"):
from inference_triton import inference_worker
@@ -237,7 +231,12 @@ def to_cpu_json(message: cudf.DataFrame):
await asyncio.wait_for(channel.channel_ready(), timeout=10.0)
print("Connected to Preprocessing Server!")

progress = tqdm(desc="Running Inference for PII", smoothing=0.01, dynamic_ncols=True, unit="inf", mininterval=1.0)
progress = tqdm(desc="Running Inference for PII",
smoothing=0.01,
dynamic_ncols=True,
unit="inf",
mininterval=1.0,
total=max_iterations)

stub = request_pb2_grpc.PipelineStub(channel)

@@ -309,7 +308,43 @@ def post_process(x: SingleResponse):

out_stream = res_stream

out_stream = out_stream.filter(lambda x: x.probs.any().item())
# Set pre_class_file to a path (non-None) to write the input data back out with classifications appended.
pre_class_file = None
# pre_class_file = ".tmp/dataset2/pcap_dump_2_pre_class-model1-casing.json"

if (pre_class_file is not None and os.path.exists(pre_class_file)):
os.remove(pre_class_file)

def output_preclass(x: SingleResponse):
message = json.loads(x.input_str)

idx2label = {
0: 'address',
1: 'bank_acct',
2: 'credit_card',
3: 'email',
4: 'govt_id',
5: 'name',
6: 'password',
7: 'phone_num',
8: 'secret_keys',
9: 'user'
}

probs_np = (x.probs > 0.5).astype(cp.bool).get()

for i, label in idx2label.items():

message["pii_" + label] = probs_np[0, i].item()

with open(pre_class_file, "a") as f:
json.dump(message, f)
f.write("\n")

if (pre_class_file is not None):
out_stream.sink(output_preclass)

out_stream = out_stream.filter(lambda x: (x.probs > 0.5).any().item())
out_stream = out_stream.map(post_process)

producer_conf = {'bootstrap.servers': config.kafka.bootstrap_servers}
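Since output_preclass appends one JSON object per line (json.dump followed by a newline), the pre-class file is in the same JSON-lines format the source reads, so it can be loaded back for inspection. A small sketch, assuming the commented-out example path above and a hypothetical column check:

import cudf

# Example path taken from the commented-out value above.
pre_class_file = ".tmp/dataset2/pcap_dump_2_pre_class-model1-casing.json"

df = cudf.io.read_json(pre_class_file, engine="cudf", lines=True)

# Each record now carries one boolean flag per label (pii_address, pii_email, ...).
pii_cols = [c for c in df.columns if c.startswith("pii_")]
print(df[pii_cols].sum())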
5 changes: 4 additions & 1 deletion inference_pytorch.py
@@ -11,7 +11,7 @@
def inference_worker(loop: asyncio.BaseEventLoop, inf_queue: queue.Queue):

# Get model from https://drive.google.com/u/1/uc?id=1Lbj1IyHEBV9LS2Jo1z4cmlBNxtZUkYKa&export=download
model = torch.load(".tmp/ph_label_model.bin").to('cuda')
model = torch.load(".tmp/models_pytorch/model_10labels_256seq.bin").to('cuda')

while True:

@@ -37,6 +37,9 @@ def inference_worker(loop: asyncio.BaseEventLoop, inf_queue: queue.Queue):

probs_cp = cp.fromDlpack(to_dlpack(probs))

# TEMP: Set email to 0 since this gets detected a lot
# probs_cp[:, 3] = 0.0

# Ensure that we are of the shape `[Batch Size, Num Labels]`
if (len(probs_cp.shape) == 1):
probs_cp = cp.expand_dims(probs_cp, axis=1)
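The worker moves the model's probabilities from PyTorch to CuPy via DLPack, so no device-to-host copy is needed, and then guarantees a 2-D [batch size, num labels] shape before thresholding. A standalone sketch of that pattern, with random numbers standing in for real model output (requires a CUDA device):

import cupy as cp
import torch
from torch.utils.dlpack import to_dlpack

# Stand-in for the sigmoid output of the 10-label model: shape [batch size, num labels].
probs = torch.sigmoid(torch.randn(8, 10, device="cuda"))

# Zero-copy handoff: the CuPy array shares the same GPU buffer as the torch tensor.
probs_cp = cp.fromDlpack(to_dlpack(probs))

# Ensure a [batch size, num labels] shape even when the model emits a 1-D vector.
if len(probs_cp.shape) == 1:
    probs_cp = cp.expand_dims(probs_cp, axis=1)

preds = probs_cp > 0.5  # boolean prediction per label, as in the pipeline's filter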
10 changes: 10 additions & 0 deletions launch_kafka.sh
@@ -53,3 +53,13 @@ docker run --rm -it -e KAFKA_BROKER_SERVERS=$(kafka-docker/broker-list.sh) --mou
$KAFKA_HOME/bin/kafka-topics.sh --alter --topic=test_pcap --zookeeper 172.17.0.1:2181 --config retention.ms=1
$KAFKA_HOME/bin/kafka-topics.sh --alter --topic=test_pcap --zookeeper 172.17.0.1:2181 --config retention.ms=86400000

# Start Jupyter to run the graph preprocessing
docker run --gpus=all --rm -ti -p 8889:8888 -p 8787:8787 -p 8786:8786 -v $PWD:/rapids/notebooks/host rapidsai/rapidsai-nightly:cuda10.2-runtime-ubuntu18.04-py3.8 /bin/bash

# Run the viz generation pipeline, then run the Jupyter notebook noteboks/network_graph_viz_frames_clean.ipynb. Afterwards run:
sudo chown -R mdemoret:mdemoret noteboks/output/ && rm /home/mdemoret/Repos/rapids/rapids-js-dev/modules/demo/graph/data/network_graph_viz_frames_multi_label/* && cp -r noteboks/output/* /home/mdemoret/Repos/rapids/rapids-js-dev/modules/demo/graph/data/network_graph_viz_frames_multi_label/

# Then run the viz (from the rapids-js container) with:
yarn demo modules/demo/graph --nodes=$(echo data/network_graph_viz_frames_multi_label/{0..199}.0.nodes.csv | sed 's/ /,/g')\
--edges=$(echo data/network_graph_viz_frames_multi_label/{0..199}.0.edges.csv | sed 's/ /,/g')\
--params='"autoCenter":1,"strongGravityMode":0,"jitterTolerance":0.01,"scalingRatio":1,"gravity":5,"controlsVisible":0,"outboundAttraction":1,"linLogMode":1' --delay=100 --width=1920 --height=1080