Darshan Mehta

Reputation: 30819

Airflow: ExternalTaskSensor doesn't trigger the task

I have already seen this and this question on SO and made the changes accordingly. However, my dependent DAG still gets stuck in the poking state. Below is my master DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure

Below is my dependent DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)

Below are the logs of the dependent DAG once the master DAG has executed:

[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

Below are the logs of the master DAG's execution:

[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0

My assumption is that Airflow should trigger the dependent DAG once the master runs fine. I have tried playing around with execution_delta, but that doesn't seem to work.

Also, schedule_interval and start_date are the same for both DAGs, so I don't think that should cause any trouble.

Am I missing anything?

Upvotes: 10

Views: 10156

Answers (4)

peschü

Reputation: 1349

I had this problem because of a summer/winter time change: "1 day before" means "exactly 24 hours before", so if there is a daylight saving time change in between, the DAG gets stuck.

One way out of this is to manually mark the stuck sensor run as successful.

The other way is to use the execution_date_fn argument and calculate the correct time difference yourself in such cases.
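
For example, a minimal sketch of the execution_date_fn approach, reusing the imports and IDs from the question; upstream_execution_date is a hypothetical helper, and the fixed one-day offset is only illustrative:

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Hypothetical helper: map this DAG run's execution date to the
# upstream DAG run's execution date. Replace the fixed offset with
# whatever DST-aware calculation your schedules actually need.
def upstream_execution_date(execution_date):
    return execution_date - timedelta(days=1)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_date_fn=upstream_execution_date,  # instead of execution_delta
    dag=dag
)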

Upvotes: 0

Afz Abd

Reputation: 493

Hopefully you are not triggering the DAG manually. If you want to test it, let the DAG run as per its schedule, and then monitor the DAG runs.

Upvotes: 5

psius1

Reputation: 51

Make sure both DAGs start at the same time and you don't start either DAG manually.

Upvotes: 5

dlamblin

Reputation: 45321

It may be that you should use a positive timedelta (see https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html): the sensor subtracts the execution delta from its own execution date, so subtracting -2 minutes makes it look for a task that ran 2 minutes after itself.

However, the delta isn't really a range: the task instance has to have a matching DAG ID, task ID, and a successful state, and its execution date has to be in a list of datetimes. When you pass execution_delta, that list contains a single datetime: the current execution date minus the timedelta.
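
Paraphrased from the Airflow 1.10 source linked above, the relevant part of the sensor's poke method works roughly like this (a sketch, not the verbatim source):

if self.execution_delta:
    # subtracting a negative delta moves the target execution date forward
    dttm = context['execution_date'] - self.execution_delta
elif self.execution_date_fn:
    dttm = self.execution_date_fn(context['execution_date'])
else:
    dttm = context['execution_date']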

This probably comes down to either removing the timedelta, so that the two execution dates match and the sensor waits until the other task is successful, or to your start date and schedule interval: set as basically today and @once, they produce execution dates that are not in predictable lock-step with each other. You could try setting, say, datetime(2019, 1, 10) and 0 1 * * * so that both DAGs run daily at 1am (again without an execution_delta), as sketched below.
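
For instance, a minimal sketch of that setup, reusing the names from the question (note that schedule_interval is a DAG constructor argument; placing it inside default_args, as in the question, will not set the DAG's schedule):

from datetime import datetime

from airflow import DAG

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(2019, 1, 10),  # fixed date, not datetime.today()
}

# Pass the schedule to the DAG itself rather than via default_args.
dag = DAG('call-procedure-and-bash', default_args=default_args,
          schedule_interval='0 1 * * *')  # both DAGs: daily at 1am

# ... and in the dependent DAG, the sensor without execution_delta:
# task_sensor = ExternalTaskSensor(
#     task_id='link_upstream',
#     external_dag_id='call-procedure-and-bash',
#     external_task_id='call_procedure',
#     dag=dag
# )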

Upvotes: 1
