phoenire

How can I define a connection ID during DAG execution for a PostgresOperator?

I am using Airflow's "Setup and Teardown" tasks to set up a temporary database.

My first operator is a BashOperator that starts a PostgreSQL (PostGIS) container. I need to get that container's IP back from the BashOperator so I can use it either in a Connection object for the PostgresOperator or in the AIRFLOW_CONN_POSTGRES_DEFAULT environment variable that the PostgresOperator reads, because resolving the container through Docker's DNS by its name is not working.
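If the `get_postgis_ip` task prints the JSON of `.NetworkSettings.Networks` (an object keyed by network name, each entry carrying an `IPAddress` field), extracting the IP from that output is a small parsing step. The sketch below assumes the first network with a non-empty address is the one Airflow can reach; `container_ip` is a hypothetical helper, not part of Airflow or Docker.

```python
import json


def container_ip(networks_json: str) -> str:
    """Return the first non-empty IPAddress from `docker inspect` Networks JSON.

    Assumes the input is the output of
    docker inspect -f '{{json .NetworkSettings.Networks}}' <container>,
    i.e. a JSON object keyed by network name (hypothetical helper).
    """
    networks = json.loads(networks_json)
    for settings in networks.values():
        ip = settings.get("IPAddress")
        if ip:  # skip networks where the container has no address
            return ip
    raise ValueError("container has no IP address on any network")


# JSON shaped like `docker inspect` output for the default bridge network
sample = '{"bridge": {"IPAddress": "172.17.0.2", "Gateway": "172.17.0.1"}}'
print(container_ip(sample))
```

Picking the first non-empty address is a simplification; a container attached to several networks may need the address on one specific network instead.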

I tried to read the BashOperator's XCom from the DAG file itself but, as far as I know, an XCom is meant to be read by another task at run time, not by the top-level DAG file. Unfortunately, the PostgresOperator has no parameter that takes the host IP from another task's XCom.
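The parse-time limitation can be made concrete. Airflow can resolve a connection ID from an environment variable named `AIRFLOW_CONN_` followed by the upper-cased ID, so one possible workaround is to set that variable inside a task at run time, once the IP is known. The sketch below only builds and registers the variable; `register_connection_env` is a hypothetical helper, not an Airflow API. Because an environment variable is visible only to the process that sets it (and its children), it would have to be called in the same task that opens the connection, not in an earlier task or at the top of the DAG file.

```python
import os
from urllib.parse import quote


def register_connection_env(conn_id: str, user: str, password: str, host: str,
                            port: int = 5432, database: str = "postgres") -> str:
    """Expose a Postgres connection through an AIRFLOW_CONN_<CONN_ID> variable.

    Hypothetical helper: it only sets os.environ in the current process,
    so it must run at task run time, inside the task that uses the connection.
    """
    var_name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # URL-encode credentials so characters like '@' or ':' do not break the URI
    os.environ[var_name] = (
        f"postgresql://{quote(user, safe='')}:{quote(password, safe='')}"
        f"@{host}:{port}/{database}"
    )
    return var_name


# Usage inside a task, after the IP has been pulled from XCom (IP is illustrative)
name = register_connection_env("postgis", "postgis", "postgis", "172.17.0.2")
print(name, os.environ[name])
```

In a TaskFlow `@task` that receives the IP as an argument (for example `get_postgis_ip.output`), a `PostgresHook(postgres_conn_id="postgis")` created after this call in the same task should pick the connection up; that combination is an untested assumption.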

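import os
from datetime import datetime

from airflow import DAG
from airflow.models.connection import Connection
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Manually triggered DAG (no schedule)
with DAG(
    "postgis_test_dag",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    postgis_password = "postgis"
    postgis_user = "postgis"

    # Start a throwaway PostGIS container; the image needs a password to initialise
    create_postgis = BashOperator(
        task_id="create_postgis",
        bash_command=(
            f"docker run --name airflow_postgis "
            f"-e POSTGRES_USER={postgis_user} -e POSTGRES_PASSWORD={postgis_password} "
            f"-d postgis/postgis:17-3.5-alpine"
        ),
    )

    # Print the container's IP; the last stdout line is pushed as the XCom return_value.
    # bash_command is rendered by Jinja, so Docker's own {{ }} template is wrapped in raw/endraw.
    get_postgis_ip = BashOperator(
        task_id="get_postgis_ip",
        do_xcom_push=True,
        bash_command=(
            "docker inspect -f "
            "'{% raw %}{{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}{% endraw %}' "
            "airflow_postgis"
        ),
    )

    # This does not work: this line runs whenever the DAG file is parsed, not after
    # get_postgis_ip, so it is just a literal string. Jinja is only rendered inside
    # an operator's templated fields when that task executes.
    postgis_ip = "{{ ti.xcom_pull(task_ids='get_postgis_ip') }}"

    # NEITHER: the URI still contains the unrendered template text instead of an IP
    conn_uri_postgis = f"postgresql://{postgis_user}:{postgis_password}@{postgis_ip}:5432/postgres"
    os.environ["AIRFLOW_CONN_POSTGRES_DEFAULT"] = conn_uri_postgis

    # NOR: a Connection object built here is never saved, so no task can look it up
    c = Connection(
        conn_id="postgis",
        conn_type="postgres",
        host=postgis_ip,
        login=postgis_user,
        password=postgis_password,
        port=5432,
        schema="postgres",
    )

    setup_postgis = PostgresOperator(
        task_id="setup_postgis",
        # postgres_conn_id="postgis",  # defaults to "postgres_default"
        sql="SELECT * FROM network.roads(3381586.523108, 788516.801721)",
    )

    # Teardown: removes the container after the work tasks finish, even if they fail
    delete_postgis = BashOperator(
        task_id="delete_postgis",
        bash_command="docker rm -f airflow_postgis",
    )

    create_postgis >> get_postgis_ip >> setup_postgis >> delete_postgis.as_teardown(setups=create_postgis)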

So I can't find a way to inject the BashOperator's XCom into the PostgresOperator. Does anyone have an idea, or is there an Airflow concept I have misunderstood?
