murthaA

Reputation: 381

Airflow: how do I run .py files via python name.py using BashOperator?

I'm running Airflow with Celery and Redis. It works fine overall, but I'm having problems on the worker side. I have two docker-compose files: one for the master, which runs on the server, and one for the worker, which runs on other PCs.

I have a DAG that runs python script.py, but it always fails because it cannot find the script. It seems that the Airflow base task runner just copies the DAG file to a tmp folder.

My dags folder looks like this:
    dags/ 
        test_dag.py
        test.py

test_dag.py

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 9, 3),
}
dag = DAG('test', default_args=default_args, schedule_interval='*/20 * * * *', catchup=False)

curl = BashOperator(
    task_id='testingbash',
    bash_command="python test.py",
    dag=dag)

test.py

print('it worked')

Error:

[2018-09-06 22:30:34,832] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash [2018-09-06 22:30:34,832] {cli.py:492} INFO - Running <TaskInstance: test.testingbash 2018-09-06T22:00:00+00:00 [running]> on host e91df5a905de
[2018-09-06 22:30:39,066] {bash_operator.py:74} INFO - Tmp dir root location: 
 /tmp
[2018-09-06 22:30:39,067] {bash_operator.py:87} INFO - Temporary script location: /tmp/airflowtmprgxtunoa/testingbash_7opnm28
[2018-09-06 22:30:39,067] {bash_operator.py:97} INFO - Running command: python test.py
[2018-09-06 22:30:39,079] {bash_operator.py:106} INFO - Output:
[2018-09-06 22:30:39,164] {bash_operator.py:110} INFO - python: can't open file 'test.py': [Errno 2] No such file or directory
[2018-09-06 22:30:39,165] {bash_operator.py:114} INFO - Command exited with return code 2
[2018-09-06 22:30:40,400] {models.py:1736} ERROR - Bash command failed
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 118, in execute
    raise AirflowException("Bash command failed")
airflow.exceptions.AirflowException: Bash command failed
[2018-09-06 22:30:40,408] {models.py:1764} INFO - Marking task as FAILED.
[2018-09-06 22:30:42,132] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash Traceback (most recent call last):
[2018-09-06 22:30:42,132] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/bin/airflow", line 32, in <module>
[2018-09-06 22:30:42,132] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     args.func(args)
[2018-09-06 22:30:42,133] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2018-09-06 22:30:42,133] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     return f(*args, **kwargs)
[2018-09-06 22:30:42,133] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 498, in run
[2018-09-06 22:30:42,133] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     _run(args, dag, ti)
[2018-09-06 22:30:42,133] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 402, in _run
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     pool=args.pool,
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     return func(*args, **kwargs)
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1633, in _run_raw_task
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     result = task_copy.execute(context=context)
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash   File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 118, in execute
[2018-09-06 22:30:42,134] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash     raise AirflowException("Bash command failed")
[2018-09-06 22:30:42,135] {base_task_runner.py:107} INFO - Job 4: Subtask testingbash airflow.exceptions.AirflowException: Bash command failed
[2018-09-06 22:30:46,945] {logging_mixin.py:95} INFO - [2018-09-06 22:30:46,944] {jobs.py:2612} INFO - Task exited with return code 1

Solution:

Fixed a few days ago: I was not considering the directory structure my Docker setup was building. I now run the script with python ~/scripts/test.py and it works fine.

Upvotes: 7

Views: 8871

Answers (2)

murthaA

Reputation: 381

Fixed a few days ago: I was not considering the temporary structure my Docker setup was building. I now run the script with python ~/scripts/test.py and it works fine.

Upvotes: 0

Viraj Parekh

Reputation: 1381

If you just want to run a Python script, it might be easier to use the PythonOperator.
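As a rough sketch of the PythonOperator route, the body of test.py could be moved into a plain function. The function name run_test is made up for illustration and is not from the question; the block below runs on its own without Airflow installed.

```python
def run_test():
    """Body of test.py, moved into a function an operator could call."""
    message = "it worked"
    print(message)
    # Returned only so the result is easy to check when calling it directly.
    return message


if __name__ == "__main__":
    run_test()
```

In test_dag.py this function would then be passed as python_callable to a PythonOperator (imported from airflow.operators.python_operator in Airflow 1.x, matching the bash_operator import in the question), with dag=dag and a task_id of your choice. Since the function is imported as Python code, the working directory of the task no longer matters.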

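If you are set on using the BashOperator, you'll just need to include the absolute path to the file. By default, the operator creates a tmp directory and runs the command from there, so a relative path like test.py is resolved against that directory and not against your dags folder.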
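To make the tmp directory behaviour concrete, here is a minimal sketch that uses only the standard library (no Airflow). It assumes a POSIX shell, and the two temporary directories are stand-ins for the dags folder and the task's working directory. The helper build_bash_command is hypothetical, not part of Airflow.

```python
import os
import shlex
import subprocess
import sys
import tempfile


def build_bash_command(script_path):
    """Return a shell command that runs the script by its absolute path."""
    absolute = os.path.abspath(os.path.expanduser(script_path))
    return "{} {}".format(shlex.quote(sys.executable), shlex.quote(absolute))


# Stand-in for the folder that holds test.py.
scripts_dir = tempfile.mkdtemp(prefix="scripts_")
script_path = os.path.join(scripts_dir, "test.py")
with open(script_path, "w") as handle:
    handle.write("print('it worked')\n")

# Stand-in for the temporary directory the bash command is run from.
task_cwd = tempfile.mkdtemp(prefix="airflowtmp_")

# Relative path: resolved against task_cwd, where test.py does not exist.
relative = subprocess.run(
    "{} test.py".format(shlex.quote(sys.executable)),
    shell=True, cwd=task_cwd,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)

# Absolute path: found regardless of the working directory.
absolute = subprocess.run(
    build_bash_command(script_path),
    shell=True, cwd=task_cwd,
    stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)

print("relative path return code:", relative.returncode)
print("absolute path output:", absolute.stdout.decode().strip())
```

In the DAG itself, the same idea amounts to passing a string with the full path, such as the python ~/scripts/test.py command from the accepted fix, as bash_command. The right path depends on where your Docker image places the script on the worker.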

Upvotes: 3
