I am trying to create an ExternalTaskSensor (in DAG B) on a task in a different DAG (let's call it DAG A), which runs at the following intervals: 'schedule_interval': '0 4,6,8,10,12,14,16,18,20,22 * * *'. DAG B is scheduled to run at 2 AM daily. I want to create a sensor task in DAG B that checks whether the 4 AM run of the external task in DAG A has succeeded.
I cannot reschedule DAG B to run at 4 AM, since other tasks in DAG B need to run at 2 AM. I have tried changing the window_size and window_offset parameters, but it does not work.
The ExternalTaskSensor methods have been overridden as follows:
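```python
from airflow.models import DagRun, TaskInstance
from airflow.utils.dates import date_range
from airflow.utils.db import provide_session  # Airflow 2.x: airflow.utils.session
from sqlalchemy import and_


def return_start_end_time(self, context):
    # The window is anchored on next_execution_date, i.e. the wall-clock
    # time at which this DAG run actually starts.
    execution_date = context.get('next_execution_date')
    return (execution_date - self.window_offset - self.window_size,
            execution_date - self.window_offset)


@provide_session  # injects the DB session that the query below uses
def poke(self, context, session=None):
    start_date, end_date = self.return_start_end_time(context)
    # Execution dates of the external DAG that fall inside the window
    expected_executions = date_range(start_date, end_date,
                                     delta=self.dep_dag_schedule)
    TI = TaskInstance
    DR = DagRun
    executions = (
        session.query(TI.dag_id, TI.task_id, TI.execution_date, TI.state)
        .join(DR, and_(DR.dag_id == TI.dag_id,
                       DR.execution_date == TI.execution_date))
        .filter(TI.dag_id == self.external_dag_id,
                TI.task_id == self.external_task_id,
                TI.execution_date.in_(expected_executions),
                DR.run_id.startswith('scheduled__'))  # scheduled runs only
        .order_by(TI.execution_date.desc())
        .all()
    )
    # (the remainder of poke(), which evaluates `executions`, is not shown)
```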
The sensor task is defined as follows:
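```python
from datetime import timedelta

# CustomTaskSensor is the overridden sensor class above; `dag` is DAG B,
# and DAGA / TaskA hold the external DAG id and task id.
wait_task = CustomTaskSensor(
    task_id='wait_task',
    poke_interval=60,
    dag=dag,
    external_dag_id=DAGA,
    external_task_id=TaskA,
    window_size=timedelta(hours=5),
    window_offset=timedelta(hours=-5),
    execution_timeout=timedelta(hours=5),
    success_fn=MOST_RECENT_SUCCESS,
)
```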
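To see what these parameters actually select, here is a minimal plain-Python sketch (no Airflow needed) of the window arithmetic from return_start_end_time for the DAG B run triggered on 9/17 at 2 AM. It assumes next_execution_date is 9/17 02:00 for that run and that dep_dag_schedule is DAG A's cron; the helper names and the year are made up for illustration.

```python
from datetime import datetime, timedelta

# Assumption: DAG A runs at '0 4,6,8,10,12,14,16,18,20,22 * * *'
DAG_A_HOURS = range(4, 24, 2)


def sensor_window(reference, window_offset, window_size):
    # Same bounds as return_start_end_time(); reference is next_execution_date
    return (reference - window_offset - window_size,
            reference - window_offset)


def dag_a_execution_dates(start, end):
    """Hypothetical helper: DAG A schedule slots inside [start, end], i.e.
    the execution dates the sensor's IN (...) filter can match."""
    slot = start.replace(minute=0, second=0, microsecond=0)
    if slot < start:
        slot += timedelta(hours=1)
    slots = []
    while slot <= end:
        if slot.hour in DAG_A_HOURS:
            slots.append(slot)
        slot += timedelta(hours=1)
    return slots


# DAG B run triggered on 9/17 at 2 AM (the year is arbitrary)
reference = datetime(2020, 9, 17, 2)
start, end = sensor_window(reference,
                           window_offset=timedelta(hours=-5),
                           window_size=timedelta(hours=5))
print(start, end)
print(dag_a_execution_dates(start, end))
```

With these values the window runs from 9/17 02:00 to 07:00 and contains the 04:00 and 06:00 slots. Since Airflow by default stamps each run with the previous slot of its schedule, those are the execution dates of the DAG A runs that start at 6 AM and 8 AM, not the 4 AM run.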
We had our own custom ExternalTaskSensor class, which did not have the execution_date_fn and execution_delta params; instead it had parameters like window_size and window_offset. Because of the way its methods were implemented, the sensor's execution date was the same as the actual run time (which is not the default): if the DAG containing the sensor was triggered on 9/17 at 2 AM, the sensor's execution date was 9/17 2 AM. The external task, however, kept the previous scheduled slot as its execution date (the default Lakitu behaviour): if the external task runs on 9/17 at 4 AM, its execution date is 9/16 10 PM. I had to define window_size and window_offset so that the external task's execution date falls inside the window computed by return_start_end_time (where the reference date is the sensor's execution date).
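The actual values I ended up with aren't shown here, so the following is only a minimal sketch of the check described above: map the 4 AM run of DAG A to its execution date, then test whether a given window_offset/window_size pair covers it. The helper names are hypothetical, the year is arbitrary, and window_offset=timedelta(0), window_size=timedelta(hours=4) is just one choice that satisfies the constraint.

```python
from datetime import datetime, timedelta

DAG_A_HOURS = range(4, 24, 2)  # DAG A schedule: '0 4,6,...,22 * * *'


def external_execution_date(run_start):
    """Execution date of the DAG A run that starts at run_start: the
    previous slot of DAG A's schedule (default Airflow behaviour)."""
    slot = run_start.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)
    while slot.hour not in DAG_A_HOURS:
        slot -= timedelta(hours=1)
    return slot


def in_window(reference, window_offset, window_size, target):
    # Same bounds as return_start_end_time(); reference is the sensor's
    # execution date (next_execution_date in the custom class)
    start = reference - window_offset - window_size
    end = reference - window_offset
    return start <= target <= end


reference = datetime(2020, 9, 17, 2)                        # DAG B run, 9/17 2 AM
target = external_execution_date(datetime(2020, 9, 17, 4))  # 9/16 22:00

# Parameters from the question: window 9/17 02:00-07:00 misses the target
print(in_window(reference, timedelta(hours=-5), timedelta(hours=5), target))  # False
# One working choice: window 9/16 22:00 to 9/17 02:00
print(in_window(reference, timedelta(0), timedelta(hours=4), target))  # True
```

The second window contains only the 22:00 slot, so the sensor matches exactly the 4 AM run and nothing earlier.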
Use the execution_date_fn param of ExternalTaskSensor to achieve this:

```
:param execution_date_fn: function that receives the current execution date
    and returns the desired execution dates to query. Either execution_delta
    or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
```

The callable receives the execution_date as its argument and is supposed to return a single execution_date, or a list of them, whose execution should be 'sensed':

```python
# ...
elif self.execution_date_fn:
    dttm = self.execution_date_fn(context['execution_date'])
# ...
dttm_filter = dttm if isinstance(dttm, list) else [dttm]
serialized_dttm_filter = ','.join(
    [datetime.isoformat() for datetime in dttm_filter])
```
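A minimal sketch of such a callable for this question, assuming the stock ExternalTaskSensor (Airflow 1.10, where the callable receives only the execution date, as in the excerpt above) and a DAG B schedule of '0 2 * * *'. The function name is hypothetical.

```python
from datetime import datetime, timedelta


def four_am_run_of_dag_a(execution_date):
    """Hypothetical execution_date_fn for the sensor in DAG B.

    DAG B runs daily at 02:00, so its run that starts on day D carries
    execution_date D-1 02:00. DAG A's 4 AM run on day D carries the previous
    slot of its own schedule, D-1 22:00, i.e. 20 hours later.
    """
    return execution_date + timedelta(hours=20)


# The DAG B run that starts on 9/17 at 2 AM has execution_date 9/16 02:00
print(four_am_run_of_dag_a(datetime(2020, 9, 16, 2)))  # 2020-09-16 22:00:00
```

It would be passed as execution_date_fn=four_am_run_of_dag_a alongside external_dag_id and external_task_id. Because the offset here is fixed, execution_delta=timedelta(hours=-20) should select the same run, since the sensor subtracts execution_delta from its own execution date. Either way the sensor starts poking at 2 AM and keeps poking until DAG A's 4 AM run succeeds.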