I have two DAGs that I am using to test ExternalTaskSensor.
However, the ExternalTaskSensor task just keeps logging "Poking" and never finishes.
What could be happening?
These are my DAGs:
from datetime import timedelta, datetime

import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator

dag_id = 'teste_dagA'

default_args = {
    'owner': 'Engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    dag_id=dag_id,
    tags=['teste', 'lakehouse', 'mediator'],
    default_args=default_args,
    description='teste',
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2025, 1, 15, 0, 0, 0),
    catchup=False,
) as dag:
    empty_start_task = EmptyOperator(task_id='empty-start-task')

    empty_end_task = BashOperator(
        task_id='end',
        bash_command='echo "Tarefa concluída com sucesso!"'
    )

    empty_start_task >> empty_end_task

from datetime import timedelta, datetime

import pendulum
from airflow.models import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.external_task import ExternalTaskSensor

dag_id = 'teste_mediator'

default_args = {
    'owner': 'Engineering',
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
}

with DAG(
    dag_id=dag_id,
    tags=['teste', 'lakehouse', 'mediator'],
    default_args=default_args,
    description='teste',
    schedule_interval=timedelta(minutes=3),
    start_date=datetime(2025, 1, 15, 0, 0, 0),
    catchup=False,
) as dag:
    waiting = ExternalTaskSensor(
        task_id='waiting',
        external_dag_id='teste_dagA',
        external_task_id='end',
        execution_delta=timedelta(minutes=2)
    )

    empty_end_task = EmptyOperator(task_id='empty-end-task')

    waiting >> empty_end_task

I have already tried changing execution_delta, but without success.
If anyone understands the problem, I would appreciate the help. I am trying to understand what the solution is.
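
For reference, here is a minimal sketch of the date arithmetic that seems to be involved, written in plain Python with no Airflow imports. As far as I understand, ExternalTaskSensor pokes for a run of the external DAG whose logical date equals the sensor's own logical date minus execution_delta. The helper names (logical_dates, sensor_target) are hypothetical, and the sketch assumes that the runs of both DAGs fall on the same 3-minute grid starting at start_date. With catchup=False and a timedelta schedule the real run dates may not line up this way, so the actual logical dates in the Airflow UI should be compared.

```python
from datetime import datetime, timedelta

SCHEDULE = timedelta(minutes=3)          # schedule_interval of both DAGs
START = datetime(2025, 1, 15, 0, 0, 0)   # start_date of both DAGs


def logical_dates(start, step, count):
    """Hypothetical logical dates of `count` consecutive runs on a fixed grid."""
    return [start + i * step for i in range(count)]


def sensor_target(sensor_logical_date, execution_delta):
    """Logical date the sensor looks for: its own logical date minus the delta."""
    return sensor_logical_date - execution_delta


# Assumed logical dates of teste_dagA runs: 00:00, 00:03, 00:06, ...
dag_a_runs = set(logical_dates(START, SCHEDULE, 10))

# One assumed run of teste_mediator on the same grid (00:06).
mediator_run = START + 2 * SCHEDULE

with_delta = sensor_target(mediator_run, timedelta(minutes=2))  # 00:04
without_delta = sensor_target(mediator_run, timedelta(0))       # 00:06

print(with_delta in dag_a_runs)     # False: no teste_dagA run at 00:04
print(without_delta in dag_a_runs)  # True: a teste_dagA run exists at 00:06
```

Under these assumptions, a 2-minute delta points at a logical date that never exists for teste_dagA, which would explain the endless poking. A delta that is zero or a multiple of the 3-minute interval would at least land on the grid.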