Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restructured split config and added datetime splitting #2132

Merged
merged 59 commits into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
7535424
Refactor split
tgaddair Jun 11, 2022
71ede20
Added datetime splitter
tgaddair Jun 11, 2022
65ae007
Fixed tests
tgaddair Jun 11, 2022
eb84e37
Fixed tests
tgaddair Jun 11, 2022
2a2d103
Added random_seed
tgaddair Jun 11, 2022
58885e7
Fixd pre-split datasets
tgaddair Jun 13, 2022
40801a4
Fixed writing split file
tgaddair Jun 13, 2022
8a74998
Added has_split
tgaddair Jun 13, 2022
8704809
Fixed split file
tgaddair Jun 13, 2022
1260dd4
Fixed ext
tgaddair Jun 13, 2022
e1c865e
Added random split test
tgaddair Jun 13, 2022
ec1e836
Fixed split return order
tgaddair Jun 13, 2022
9ee9ecd
Added stratify test
tgaddair Jun 13, 2022
1c9a2e5
Fixed stratify test
tgaddair Jun 13, 2022
83adea5
Added datetime split test
tgaddair Jun 13, 2022
4fadb71
Fixed imports
tgaddair Jun 13, 2022
b37c0c2
Fixed test
tgaddair Jun 13, 2022
23ed558
Fixed test
tgaddair Jun 13, 2022
ca07313
Merge
tgaddair Jun 18, 2022
9bfcd1b
Improved stratify
tgaddair Jun 18, 2022
476ecbe
Added tests
tgaddair Jun 18, 2022
7aecc08
Test determinism
tgaddair Jun 18, 2022
ea113d1
Addressed comments
tgaddair Jun 18, 2022
0b67e43
Added upgrade path
tgaddair Jun 18, 2022
06cb8d8
Fixed backwards compatibility
tgaddair Jun 18, 2022
2cbe63a
Fixed split
tgaddair Jun 19, 2022
09f2f84
Added backwards compatibility test
tgaddair Jun 19, 2022
cf02d1b
Refactor split
tgaddair Jun 11, 2022
fabe7fd
Added datetime splitter
tgaddair Jun 11, 2022
a36c365
Fixed tests
tgaddair Jun 11, 2022
299b6eb
Fixed tests
tgaddair Jun 11, 2022
151ec90
Added random_seed
tgaddair Jun 11, 2022
fe045e4
Fixd pre-split datasets
tgaddair Jun 13, 2022
5880202
Fixed writing split file
tgaddair Jun 13, 2022
71deb9f
Added has_split
tgaddair Jun 13, 2022
649ab7d
Fixed split file
tgaddair Jun 13, 2022
14f89a4
Fixed ext
tgaddair Jun 13, 2022
e9ada1a
Added random split test
tgaddair Jun 13, 2022
1ef8f82
Fixed split return order
tgaddair Jun 13, 2022
6cce0a2
Added stratify test
tgaddair Jun 13, 2022
4b9f83a
Fixed stratify test
tgaddair Jun 13, 2022
f7c6fc8
Added datetime split test
tgaddair Jun 13, 2022
19fcffb
Fixed imports
tgaddair Jun 13, 2022
259fc64
Fixed test
tgaddair Jun 13, 2022
cdf4371
Fixed test
tgaddair Jun 13, 2022
16bc391
Improved stratify
tgaddair Jun 18, 2022
30b4f92
Added tests
tgaddair Jun 18, 2022
9bd98fb
Test determinism
tgaddair Jun 18, 2022
2bf445f
Addressed comments
tgaddair Jun 18, 2022
c371360
Added upgrade path
tgaddair Jun 18, 2022
9c38b8b
Fixed backwards compatibility
tgaddair Jun 18, 2022
d3f0758
Fixed split
tgaddair Jun 19, 2022
599b6db
Added backwards compatibility test
tgaddair Jun 19, 2022
97e4d9f
Merge branch 'master' into ref-split
tgaddair Jun 21, 2022
62f8030
Raise a value error if the training dataset is empty.
justinxzhao Jun 21, 2022
cd4190a
Raise a value error if the training dataset is empty.
justinxzhao Jun 21, 2022
ef40508
Increase tolerance for checking tensor/np array equivalence.
justinxzhao Jun 21, 2022
c359162
Merge branch 'master' into ref-split
tgaddair Jun 27, 2022
94f26e9
Removed unused import
tgaddair Jun 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions examples/insurance_lite/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ trainer:
early_stop: 0
batch_size: 8
preprocessing:
force_split: false
split_probabilities: [0.7, 0.1, 0.2]
split:
type: random
probabilities: [0.7, 0.1, 0.2]
14 changes: 14 additions & 0 deletions ludwig/data/concatenate_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,20 @@ def get_split(idx):
return concatenated_df


