red888

Reputation: 31642

Does Airflow support the cloud-sql-python-connector for connecting to Cloud SQL Postgres?

Airflow uses SQLAlchemy to connect to the DB: https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html#database-uri

Google has official support for connecting to CloudSQL via the cloud-sql-python-connector package: https://cloud.google.com/sql/docs/postgres/connect-connectors#examples

It also appears to support IAM authentication for Postgres, which makes things even more seamless: https://github.com/GoogleCloudPlatform/cloud-sql-python-connector/commit/6703232d6ea624f868e750c8c49c3bb1151f1f1e#diff-f5a708e43b65d84f59aa4fd685978087acc7e903b740f3e45c68356dbb2fd7b4R77
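
As a rough sketch of what the IAM variant might look like: the connector's connect() call accepts enable_iam_auth=True, in which case no password is passed. The helper name make_iam_getconn and the POSTGRES_IAM_USER environment variable are made up for illustration, and the library imports are deferred so the helper itself has no hard dependency on them.

```python
import os
from typing import Any, Callable


def make_iam_getconn(
    connector: Any, instance: str, iam_user: str, db: str
) -> Callable[[], Any]:
    """Build a zero-argument 'creator' callable for sqlalchemy.create_engine.

    `connector` is assumed to be a google.cloud.sql.connector.Connector.
    """

    def getconn() -> Any:
        # enable_iam_auth=True asks the connector to authenticate with IAM
        # credentials, so no password argument is supplied here.
        return connector.connect(
            instance,
            "pg8000",
            user=iam_user,
            db=db,
            enable_iam_auth=True,
        )

    return getconn


def init_iam_engine() -> Any:
    # Imported here so make_iam_getconn can be used and tested without
    # the packages installed.
    import sqlalchemy
    from google.cloud.sql.connector import Connector

    connector = Connector()
    return sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=make_iam_getconn(
            connector,
            os.environ["POSTGRES_CONNECTION_NAME"],
            os.environ["POSTGRES_IAM_USER"],  # hypothetical variable name
            os.environ["POSTGRES_DB"],
        ),
    )
```

The IAM principal also has to exist as a database user on the instance and be allowed to connect, which this sketch does not cover.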

With the connector you don't have to run the Cloud SQL proxy (or set up private networking/firewalling) and can connect directly from Python like this:

import os

import pg8000.dbapi
import sqlalchemy
from google.cloud.sql.connector import Connector

# Assumes `connector` is an instance of the Connector class.
connector = Connector()


# The Cloud SQL Python Connector can be used along with SQLAlchemy using the
# 'creator' argument to 'create_engine'
def init_connection_engine() -> sqlalchemy.engine.Engine:
    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            os.environ["POSTGRES_CONNECTION_NAME"],
            "pg8000",
            user=os.environ["POSTGRES_USER"],
            password=os.environ["POSTGRES_PASS"],
            db=os.environ["POSTGRES_DB"],
        )
        return conn

    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    engine.dialect.description_encoding = None
    return engine

Can I install cloud-sql-python-connector in Airflow and just set the connection to use it in my sql_alchemy_conn setting?

Upvotes: 1

Views: 679

Answers (1)

Catherine O

Reputation: 1011

To answer your question: yes, you can install cloud-sql-python-connector in Airflow. I tried it on my end as well.

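Below is a sample that uses the connector from a task. Note that it builds its own engine inside a PythonOperator callable rather than changing the sql_alchemy_conn setting itself.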

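import logging

import pg8000.dbapi
import sqlalchemy
from airflow.operators import python
from google.cloud.sql.connector import Connector

# CONNECTION_NAME, MYSQL_USER, MYSQL_PASS and DB_NAME are assumed to be
# defined elsewhere in the DAG file. Despite the MYSQL_ prefix, they hold
# the Postgres credentials.


def init_connection_engine() -> None:
    # Assumes `connector` is an instance of the Connector class.
    connector = Connector()

    def getconn() -> pg8000.dbapi.Connection:
        conn: pg8000.dbapi.Connection = connector.connect(
            CONNECTION_NAME,
            "pg8000",
            user=MYSQL_USER,
            password=MYSQL_PASS,
            db=DB_NAME,
        )
        return conn

    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    engine.dialect.description_encoding = None

    query = sqlalchemy.text("SELECT * FROM entries")

    # Run the query and log every row. Nothing is returned, so Airflow
    # does not try to push an Engine object to XCom.
    with engine.connect() as db_conn:
        results = db_conn.execute(query)
        for result in results:
            logging.info(result)


# Declared inside the DAG definition (not shown here).
process_data = python.PythonOperator(
    task_id='query_postgre',
    python_callable=init_connection_engine,
)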
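
One thing a task like this may also want to handle is cleanup, since each run creates a connector and an engine. A minimal sketch, assuming the Connector object's close() method and SQLAlchemy's engine.dispose(); the helper name run_with_cleanup and the environment variable names are only illustrative:

```python
import logging
import os
from typing import Any, Callable


def run_with_cleanup(engine: Any, connector: Any, work: Callable[[Any], Any]) -> Any:
    """Call work(engine), then always release the engine and the connector."""
    try:
        return work(engine)
    finally:
        try:
            engine.dispose()  # close pooled database connections
        finally:
            connector.close()  # release the connector's resources


def query_entries() -> None:
    # Imported here so run_with_cleanup has no hard dependency on them.
    import sqlalchemy
    from google.cloud.sql.connector import Connector

    connector = Connector()
    engine = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=lambda: connector.connect(
            os.environ["POSTGRES_CONNECTION_NAME"],
            "pg8000",
            user=os.environ["POSTGRES_USER"],
            password=os.environ["POSTGRES_PASS"],
            db=os.environ["POSTGRES_DB"],
        ),
    )

    def log_rows(eng: Any) -> None:
        with eng.connect() as db_conn:
            for row in db_conn.execute(sqlalchemy.text("SELECT * FROM entries")):
                logging.info(row)

    run_with_cleanup(engine, connector, log_rows)
```

Here query_entries could be passed as the python_callable in place of init_connection_engine.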

Upvotes: 2
