Reputation: 1699
In our Big Data project there are ~3000 tables to load, and each of these tables should be processed by a separate DAG in Airflow.
In our solution a single Python file generates every type of table loader, so that each one can be triggered separately via the REST API, in an event-based manner, from a Cloud Function. Therefore we generate our DAGs by using:
Unfortunately we are bound to Airflow v1.x.x.
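For illustration, triggering one of these generated DAGs from the Cloud Function boils down to a call against the Airflow 1.x experimental REST API, roughly like the sketch below (the webserver URL is a placeholder and authentication is left out; in practice the Cloud Composer webserver sits behind IAP and requires an authenticated request):

import requests

# Placeholder for the Composer Airflow webserver URL (assumption for illustration).
AIRFLOW_WEB_URL = 'https://<composer-airflow-webserver>'
dag_id = 'Example_42'  # one of the generated DAG ids

# Airflow 1.10 experimental API: create a new DagRun for the given DAG.
# Optionally a 'run_id' or 'conf' key can be passed in the JSON body.
response = requests.post(
    '{}/api/experimental/dags/{}/dag_runs'.format(AIRFLOW_WEB_URL, dag_id),
    json={},
)
response.raise_for_status()
print(response.json())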
Problem:
We have noticed that Airflow/Cloud Composer is significantly slower between task executions when many DAGs are generated. When only 10-20 DAGs are generated, the time between task executions is much shorter than when we have 100-200 DAGs. When 1000 DAGs are generated, it takes minutes to start a new task after the preceding task of a given DAG has finished, even when no other DAGs are being executed.
We don't understand why the task execution times are affected that much by the number of generated DAGs. Shouldn't it take near-constant time for Airflow to look up the required parameters for the TaskInstances in its metadata database? We are not sure whether Cloud Composer is configured/scaled/managed properly by Google.
Questions:
This is a very simple example of the generator code that we use:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id, schedule, dag_number, default_args):
    def example(*args):
        print('Example DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
    with dag:
        t1 = PythonOperator(task_id='example', python_callable=example)
    return dag


for dag_number in range(1, 5000):
    dag_id = 'Example_{}'.format(str(dag_number))
    default_args = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    globals()[dag_id] = create_dag(dag_id, '@daily', dag_number, default_args)
Upvotes: 2
Views: 1166
Reputation: 20047
Yes. That is a known problem. It's been fixed in Airflow 2.
It's inherent to how the processing of DAG files was done in Airflow 1 (mainly the number of database queries generated).
Other than migrating to Airflow 2, there is not much you can do. Fixing it required a complete refactoring and half a rewrite of the Airflow scheduler logic.
One way to mitigate it: rather than generating all DAGs from a single file, you could split it into many files. For example, instead of creating all DAG objects in one Python file, you could generate 3000 separate, small DAG files. This will scale much better.
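A minimal sketch of that mitigation, run as a deployment-time script rather than at DAG parse time (the TABLES list, the DAG_TEMPLATE, and the output path are all hypothetical; adapt them to your own metadata and Composer bucket layout):

from pathlib import Path

# Hypothetical table list; in practice this would come from your metadata/config.
TABLES = ['table_{}'.format(i) for i in range(1, 3001)]

# Template for one small, static DAG file per table. The DAGs are triggered
# externally via the REST API, so no schedule is set here.
DAG_TEMPLATE = '''\
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def load(*args):
    print('Loading table: {table}')


default_args = {{'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}}

dag = DAG('load_{table}', schedule_interval=None, default_args=default_args)

with dag:
    PythonOperator(task_id='load', python_callable=load)
'''

# Assumed output path inside the DAGs folder; each generated file defines
# exactly one DAG, so the scheduler parses many small files instead of one
# large generator module.
output_dir = Path('dags/generated')
output_dir.mkdir(parents=True, exist_ok=True)
for table in TABLES:
    (output_dir / 'load_{}.py'.format(table)).write_text(DAG_TEMPLATE.format(table=table))

Run this when uploading DAGs to the Composer bucket (for example from your CI/CD pipeline), not from within a DAG file.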
However, the good news is that in Airflow 2 this is many times faster and more scalable. Also, Airflow 1.10 has reached EOL, is no longer supported, and will not receive any more updates. So rather than changing your process, I'd heartily recommend migrating.
Upvotes: 4