Reputation: 11
I am still learning, so forgive me if I'm missing something obvious. I am trying to run a DAG, but it is failing. This is the code up to the point of failure:
# import the libraries
from datetime import timedelta
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to write tasks!
from airflow.operators.bash_operator import BashOperator
# This makes scheduling easy
from airflow.utils.dates import days_ago
#defining DAG arguments
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'Anthony Guglielmi',
    'start_date': days_ago(0),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
# define the DAG
dag = DAG(
    'ETL_toll_data',
    default_args=default_args,
    description='Apache Airflow Final Assignment',
    schedule_interval=timedelta(days=1),
)
# define the tasks
# define the first task
unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command='tar -xvzf tolldata.tgz',
    dag=dag,
)
When it fails I get this log:
[2022-04-16 20:39:13,492] {taskinstance.py:1262} INFO - Executing
<Task(BashOperator): unzip_data> on 2022-04-16 20:39:10.718322+00:00
[2022-04-16 20:39:13,500] {standard_task_runner.py:52} INFO - Started process 973
to run task
[2022-04-16 20:39:13,508] {standard_task_runner.py:76} INFO - Running: ['***',
'tasks', 'run', 'ETL_toll_data', 'unzip_data', 'manual__2022-04-16T20:39:10.718322+00:00', '--job-id', '2', '--raw', '--subdir', 'DAGS_FOLDER/finalassignment/ETL_toll_data.py', '--cfg-path', '/tmp/tmptzf0s_qj', '--error-file', '/tmp/tmpsr4fkibb']
[2022-04-16 20:39:13,509] {standard_task_runner.py:77} INFO - Job 2: Subtask unzip_data
[2022-04-16 20:39:13,630] {logging_mixin.py:109} INFO - Running <TaskInstance: ETL_toll_data.unzip_data manual__2022-04-16T20:39:10.718322+00:00 [running]> on host 5e2785973b3f
[2022-04-16 20:39:13,804] {taskinstance.py:1429} INFO - Exporting the following env vars:
[email protected]
AIRFLOW_CTX_DAG_OWNER=Anthony Guglielmi
AIRFLOW_CTX_DAG_ID=ETL_toll_data
AIRFLOW_CTX_TASK_ID=unzip_data
AIRFLOW_CTX_EXECUTION_DATE=2022-04-16T20:39:10.718322+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-04-16T20:39:10.718322+00:00
[2022-04-16 20:39:13,807] {subprocess.py:62} INFO - Tmp dir root location:
/tmp
[2022-04-16 20:39:13,808] {subprocess.py:74} INFO - Running command: ['bash', '-c', 'tar -xvzf tolldata.tgz']
[2022-04-16 20:39:13,833] {subprocess.py:85} INFO - Output:
[2022-04-16 20:39:13,842] {subprocess.py:89} INFO - tar (child): tolldata.tgz: Cannot open: No such file or directory
[2022-04-16 20:39:13,842] {subprocess.py:89} INFO - tar (child): Error is not recoverable: exiting now
[2022-04-16 20:39:13,843] {subprocess.py:89} INFO - tar: Child returned status 2
[2022-04-16 20:39:13,843] {subprocess.py:89} INFO - tar: Error is not recoverable: exiting now
[2022-04-16 20:39:13,844] {subprocess.py:93} INFO - Command exited with return code 2
[2022-04-16 20:39:13,885] {taskinstance.py:1703} ERROR - Task failed with exception
I'm not quite sure why it is not recognizing the file. I have it downloaded in my /home/project/airflow/dags/finalassignment folder. I'm sure it's something simple, but I'm a novice, really frustrated, and can't seem to find any answers online.
Any help would be appreciated. Thanks
Upvotes: 1
Views: 7892
Reputation: 38922
Pass the cwd option to the BashOperator to set the working directory the command should execute in. If cwd isn't set, the command runs in a temporary directory, so a relative path like tolldata.tgz won't resolve to your dags folder.
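You can reproduce the failure without Airflow at all; this is just how relative paths work. A minimal sketch using plain subprocess to run the same command from a fresh temporary directory, as BashOperator does by default:

```python
import subprocess
import tempfile

# Run the same tar command from a temporary directory, mimicking
# BashOperator's default behavior; the relative path tolldata.tgz
# does not exist there, so tar exits with a non-zero status.
with tempfile.TemporaryDirectory() as tmp:
    result = subprocess.run(
        ['bash', '-c', 'tar -xvzf tolldata.tgz'],
        cwd=tmp,
        capture_output=True,
        text=True,
    )

print(result.returncode)  # non-zero, matching the "return code 2" in your log
```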
The cwd option was added in Airflow 2.2. Since you have the archive downloaded in the dags folder, you can write:
unzip_data = BashOperator(
    task_id='unzip_data',
    bash_command='tar -xvzf tolldata.tgz',
    dag=dag,
    cwd=dag.folder,
)
Upvotes: 3