Skip to content

Commit

Permalink
Add in-memory dataset size calculation to dataset statistics and hype…
Browse files Browse the repository at this point in the history
…ropt (#2509) (#2518)

* Add in-memory dataset size calculation to dataset statistics (#2509)

* add in-memory dataset size calculation

* reduce constraints on test

* Fix failing tests by adding conditional checks before function call

* merge into single tab, use memory formatter, fix conditional

* clean up

* make benchmarking a module

* Surfacing dataset statistics in hyperopt (#2515)

* surfacing dataset statistics in hyperopt

* make datasets loaders a module
  • Loading branch information
arnavgarg1 authored Sep 16, 2022
1 parent 4156ca6 commit 94af081
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 8 deletions.
13 changes: 5 additions & 8 deletions ludwig/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
load_yaml,
save_json,
)
from ludwig.utils.dataset_utils import generate_dataset_statistics
from ludwig.utils.defaults import default_random_seed, merge_with_defaults
from ludwig.utils.fs_utils import makedirs, path_exists, upload_output_directory
from ludwig.utils.misc_utils import (
Expand Down Expand Up @@ -473,19 +474,15 @@ def on_epoch_end(self, trainer, progress_tracker, save_path):
self.training_set_metadata = training_set_metadata

if self.backend.is_coordinator():
dataset_statistics = [["Dataset", "Size"]]
dataset_statistics.append(["Training", len(training_set)])
if validation_set is not None:
dataset_statistics.append(["Validation", len(validation_set)])
if test_set is not None:
dataset_statistics.append(["Test", len(test_set)])
dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)

if not skip_save_model:
# save train set metadata
os.makedirs(model_dir, exist_ok=True)
save_json(os.path.join(model_dir, TRAIN_SET_METADATA_FILE_NAME), training_set_metadata)

logger.info("\nDataset sizes:")
logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid", floatfmt=".4f"))
logger.info("\nDataset Statistics")
logger.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))

