I have an Airflow operator that kicks off a job on a 3rd party service and then monitors the progress of that job. In code, the execute method looks like:
def execute(self, context):
    external_id = start_external_job()
    wait_until_external_job_completes(external_id)
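For illustration, wait_until_external_job_completes could be a simple polling loop. This is a sketch under assumptions: is_complete, poll_interval, timeout and sleep are hypothetical parameters (the question's version takes only the ID), and is_complete stands in for whatever status check the 3rd party service offers. Note that external_id lives only in the running process's memory, which is exactly what is lost when the worker restarts.

```python
import time
from typing import Callable


def wait_until_external_job_completes(
    external_id: str,
    is_complete: Callable[[str], bool],  # hypothetical status check against the 3rd party service
    poll_interval: float = 30.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,  # injectable so the loop can be tested
) -> None:
    """Poll is_complete(external_id) until it returns True.

    Raises TimeoutError once the accumulated sleep time reaches `timeout`
    and the job is still not complete.
    """
    waited = 0.0
    while not is_complete(external_id):
        if waited >= timeout:
            raise TimeoutError(f"job {external_id} not complete after {waited:.0f}s")
        sleep(poll_interval)
        waited += poll_interval
```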
If the Airflow worker is restarted (usually due to a code deploy) while an instance of this task is running, I'd like the restarted instance of that task to be able to pick up where the previous one left off (monitoring the job on the 3rd party service). Is there a way to share that 3rd party job ID across subsequent runs of the same task instance?
An example of the enhanced execute method would look like:
def execute(self, context):
    external_id = load_external_id_for_task_instance()
    if external_id is None:
        external_id = start_external_job(args)
        persist_external_id_for_task_instance(external_id)
    wait_until_external_job_completes(external_id)
And I need to implement load_external_id_for_task_instance and persist_external_id_for_task_instance.
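One possible way to back those two helpers, independent of Airflow, is a small SQLite table keyed by the task instance's identity. This is a hypothetical sketch: unlike the question's versions, the helpers take the database path and the (dag_id, task_id, execution_date) key explicitly, and the database file is assumed to sit on storage that survives a worker restart.

```python
import sqlite3
from contextlib import closing
from typing import Optional

# Hypothetical backing store for the question's two helpers. The database
# file must live on storage that survives a worker restart.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS external_jobs (
    dag_id         TEXT NOT NULL,
    task_id        TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    external_id    TEXT NOT NULL,
    PRIMARY KEY (dag_id, task_id, execution_date)
)
"""


def _connect(db_path: str) -> sqlite3.Connection:
    conn = sqlite3.connect(db_path)
    conn.execute(_SCHEMA)  # create the table on first use
    return conn


def persist_external_id_for_task_instance(
    db_path: str, dag_id: str, task_id: str, execution_date: str, external_id: str
) -> None:
    # closing() closes the connection; the inner "conn" block commits the insert.
    with closing(_connect(db_path)) as conn, conn:
        conn.execute(
            "INSERT OR REPLACE INTO external_jobs VALUES (?, ?, ?, ?)",
            (dag_id, task_id, execution_date, external_id),
        )


def load_external_id_for_task_instance(
    db_path: str, dag_id: str, task_id: str, execution_date: str
) -> Optional[str]:
    with closing(_connect(db_path)) as conn:
        row = conn.execute(
            "SELECT external_id FROM external_jobs"
            " WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
            (dag_id, task_id, execution_date),
        ).fetchone()
    return row[0] if row else None
```

Inside execute, the key could be built from the task instance in the context (its dag_id, task_id and execution_date attributes). One gap remains: if the worker dies after start_external_job returns but before the ID is persisted, the restarted task will submit a second job.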
I would suggest splitting this into two tasks, using XComs and a sensor.
You can have one operator that submits the job and saves the id to an XCom:
from airflow.models import BaseOperator


class SubmitJobOperator(BaseOperator):
    def execute(self, context):
        external_id = start_external_job()
        return external_id  # return value is pushed to XCom under the key 'return_value'
Then a sensor that fetches the id from XCom and polls until completion:
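# Airflow 1.10 import paths (in 2.x: airflow.sensors.base, and @apply_defaults is not needed)
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class JobCompleteSensor(BaseSensorOperator):
    @apply_defaults
    def __init__(self, submit_task_id, *args, **kwargs):
        super(JobCompleteSensor, self).__init__(*args, **kwargs)
        self.submit_task_id = submit_task_id  # so we know where to fetch the XCom value from

    def poke(self, context):
        # Called every poke_interval seconds until it returns True
        external_id = context['task_instance'].xcom_pull(task_ids=self.submit_task_id)
        return check_if_external_job_is_complete(external_id)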
So your DAG would look something like this:
submit_job = SubmitJobOperator(
    dag=dag,
    task_id='submit_job',
)

wait_for_job_to_complete = JobCompleteSensor(
    dag=dag,
    task_id='wait_for_job_to_complete',
    submit_task_id=submit_job.task_id,
)

submit_job >> wait_for_job_to_complete
XComs are persisted in the Airflow metadata database, so after a worker restart the sensor will still be able to find the previously submitted external_id.
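To make the restart argument concrete, here is a plain-Python model of the two-task pattern (not Airflow's API): a dict stands in for the XCom table, and two separate calls to sensor_poke play the role of the sensor before and after a worker restart. All names here are hypothetical.

```python
from typing import Callable, Dict, Tuple

# Plain-Python model of the two-task pattern, not Airflow's API. A dict keyed
# by (task_id, key) stands in for the XCom table (simplified: real XCom rows are
# also keyed by DAG and run), which in Airflow lives in the metadata database
# and so outlives any single worker process.
XComStore = Dict[Tuple[str, str], str]


def run_submit_task(xcom: XComStore, task_id: str, start_job: Callable[[], str]) -> None:
    # Like SubmitJobOperator.execute: the return value is stored under "return_value".
    xcom[(task_id, "return_value")] = start_job()


def sensor_poke(xcom: XComStore, submit_task_id: str, is_complete: Callable[[str], bool]) -> bool:
    # Like JobCompleteSensor.poke: the job ID comes only from the store,
    # so a freshly started sensor process finds the same ID.
    external_id = xcom[(submit_task_id, "return_value")]
    return is_complete(external_id)


# Simulated run: the job is submitted once, then two separate sensor "processes" poke.
xcom: XComStore = {}
job_done = {"job-7": False}  # hypothetical state on the 3rd party service
submissions = []


def start_job() -> str:
    submissions.append("job-7")
    return "job-7"


run_submit_task(xcom, "submit_job", start_job)
first_poke = sensor_poke(xcom, "submit_job", lambda job_id: job_done[job_id])
job_done["job-7"] = True  # the job finishes while the worker is being restarted
second_poke = sensor_poke(xcom, "submit_job", lambda job_id: job_done[job_id])
```

Because the sensor never holds the ID in memory between pokes, restarting it costs nothing and never triggers a second submission; only a rerun of the submit task would.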