Миша Попов

Reputation: 137

how to create a chain of dynamic tasks?

I am trying to create a DAG with a chain of dynamically mapped tasks.

First of all, I started with the expand function. The problem is that the program waits until all the add_one tasks have finished and only then starts the mul_two tasks, but I need each mul_two to run immediately after its add_one. So I wrote the following code, hoping it could build the graph:

from datetime import datetime
from time import sleep

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable

with DAG(dag_id="simple_maping", schedule='* * * * *', start_date=datetime(2022, 12, 22)) as dag:
    @task
    def read_conf():
        conf = Variable.get('tables', deserialize_json=True)
        return conf


    @task
    def add_one(x: str):
        sleep(5)
        return x + '1'


    @task
    def mul_two(x: str):
        return x * 2


    # This loop fails: at parse time read_conf() returns an XComArg,
    # which cannot be iterated over.
    for i in read_conf():
        mul_two(add_one(i))

But now there is an error: 'XComArg' object is not iterable. I can fix it just by removing the task decorator from the read_conf method, but I am not sure that is the best decision, because in my case the list of configuration names could contain more than 1000 elements. Without the decorator, the method has to read the configuration every time the scheduler parses the DAG. Maybe the load without the decorator will not be critical? Or is there a way to make the object iterable? What is the right way to do this?

Upvotes: 0

Views: 3410

Answers (1)

TJaniF

Reputation: 1046

EDIT: This solution has a bug in Airflow 2.5.0 which is fixed in 2.5.1 (not released yet).

Yes, by default, when you chain dynamically mapped tasks, the latter (mul_two) waits until all mapped instances of the first task (add_one) are done, because the default trigger rule is all_success. While you can change the trigger rule, for example to one_done, this will not solve your issue: the second task decides how many mapped task instances to create only once, when it first starts running (with one_done it creates only one mapped task instance, so that is not helpful for your use case).
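Just to illustrate what changing the trigger rule would look like (a sketch only; as explained above, one_done does not give you per-element chaining):

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="trigger_rule_sketch", schedule=None,
         start_date=datetime(2022, 12, 22), catchup=False):

    @task
    def read_conf():
        return ["a", "b", "c"]

    @task
    def add_one(x: str):
        return x + "1"

    @task
    def mul_two(x: str):
        return x * 2

    # .override() passes extra operator arguments such as trigger_rule.
    # With one_done, mul_two expands as soon as the FIRST add_one instance
    # finishes, creating only one mapped instance - not a fix for this case.
    mul_two.override(trigger_rule="one_done").expand(
        x=add_one.expand(x=read_conf())
    )
```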

The issue with the for-loop (and why Airflow won't let you iterate over an XComArg) is that for-loops are evaluated when the DAG file is parsed, which happens outside of runtime, when Airflow does not yet know how many results read_conf() will return. If the number of configurations only rarely changes, then a for-loop like that, iterating over a list kept in a separate file, is an option, but yes, at scale this can cause performance issues.
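For completeness, the parse-time for-loop version could look like this (a sketch; the module name tables_conf and the list TABLES are made up for illustration):

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Hypothetical module holding a plain Python list, e.g. tables_conf.py
# containing: TABLES = ["a", "b", "c"]
from tables_conf import TABLES

with DAG(dag_id="static_loop_sketch", schedule=None,
         start_date=datetime(2022, 12, 22), catchup=False):

    @task
    def add_one(x: str):
        return x + "1"

    @task
    def mul_two(x: str):
        return x * 2

    # The loop runs at parse time, so TABLES must be known without running
    # any task; each element gets its own add_one >> mul_two pair, and
    # .override(task_id=...) keeps the generated task ids readable.
    for i in TABLES:
        mul_two.override(task_id=f"mul_two_{i}")(
            add_one.override(task_id=f"add_one_{i}")(i)
        )
```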

The best solution in my opinion is to use dynamic task group mapping which was added in Airflow 2.5.0:

All mapped task groups will run in parallel, one for every input from read_conf(), so for every add_one its mul_two will run immediately. I put the code for this below.

One note: you will not be able to see the mapped task groups in the Airflow UI or access their logs just yet; the feature is still quite new and the UI support should come in 2.5.1. That is why I added a task downstream of the mapped task groups that prints out the list of results of the mul_two tasks, so you can check that it is working.

from airflow import DAG
from airflow.decorators import task, task_group
from datetime import datetime
from time import sleep

with DAG(
    dag_id="simple_mapping",
    schedule=None,
    start_date=datetime(2022, 12, 22),
    catchup=False
) as dag:

    @task
    def read_conf():
        return [10, 20, 30]


    @task_group
    def calculations(x):

        @task
        def add_one(x: int):
            sleep(x)
            return x + 1

        @task
        def mul_two(x: int):
            return x * 2

        mul_two(add_one(x))


    @task
    def pull_xcom(**context):

        pulled_xcom = context["ti"].xcom_pull(
            task_ids=['calculations.mul_two'], 
            key="return_value"
        )
        print(pulled_xcom)


    calculations.expand(x=read_conf()) >> pull_xcom()
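For a quick sanity check of the arithmetic, the per-element pipeline can be simulated in plain Python (no Airflow needed), using the same inputs as read_conf() above:

```python
# Plain-Python sketch of the mapped pipeline: each input x flows through
# add_one, then its own mul_two, independently of the other elements.
def add_one(x: int) -> int:
    return x + 1

def mul_two(x: int) -> int:
    return x * 2

results = [mul_two(add_one(x)) for x in [10, 20, 30]]
print(results)  # → [22, 42, 62]
```

These are the values the mul_two task instances return, and thus what pull_xcom retrieves.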

Hope this helps! :)

PS: you might want to set catchup=False unless you want to backfill a few weeks of tasks.

Upvotes: 2
