We are using Airflow 2.1.4 and running on Kubernetes.
We have separate pods for the web server and the scheduler, and we use the KubernetesExecutor.
We use a variety of operators, such as PythonOperator, KubernetesPodOperator, etc.
Our setup handles ~2K customers (businesses), and each of them has its own DAG.
Our code looks something like:
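```python
import logging
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.http.operators.http import SimpleHttpOperator

logger = logging.getLogger(__name__)

def get_customers():
    logger.info("querying database to get all customers")
    # sql_connection is our own DB client (definition omitted)
    return sql_connection.query("SELECT id, name, offset FROM customers_table")

# Runs at module level, i.e. whenever this file is imported
customers = get_customers()
for customer_id, name, offset in customers:
    dag = DAG(
        dag_id=f"{customer_id}-{name}",
        schedule_interval=offset,
    )
    with dag:
        first = PythonOperator(...)  # arguments omitted
        second = KubernetesPodOperator(...)
        third = SimpleHttpOperator(...)
        first >> second >> third
    # Expose each DAG as a module-level object so Airflow discovers it
    globals()[dag.dag_id] = dag
```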
The snippet above is a simplified version of what we have; in reality each DAG has a few dozen operators, not just three.
The problem is that for every operator in every DAG we see the "querying database to get all customers" log line, which means we query the database far more often than we want to.
The customers table is not updated frequently, so it would be enough to update the DAGs once or twice a day. I know that the DAGs are saved in the metadata database in some form.
In our case that is ~60 operators × ~2,000 customers = ~120,000 queries to the database.
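Yes, this is entirely expected. Airflow parses DAG files regularly (every 30 seconds by default), so any top-level code (code that runs while the file is being parsed, rather than inside the operators' "execute" methods) runs on every parse. On top of that, with the KubernetesExecutor each task runs in its own pod, which loads the DAG file again before running the task, so the query also runs for every task that is executed.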
The simple answer (and best practice) is: do not perform any heavy operations in the top-level code of your DAG files, and specifically do not run DB queries there. If you want more specific answers and possible ways to handle it, the Airflow best-practices documentation has dedicated chapters on this:
This one explains why top-level code should be "light": https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code
This one covers strategies for avoiding "heavy" operations in top-level code when you generate DAGs dynamically, as you do in your case: https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#dynamic-dag-generation
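In short, there are three proposed ways of providing the configuration for dynamically generated DAGs:
1) via environment variables (not to be confused with Airflow Variables)
2) via externally provided, generated Python code containing the metadata, placed in the DAG folder
3) via an externally provided, generated configuration metadata file, placed in the DAG folder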
I believe you could use either 2) or 3) to achieve your goal, for example by regenerating that code or file from the database once or twice a day.
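A minimal sketch of option 3) (a generated configuration file in the DAG folder), assuming a separate job, for example a cron job running once or twice a day, does the database query and writes the result as JSON. The file name `customers.json` and the functions `export_customers` / `load_customers` are hypothetical, and an in-memory SQLite database stands in for the real customers database. The DAG file would call `load_customers()` at top level instead of `get_customers()`, so each parse only reads a small local file.

```python
import json
import os
import sqlite3
import tempfile


def export_customers(conn, out_path):
    """Query the customers DB and write the result to a JSON file.

    Meant to run outside the DAG file (hypothetical scheduled job), so the
    database is hit a couple of times per day instead of on every parse.
    """
    rows = conn.execute(
        'SELECT id, name, "offset" FROM customers_table ORDER BY id'
    ).fetchall()
    customers = [{"id": r[0], "name": r[1], "offset": r[2]} for r in rows]
    # Write to a temp file in the same directory, then atomically swap it in,
    # so a parse never reads a half-written config.
    out_dir = os.path.dirname(os.path.abspath(out_path))
    fd, tmp_path = tempfile.mkstemp(dir=out_dir, suffix=".tmp")
    with os.fdopen(fd, "w") as f:
        json.dump(customers, f)
    os.replace(tmp_path, out_path)
    return len(customers)


def load_customers(config_path):
    """Cheap replacement for get_customers(), called from the DAG file's top level."""
    with open(config_path) as f:
        return [(c["id"], c["name"], c["offset"]) for c in json.load(f)]


if __name__ == "__main__":
    # Stand-in database with the same columns as customers_table
    conn = sqlite3.connect(":memory:")
    conn.execute('CREATE TABLE customers_table (id INTEGER, name TEXT, "offset" TEXT)')
    conn.executemany(
        "INSERT INTO customers_table VALUES (?, ?, ?)",
        [(1, "acme", "@daily"), (2, "globex", "0 6 * * *")],
    )
    with tempfile.TemporaryDirectory() as dags_folder:
        path = os.path.join(dags_folder, "customers.json")
        export_customers(conn, path)
        for customer_id, name, offset in load_customers(path):
            print(customer_id, name, offset)
```

In the DAG file itself, the path would typically be built relative to the file, e.g. `os.path.join(os.path.dirname(__file__), "customers.json")`. Option 2) works the same way, except the generator writes a small Python module (for instance a list of tuples) that the DAG file imports instead of reading JSON.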