NAB0815

Reputation: 471

Airflow dag bash operator permission denied

I am pretty new to Airflow and am trying to run an ETL process every 5 minutes. I have an Airflow DAG scheduled to run every 5 minutes, but it fails with the error message "ERROR - Bash command failed" (Permission denied).

The DAG is basically an ETL process with one BashOperator (which fails) and three PythonOperators that run downstream of the BashOperator.

from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator 
from airflow.contrib.sensors.file_sensor import FileSensor

from bin.int_medications import int_meds_auto_updt, storage, insert, del_stag, int_med_stag_clean


DAG_DEFAULT_ARGS = {
'owner':'airflow',
'depends_on_past':False,
'retries': 1,
}


dag3 = DAG(dag_id = 'int_meds_dag_v1', 
           start_date=datetime(2019, 10, 10), 
           default_args = DAG_DEFAULT_ARGS,
           schedule_interval = '*/5 * * * *',
           catchup = False)


cmd_command = "/home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
data_loading = BashOperator(
         task_id = "int_meds",
         bash_command = cmd_command,
         dag=dag3)


data_cleaning = PythonOperator(task_id = 'data_cleaning', python_callable = int_med_stag_clean.clean_stag, dag=dag3)
data_insert = PythonOperator(task_id = 'data_insert', python_callable = insert.insert_stag, dag=dag3)
data_delete = PythonOperator(task_id = 'data_delete', python_callable = del_stag.delete_stag, dag=dag3)

data_loading >> data_cleaning >> data_insert >> data_delete

The code for the DAG file is above, and the error message is below.

*** Reading local file: /home/akash/airflow/logs/int_meds_dag_v1/int_meds/2019-10-10T14:45:00+00:00/1.log
[2019-10-10 10:50:26,649] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1353} INFO - 
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,652] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-10 10:50:26,652] {__init__.py:1355} INFO - 
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,659] {__init__.py:1374} INFO - Executing <Task(BashOperator): int_meds> on 2019-10-10T14:45:00+00:00
[2019-10-10 10:50:26,659] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'int_meds_dag_v1', 'int_meds', '2019-10-10T14:45:00+00:00', '--job_id', '15495', '--raw', '-sd', 'DAGS_FOLDER/int_med_dag.py', '--cfg_path', '/tmp/tmpenegd6zi']
[2019-10-10 10:50:28,319] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,318] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-10 10:50:28,436] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,436] {__init__.py:305} INFO - Filling up the DagBag from /home/akash/airflow/dags/int_med_dag.py
[2019-10-10 10:50:29,739] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:29,739] {cli.py:517} INFO - Running <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [running]> on host TRLPowerSpec.local
[2019-10-10 10:50:29,751] {bash_operator.py:81} INFO - Tmp dir root location: 
 /tmp
[2019-10-10 10:50:29,751] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=int_meds_dag_v1
AIRFLOW_CTX_TASK_ID=int_meds
AIRFLOW_CTX_EXECUTION_DATE=2019-10-10T14:45:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-10T14:45:00+00:00
[2019-10-10 10:50:29,751] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v
[2019-10-10 10:50:29,751] {bash_operator.py:114} INFO - Running command: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py
[2019-10-10 10:50:29,756] {bash_operator.py:123} INFO - Output:
[2019-10-10 10:50:29,757] {bash_operator.py:127} INFO - /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v: line 1: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py: Permission denied
[2019-10-10 10:50:29,757] {bash_operator.py:131} INFO - Command exited with return code 126
[2019-10-10 10:50:29,760] {__init__.py:1580} ERROR - Bash command failed
Traceback (most recent call last):
  File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:29,761] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds Traceback (most recent call last):
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/bin/airflow", line 32, in <module>
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     args.func(args)
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     return f(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     _run(args, dag, ti)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     pool=args.pool,
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     return func(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     result = task_copy.execute(context=context)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds   File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds     raise AirflowException("Bash command failed")
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:31,649] {logging_mixin.py:95} INFO - [2019-10-10 10:50:31,649] {jobs.py:2562} INFO - Task exited with return code 1

I also tried giving permissions to the Python file using

sudo chmod -R -f 777 /path/to/file

but it still throws the same error in Airflow.

I'd really appreciate it if someone could point out the mistake so I can rectify it.

Upvotes: 2

Views: 7042

Answers (1)

Artem Vovsia

Reputation: 1570

The BashOperator expects the bash_command argument to be either a path to a Bash script (in which case the file extension should be .sh) or an inline Bash command. Here the shell tries to execute the .py file directly, which fails with return code 126 (Permission denied). Try replacing cmd_command with:

cmd_command = "python /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"

Alternatively, you could use a PythonOperator instead and run the code from int_meds_auto_updt.py directly.
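A minimal sketch of that alternative, assuming int_meds_auto_updt exposes a callable entry point (the name run_update below is hypothetical; the module is already imported at the top of your DAG file):

data_loading = PythonOperator(
         task_id = "int_meds",
         python_callable = int_meds_auto_updt.run_update,  # hypothetical entry point; use the real function name
         dag = dag3)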

Upvotes: 1
