ethanenglish

Reputation: 1327

Generating multiple tasks within an Airflow DAG to be processed individually

Parts of my DAG generate lists that I can't break up into individual tasks to be processed separately downstream.

Here's a pseudo example:

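from airflow.operators import python_operator

# Assumes these operators are created inside a `with DAG(...) as dag:` block.

def push(**kwargs):
    """Pushes an XCom without a specific target."""
    for n in range(10):
        # Every push reuses the key 'vals', so each one replaces the previous value
        kwargs['ti'].xcom_push(key='vals', value=n)

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)  # only the last pushed value (9) arrives here

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

push >> puller  # puller must run after push, or there is nothing to pull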

It appears that xcom_push keeps only the last value instead of building a list. Therefore, I'd have to collect the values in push into a list, then use a for loop in the puller to process each item individually.

I'm fine doing that, but it seems counter-intuitive for batch jobs.

How would I have each puller task pull just one of the 10 values generated by push?

Upvotes: 1

Views: 2664

Answers (1)

dlamblin

Reputation: 45361

The structure of a DAG shouldn't change between DAG runs, so your puller is either one task that pulls all the values, or a fixed set of 10 tasks that each pull one of the values.
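As for why the loop in the question ends up with a single value: within one DAG run, Airflow 1.x keeps one XCom per task and key, and pushing again under the same key replaces the earlier entry rather than appending to it. The sketch below mimics that with a hypothetical in-memory `FakeXComStore` (an illustrative stand-in, not Airflow's API), keyed by `(task_id, key)` only:

```python
from typing import Any, Dict, Tuple


class FakeXComStore:
    """Hypothetical in-memory stand-in for XCom storage (not Airflow's API).

    Values are keyed by (task_id, key); pushing again under the same key
    replaces the earlier value instead of appending to it.
    """

    def __init__(self) -> None:
        self._data: Dict[Tuple[str, str], Any] = {}

    def push(self, task_id: str, key: str, value: Any) -> None:
        # Overwrite semantics: any previous value for this key is discarded
        self._data[(task_id, key)] = value

    def pull(self, task_id: str, key: str) -> Any:
        # Returns None when nothing was pushed under this key
        return self._data.get((task_id, key))


store = FakeXComStore()

# Pattern from the question: ten pushes under the same key
for n in range(10):
    store.push("push", "vals", n)
print(store.pull("push", "vals"))  # 9: only the last push survives

# Collect first, then push the whole list once
store.push("push", "vals", list(range(10)))
print(store.pull("push", "vals"))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```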

Here's how you'd push all 10 values as a single XCom:

from airflow.operators import python_operator

def push(**kwargs):
    """Pushes an XCom without a specific target."""
    final_output = []
    for n in range(10):
        # doing work
        final_output.append(n)
    # One push of the whole list, so no value overwrites another
    kwargs['ti'].xcom_push(key='vals', value=final_output)

push = python_operator.PythonOperator(
    task_id='push',
    python_callable=push,
    provide_context=True
)

Then you can either pull all 10 of them in a single task, like this:

def puller(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key='vals', task_ids='push')
    print(v1)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

puller = python_operator.PythonOperator(
    task_id='puller',
    python_callable=puller,
    provide_context=True
)

push >> puller  # pull only after push has stored the list

Or pull one value in each of ten tasks:

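def puller(index=0, **kwargs):
    ti = kwargs['ti']
    # Pull the whole list, then keep only the item this task is responsible for
    v1 = ti.xcom_pull(key='vals', task_ids='push')[index]
    print(v1)

# One task per index; op_kwargs hands each task its own index
ten_ops = [
    python_operator.PythonOperator(
        task_id=f'puller_{n}',
        python_callable=puller,
        provide_context=True,
        op_kwargs={'index': n},
    )
    for n in range(10)
]

push >> ten_ops  # every puller waits for push to finish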
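If the pushed list won't always hold exactly 10 items, one way to keep the DAG shape fixed is to let each of the fixed puller tasks handle a stride of the list instead of a single element. Here is a minimal plain-Python sketch of that partitioning, assuming a task count fixed at parse time; `NUM_PULLERS` and `values_for_task` are hypothetical names, and inside Airflow you would call the helper from `puller` with the pulled list and its `index`.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")

# Hypothetical constant, fixed at DAG-parse time: the number of puller tasks
NUM_PULLERS = 10


def values_for_task(values: Sequence[T], index: int, num_tasks: int = NUM_PULLERS) -> List[T]:
    """Hypothetical helper: the items that puller number `index` should handle.

    Striding by num_tasks spreads a list of any length over a fixed set of
    tasks, so the DAG shape stays the same even if the pushed list changes size.
    """
    if not 0 <= index < num_tasks:
        raise ValueError(f"index must be in [0, {num_tasks}), got {index}")
    return list(values[index::num_tasks])


# A run that produced exactly 10 values: one value per task
print(values_for_task(list(range(10)), 3))  # [3]
# A run that produced 23 values: still 10 tasks, a few handle three items
print(values_for_task(list(range(23)), 3))  # [3, 13]
```

A task whose stride is empty (for example, index 7 when only 4 values were pushed) simply gets an empty list and has nothing to do.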

I hope that helps, unless I misunderstood the question.

Upvotes: 4
