diff --git a/ludwig/automl/auto_tune_config.py b/ludwig/automl/auto_tune_config.py index 971cbe4b104..e6686e20a99 100644 --- a/ludwig/automl/auto_tune_config.py +++ b/ludwig/automl/auto_tune_config.py @@ -12,7 +12,9 @@ raise ImportError(" ray is not installed. In order to use auto_train please run pip install ludwig[ray]") from ludwig.api import LudwigModel +from ludwig.backend import initialize_backend from ludwig.constants import ( + AUTO, AUTOML_DEFAULT_TEXT_ENCODER, AUTOML_LARGE_TEXT_DATASET, AUTOML_MAX_ROWS_PER_CHECKPOINT, @@ -20,6 +22,7 @@ AUTOML_SMALLER_TEXT_LENGTH, AUTOML_TEXT_ENCODER_MAX_TOKEN_LEN, BATCH_SIZE, + DEFAULT_BATCH_SIZE, HYPEROPT, PREPROCESSING, SPACE, @@ -70,9 +73,9 @@ BYTES_OPTIMIZER_PER_WEIGHT = 8 # for optimizer m and v vectors -def get_trainingset_metadata(config, dataset): +def get_trainingset_metadata(config, dataset, backend): (_, _, _, training_set_metadata) = preprocess_for_training( - config, dataset=dataset, preprocessing_params=config[PREPROCESSING] + config, dataset=dataset, preprocessing_params=config[PREPROCESSING], backend=backend ) return training_set_metadata @@ -117,7 +120,10 @@ def compute_memory_usage(config, training_set_metadata, model_category) -> int: update_config_with_metadata(config, training_set_metadata) lm = LudwigModel.create_model(config) model_size = lm.get_model_size() # number of parameters in model - batch_size = config[TRAINER][BATCH_SIZE] + batch_size = config[TRAINER].get(BATCH_SIZE, DEFAULT_BATCH_SIZE) + if batch_size == AUTO: + # Smallest valid batch size that will allow training to complete + batch_size = 2 memory_usage = model_size * (BYTES_PER_WEIGHT + BYTES_OPTIMIZER_PER_WEIGHT) * batch_size if model_category == TEXT: return _get_text_model_memory_usage(config, training_set_metadata, memory_usage) @@ -194,11 +200,13 @@ def _update_num_samples(num_samples, hyperparam_search_space): # Note: if run in Ray Cluster, this method is run remote with gpu resources requested if available -def memory_tune_config(config, dataset, model_category, row_count): +def memory_tune_config(config, dataset, model_category, row_count, backend): + backend = initialize_backend(backend) + fits_in_memory = False tried_reduce_seq_len = False raw_config = merge_with_defaults(config) - training_set_metadata = get_trainingset_metadata(raw_config, dataset) + training_set_metadata = get_trainingset_metadata(raw_config, dataset, backend) modified_hyperparam_search_space = copy.deepcopy(raw_config[HYPEROPT]["parameters"]) current_param_values = {} param_list = [] diff --git a/ludwig/automl/automl.py b/ludwig/automl/automl.py index bf4e2bd172c..058dcc83271 100644 --- a/ludwig/automl/automl.py +++ b/ludwig/automl/automl.py @@ -22,6 +22,7 @@ from ludwig.api import LudwigModel from ludwig.automl.auto_tune_config import memory_tune_config from ludwig.automl.base_config import _create_default_config, _get_reference_configs, DatasetInfo, get_dataset_info +from ludwig.backend import Backend, initialize_backend from ludwig.constants import ( AUTOML_DEFAULT_IMAGE_ENCODER, AUTOML_DEFAULT_TABULAR_MODEL, @@ -41,6 +42,7 @@ has_imbalanced_output, set_output_feature_metric, ) +from ludwig.utils.data_utils import load_dataset from ludwig.utils.defaults import default_random_seed from ludwig.utils.fs_utils import open_file from ludwig.utils.misc_utils import merge_dict @@ -136,6 +138,7 @@ def create_auto_config( user_config: Dict = None, random_seed: int = default_random_seed, use_reference_config: bool = False, + backend: Union[Backend, str] = None, ) -> dict: """Returns an auto-generated Ludwig config with the intent of training the best model on given given dataset / target in the given time limit. @@ -158,25 +161,28 @@ def create_auto_config( # Return :return: (dict) selected model configuration """ - default_configs, features_metadata = _create_default_config(dataset, target, time_limit_s, random_seed) + backend = initialize_backend(backend) + + if not isinstance(dataset, DatasetInfo): + dataset = load_dataset(dataset, df_lib=backend.df_engine.df_lib) + + dataset_info = get_dataset_info(dataset) if not isinstance(dataset, DatasetInfo) else dataset + default_configs, features_metadata = _create_default_config(dataset_info, target, time_limit_s, random_seed) model_config, model_category, row_count = _model_select( - dataset, default_configs, features_metadata, user_config, use_reference_config + dataset_info, default_configs, features_metadata, user_config, use_reference_config ) if tune_for_memory: + args = (model_config, dataset, model_category, row_count, backend) if ray.is_initialized(): resources = get_available_resources() # check if cluster has GPUS if resources["gpu"] > 0: model_config, fits_in_memory = ray.get( - ray.remote(num_gpus=1, num_cpus=1, max_calls=1)(memory_tune_config).remote( - model_config, dataset, model_category, row_count - ) + ray.remote(num_gpus=1, num_cpus=1, max_calls=1)(memory_tune_config).remote(*args) ) else: - model_config, fits_in_memory = ray.get( - ray.remote(num_cpus=1)(memory_tune_config).remote(model_config, dataset, model_category, row_count) - ) + model_config, fits_in_memory = ray.get(ray.remote(num_cpus=1)(memory_tune_config).remote(*args)) else: - model_config, fits_in_memory = memory_tune_config(model_config, dataset, model_category, row_count) + model_config, fits_in_memory = memory_tune_config(*args) if not fits_in_memory: warnings.warn( "AutoML with tune_for_memory enabled did not return estimation that model will fit in memory. " @@ -210,6 +216,7 @@ def train_with_config( :return: (AutoTrainResults) results containing hyperopt experiments and best model """ _ray_init() + model_type = get_model_type(config) hyperopt_results = _train( config, dataset, output_directory=output_directory, model_name=model_type, random_seed=random_seed, **kwargs @@ -230,7 +237,7 @@ def train_with_config( def _model_select( - dataset: Union[str, pd.DataFrame, dd.core.DataFrame, DatasetInfo], + dataset_info: DatasetInfo, default_configs, features_metadata, user_config, @@ -240,8 +247,6 @@ def _model_select( Note: Current implementation returns tabnet by default for tabular datasets. """ - - dataset_info = get_dataset_info(dataset) if not isinstance(dataset, DatasetInfo) else dataset fields = dataset_info.fields base_config = default_configs["base_config"] diff --git a/ludwig/automl/base_config.py b/ludwig/automl/base_config.py index 3eef6aa6b7e..cd86f52cd46 100644 --- a/ludwig/automl/base_config.py +++ b/ludwig/automl/base_config.py @@ -21,11 +21,11 @@ from dataclasses_json import dataclass_json, LetterCase from ludwig.constants import COMBINER, EXECUTOR, HYPEROPT, SCHEDULER, SEARCH_ALG, TEXT, TYPE -from ludwig.utils.automl.data_source import DataframeSource, DataSource +from ludwig.utils.automl.data_source import DataSource, wrap_data_source from ludwig.utils.automl.field_info import FieldConfig, FieldInfo, FieldMetadata -from ludwig.utils.automl.ray_utils import _ray_init, get_available_resources +from ludwig.utils.automl.ray_utils import get_available_resources from ludwig.utils.automl.type_inference import infer_type, should_exclude -from ludwig.utils.data_utils import load_dataset, load_yaml +from ludwig.utils.data_utils import load_yaml from ludwig.utils.defaults import default_random_seed PATH_HERE = os.path.abspath(os.path.dirname(__file__)) @@ -80,7 +80,7 @@ def allocate_experiment_resources(resources: dict) -> dict: def _create_default_config( - dataset: Union[str, dd.core.DataFrame, pd.DataFrame, DatasetInfo], + dataset_info: DatasetInfo, target_name: Union[str, List[str]] = None, time_limit_s: Union[int, float] = None, random_seed: int = default_random_seed, @@ -108,14 +108,9 @@ def _create_default_config( :return: (dict) dictionaries contain auto train config files for all available combiner types """ - _ray_init() resources = get_available_resources() experiment_resources = allocate_experiment_resources(resources) - dataset_info = dataset - if not isinstance(dataset, DatasetInfo): - dataset_info = get_dataset_info(dataset) - input_and_output_feature_config, features_metadata = get_features_config( dataset_info.fields, dataset_info.row_count, resources, target_name ) @@ -159,7 +154,7 @@ def _get_reference_configs() -> dict: return reference_configs -def get_dataset_info(dataset: Union[str, pd.DataFrame, dd.core.DataFrame]) -> DatasetInfo: +def get_dataset_info(df: Union[pd.DataFrame, dd.core.DataFrame]) -> DatasetInfo: """Constructs FieldInfo objects for each feature in dataset. These objects are used for downstream type inference. @@ -169,8 +164,7 @@ def get_dataset_info(dataset: Union[str, pd.DataFrame, dd.core.DataFrame]) -> Da # Return :return: (List[FieldInfo]) list of FieldInfo objects """ - dataframe = load_dataset(dataset) - source = DataframeSource(dataframe) + source = wrap_data_source(df) return get_dataset_info_from_source(source) diff --git a/ludwig/utils/automl/data_source.py b/ludwig/utils/automl/data_source.py index 46ee8418022..d808e6904b6 100644 --- a/ludwig/utils/automl/data_source.py +++ b/ludwig/utils/automl/data_source.py @@ -1,9 +1,13 @@ from abc import ABC, abstractmethod from typing import List, Tuple +import dask.dataframe as dd +import pandas as pd + from ludwig.utils.audio_utils import is_audio_score from ludwig.utils.automl.utils import avg_num_tokens from ludwig.utils.image_utils import is_image_score +from ludwig.utils.misc_utils import memoized_method from ludwig.utils.types import DataFrame @@ -89,3 +93,40 @@ def __len__(self) -> int: class DataframeSource(DataframeSourceMixin, DataSource): def __init__(self, df): self.df = df + + +class DaskDataSource(DataframeSource): + @memoized_method(maxsize=1) + def get_sample(self) -> pd.DataFrame: + # TODO: uniform random sample + return self.df.head(10000) + + @property + def sample(self) -> pd.DataFrame: + return self.get_sample() + + def get_distinct_values(self, column, max_values_to_return) -> Tuple[int, List[str], float]: + unique_values = self.df[column].drop_duplicates().dropna().persist() + num_unique_values = len(unique_values) + + # TODO(travis): implement imbalance ratio + imbalance_ratio = 1.0 + return num_unique_values, unique_values.head(max_values_to_return), imbalance_ratio + + def get_nonnull_values(self, column) -> int: + return self.df[column].notnull().sum().compute() + + def get_image_values(self, column: str, sample_size: int = 10) -> int: + return int(sum(is_image_score(None, x, column) for x in self.sample[column].head(sample_size))) + + def get_audio_values(self, column: str, sample_size: int = 10) -> int: + return int(sum(is_audio_score(x) for x in self.sample[column].head(sample_size))) + + def get_avg_num_tokens(self, column) -> int: + return avg_num_tokens(self.sample[column]) + + +def wrap_data_source(df: DataFrame) -> DataSource: + if isinstance(df, dd.core.DataFrame): + return DaskDataSource(df) + return DataframeSource(df) diff --git a/ludwig/utils/automl/type_inference.py b/ludwig/utils/automl/type_inference.py index a5f1318789d..267837c7cd7 100644 --- a/ludwig/utils/automl/type_inference.py +++ b/ludwig/utils/automl/type_inference.py @@ -72,7 +72,11 @@ def should_exclude(idx: int, field: FieldInfo, dtype: str, row_count: int, targe distinct_value_percent = float(field.num_distinct_values) / row_count if distinct_value_percent == 1.0: upper_name = field.name.upper() - if (idx == 0 and dtype == NUMBER) or upper_name.endswith("ID") or upper_name.startswith("ID"): + if ( + (idx == 0 and "INDEX" in upper_name and dtype == NUMBER) + or upper_name.endswith("ID") + or upper_name.startswith("ID") + ): return True return False diff --git a/ludwig/utils/misc_utils.py b/ludwig/utils/misc_utils.py index 6c485131854..8b62af9d5df 100644 --- a/ludwig/utils/misc_utils.py +++ b/ludwig/utils/misc_utils.py @@ -14,8 +14,10 @@ # limitations under the License. # ============================================================================== import copy +import functools import os import random +import weakref from collections import OrderedDict from collections.abc import Mapping @@ -159,3 +161,26 @@ def set_saved_weights_in_checkpoint_flag(config): def remove_empty_lines(str): return "\n".join([line.rstrip() for line in str.split("\n") if line.rstrip()]) + + +# TODO(travis): move to cached_property when we drop Python 3.7. +# https://stackoverflow.com/a/33672499 +def memoized_method(*lru_args, **lru_kwargs): + def decorator(func): + @functools.wraps(func) + def wrapped_func(self, *args, **kwargs): + # We're storing the wrapped method inside the instance. If we had + # a strong reference to self the instance would never die. + self_weak = weakref.ref(self) + + @functools.wraps(func) + @functools.lru_cache(*lru_args, **lru_kwargs) + def cached_method(*args, **kwargs): + return func(self_weak(), *args, **kwargs) + + setattr(self, func.__name__, cached_method) + return cached_method(*args, **kwargs) + + return wrapped_func + + return decorator diff --git a/tests/integration_tests/test_automl.py b/tests/integration_tests/test_automl.py index 3d7c3f9bc26..3c8c7b82d7e 100644 --- a/tests/integration_tests/test_automl.py +++ b/tests/integration_tests/test_automl.py @@ -1,31 +1,58 @@ import os +import tempfile +from typing import Any, Dict, List, Set from unittest import mock import pytest from ludwig.api import LudwigModel -from ludwig.constants import TRAINER +from ludwig.constants import INPUT_FEATURES, NAME, OUTPUT_FEATURES, TRAINER from tests.integration_tests.utils import category_feature, generate_data, number_feature try: - from ludwig.automl.automl import train_with_config + import dask.dataframe as dd + + from ludwig.automl.automl import create_auto_config, train_with_config from ludwig.hyperopt.execution import RayTuneExecutor except ImportError: pass +@pytest.fixture(scope="module") +def test_data(): + with tempfile.TemporaryDirectory() as tmpdir: + input_features = [ + number_feature(), + number_feature(), + category_feature(encoder={"vocab_size": 3}), + category_feature(encoder={"vocab_size": 3}), + ] + output_features = [category_feature(decoder={"vocab_size": 3})] + dataset_csv = generate_data( + input_features, output_features, os.path.join(tmpdir, "dataset.csv"), num_examples=100 + ) + yield input_features, output_features, dataset_csv + + @pytest.mark.distributed -@pytest.mark.parametrize("time_budget", [200, 1], ids=["high", "low"]) -def test_train_with_config(time_budget, ray_cluster_2cpu, tmpdir): - input_features = [ - number_feature(), - number_feature(), - category_feature(encoder={"vocab_size": 3}), - category_feature(encoder={"vocab_size": 3}), - ] - output_features = [category_feature(decoder={"vocab_size": 3})] - dataset = generate_data(input_features, output_features, os.path.join(tmpdir, "dataset.csv")) +@pytest.mark.parametrize("tune_for_memory", [True, False]) +def test_create_auto_config(tune_for_memory, test_data, ray_cluster_2cpu): + input_features, output_features, dataset_csv = test_data + targets = [feature[NAME] for feature in output_features] + df = dd.read_csv(dataset_csv) + config = create_auto_config(df, targets, time_limit_s=600, tune_for_memory=tune_for_memory, backend="ray") + + def to_name_set(features: List[Dict[str, Any]]) -> Set[str]: + return {feature[NAME] for feature in features} + assert to_name_set(config[INPUT_FEATURES]) == to_name_set(input_features) + assert to_name_set(config[OUTPUT_FEATURES]) == to_name_set(output_features) + + +@pytest.mark.distributed +@pytest.mark.parametrize("time_budget", [200, 1], ids=["high", "low"]) +def test_train_with_config(time_budget, test_data, ray_cluster_2cpu, tmpdir): + input_features, output_features, dataset_csv = test_data config = { "input_features": input_features, "output_features": output_features, @@ -68,7 +95,7 @@ def test_train_with_config(time_budget, ray_cluster_2cpu, tmpdir): mock_fn.side_effect = fn outdir = os.path.join(tmpdir, "output") - results = train_with_config(dataset, config, output_directory=outdir) + results = train_with_config(dataset_csv, config, output_directory=outdir) best_model = results.best_model if time_budget > 1: diff --git a/tests/ludwig/utils/automl/test_type_inference.py b/tests/ludwig/utils/automl/test_type_inference.py index cb528b2bce1..809202dfef6 100644 --- a/tests/ludwig/utils/automl/test_type_inference.py +++ b/tests/ludwig/utils/automl/test_type_inference.py @@ -68,7 +68,9 @@ def test_infer_type_explicit_date(): "idx,num_distinct_values,dtype,name,expected", [ (3, ROW_COUNT, NUMBER, "id", True), - (0, ROW_COUNT, NUMBER, "foo", True), + (0, ROW_COUNT, NUMBER, "index", True), + (1, ROW_COUNT, NUMBER, "index", False), + (0, ROW_COUNT, NUMBER, "foo", False), (3, ROW_COUNT, TEXT, "uuid", True), (0, ROW_COUNT, TEXT, "name", False), (0, ROW_COUNT, NUMBER, TARGET_NAME, False),