Reputation: 81
In Apache Airflow, is it possible to capture the original error message produced by a failed bash command, instead of the traceback Airflow produces, which tells you that the line failed but not why it failed?
Example line in the DAG:
gsutil_rsync = BashOperator(
    task_id="task1",
    bash_command='gsutil rsync -r s3://bucket/ gs://bucket',
    dag=dag)
Upvotes: 3
Views: 2521
Reputation: 53
I wrote this solution with a Python function and a PythonOperator, setting xcom_push=True on the PythonOperator so the function's return value (the command output) is pushed to XCom.
import subprocess
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 15),
    'email': '[email protected]',
    'email_on_failure': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

def run_bash():
    # Capture stdout and redirect stderr into it, so the command's
    # own output, including any error message, ends up in the return value.
    result = subprocess.run(['ls', '-l'],
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT)
    return result.stdout.decode()

with DAG('bash_dag', schedule_interval="@daily", default_args=default_args) as dag:
    start_branch = DummyOperator(task_id='start')

    gsutil_rsync_py = PythonOperator(
        task_id="task1",
        python_callable=run_bash,
        xcom_push=True,  # the callable's return value is pushed to XCom
        dag=dag)

    start_branch.set_downstream(gsutil_rsync_py)
And as a result, the command's output shows up as the task's XCom value in the Airflow UI.
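If you also want the task to fail while still surfacing the command's own error message in the task log, a minimal sketch is to check the return code yourself and raise with the captured stderr (the gsutil command and the function name here are just placeholders for your real command):

import subprocess

from airflow.exceptions import AirflowException

def run_gsutil_rsync():
    # Run the real command, capturing stdout and stderr separately.
    result = subprocess.run(
        ['gsutil', 'rsync', '-r', 's3://bucket/', 'gs://bucket'],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    if result.returncode != 0:
        # Raise with the command's own error message so it appears
        # in the task log instead of only a generic traceback.
        raise AirflowException(
            'gsutil rsync failed: %s' % result.stderr.decode())
    return result.stdout.decode()

A downstream task can then read the captured output with ti.xcom_pull(task_ids='task1').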
Upvotes: 2