user3595632

Reputation: 5730

Why does the Airflow ExternalTaskSensor not work on a DAG that uses PythonOperator?

1. DAG structure that works (no failure)

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.dummy import DummyOperator
from airflow.sensors.external_task import ExternalTaskSensor


def temp_task():
    print(1)

a_dag = DAG(
    dag_id='a_dag1', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval="*/1 * * * *", tags=['external'],
    catchup=False
)
with a_dag:
    start = DummyOperator(task_id='wow1')
    end = DummyOperator(task_id='wow2')
    start >> end


b_dag = DAG(
    dag_id='a_dag2', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval='*/1 * * * *', tags=['external'],
    catchup=False
)
with b_dag:
    downstream_task1 = ExternalTaskSensor(
        task_id="downstream_task1",
        mode='reschedule',
        external_dag_id='a_dag1',
        external_task_id="wow2",
        timeout=600,
    )
    start2 = DummyOperator(task_id='start2')
    start2 >> downstream_task1


2. DAG structure that fails (a debugging sketch of the sensor follows the scheduler logs below)

from airflow.operators.python import PythonOperator  # import needed for the tasks below


def temp_task():
    print(1)

a_dag = DAG(
    dag_id='a_dag1', default_args={'owner': 'brownbear'}, start_date=datetime(2021, 11, 6, 0, 0, 0),
    schedule_interval="*/1 * * * *", tags=['external'],
    catchup=False
)
with a_dag:
    # Doesn't work...
    task1 = PythonOperator(task_id='wow1', python_callable=temp_task)
    task2 = PythonOperator(task_id='wow2', python_callable=temp_task)
    task1 >> task2


airflow-scheduler_1  | [2022-08-19 01:12:02,238] {dag.py:2915} INFO - Setting next_dagrun for a_dag2 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,243] {dagrun.py:562} INFO - Marking run <DagRun a_dag1 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> successful
airflow-scheduler_1  | [2022-08-19 01:12:02,244] {dagrun.py:607} INFO - DagRun Finished: dag_id=a_dag1, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.111676+00:00, run_end_date=2022-08-19 01:12:02.244042+00:00, run_duration=2.132366, state=success, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=c05eae379e808492a6614dfda6985c68
airflow-scheduler_1  | [2022-08-19 01:12:02,248] {dag.py:2915} INFO - Setting next_dagrun for a_dag1 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,250] {dagrun.py:547} ERROR - Marking run <DagRun after_dag2 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> failed
airflow-scheduler_1  | [2022-08-19 01:12:02,251] {dagrun.py:607} INFO - DagRun Finished: dag_id=after_dag2, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.112784+00:00, run_end_date=2022-08-19 01:12:02.251044+00:00, run_duration=2.13826, state=failed, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=a87e590bae62e97d0798e39e15be9f55
airflow-scheduler_1  | [2022-08-19 01:12:02,255] {dag.py:2915} INFO - Setting next_dagrun for after_dag2 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of a_dag2.downstream_task1 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status queued for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of a_dag1.wow2 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,300] {scheduler_job.py:596} INFO - Executor reports execution of after_dag2.wait_for_task_2 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:630} INFO - Setting external_id for <TaskInstance: a_dag2.downstream_task1 scheduled__2022-08-19T01:11:00+00:00 [failed]> to d43c2bef-0f56-4556-b34e-af64158e0545
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=after_dag2, task_id=wait_for_task_2, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:00.769613+00:00, run_end_date=2022-08-19 01:12:01.117768+00:00, run_duration=0.348155, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=708751, pool=default_pool, queue=default, priority_weight=2, operator=ExternalTaskSensor, queued_dttm=2022-08-19 01:12:00.990796+00:00, queued_by_job_id=650667, pid=1357729
airflow-scheduler_1  | [2022-08-19 01:12:02,303] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=a_dag1, task_id=wow2, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:01.506658+00:00, run_end_date=2022-08-19 01:12:01.658264+00:00, run_duration=0.151606, state=success, executor_state=success, try_number=1, max_tries=0, job_id=708754, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-19 01:12:00.990796+00:00, queued_by_job_id=650667, pid=1357738
airflow-scheduler_1  | [2022-08-19 01:12:03,357] {scheduler_job.py:596} INFO - Executor reports execution of a_dag2.downstream_task1 run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:03,359] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=a_dag2, task_id=downstream_task1, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:00.871492+00:00, run_end_date=2022-08-19 01:12:01.205089+00:00, run_duration=0.333597, state=failed, executor_state=success, try_number=1, max_tries=0, job_id=708753, pool=default_pool, queue=default, priority_weight=1, operator=ExternalTaskSensor, queued_dttm=2022-08-19 01:12:01.082837+00:00, queued_by_job_id=650667, pid=1357731
airflow-scheduler_1  | [2022-08-19 01:12:05,177] {dagrun.py:562} INFO - Marking run <DagRun after_dag1 @ 2022-08-19 01:11:00+00:00: scheduled__2022-08-19T01:11:00+00:00, externally triggered: False> successful
airflow-scheduler_1  | [2022-08-19 01:12:05,177] {dagrun.py:607} INFO - DagRun Finished: dag_id=after_dag1, execution_date=2022-08-19 01:11:00+00:00, run_id=scheduled__2022-08-19T01:11:00+00:00, run_start_date=2022-08-19 01:12:00.112426+00:00, run_end_date=2022-08-19 01:12:05.177649+00:00, run_duration=5.065223, state=success, external_trigger=False, run_type=scheduled, data_interval_start=2022-08-19 01:11:00+00:00, data_interval_end=2022-08-19 01:12:00+00:00, dag_hash=edf550ca4ca8e90dfdeb6b1d2a06c789
airflow-scheduler_1  | [2022-08-19 01:12:05,182] {dag.py:2915} INFO - Setting next_dagrun for after_dag1 to 2022-08-19T01:12:00+00:00, run_after=2022-08-19T01:13:00+00:00
airflow-scheduler_1  | [2022-08-19 01:12:05,208] {scheduler_job.py:596} INFO - Executor reports execution of after_dag1.b_task run_id=scheduled__2022-08-19T01:11:00+00:00 exited with status success for try_number 1
airflow-scheduler_1  | [2022-08-19 01:12:05,211] {scheduler_job.py:640} INFO - TaskInstance Finished: dag_id=after_dag1, task_id=b_task, run_id=scheduled__2022-08-19T01:11:00+00:00, map_index=-1, run_start_date=2022-08-19 01:12:01.641734+00:00, run_end_date=2022-08-19 01:12:04.801520+00:00, run_duration=3.159786, state=success, executor_state=success, try_number=1, max_tries=0, job_id=708755, pool=default_pool, queue=default, priority_weight=1, operator=PythonOperator, queued_dttm=2022-08-19 01:12:01.082837+00:00, queued_by_job_id=650667, pid=1357739
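For reference, here is a minimal debugging sketch (not part of the original question) of the same sensor with allowed_states, failed_states, and check_existence set explicitly. These are standard ExternalTaskSensor parameters; spelling them out can help show whether the sensor fails because the external task is missing or because it finished in an unexpected state. It assumes b_dag (a_dag2) from the first snippet is defined in the same file.

from airflow.sensors.external_task import ExternalTaskSensor
from airflow.utils.state import State

# assumes b_dag (dag_id='a_dag2') from the first snippet
with b_dag:
    downstream_task1 = ExternalTaskSensor(
        task_id="downstream_task1",
        mode="reschedule",
        external_dag_id="a_dag1",
        external_task_id="wow2",
        allowed_states=[State.SUCCESS],  # the default, shown explicitly
        failed_states=[State.FAILED],    # fail immediately if wow2 itself fails
        check_existence=True,            # error out if a_dag1.wow2 does not exist
        timeout=600,
    )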

Upvotes: 1

Views: 201

Answers (1)

Hussein Awala

Reputation: 5096

I just tested your code with Airflow versions 2.3.0 through 2.3.3. It didn't work with 2.3.0, but it works normally with the later versions, so it seems a bug was fixed in 2.3.1.
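If you want to confirm which Airflow version an environment is running before relying on this, here is a minimal sketch (not from the answer itself) that compares the installed version against 2.3.1, the version the answer points to as containing the fix:

from airflow import __version__ as airflow_version
from packaging.version import Version  # packaging is a dependency of Airflow 2.x

if Version(airflow_version) < Version("2.3.1"):
    print(f"Airflow {airflow_version}: the ExternalTaskSensor issue may still be present")
else:
    print(f"Airflow {airflow_version}: this version should include the fix")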

Upvotes: 1
