From 940141e6efc61424d63aa9b00d1997e4e9948666 Mon Sep 17 00:00:00 2001
From: Geoffrey Angus
Date: Wed, 27 Jul 2022 14:46:30 -0700
Subject: [PATCH] Fixes TensorDtype TypeError in Ray nightly (#2320)

* Fixes TensorDtype TypeError in Ray nightly

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
---
 ludwig/backend/ray.py      |  5 ++---
 ludwig/data/dataset/ray.py | 18 ++++++++++++++++--
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git a/ludwig/backend/ray.py b/ludwig/backend/ray.py
index 5dafce3daf2..afb9d77b44a 100644
--- a/ludwig/backend/ray.py
+++ b/ludwig/backend/ray.py
@@ -30,13 +30,12 @@
 from packaging import version
 from ray import ObjectRef
 from ray.data.dataset_pipeline import DatasetPipeline
-from ray.data.extensions import TensorDtype
 from ray.util.dask import ray_dask_get
 
 from ludwig.backend.base import Backend, RemoteTrainingMixin
 from ludwig.constants import MODEL_ECD, MODEL_GBM, NAME, PREPROCESSING, PROC_COLUMN
 from ludwig.data.dataframe.base import DataFrameEngine
-from ludwig.data.dataset.ray import RayDataset, RayDatasetManager, RayDatasetShard
+from ludwig.data.dataset.ray import cast_as_tensor_dtype, RayDataset, RayDatasetManager, RayDatasetShard
 from ludwig.models.base import BaseModel
 from ludwig.models.ecd import ECD
 from ludwig.models.predictor import BasePredictor, get_output_columns, Predictor, RemotePredictor
@@ -653,7 +652,7 @@ def batch_predict(self, dataset: RayDataset, *args, collect_logits: bool = False
 
         def to_tensors(df: pd.DataFrame) -> pd.DataFrame:
             for c in columns:
-                df[c] = df[c].astype(TensorDtype())
+                df[c] = cast_as_tensor_dtype(df[c])
             return df
 
         # TODO(shreya): self.trainer_kwargs should have the correct resources; debug.
diff --git a/ludwig/data/dataset/ray.py b/ludwig/data/dataset/ray.py
index 88b4141e4ce..70174dfc9e4 100644
--- a/ludwig/data/dataset/ray.py
+++ b/ludwig/data/dataset/ray.py
@@ -36,12 +36,26 @@
 from ludwig.utils.data_utils import DATA_TRAIN_HDF5_FP, DATA_TRAIN_PARQUET_FP
 from ludwig.utils.fs_utils import get_fs_and_path
 from ludwig.utils.misc_utils import get_proc_features
-from ludwig.utils.types import DataFrame
+from ludwig.utils.types import DataFrame, Series
 
 _ray113 = version.parse(ray.__version__) == version.parse("1.13.0")
+_ray_nightly = version.parse(ray.__version__) > version.parse("1.13")
 
 _SCALAR_TYPES = {BINARY, CATEGORY, NUMBER}
 
+# https://github.com/ray-project/ray/issues/27031
+# TODO(geoffrey): remove this once Ray > 1.13 in our CI.
+if _ray_nightly:
+    from ray.data.extensions import TensorArray
+
+    def cast_as_tensor_dtype(series: Series) -> Series:
+        return TensorArray(series)
+
+else:
+
+    def cast_as_tensor_dtype(series: Series) -> Series:
+        return series.astype(TensorDtype())
+
 
 def read_remote_parquet(path: str):
     fs, path = get_fs_and_path(path)
@@ -269,7 +283,7 @@ def to_tensors(df: pd.DataFrame) -> pd.DataFrame:
             for c in columns:
                 # do not convert scalar columns: https://github.com/ray-project/ray/issues/20825
                 if features[c][TYPE] not in _SCALAR_TYPES:
-                    df[c] = df[c].astype(TensorDtype())
+                    df[c] = cast_as_tensor_dtype(df[c])
                 elif features[c][TYPE] == BINARY:
                     # TODO(travis): figure out why Ray is converting these into object types by default
                     df[c] = df[c].astype(np.bool_)