This library provides a Dagster integration for Stitch, a managed batch data ingestion service similar to Fivetran and Airbyte.
Please note this library is under active development and should be used cautiously! Currently it supports triggering replication jobs and materializing the resulting tables as Dagster assets that can be used in downstream jobs. Reconciliation and other features are not yet included but would be great additions.
To install the library, run:
$ pip install dagster-stitch
For development, it can be installed locally and tested with:
$ pip install -e .[lint,test]
$ pytest
To use the library, you must configure a stitch
resource in your Dagster instance. The resource requires an api_key
to authenticate with Stitch. You can find or generate one in the Stitch UI under Account Settings > API Access Keys
.
You will also need to note your Stitch account ID and the ID of the data source you want to replicate to be used in the asset or operation configuration. These can be found by navigating to the data source in the Stitch UI and looking at the URL. For example, if the URL is https://app.stitchdata.com/client/12345/pipeline/v2/sources/67890/summary
, then the account ID is 12345
and the data source ID is 67890
.
Here's an example of how to instantiate an asset that will materialize the table_name
table from the data_source_id
data source:
from dagster import Definitions
from dagster_stitch import stitch_resource, build_stitch_assets
stitch_instance = stitch_resource.configured(
{
"api_key": "your_stitch_api_key",
"account_id": 12345,
}
)
stitch_assets = build_stitch_assets(
data_source_id=54321,
destination_tables=["data_source_name.table_name"]
)
definitions = Definitions(
assets=stitch_assets,
resources={"stitch": stitch_instance},
)
Note that you should include your data source name prefixed by a point in your destination_tables
and if the table do not correspond to a materialized asset, the materialization will raise an error.