Greation

Reputation: 73

Airflow - Inserting a task dependency after a for loop final task

I am trying to build an Airflow pipeline with an initial Postgres task, then a set of Python tasks generated in a loop, followed by a final Postgres task, so that the DAG looks something like this:

            PythonOperator
              |Task B.1|
 Postgres     |Task B. |      Postgres
 Task A ------|Task B. |-----> Task C
              |Task B.n|

So I have written this DAG:

import glob
import logging
import os
from datetime import datetime, timedelta
import pandas as pd
from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import execute_values

dag_default_args = {
    'owner': 'xxx',
    'start_date': datetime.now() - timedelta(days=2),
    'email': [],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=1),
    'depends_on_past': False,
    'wait_for_downstream': False,
}

dag = DAG(
    dag_id='xxx',
    default_args=dag_default_args,
    schedule_interval="@daily",
    catchup=True,
    max_active_runs=1,
    concurrency=5
)

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')

def insert_data_func(filename, **kwargs):
    # Read the CSV file from the DAG's data folder
    df = pd.read_csv(f'./dags/data/{filename}')

    # Data transformations: drop fully empty rows and make "id" numeric
    df.dropna(inplace=True, how='all')
    df["id"] = pd.to_numeric(df["id"])

    ps_pg_hook = PostgresHook(postgres_conn_id="postgres")
    conn_ps = ps_pg_hook.get_conn()

    if len(df) > 0:
        column_names = ['COLUMN1']

        values = df[column_names].to_dict('split')
        values = values['data']

        insert_sql = """
                    INSERT INTO TABLE (COLUMN1)
                    VALUES %s
                    """

        execute_values(conn_ps.cursor(), insert_sql, values, page_size=len(df))
        conn_ps.commit()

    return None

create_psql_table = PostgresOperator(
    task_id="create_psql_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE SCHEMA IF NOT EXISTS SCHEMA;
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE(
            COLUMN1                            INT
            );
    """,
    dag=dag
)

create_fact_table = PostgresOperator(
    task_id="create_fact_table",
    postgres_conn_id="postgres",
    sql="""
        CREATE TABLE IF NOT EXISTS SCHEMA.TABLE1 (
            select 
            );
    """,
    dag=dag
)



files = ['1.csv', '2.csv', '3.csv']


for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
        provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data

[insert_data] >> create_fact_table

But this only makes create_fact_table run after the 3.csv task, and I would like it to run after all the CSVs have been processed.

So, my questions are:

  1. Is there a way to create dynamic tasks so that the final task runs after all of them?

  2. A silly one, as I am still learning Python: is there a way to build the list of files for the for loop without specifying the file names, just that they start with a letter and are CSVs?

Upvotes: 1

Views: 2173

Answers (1)

Tomasz Urbaszek

Reputation: 808

Set create_psql_table >> insert_data >> create_fact_table inside your for loop. This way create_fact_table is executed only once, after all the insert_data_func_X tasks have completed successfully.
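To see why the original [insert_data] >> create_fact_table only picks up the last task, here is a rough sketch using a toy stand-in for the operators. The Task class and the wire helper are hypothetical and are not Airflow's own classes; they only record which tasks end up upstream of create_fact_table, under the assumption that >> simply adds an edge each time it is used.

```python
class Task:
    """Toy stand-in for an Airflow operator; it only records dependency edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()  # task_ids that must finish before this task

    def __rshift__(self, other):
        # self >> other, where other is a Task or a list of Tasks
        for task in (other if isinstance(other, list) else [other]):
            task.upstream.add(self.task_id)
        return other

    def __rrshift__(self, other):
        # [t1, t2] >> self: lists have no __rshift__, so Python falls back here
        for task in other:
            self.upstream.add(task.task_id)
        return self


def wire(files, chain_inside_loop):
    """Build the toy graph and return what ends up upstream of the final task."""
    create = Task("create_psql_table")
    fact = Task("create_fact_table")
    for name in files:
        insert = Task(f"insert_{name}")
        if chain_inside_loop:
            create >> insert >> fact  # one edge into `fact` per iteration
        else:
            create >> insert
    if not chain_inside_loop:
        # After the loop, `insert` refers only to the last task created
        [insert] >> fact
    return fact.upstream


files = ["1.csv", "2.csv", "3.csv"]
print(wire(files, chain_inside_loop=False))  # only the 3.csv task
print(wire(files, chain_inside_loop=True))   # all three insert tasks
```

Collecting the operators in a list inside the loop and writing create_psql_table >> insert_tasks >> create_fact_table after it should give the same graph, assuming your Airflow version accepts lists on both sides of >>.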

It's also a good idea to avoid . in task_ids, as this character is related to subdags and can cause trouble in some cases.
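One possible way to keep dots out of the generated task_ids is to derive them from the file name without its extension. This is only a sketch: safe_task_id is a hypothetical helper, and the prefix just mirrors the one used in the question.

```python
import re


def safe_task_id(filename, prefix="insert_data_func"):
    """Build a task_id from a file name without using dots (hypothetical helper)."""
    # Drop the extension: "1.csv" -> "1"
    stem = filename.rsplit(".", 1)[0]
    # Replace anything other than letters, digits, "_" and "-" with "_"
    cleaned = re.sub(r"[^A-Za-z0-9_-]", "_", stem)
    return f"{prefix}_{cleaned}"


print(safe_task_id("1.csv"))           # insert_data_func_1
print(safe_task_id("sales.2020.csv"))  # insert_data_func_sales_2020
```

In the loop this would be used as task_id=safe_task_id(listing). Note that two different file names can map to the same id (for example a.b.csv and a_b.csv), so this assumes the names stay distinct after cleaning.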

EDIT: This (mind that there's no [insert_data] >> create_fact_table at the end)

for listing in files:
    insert_data = PythonOperator(
        task_id=f'insert_data_func_{listing}',
        python_callable=insert_data_func,
        op_kwargs={'filename': listing},
        provide_context=True,
        dag=dag
    )
    create_psql_table >> insert_data >> create_fact_table

will result in: [image: graph view of the resulting DAG]

So the create_fact_table will be executed once.
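As for the second question, the standard glob module can build the file list from a pattern instead of hard-coded names. The sketch below assumes the CSVs sit directly in one folder; list_csv_files is a hypothetical helper, and the folder you pass in (for example ./dags/data from the question) is your choice.

```python
import glob
import os


def list_csv_files(data_dir):
    """Return the names of CSV files in data_dir that start with a letter."""
    # [A-Za-z] matches a single leading letter, * matches the rest of the name
    pattern = os.path.join(data_dir, "[A-Za-z]*.csv")
    # Keep only the file names and sort them so the task order is stable
    return sorted(os.path.basename(path) for path in glob.glob(pattern))


files = list_csv_files("./dags/data")
print(files)
```

Keep in mind that this runs every time the DAG file is parsed, so the set of tasks follows whatever files are in the folder at that moment.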

Upvotes: 2
