Rohan Nimje

Reputation: 11

Airflow DAG error "command failed. The command returned a non-zero exit code in airflow"

I am new to Airflow. I am getting an error while scheduling a Talend job in Airflow: "Bash command failed. The command returned a non-zero exit code."

How do I solve this error?

Here is my Python code:

import airflow
from datetime import datetime
from datetime import timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from airflow.utils.email import send_email


# Declare the variables
airflow_home = '/root/airflow/talendjobs'
job_name = 'first_job'
job_version = '0.1'

# these args will get passed on to each operator
# you can override them on a per-task basis during operator initialization

default_args = {
    'owner': 'snowadmin',
    # 'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    # 'email': ['VALID_EMAIL_ID'],
    # 'email_on_failure': True,
    # 'email_on_success': True,
    'retries': 1,
}


# Shell script location and context parameters for the bash command
talend_job = (
    f"{airflow_home}/{job_name}_{job_version}/{job_name}/{job_name}_run.sh"
    " --context_param file_path=/root/airflow/data/Employee/Employee/"
    " --context_param file_name=employees02.csv"
)


# DAG for airflow task
dag = DAG(job_name, default_args=default_args, schedule_interval=None)

# t1, t2, t3, etc are tasks created by instantiating operators
    
t1 = BashOperator(
    task_id='load_file_to_snowflake',
    bash_command=talend_job,
    params={'job_name':job_name,'job_version':job_version},
    dag=dag)

Here is the error log:

*** Reading local file: /root/airflow/logs/first_job/load_file_to_snowflake/2022-06-20T06:54:19.095687+00:00/1.log
[2022-06-20 06:54:19,619] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: first_job.load_file_to_snowflake 2022-06-20T06:54:19.095687+00:00 [queued]>
[2022-06-20 06:54:19,648] {taskinstance.py:826} INFO - Dependencies all met for <TaskInstance: first_job.load_file_to_snowflake 2022-06-20T06:54:19.095687+00:00 [queued]>
[2022-06-20 06:54:19,648] {taskinstance.py:1017} INFO - 
--------------------------------------------------------------------------------
[2022-06-20 06:54:19,648] {taskinstance.py:1018} INFO - Starting attempt 1 of 2
[2022-06-20 06:54:19,648] {taskinstance.py:1019} INFO - 
--------------------------------------------------------------------------------
[2022-06-20 06:54:19,665] {taskinstance.py:1038} INFO - Executing <Task(BashOperator): load_file_to_snowflake> on 2022-06-20T06:54:19.095687+00:00
[2022-06-20 06:54:19,668] {standard_task_runner.py:51} INFO - Started process 603104 to run task
[2022-06-20 06:54:19,679] {standard_task_runner.py:75} INFO - Running: ['airflow', 'tasks', 'run', 'first_job', 'load_file_to_snowflake', '2022-06-20T06:54:19.095687+00:00', '--job-id', '33', '--pool', 'default_pool', '--raw', '--subdir', 'DAGS_FOLDER/Talend_job_with_context.py', '--cfg-path', '/tmp/tmpna_jxx6o']
[2022-06-20 06:54:19,680] {standard_task_runner.py:76} INFO - Job 33: Subtask load_file_to_snowflake
[2022-06-20 06:54:19,772] {logging_mixin.py:103} INFO - Running <TaskInstance: first_job.load_file_to_snowflake 2022-06-20T06:54:19.095687+00:00 [running]> on host osboxes
[2022-06-20 06:54:19,896] {taskinstance.py:1230} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=snowadmin
AIRFLOW_CTX_DAG_ID=first_job
AIRFLOW_CTX_TASK_ID=load_file_to_snowflake
AIRFLOW_CTX_EXECUTION_DATE=2022-06-20T06:54:19.095687+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-20T06:54:19.095687+00:00
[2022-06-20 06:54:19,897] {bash.py:135} INFO - Tmp dir root location: 
 /tmp
[2022-06-20 06:54:19,897] {bash.py:158} INFO - Running command: /root/airflow/talendjobs/first_job_0.1/first_job/first_job_run.sh --context_param file_path=/root/airflow/data/Employee/Employee/ --context_param file_name=employees02.csv
[2022-06-20 06:54:19,907] {bash.py:169} INFO - Output:
[2022-06-20 06:54:38,018] {bash.py:173} INFO - Exception in component tFileInputDelimited_1 (first_job)
[2022-06-20 06:54:38,219] {bash.py:173} INFO - java.io.FileNotFoundException: F:/Snowflake Courses/Snowflake Cloud Database ETL with python and talend/02 Before we start/Airflow/data/Employee/Employee/employees02.csv (No such file or directory)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at java.io.FileInputStream.open0(Native Method)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at java.io.FileInputStream.open(FileInputStream.java:195)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at java.io.FileInputStream.<init>(FileInputStream.java:138)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at java.io.FileInputStream.<init>(FileInputStream.java:93)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at snowflake_project_1.first_job_0_1.first_job.tFileInputDelimited_1Process(first_job.java:921)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at snowflake_project_1.first_job_0_1.first_job.runJobInTOS(first_job.java:1795)
[2022-06-20 06:54:38,219] {bash.py:173} INFO -  at snowflake_project_1.first_job_0_1.first_job.main(first_job.java:1640)
[2022-06-20 06:54:38,378] {bash.py:177} INFO - Command exited with return code 1
[2022-06-20 06:54:38,408] {taskinstance.py:1396} ERROR - Bash command failed. The command returned a non-zero exit code.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1086, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1260, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/usr/local/lib/python3.8/dist-packages/airflow/models/taskinstance.py", line 1300, in _execute_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.8/dist-packages/airflow/operators/bash.py", line 180, in execute
    raise AirflowException('Bash command failed. The command returned a non-zero exit code.')
airflow.exceptions.AirflowException: Bash command failed. The command returned a non-zero exit code.
[2022-06-20 06:54:38,445] {taskinstance.py:1433} INFO - Marking task as UP_FOR_RETRY. dag_id=first_job, task_id=load_file_to_snowflake, execution_date=20220620T065419, start_date=20220620T065419, end_date=20220620T065438
[2022-06-20 06:54:38,523] {local_task_job.py:118} INFO - Task exited with return code 1

Upvotes: 1

Views: 9672

Answers (1)

Mobeen

Reputation: 96

If you read the error carefully, it is telling you what's wrong:

F:/Snowflake Courses/Snowflake Cloud Database ETL with python and talend/02 Before we start/Airflow/data/Employee/Employee/employees02.csv (No such file or directory)

The job can't find the file at the path it is using. Notice that the path in the exception is a Windows path (F:/...), not the Linux path you passed with --context_param, which suggests the job is still falling back to the path it was built with. Either way the path is wrong: first make sure the path you want actually exists by accessing it from a shell on the machine where Airflow runs, and only then use it in the DAG.
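For example, on the Airflow host (paths copied from the question; adjust them to your environment):

# Check that the launcher script and the input file actually exist
ls -l /root/airflow/talendjobs/first_job_0.1/first_job/first_job_run.sh
ls -l /root/airflow/data/Employee/Employee/employees02.csv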

I don't know whether "Employee" appearing twice in the path is a mistake or the actual layout. Either way, whenever you run a bash command from Airflow, first run the same command yourself in a shell, preferably on the Linux host where Airflow runs. If the command works there, for example it opens the desired location and processes the file, then paste it into the DAG and it will work.
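As a rough sketch, this is the exact command from the task log above; run it on the Airflow host as the same user Airflow runs under, and check the exit code:

# Run the command the BashOperator executes and print its exit code
/root/airflow/talendjobs/first_job_0.1/first_job/first_job_run.sh \
    --context_param file_path=/root/airflow/data/Employee/Employee/ \
    --context_param file_name=employees02.csv
echo "exit code: $?"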

Upvotes: 0
