Mukul Jain

Reputation: 1927

Use DB to generate airflow tasks dynamically

I want to run an Airflow DAG where the downstream tasks are generated dynamically from a database query.

This is my code:

import json
import psycopg2
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# default_args is assumed to be defined as usual (owner, start_date, etc.)
dag = DAG('deploy_single', catchup=False, default_args=default_args, schedule_interval='16 15 * * *')

t1 = BashOperator(
        task_id='dummy_task',
        bash_command='echo hi > /tmp/hi',
        queue='W1_queue',
        dag=dag)

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()
i = 1
for record in records:
    t = BashOperator(
        task_id='script_test_'+str(i),
        bash_command="{full_command} ".format(full_command=str(record[0])),
        queue=str(record[1]),
        dag=dag)
    t.set_upstream(t1)
    i += 1

cursor.close()
connection.close()

However, when I ran this, the task on W1 completed successfully but all the tasks on W2 failed. In the Airflow UI, I could see that it resolved the correct number of tasks (10 in this case), but each of those 10 failed.

Looking at the logs, I saw that on W2 (which is on a different machine), Airflow could not find the db_creds.json file.

I do not want to provide the DB creds file to W2.

My question is: how can Airflow tasks be created dynamically in this case? Basically, I want to run a DB query on the Airflow server and assign tasks to one or more workers based on the results of that query. The DB will contain up-to-date information about which engines are active, etc., and I want the DAG to reflect this. From the logs, it looks like each worker runs the DB query; providing DB access to each worker is not an option.

Upvotes: 3

Views: 2372

Answers (2)

Mukul Jain

Reputation: 1927

Thank you @viraj-parekh and @cwurtz.

After much trial and error, I found the correct way to use Airflow Variables for this case.

Step 1) Create another script called gen_var.py and place it in the DAG folder. This way, the scheduler will pick it up and generate the Variables. If the code for generating Variables were inside the deploy_single DAG, we would run into the same dependency issue, because the workers would try to process the DAG too.

"""
Code that goes along with the Airflow tutorial located at:
https://github.com/airbnb/airflow/blob/master/airflow/example_dags/tutorial.py
"""
import json
import psycopg2
from airflow.models import Variable
from psycopg2.extensions import AsIs

get_all_engines = "select full_command, queue_name from internal_airflow_hosts where logical_group = 'live_engines';"

db_creds = json.loads(open('/opt/airflow/db_creds.json').read())
conn_dict = db_creds["airflowdb_local"]
connection = psycopg2.connect(**conn_dict)

cursor = connection.cursor()

cursor.execute(get_all_engines)
records = cursor.fetchall()

hosts = {}
i = 1
for record in records:
    comm_dict = {}
    comm_dict['full_command'] = str(record[0])
    comm_dict['queue_name'] = str(record[1])
    hosts[i] = comm_dict
    i += 1

cursor.close()
connection.close()

Variable.set("hosts", hosts, serialize_json=True)

Note the call with serialize_json=True. By default, Airflow stores a Variable as a plain string; if you want it to come back as a dict, use serialize_json=True. Airflow still stores it as a string internally (via json.dumps), but it can be deserialized back into a dict when it is read.
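As a minimal illustration (the "example_hosts" key and its data here are just placeholders, not part of the original code), the round-trip looks like this:

from airflow.models import Variable

# Stored internally as a JSON string (via json.dumps) ...
Variable.set("example_hosts", {"1": {"queue_name": "W1_queue"}}, serialize_json=True)

# ... and converted back into a dict on retrieval
example_hosts = Variable.get("example_hosts", deserialize_json=True)
print(type(example_hosts))  # <class 'dict'>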

Step 2) Simplify the DAG and read this "hosts" Variable back (now deserialized to get the dict) like so:

from airflow.models import Variable

hosts = Variable.get("hosts", deserialize_json=True)
for key in hosts:
    host = hosts.get(key)
    t = BashOperator(
        task_id='script_test_' + str(key),
        bash_command="{full_command} ".format(full_command=str(host.get('full_command'))),
        queue=str(host.get('queue_name')),
        dag=dag)
    t.set_upstream(t1)

Hope it helps someone else.

Upvotes: 5

Viraj Parekh

Reputation: 1381

One way to do this would be to store the information in an Airflow Variable.

You can store the information needed to dynamically generate the DAG (and the necessary configs) in a Variable and have W2 access it from there.

Variables are an Airflow model that can be used to store static information (information that does not have an associated timestamp) that all tasks can access.
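For example, a rough sketch of that pattern (the "engines_info" Variable name, the DAG id, and the sample data below are placeholders, not from the original post):

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator

# On the Airflow server (where the DB is reachable), store the query results once:
engines_info = {"1": {"full_command": "echo hi", "queue_name": "W1_queue"}}  # placeholder data
Variable.set("engines_info", engines_info, serialize_json=True)

# In the DAG file, read the Variable instead of querying the DB, so W2 never needs the creds:
dag = DAG('deploy_from_variable', start_date=datetime(2018, 1, 1), schedule_interval=None)

engines_info = Variable.get("engines_info", deserialize_json=True)
for key, host in engines_info.items():
    BashOperator(
        task_id='engine_task_' + str(key),
        bash_command=host['full_command'],
        queue=host['queue_name'],
        dag=dag)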

Upvotes: 2
