fjson01

Reputation: 81

In Apache Airflow, is there a way to capture the error of a bash command?

In Apache Airflow, is it possible to capture the original error message that a failed bash command produced, rather than the Airflow traceback that tells you the line failed but not why it failed?

Example task in the DAG:

gsutil_rsync = BashOperator(
        task_id="task1",
        bash_command='gsutil rsync -r s3://bucket/ gs://bucket',
        dag=dag)

Upvotes: 3

Views: 2521

Answers (1)

alikemalocalan

Reputation: 53

I wrote this solution with a Python function and PythonOperator, setting xcom_push=True in the PythonOperator so the function's return value is pushed to XCom.

import subprocess
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 15),
    'email': '[email protected]',
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}


def run_bash():
    # Run the command and capture its stdout; the PythonOperator
    # pushes the callable's return value to XCom.
    result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
    return result.stdout.decode()

with DAG('bash_dag', schedule_interval="@daily", default_args=default_args) as dag:
    start_branch = DummyOperator(task_id='start')

    gsutil_rsync_py = PythonOperator(
        task_id="task1",
        python_callable=run_bash,
        xcom_push=True,
        dag=dag)


    start_branch.set_downstream(gsutil_rsync_py)

And the result is this:

[screenshot of the result]
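
To get the original error message from the gsutil command itself, here is a minimal sketch of the same idea (the function name is illustrative, gsutil is assumed to be installed on the worker, and the bucket names are the placeholders from the question): capture stderr separately and fail the task with it.

import subprocess

from airflow.exceptions import AirflowException


def run_gsutil_rsync():
    # Capture stdout and stderr separately so the command's own
    # error message is available, not just Airflow's traceback.
    result = subprocess.run(
        ['gsutil', 'rsync', '-r', 's3://bucket/', 'gs://bucket'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Raise with the decoded stderr so the original error
        # message appears in the task log and fails the task.
        raise AirflowException(result.stderr.decode())
    # On success, the command's output is pushed to XCom.
    return result.stdout.decode()

Pass run_gsutil_rsync as the python_callable in the PythonOperator above; when the command fails, the task log will contain the command's own error output. Because the return value is pushed to XCom, a downstream task can also read the captured output with xcom_pull(task_ids='task1').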

Upvotes: 2
