user2679436

Reputation: 418

How can Airflow be used to run distinct tasks of one workflow in separate machines?

Disclaimer: I'm not (yet) a user of Airflow; I just found out about it today and I'm starting to explore whether it may fit my use cases.

I have one data processing workflow that is a sequential (not parallel) execution of multiple tasks. However, some of the tasks need to run on specific machines. Can Airflow manage this? What would be the advised implementation model for this use case?

Thanks.

Upvotes: 5

Views: 4428

Answers (2)

tsveti_iko

Reputation: 8032

If you're running Airflow in Docker, you should do the following:

  1. Set the queue name in the DAG file:

    from datetime import timedelta

    from airflow import DAG

    with DAG(dag_id='dag_v1',
        default_args={
            'retries': 1,
            'retry_delay': timedelta(seconds=30),
            'queue': 'server-1',
            ...
        },
        schedule_interval=None,
        tags=['my_dags']) as dag:
            ...
    
  2. Set the default queue in the docker-compose.yml file

    AIRFLOW__OPERATORS__DEFAULT_QUEUE: 'server-1'
    
  3. Restart the Airflow webserver, scheduler, etc.

Note: You have to do this for each worker, but I assume you have one worker per machine. That means each machine needs a different AIRFLOW__OPERATORS__DEFAULT_QUEUE name, and the DAGs you want to run on that machine need to use that same queue name (so you can indeed use ${HOSTNAME} as the name).

Upvotes: 0

Daniel Huang

Reputation: 6548

Yes, you can achieve this in Airflow with queues. You can tie each task to a specific queue, and then configure each worker on a machine to only pick up tasks from select queues.

In code, it would look like this:

from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

# No queue specified, so this task goes to the default queue
task_1 = BashOperator(
    dag=dag,
    task_id='task_a',
    ...
)

# Only workers listening on the 'special' queue will pick up this task
task_2 = PythonOperator(
    dag=dag,
    task_id='task_b',
    queue='special',
    ...
)

Note that there is this setting in airflow.cfg:

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

So if you started your workers with this:

Server A> airflow worker
Server B> airflow worker --queues special
Server C> airflow worker --queues default,special

Then task_1 can be picked up by servers A+C and task_2 can be picked up by servers B+C.
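To tie this back to the question's sequential workflow, here is a minimal sketch of a DAG in which one step is pinned to the special queue while the others stay on the default queue. The DAG id, start date, and bash commands are placeholders for illustration only; the queue mechanism is the only part taken from above.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

with DAG(dag_id='sequential_example',
         start_date=datetime(2021, 1, 1),
         schedule_interval=None) as dag:

    extract = BashOperator(
        task_id='extract',
        bash_command='echo "runs on any default-queue worker (A or C)"',
    )

    transform = BashOperator(
        task_id='transform',
        bash_command='echo "runs where the special queue is consumed (B or C)"',
        queue='special',  # pin this step to the 'special' queue
    )

    load = BashOperator(
        task_id='load',
        bash_command='echo "back on a default-queue worker"',
    )

    # Sequential (not parallel) execution, as described in the question
    extract >> transform >> load

With the workers started as above, extract and load can run on servers A or C, transform only on B or C, and the >> chaining keeps each run strictly sequential.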

Upvotes: 11
