Reputation: 73
I am trying to run an Airflow pipeline with a main Postgres task, then a looped Python task, followed by a final Postgres task, so the DAG resembles something like this:
                 PythonOperator
                 |Task B.1|
Postgres         |Task B.2|         Postgres
Task A   ------> |  ...   | ----->  Task C
                 |Task B.n|
So, I have written this DAG:
import glob
import logging
import os
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import execute_values
dag_default_args = {
    'owner': 'xxx',
    'start_date': datetime.now() - timedelta(days=2),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'wait_for_downstream': False,
}
dag = DAG(
    dag_id='xxx',
    default_args=dag_default_args,
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
    concurrency=5
)
AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
def insert_data_func(filename, **kwargs):
    # Path to read CSV files
    df = pd.read_csv(f'./dags/data/{filename}')
    # Data transformations
    df.dropna(inplace=True, how='all')
    df["id"] = pd.to_numeric(df["id"])
    ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
    conn_ps = ps_pg_hook.get_conn()
    if len(df) > 0:
        column_names = ['COLUMN1']
        values = df[column_names].to_dict('split')
        values = values['data']
        insert_sql = """
            INSERT INTO SCHEMA.TABLE (COLUMN1)
            VALUES %s
        """
        # Bulk-insert all rows in a single round trip
        execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(df))
        conn_ps.commit()
    return None
create_psql_table = PostgresOperator(
    task_id="create_psql_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS SCHEMA;
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE (
            COLUMN1 INT
        );
    """,
    dag=dag
)
create_fact_table = PostgresOperator(
    task_id="create_fact_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE1 (
            select
        );
    """,
    dag=dag
)
files = ['1.csv', '2.csv', '3.csv']
for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
        provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data

[insert_data] >> create_fact_table
But that only makes create_fact_table run after 3.csv, and I would like it to run after all the CSVs are processed.
So, my questions are:
1. Is there a way to create a dynamic task?
2. A silly one, but I am still learning Python: is there a way to build the for loop's list of files without specifying the file names, just that they start with a letter and are CSVs?
Upvotes: 1
Views: 2173
Reputation: 808
Do create_psql_table >> insert_data >> create_fact_table in your for loop. This way create_fact_table will be executed only once, after all the insert_data_funcX tasks have completed successfully.
Also, it's a good idea to avoid . in task_ids, as that symbol is related to SubDAGs and can cause trouble in some cases.
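For example, one way to keep the dot out of the generated task_ids (a sketch; listing is the loop variable from your DAG, so file names like "1.csv" produce "insert_data_func_1_csv"):

    # Replace the task_id line inside the loop so the generated
    # task_id contains underscores instead of dots
    task_id=f"insert_data_func_{listing.replace('.', '_')}",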
EDIT:
This (mind that there's no [insert_data] >> create_fact_table at the end):

for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
        provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data >> create_fact_table

So create_fact_table will be executed only once.
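As for your second question: instead of hard-coding the file names, you can build the list with the glob module your DAG already imports. A minimal sketch, assuming the CSVs sit in the ./dags/data folder that insert_data_func reads from:

import glob
import os

# Match every CSV under ./dags/data whose name starts with a letter;
# the [a-zA-Z] class matches exactly one leading character.
files = [
    os.path.basename(path)
    for path in glob.glob('./dags/data/[a-zA-Z]*.csv')
]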
Upvotes: 2