I am pretty new to Airflow and am trying to run an ETL process every 5 minutes. I have an Airflow DAG scheduled to run every 5 minutes, but it fails with the error messages "Bash command failed" and "Permission denied".
The DAG is an ETL process with one BashOperator (which fails) and three PythonOperators that run downstream of the BashOperator.
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from bin.int_medications import int_meds_auto_updt, storage, insert, del_stag, int_med_stag_clean

DAG_DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    # 'retires' is a misspelling of 'retries', so no retry count is actually set
    # (consistent with "Starting attempt 1 of 1" in the log)
    'retires': 1,
}

dag3 = DAG(
    dag_id='int_meds_dag_v1',
    start_date=datetime(2019, 10, 10),
    default_args=DAG_DEFAULT_ARGS,
    schedule_interval='*/5 * * * *',  # every 5 minutes
    catchup=False,
)

# bash runs this string as-is, i.e. it tries to execute the .py file directly
cmd_command = "/home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"

data_loading = BashOperator(
    task_id='int_meds',
    bash_command=cmd_command,
    dag=dag3,
)

# Downstream steps of the ETL process
data_cleaning = PythonOperator(task_id='data_cleaning', python_callable=int_med_stag_clean.clean_stag, dag=dag3)
data_insert = PythonOperator(task_id='data_insert', python_callable=insert.insert_stag, dag=dag3)
data_delete = PythonOperator(task_id='data_delete', python_callable=del_stag.delete_stag, dag=dag3)

data_loading >> data_cleaning >> data_insert >> data_delete
Above is the code for the DAG file; the error log is below.
*** Reading local file: /home/akash/airflow/logs/int_meds_dag_v1/int_meds/2019-10-10T14:45:00+00:00/1.log
[2019-10-10 10:50:26,649] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1139} INFO - Dependencies all met for <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [queued]>
[2019-10-10 10:50:26,652] {__init__.py:1353} INFO -
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,652] {__init__.py:1354} INFO - Starting attempt 1 of 1
[2019-10-10 10:50:26,652] {__init__.py:1355} INFO -
--------------------------------------------------------------------------------
[2019-10-10 10:50:26,659] {__init__.py:1374} INFO - Executing <Task(BashOperator): int_meds> on 2019-10-10T14:45:00+00:00
[2019-10-10 10:50:26,659] {base_task_runner.py:119} INFO - Running: ['airflow', 'run', 'int_meds_dag_v1', 'int_meds', '2019-10-10T14:45:00+00:00', '--job_id', '15495', '--raw', '-sd', 'DAGS_FOLDER/int_med_dag.py', '--cfg_path', '/tmp/tmpenegd6zi']
[2019-10-10 10:50:28,319] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,318] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-10-10 10:50:28,436] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:28,436] {__init__.py:305} INFO - Filling up the DagBag from /home/akash/airflow/dags/int_med_dag.py
[2019-10-10 10:50:29,739] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds [2019-10-10 10:50:29,739] {cli.py:517} INFO - Running <TaskInstance: int_meds_dag_v1.int_meds 2019-10-10T14:45:00+00:00 [running]> on host TRLPowerSpec.local
[2019-10-10 10:50:29,751] {bash_operator.py:81} INFO - Tmp dir root location:
/tmp
[2019-10-10 10:50:29,751] {bash_operator.py:90} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=int_meds_dag_v1
AIRFLOW_CTX_TASK_ID=int_meds
AIRFLOW_CTX_EXECUTION_DATE=2019-10-10T14:45:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-10-10T14:45:00+00:00
[2019-10-10 10:50:29,751] {bash_operator.py:104} INFO - Temporary script location: /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v
[2019-10-10 10:50:29,751] {bash_operator.py:114} INFO - Running command: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py
[2019-10-10 10:50:29,756] {bash_operator.py:123} INFO - Output:
[2019-10-10 10:50:29,757] {bash_operator.py:127} INFO - /tmp/airflowtmp7a1q6w0c/int_medsykc0by4v: line 1: /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py: Permission denied
[2019-10-10 10:50:29,757] {bash_operator.py:131} INFO - Command exited with return code 126
[2019-10-10 10:50:29,760] {__init__.py:1580} ERROR - Bash command failed
Traceback (most recent call last):
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
result = task_copy.execute(context=context)
File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:29,761] {__init__.py:1611} INFO - Marking task as FAILED.
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds Traceback (most recent call last):
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/bin/airflow", line 32, in <module>
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds args.func(args)
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-10-10 10:50:29,768] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds return f(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 523, in run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds _run(args, dag, ti)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/bin/cli.py", line 442, in _run
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds pool=args.pool,
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/utils/db.py", line 73, in wrapper
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds return func(*args, **kwargs)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/models/__init__.py", line 1441, in _run_raw_task
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds result = task_copy.execute(context=context)
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds File "/home/akash/miniconda3/lib/python3.7/site-packages/airflow/operators/bash_operator.py", line 135, in execute
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds raise AirflowException("Bash command failed")
[2019-10-10 10:50:29,769] {base_task_runner.py:101} INFO - Job 15495: Subtask int_meds airflow.exceptions.AirflowException: Bash command failed
[2019-10-10 10:50:31,649] {logging_mixin.py:95} INFO - [2019-10-10 10:50:31,649] {jobs.py:2562} INFO - Task exited with return code 1
I also tried giving the Python file permissions with
sudo chmod -R -f 777 /path/to/file
but Airflow still throws the same error.
I'd really appreciate it if someone could point out the mistake so I can fix it.
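For Bash to run a .py path directly, two things generally have to be true: the file needs an execute bit for the user running the Airflow task, and its first line needs to be a shebang such as #!/usr/bin/env python so the kernel knows which interpreter to use. Without the shebang, chmod alone does not make a Python file runnable this way. The sketch below checks both conditions; check_direct_exec is a hypothetical helper, not part of Airflow.

```python
import os
import stat
import sys


def check_direct_exec(path):
    """Report whether `path` could be run directly (as `/path/to/script.py`) from Bash.

    Hypothetical diagnostic helper: direct execution needs an execute permission
    bit and, for a Python file, a "#!" shebang line naming the interpreter.
    """
    mode = os.stat(path).st_mode
    with open(path, "rb") as f:
        first_line = f.readline()
    return {
        # True if the current user is allowed to execute the file
        "executable_by_me": os.access(path, os.X_OK),
        # Human-readable permission string, e.g. "-rw-r--r--"
        "mode": stat.filemode(mode),
        # Without "#!", the kernel cannot hand the file to the Python interpreter
        "has_shebang": first_line.startswith(b"#!"),
    }


if __name__ == "__main__":
    # Usage: python check_exec.py FILE [FILE ...]
    for p in sys.argv[1:]:
        print(p, check_direct_exec(p))
```

If saved as, say, check_exec.py (a hypothetical file name), it would be run against int_meds_auto_updt.py as the same user that runs the Airflow scheduler, since permissions are checked for the current user.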
BashOperator expects bash_command to be either a Bash command or a Bash script file (in which case the file extension should be .sh). The log shows return code 126, which means Bash found the file but could not execute it. Try replacing cmd_command with:
cmd_command = "python /home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py"
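The difference between the two commands can be reproduced outside Airflow. Judging by the log, BashOperator writes the command to a temporary script and runs it with bash; the sketch below approximates that with bash -c. The file etl_step.py is a hypothetical stand-in for int_meds_auto_updt.py, and sys.executable stands in for python so the sketch uses whichever interpreter runs it.

```python
import os
import shlex
import subprocess
import sys
import tempfile


def run_in_bash(command):
    """Run `command` through bash (an approximation of BashOperator) and return its exit code."""
    return subprocess.run(["bash", "-c", command], capture_output=True, text=True).returncode


with tempfile.TemporaryDirectory() as tmp:
    script = os.path.join(tmp, "etl_step.py")  # hypothetical stand-in for int_meds_auto_updt.py
    with open(script, "w") as f:
        f.write("print('ETL step ran')\n")
    os.chmod(script, 0o644)  # readable, but no execute bit

    # Like the original cmd_command: bash tries to execute the .py file itself
    direct_rc = run_in_bash(shlex.quote(script))
    # Like the suggested fix: the interpreter reads the file, so only read permission is needed
    via_python_rc = run_in_bash(f"{shlex.quote(sys.executable)} {shlex.quote(script)}")

print(direct_rc, via_python_rc)
```

On Linux or macOS, direct_rc should match the return code 126 from the log, while the interpreter-prefixed command should exit with 0, because running a file through python only requires that the file be readable.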
Alternatively, you could use a PythonOperator instead and run the code from int_meds_auto_updt.py directly.
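If int_meds_auto_updt.py exposes a function, passing that function as python_callable (as the DAG already does for the other three tasks) would be the cleanest option. Since the file's contents aren't shown, the sketch below assumes nothing about them and instead executes the whole file in-process with the standard library's runpy.run_path. run_script is a hypothetical helper; the PythonOperator wiring in its docstring uses the Airflow 1.10 import path from the question and is not executed here.

```python
import runpy


def run_script(path):
    """Execute a Python script in-process, roughly as if started with `python path`.

    Hypothetical PythonOperator callable, e.g. (Airflow 1.10 import path, not run here):

        from airflow.operators.python_operator import PythonOperator
        data_loading = PythonOperator(
            task_id='int_meds',
            python_callable=run_script,
            op_kwargs={'path': '/home/akash/airflow/dags/bin/int_medications/int_meds_auto_updt.py'},
            dag=dag3,
        )
    """
    # run_name="__main__" makes any `if __name__ == "__main__":` block in the script execute
    runpy.run_path(path, run_name="__main__")
    # Returns None on purpose: run_path's result (the script's globals dict) is not
    # something you would want PythonOperator to push to XCom as the return value
```

One consequence of this design: the script runs inside the task's Python process, so it uses the packages installed for Airflow's interpreter, and a sys.exit() call in the script surfaces as a SystemExit exception rather than a shell exit code.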