Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] Add model and experiment template 'click' options to dfp example pipelines, and make model names Databricks compatible. #1245

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def __init__(self,
source: str,
tracking_uri: str,
silence_monitors: bool,
mlflow_experiment_name_formatter: str,
mlflow_model_name_formatter: str,
train_users: str = None):

self._skip_users = list(skip_user)
Expand All @@ -65,8 +67,8 @@ def __init__(self,
self._time_fields: TimeFields = None
self._silence_monitors = silence_monitors

self._model_name_formatter = f"DFP-{source}-" + "{user_id}"
self._experiment_name_formatter = f"dfp/{source}/training/" + "{reg_model_name}"
self._model_name_formatter = mlflow_model_name_formatter
self._experiment_name_formatter = mlflow_experiment_name_formatter

@staticmethod
def verify_init(func):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/azure/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-azure-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
Expand All @@ -149,6 +157,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
Expand Down Expand Up @@ -311,8 +321,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-azure-{user_id}"
experiment_name_formatter = "dfp/azure/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):
# Finally, perform training which will output a model
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/duo/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-duo-{user_id}",
help="The MLflow model name template to use when logging models. ")
def run_pipeline(train_users,
skip_user: typing.Tuple[str],
only_user: typing.Tuple[str],
Expand All @@ -150,6 +158,8 @@ def run_pipeline(train_users,
log_level,
sample_rate_s,
filter_threshold,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
"""Runs the DFP pipeline."""
# To include the generic, we must be training all or generic
Expand Down Expand Up @@ -306,8 +316,8 @@ def run_pipeline(train_users,
# Output is UserMessageMeta -- Cached frame set
pipeline.add_stage(DFPPreprocessingStage(config, input_schema=preprocess_schema))

model_name_formatter = "DFP-duo-{user_id}"
experiment_name_formatter = "dfp/duo/training/{reg_model_name}"
model_name_formatter = mlflow_model_name_template
experiment_name_formatter = mlflow_experiment_name_template

if (is_training):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/{source}/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-{source}-{user_id}",
help="The MLflow model name template to use when logging models. ")
@click.option("--disable_pre_filtering",
is_flag=True,
help=("Enabling this option will skip pre-filtering of json messages. "
Expand All @@ -126,6 +134,8 @@ def run_pipeline(source: str,
tracking_uri,
silence_monitors,
use_cpp,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
if (skip_user and only_user):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Expand All @@ -140,6 +150,8 @@ def run_pipeline(source: str,
source,
tracking_uri,
silence_monitors,
mlflow_experiment_name_template,
mlflow_model_name_template,
train_users)

dfp_arg_parser.init()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@
type=str,
default="http://mlflow:5000",
help=("The MLflow tracking URI to connect to the tracking backend."))
@click.option('--mlflow_experiment_name_template',
type=str,
default="dfp/{source}/training/{reg_model_name}",
help="The MLflow experiment name template to use when logging experiments. ")
@click.option('--mlflow_model_name_template',
type=str,
default="DFP-{source}-{user_id}",
help="The MLflow model name template to use when logging models. ")
@click.option('--bootstrap_servers',
type=str,
default="localhost:9092",
Expand Down Expand Up @@ -138,6 +146,8 @@ def run_pipeline(source: str,
tracking_uri,
silence_monitors,
use_cpp,
mlflow_experiment_name_template,
mlflow_model_name_template,
**kwargs):
if (skip_user and only_user):
logging.error("Option --skip_user and --only_user are mutually exclusive. Exiting")
Expand All @@ -152,6 +162,8 @@ def run_pipeline(source: str,
source,
tracking_uri,
silence_monitors,
mlflow_experiment_name_template,
mlflow_model_name_template,
train_users)

dfp_arg_parser.init()
Expand Down
29 changes: 26 additions & 3 deletions morpheus/controllers/mlflow_model_writer_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,27 @@ def experiment_name_formatter(self):
def databricks_permissions(self):
return self._databricks_permissions

def _create_safe_user_id(self, user_id: str) -> str:
    """
    Creates a safe user ID for use in MLflow model names and experiment names.

    The characters ``.``, ``/`` and ``:`` are replaced with the readable tokens ``_dot_``, ``_slash_`` and
    ``_colon_`` respectively. Every other character is left unchanged.

    Parameters
    ----------
    user_id : str
        The user ID.

    Returns
    -------
    str
        The generated safe user ID.
    """

    # One pass over the string instead of three chained `replace` calls. The result is the same because
    # none of the replacement tokens contains a character that is itself being replaced.
    substitutions = str.maketrans({
        '.': '_dot_',
        '/': '_slash_',
        ':': '_colon_',
    })

    return user_id.translate(substitutions)

def user_id_to_model(self, user_id: str):
"""
Converts a user ID to a model name
Expand All @@ -102,7 +123,7 @@ def user_id_to_model(self, user_id: str):
"""

kwargs = {
"user_id": user_id,
"user_id": self._create_safe_user_id(user_id),
"user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(),
}

Expand All @@ -123,9 +144,11 @@ def user_id_to_experiment(self, user_id: str) -> str:
The generated experiment name.
"""

safe_user_id = self._create_safe_user_id(user_id)

kwargs = {
"user_id": user_id,
"user_md5": hashlib.md5(user_id.encode('utf-8')).hexdigest(),
"user_id": safe_user_id,
"user_md5": hashlib.md5(safe_user_id.encode('utf-8')).hexdigest(),
"reg_model_name": self.user_id_to_model(user_id=user_id)
}

Expand Down