Reputation: 1369
I have a DAG with the following steps:
1. Retrieve a list of items from an API call.
2. For each item in the list, spin up another task that prints the value.
Basically, step 2 is not known until the API call is made. I want the API call to be made only after I trigger a DAG run.
However, step 1 of the DAG is executed while the DAG itself is being imported, and if the API call fails, the DAG is reported as broken. The entire thing is supposed to be dynamic.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests

# Default args for the DAG
default_args = {
    'owner': 'me',
    'start_date': datetime(2025, 1, 1),
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create a DAG instance
dag = DAG(
    'my_dag_id',
    default_args=default_args,
    schedule=None,
)

def get_items():
    """
    Makes an HTTP request to an API,
    retrieves a list of items from the response,
    and returns the list
    """
    response = requests.get('https://api.example.com/items')
    items = response.json()['items']
    return items

def process_item(item):
    """
    Processes a single item
    """
    print(f'Processing item {item}')

# Create a PythonOperator to get the items
get_items_task = PythonOperator(
    task_id='get_items',
    python_callable=get_items,
    dag=dag,
)

# Create a PythonOperator to process each item
for item in get_items():
    task = PythonOperator(
        task_id=f'process_item_{item}',
        python_callable=process_item,
        op_args=[item],
        dag=dag,
    )
    task.set_upstream(get_items_task)
Notice that I have set the start date in the future and schedule=None.
As soon as I save this .py file in the /dags folder, it immediately executes get_items and reports that the DAG is broken because the API call returned an error.
How can I stop this from being executed while the DAG is imported? I want it to be dynamic, i.e. fetch the list of items only once the DAG is triggered, and only then create tasks for each of those items dynamically.
Upvotes: 0
Views: 828
Reputation: 3094
You're calling get_items() in the global scope of the DAG file (the statement for item in get_items():), so it is evaluated every time Airflow parses the DAG file.
To avoid executing get_items() in the global scope, move that logic into a task so that the dependent tasks are only generated at runtime. This is the use case for which dynamic task mapping was introduced in Airflow: it lets you generate a varying number of tasks from a collection of items.
I've refactored your DAG to generate the process_item tasks from the output of get_items:
from datetime import datetime, timedelta

import requests

from airflow import DAG
from airflow.decorators import task

# Default args for the DAG
default_args = {
    "owner": "me",
    "depends_on_past": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Create a DAG instance
with DAG(
    "my_dag_id",
    default_args=default_args,
    start_date=datetime(2025, 1, 1),
    schedule=None,
):

    @task
    def get_items():
        """
        Makes an HTTP request to an API,
        retrieves a list of items from the response,
        and returns the list
        """
        response = requests.get("https://api.example.com/items")
        items = response.json()["items"]
        return items

    @task
    def process_item(item):
        """
        Processes a single item
        """
        print(f"Processing item {item}")

    process_item.expand(item=get_items())
expand() generates one mapped task instance for each element in the output of get_items(). The TaskFlow API (the @task decorator) is convenient when dealing with dynamically generated tasks; read more about it in the docs.
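If some arguments should stay the same for every mapped task instance while only one argument varies, you can combine partial() with expand(). A minimal sketch, assuming a hypothetical prefix parameter and a hard-coded item list purely for illustration:

from airflow.decorators import task

@task
def process_item(item, prefix):
    # prefix is the same for every mapped task instance; item varies per instance
    print(f"{prefix} {item}")

# One mapped task instance per item; prefix is held constant via partial()
process_item.partial(prefix="Processing item").expand(item=["a", "b", "c"])

In your DAG you would keep item=get_items() as the expanded argument, so the number of mapped instances is still decided at runtime from the API response.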
Upvotes: 1