Andrea Del Corto

Reputation: 191

Dynamic DAGs built using database information

I'm a newbie with Airflow and I'm trying to figure out the best approach to dynamically create a set of DAGs using information retrieved from a DB. Currently I've thought of this possible solution:

# file: dags_builder_dag.py in DAG_FOLDER
from datetime import datetime

# api_getDBInfo() and create_dag() are helper functions defined elsewhere

# Get the info needed to build the required DAGs from the DB
dag_info = api_getDBInfo()
# Dynamically create DAGs based on the info retrieved
for dag in dag_info:
    dag_id = 'hello_world_child_{}'.format(str(dag['id']))
    default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
    # Add the DAG to the global scope so that Airflow picks it up
    globals()[dag_id] = create_dag(dag_id, default_args_child)
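
The create_dag() factory is not shown above; just for completeness, a minimal sketch of what it could look like (the single placeholder task is only an assumption for illustration):

from airflow import DAG
from airflow.operators.dummy import DummyOperator


def create_dag(dag_id, default_args):
    # Build a DAG with a single placeholder task; the real tasks would go here
    dag = DAG(dag_id, default_args=default_args, schedule_interval=None, catchup=False)
    with dag:
        DummyOperator(task_id='hello_world')
    return dag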

However, if I'm not wrong, all the DAG files, including the one that generates all the DAGs in this example (dags_builder_dag.py), are periodically parsed by Airflow, which means that api_getDBInfo() is executed at each parse. If that's right, what would be the best practice to avoid continuously executing api_getDBInfo(), which could be a time-consuming operation for the DB? Ideally, this information should be retrieved only when needed, let's say on a manual trigger.

Another possible workaround that comes to mind:

# file: dags_builder_dag.py in DAG_FOLDER
from datetime import datetime

from airflow.models import Variable

# Only rebuild the DAGs when the 'buildDAGs' Airflow Variable is set to 'true'
buildDAGs = Variable.get('buildDAGs', default_var='false')
if buildDAGs == 'true':
    # Get the info needed to build the required DAGs from the DB
    dag_info = api_getDBInfo()
    # Dynamically create DAGs based on the info retrieved
    for dag in dag_info:
        dag_id = 'hello_world_child_{}'.format(str(dag['id']))
        default_args_child = {'owner': 'airflow', 'start_date': datetime(2021, 1, 1)}
        # Add the DAG to the global scope so that Airflow picks it up
        globals()[dag_id] = create_dag(dag_id, default_args_child)
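
The 'buildDAGs' gate could then be toggled from the Airflow UI or CLI, or programmatically, for example:

from airflow.models import Variable

Variable.set('buildDAGs', 'true')   # enable DAG generation on the next parse
Variable.set('buildDAGs', 'false')  # disable it again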

Update

Thanks to @NicoE and @floating_hammer I found a solution that is suitable for my use case.

First try: Airflow variable as cache

I could use an Airflow Variable as a cache for the data stored in the DB, to avoid continuous calls to api_getDBInfo(). This way, however, I have another bottleneck: the Variable size. Airflow Variables are key-value pairs; keys are limited to 256 characters, and the value stored in the metadata database is constrained by the maximum string size supported by that database. https://github.com/apache/airflow/blob/master/airflow/models/variable.py https://github.com/apache/airflow/blob/master/airflow/models/base.py
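
A minimal sketch of this Variable-as-cache pattern (the variable name 'db_cache' is just an assumption):

from airflow.models import Variable

# Writer side, e.g. inside a task that periodically refreshes the cache
Variable.set('db_cache', api_getDBInfo(), serialize_json=True)

# Reader side, in dags_builder_dag.py, instead of calling api_getDBInfo() at every parse
dag_info = Variable.get('db_cache', default_var=[], deserialize_json=True)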

In my case I'm using Amazon MWAA, and details about the underlying metadata database used by AWS and its structure may be hard to find (actually, I haven't investigated much). So I just performed a stress test, forcing a lot of data into the Variable to see what happens. The results are below:

Data amount         Results
~0.5 MB (current)   No problems with write and read operations.
~50 MB (x100)       No problems with write and read operations.
~125 MB (x250)      No problems with write and read operations, but the Variables section of the Airflow web console is no longer accessible; the server returns a 502 "Bad Gateway" error.
~250 MB (x500)      Writing the Variable fails.

Second try: S3 file as a cache

Airflow Variables have a size limit, as the previous test has shown, so I kept the same pattern but replaced the Airflow Variable with an S3 file. This works well for my specific use case, since S3 doesn't have the space limits that Airflow Variables have.

Just to summarize:

  1. I've created a DAG called "sync_db_cache_dag" which every hour updates an S3 file "db_cache.json" with the data retrieved by api_getDBInfo(). Data is stored in JSON format.
  2. The script "dags_builder_dag.py" now retrieves its data from "db_cache.json"; in this way the DB is relieved of continuous api_getDBInfo() calls. A sketch of both pieces follows below.
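
A minimal sketch of the two pieces, assuming the amazon provider's S3Hook with the default aws_default connection and a placeholder bucket name 'my-airflow-bucket' (neither is from the original setup):

import json
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = 'my-airflow-bucket'  # placeholder bucket name
CACHE_KEY = 'db_cache.json'


def refresh_cache():
    # Dump the DB info into the S3 cache file
    data = api_getDBInfo()
    S3Hook().load_string(json.dumps(data), key=CACHE_KEY, bucket_name=BUCKET, replace=True)


# sync_db_cache_dag refreshes the cache every hour
with DAG('sync_db_cache_dag',
         start_date=datetime(2021, 1, 1),
         schedule_interval='@hourly',
         catchup=False) as dag:
    PythonOperator(task_id='refresh_db_cache', python_callable=refresh_cache)


# In dags_builder_dag.py, read the cache instead of hitting the DB at every parse
dag_info = json.loads(S3Hook().read_key(key=CACHE_KEY, bucket_name=BUCKET))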

Upvotes: 3

Views: 1678

Answers (1)

floating_hammer

Reputation: 439

You could try the following steps.

  • Create a variable which will hold the configuration of the tasks and the number of tasks to create.

Create a DAG which gets triggered at a set frequency. The DAG has two tasks (a rough sketch follows below).

  • Task 1 reads the database and populates the variable.
  • Task 2 reads the variable and creates the tasks.
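
A rough sketch of what that two-task DAG could look like (the DAG id, variable name and schedule are placeholders, not a definitive implementation):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator


def populate_variable():
    # Task 1: read the database and store the configuration in a Variable
    Variable.set('dag_config_cache', api_getDBInfo(), serialize_json=True)


def build_from_variable():
    # Task 2: read the Variable and act on the configuration
    config = Variable.get('dag_config_cache', default_var=[], deserialize_json=True)
    for entry in config:
        print('would build task/DAG for', entry)  # placeholder for the real build logic


with DAG('refresh_dag_config',           # placeholder DAG id
         start_date=datetime(2021, 1, 1),
         schedule_interval='@hourly',    # the "set frequency"; the value is assumed
         catchup=False) as dag:
    t1 = PythonOperator(task_id='populate_variable', python_callable=populate_variable)
    t2 = PythonOperator(task_id='build_from_variable', python_callable=build_from_variable)
    t1 >> t2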

Upvotes: 1
