Kumar Padhy

Reputation: 101

How to pull an XCom value between a task and a TaskGroup?

I have a requirement where I need to do an XCom pull to fetch a few details from a previous task. I tried several options, but none of them worked in Airflow 2.2.3. Here is a sample code example:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

def read_files_from_table():
    # Function to read the table and fetch the list of files to be processed
    files = ['file1.txt', 'file2.csv', 'file3.json', 'x_file.txt']
    return files

def process_file(file_path):
    # Function to process each file
    print(f"Processing file: {file_path}")
    # TODO: Add code to process the file

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('process_files', default_args=default_args, schedule_interval='@daily') as dag:

    # Task 1: Read table and fetch list of files to be processed
    task1 = PythonOperator(task_id='read_files_from_table',
                           python_callable=read_files_from_table,
                           dag=dag)
    
    # Task 2: Process each file
    with TaskGroup('process_files_task_group') as task_group:
        # files = "{{ ti.xcom_pull(task_ids='read_files_from_table') }}"
        files = task1.output
        for file_path in files:
            if 'x_' not in file_path:
                task2 = PythonOperator(task_id=f'process_file_{file_path}',
                                       python_callable=process_file,
                                       op_kwargs={'file_path': file_path},
                                       provide_context=True,
                                       dag=dag)
                task_group.add(task2)

    # Define the task dependencies
    task1 >> task_group

The read_files_from_table method here is just a simple example; in reality I read from an SFTP server, get the list of files to be processed, and then need to loop the task over however many files there are.
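For reference, the real version looks roughly like this minimal sketch (the connection id sftp_default and the remote path /incoming are placeholders):

from airflow.providers.sftp.hooks.sftp import SFTPHook

def read_files_from_table():
    # Sketch only: list the files waiting on the SFTP server
    # (connection id and remote path are placeholders)
    hook = SFTPHook(ssh_conn_id='sftp_default')
    return hook.list_directory('/incoming')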

Any suggestions?

Upvotes: 1

Views: 1819

Answers (2)

Santosh Gopane

Reputation: 51

You need to work with task decorators to iterate over the previous task's output. I have answered this exact use case here: https://stackoverflow.com/questions/77479560/airflow-xcom-pull-in-taskgroup/78201205#78201205
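For example, the core of the pattern looks roughly like this (a minimal sketch using dynamic task mapping, so Airflow 2.3+; the DAG and task names are just placeholders):

from datetime import datetime

from airflow.decorators import dag, task


@dag(start_date=datetime(2022, 1, 1), schedule_interval='@daily')
def mapped_files_example():
    @task
    def list_files():
        return ['file1.txt', 'file2.csv']

    @task
    def process_file(file_path):
        print(f"Processing file: {file_path}")

    # expand() creates one mapped task instance per element of list_files()'s output
    process_file.expand(file_path=list_files())


mapped_files_example()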

Upvotes: 0

Dmitry Zhyhimont

Reputation: 56

Firstly, to create dynamic Airflow tasks you need to upgrade Airflow to at least version 2.3.0 (dynamic task mapping); mapping over a task group, as shown below, requires 2.5.0. Secondly, you should restructure your code as below. One more suggestion: it's better to start using Airflow decorators; they are a more declarative way to create tasks, task groups, etc. Read more about the concept of dynamic tasks here or here.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group


@task
def read_files_from_table():
    # Function to read the table and fetch the list of files to be processed
    files = ['file1.txt', 'file2.csv', 'file3.json', 'x_file.txt']
    return files


@task
def process_file(file_path):
    # Function to process each file
    print(f"Processing file: {file_path}")
    # TODO: Add code to process the file


default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('process_files', default_args=default_args, schedule_interval='@daily') as dag:
    @task_group
    def process_files_task_group(file_path):
        process_file(file_path)
    # expand method is used for the creation of dynamic tasks
    process_files_task_group.expand(file_path=read_files_from_table())

You can see the DAG representation by following this link: DAG representation
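If you also want to skip the x_ files as in your original loop, one option (a sketch) is to filter in a small intermediate @task, defined next to the other tasks, and expand over its result instead:

@task
def filter_files(files):
    # Keep only the files that should actually be processed
    return [f for f in files if 'x_' not in f]

# and inside the DAG, expand over the filtered list:
# process_files_task_group.expand(file_path=filter_files(read_files_from_table()))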

Upvotes: 1
