user4046073

Reputation: 871

How to find delayed jobs in Airflow

Some of my DAGs are waiting to get scheduled, and some are waiting in the queue. I suspect there are reasons for this delay, but I'm not sure how to start debugging the problem. The majority of the pipelines run Spark jobs.

Can someone give me some directions on where to look to 1) analyse which DAGs were delayed (did not start at their scheduled time) and 2) find out whether the resources are sufficient? I'm quite new to scheduling in Airflow. Many thanks. Please let me know if I can describe the question better.

Upvotes: 0

Views: 1083

Answers (2)

dimButTries

Reputation: 878

If you are looking for code that takes advantage of Airflow's wider capabilities, the following may help.

There are three classes within airflow.models which can be harnessed:

  1. To programmatically retrieve all DAGs your Airflow is aware of, we import DagBag. From the docs: "A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings."
  2. We utilise DagModel and its get_current method to initialise each dag_id present in our bag.
  3. We check whether each DAG is active using the DagModel property is_paused.
  4. We retrieve the latest DAG runs using DagRun.find.
  5. Sort the individual DAG runs from latest to earliest.
  6. Here you could just take element [0] to get the latest run; however, for your debugging purposes I loop through them all.
  7. DagRun returns a lot of information for us to use. In my loop I print(i, run.state, run.execution_date, run.start_date) so you can see what is going on under the hood. The full set of fields available on each run is:

id, state, dag_id, queued_at, execution_date, start_date, end_date, run_id, data_interval_start, data_interval_end, last_scheduling_decision

  8. I have commented out an if check for any queued DAGs for you to uncomment. Additionally, you can do some arithmetic on the dates if you desire, to add further conditional functionality (see the delay sketch after the DAG definition below).
from datetime import datetime

from airflow import DAG
from airflow.models import DagBag, DagModel, DagRun
from airflow.operators.python import PythonOperator


# Walk every unpaused DAG and print the state and timing of each of its runs
def check_dag_active():
    bag = DagBag()
    for dag_id in bag.dags:
        in_bag = DagModel.get_current(dag_id)
        if not in_bag.is_paused:
            latest = DagRun.find(dag_id=dag_id)
            latest.sort(key=lambda x: x.execution_date, reverse=True)
            for i, run in enumerate(latest):
                print(i, run.state, run.execution_date, run.start_date)
                # Uncomment to return the first queued run instead of printing:
                # if run.state == 'queued':
                #     return [run.dag_id, run.execution_date, run.start_date]


with DAG(
    'stack_overflow_ans_3',
    tags=['SO'],
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
    is_paused_upon_creation=False
) as dag:

    t1 = PythonOperator(
        task_id='task_that_will_fail',
        python_callable=check_dag_active
    )
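
Building on the loop above, here is a minimal sketch of that date arithmetic, assuming Airflow 2.2+ (where DagRun exposes data_interval_end). find_delayed_runs and DELAY_THRESHOLD are illustrative names of my own, not Airflow API:

from datetime import timedelta

from airflow.models import DagBag, DagModel, DagRun

DELAY_THRESHOLD = timedelta(minutes=5)  # assumption: tune to your own SLAs


def find_delayed_runs():
    """Collect runs that started late or have not started at all."""
    delayed = []
    bag = DagBag()
    for dag_id in bag.dags:
        current = DagModel.get_current(dag_id)
        if current is None or current.is_paused:
            continue
        for run in DagRun.find(dag_id=dag_id):
            if run.data_interval_end is None:
                continue  # some runs (e.g. older rows) may lack an interval
            if run.start_date is None:
                # Still queued or waiting to be scheduled
                delayed.append((dag_id, run.run_id, None))
            elif run.start_date - run.data_interval_end > DELAY_THRESHOLD:
                delayed.append((dag_id, run.run_id,
                                run.start_date - run.data_interval_end))
    return delayed

A run whose start_date is still None is sitting in the queued or scheduled state, which is usually the first symptom of scheduler or pool starvation.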

Upvotes: 1

Thom Bedford

Reputation: 387

Depending on your version of Airflow and your setup, you should be able to query the Airflow DB directly to get this information.

If you're using Airflow 1.x, there is an "Ad Hoc Query" tool under the Data Profiling tab in the UI. This was removed in 2.x, so if you're running 2.x you'll need to connect directly to your Airflow DB using psql or something similar (the details differ between Google Cloud, AWS, and Docker setups).

Once you're in, check out this link for some queries on DAG runtime.
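
As a sketch of the sort of query involved — assuming a Postgres metadata DB with the standard dag_run table, and a placeholder connection string you'd replace with your own — you can rank runs by how long after their logical date they actually started:

from sqlalchemy import create_engine, text

# Placeholder DSN: substitute your real Airflow metadata DB credentials
engine = create_engine("postgresql://airflow:airflow@localhost:5432/airflow")

# Rank started runs by the gap between logical date and actual start;
# the largest gaps point at the runs that were delayed
query = text("""
    SELECT dag_id,
           execution_date,
           start_date,
           start_date - execution_date AS scheduling_delay
    FROM dag_run
    WHERE start_date IS NOT NULL
    ORDER BY scheduling_delay DESC
    LIMIT 20
""")

with engine.connect() as conn:
    for row in conn.execute(query):
        print(row.dag_id, row.execution_date, row.scheduling_delay)

Note that execution_date is Airflow's logical date (the start of the data interval), so for scheduled DAGs the raw difference includes one schedule interval; it is still a useful relative measure for spotting outliers.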

Upvotes: 1
