sumit

Reputation: 49

Airflow Scheduler Not Recognizing Unpickled DAGs Until Metadata is Manually Updated

I am working with Apache Airflow and creating DAG objects at runtime from configuration stored in a database. The script that builds the DAG is placed under the dags folder, where the Airflow scheduler executes it, and it returns a DAG object. Since there is no point in re-running the whole DAG-creation code when the configuration has not changed, I pickle the created object and store it in PostgreSQL, using dill for pickling since Airflow uses it as well (a minimal sketch of this pattern appears after the Observations section below). All of this works, but I am experiencing an issue: the scheduler does not recognise DAGs that I retrieve from the database and unpickle, while it does recognise newly created DAG objects and also adds an entry to the dag_pickle table for them. Specifically, the behaviour is as follows:

Fresh DAG Objects:

When I create a new DAG object and add it to the scheduler, it gets picked up immediately, and the last_parsed_time in the dag table is updated correctly.

Unpickled DAG Objects:

When I retrieve a pickled DAG object from a PostgreSQL or MongoDB database (the issue is independent of the DB used), unpickle it, and add it to the scheduler, it does not get recognized.

The scheduler logs indicate that it deactivates and deletes the DAG, stating it is missing:

[INFO] DAG example_dag is missing and will be deactivated.
[INFO] Deleted DAG example_dag in serialized_dag table
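
For context on those log lines: as I understand it, the scheduler's file processor decides which DAGs exist by importing each file under the dags folder and collecting the DAG instances it finds at module level; any dag_id that stops showing up on a later parse is treated as missing and deactivated. A simplified paraphrase of that discovery step (not Airflow's exact DagBag code):

import importlib.util

from airflow import DAG

def collect_dags(path: str) -> dict[str, DAG]:
    # Import the candidate file the way the processor would, then keep
    # only the module-level objects that are DAG instances.
    spec = importlib.util.spec_from_file_location("candidate_dag_file", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return {obj.dag_id: obj for obj in vars(module).values() if isinstance(obj, DAG)}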

Manual Metadata Update:

If I manually update the last_parsed_time for the unpickled DAG by connecting directly to Airflow's dag table, the scheduler recognizes it and it appears in the Airflow UI.
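
Roughly what that manual update looks like in my case (a sketch; the connection string is a placeholder for the Airflow metadata database):

import psycopg2

# Placeholder DSN for the Airflow metadata database.
conn = psycopg2.connect("dbname=airflow user=airflow host=localhost")
with conn, conn.cursor() as cur:  # the connection context commits on success
    cur.execute(
        "UPDATE dag SET last_parsed_time = now() WHERE dag_id = %s",
        ("example_dag",),
    )
conn.close()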

Observations

Fresh DAG: last_parsed_time is updated automatically by the scheduler, and an entry is made in the dag_pickle table.

Unpickled DAG: the scheduler does not update last_parsed_time and deactivates the DAG, unless last_parsed_time is manually set to the current timestamp with time zone via a direct DB connection to the dag table.
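
For reference, here is a minimal, self-contained sketch of the pattern described above, assuming Airflow 2.4+ (for the schedule argument) with dill installed; the database access is stubbed with an in-memory dict, and build_dag_from_config stands in for my real configuration-driven builder:

from datetime import datetime

import dill
from airflow import DAG
from airflow.operators.empty import EmptyOperator

_PICKLE_CACHE: dict[str, bytes] = {}  # stand-in for the PostgreSQL pickle table

def build_dag_from_config(config: dict) -> DAG:
    # Construct a DAG at runtime from (DB-driven) configuration.
    with DAG(
        dag_id=config["dag_id"],
        start_date=datetime(2024, 1, 1),
        schedule=config.get("schedule"),
    ) as dag:
        EmptyOperator(task_id="start")
    return dag

config = {"dag_id": "example_dag", "schedule": "@daily"}

cached = _PICKLE_CACHE.get(config["dag_id"])
if cached is not None:
    dag = dill.loads(cached)  # configuration unchanged: reuse the stored object
else:
    dag = build_dag_from_config(config)
    _PICKLE_CACHE[config["dag_id"]] = dill.dumps(dag)

# Expose the object at module level so the scheduler's file processor can
# find it when it executes this file from the dags folder.
globals()[dag.dag_id] = dag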

Questions

Why does the scheduler fail to recognize and update the metadata for unpickled DAG objects?

What additional steps can be taken to ensure that unpickled DAGs are treated the same as newly created DAGs?

Is there a specific function or hook in Airflow that needs to be called to ensure that unpickled DAGs are fully registered and recognized by the scheduler?

How does the Airflow scheduler actually pick up a DAG object? Is there some flag in the DAG object by which it can determine that the object is new, so that if we use a DAG object created earlier, it can identify and discard it as not being the latest one?

Any insights or suggestions to address this behavior would be greatly appreciated.

[UPDATE]

While debugging the code, I found that the Airflow scheduler is not considering the pickled object (stored in the database, fetched, and unpickled with dill) at all. It only appeared to be recognized because I had updated last_parsed_time via a direct connection to the dag table, which made Airflow treat the DAG as if the scheduler had parsed it.
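
One way I can rule out corruption of the pickled object itself is a dill round trip in the same interpreter (dag being the DAG object from the sketch above):

import dill

payload = dill.dumps(dag)
restored = dill.loads(payload)

# The restored object should be structurally identical to the original.
assert restored.dag_id == dag.dag_id
assert sorted(restored.task_ids) == sorted(dag.task_ids)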

Upvotes: 0

Views: 108

Answers (0)
