Reputation: 347
I am trying to find a way to manage connection pooling for external connections created in Airflow.
Airflow version : 2.1.0
Python Version : 3.9.5
Airflow DB : SQLite
External connections created : MySQL and Snowflake
I know there are properties in the airflow.cfg file:
sql_alchemy_pool_enabled = True
sql_alchemy_pool_size = 5
But these properties are for managing the Airflow internal (metadata) DB, which is SQLite in my case.
I have a few tasks which read or write data in MySQL and Snowflake.
snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="Some Insert query",
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE
)
and
insert_mysql_task = MySqlOperator(
    task_id='insert_record',
    mysql_conn_id='mysql_default',
    sql="some insert query",
    dag=dag
)
Reading data from MySQL
def get_records():
    mysql_hook = MySqlHook(mysql_conn_id="mysql_default")
    records = mysql_hook.get_records(sql=r"""Some select query""")
    print(records)
What I observed is that a new session is created for each task (there are multiple tasks in the same DAG) for Snowflake; I haven't verified the same for MySQL.
Is there a way to maintain a connection pool for external connections (in my case Snowflake and MySQL), or any other way to run all the queries in the same DAG in the same session?
Thanks
Upvotes: 5
Views: 4257
Reputation: 15979
Airflow offers Pools as a way to limit concurrency to an external service.
You can create a Pool via the UI: Menu -> Admin -> Pools
Or with the CLI:
airflow pools set NAME slots
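For example, to create a pool named snowflake with 5 slots (the name and slot count here are just illustrative; depending on your 2.x CLI version, a trailing description argument may also be expected):
airflow pools set snowflake 5 "Limit concurrent Snowflake tasks"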
The pool has slots which define how many tasks that use the resource can run in parallel. If the pool is full, tasks will be queued until a slot opens.
To use the pool, simply add pool=Name to the operator.
In your case, assuming the pool was created with the name snowflake, then:
snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    dag=dag,
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    sql="Some Insert query",
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
    pool='snowflake',
)
Note that by default a task occupies 1 slot in the pool, but this is configurable. A task may occupy more than one slot by setting pool_slots, for example:
snowflake_insert = SnowflakeOperator(
    task_id='insert_snowflake',
    ...
    pool='snowflake',
    pool_slots=2,
)
Upvotes: 2