Reputation: 861
I have the following airflow code:
    dag = DAG(
        dag_id='example_python_operator', default_args=args,
        schedule_interval=None)

    for i in range(5):
        t = BashOperator(
            task_id = 'some_id',
            params={'i':i},
            bash_command = "python airflow/scripts/a.py '{{ execution_date }}' '{{ params.i }}' '{{ prev_execution_date }}'" ,
            dag = dag)
        set_upstream(t)
However, this doesn't work. Apparently I can't call set_upstream(t).
Basically I want to run the script 5 times with these parameters:
python airflow/scripts/a.py '{{ execution_date }}' 0 '{{ prev_execution_date }}'
python airflow/scripts/a.py '{{ execution_date }}' 1 '{{ prev_execution_date }}'
python airflow/scripts/a.py '{{ execution_date }}' 2 '{{ prev_execution_date }}'
python airflow/scripts/a.py '{{ execution_date }}' 3 '{{ prev_execution_date }}'
python airflow/scripts/a.py '{{ execution_date }}' 4 '{{ prev_execution_date }}'
Each run is completely independent of the others, so they can run simultaneously.
How can I fix that?
Upvotes: 0
Views: 1114
Reputation: 6279
I would just use Airflow's DummyOperator (an operator that does nothing) as a start task and set it as the upstream of every generated task. Like this:
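    # Airflow 1.x import paths; these modules were moved in Airflow 2.
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    ...
    dag = DAG(
        dag_id='example_dag', default_args=args,
        schedule_interval=None)

    # A no-op task that all five tasks depend on.
    start_task = DummyOperator(task_id='start_task', dag=dag)

    for i in range(5):
        t = BashOperator(
            task_id = 'task_{0}'.format(i),  # task_id must be unique per task
            params={'i':i},
            bash_command = "python airflow/scripts/a.py '{{ execution_date }}' '{{ params.i }}' '{{ prev_execution_date }}'" ,
            dag = dag)
        t.set_upstream(start_task)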
Upvotes: 1
Reputation: 45361
Your current code would create 5 tasks, and they would run approximately in parallel if you just removed the set_upstream(t) call.
If you had t1 and t2, you could write t2.set_upstream(t1) or t1 >> t2, but since you want them to run simultaneously, it is best not to set any dependency between them.
You will have to give each task a unique task_id, though.
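    with DAG(
        dag_id='example_python_operator',
        default_args=args,
        schedule_interval=None
    ) as dag:
        # Operators created inside the "with" block are attached to dag.
        tasks = [
            BashOperator(
                task_id='some_id_{}'.format(i),
                params={'i': i},
                # Keep a space before each trailing backslash so the
                # arguments stay separated after the lines are joined.
                bash_command="""python airflow/scripts/a.py \
                    '{{ execution_date }}' \
                    '{{ params.i }}' \
                    '{{ prev_execution_date }}'""",
            )
            for i in range(5)
        ]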
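If you want to check what the comprehension generates without an Airflow installation, here is a minimal plain-Python sketch. Note that build_task_specs is a hypothetical helper I made up for illustration, not an Airflow API; it only mirrors the keyword arguments passed to BashOperator above and leaves the Jinja placeholders unrendered.

```python
# Hypothetical helper, not part of Airflow: it mirrors the arguments
# that the list comprehension would pass to BashOperator.
def build_task_specs(n):
    command = (
        "python airflow/scripts/a.py "
        "'{{ execution_date }}' '{{ params.i }}' '{{ prev_execution_date }}'"
    )
    return [
        {
            "task_id": "some_id_{}".format(i),
            "params": {"i": i},
            "bash_command": command,
        }
        for i in range(n)
    ]


specs = build_task_specs(5)
task_ids = [spec["task_id"] for spec in specs]

# Each generated task needs its own task_id.
assert len(set(task_ids)) == len(task_ids)

for spec in specs:
    print(spec["task_id"], spec["params"])
```

The command template is the same for every task; only params.i differs, which is what gives you the five separate invocations.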
Upvotes: 1