Skip to content

Commit

Permalink
Fixed support for distributed datasets in create_auto_config (#2508)
Browse files Browse the repository at this point in the history
  • Loading branch information
tgaddair authored and tgaddair committed Sep 16, 2022
1 parent 502aa1a commit 02557c7
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 44 deletions.
18 changes: 13 additions & 5 deletions ludwig/automl/auto_tune_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,17 @@
raise ImportError(" ray is not installed. In order to use auto_train please run pip install ludwig[ray]")

from ludwig.api import LudwigModel
from ludwig.backend import initialize_backend
from ludwig.constants import (
AUTO,
AUTOML_DEFAULT_TEXT_ENCODER,
AUTOML_LARGE_TEXT_DATASET,
AUTOML_MAX_ROWS_PER_CHECKPOINT,
AUTOML_SMALLER_TEXT_ENCODER,
AUTOML_SMALLER_TEXT_LENGTH,
AUTOML_TEXT_ENCODER_MAX_TOKEN_LEN,
BATCH_SIZE,
DEFAULT_BATCH_SIZE,
HYPEROPT,
PREPROCESSING,
SPACE,
Expand Down Expand Up @@ -70,9 +73,9 @@
BYTES_OPTIMIZER_PER_WEIGHT = 8 # for optimizer m and v vectors


def get_trainingset_metadata(config, dataset):
def get_trainingset_metadata(config, dataset, backend):
(_, _, _, training_set_metadata) = preprocess_for_training(
config, dataset=dataset, preprocessing_params=config[PREPROCESSING]
config, dataset=dataset, preprocessing_params=config[PREPROCESSING], backend=backend
)
return training_set_metadata

Expand Down Expand Up @@ -117,7 +120,10 @@ def compute_memory_usage(config, training_set_metadata, model_category) -> int:
update_config_with_metadata(config, training_set_metadata)
lm = LudwigModel.create_model(config)
model_size = lm.get_model_size() # number of parameters in model
batch_size = config[TRAINER][BATCH_SIZE]
batch_size = config[TRAINER].get(BATCH_SIZE, DEFAULT_BATCH_SIZE)
if batch_size == AUTO:
# Smallest valid batch size that will allow training to complete
batch_size = 2
memory_usage = model_size * (BYTES_PER_WEIGHT + BYTES_OPTIMIZER_PER_WEIGHT) * batch_size
if model_category == TEXT:
return _get_text_model_memory_usage(config, training_set_metadata, memory_usage)
Expand Down Expand Up @@ -194,11 +200,13 @@ def _update_num_samples(num_samples, hyperparam_search_space):


# Note: if run in Ray Cluster, this method is run remote with gpu resources requested if available
def memory_tune_config(config, dataset, model_category, row_count):
def memory_tune_config(config, dataset, model_category, row_count, backend):
backend = initialize_backend(backend)

fits_in_memory = False
tried_reduce_seq_len = False
raw_config = merge_with_defaults(config)
training_set_metadata = get_trainingset_metadata(raw_config, dataset)
training_set_metadata = get_trainingset_metadata(raw_config, dataset, backend)
modified_hyperparam_search_space = copy.deepcopy(raw_config[HYPEROPT]["parameters"])
current_param_values = {}
param_list = []
Expand Down
29 changes: 17 additions & 12 deletions ludwig/automl/automl.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from ludwig.api import LudwigModel
from ludwig.automl.auto_tune_config import memory_tune_config
from ludwig.automl.base_config import _create_default_config, _get_reference_configs, DatasetInfo, get_dataset_info
from ludwig.backend import Backend, initialize_backend
from ludwig.constants import (
AUTOML_DEFAULT_IMAGE_ENCODER,
AUTOML_DEFAULT_TABULAR_MODEL,
Expand All @@ -41,6 +42,7 @@
has_imbalanced_output,
set_output_feature_metric,
)
from ludwig.utils.data_utils import load_dataset
from ludwig.utils.defaults import default_random_seed
from ludwig.utils.fs_utils import open_file
from ludwig.utils.misc_utils import merge_dict
Expand Down Expand Up @@ -136,6 +138,7 @@ def create_auto_config(
user_config: Dict = None,
random_seed: int = default_random_seed,
use_reference_config: bool = False,
backend: Union[Backend, str] = None,
) -> dict:
"""Returns an auto-generated Ludwig config with the intent of training the best model on given given dataset /
target in the given time limit.
Expand All @@ -158,25 +161,28 @@ def create_auto_config(
# Return
:return: (dict) selected model configuration
"""
default_configs, features_metadata = _create_default_config(dataset, target, time_limit_s, random_seed)
backend = initialize_backend(backend)

if not isinstance(dataset, DatasetInfo):
dataset = load_dataset(dataset, df_lib=backend.df_engine.df_lib)

dataset_info = get_dataset_info(dataset) if not isinstance(dataset, DatasetInfo) else dataset
default_configs, features_metadata = _create_default_config(dataset_info, target, time_limit_s, random_seed)
model_config, model_category, row_count = _model_select(
dataset, default_configs, features_metadata, user_config, use_reference_config
dataset_info, default_configs, features_metadata, user_config, use_reference_config
)
if tune_for_memory:
args = (model_config, dataset, model_category, row_count, backend)
if ray.is_initialized():
resources = get_available_resources() # check if cluster has GPUS
if resources["gpu"] > 0:
model_config, fits_in_memory = ray.get(
ray.remote(num_gpus=1, num_cpus=1, max_calls=1)(memory_tune_config).remote(
model_config, dataset, model_category, row_count
)
ray.remote(num_gpus=1, num_cpus=1, max_calls=1)(memory_tune_config).remote(*args)
)
else:
model_config, fits_in_memory = ray.get(
ray.remote(num_cpus=1)(memory_tune_config).remote(model_config, dataset, model_category, row_count)
)
model_config, fits_in_memory = ray.get(ray.remote(num_cpus=1)(memory_tune_config).remote(*args))
else:
model_config, fits_in_memory = memory_tune_config(model_config, dataset, model_category, row_count)
model_config, fits_in_memory = memory_tune_config(*args)
if not fits_in_memory:
warnings.warn(
"AutoML with tune_for_memory enabled did not return estimation that model will fit in memory. "
Expand Down Expand Up @@ -210,6 +216,7 @@ def train_with_config(
:return: (AutoTrainResults) results containing hyperopt experiments and best model
"""
_ray_init()

model_type = get_model_type(config)
hyperopt_results = _train(
config, dataset, output_directory=output_directory, model_name=model_type, random_seed=random_seed, **kwargs
Expand All @@ -230,7 +237,7 @@ def train_with_config(


def _model_select(
dataset: Union[str, pd.DataFrame, dd.core.DataFrame, DatasetInfo],
dataset_info: DatasetInfo,
default_configs,
features_metadata,
user_config,
Expand All @@ -240,8 +247,6 @@ def _model_select(
Note: Current implementation returns tabnet by default for tabular datasets.
"""

dataset_info = get_dataset_info(dataset) if not isinstance(dataset, DatasetInfo) else dataset
fields = dataset_info.fields

base_config = default_configs["base_config"]
Expand Down
18 changes: 6 additions & 12 deletions ludwig/automl/base_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
from dataclasses_json import dataclass_json, LetterCase

from ludwig.constants import COMBINER, EXECUTOR, HYPEROPT, SCHEDULER, SEARCH_ALG, TEXT, TYPE
from ludwig.utils.automl.data_source import DataframeSource, DataSource
from ludwig.utils.automl.data_source import DataSource, wrap_data_source
from ludwig.utils.automl.field_info import FieldConfig, FieldInfo, FieldMetadata
from ludwig.utils.automl.ray_utils import _ray_init, get_available_resources
from ludwig.utils.automl.ray_utils import get_available_resources
from ludwig.utils.automl.type_inference import infer_type, should_exclude
from ludwig.utils.data_utils import load_dataset, load_yaml
from ludwig.utils.data_utils import load_yaml
from ludwig.utils.defaults import default_random_seed

PATH_HERE = os.path.abspath(os.path.dirname(__file__))
Expand Down Expand Up @@ -80,7 +80,7 @@ def allocate_experiment_resources(resources: dict) -> dict:


def _create_default_config(
dataset: Union[str, dd.core.DataFrame, pd.DataFrame, DatasetInfo],
dataset_info: DatasetInfo,
target_name: Union[str, List[str]] = None,
time_limit_s: Union[int, float] = None,
random_seed: int = default_random_seed,
Expand Down Expand Up @@ -108,14 +108,9 @@ def _create_default_config(
:return: (dict) dictionaries contain auto train config files for all available
combiner types
"""
_ray_init()
resources = get_available_resources()
experiment_resources = allocate_experiment_resources(resources)

dataset_info = dataset
if not isinstance(dataset, DatasetInfo):
dataset_info = get_dataset_info(dataset)

input_and_output_feature_config, features_metadata = get_features_config(
dataset_info.fields, dataset_info.row_count, resources, target_name
)
Expand Down Expand Up @@ -159,7 +154,7 @@ def _get_reference_configs() -> dict:
return reference_configs


def get_dataset_info(dataset: Union[str, pd.DataFrame, dd.core.DataFrame]) -> DatasetInfo:
def get_dataset_info(df: Union[pd.DataFrame, dd.core.DataFrame]) -> DatasetInfo:
"""Constructs FieldInfo objects for each feature in dataset. These objects are used for downstream type
inference.
Expand All @@ -169,8 +164,7 @@ def get_dataset_info(dataset: Union[str, pd.DataFrame, dd.core.DataFrame]) -> Da
# Return
:return: (List[FieldInfo]) list of FieldInfo objects
"""
dataframe = load_dataset(dataset)
source = DataframeSource(dataframe)
source = wrap_data_source(df)
return get_dataset_info_from_source(source)


Expand Down
41 changes: 41 additions & 0 deletions ludwig/utils/automl/data_source.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
from abc import ABC, abstractmethod
from typing import List, Tuple

import dask.dataframe as dd
import pandas as pd

from ludwig.utils.audio_utils import is_audio_score
from ludwig.utils.automl.utils import avg_num_tokens
from ludwig.utils.image_utils import is_image_score
from ludwig.utils.misc_utils import memoized_method
from ludwig.utils.types import DataFrame


Expand Down Expand Up @@ -89,3 +93,40 @@ def __len__(self) -> int:
class DataframeSource(DataframeSourceMixin, DataSource):
def __init__(self, df):
self.df = df


class DaskDataSource(DataframeSource):
@memoized_method(maxsize=1)
def get_sample(self) -> pd.DataFrame:
# TODO: uniform random sample
return self.df.head(10000)

@property
def sample(self) -> pd.DataFrame:
return self.get_sample()

def get_distinct_values(self, column, max_values_to_return) -> Tuple[int, List[str], float]:
unique_values = self.df[column].drop_duplicates().dropna().persist()
num_unique_values = len(unique_values)

# TODO(travis): implement imbalance ratio
imbalance_ratio = 1.0
return num_unique_values, unique_values.head(max_values_to_return), imbalance_ratio

def get_nonnull_values(self, column) -> int:
return self.df[column].notnull().sum().compute()

def get_image_values(self, column: str, sample_size: int = 10) -> int:
return int(sum(is_image_score(None, x, column) for x in self.sample[column].head(sample_size)))

def get_audio_values(self, column: str, sample_size: int = 10) -> int:
return int(sum(is_audio_score(x) for x in self.sample[column].head(sample_size)))

def get_avg_num_tokens(self, column) -> int:
return avg_num_tokens(self.sample[column])


def wrap_data_source(df: DataFrame) -> DataSource:
if isinstance(df, dd.core.DataFrame):
return DaskDataSource(df)
return DataframeSource(df)
6 changes: 5 additions & 1 deletion ludwig/utils/automl/type_inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ def should_exclude(idx: int, field: FieldInfo, dtype: str, row_count: int, targe
distinct_value_percent = float(field.num_distinct_values) / row_count
if distinct_value_percent == 1.0:
upper_name = field.name.upper()
if (idx == 0 and dtype == NUMBER) or upper_name.endswith("ID") or upper_name.startswith("ID"):
if (
(idx == 0 and "INDEX" in upper_name and dtype == NUMBER)
or upper_name.endswith("ID")
or upper_name.startswith("ID")
):
return True

return False
25 changes: 25 additions & 0 deletions ludwig/utils/misc_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
# limitations under the License.
# ==============================================================================
import copy
import functools
import os
import random
import weakref
from collections import OrderedDict
from collections.abc import Mapping

Expand Down Expand Up @@ -159,3 +161,26 @@ def set_saved_weights_in_checkpoint_flag(config):

def remove_empty_lines(str):
return "\n".join([line.rstrip() for line in str.split("\n") if line.rstrip()])


# TODO(travis): move to cached_property when we drop Python 3.7.
# https://stackoverflow.com/a/33672499
def memoized_method(*lru_args, **lru_kwargs):
def decorator(func):
@functools.wraps(func)
def wrapped_func(self, *args, **kwargs):
# We're storing the wrapped method inside the instance. If we had
# a strong reference to self the instance would never die.
self_weak = weakref.ref(self)

@functools.wraps(func)
@functools.lru_cache(*lru_args, **lru_kwargs)
def cached_method(*args, **kwargs):
return func(self_weak(), *args, **kwargs)

setattr(self, func.__name__, cached_method)
return cached_method(*args, **kwargs)

return wrapped_func

return decorator
53 changes: 40 additions & 13 deletions tests/integration_tests/test_automl.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,58 @@
import os
import tempfile
from typing import Any, Dict, List, Set
from unittest import mock

import pytest

from ludwig.api import LudwigModel
from ludwig.constants import TRAINER
from ludwig.constants import INPUT_FEATURES, NAME, OUTPUT_FEATURES, TRAINER
from tests.integration_tests.utils import category_feature, generate_data, number_feature

try:
from ludwig.automl.automl import train_with_config
import dask.dataframe as dd

from ludwig.automl.automl import create_auto_config, train_with_config
from ludwig.hyperopt.execution import RayTuneExecutor
except ImportError:
pass


@pytest.fixture(scope="module")
def test_data():
with tempfile.TemporaryDirectory() as tmpdir:
input_features = [
number_feature(),
number_feature(),
category_feature(encoder={"vocab_size": 3}),
category_feature(encoder={"vocab_size": 3}),
]
output_features = [category_feature(decoder={"vocab_size": 3})]
dataset_csv = generate_data(
input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=100
)
yield input_features, output_features, dataset_csv


@pytest.mark.distributed
@pytest.mark.parametrize("time_budget", [200, 1], ids=["high", "low"])
def test_train_with_config(time_budget, ray_cluster_2cpu, tmpdir):
input_features = [
number_feature(),
number_feature(),
category_feature(encoder={"vocab_size": 3}),
category_feature(encoder={"vocab_size": 3}),
]
output_features = [category_feature(decoder={"vocab_size": 3})]
dataset = generate_data(input_features, output_features, os.path.join(tmpdir, "dataset.csv"))
@pytest.mark.parametrize("tune_for_memory", [True, False])
def test_create_auto_config(tune_for_memory, test_data, ray_cluster_2cpu):
input_features, output_features, dataset_csv = test_data
targets = [feature[NAME] for feature in output_features]
df = dd.read_csv(dataset_csv)
config = create_auto_config(df, targets, time_limit_s=600, tune_for_memory=tune_for_memory, backend="ray")

def to_name_set(features: List[Dict[str, Any]]) -> Set[str]:
return {feature[NAME] for feature in features}

assert to_name_set(config[INPUT_FEATURES]) == to_name_set(input_features)
assert to_name_set(config[OUTPUT_FEATURES]) == to_name_set(output_features)


@pytest.mark.distributed
@pytest.mark.parametrize("time_budget", [200, 1], ids=["high", "low"])
def test_train_with_config(time_budget, test_data, ray_cluster_2cpu, tmpdir):
input_features, output_features, dataset_csv = test_data
config = {
"input_features": input_features,
"output_features": output_features,
Expand Down Expand Up @@ -68,7 +95,7 @@ def test_train_with_config(time_budget, ray_cluster_2cpu, tmpdir):
mock_fn.side_effect = fn

outdir = os.path.join(tmpdir, "output")
results = train_with_config(dataset, config, output_directory=outdir)
results = train_with_config(dataset_csv, config, output_directory=outdir)
best_model = results.best_model

if time_budget > 1:
Expand Down
Loading

0 comments on commit 02557c7

Please sign in to comment.