Reputation: 101
I have a requirement where I need to do an XCom pull to fetch a few details from a previous task. I tried many options, but none worked in Airflow 2.2.3. Here is a sample code example:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime

def read_files_from_table():
    # Function to read the table and fetch the list of files to be processed
    files = ['file1.txt', 'file2.csv', 'file3.json', 'x_file.txt']
    return files

def process_file(file_path):
    # Function to process each file
    print(f"Processing file: {file_path}")
    # TODO: Add code to process the file

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('process_files', default_args=default_args, schedule_interval='@daily') as dag:
    # Task 1: Read table and fetch list of files to be processed
    task1 = PythonOperator(task_id='read_files_from_table',
                           python_callable=read_files_from_table,
                           dag=dag)

    # Task 2: Process each file
    with TaskGroup('process_files_task_group') as task_group:
        # files = "{{ ti.xcom_pull(task_ids='read_files_from_table') }}"
        files = task1.output
        for file_path in files:
            if 'x_' not in file_path:
                task2 = PythonOperator(task_id=f'process_file_{file_path}',
                                       python_callable=process_file,
                                       op_kwargs={'file_path': file_path},
                                       provide_context=True,
                                       dag=dag)
                task_group.add(task2)

    # Define the task dependencies
    task1 >> task_group
The read_files_from_table method here is just a simple example; in reality I read from an SFTP server, get the list of files to be processed, and based on that I have to loop the processing task over the number of files.
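For context, the real read_files_from_table is closer to the sketch below. The connection id and remote directory are placeholders, and it assumes the apache-airflow-providers-sftp package is installed.

from airflow.providers.sftp.hooks.sftp import SFTPHook

def read_files_from_table():
    # Placeholder connection id and remote path -- adjust to your environment
    hook = SFTPHook(ssh_conn_id='sftp_default')
    # list_directory returns the file names found in the remote directory
    return hook.list_directory('/incoming')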
Any suggestions?
Upvotes: 1
Views: 1819
Reputation: 51
You need to use task decorators to iterate over the output of the previous task. I have answered this exact use case here: https://stackoverflow.com/questions/77479560/airflow-xcom-pull-in-taskgroup/78201205#78201205
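In short, the idea looks roughly like the sketch below (it assumes Airflow 2.3+ where dynamic task mapping with expand() is available; the DAG and task names are illustrative):

from datetime import datetime

from airflow.decorators import dag, task

@dag(start_date=datetime(2022, 1, 1), schedule_interval='@daily', catchup=False)
def process_files():
    @task
    def read_files_from_table():
        # Replace with the real lookup (SFTP listing, table read, etc.)
        return ['file1.txt', 'file2.csv', 'file3.json']

    @task
    def process_file(file_path):
        print(f"Processing file: {file_path}")

    # expand() creates one mapped task instance per element of the returned list
    process_file.expand(file_path=read_files_from_table())

process_files()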
Upvotes: 0
Reputation: 56
Firstly, to create dynamic Airflow tasks you should upgrade Airflow to at least version 2.3.0 (dynamic task mapping was introduced in 2.3.0; mapping over a whole task group, as in the example below, requires 2.5.0). Secondly, you should restructure your code as shown below. And one more suggestion: it's better to start using Airflow decorators; they are a more declarative way to create tasks, task groups, etc. You can read more about the concept of dynamic task mapping in the Airflow documentation.
from datetime import datetime

from airflow import DAG
from airflow.decorators import task, task_group

@task
def read_files_from_table():
    # Function to read the table and fetch the list of files to be processed
    files = ['file1.txt', 'file2.csv', 'file3.json', 'x_file.txt']
    return files

@task
def process_file(file_path):
    # Function to process each file
    print(f"Processing file: {file_path}")
    # TODO: Add code to process the file

default_args = {
    'start_date': datetime(2022, 1, 1)
}

with DAG('process_files', default_args=default_args, schedule_interval='@daily') as dag:
    @task_group
    def process_files_task_group(file_path):
        process_file(file_path)

    # expand() is what creates the dynamic (mapped) tasks
    process_files_task_group.expand(file_path=read_files_from_table())
You can see the DAG representation by following this link: DAG representation
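If you also need the 'x_' filtering from your original DAG, one option (just a sketch, the filter task name is illustrative) is to add an intermediate @task between the read and the expand, inside the same with DAG(...) block:

@task
def filter_files(files):
    # Drop the files the original DAG skipped (names containing 'x_')
    return [f for f in files if 'x_' not in f]

# expand over the filtered list instead of the raw listing
process_files_task_group.expand(file_path=filter_files(read_files_from_table()))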
Upvotes: 1