Coder123

Reputation: 344

How do I run tasks individually (one at a time) in Airflow?

I have a list of tables I want to run my script through. It works successfully when I do one table at a time, but when I create the tasks in a for loop, it runs all the tables at once and gives me multiple errors.

Here is my code:

import csv
import gzip
from datetime import datetime, timedelta
from io import StringIO

import paramiko
import psycopg2
from sshtunnel import SSHTunnelForwarder

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def create_tunnel_postgres():
    psql_host = ''
    psql_port = 5432
    ssh_host = ''
    ssh_port = 22
    ssh_username = ''
    pkf = paramiko.RSAKey.from_private_key(StringIO(Variable.get('my_key')))

    server = SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_username,
        ssh_private_key=pkf,
        remote_bind_address=(psql_host, psql_port))

    return server

def conn_postgres_internal(server):
    """
    Use the tunnel server to connect to the internal Postgres database.
    """
    conn = psycopg2.connect(
        database='pricing',
        user=Variable.get('postgres_db_user'),
        password=Variable.get('postgres_db_key'),
        host=server.local_bind_host,
        port=server.local_bind_port,
    )

    return conn

def gzip_postgres_table(**kwargs):
    """
    Pull the first 100 rows of the given table through the tunnel and write them to a gzipped CSV.
    """
    table_name = kwargs['table_name']
    path = '/path/{}.csv'.format(table_name)
    server_postgres = create_tunnel_postgres()
    server_postgres.start()
    etl_conn = conn_postgres_internal(server_postgres)
    cur = etl_conn.cursor()
    cur.execute("""
        select * from schema.db.{} limit 100;
        """.format(table_name))
    result = cur.fetchall()
    column_names = [i[0] for i in cur.description]
    fp = gzip.open(path, 'wt')
    myFile = csv.writer(fp, delimiter=',')
    myFile.writerow(column_names)
    myFile.writerows(result)
    fp.close()
    etl_conn.close()
    server_postgres.stop()


#------------------------------------------------------------------------------------------------------------------------------------------------

default_args = {
    'owner': 'mae',
    'depends_on_past':False,
    'start_date': datetime(2020,1,1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}


tables= ['table1','table2']
s3_folder='de'
current_timestamp=datetime.now()



#Element'S VARIABLES

dag = DAG('dag1',
          description = 'O',
          default_args=default_args,
          max_active_runs=1,
          schedule_interval= '@once',
          #schedule_interval='hourly'
          catchup = False )


for table_name in tables:
    t1 = PythonOperator(
        task_id='{}_gzip_table'.format(table_name),
        python_callable=gzip_postgres_table,
        provide_context=True,
        op_kwargs={'table_name': table_name, 's3_folder': s3_folder, 'current_timestamp': current_timestamp},
        dag=dag)

Is there a way to run table1 first, let it finish, and then run table2? I tried doing that with the for table_name in tables: loop, but to no avail. Any ideas or suggestions would help.

Upvotes: 1

Views: 6924

Answers (5)

Oliveira

Reputation: 1

Set max_active_runs=1 on the DAG so that only one DAG run is active at a time:

dag = DAG(dag_id='you_DAG', default_args=default_args, schedule_interval='10 6 * * *', max_active_runs=1)

Upvotes: 0

Nikolai Kamenskiy

Reputation: 139

I think you need a DAG like this:

Code for it:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

import sys
sys.path.append('../')
from mssql_loader import core    # program code that starts the load
from mssql_loader import locals  # local variables, containing dictionaries with names

def contact_load(typ, db):
    core.starter(typ=typ, db=db)
    return 'MSSQL LOADED ' + db['DBpseudo'] + '.' + typ

dag = DAG('contact_loader', description='MSSQL sqlcontact.uka.local loader to GBQ',
          schedule_interval='0 7 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

start_operator = DummyOperator(task_id='ROBO_task', retries=3, dag=dag)


for v in locals.TABLES:
    for db in locals.DB:        
        task = PythonOperator(
            task_id=db['DBpseudo']+'_mssql_' + v,  # creates Express_mssql_fast, UKA_mssql_important, etc.
            python_callable=contact_load,
            op_kwargs={'typ': v,'db':db},
            retries=3,
            dag=dag,
        )

        start_operator >> task  # create a parent-child dependency from the start task to each table task
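
If you also want the per-table tasks themselves to run one after another, as the question asks, a minimal sketch is to chain each new task to the previous one instead of only to the start task. The names below (tables, gzip_postgres_table, dag) are reused from the question, not from the mssql_loader example above:

previous_task = None
for table_name in tables:
    task = PythonOperator(
        task_id='{}_gzip_table'.format(table_name),
        python_callable=gzip_postgres_table,
        provide_context=True,
        op_kwargs={'table_name': table_name},
        dag=dag,
    )
    if previous_task is not None:
        previous_task >> task  # table2 only starts after table1 finishes
    previous_task = task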

Upvotes: 1

PeterRing

Reputation: 1797

I see 3 ways of solving this.

Upvotes: 1

Gaurav Shimpi

Reputation: 21

I saw your code, and it seems like you're creating multiple DAG tasks using a looping statement, which runs the tasks in parallel.

There are a few ways to achieve your requirement.

  1. Use the SequentialExecutor.

airflow.executors.sequential_executor.SequentialExecutor will only run task instances sequentially.

https://airflow.apache.org/docs/stable/start.html#quick-start

  2. Create a script that works according to your need.

Create a Python script and use it as a single PythonOperator that repeats your current function for each of the tables.

  3. Limit the Airflow executor's parallelism to 1.

You can limit Airflow to a single concurrently running task instance in its airflow.cfg config file (see the sketch after these steps).

Steps:

open airflow.cfg from your Airflow root (AIRFLOW_HOME).

set/update parallelism = 1

restart Airflow.

This should work.
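
As a rough sketch of what options 1 and 3 touch in airflow.cfg (based on a stock Airflow 1.x config; both settings live in the [core] section, and you would normally pick only one of them):

[core]
# Option 1: SequentialExecutor runs task instances strictly one at a time
executor = SequentialExecutor

# Option 3: keep your current executor but cap concurrent task instances
# across the whole Airflow installation to one
parallelism = 1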

Upvotes: 1

Lucas Abbade

Reputation: 817

Your for loop is creating multiple tasks for your tables' processing, and this will parallelize the execution of the tasks by default in Airflow.

You can either set the number of workers in the Airflow config file to 1, or create only 1 task and run your loop inside the task, which will then be executed sequentially (see the sketch below).
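
A minimal sketch of the single-task approach, reusing gzip_postgres_table, tables and dag from the question (the names come from the question, not from a tested DAG):

def gzip_all_tables(**kwargs):
    # Loop over the tables inside one task, so they are processed one by one.
    for table_name in kwargs['tables']:
        gzip_postgres_table(table_name=table_name)

gzip_all = PythonOperator(
    task_id='gzip_all_tables',
    python_callable=gzip_all_tables,
    op_kwargs={'tables': tables},
    dag=dag,
)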

Upvotes: 1
