Reputation: 137
I am trying to create a graph with a chain of dynamic tasks.
First, I started with the expand function. The problem is that the program waits until all the Add tasks have finished and only then starts the Mul tasks. I need each Mul to run immediately after its Add. Then I came up with the following code, which I hoped would build that graph:
with DAG(dag_id="simple_maping", schedule='* * * * *', start_date=datetime(2022, 12, 22)) as dag:

    @task
    def read_conf():
        conf = Variable.get('tables', deserialize_json=True)
        return conf

    @task
    def add_one(x: str):
        sleep(5)
        return x + '1'

    @task
    def mul_two(x: str):
        return x * 2

    for i in read_conf():
        mul_two(add_one(i))
But now there is an error: 'XComArg' object is not iterable. I can fix it by removing the task decorator from the read_conf function, but I am not sure that is the best decision, because in my case the list of configuration names could contain more than 1000 elements. Without the decorator, the function has to read the configuration every time the scheduler parses the graph.
Maybe the load without the decorator will not be critical? Or is there a way to make the object iterable? What is the right way to do this?
Upvotes: 0
Views: 3410
Reputation: 1046
EDIT: This solution has a bug in 2.5.0 which was solved for 2.5.1 (not released yet).
Yes, when you chain dynamically mapped tasks, the latter (mul_two) will by default wait until all mapped instances of the first task (add_one) are done, because the default trigger rule is all_success. While you can change the trigger rule, for example to one_done, this will not solve your issue: the second task decides only once, when it first starts running, how many mapped task instances it creates (with one_done it creates only one mapped task instance, so that is not helpful for your use case).
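To make the difference concrete, here is a toy calculation in plain Python (it does not use Airflow at all). It assumes each add task takes as many seconds as its input, that mul tasks take no time, and that there are enough worker slots to run everything in parallel. The function names are made up for illustration.

```python
def staged_mul_start_times(durations):
    """Two chained mapped tasks: every mul waits for the slowest add."""
    barrier = max(durations)
    return [barrier for _ in durations]


def per_item_mul_start_times(durations):
    """One add -> mul chain per input: each mul starts when its own add ends."""
    return list(durations)


durations = [10, 20, 30]
print(staged_mul_start_times(durations))    # [30, 30, 30]
print(per_item_mul_start_times(durations))  # [10, 20, 30]
```

With the staged layout, even the mul for the fastest input starts only after 30 seconds, which is the behavior described in the question.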
The issue with the for-loop (and the reason Airflow won't allow you to iterate over an XComArg) is that for-loops are evaluated when the DAG code is parsed, which happens outside of runtime, when Airflow does not yet know how many results read_conf() will return. If the number of configurations changes only rarely, then having a for-loop like that iterate over a list in a separate file is an option, but yes, at scale this can cause performance issues.
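The parse-time problem can be imitated without Airflow. In this rough sketch, LazyResult is a hypothetical stand-in for the placeholder a decorated task returns while the DAG file is being parsed; it is not Airflow's actual XComArg class, it only mimics the fact that there is no list to loop over yet.

```python
class LazyResult:
    """Hypothetical stand-in for a task's not-yet-computed return value."""

    def __init__(self, task_name):
        self.task_name = task_name


def read_conf():
    # At parse time only a reference to the future result exists,
    # not the list the task will eventually return.
    return LazyResult("read_conf")


try:
    for item in read_conf():
        print(item)
except TypeError as err:
    message = str(err)
    print(message)  # 'LazyResult' object is not iterable
```

The loop fails for the same basic reason as in the question: the values only exist once the task has actually run.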
The best solution in my opinion is to use dynamic task group mapping, which was added in Airflow 2.5.0. All mapped task groups will run in parallel, one for every input from read_conf(), so for every add_one its mul_two will run immediately. I put the code for this below.
One note: you will not be able to see the mapped task groups in the Airflow UI or access their logs just yet; the feature is still quite new and the UI extension should come in 2.5.1. That is why I added a task downstream of the mapped task groups that prints the list of results of the mul_two tasks, so you can check whether it is working.
from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime
from time import sleep

with DAG(
    dag_id="simple_mapping",
    schedule=None,
    start_date=datetime(2022, 12, 22),
    catchup=False
) as dag:

    @task
    def read_conf():
        return [10, 20, 30]

    @task_group
    def calculations(x):

        @task
        def add_one(x: int):
            sleep(x)
            return x + 1

        @task()
        def mul_two(x: int):
            return x * 2

        mul_two(add_one(x))

    @task
    def pull_xcom(**context):
        pulled_xcom = context["ti"].xcom_pull(
            task_ids=['calculations.mul_two'],
            key="return_value"
        )
        print(pulled_xcom)

    calculations.expand(x=read_conf()) >> pull_xcom()
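As a quick sanity check, the values that the mul_two tasks should produce for the inputs above can be worked out with plain functions. This sketch only reproduces the arithmetic; the sleep is left out, and the exact ordering and nesting of what xcom_pull prints is not something it tries to predict.

```python
def add_one(x: int) -> int:
    return x + 1


def mul_two(x: int) -> int:
    return x * 2


inputs = [10, 20, 30]  # same values read_conf() returns in the DAG above
expected = [mul_two(add_one(x)) for x in inputs]
print(expected)  # [22, 42, 62]
```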
Hope this helps! :)
PS: you might want to set catchup=False unless you want to backfill a few weeks of tasks.
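To get a feel for the size of such a backfill, here is a rough count assuming the every-minute schedule and the start date from the question. The "today" value is made up, missed_runs is a hypothetical helper, and the number Airflow would actually schedule can differ slightly because of how it handles data intervals.

```python
from datetime import datetime, timedelta


def missed_runs(start_date, now, interval):
    """Number of complete schedule intervals between start_date and now."""
    if now <= start_date:
        return 0
    return (now - start_date) // interval


start = datetime(2022, 12, 22)
now = datetime(2023, 1, 5)  # hypothetical "today", two weeks after start_date
print(missed_runs(start, now, timedelta(minutes=1)))  # 20160
```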
Upvotes: 2