Commit af7a93c: 📖 improve README.md
danielgafni committed Sep 23, 2024 · 1 parent f14b80a
Showing 1 changed file (README.md) with 92 additions and 104 deletions.

---

[Ray](https://github.com/ray-project/ray) integration for [Dagster](https://github.com/dagster-io/dagster).

`dagster-ray` allows creating Ray clusters and running distributed computations from Dagster code. Features include:

- `PipesRayJobClient`, a [Dagster Pipes](https://docs.dagster.io/concepts/dagster-pipes) client for launching and monitoring `RayJob` resources via [KubeRay](https://github.com/ray-project/kuberay). Most suitable for submitting long-running jobs (via external Python scripts) with no direct Ray access from Dagster code. Allows receiving rich logs, events and metadata from the job. Implemented for the `KubeRay` backend.

- `RayResource`, a resource representing a Ray cluster. Interactions are performed in client mode (requiring a stable, persistent connection), so it's most suitable for relatively short jobs. It provides direct Ray access from the Dagster Python process. It has implementations for the `KubeRay` and local (mostly for testing) backends. `dagster_ray.RayResource` defines the common interface shared by all backends and can be used for backend-agnostic type annotations.

- Miscellaneous utilities like `@op`, `@job` and `@schedule` definitions for managing `KubeRay` clusters.

`dagster-ray` is tested across multiple versions of Python, Ray, Dagster, and KubeRay Operator. It integrates with [Dagster+](https://dagster.io/plus) where possible.

Documentation can be found below.

# Installation

```shell
pip install 'dagster-ray[kuberay]'
```

# Backends

`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
It defines the common interface for all `Ray` resources.
It can be used for type annotations in your `@op` and `@asset` definitions.

Examples:

```python
from dagster import asset
from dagster_ray import RayResource
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
```

The other resources below are the actual backends that implement the `RayResource` interface.


## KubeRay

This backend requires a Kubernetes cluster with `KubeRay Operator` installed.

The `KubeRay` backend integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and Kubernetes labels.

Expand All @@ -138,11 +55,11 @@ The public objects can be imported from `dagster_ray.kuberay` module.
### Pipes

`dagster-ray` provides the `PipesRayJobClient` which can be used to execute remote Ray jobs on Kubernetes and receive Dagster events and logs from them.
[RayJob](https://docs.ray.io/en/latest/cluster/kubernetes/getting-started/rayjob-quick-start.html) will manage the lifecycle of the underlying `RayCluster`, which will be cleaned up after the specified entrypoint exits. This approach doesn't require a persistent connection to the Ray cluster.

Examples:

In Dagster code, import `PipesRayJobClient` and invoke it inside an `@op` or an `@asset`:

```python
from dagster import AssetExecutionContext, Definitions, asset
from dagster_ray.kuberay import PipesRayJobClient


@asset
def my_asset(context: AssetExecutionContext, pipes_rayjob_client: PipesRayJobClient):
    pipes_rayjob_client.run(
        context=context,
        ray_job={
            # RayJob manifest goes here
            # full reference: https://ray-project.github.io/kuberay/reference/api/#rayjob
            "metadata": {
                # .metadata.name is not required and will be generated if not provided
                "namespace": "ray",
            },
            "spec": {
                "entrypoint": "python /app/my_script.py",
                # *.container.image is not required and will be set to the current `dagster/image` tag if not provided
                "rayClusterSpec": {
                    "headGroupSpec": {...},
                    "workerGroupSpecs": [...],
                },
            },
        },
        extra={"foo": "bar"},
    )
```

Then, in the Ray job's entrypoint script, use `dagster-pipes` to communicate with Dagster:

```python
from dagster_pipes import open_dagster_pipes

with open_dagster_pipes() as pipes:
    pipes.log.info("Hello from Ray Pipes!")
    pipes.report_asset_materialization(
        metadata={"some_metric": {"raw_value": 57, "type": "int"}},
        data_version="alpha",
    )
```
The Ray job must have the `dagster-pipes` package available in its Python environment. It can be provided via `runtimeEnvYaml` in the `RayJob` manifest:

```python
import yaml

ray_job = {"spec": {"runtimeEnvYaml": yaml.safe_dump({"pip": ["dagster-pipes"]})}}
```

Events emitted by the Ray job will be captured by `PipesRayJobClient` and will become available in the Dagster event log. Standard output and standard error streams will be forwarded to the standard output of the Dagster process.

**Running locally**

When running locally, set `port_forward=True` on the `PipesRayJobClient` so that it can reach the Ray job via port-forwarding:

```python
# `in_k8s` is determined elsewhere, e.g. from the environment
pipes_rayjob_client = PipesRayJobClient(..., port_forward=not in_k8s)
```

#### `KubeRayCluster`

`KubeRayCluster` can be used for running Ray computations on Kubernetes in client (interactive) mode. It requires a stable, persistent connection for the duration of the Dagster step.

When added as a resource dependency to an `@op`/`@asset`, the `KubeRayCluster`:
- Starts a dedicated `RayCluster` for it
- Connects to the cluster in client mode
- Tears down the cluster after the step is executed

Examples:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray

@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is a backend-agnostic type annotation
):
    return ray.get(ray.put(42))  # interact with the Ray cluster!


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()},
    assets=[my_asset],
)
```

### Schedules

Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` schedule.
#### `cleanup_old_ray_clusters`
`dagster-ray` provides an example daily cleanup schedule.
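
A minimal sketch of wiring this schedule into a code location (assuming `cleanup_old_ray_clusters` is importable from `dagster_ray.kuberay`, like the other public objects in this module, and can be passed to `Definitions` directly):

```python
from dagster import Definitions
from dagster_ray.kuberay import cleanup_old_ray_clusters

# register the provided daily cleanup schedule with the code location
definitions = Definitions(schedules=[cleanup_old_ray_clusters])
```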

## Local

These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from the `dagster_ray.local` module.

### Resources

#### `LocalRay`

A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.

Examples:


Using the `LocalRay` resource:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
```
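
How `IN_K8s` is determined is up to the deployment; one possible sketch (an assumption, not part of `dagster-ray`) is to check for the environment variable Kubernetes injects into every pod:

```python
import os

# Kubernetes sets KUBERNETES_SERVICE_HOST in every pod
IN_K8s = "KUBERNETES_SERVICE_HOST" in os.environ
```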

# Executor
WIP

# Development