for callback in self.callbacks:
callback.on_train_init(
Expand Down
Empty file added ludwig/benchmarking/__init__.py
Empty file.
4 changes: 4 additions & 0 deletions ludwig/data/dataset/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def initialize_batcher(self, batch_size=128, should_shuffle=True, seed=0, ignore
# Conversion hook on the base dataset: this version provides no implementation and
# always raises NotImplementedError.
def to_df(self):
raise NotImplementedError()

# In-memory size of the dataset, in bytes, exposed as a read-only property.
# The base version has no implementation and always raises NotImplementedError;
# the pandas- and Ray-backed datasets supply their own.
@property
def in_memory_size_bytes(self):
raise NotImplementedError()


class DatasetManager(ABC):
@abstractmethod
Expand Down
5 changes: 5 additions & 0 deletions ludwig/data/dataset/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ def get_dataset(self):
# len(dataset) is whatever is stored in self.size.
def __len__(self):
return self.size

@property
def in_memory_size_bytes(self):
    """Return the deep pandas memory usage of `to_df()`, summed over columns, in bytes.

    Returns 0 when `to_df()` yields None.
    """
    frame = self.to_df()
    if frame is None:
        return 0
    # deep=True asks pandas to account for the contents of object columns as well.
    per_column_bytes = frame.memory_usage(deep=True)
    return per_column_bytes.sum()

@contextlib.contextmanager
def initialize_batcher(self, batch_size=128, should_shuffle=True, seed=0, ignore_last=False, horovod=None):
sampler = DistributedSampler(len(self), shuffle=should_shuffle, seed=seed, horovod=horovod)
Expand Down
6 changes: 6 additions & 0 deletions ludwig/data/dataset/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,12 @@ def __len__(self):
def size(self):
return len(self)

@property
def in_memory_size_bytes(self):
    """Return the in-memory size of the underlying Ray dataset in bytes.

    Ray's `Dataset.size_bytes()` returns None when the in-memory size is not known, so both
    that case and a missing dataset (`self.ds is None`) are reported as 0. This keeps the
    value safe to format and compare.
    https://docs.ray.io/en/releases-1.12.1/_modules/ray/data/dataset.html#Dataset.size_bytes
    """
    if self.ds is None:
        return 0
    size_bytes = self.ds.size_bytes()
    # Normalize Ray's "unknown" (None) to 0 so callers always receive a number.
    return size_bytes if size_bytes is not None else 0

# Hands the Ray dataset in self.ds to the dataframe engine's from_ray_dataset() and
# returns the result (presumably a dataframe in that engine's format; confirm against df_engine).
def to_df(self):
return self.df_engine.from_ray_dataset(self.ds)

Expand Down
Empty file.
7 changes: 7 additions & 0 deletions ludwig/hyperopt/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import pandas as pd
import yaml
from tabulate import tabulate

from ludwig.api import LudwigModel
from ludwig.backend import Backend, initialize_backend, LocalBackend
Expand All @@ -29,6 +30,7 @@
from ludwig.hyperopt.results import HyperoptResults
from ludwig.hyperopt.utils import print_hyperopt_results, save_hyperopt_stats, should_tune_preprocessing
from ludwig.utils.backward_compatibility import upgrade_to_latest_version
from ludwig.utils.dataset_utils import generate_dataset_statistics
from ludwig.utils.defaults import default_random_seed, merge_with_defaults
from ludwig.utils.fs_utils import makedirs, open_file
from ludwig.utils.misc_utils import get_class_attributes, get_from_registry, set_default_value, set_default_values
Expand Down Expand Up @@ -320,6 +322,11 @@ def hyperopt(
)
dataset = None

dataset_statistics = generate_dataset_statistics(training_set, validation_set, test_set)

logging.info("\nDataset Statistics")
logging.info(tabulate(dataset_statistics, headers="firstrow", tablefmt="fancy_grid"))

for callback in callbacks or []:
callback.on_hyperopt_preprocessing_end(experiment_name)

Expand Down
19 changes: 19 additions & 0 deletions ludwig/utils/dataset_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
from typing import List, Tuple, Union

import pandas as pd
from sklearn.model_selection import train_test_split

from ludwig.constants import TEST_SPLIT, TRAIN_SPLIT, VALIDATION_SPLIT
from ludwig.data.dataset.base import Dataset
from ludwig.utils.defaults import default_random_seed


Expand Down Expand Up @@ -85,3 +88,19 @@ def get_repeatable_train_val_test_split(
df_test["split"] = TEST_SPLIT
df_split = pd.concat([df_train, df_val, df_test], ignore_index=True)
return df_split


def generate_dataset_statistics(
    training_set: Dataset, validation_set: Union[Dataset, None], test_set: Union[Dataset, None]
) -> List[List[Union[str, int]]]:
    """Build a table of per-split dataset statistics.

    The first row is the header `["Dataset", "Size (Rows)", "Size (In Memory)"]`, so the result
    can be passed to `tabulate(..., headers="firstrow")`. Each following row holds the split
    name, `len(split)`, and the split's `in_memory_size_bytes` rendered by `format_memory`.

    Args:
        training_set: Training split; always reported.
        validation_set: Validation split; its row is omitted when None.
        test_set: Test split; its row is omitted when None.

    Returns:
        A list of rows (lists), header first.
    """
    # Imported at call time rather than at module import, as in the original code.
    from ludwig.benchmarking.utils import format_memory

    dataset_statistics = [["Dataset", "Size (Rows)", "Size (In Memory)"]]
    dataset_statistics.append(["Training", len(training_set), format_memory(training_set.in_memory_size_bytes)])

    # Validation and test splits are optional; only the splits that exist get a row.
    for split_name, split in (("Validation", validation_set), ("Test", test_set)):
        if split is not None:
            dataset_statistics.append([split_name, len(split), format_memory(split.in_memory_size_bytes)])
    return dataset_statistics
27 changes: 27 additions & 0 deletions tests/integration_tests/test_preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,30 @@ def test_empty_training_set_error(backend, tmpdir, ray_cluster_2cpu):
ludwig_model = LudwigModel(config, backend=backend)
with pytest.raises(ValueError, match="Training data is empty following preprocessing"):
ludwig_model.preprocess(dataset=df)


@pytest.mark.distributed
@pytest.mark.parametrize(
    "backend",
    [
        pytest.param("local", id="local"),
        pytest.param("ray", id="ray", marks=pytest.mark.distributed),
    ],
)
def test_in_memory_dataset_size(backend, tmpdir, ray_cluster_2cpu):
    """Each split returned by preprocessing must report a strictly positive in-memory size."""
    csv_target = os.path.join(tmpdir, "data.csv")

    # The output feature is created before the input feature.
    target_feature = binary_feature()
    inputs = [number_feature()]
    outputs = [target_feature]
    model_config = {"input_features": inputs, "output_features": outputs}

    generated_csv = generate_data(inputs, outputs, csv_target)
    frame = pd.read_csv(generated_csv)

    model = LudwigModel(model_config, backend=backend)
    train_split, validation_split, test_split, _ = model.preprocess(dataset=frame)

    for split in (train_split, validation_split, test_split):
        assert split.in_memory_size_bytes > 0

0 comments on commit 94af081

Please sign in to comment.