Reputation: 418
Disclaimer: I'm not (yet) a user of Airflow; I just found out about it today and I'm starting to explore whether it may fit my use cases.
I have one data processing workflow that is a sequential (not parallel) execution of multiple tasks. However, some of the tasks need to run on specific machines. Can Airflow manage this? What would be the advised implementation model for this use case?
Thanks.
Upvotes: 5
Views: 4428
Reputation: 8032
If you're running Airflow in Docker, you should do the following:
Set the queue name in the DAG file:
with DAG(dag_id='dag_v1',
         default_args={
             'retries': 1,
             'retry_delay': timedelta(seconds=30),
             'queue': 'server-1',
             ...
         },
         schedule_interval=None,
         tags=['my_dags']) as dag:
    ...
Set the default queue in the docker-compose.yml file:
AIRFLOW__OPERATORS__DEFAULT_QUEUE: 'server-1'
Restart the Airflow Webserver, Scheduler, etc.
Note: You have to do this for each worker, but I assume you have one worker per machine. That means each machine needs a different AIRFLOW__OPERATORS__DEFAULT_QUEUE value, and the DAGs you want to run on that machine need to use the same queue name (so you can indeed use ${HOSTNAME} as the name).
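For illustration, a minimal docker-compose sketch of that idea might look like the following; the service name, image tag, and executor setting are assumptions based on a standard Airflow Celery setup and will differ in your environment:

    # docker-compose.yml on the machine that should serve the 'server-1' queue
    services:
      airflow-worker:
        image: apache/airflow:2.5.1
        command: celery worker
        environment:
          AIRFLOW__CORE__EXECUTOR: CeleryExecutor
          # Use a different queue name on each machine (e.g. ${HOSTNAME})
          AIRFLOW__OPERATORS__DEFAULT_QUEUE: 'server-1'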
Upvotes: 0
Reputation: 6548
Yes, you can achieve this in Airflow with queues. You can tie tasks to a specific queue. Then, for each worker on a machine, you can set it to only pick up tasks from select queues.
In code, it would look like this:
task_1 = BashOperator(
    dag=dag,
    task_id='task_a',
    ...
)

task_2 = PythonOperator(
    dag=dag,
    task_id='task_b',
    queue='special',
    ...
)
Note that there is this setting in airflow.cfg:
# Default queue that tasks get assigned to and that worker listen on.
default_queue = default
So if you started your workers with this:
Server A> airflow worker
Server B> airflow worker --queues special
Server C> airflow worker --queues default,special
Then task_1 can be picked up by servers A+C and task_2 can be picked up by servers B+C.
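Putting this together with the sequential requirement from the question, a minimal DAG sketch could look like this (the dag_id, bash_command values, and start_date are placeholders, not from the answer above):

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG(dag_id='sequential_multi_machine',
             start_date=datetime(2023, 1, 1),
             schedule_interval=None) as dag:

        # Picked up by any worker listening on the default queue (servers A and C)
        task_1 = BashOperator(task_id='task_a', bash_command='echo step 1')

        # Picked up only by workers listening on the 'special' queue (servers B and C)
        task_2 = BashOperator(task_id='task_b', bash_command='echo step 2', queue='special')

        # Enforce sequential execution: task_2 starts only after task_1 succeeds
        task_1 >> task_2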
Upvotes: 11