Reputation: 1
I am developing an operator which inherits from the KubernetesPodOperator (KPO) to execute a bash script. I have built-in error handling to deal with the many possible causes of failure for this bash script.
I am trying to create test DAGs which prove that this error handling works. Each DAG would contain a subsequent task which asserts that the exit code from the KPO-based task equals the expected exit code. To do this, I have been attempting to use XCom to push the exit code from the KPO-based task and pull it in the subsequent assertion task.
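For illustration, the assertion step could look roughly like the sketch below. It assumes the exit code was pushed under the XCom key "exit_code" by a task with the hypothetical id "kpo_task". The names assert_exit_code and FakeTaskInstance are made up for this example; `ti` is any object exposing xcom_pull(task_ids=..., key=...), as an Airflow task instance does. For the assertion task to run at all after a failed upstream task, it would likely also need a trigger rule such as all_done.

```python
def assert_exit_code(ti, task_id: str, expected: int, key: str = "exit_code") -> int:
    """Pull the exit code pushed by `task_id` and compare it with `expected`."""
    actual = ti.xcom_pull(task_ids=task_id, key=key)
    if actual is None:
        # Nothing was pushed, e.g. because the upstream task never reached the push.
        raise ValueError(f"No XCom value for task {task_id!r} under key {key!r}")
    if int(actual) != expected:
        raise AssertionError(f"Expected exit code {expected}, got {actual}")
    return int(actual)


class FakeTaskInstance:
    """Stand-in for an Airflow task instance, used only to exercise the helper."""

    def __init__(self, values):
        self._values = values

    def xcom_pull(self, task_ids=None, key=None):
        # Return the stored value for (task id, key), or None if absent.
        return self._values.get((task_ids, key))
```

In a real DAG the helper would be called from the assertion task with the task instance taken from the task context, instead of the fake object used here.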
I have managed to get this working when the KPO-based task succeeds, but not when the task fails. The KPO documentation states that "XCOMs will be pushed only for tasks marked as State.SUCCESS." and I have been trying to find a workaround.
I have tried to mark the failed task as a success as shown here. While this does change the task state to Success, the exit code is still not present as an XCom value, so I have not been able to pull it via XCom in a subsequent task.
Upvotes: 0
Views: 66
Reputation: 5314
The KubernetesPodOperator accepts a callbacks parameter; see the KubernetesPodOperator page in the apache-airflow-providers-cncf-kubernetes documentation.
So you could create something like this. The on_pod_completion method is called after the Pod completes but before it is deleted.
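from airflow.operators.python import get_current_context
from airflow.providers.cncf.kubernetes.callbacks import KubernetesPodOperatorCallback, client_type
from kubernetes.client import models as k8s

class MyKpoCallback(KubernetesPodOperatorCallback):
    @staticmethod
    def on_pod_completion(*, pod: k8s.V1Pod, client: client_type, mode: str, **kwargs) -> None:
        # Push the first container's exit code to XCom, whatever the final task state.
        context = get_current_context()
        exit_code = pod.status.container_statuses[0].state.terminated.exit_code
        context["ti"].xcom_push(key="exit_code", value=exit_code)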
Then pass this class (not an instance!) to the KubernetesPodOperator via the callbacks parameter. Consider the above pseudocode; it was not tested and is based on a high-level review of the k8s client library.
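One thing to watch for: in the k8s client models, container_statuses can be None, and a container state only has terminated set once the container has actually exited, so indexing straight into [0].state.terminated may raise. A more defensive lookup might look like the sketch below. The helper name first_terminated_exit_code is made up for this example, and note that it returns the first terminated container's exit code rather than strictly the container at index 0. The SimpleNamespace object only mimics the shape of a V1Pod for demonstration.

```python
from types import SimpleNamespace
from typing import Optional


def first_terminated_exit_code(pod) -> Optional[int]:
    """Return the exit code of the first terminated container, or None."""
    status = getattr(pod, "status", None)
    # container_statuses may be None if no container status was reported.
    statuses = getattr(status, "container_statuses", None) or []
    for container_status in statuses:
        state = getattr(container_status, "state", None)
        terminated = getattr(state, "terminated", None)
        if terminated is not None:
            return terminated.exit_code
    return None


# Stand-in object shaped like a V1Pod, for illustration only.
fake_pod = SimpleNamespace(
    status=SimpleNamespace(
        container_statuses=[
            SimpleNamespace(state=SimpleNamespace(terminated=SimpleNamespace(exit_code=2)))
        ]
    )
)
```

Inside on_pod_completion you could then push the result of first_terminated_exit_code(pod) and decide separately how to treat a None value.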
Upvotes: 0