def concatenate_splits(train_df, vali_df, test_df, backend):
def to_frame(df, split):
if df is None:
return None

df = df.index.to_frame(name=SPLIT)
df[SPLIT] = split
return df

dfs = [train_df, vali_df, test_df]
dfs = [to_frame(df, split) for split, df in enumerate(dfs)]
return backend.df_engine.df_lib.concat([df for df in dfs if df is not None])


if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Concatenate train validation and test set")

Expand Down
11 changes: 10 additions & 1 deletion ludwig/data/dataframe/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,16 @@ def reduce_objects(self, series, reduce_fn):
raise NotImplementedError()

@abstractmethod
def to_parquet(self, df, path):
def split(self, df, probabilities):
"""Splits the input DataFrame into sections with the given proportions."""
raise NotImplementedError()

@abstractmethod
def to_parquet(self, df, path, index=False):
"""Write the input DataFrame to the path in the Parquet format.

Optionally includes the DataFrame index in the Parquet file.
"""
raise NotImplementedError()

@abstractmethod
Expand Down
27 changes: 23 additions & 4 deletions ludwig/data/dataframe/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from ray.util.dask import ray_dask_get

from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import split_by_slices

TMP_COLUMN = "__TMP_COLUMN__"

Expand All @@ -36,10 +37,11 @@ def set_scheduler(scheduler):


class DaskEngine(DataFrameEngine):
def __init__(self, parallelism=None, persist=True, **kwargs):
def __init__(self, parallelism=None, persist=True, _use_ray=True, **kwargs):
self._parallelism = parallelism
self._persist = persist
set_scheduler(ray_dask_get)
if _use_ray:
set_scheduler(ray_dask_get)

def set_parallelism(self, parallelism):
self._parallelism = parallelism
Expand Down Expand Up @@ -89,12 +91,29 @@ def apply_objects(self, df, apply_fn, meta=None):
def reduce_objects(self, series, reduce_fn):
return series.reduction(reduce_fn, aggregate=reduce_fn, meta=("data", "object")).compute()[0]

def to_parquet(self, df, path):
def split(self, df, probabilities):
# Split the DataFrame proprotionately along partitions. This is an inexact solution designed
# to speed up the split process, as splitting within partitions would be significantly
# more expensive.
# TODO(travis): revisit in the future to make this more precise

# First ensure that every split receives at least one partition.
tgaddair marked this conversation as resolved.
Show resolved Hide resolved
# If not, we need to increase the number of partitions to satisfy this constraint.
min_prob = min(probabilities)
min_partitions = int(1 / min_prob)
if df.npartitions < min_partitions:
df = df.repartition(min_partitions)

n = df.npartitions
slices = df.partitions
return split_by_slices(slices, n, probabilities)

def to_parquet(self, df, path, index=False):
with ProgressBar():
df.to_parquet(
path,
engine="pyarrow",
write_index=False,
write_index=index,
schema="infer",
)

Expand Down
8 changes: 6 additions & 2 deletions ludwig/data/dataframe/modin.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import numpy as np

from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import split_by_slices


class ModinEngine(DataFrameEngine):
Expand Down Expand Up @@ -52,8 +53,11 @@ def apply_objects(self, df, apply_fn, meta=None):
def reduce_objects(self, series, reduce_fn):
return reduce_fn(series)

def to_parquet(self, df, path):
df.to_parquet(path, engine="pyarrow")
def split(self, df, probabilities):
return split_by_slices(df.iloc, len(df), probabilities)

def to_parquet(self, df, path, index=False):
df.to_parquet(path, engine="pyarrow", index=index)

def to_ray_dataset(self, df):
from ray.data import from_modin
Expand Down
8 changes: 6 additions & 2 deletions ludwig/data/dataframe/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import pandas as pd

from ludwig.data.dataframe.base import DataFrameEngine
from ludwig.utils.data_utils import split_by_slices


class PandasEngine(DataFrameEngine):
Expand Down Expand Up @@ -54,8 +55,11 @@ def apply_objects(self, df, apply_fn, meta=None):
def reduce_objects(self, series, reduce_fn):
return reduce_fn(series)

def to_parquet(self, df, path):
df.to_parquet(path, engine="pyarrow")
def split(self, df, probabilities):
return split_by_slices(df.iloc, len(df), probabilities)

def to_parquet(self, df, path, index=False):
df.to_parquet(path, engine="pyarrow", index=index)

def to_ray_dataset(self, df):
from ray.data import from_pandas
Expand Down
132 changes: 51 additions & 81 deletions ludwig/data/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.
# ==============================================================================
import logging
import warnings
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Tuple

