vijayjain024

Reputation: 43

Create an Airflow ExternalTaskSensor for a specific run of an external Task that runs multiple times in a day

I am trying to create an external task sensor (in DAG B) on a task in a different DAG (call it DAG A), which runs on the following schedule: 'schedule_interval': '0 4,6,8,10,12,14,16,18,20,22 * * *'. DAG B is scheduled to run at 2 AM daily. I want to create a sensor task in DAG B that checks whether the 4 AM run of the external task in DAG A has succeeded.

I cannot reschedule DAG B to run at 4 AM, since other tasks in DAG B need to run at 2 AM. I have tried changing the window_size and window_offset parameters, but it does not work.

The ExternalTaskSensor methods have been overridden as follows:

from airflow.models import TaskInstance, DagRun
# The two imports below were not shown in the question; they are assumed here
from airflow.utils.dates import date_range
from sqlalchemy import and_

# Methods of the custom sensor class
def return_start_end_time(self, context):
    # The window is anchored on the sensor's next_execution_date
    execution_date = context.get('next_execution_date')
    return (execution_date - self.window_offset - self.window_size,
            execution_date - self.window_offset)

def poke(self, context):
    start_date, end_date = self.return_start_end_time(context)
    # Execution dates the external DAG is expected to have inside the window
    expected_executions = date_range(start_date, end_date,
                                     delta=self.dep_dag_schedule)
    TI = TaskInstance
    DR = DagRun

    # `session` is a SQLAlchemy session obtained elsewhere in the class (not shown)
    executions = (
        session.query(TI.dag_id, TI.task_id, TI.execution_date, TI.state)
        .join(DR, and_(DR.dag_id == TI.dag_id,
                       DR.execution_date == TI.execution_date))
        .filter(TI.dag_id == self.external_dag_id,
                TI.task_id == self.external_task_id,
                TI.execution_date.in_(expected_executions),
                DR.run_id.startswith('scheduled__'))
        .order_by(TI.execution_date.desc())
        .all()
    )

The code for Task Sensor is as follows:

wait_task = CustomTaskSensor(
    task_id='wait_task',
    poke_interval=60,
    dag=dag,
    external_dag_id=DAGA,      # ID of DAG A
    external_task_id=TaskA,    # ID of the task in DAG A
    window_size=timedelta(days=0, hours=5),
    window_offset=timedelta(days=0, hours=-5),
    execution_timeout=timedelta(hours=5),
    success_fn=MOST_RECENT_SUCCESS
)

Upvotes: 4

Views: 7279

Answers (2)

vijayjain024

Reputation: 43

We had our own custom ExternalTaskSensor class, which did not have the execution_date_fn and execution_delta params. Instead, it had parameters like window_size and window_offset. The way the methods were implemented, the execution date of the task sensor was the same as the actual run time (which is not the default). That means if the DAG containing the task sensor was triggered at 9/17 2 AM, the execution date of the sensor was set to 9/17 2 AM.

However, the execution date of the external task was set to the previous execution date (which is the default Lakitu behaviour), i.e. if the external task runs at 9/17 4 AM, then its execution date is set to 9/16 10 PM. I had to define my window_size and window_offset parameters such that the execution date of the external task falls in the window calculated by the return_start_end_time function (where the execution date refers to the execution date of the task sensor).
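As a rough illustration of that window arithmetic, here is a minimal standalone sketch. The helper `window_bounds` is hypothetical and only mirrors the subtraction in return_start_end_time from the question. The year 2019 and the 2 hour / 3 hour values are assumptions chosen for the example, not the values actually used in the fix.

```python
from datetime import datetime, timedelta

def window_bounds(reference_date, window_size, window_offset):
    """Hypothetical helper mirroring return_start_end_time from the question."""
    end = reference_date - window_offset
    return end - window_size, end

# Sensor reference date (9/17 2 AM) and the external task's execution date
# for its 9/17 4 AM run (9/16 10 PM). The year is an assumption.
sensor_date = datetime(2019, 9, 17, 2, 0)
external_execution_date = datetime(2019, 9, 16, 22, 0)

# Values from the question: the window covers 9/17 02:00 to 9/17 07:00,
# so the 9/16 22:00 execution date falls outside it.
q_start, q_end = window_bounds(sensor_date,
                               timedelta(hours=5), timedelta(hours=-5))
question_window_matches = q_start <= external_execution_date <= q_end

# Illustrative values (an assumption): a positive offset moves the window
# back in time, here to 9/16 21:00 through 9/16 23:00.
start, end = window_bounds(sensor_date,
                           timedelta(hours=2), timedelta(hours=3))
fixed_window_matches = start <= external_execution_date <= end

print(question_window_matches, fixed_window_matches)
```

With the question's negative offset the window lands after the sensor's own date, which would explain why the external run was never matched.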

Upvotes: 0

y2k-shubham

Reputation: 11627

:param execution_date_fn: function that receives the current execution date
    and returns the desired execution dates to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both. 
:type execution_date_fn: callable
  • This function will be supplied the current execution_date as its argument and is supposed to return a single execution_date, or a list of them, whose execution should be 'sensed'
..
elif self.execution_date_fn:
    dttm = self.execution_date_fn(context['execution_date'])
..

dttm_filter = dttm if isinstance(dttm, list) else [dttm]
serialized_dttm_filter = ','.join(
    [datetime.isoformat() for datetime in dttm_filter])
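For the schedules in the question, such a function could look like the sketch below. It assumes the stock ExternalTaskSensor with default scheduling semantics, where DAG B's run that starts at 2 AM on 9/17 has execution date 9/16 2 AM, and DAG A's run that starts at 4 AM on 9/17 has execution date 9/16 10 PM. The function name and the year are made up for illustration.

```python
from datetime import datetime, timedelta

def four_am_run_execution_date(execution_date):
    """Hypothetical execution_date_fn: map DAG B's execution date to the
    execution date of DAG A's run that starts at 04:00 the next day."""
    # 02:00 on day D + 20 hours = 22:00 on day D
    return execution_date + timedelta(hours=20)

# DAG B's run that starts at 9/17 02:00 carries execution date 9/16 02:00
dag_b_execution_date = datetime(2019, 9, 16, 2, 0)
target = four_am_run_execution_date(dag_b_execution_date)

# Same normalisation and serialisation as in the excerpt above
dttm_filter = target if isinstance(target, list) else [target]
serialized_dttm_filter = ','.join(d.isoformat() for d in dttm_filter)
print(serialized_dttm_filter)  # 2019-09-16T22:00:00

# The function would then be passed to the sensor, e.g.
# ExternalTaskSensor(..., execution_date_fn=four_am_run_execution_date)
```

Since the sensor subtracts execution_delta from the execution date, passing execution_delta=timedelta(hours=-20) instead should select the same run under these assumptions.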

Upvotes: 2
