📖 improve README.md #11

Merged 1 commit on Sep 23, 2024
205 changes: 97 additions & 108 deletions in README.md

---

[Ray](https://github.com/ray-project/ray) integration for [Dagster](https://github.com/dagster-io/dagster).

`dagster-ray` allows creating Ray clusters and running distributed computations from Dagster code. Features include:

- `PipesRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring `RayJob` resources in Kubernetes via [KubeRay](https://github.com/ray-project/kuberay). Most suitable for submitting long-running jobs (via external Python scripts) with no direct Ray access from Dagster code. Allows receiving rich logs, events and metadata from the job. Implemented for the `KubeRay` backend.

- `RayResource`, a resource representing a Ray cluster. Interactions are performed in client mode (requires a stable, persistent connection), so it's most suitable for relatively short jobs. Provides direct Ray access from the Dagster Python process. It has implementations for the `KubeRay` and local (mostly for testing) backends. `dagster_ray.RayResource` defines the common interface shared by all backends and can be used for backend-agnostic type annotations.

- Miscellaneous utilities like `@op`, `@job` and `@schedule` for managing `KubeRay` clusters.

`dagster-ray` is tested across multiple versions of Python, Ray, Dagster, and KubeRay Operator. It integrates with [Dagster+](https://dagster.io/plus) where possible.

Documentation can be found below.

# Installation

```shell
pip install 'dagster-ray[kuberay]'
```

# Backends

`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
It defines the common interface for all `Ray` resources.
It can be used for type annotations in your `@op` and `@asset` definitions.

Examples:

```python
from dagster import asset
from dagster_ray import RayResource
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
```

The other resources below are the actual backends that implement the `RayResource` interface.


## KubeRay

This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.

Integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

The public objects can be imported from the `dagster_ray.kuberay` module.
### Pipes

`dagster-ray` provides the `PipesRayJobClient`, which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits. This approach doesn't require a persistent connection to the Ray cluster.

Examples:

In Dagster code, import `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_ray.kuberay import PipesRayJobClient


@asset
def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient):
    pipes_rayjob_client.run(
        context=context,
        ray_job={
            # RayJob manifest goes here
            # full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
            "metadata": {
                # .metadata.name is not required and will be generated if not provided
                "namespace": "ray"
            },
            "spec": {
                "entrypoint": "python /app/my_script.py",
                # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
                "rayClusterSpec": {
                    "headGroupSpec": {...},
                    "workerGroupSpecs": [...],
                },
            },
        },
        extra={"param": "value"},
    )


definitions = Definitions(
    assets=[my_asset],
    resources={"pipes_rayjob_client": PipesRayJobClient(...)},
)
```


In the Ray job, import `dagster_pipes` (must be provided as a dependency) and emit Dagster events:

```python
from dagster_pipes import open_dagster_pipes


with open_dagster_pipes() as context:
    assert context.get_extra("param") == "value"
    context.log.info("Hello from Ray Pipes!")
    context.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 57, "type": "int"}},
        data_version="alpha",
    )
```
The `dagster-pipes` package can be provided to the job via the `RayJob`'s `runtimeEnvYaml` field:

```python
import yaml

ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

Events emitted by the Ray job will be captured by `PipesRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.

**Running locally**

When running locally, enable the `port_forward` option so that `PipesRayJobClient` can reach the Ray cluster running inside Kubernetes:

```python
from dagster_ray.kuberay import PipesRayJobClient

in_k8s = ...  # detect whether the Dagster process itself runs in the cluster

pipes_rayjob_client = PipesRayJobClient(..., port_forward=not in_k8s)
```

#### `KubeRayCluster`

`KubeRayCluster` can be used for running Ray computations on Kubernetes in client (interactive) mode. It requires a stable, persistent connection for the duration of the Dagster step.

When added as a resource dependency to an `@op`/`@asset`, the `KubeRayCluster` starts a dedicated `RayCluster` for it.

Examples:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is a backend-agnostic type annotation
):
    return ray.get(ray.put(42))  # interact with the Ray cluster!


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()},
    assets=[my_asset],
)
```

Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` schedule.
#### `cleanup_old_ray_clusters`

`dagster-ray` provides an example daily cleanup schedule.
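
For instance, a minimal sketch of wiring it into a code location, assuming the schedule object is exported from `dagster_ray.kuberay` (the exact import path may differ):

```python
from dagster import Definitions
from dagster_ray.kuberay import cleanup_old_ray_clusters  # assumed export; check the package

# register the example daily cleanup schedule alongside your other definitions
definitions = Definitions(schedules=[cleanup_old_ray_clusters])
```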

## Local

These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from the `dagster_ray.local` module.

### Resources

#### `LocalRay`

A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.

Examples:


Using the `LocalRay` resource

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
assets=[my_asset],
)
```

# Executor
WIP

# Development