LuckyLuke

Reputation: 31

Airflow ExternalTaskSensor manually triggered

I have created two DAGs that I want to trigger manually (schedule_interval=None).

First, I trigger "External_test_A", which should wait until "External_test_B" has been triggered. After a while (about 2 minutes), I trigger the "External_test_B" DAG, which runs only one task, first_task.

When first_task succeeds, the sensor in "External_test_A" that is poking for External_test_B.first_task should finish with success, and "External_test_A" should then run second_task.

I am stuck: even after first_task succeeds, the sensor in "External_test_A" keeps poking.

External_test_A:

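from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {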
    'owner': 'airflow',
    'start_date': pendulum.yesterday().astimezone('US/Eastern')
}

dag = DAG(
    dag_id='External_test_A',
    default_args=default_args,
    schedule_interval=None
)

def do_second_task():
    print('Second task is done')

sensor = ExternalTaskSensor(
    task_id='wait_for_the_first_task_to_be_completed',
    external_dag_id='External_test_B',
    external_task_id='first_task',
    # Waits for the External_test_B run dated exactly 3 minutes before this run
    execution_delta=timedelta(minutes=3),
    dag=dag)

t2 = PythonOperator(
    task_id='second_task',
    python_callable=do_second_task,
    dag=dag)

sensor >> t2

if __name__ == "__main__":
    dag.cli()

External_test_B:

import pendulum
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': pendulum.yesterday().astimezone('US/Eastern')
}

dag = DAG(
    dag_id='External_test_B',
    default_args=default_args,
    schedule_interval=None
)

t1 = DummyOperator(task_id='first_task', dag=dag)

t1

if __name__ == "__main__":
    dag.cli()

Could someone tell me what I'm doing wrong? How can I make a task in one DAG wait for a task in another DAG when both DAGs are only triggered manually?

Upvotes: 3

Views: 2966

Answers (3)

Thomas

Reputation: 613

You can achieve this by creating a third DAG, the triggerer, which launches the two DAGs with the right (here, identical) execution dates. The execution date is how the ExternalTaskSensor knows which DAG run of the external DAG to watch.

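from datetime import datetime

from airflow.decorators import dag
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


@dag(
    dag_id="trigger_AB",
    start_date=datetime(2023, 2, 9),
    catchup=False,
    schedule=None,
)
def trigger():
    # Both triggered runs get this DAG run's timestamp as their execution date
    EXEC_DATE = '{{ ts }}'
    triggerA = TriggerDagRunOperator(
        task_id="trigger_A",
        trigger_dag_id="External_test_A",
        execution_date=EXEC_DATE,
    )

    triggerB = TriggerDagRunOperator(
        task_id="trigger_B",
        trigger_dag_id="External_test_B",
        execution_date=EXEC_DATE,
    )


my_dag = trigger()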

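Now, when you trigger this DAG, it launches both DAGs A and B with the same execution date, and DAG A waits for the task in DAG B to finish before continuing, as expected. This only lines up if the sensor in External_test_A looks for that same date, so remove execution_delta from it (or set it to timedelta(0)).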
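The date matching behind this can be sketched in plain Python. This is a simplified model, not Airflow code: `sensor_target` and `sensor_succeeds` are hypothetical helpers that assume the sensor pokes for a successful external task instance at exactly its own execution date minus `execution_delta`, ignoring poke intervals, timeouts, and `execution_date_fn`.

```python
from datetime import datetime, timedelta, timezone


def sensor_target(own_execution_date, execution_delta=timedelta(0)):
    """Hypothetical helper: execution date of the external run being watched."""
    return own_execution_date - execution_delta


def sensor_succeeds(own_execution_date, external_success_dates,
                    execution_delta=timedelta(0)):
    """Simplified poke: True only if the external task succeeded in a run
    whose execution date equals the target date exactly."""
    return sensor_target(own_execution_date, execution_delta) in external_success_dates


# Question's timeline: A triggered at t0, B (and first_task) two minutes later.
t0 = datetime(2023, 2, 9, 12, 0, tzinfo=timezone.utc)
manual_b_runs = {t0 + timedelta(minutes=2)}
print(sensor_succeeds(t0, manual_b_runs, timedelta(minutes=3)))  # False on every poke

# Triggerer DAG: both runs share one execution date.
print(sensor_succeeds(t0, {t0}))                        # True
print(sensor_succeeds(t0, {t0}, timedelta(minutes=3)))  # False: the delta must go
```

In the first call the sensor looks for a run dated 11:57, while External_test_B only has a run dated 12:02, so the poke can never succeed and the sensor keeps poking until it times out.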

Upvotes: 0

Johann Bzaih

Reputation: 1

There is no solution to your problem, since both of your DAGs are manually triggered. I suggest you try to do it in one DAG.

Upvotes: 0

Sagar

Reputation: 21

After a lot of research, I found no way to do this with manual triggers unless you write your own class to do the job. Both DAGs need a schedule interval and the same start date, and you need to set execution_delta in the ExternalTaskSensor definition to the offset between their execution dates, if there is one.
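For the scheduled setup described here, the alignment can be checked with a small pure-Python sketch. It assumes fixed `timedelta` schedules and that the sensor looks for the upstream run at exactly its own execution date minus `execution_delta`; the helper names and the hourly, 15-minute-offset schedules are hypothetical.

```python
from datetime import datetime, timedelta


def execution_dates(start_date, interval, count):
    """First `count` execution dates of a DAG on a fixed timedelta schedule."""
    return [start_date + i * interval for i in range(count)]


def every_run_matches(waiting_dates, upstream_dates, execution_delta):
    """True if each waiting run finds an upstream run at date - execution_delta."""
    upstream = set(upstream_dates)
    return all(d - execution_delta in upstream for d in waiting_dates)


start = datetime(2023, 2, 9)
hourly = timedelta(hours=1)

# Hypothetical schedules: B runs on the hour, A fifteen minutes later.
b_dates = execution_dates(start, hourly, 24)
a_dates = execution_dates(start + timedelta(minutes=15), hourly, 24)

print(every_run_matches(a_dates, b_dates, timedelta(minutes=15)))  # True
print(every_run_matches(a_dates, b_dates, timedelta(0)))           # False

# Identical schedules: no execution_delta is needed.
print(every_run_matches(b_dates, b_dates, timedelta(0)))           # True
```

The offset between the two schedules is exactly what execution_delta has to encode; when both DAGs share the same schedule and start date, the default of no delta already lines up.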

Upvotes: 2
