Pak Hang Leung

Reputation: 389

Use list output from a PythonOperator to iterate another operator in Airflow 2

Background

We want to first get a list from one operator, then iterate over the result and run another operator for each element.

The script is as follows:

def hello_world(ti, execution_date, **context):
    # Do sth here and generate the value final_output
    ti.xcom_push(key='whatever', value=final_output)


dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
    )

    outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"

    for x in outcome_list:
        t2 = PythonOperator(
            task_id=f"test_{x}",
            python_callable=do_sth,
            op_kwargs={"input_param": x},
        )

        t1 >> t2

Currently, we manage to get the XCom value. The list always has around 60 elements, which should not cause any performance issues. However, it is returned as the string representation of a list.

To iterate over it, we want to convert it to a list and pass each element to the function that runs the operator in t2.

Current issue

The outcome_list is generated via a Jinja template and saved as a str like this:

['user_A US', 'user_B BR' , ..... ] 

We tried to convert the outcome_list into a proper Python list with the following line in the DAG:

outcome_list = outcome_list.strip("[]").split(", ")

It returns the following error:

jinja2.exceptions.TemplateSyntaxError: unexpected end of template, expected ','.

And when we tried to convert the output into a list with Jinja syntax:

outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') | list }}"

We got an error when performing the loop, saying that it is not iterable.

What's the issue here and how should we proceed? Thank you for the help!

Upvotes: 1

Views: 2268

Answers (2)

Jelena Lazarevic

Reputation: 152

For Airflow <2.3.0:

You can use Airflow Variables like this, although it's not recommended:

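import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def hello_world(ti, execution_date, **context):
    # Do sth here and generate the value final_output
    # Variable.update() requires the variable to be created first.
    # serialize_json=True stores the list as JSON so it can be read back as a list.
    Variable.update("var_name", value=final_output, serialize_json=True)
    # Variable.set("var_name", value=final_output, serialize_json=True) will create variable if doesn't exist, delete and create if exists

dag = DAG(
    "test",
    schedule_interval=None,
    start_date=datetime.datetime(2021, 5, 17),
    catchup=False,
)

with dag:
    t1 = PythonOperator(
        task_id="hello_world",
        python_callable=hello_world,
    )

    # Evaluated every time the DAG file is parsed; default_var=[] avoids a KeyError
    # before the variable exists. Without deserialize_json=True the loop would
    # iterate over the characters of a string.
    for x in Variable.get("var_name", default_var=[], deserialize_json=True):
        t2 = PythonOperator(
            task_id=f"test_{x}",
            python_callable=do_sth,
            op_kwargs={"input_param": x},
        )

        # Inside the loop so that every generated task runs after t1.
        t1 >> t2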
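One detail worth spelling out for the snippet above: an Airflow Variable is stored as a string, so a list needs to be serialized when written and deserialized when read (Variable.set and Variable.get expose serialize_json and deserialize_json for this). The following is a minimal sketch using only the standard json module, with made-up sample data, to show the difference; it does not call Airflow itself.

```python
import json

# Hypothetical sample data standing in for final_output.
final_output = ["user_A US", "user_B BR"]

# Stored with str(): reading it back gives one long string.
stored_plain = str(final_output)
# Stored as JSON: reading it back can restore the list.
stored_json = json.dumps(final_output)

# Iterating the plain string walks it character by character.
print(list(stored_plain)[:3])   # ['[', "'", 'u']

# Deserializing the JSON string yields the original list.
restored = json.loads(stored_json)
print(restored)                 # ['user_A US', 'user_B BR']
```

This is why a loop over a value read without deserialization would try to create one task per character rather than one per list element.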

When Airflow checks for the value of an Airflow variable or connection, it does so in the following order of precedence:

  1. Secrets backend
  2. Environment variable
  3. Set via the Airflow UI

The issue with this approach is that Airflow re-parses DAG files at the interval set by MIN_FILE_PROCESS_INTERVAL (default: 30 seconds). Each parse runs the top-level Variable.get call and therefore sends a request to wherever the variable is stored, in the order above, every 30 seconds. That can put a lot of pressure on the webserver and scheduler (if the output is a big list) and, of course, on the secrets backend and the metadata database.

You can increase the value of MIN_FILE_PROCESS_INTERVAL if your use case allows it.

Upvotes: 1

Elad Kalif

Reputation: 15931

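Placing outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}" outside of operator scope is not going to work, because this string is never templated. Jinja rendering is applied only to an operator's templated fields when the task runs, not to arbitrary strings in the DAG file.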
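To make this concrete, here is a small plain-Python sketch (no Airflow required) of what the for loop in the question actually sees when the DAG file is parsed. The variable name items is introduced here only for illustration.

```python
# At DAG-parse time this is an ordinary Python string; Jinja has not run on it.
outcome_list = "{{ ti.xcom_pull(key='whatever',task_ids='hello_world') }}"

# Looping over it therefore walks the template text one character at a time.
items = [x for x in outcome_list]
print(items[:4])   # ['{', '{', ' ', 't']

# The number of iterations equals the length of the template text,
# not the number of elements in the XCom list.
print(len(items) == len(outcome_list))   # True
```

Any string manipulation applied at this point (strip, split, and so on) operates on the template text itself, not on the value that hello_world pushed to XCom.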

What you are looking for is a way to create tasks in a map-reduce fashion at run time.

For Airflow <2.3.0:

This is not possible. You cannot create task_id(s) based on the output of a previous task.
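One possible workaround on older versions, which is not part of the original answer, is to skip per-item tasks and iterate inside a single downstream task instead. Inside a running task, ti.xcom_pull returns the pushed Python object, so no string parsing is needed. The sketch below assumes that approach; process_all and FakeTaskInstance are hypothetical names, and FakeTaskInstance only stands in for Airflow's task instance so the example runs without Airflow.

```python
def do_sth(input_param):
    # Placeholder for the per-item work from the question.
    return f"processed {input_param}"


def process_all(ti, **context):
    # At run time xcom_pull returns the real list pushed by hello_world.
    outcome_list = ti.xcom_pull(key="whatever", task_ids="hello_world")
    return [do_sth(input_param=x) for x in outcome_list]


class FakeTaskInstance:
    """Hypothetical stand-in for the task instance, for local testing only."""

    def xcom_pull(self, key, task_ids):
        return ["user_A US", "user_B BR"]


results = process_all(FakeTaskInstance())
print(results)   # ['processed user_A US', 'processed user_B BR']
```

In a DAG, process_all would be the python_callable of a second PythonOperator placed downstream of hello_world. The trade-off is that the items no longer appear as separate tasks, so they cannot be retried or monitored individually.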

For Airflow >=2.3.0:

A new feature, AIP-42 Dynamic Task Mapping, has been added. It allows you to create tasks based on the output of previous tasks. Example:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task


@task
def make_list():
    # This can also be from an API call, checking a database, -- almost anything you like, as long as the
    # resulting list/dictionary can be stored in the current XCom backend.
    return [1, 2, {"a": "b"}, "str"]


@task
def consumer(arg):
    # Runs once per element of the list returned by make_list.
    print(repr(arg))


with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag:
    consumer.expand(arg=make_list())


Note: at the time of writing this answer, Airflow 2.3.0 has not been released yet. However, 2.3.0b1 has been released, so you can test your code against it. We expect to release the official version in the coming weeks.

Upvotes: 3
