A curated set of tools for managing distributed task workflows.
- Airflow: Task Workflow Engine
- Docker Swarm: Container Orchestration
- Docker Infrakit: Infrastructure Orchestration to deploy on the cloud
Note: This project was inspired by https://github.com/puckel/docker-airflow and infrakit examples https://github.com/infrakit/examples
- Install docker
    - NO Nvidia GPU support needed
        wget -qO- https://get.docker.com/ | sh
    - YES Nvidia GPU support needed
        - Install Docker CE
        - Install nvidia-docker2
- Install docker compose
    pip install docker-compose
- Install requirements
- Clone this repo
- Deploy Local
- Go to localhost in a browser
- Activate a DAG and trigger a run
- Uncomment all DAG and plugin folder mounts in deploy/docker-compose-CeleryExecutor.yml
    #- ../dags/:/usr/local/airflow/dags
    #- ../plugins:/usr/local/airflow/plugins
- Create or modify a DAG inside the dags folder
- Check the webserver to confirm that the DAG has been updated
See the other examples for inspiration.
- Build docker image
docker build -f docker/Dockerfile -t wongwill86/air-tasks:<your tag> .
- Before committing/pushing, comment out all DAG and plugin folder mounts in deploy/docker-compose-CeleryExecutor.yml
    #- ../dags/:/usr/local/airflow/dags
    #- ../plugins:/usr/local/airflow/plugins
- Replace every air-tasks tag with your tag in deploy/docker-compose-CeleryExecutor.yml
    <every service that has this>:
        image: wongwill86/air-tasks:<your tag>
- Try to deploy
    docker-compose -f deploy/docker-compose-CeleryExecutor.yml up -d
- Push to Docker and/or wait for Docker Cloud to build
To deploy on a Docker Swarm, first create the secrets used by the proxy for basic auth and SSL (values may be left blank), then deploy the stack:
echo '<blank or username here>' | docker secret create basic_auth_username -
echo '<blank or password here>' | docker secret create basic_auth_password -
echo '<blank ssl certificate here>' | docker secret create ssl_certificate -
echo '<blank ssl certificate key here>' | docker secret create ssl_certificate_key -
docker stack deploy -c deploy/docker-compose-CeleryExecutor.yml <stack name>
Deploying to AWS:
- Initialize submodule
    git submodule init
    git submodule update --recursive --remote
- Use CloudFormation to create a new stack.
- Use the template cloud/latest/swarm/aws/vpc.cfn
Deploying to Google Cloud:
- Initialize submodule
    git submodule init
    git submodule update --recursive --remote
- Install gcloud
- (Optional) configure the YAML file cloud/latest/swarm/google/cloud-deployment.yaml
- Deploy using gcloud
    gcloud deployment-manager deployments create <deployment name> --config cloud/latest/swarm/google/cloud-deployment.yaml
Web interfaces (replace <host> with your host name, e.g. localhost for a local deployment):
- AirFlow (<host>/) - Airflow Webserver
- Celery Flower (<host>/flower) - Monitor Workers
- Swarm Visualizer (<host>/visualizer) - Visualize Stack Deployment
- RabbitMQ (<host>/rabbitmq) - RabbitMQ Management Plugin (Queue Info)
Note: if running with SSL, use https instead of http.
- Clone this repo
- Build the test image
    export PYTHONDONTWRITEBYTECODE=1
    export IMAGE_NAME=wongwill86/air-tasks:<your tag>
    docker-compose -f docker/docker-compose.test.yml -p ci build
- Run the test container
    docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut
- (Optional) Watch / Test.
    docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw -- --pylama
Warning 1: if nothing runs, make sure all tests pass first.
Warning 2: you may need to restart the watcher if you rename or move files, especially plugin modules.
Tasks are defined as independent and stateless units of work. A task is described by creating an Airflow Operator. Airflow provides many operators, for example for calling bash scripts, Python scripts, and even Docker images.
There is no guarantee that related tasks will run on the same machine/environment. It is preferable to use Docker containers for your tasks to isolate the runtime environment and avoid polluting the environment of other tasks.
When a task is scheduled to run, a task_instance is created.
A Directed Acyclic Graph (DAG) is a static set of repeatable task operators that are invoked automatically or manually (see the Airflow documentation on DAGs). The nodes of this graph are the task operators and the edges describe the dependencies between them. Edges are created by calling operator.set_upstream or operator.set_downstream on each task operator.
You should assume that the size of a DAG is immutable (it actually is not, but modifying it gets really messy). DAGs themselves can also be invoked with parameters.
See example_trigger_target_dag
When a DAG is scheduled to run, a dag_run is created.
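To make the node/edge description concrete, here is a toy model in plain Python (3.9+, standard library only). `ToyTask` and `execution_order` are hypothetical names, not Airflow classes; the sketch only mimics the direction of set_upstream/set_downstream and shows why the graph has to stay acyclic.

```python
import graphlib


class ToyTask:
    """Hypothetical stand-in for an Airflow operator; not Airflow's own class."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()    # task_ids that must finish before this task
        self.downstream = set()  # task_ids that wait for this task

    def set_upstream(self, other):
        # Adds the edge other -> self
        self.upstream.add(other.task_id)
        other.downstream.add(self.task_id)

    def set_downstream(self, other):
        # Adds the edge self -> other
        other.set_upstream(self)


def execution_order(tasks):
    """Return one valid run order; raises graphlib.CycleError if the graph has a cycle."""
    graph = {task.task_id: task.upstream for task in tasks}
    return list(graphlib.TopologicalSorter(graph).static_order())


extract, transform, load = ToyTask("extract"), ToyTask("transform"), ToyTask("load")
extract.set_downstream(transform)  # extract -> transform
load.set_upstream(transform)       # transform -> load
print(execution_order([extract, transform, load]))  # ['extract', 'transform', 'load']
```

Both calls add the same kind of edge; which one you use only changes which task the call starts from.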
DAGs can be triggered from the web UI as well as from the bash terminal of any Airflow container, e.g.:
$ docker exec -it <air_flow container> bash
$ airflow trigger_dag dag_id --conf '{"param_name":"param_value" }'
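Because --conf must be a valid JSON object, it can help to build the command programmatically instead of hand-quoting it. The helper below is a hypothetical sketch (Python 3.8+ for shlex.join) that uses the Airflow 1.x trigger_dag syntax shown above; `airflow_worker` is a placeholder container name, and `-it` is dropped because no interactive shell is needed.

```python
import json
import shlex


def trigger_dag_command(dag_id, conf=None, container=None):
    """Build the `airflow trigger_dag` call as an argument list.

    `container` is a hypothetical parameter: when given, the call is wrapped
    in `docker exec` so it runs inside that Airflow container.
    """
    cmd = ["airflow", "trigger_dag", dag_id]
    if conf is not None:
        # json.dumps guarantees the --conf payload is valid JSON
        cmd += ["--conf", json.dumps(conf)]
    if container is not None:
        cmd = ["docker", "exec", container] + cmd
    return cmd


cmd = trigger_dag_command("dag_id", {"param_name": "param_value"}, container="airflow_worker")
print(shlex.join(cmd))
# docker exec airflow_worker airflow trigger_dag dag_id --conf '{"param_name": "param_value"}'
```

The returned list can also be passed directly to subprocess.run, which avoids shell quoting altogether.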
See more examples at https://github.com/wongwill86/air-tasks/tree/master/dags/examples
Create a one-shot DAG that only runs when manually triggered:
See https://github.com/wongwill86/air-tasks/blob/master/dags/examples/interleaved.py
This should be the most common use case and should fit most needs.
For the multi-trigger pattern, two separate DAGs are created:
- Listener DAG: Listens for a command to be triggered with parameters
- Trigger DAG: Dynamically creates a list of parameters to trigger the Listener DAG
See https://github.com/wongwill86/air-tasks/blob/master/dags/examples/multi_trigger.py
This pattern should be avoided if possible, since there is no good way to set fan-in dependencies for the listener DAG (it is possible, but probably very hacky).
Airflow Variables are global variables that all task operators can access.
See the Airflow documentation on Variables.
The docker-compose file defines the services necessary to start Air-Tasks.
See https://github.com/wongwill86/air-tasks/blob/master/deploy/docker-compose-CeleryExecutor.yml
This is a description of all the services for docker-compose.
- Postgres: Database for saving DAGs, DAG runs, Tasks, Task Instances, etc...
- RabbitMQ: Internal queue service used to schedule task instances. Task instances are only scheduled when they are ready to run
- Webserver: Parses DAG python files and inserts them into the database
- Scheduler: Searches database for task_instances ready to run and places them in the queue
- Flower: Web UI monitoring of worker state and statistics
- Worker (worker-worker): Runs the task_instance
- Worker (worker-manager): Runs exclusively on Manager type instances. Runs tasks such as Autoscaling
- Visualizer: Basic Docker Swarm container visualizing UI
- Proxy: Reverse proxy for all web UI. Can be configured for basic auth and HTTPS
- add-secrets: Injects any specified secrets as a docker variable
Deployment of Air-Tasks can be split into 3 layers of abstraction:
- Docker Infrakit to manage cloud services. Infrakit managers are able to start, stop, and monitor cloud instances. The settings for each manager/worker instance are defined in manager.json/worker.json, with separate files for Google Cloud and for AWS. One manager node bootstraps the other nodes.
- Docker Swarm to join separate machines into a cluster. Docker managers are able to deploy and monitor services. Services are defined in the compose file. Manager nodes run all services except for worker-worker. This is made possible via deploy constraints using the engine label infrakit-manager=true.
- Airflow tasks are run on worker-worker containers that, in turn, run on Infrakit worker nodes.
- Webserver parses the Python DAG file and inserts it into the database
- DAG is triggered either manually via the web UI/CLI or via a cron schedule
- Scheduler creates dag_runs and task_instances for the DAG
- Scheduler inserts any valid, ready task_instances into the queue
- Worker processes the tasks
- Worker writes back to the queue and database, marking the status as done
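The scheduler/worker cycle above can be modeled as a simple loop. This is an illustrative simulation under simplified assumptions (a single worker, every task succeeds, a deque standing in for RabbitMQ), not Airflow's scheduler implementation; `run_dag` is a hypothetical name.

```python
from collections import deque


def run_dag(upstream):
    """Simulate one dag_run. `upstream` maps each task_id to the task_ids it depends on."""
    state = {task: "scheduled" for task in upstream}  # task_instances created for the dag_run
    queue = deque()                                    # stands in for RabbitMQ
    finished_order = []

    while any(s != "success" for s in state.values()):
        # Scheduler: queue every task whose upstream tasks have all succeeded
        for task, deps in upstream.items():
            if state[task] == "scheduled" and all(state[d] == "success" for d in deps):
                state[task] = "queued"
                queue.append(task)
        if not queue:
            raise RuntimeError("no task is ready; the graph may contain a cycle")
        # Worker: take one task off the queue, "run" it, and report success
        task = queue.popleft()
        state[task] = "success"
        finished_order.append(task)
    return finished_order


print(run_dag({"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}))  # ['a', 'b', 'c', 'd']
```

Note that "d" is never queued until both "b" and "c" report success, which is the fan-in behavior a single DAG gives you for free.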
Air-Tasks is capable of autoscaling the cluster by monitoring the number of tasks ready to be executed (on the queue). This is done by careful coordination between the above 3 layers.
A special worker service ("worker-manager") is created in the compose file. This service is deployed exclusively on manager nodes, thus capable of creating instances via infrakit. Additionally a separate queue topic ("worker-manager") is dedicated for tasks that need to run on managers.
See https://github.com/wongwill86/air-tasks/blob/master/dags/manager/scaler.py for more information
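As a rough illustration of the kind of decision such a scaler makes, the sketch below turns a queue length into a worker count. The rule, the function name, and every parameter are hypothetical assumptions, not the logic in scaler.py; in Air-Tasks the resulting count would still have to be applied through Infrakit by a task on the worker-manager queue.

```python
import math


def desired_workers(queued_tasks, tasks_per_worker, min_workers=0, max_workers=10):
    """Hypothetical rule: enough workers to drain the queue, clamped to [min, max]."""
    if tasks_per_worker <= 0:
        raise ValueError("tasks_per_worker must be positive")
    needed = math.ceil(queued_tasks / tasks_per_worker)
    return max(min_workers, min(max_workers, needed))


print(desired_workers(9, tasks_per_worker=4))  # 3
```

Clamping keeps a burst of queued tasks from requesting more instances than the cluster is allowed to create.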
Nvidia-docker is not forward compatible with edge releases of Docker, so you must install the latest stable version of Docker. As of writing, the latest nvidia-docker can only be used with Docker version 17.09. Check that the version is correct by running docker version.
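A small preflight check can catch a mismatched Docker daemon before installing nvidia-docker2. This sketch assumes the docker CLI is on PATH and that any 17.09.x patch release is acceptable; `REQUIRED_PREFIX` and both function names are hypothetical and should be adjusted if the supported version changes.

```python
import subprocess

REQUIRED_PREFIX = "17.09"  # version named in the text above


def server_version():
    """Return the Docker daemon version string, e.g. '17.09.1-ce'."""
    out = subprocess.run(
        ["docker", "version", "--format", "{{.Server.Version}}"],
        check=True, capture_output=True, text=True,
    )
    return out.stdout.strip()


def is_supported(version, required_prefix=REQUIRED_PREFIX):
    # Compare major.minor only, so patch releases such as 17.09.1-ce are accepted
    major_minor = ".".join(version.split(".")[:2])
    return major_minor == required_prefix
```

Usage: `is_supported(server_version())` returns False on, for example, a 17.12 edge install.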
Repeated from Setup
- Install Docker CE
- Install nvidia-docker2
nvidia-docker2 adds its configuration to /etc/docker/daemon.json. If you have made any changes there (e.g. for multiple-instance-types), make sure those are still persisted.
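One way to keep earlier edits is to merge the nvidia runtime entry into the existing daemon.json instead of overwriting the file. In this sketch the exact `nvidia` runtime entry and the `infrakit-role=worker` label are assumptions for illustration; compare them against the file your installation actually produced.

```python
import json

# Assumed shape of the runtime entry; verify against your installed daemon.json
NVIDIA_RUNTIME = {"nvidia": {"path": "nvidia-container-runtime", "runtimeArgs": []}}


def merge_daemon_config(existing_json, additions):
    """Merge `additions` into an existing daemon.json string without dropping other keys."""
    config = json.loads(existing_json) if existing_json.strip() else {}
    for key, value in additions.items():
        if isinstance(value, dict) and isinstance(config.get(key), dict):
            merged = dict(config[key])
            merged.update(value)  # one level deep is enough for "runtimes"
            config[key] = merged
        else:
            config[key] = value
    return json.dumps(config, indent=4, sort_keys=True)


before = '{"labels": ["infrakit-role=worker"]}'  # hypothetical existing setting
print(merge_daemon_config(before, {"runtimes": NVIDIA_RUNTIME}))
```

The Docker daemon has to be restarted after daemon.json changes for them to take effect.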
If you are using CUDA in your Docker image, make sure that the image's CUDA version is compatible with the host's Nvidia driver version. See the compatibility matrix for more details.
Use any of Air-Tasks' custom Docker operators with the runtime set, e.g.:
start = DockerConfigurableOperator(
task_id='docker_gpu_task',
command='nvidia-smi',
default_args=default_args,
image='nvidia/cuda:8.0-runtime-ubuntu16.04',
host_args={'runtime': 'nvidia'},
dag=dag
)
If your Docker operator requires secrets, you can store them as Airflow Variables and then mount them into the container using DockerWithVariablesOperator, e.g.:
start = DockerWithVariablesOperator(
['your_key'],
mount_point='/secrets',
task_id='docker_task',
command='sh -c "ls /secrets &&\
cat /secrets/variables/your_key && echo done"',
default_args=default_args,
image='alpine:latest',
dag=dag
)
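The example command reads /secrets/variables/your_key, which suggests each variable is exposed as one file per key under <mount_point>/variables/. The hypothetical helper below reproduces that layout in a local directory, e.g. to test a container's secret-reading code without Airflow; it is an assumption based on that command, not the operator's implementation.

```python
import os
import tempfile


def write_variables(variables, mount_point):
    """Write each variable to <mount_point>/variables/<key> and return the file names."""
    var_dir = os.path.join(mount_point, "variables")
    os.makedirs(var_dir, exist_ok=True)
    for key, value in variables.items():
        path = os.path.join(var_dir, key)
        with open(path, "w") as handle:
            handle.write(value)
        os.chmod(path, 0o600)  # secrets: readable by the owner only
    return sorted(os.listdir(var_dir))


with tempfile.TemporaryDirectory() as tmp:
    print(write_variables({"your_key": "s3cr3t"}, tmp))  # ['your_key']
```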
You can mount your host machine's directory as a volume in the Docker Operator. DockerOperator calls docker directly from the host machine. In other words, your task's docker container is not running inside the worker container. Instead, it is running directly off the host machine. Therefore when you are testing locally, you can just mount your host's secret directory as a volume into the operator.
INCOMPLETE
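Because the task container runs directly on the host, the host path in a bind mount must exist on whichever machine runs the task. The hypothetical helper below builds Docker's `host-path:container-path[:ro]` bind-mount strings; a list of such strings is the format Airflow's DockerOperator takes in its `volumes` argument, though whether Air-Tasks' custom operators forward it unchanged is an assumption. The paths are placeholders.

```python
import posixpath


def volume_spec(host_path, container_path, read_only=True):
    """Build a Docker bind-mount string of the form host:container[:ro]."""
    for path in (host_path, container_path):
        if not posixpath.isabs(path):
            raise ValueError(f"bind mounts need absolute paths, got {path!r}")
    spec = f"{host_path}:{container_path}"
    return spec + ":ro" if read_only else spec


# Hypothetical local secrets directory, mounted read-only for testing
volumes = [volume_spec("/home/me/secrets", "/secrets")]
print(volumes)  # ['/home/me/secrets:/secrets:ro']
```

Mounting secrets read-only keeps a misbehaving task from modifying them on the host.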
If you need to run tasks on different machine instance types, this can be achieved by scheduling the task on a new queue topic. Currently all standard workers listen to the queue topic worker. If you require specialized workers to run specific tasks, this can be achieved by:
- In the task operator, specify a new queue topic. This will schedule all tasks of this operator into a separate queue topic (in this case other-instance-type).
    start = BashOperator(
        task_id='new_instance_tag',
        bash_command='echo run from other instance type',
        queue='other-instance-type',
        dag=dag)
- In the docker compose file, create a new service copied and pasted from worker-worker. This will create workers that listen to the new queue topic (other-instance-type) and only deploy on machines with the docker engine label [ engine.labels.infrakit-role == other-instance-type ].
    worker-other-instance-type:
        ...
        command: worker -q other-instance-type
        deploy:
            mode: global
            placement:
                constraints: [ engine.labels.infrakit-role == other-instance-type ]
- Add support for the new workers in Infrakit.
    - Create a new worker init script that sets the role to worker-other-instance-type, e.g. cloud/latest/swarm/worker-init.sh. This role is used to set the docker engine label (so that your new docker service, which listens to the queue topic other-instance-type, is deployed there).
    - Create a new worker definition, e.g. cloud/latest/swarm/google/worker.json. This is used to specify the instance type.
    - Add a new group plugin with ID worker-other-instance-type to cloud/latest/swarm/groups.json to enable the worker definitions created in the previous two steps. If you create an ID in this format, autoscaling will work for this instance type.
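Two matching steps decide where a task runs: its queue topic selects which worker service picks it up, and that service's placement constraint selects which nodes the service can run on. The sketch below models both with plain dictionaries; the node names, labels, and function names are hypothetical, and only `==` constraints are handled.

```python
def parse_constraint(constraint):
    """Split 'engine.labels.<name> == <value>' into (name, value)."""
    left, op, right = constraint.partition("==")
    if not op:
        raise ValueError(f"only '==' constraints are handled here: {constraint!r}")
    key, prefix = left.strip(), "engine.labels."
    if not key.startswith(prefix):
        raise ValueError(f"not an engine label constraint: {constraint!r}")
    return key[len(prefix):], right.strip()


def eligible_nodes(nodes, constraint):
    """Return the names of nodes whose engine labels satisfy the constraint."""
    name, value = parse_constraint(constraint)
    return [node for node, labels in nodes.items() if labels.get(name) == value]


def route(task_queue, services):
    """Return the worker services that listen on the task's queue topic."""
    return [svc for svc, queues in services.items() if task_queue in queues]


nodes = {
    "node-1": {"infrakit-role": "worker"},
    "node-2": {"infrakit-role": "other-instance-type"},
}
services = {"worker-worker": {"worker"}, "worker-other-instance-type": {"other-instance-type"}}

print(route("other-instance-type", services))  # ['worker-other-instance-type']
print(eligible_nodes(nodes, "engine.labels.infrakit-role == other-instance-type"))  # ['node-2']
```

If no node carries the label, the service has nowhere to run and tasks on that queue simply wait, which is why the Infrakit steps are needed as well.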
Sometimes you may write operators that would be useful to others. These can be shared as plugins by adding them to the plugins folder.
See https://github.com/wongwill86/air-tasks/blob/master/plugins/custom/docker.py
To access a private AWS container registry (ECR), remember to set AWS environment variables such as:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_DEFAULT_REGION
Docker login to AWS ECR will automatically be set up.
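A quick way to confirm those variables are present before deploying is a preflight check like the following. The helper is hypothetical and treats an empty value as missing.

```python
import os

REQUIRED_AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")


def missing_aws_vars(env=None):
    """Return the required AWS variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_AWS_VARS if not env.get(name)]


missing = missing_aws_vars()
if missing:
    print("Missing AWS settings:", ", ".join(missing))
```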
The main Dockerfile is built on top of one of two base images:
- base-alpine (docker/base/Dockerfile.base-alpine)
- base-slim (docker/base/Dockerfile.base-slim)
Additionally, this base image is used to build the test image, which includes Python test libraries. This is useful for testing derived images so that the test libraries do not need to be reinstalled. See docker-compose.test.yml for more details. These base images should automatically be built in Docker Cloud.
If for any reason you require building new base images:
- Build the base image
docker build -f docker/base/Dockerfile.base-alpine -t wongwill86/air-tasks:<your base tag> .
- Prepare base image to build test base image
export IMAGE_NAME=wongwill86/air-tasks:<your base tag>
- Build test base image
docker-compose -f docker/docker-compose.test.yml -p ci_base build
- Retag built test base image for testing
docker tag ci_base_sut wongwill86/air-tasks:<your base tag>-test
Source Type | Source | Docker Tag | Dockerfile location | Build Context |
---|---|---|---|---|
Branch | master | latest | Dockerfile | / |
Branch | /^(.{1,5}|.{7}|([^m]|m[^a]|ma[^s]|mas[^t]|mast[^e]|maste[^r])(.*))$/ | {source-ref} | docker/Dockerfile | / |
Tag | /^base-([0-9.a-zA-Z-]+)$/ | base-alpine-{\1} | docker/base/Dockerfile.base-alpine | / |
Tag | /^base-([0-9.a-zA-Z-]+)$/ | base-slim-{\1} | docker/base/Dockerfile.base-slim | / |
Tag | /^v([0-9.a-zA-Z-]+)$/ | release-v{\1} | docker/Dockerfile | / |
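To check which images a given push would build, the rules in the table can be replayed locally. This is a sketch that copies the table's patterns into Python's `re`; Docker Cloud's `{\1}` placeholder becomes `\1` via `match.expand`, and `builds_for` is a hypothetical name.

```python
import re

# (source type, pattern, docker tag template, Dockerfile location), copied from the table
BUILD_RULES = [
    ("branch", r"^master$", "latest", "Dockerfile"),
    ("branch", r"^(.{1,5}|.{7}|([^m]|m[^a]|ma[^s]|mas[^t]|mast[^e]|maste[^r])(.*))$",
     "{source-ref}", "docker/Dockerfile"),
    ("tag", r"^base-([0-9.a-zA-Z-]+)$", r"base-alpine-\1", "docker/base/Dockerfile.base-alpine"),
    ("tag", r"^base-([0-9.a-zA-Z-]+)$", r"base-slim-\1", "docker/base/Dockerfile.base-slim"),
    ("tag", r"^v([0-9.a-zA-Z-]+)$", r"release-v\1", "docker/Dockerfile"),
]


def builds_for(source_type, ref):
    """Return the (docker tag, Dockerfile) pairs that pushing `ref` would trigger."""
    builds = []
    for rule_type, pattern, template, dockerfile in BUILD_RULES:
        if rule_type != source_type:
            continue
        match = re.match(pattern, ref)
        if not match:
            continue
        # {source-ref} means "use the branch/tag name itself"
        tag = ref if template == "{source-ref}" else match.expand(template)
        builds.append((tag, dockerfile))
    return builds


print(builds_for("tag", "base-1.0"))
# [('base-alpine-1.0', 'docker/base/Dockerfile.base-alpine'), ('base-slim-1.0', 'docker/base/Dockerfile.base-slim')]
```

Replaying the rules shows one quirk: as written, the branch regex also skips names of eight or more characters that begin with "master" (e.g. `builds_for("branch", "master-fix")` returns an empty list).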