Mariano Munarriz

Reputation: 91

Airflow: how to pass a variable obtained from DB to SimpleHttpOperator

I'm new to Airflow. I need to get an access token from my PostgreSQL database and then call an API with SimpleHttpOperator, using that access token.

This is my code:

from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator

from datetime import datetime
import json


default_args = {
    'start_date':datetime(2021, 1, 1)
}

def _get_access_token():
    request = "SELECT access_token FROM access_token"
    postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    jobs = cursor.fetchall()
    access_token = ([i[0] for i in jobs])

    return access_token


with DAG('get_broadworks_subscribers', schedule_interval='@once',
    default_args = default_args,
    catchup=False) as dag:

    # Tasks

    get_access_token = PythonOperator(
        task_id='get_access_token',
        python_callable=_get_access_token
    )

    get_subscribers_list = SimpleHttpOperator(
        task_id = 'get_subscribers_list',
        http_conn_id = 'webex',
        endpoint = 'v1/broadworks/subscribers/',
        method = 'GET',
        authorization = "Bearer" + " " + access_token[0],
        headers = {
            "Authorization": "authorization"
        },
        response_filter = lambda response: json.loads(response.text),
        log_response = True
    )

get_access_token >> get_subscribers_list

I get the following error:

    authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined

I hope you can give me a hand, thank you very much in advance.

Upvotes: 4

Views: 1006

Answers (1)

Elad Kalif

Reputation: 16079

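You are probably expecting the Python function's return value to be available later in your DAG file. That is not how Airflow works. access_token is a local variable inside _get_access_token, and that function only runs when the task executes, not when the DAG file is parsed, hence the NameError. Tasks do not share in-memory data with each other; they can share small pieces of metadata via XCom.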
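The NameError is plain Python scoping combined with timing. Below is a minimal stdlib-only sketch of that; the registered_tasks dict is a hypothetical stand-in for the DAG and is not how Airflow actually stores tasks.

```python
# Toy model of "parse time" vs "run time"; this is not Airflow code.

def _get_access_token():
    access_token = ["token-from-db"]  # local name, discarded when the function returns
    return access_token

# "Parse time": the DAG file only registers the callable; nothing has run yet.
registered_tasks = {"get_access_token": _get_access_token}

# A module-level reference to the local name fails, just like in the question.
try:
    authorization = "Bearer " + access_token[0]
except NameError as exc:
    parse_error = str(exc)

# "Run time": only now does the callable execute and produce a value.
run_result = registered_tasks["get_access_token"]()
```

The value exists only after the callable has run, and even then only as a return value, which is why it has to travel to the next task through XCom.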

A PythonOperator pushes its callable's return value to XCom (a table in the metadata database). A downstream task can then read that value and use it in any of its templated fields. Also, SimpleHttpOperator has no authorization parameter; the header has to be set through headers.
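To make the XCom round trip concrete, here is a toy model that uses an in-memory SQLite table as a stand-in for the metadata database. The table layout and the xcom_push, xcom_pull and run_python_task helpers are simplified, hypothetical illustrations, not Airflow's schema or API. The details borrowed from Airflow are that a callable's return value is stored under the key return_value and that values are JSON-serialized (the Airflow 2 default).

```python
import json
import sqlite3

# In-memory table standing in for Airflow's XCom table (simplified columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE xcom (task_id TEXT, key TEXT, value TEXT)")

def xcom_push(task_id, value, key="return_value"):
    """Store a JSON-serialized value for a task (toy version)."""
    conn.execute(
        "INSERT INTO xcom (task_id, key, value) VALUES (?, ?, ?)",
        (task_id, key, json.dumps(value)),
    )

def xcom_pull(task_id, key="return_value"):
    """Return the most recently pushed value for a task, or None."""
    row = conn.execute(
        "SELECT value FROM xcom WHERE task_id = ? AND key = ? "
        "ORDER BY rowid DESC LIMIT 1",
        (task_id, key),
    ).fetchone()
    return json.loads(row[0]) if row else None

def run_python_task(task_id, python_callable):
    """Mimic a PythonOperator: run the callable and push its return value."""
    result = python_callable()
    xcom_push(task_id, result)
    return result

# Upstream task: returns a list, like _get_access_token in the question.
run_python_task("get_access_token", lambda: ["abc123"])

# Downstream task: reads the value back from the "metastore".
pulled = xcom_pull("get_access_token")
```

The downstream side gets back exactly what the upstream callable returned, here a one-element list, because only the serialized value crosses the task boundary.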

So your code can be something like:

get_subscribers_list = SimpleHttpOperator(
    task_id='get_subscribers_list',
    http_conn_id='webex',
    endpoint='v1/broadworks/subscribers/',
    method='GET',
    headers={
        # headers is a templated field, so the XCom value is rendered at run time.
        # _get_access_token returns a list, so take its first element.
        "Authorization": "Bearer {{ task_instance.xcom_pull(task_ids='get_access_token')[0] }}"
    },
    response_filter=lambda response: json.loads(response.text),
    log_response=True,
)

Because headers is a templated field of SimpleHttpOperator, it is rendered with Jinja just before the task runs, so you can pull the XCom value pushed by the upstream task.
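One detail to watch: _get_access_token in the question returns a list ([i[0] for i in jobs]), so rendering the pulled value directly gives the list's text form in the header. The sketch below uses an in-memory SQLite table as a stand-in for the PostgreSQL access_token table, and plain str() formatting as an approximation of how a {{ ... }} expression prints a value; build_header is a hypothetical helper.

```python
import sqlite3

# SQLite stand-in for the PostgreSQL access_token table from the question.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE access_token (access_token TEXT)")
conn.execute("INSERT INTO access_token VALUES ('abc123')")

def build_header(value):
    """Approximate what a '{{ ... }}' expression renders: the value's str()."""
    return {"Authorization": f"Bearer {value}"}

# What the question's callable returns: a list built from fetchall().
rows = conn.execute("SELECT access_token FROM access_token").fetchall()
as_list = [row[0] for row in rows]

# Returning a single string instead: first column of the first row.
as_string = conn.execute("SELECT access_token FROM access_token").fetchone()[0]

list_header = build_header(as_list)      # Authorization: Bearer ['abc123']
string_header = build_header(as_string)  # Authorization: Bearer abc123
```

In the real DAG, the equivalent fix is either to index the pulled list in the template, or to return cursor.fetchone()[0] (or the first column of the hook's get_first() result) from _get_access_token so the pushed XCom is a plain string.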

Note: I don't recommend passing tokens this way, since XCom values are stored in the metadata database and shown in the Airflow UI. Consider storing the token securely in an Airflow Variable instead. That also saves you from querying it from the database in a separate task. If you store it in a Variable, all you need to change is:

    headers = {
        "Authorization": "Bearer {{ var.value.get('my_var_name') }}"
    }

Note that Airflow automatically masks a Variable's value if its key contains any of 'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'. If you choose a key that doesn't contain any of those, you can still have it masked by adding that string to sensitive_var_conn_names in airflow.cfg. For more information, see the Airflow docs on masking sensitive data.
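The key-name rule above can be approximated as a case-insensitive substring check against the default names plus any extra names from sensitive_var_conn_names. Below is a minimal sketch of that rule, assuming the config value is a comma-separated list; should_mask is a hypothetical helper that imitates the behavior and is not Airflow's own function.

```python
# Default sensitive substrings, as listed in the answer above.
DEFAULT_SENSITIVE = {
    "password", "secret", "passwd", "authorization",
    "api_key", "apikey", "access_token",
}

def should_mask(key, extra_names=""):
    """Return True if a Variable key would be masked under this toy rule.

    extra_names mimics a comma-separated sensitive_var_conn_names value.
    """
    extra = {name.strip().lower() for name in extra_names.split(",") if name.strip()}
    lowered = key.lower()
    return any(name in lowered for name in DEFAULT_SENSITIVE | extra)
```

For example, a key such as webex_access_token matches a default name, while my_var_name is only masked once something like my_var is added to the extra names.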

Upvotes: 2
