Reputation: 31642
Airflow uses SQLAlchemy to connect to the DB: https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri
Google officially supports connecting to Cloud SQL through the cloud-sql-python-connector package: https://cloud.google.com/sql/docs/postgres/connect-connectors#examples
It looks like it even supports IAM authentication for Postgres, which makes things even more seamless: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/commit/6703232d6ea624f868e750c8c49c3bb1151f1f1e#diff-f5a708e43b65d84f59aa4fd685978087acc7e903b740f3e45c68356dbb2fd7b4R77
With the connector you don't have to run the Cloud SQL Auth Proxy (or set up private networking and firewall rules); you can connect directly from Python like this:
    # The Cloud SQL Python Connector can be used along with SQLAlchemy using the
    # 'creator' argument to 'create_engine'
    def init_connection_engine() -> sqlalchemy.engine.Engine:
        def getconn() -> pg8000.dbapi.Connection:
            conn: pg8000.dbapi.Connection = connector.connect(
                os.environ["POSTGRES_CONNECTION_NAME"],
                "pg8000",
                user=os.environ["POSTGRES_USER"],
                password=os.environ["POSTGRES_PASS"],
                db=os.environ["POSTGRES_DB"],
            )
            return conn

        engine = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        engine.dialect.description_encoding = None
        return engine
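For the IAM integration mentioned above, the following is a minimal sketch of how the connect arguments might be assembled. It assumes the connector's `enable_iam_auth` flag and reuses the environment variable names from the snippet above; `build_connect_kwargs` and `getconn_iam` are hypothetical helper names, not part of the library.

```python
import os
from typing import Any, Dict, Mapping


def build_connect_kwargs(env: Mapping[str, str], use_iam: bool = False) -> Dict[str, Any]:
    """Assemble keyword arguments for the connector's connect() call."""
    kwargs: Dict[str, Any] = {"user": env["POSTGRES_USER"], "db": env["POSTGRES_DB"]}
    if use_iam:
        # Assumption: with IAM database authentication the connector obtains a
        # short-lived token itself, so no static password is passed.
        kwargs["enable_iam_auth"] = True
    else:
        kwargs["password"] = env["POSTGRES_PASS"]
    return kwargs


def getconn_iam():
    """Open a pg8000 connection through the connector using IAM auth."""
    # Imported lazily so build_connect_kwargs works without the package installed.
    from google.cloud.sql.connector import Connector

    connector = Connector()
    return connector.connect(
        os.environ["POSTGRES_CONNECTION_NAME"],
        "pg8000",
        **build_connect_kwargs(os.environ, use_iam=True),
    )
```

`getconn_iam` could then be passed as the `creator` argument to `create_engine` in place of `getconn`.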
Can I install cloud-sql-python-connector in Airflow and just set the connection to use it in my sql_alchemy_conn settings?
Upvotes: 1
Views: 679
Reputation: 1011
To answer your question: yes, you can install cloud-sql-python-connector in Airflow. I tried it on my end as well.
Below is a sample configuration showing how to set up sql_alchemy_conn.
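    import logging

    import pg8000.dbapi
    import sqlalchemy
    from airflow.operators import python
    from google.cloud.sql.connector import Connector

    # CONNECTION_NAME, MYSQL_USER, MYSQL_PASS and DB_NAME are placeholders;
    # define them with your own instance and credentials.

    def init_connection_engine() -> None:
        # Current connector releases expose a Connector class; create it inside
        # the task so nothing is started while the DAG file is parsed.
        connector = Connector()

        def getconn() -> pg8000.dbapi.Connection:
            conn: pg8000.dbapi.Connection = connector.connect(
                CONNECTION_NAME,
                "pg8000",
                user=MYSQL_USER,
                password=MYSQL_PASS,
                db=DB_NAME,
            )
            return conn

        engine = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
        )
        engine.dialect.description_encoding = None

        # Run a test query and log each row.
        query = sqlalchemy.text("SELECT * FROM entries")
        with engine.connect() as db_conn:
            results = db_conn.execute(query)
            for result in results:
                logging.info(result)

    # provide_context is omitted: the callable takes no arguments.
    process_data = python.PythonOperator(
        task_id='query_postgre',
        python_callable=init_connection_engine,
    )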
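The CONNECTION_NAME placeholder above is the Cloud SQL instance connection name. As a rough sketch, assuming the common three-part `project:region:instance` form, a small check like the one below can catch a malformed value before the task tries to connect. `InstanceName` and `parse_connection_name` are hypothetical names used only for illustration.

```python
from typing import NamedTuple


class InstanceName(NamedTuple):
    project: str
    region: str
    instance: str


def parse_connection_name(name: str) -> InstanceName:
    """Split 'project:region:instance' into its parts, or raise ValueError."""
    parts = name.split(":")
    # Assumption: exactly three non-empty, colon-separated parts.
    if len(parts) != 3 or not all(parts):
        raise ValueError(f"expected 'project:region:instance', got {name!r}")
    return InstanceName(*parts)
```

Calling `parse_connection_name(CONNECTION_NAME)` at the start of the task would fail fast with a readable message instead of a connection error.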
Upvotes: 2