Expand Down Expand Up @@ -48,8 +49,9 @@
VALIDATION,
)
from ludwig.data.cache.types import wrap
from ludwig.data.concatenate_datasets import concatenate_df, concatenate_files
from ludwig.data.concatenate_datasets import concatenate_df, concatenate_files, concatenate_splits
from ludwig.data.dataset.base import Dataset
from ludwig.data.split import get_splitter, split_dataset
from ludwig.encoders.registry import get_encoder_cls
from ludwig.features.feature_registries import base_type_registry
from ludwig.features.feature_utils import compute_feature_hash
Expand Down Expand Up @@ -89,16 +91,14 @@
read_stata,
read_tsv,
SAS_FORMATS,
save_array,
split_dataset_ttv,
SPSS_FORMATS,
STATA_FORMATS,
TSV_FORMATS,
use_credentials,
)
from ludwig.utils.defaults import default_preprocessing_parameters, default_random_seed
from ludwig.utils.fs_utils import file_lock, path_exists
from ludwig.utils.misc_utils import get_from_registry, merge_dict, resolve_pointers, set_random_seed
from ludwig.utils.misc_utils import get_from_registry, merge_dict, resolve_pointers
from ludwig.utils.types import DataFrame, Series

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -979,7 +979,7 @@ def preprocess_for_training(
@staticmethod
def preprocess_for_prediction(dataset, features, preprocessing_params, training_set_metadata, backend, callbacks):
hdf5_fp = dataset
dataset = load_hdf5(dataset, features, split_data=False, shuffle_training=False)
dataset = load_hdf5(dataset, preprocessing_params, backend, split_data=False, shuffle_training=False)
return dataset, training_set_metadata, hdf5_fp

@staticmethod
Expand Down Expand Up @@ -1020,10 +1020,12 @@ def prepare_processed_data(
training_set_metadata[DATA_TRAIN_HDF5_FP] = not_none_set

if dataset is not None:
training_set, test_set, validation_set = load_hdf5(dataset, features, shuffle_training=True)
training_set, test_set, validation_set = load_hdf5(
dataset, preprocessing_params, backend, shuffle_training=True
)

elif training_set is not None:
kwargs = dict(features=features, split_data=False)
kwargs = dict(preprocessing_params=preprocessing_params, backend=backend, split_data=False)
training_set = load_hdf5(training_set, shuffle_training=True, **kwargs)

if validation_set is not None:
Expand Down Expand Up @@ -1130,18 +1132,18 @@ def build_dataset(
for callback in callbacks or []:
callback.on_build_data_end(dataset_df, mode)

logger.debug("get split")
split = get_split(
dataset_df,
force_split=global_preprocessing_parameters["force_split"],
split_probabilities=global_preprocessing_parameters["split_probabilities"],
stratify=global_preprocessing_parameters["stratify"],
backend=backend,
random_seed=random_seed,
)
# Get any additional columns needed for splitting downstream, otherwise they will not be
# included in the preprocessed output.
split_params = global_preprocessing_parameters.get(SPLIT, {})
if "type" not in split_params and SPLIT in dataset_df:
warnings.warn(
'Detected "split" column in the data, but using default split type '
'"random". Did you mean to set split type to "fixed"?'
)

if split is not None:
proc_cols[SPLIT] = split
splitter = get_splitter(**split_params)
for col in splitter.required_columns:
proc_cols[col] = dataset_df[col]

# TODO ray: this is needed because ray 1.7 doesn't support Dask to RayDataset
# conversion with Tensor columns. Can remove for 1.8.
Expand Down Expand Up @@ -1412,45 +1414,7 @@ def handle_missing_values(dataset_cols, feature, preprocessing_parameters):
raise ValueError("Invalid missing value strategy")


def get_split(
dataset_df,
force_split=False,
split_probabilities=(0.7, 0.1, 0.2),
stratify=None,
backend=LOCAL_BACKEND,
random_seed=default_random_seed,
):
if SPLIT in dataset_df and not force_split:
split = dataset_df[SPLIT].astype(np.int8)
else:
set_random_seed(random_seed)
if stratify is None or stratify not in dataset_df:
if backend.df_engine.partitioned:
# This approach is very inefficient for partitioned backends, which
# can split by partition
return

split = (
dataset_df.index.to_series()
.map(lambda x: np.random.choice(3, 1, p=split_probabilities))
.astype(np.int8)
)
else:
split = np.zeros(len(dataset_df))
for val in dataset_df[stratify].unique():
# TODO dask: find a way to better parallelize this operation
idx_list = dataset_df.index[dataset_df[stratify] == val].tolist()
array_lib = backend.df_engine.array_lib
val_list = array_lib.random.choice(
3,
len(idx_list),
p=split_probabilities,
).astype(np.int8)
split[idx_list] = val_list
return split


def load_hdf5(hdf5_file_path, features, split_data=True, shuffle_training=False):
def load_hdf5(hdf5_file_path, preprocessing_params, backend, split_data=True, shuffle_training=False):
# TODO dask: this needs to work with DataFrames
logger.info(f"Loading data from: {hdf5_file_path}")

Expand All @@ -1463,7 +1427,7 @@ def shuffle(df):
dataset = shuffle(dataset)
return dataset

training_set, test_set, validation_set = split_dataset_ttv(dataset, SPLIT)
training_set, validation_set, test_set = split_dataset(dataset, preprocessing_params, backend)

if shuffle_training:
training_set = shuffle(training_set)
Expand Down Expand Up @@ -1668,12 +1632,6 @@ def _preprocess_file_for_training(
mode="training",
)

# TODO(travis): implement saving split for Ray
if backend.is_coordinator() and not skip_save_processed_input and SPLIT in data.columns:
# save split values for use by visualization routines
split_fp = get_split_path(dataset)
save_array(split_fp, data[SPLIT])

elif training_set:
# use data_train (including _validation and _test if they are present)
# and ignore data and train set metadata
Expand All @@ -1684,6 +1642,21 @@ def _preprocess_file_for_training(
concatenated_df = concatenate_files(training_set, validation_set, test_set, read_fn, backend)
training_set_metadata[SRC] = training_set

# Data is pre-split, so we override whatever split policy the user specified
if preprocessing_params["split"]:
warnings.warn(
'Preprocessing "split" section provided, but pre-split dataset given as input. '
"Ignoring split configuration."
)

preprocessing_params = {
**preprocessing_params,
"split": {
"type": "fixed",
"column": SPLIT,
},
}

data, training_set_metadata = build_dataset(
concatenated_df,
features,
Expand All @@ -1698,15 +1671,16 @@ def _preprocess_file_for_training(
else:
raise ValueError("either data or data_train have to be not None")

logger.info("Building dataset: DONE")
logger.debug("split train-val-test")
training_data, validation_data, test_data = split_dataset(data, preprocessing_params, backend, random_seed)

if SPLIT in data.columns:
logger.debug("split on split column")
training_data, test_data, validation_data = split_dataset_ttv(data, SPLIT)
else:
logger.debug("split randomly by partition")
training_data, test_data, validation_data = data.random_split(preprocessing_params["split_probabilities"])
if dataset and backend.is_coordinator() and not skip_save_processed_input:
logger.debug("writing split file")
splits_df = concatenate_splits(training_data, validation_data, test_data, backend)
split_fp = get_split_path(dataset or training_set)
backend.df_engine.to_parquet(splits_df, split_fp, index=True)

logger.info("Building dataset: DONE")
if preprocessing_params["oversample_minority"] or preprocessing_params["undersample_majority"]:
training_data = balance_data(training_data, config["output_features"], preprocessing_params, backend)

Expand Down Expand Up @@ -1740,7 +1714,7 @@ def _preprocess_df_for_training(
dataset = concatenate_df(training_set, validation_set, test_set, backend)
logger.info("Building dataset (it may take a while)")

dataset, training_set_metadata = build_dataset(
data, training_set_metadata = build_dataset(
dataset,
features,
preprocessing_params,
Expand All @@ -1751,15 +1725,10 @@ def _preprocess_df_for_training(
mode="training",
)

logger.info("Building dataset: DONE")

if SPLIT in dataset.columns:
logger.debug("split on split column")
training_set, test_set, validation_set = split_dataset_ttv(dataset, SPLIT)
else:
logger.debug("split randomly by partition")
training_set, test_set, validation_set = dataset.random_split(preprocessing_params["split_probabilities"])
logger.debug("split train-val-test")
training_set, validation_set, test_set = split_dataset(data, preprocessing_params, backend, random_seed)

logger.info("Building dataset: DONE")
if preprocessing_params["oversample_minority"] or preprocessing_params["undersample_majority"]:
training_set = balance_data(training_set, config["output_features"], preprocessing_params, backend)

Expand Down Expand Up @@ -1869,7 +1838,8 @@ def preprocess_for_prediction(
training_set_metadata[DATA_TRAIN_HDF5_FP] = new_hdf5_fp

if split != FULL:
training_set, test_set, validation_set = split_dataset_ttv(dataset, SPLIT)
logger.debug("split train-val-test")
training_set, validation_set, test_set = split_dataset(dataset, preprocessing_params, backend)

if split == TRAINING:
dataset = training_set
Expand Down
Loading