Reputation: 344
I have a list of tables I want to run my script through. It works successfully when I do one table at a time, but when I try a for loop above the tasks, it runs all the tables at once, giving me multiple errors.
Here is my code:
import csv
import gzip
from datetime import datetime, timedelta
from io import StringIO

import paramiko
import psycopg2
from sshtunnel import SSHTunnelForwarder

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator


def create_tunnel_postgres():
    psql_host = ''
    psql_port = 5432
    ssh_host = ''
    ssh_port = 22
    ssh_username = ''
    pkf = paramiko.RSAKey.from_private_key(StringIO(Variable.get('my_key')))
    server = SSHTunnelForwarder(
        (ssh_host, ssh_port),
        ssh_username=ssh_username,
        ssh_private_key=pkf,
        remote_bind_address=(psql_host, psql_port))
    return server
def conn_postgres_internal(server):
    """
    Use the SSH tunnel server to connect to the internal Postgres database.
    """
    conn = psycopg2.connect(
        database='pricing',
        user=Variable.get('postgres_db_user'),
        password=Variable.get('postgres_db_key'),
        host=server.local_bind_host,
        port=server.local_bind_port,
    )
    return conn
def gzip_postgres_table(**kwargs):
    """
    Pull the first 100 rows of a table through the SSH tunnel and write them
    to a gzipped CSV file.
    """
    table_name = kwargs['table_name']
    path = '/path/{}.csv'.format(table_name)
    server_postgres = create_tunnel_postgres()
    server_postgres.start()
    etl_conn = conn_postgres_internal(server_postgres)
    cur = etl_conn.cursor()
    cur.execute("""
        select * from schema.db.{} limit 100;
    """.format(table_name))
    result = cur.fetchall()
    column_names = [i[0] for i in cur.description]
    fp = gzip.open(path, 'wt')
    myFile = csv.writer(fp, delimiter=',')
    myFile.writerow(column_names)
    myFile.writerows(result)
    fp.close()
    etl_conn.close()
    server_postgres.stop()
#------------------------------------------------------------------------------------------------------------------------------------------------
default_args = {
    'owner': 'mae',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}

tables = ['table1', 'table2']
s3_folder = 'de'
current_timestamp = datetime.now()
#Element'S VARIABLES
dag = DAG('dag1',
          description='O',
          default_args=default_args,
          max_active_runs=1,
          schedule_interval='@once',
          # schedule_interval='hourly'
          catchup=False)
for table_name in tables:
    t1 = PythonOperator(
        task_id='{}_gzip_table'.format(table_name),
        python_callable=gzip_postgres_table,
        provide_context=True,
        op_kwargs={'table_name': table_name, 's3_folder': s3_folder, 'current_timestamp': current_timestamp},
        dag=dag)
Is there a way to run table1 first, let it finish, and then run table2? I tried doing that with the for table_name in tables: loop, but to no avail. Any ideas or suggestions would help.
Upvotes: 1
Views: 6924
Reputation: 1
Set max_active_runs=1 on the DAG so that only one run is active at a time:

dag = DAG(dag_id='your_DAG',
          default_args=default_args,
          schedule_interval='10 6 * * *',
          max_active_runs=1)  # here: allow only 1 active run at a time
Upvotes: 0
Reputation: 139
I think you need a DAG like this.
Code for it:
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
import sys

sys.path.append('../')
from mssql_loader import core    # program code, which starts the load
from mssql_loader import locals  # local variables, containing dictionaries with names


def contact_load(typ, db):
    core.starter(typ=typ, db=db)
    return 'MSSQL LOADED ' + db['DBpseudo'] + '.' + typ


dag = DAG('contact_loader', description='MSSQL sqlcontact.uka.local loader to GBQ',
          schedule_interval='0 7 * * *',
          start_date=datetime(2017, 3, 20), catchup=False)

start_operator = DummyOperator(task_id='ROBO_task', retries=3, dag=dag)

for v in locals.TABLES:
    for db in locals.DB:
        task = PythonOperator(
            task_id=db['DBpseudo'] + '_mssql_' + v,  # creates Express_mssql_fast, UKA_mssql_important, etc.
            python_callable=contact_load,
            op_kwargs={'typ': v, 'db': db},
            retries=3,
            dag=dag,
        )
        start_operator >> task  # create a parent-child dependency from the start task to each table task
Upvotes: 1
Reputation: 1797
I see 3 ways of solving this.
Upvotes: 1
Reputation: 21
I saw your code, and it seems like you're creating multiple DAG tasks using a looping statement, which runs the tasks in parallel.

There are a few ways to achieve your requirement:

1. Use airflow.executors.sequential_executor.SequentialExecutor, which will only run task instances sequentially.
   https://airflow.apache.org/docs/stable/start.html#quick-start

2. Create a script (Python) and use it as a single PythonOperator that repeats your current function for the number of tables (see the sketch after this list).

3. Limit your Airflow workers to 1 in its airflow.cfg config file.
   Steps:
   - open airflow.cfg from your Airflow root (AIRFLOW_HOME).
   - set/update parallelism = 1
   - restart your Airflow.

This should work.
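A minimal sketch of option 2, assuming the gzip_postgres_table function and the tables, s3_folder, and current_timestamp variables from the question are in scope (the names process_all_tables and gzip_all_tables are placeholders, not from the original post):

def process_all_tables(**kwargs):
    # Loop over the tables inside a single task, so each table is processed
    # one after the other instead of in parallel tasks.
    for table_name in tables:
        gzip_postgres_table(table_name=table_name,
                            s3_folder=s3_folder,
                            current_timestamp=current_timestamp)

t_all = PythonOperator(
    task_id='gzip_all_tables',
    python_callable=process_all_tables,
    provide_context=True,
    dag=dag)

Because everything runs inside one task, the tables are processed strictly in order, at the cost of losing per-table retries and per-table visibility in the Airflow UI.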
Upvotes: 1
Reputation: 817
Your for loop is creating multiple tasks for processing your tables, and by default Airflow will parallelize the execution of those tasks.

You can either set the number of workers in the Airflow config file to 1 (see the sketch below), or create only 1 task and run your loop inside that task, which will then be executed synchronously.
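For the first option, a minimal sketch of the relevant airflow.cfg entry, assuming an Airflow 1.10-style config file (key names can differ in other versions):

[core]
# With parallelism = 1 the scheduler runs only one task instance at a time,
# so the per-table tasks execute one after another.
parallelism = 1

Restart the scheduler after editing the file so the new limit takes effect.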
Upvotes: 1