Reputation: 51
I am running Airflow in a Docker container on my local machine. I'm running a test DAG with 3 tasks. The first tasks run fine; however, the last task, which uses the BashOperator, is stuck in a loop, as seen in the picture at the bottom. Looking in the log file, an entry is generated only for the first execution of the bash Python script, then nothing, yet the Python file keeps getting executed. Any suggestions as to what could be the issue?
Thanks,
Richard
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator


def creating_dataframe(ti):
    import pandas as pd
    import os

    loc = r'/opt/airflow/dags/'
    filename = r'demo.csv'
    df_location = loc + filename
    ti.xcom_push(key='df_location', value=df_location)
    if os.path.exists(loc + filename):
        print("if exists")
        return df_location
    else:
        df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['First entry']},
                          index=[pd.Timestamp.now()])
        df.to_csv(loc + filename, sep=';')
        print("does not exist")
        return df_location
def adding_row_to_dataframe(ti):
    import pandas as pd

    fetched_location = ti.xcom_pull(key='df_location', task_ids=['creating_dataframe'])[0]
    df = pd.read_csv(fetched_location, index_col=0, sep=';')
    new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
                          index=[pd.Timestamp.now()])
    df2 = pd.concat([df, new_df])
    df2.to_csv(fetched_location, sep=';')
    print("second function")
with DAG(
    dag_id="richards_airflow_demo",
    schedule_interval="@once",
    start_date=datetime(2022, 2, 17),
    catchup=False,
    tags=["this is a demo of airflow", "adding row"],
) as dag:
    task1 = PythonOperator(
        task_id="creating_dataframe",
        python_callable=creating_dataframe,
        do_xcom_push=True,
    )
    task2 = PythonOperator(
        task_id='adding_row_to_dataframe',
        python_callable=adding_row_to_dataframe,
    )
    task3 = BashOperator(
        task_id='python_bash_script',
        bash_command=r"echo 'python /opt/scripts/test.py'",
    )

    task1 >> task2 >> task3
The Python script called from the BashOperator (test.py):
import pandas as pd

df = pd.read_csv('/opt/airflow/dags/demo.csv', index_col=0, sep=';')
new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
                      index=[pd.Timestamp.now()])
df2 = pd.concat([df, new_df])
df2.to_csv('/opt/airflow/dags/demo.csv', sep=';')
(Screenshots: example of the issue; log file for the BashOperator)
Upvotes: 3
Views: 1045
Reputation: 51
All right, I didn't research why this is the case, but it seems that if I create a scripts folder inside the dags folder, the Python script inside it (test_dontputthescripthere.py) is executed even though the BashOperator isn't telling it to execute. As you can see, the BashOperator executes the test.py file perfectly and adds the following line to the CSV:
2022-02-21 15:11:53.923284;adding entry with bash python script
The test_dontputthescripthere.py file is executed in a loop, without the BashOperator ever invoking it. These are all the "- and this is wrong" entries in the demo.csv file.
I suspect some kind of refresh is going on inside Airflow, forcing it to execute the Python file: the scheduler's DAG file processor periodically imports every candidate .py file under the dags folder to look for DAG definitions, and importing a file runs all of its top-level code. A plain script stored there therefore gets executed on every parse cycle, which explains the loop.
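One common mitigation is to keep the script's work behind an `if __name__ == "__main__":` guard, so that a mere import by the DAG processor runs nothing while `python test.py` from the BashOperator still does the work. A minimal sketch of a reworked test.py; the `append_entry` function name and its path parameter are my own additions, not from the original script:

```python
# test.py (sketch) -- safe to keep under the dags folder, because the
# DAG processor only *imports* the file, which now defines a function
# and does nothing else.
import os

import pandas as pd

CSV_PATH = '/opt/airflow/dags/demo.csv'  # path used in the original script


def append_entry(csv_path=CSV_PATH):
    """Append one timestamped row to the demo CSV."""
    df = pd.read_csv(csv_path, index_col=0, sep=';')
    new_df = pd.DataFrame(
        {'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
        index=[pd.Timestamp.now()])
    pd.concat([df, new_df]).to_csv(csv_path, sep=';')


if __name__ == "__main__":
    # Only runs when invoked as `python test.py`, never on import,
    # and only where the DAG's CSV actually exists.
    if os.path.exists(CSV_PATH):
        append_entry()
```

Alternatively, an `.airflowignore` file in the dags folder containing a line such as `scripts/` tells the DAG processor to skip that subfolder entirely, which also stops the looped execution.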
Upvotes: 2