jack

Reputation: 871

Converting Python file to Airflow DAG

I have this Python file:

from datetime import datetime, timedelta

class Get:

    def __init__(self, i):
        self.i = self.get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self,i):
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self,df):
        ...


class Fix:
    def __init__(self,df):
        ...

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)

Basically, this code generates the last 4 dates and runs the functions over them (updating statistics, etc.).

At first I wanted to convert each function into a PythonOperator and then schedule it, but I don't think this will work. I don't know how to convert the classes and the parameters that are passed between them.

This is what the code does if I run it on 2018-Jun-12, and below it is what it should do with Airflow: (image not included)

Is there a template that I can use, or any suggestions on how to do it?

Upvotes: 1

Views: 3203

Answers (1)

David Lexa

Reputation: 199

You can either execute your script with a BashOperator, without changing the script at all:

from airflow import DAG
# Airflow 1.x import path; in 2.x it is airflow.operators.bash
from airflow.operators.bash_operator import BashOperator

dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
          default_args=default_args)

t1 = BashOperator(
    task_id='{NAME_OF_TASK}',
    dag=dag,
    bash_command='python {NAME_OF_THE_FILE_TO_EXECUTE}.py')

Or use a PythonOperator:

  1. Update your script to wrap the loop in a main function:

    def main():
        for i in range(4, 0, -1):
            get = Get(i)
            fix = Fix(get.df)
            get.put(fix.df)
    
  2. Define the DAG and a task that calls main:

    dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
              default_args=default_args)
    
    t1 = PythonOperator(
        task_id = '{NAME_OF_TASK}',
        dag = dag,
        python_callable = main)
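
If each of the four dates should be its own task, as the question describes, one possible sketch is to move the per-date work into a plain function and create one PythonOperator per offset. This is only an illustration under assumptions: `target_date`, `process_day`, `build_dag`, the DAG name and the start date are all hypothetical, the import path is the Airflow 1.x one, and the date is derived from Airflow's `ds` stamp (the run's execution date) rather than from `datetime.now()` as in the original script.

```python
from datetime import datetime, timedelta


def target_date(ds, days_back):
    """Return the date `days_back` days before run date `ds` ("YYYY-MM-DD")."""
    run_date = datetime.strptime(ds, "%Y-%m-%d")
    return (run_date - timedelta(days=days_back)).strftime("%Y-%m-%d")


def process_day(days_back, ds, **_context):
    """Hypothetical per-date callable: one task handles one date."""
    day = target_date(ds, days_back)
    # In the real script, the Get / Fix / put steps for `day` would run here.
    return day


def build_dag():
    """Create one PythonOperator per date, chained oldest to newest."""
    # Imported here so the helpers above work without Airflow installed.
    # Airflow 1.x import path; in 2.x it is airflow.operators.python.
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    dag = DAG(
        "process_last_four_days",  # hypothetical DAG name
        schedule_interval="@daily",
        default_args={"start_date": datetime(2018, 6, 1)},  # assumed start
    )
    previous = None
    for days_back in range(4, 0, -1):
        task = PythonOperator(
            task_id="process_day_minus_{}".format(days_back),
            python_callable=process_day,
            op_kwargs={"days_back": days_back},
            provide_context=True,  # 1.x only: passes `ds` and other context
            dag=dag,
        )
        if previous is not None:
            previous.set_downstream(task)  # keep the original loop order
        previous = task
    return dag
```

In a DAG file, a module-level `dag = build_dag()` would be needed so the scheduler can find the DAG. On Airflow 2.x the context is passed automatically, so `provide_context` can be dropped there.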
    

Upvotes: 6
