Richard

Reputation: 51

Apache Airflow stuck in a loop executing the last task (BashOperator running a Python script)

I am running Airflow in a Docker container on my local machine, with a test DAG of three tasks. All three tasks run fine; however, the last one, a BashOperator that runs a Python script, appears to be stuck in a loop, as shown in the screenshot at the bottom. Looking at the log file, an entry is only generated for the first execution of the script, then nothing, yet the Python file keeps getting executed. Any suggestions as to what could be the issue?

Thanks,

Richard

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

def creating_dataframe(ti):
    import pandas as pd
    import os

    loc = r'/opt/airflow/dags/'
    filename = r'demo.csv'
    df_location = loc + filename
    ti.xcom_push(key='df_location', value=df_location)

    if os.path.exists(df_location):
        print("if exists")
    else:
        df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['First entry']},
                          index=[pd.Timestamp.now()])
        df.to_csv(df_location, sep=';')
        print("does not exist")

    return df_location


def adding_row_to_dataframe(ti):
    import pandas as pd
    fetched_location = ti.xcom_pull(key='df_location', task_ids=['creating_dataframe'])[0]

    df = pd.read_csv(fetched_location, index_col=0, sep=';')
    new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry to demo file']},
                          index=[pd.Timestamp.now()])
    df2 = pd.concat([df, new_df])
    df2.to_csv(fetched_location, sep=';')
    print("second function")

with DAG(
    dag_id="richards_airflow_demo",
    schedule_interval="@once",
    start_date=datetime(2022, 2, 17 ),
    catchup=False,
    tags=["this is a demo of airflow","adding row"],
) as dag:

    task1 = PythonOperator(
        task_id="creating_dataframe",
        python_callable=creating_dataframe,
        do_xcom_push=True
    )

    task2 = PythonOperator(
        task_id='adding_row_to_dataframe',
        python_callable=adding_row_to_dataframe
    )

    task3 = BashOperator(
        task_id='python_bash_script',
        bash_command="python /opt/scripts/test.py"
    )
    

    task1 >> task2 >> task3

The Python script run by the BashOperator (test.py):

import pandas as pd

df = pd.read_csv('/opt/airflow/dags/demo.csv', index_col=0, sep=';')
new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
                      index=[pd.Timestamp.now()])
df2 = pd.concat([df, new_df])
df2.to_csv('/opt/airflow/dags/demo.csv', sep=';')

[Screenshot: example of the issue] [Screenshot: log file for the BashOperator]

Upvotes: 3

Views: 1045

Answers (1)

Richard

Reputation: 51

All right, I didn't research why this is the case, but it seems that if I create a scripts folder inside the dags folder, the Python script inside it (test_dontputthescripthere.py) is executed even though the BashOperator never calls it. As you can see, the BashOperator executes the test.py file perfectly and adds the following line to the csv:

2022-02-21 15:11:53.923284;adding entry with bash python script

test_dontputthescripthere.py, however, is executed in a loop without the BashOperator ever running it. This accounts for all of the "- and this is wrong" entries in the demo.csv file.

I suspect this is Airflow's DAG discovery at work: the scheduler periodically re-parses every .py file under the dags folder looking for DAG definitions, and parsing a file means importing it, which runs its top-level code. (By default Airflow only parses files whose text contains both the strings "airflow" and "dag", and the path /opt/airflow/dags/ inside the script satisfies both.)
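
If that explanation is right, one standard way to keep a helper script from being run by the parser is a __main__ guard. Here is a minimal sketch of test.py with such a guard (the main() wrapper is my addition, not part of the original script): importing the module, which is what the DAG parser does, then runs nothing, while python /opt/scripts/test.py still works as before.

import pandas as pd

def main():
    # Identical logic to test.py above; it now only runs when the file
    # is executed directly (e.g. by the BashOperator), not when it is
    # merely imported by Airflow's DAG file processor.
    df = pd.read_csv('/opt/airflow/dags/demo.csv', index_col=0, sep=';')
    new_df = pd.DataFrame({'GIA_AIRFLOW_DEMO': ['adding entry with bash python script']},
                          index=[pd.Timestamp.now()])
    pd.concat([df, new_df]).to_csv('/opt/airflow/dags/demo.csv', sep=';')

if __name__ == "__main__":
    main()

Alternatively, a .airflowignore file at the root of the dags folder containing the single line scripts should make the parser skip that subfolder entirely (entries are treated as regular expressions by default).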

[Screenshot: solution]

Upvotes: 2
