Reputation: 871
I have this Python file:
from datetime import datetime, timedelta

class Get:
    def __init__(self, i):
        # get_date is a method, so it has to be called through self
        self.i = self.get_date(i)
        self.df = self.get_file()

    def get_file(self):
        try:
            ...
            return df
        except Exception as e:
            return ...

    def get_date(self, i):
        # date i days before now, formatted as YYYY-MM-DD
        dt = datetime.now() - timedelta(days=i)
        return dt.strftime("%Y-%m-%d")

    def put(self, df):
        ....

class Fix:
    def __init__(self, df):
        ....

if __name__ == '__main__':
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
Basically, this code generates the last 4 dates and runs the functions over each of them (updating statistics, etc.).
At first I wanted to convert each function into a PythonOperator and then schedule it, but I don't think this will work. I don't know how to convert the classes and the parameters that are passed between them.
This is what the code does if I run it on 2018-Jun-12 and below what it should be with Airflow:
Is there a template that I can use or any suggestion how to do it?
Upvotes: 1
Views: 3203
Reputation: 199
You can either run your script with BashOperator, with no changes to the script itself:
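from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.x path; in 2.x it is airflow.operators.bash

# default_args is your own dict (owner, start_date, ...)
dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
          default_args=default_args)

t1 = BashOperator(
    task_id='{NAME_OF_TASK}',
    dag=dag,
    bash_command='python {NAME_OF_THE_FILE_TO_EXECUTE}.py')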
Or use PythonOperator.
First, wrap the loop in a main function in your script:
def main():
    for i in range(4, 0, -1):
        get = Get(i)
        fix = Fix(get.df)
        get.put(fix.df)
Then define the DAG and pass main to the task as its callable:
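from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.x path; in 2.x it is airflow.operators.python

# main must be importable here, e.g. imported from your script's module
dag = DAG('{NAME_OF_THE_DAG}', schedule_interval='@daily',
          default_args=default_args)

t1 = PythonOperator(
    task_id='{NAME_OF_TASK}',
    dag=dag,
    python_callable=main)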
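If you would rather see which date each pass handles, or later split the work into one task per date, it may help to pull the loop body into a function that takes the day offset. The sketch below is only an illustration under assumptions: `target_date`, `process_day` and the `today` parameter are hypothetical names that are not in your script, and the real Get/Fix/put work is replaced by returning the date string so the example runs on its own.

```python
from datetime import date, timedelta


def target_date(days_back, today=None):
    """Return the date `days_back` days before `today` as YYYY-MM-DD."""
    today = today or date.today()
    return (today - timedelta(days=days_back)).strftime("%Y-%m-%d")


def process_day(days_back, today=None):
    """Hypothetical unit of work for a single date.

    In the real script the body would be the original loop body:
        get = Get(days_back); fix = Fix(get.df); get.put(fix.df)
    Here it only returns the date it would process.
    """
    return target_date(days_back, today)


def main(today=None):
    # Same order as the original loop: oldest date first.
    return [process_day(i, today) for i in range(4, 0, -1)]


if __name__ == "__main__":
    # Run as if today were 2018-06-12
    print(main(date(2018, 6, 12)))
    # ['2018-06-08', '2018-06-09', '2018-06-10', '2018-06-11']
```

With this shape, `main` still works as the `python_callable` above, and `process_day` could also be given to its own PythonOperator with `op_kwargs={'days_back': i}` if you want a separate task per date.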
Upvotes: 6