jack

Reputation: 861

creating 5 bash operators in Airflow

I have the following airflow code:

dag = DAG(
    dag_id='example_python_operator', default_args=args,
    schedule_interval=None)
for i in range(5):
    t = BashOperator(
        task_id = 'some_id',
        params={'i':i},
        bash_command = "python airflow/scripts/a.py '{{ execution_date }}' '{{ params.i }}' '{{ prev_execution_date }}'" ,
        dag = dag)
    set_upstream(t)

However, this doesn't work; apparently I can't call set_upstream(t) like that. Basically, I want to run the script 5 times with these parameters:

python airflow/scripts/a.py    '{{ execution_date }}' 0 '{{ prev_execution_date }}'
python airflow/scripts/a.py    '{{ execution_date }}' 1 '{{ prev_execution_date }}'
python airflow/scripts/a.py    '{{ execution_date }}' 2 '{{ prev_execution_date }}'
python airflow/scripts/a.py    '{{ execution_date }}' 3 '{{ prev_execution_date }}'
python airflow/scripts/a.py    '{{ execution_date }}' 4 '{{ prev_execution_date }}'

Each run is totally separate from the others, so they can run simultaneously.

How can I fix that?

Upvotes: 0

Views: 1114

Answers (2)

Simon D

Reputation: 6279

I would just use Airflow's DummyOperator (an operator that does nothing) as a start task and set the upstream of each task to it, like this:

from airflow.operators import DummyOperator
...

dag = DAG(
    dag_id='example_dag', default_args=args,
    schedule_interval=None)

start_task = DummyOperator(task_id='start_task', dag=dag)

for i in range(5):
    t = BashOperator(
        task_id='task_{0}'.format(i),
        params={'i': i},
        bash_command="python airflow/scripts/a.py '{{ execution_date }}' '{{ params.i }}' '{{ prev_execution_date }}'",
        dag=dag)

    t.set_upstream(start_task)

Upvotes: 1

dlamblin

Reputation: 45361

Your current code would create 5 tasks, and they'd run roughly in parallel if you just removed the set_upstream(t) call.

If you had t1 and t2, you could do t2.set_upstream(t1), or equivalently t1 >> t2, but since you want them to run simultaneously, you'd best not set one.
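As an aside, this is why the bare set_upstream(t) in the question fails: set_upstream is a method on a task object, not a free function. A tiny mock (plain Python, not Airflow) sketching what the real API records:

```python
# Mock only -- not Airflow. Illustrates the shape of the dependency API:
# t2.set_upstream(t1) and t1 >> t2 both record that t1 must run before t2.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []

    def set_upstream(self, other):
        # Record that `other` must complete before this task runs.
        self.upstream.append(other)

    def __rshift__(self, other):
        # t1 >> t2 is sugar for t2.set_upstream(t1).
        other.set_upstream(self)
        return other

t1, t2 = Task('t1'), Task('t2')
t1 >> t2
print([u.task_id for u in t2.upstream])  # → ['t1']
```

A bare set_upstream(t) has no task to attach the dependency to, which is the error you hit.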

You will have to give each task a unique task_id though.

with DAG(
        dag_id='example_python_operator',
        default_args=args,
        schedule_interval=None) as dag:
    tasks = [
        BashOperator(
            task_id='some_id_{}'.format(i),
            params={'i': i},
            bash_command="python airflow/scripts/a.py "
                         "'{{ execution_date }}' "
                         "'{{ params.i }}' "
                         "'{{ prev_execution_date }}'",
        )
        for i in range(5)
    ]
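For reference, here's a plain-Python sketch (no Airflow needed) of the five command strings that loop yields once `params.i` is filled in; in the real DAG, Airflow's Jinja engine does this substitution at render time, and `execution_date` / `prev_execution_date` stay as templates until the run. They match the five commands the question asked for:

```python
# Sketch only: build the five bash_command strings with str.format.
# The doubled braces {{{{ }}}} survive .format() as literal {{ }},
# which is what Airflow's Jinja templating expects to see.
template = ("python airflow/scripts/a.py "
            "'{{{{ execution_date }}}}' {i} '{{{{ prev_execution_date }}}}'")
commands = [template.format(i=i) for i in range(5)]
for cmd in commands:
    print(cmd)
```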

Upvotes: 1
