Reputation: 91
I'm new to Airflow. I need to get an access token from my PostgreSQL database and then query an API with SimpleHttpOperator, using that access token.
This is my code:
from airflow.models import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python import PythonOperator
from datetime import datetime
import json
default_args = {
    'start_date': datetime(2021, 1, 1)
}
def _get_access_token():
    request = "SELECT access_token FROM access_token"
    postgres_hook = PostgresHook(postgres_conn_id="postgres_default")
    connection = postgres_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(request)
    jobs = cursor.fetchall()
    access_token = ([i[0] for i in jobs])
    return access_token
with DAG('get_broadworks_subscribers', schedule_interval='@once',
         default_args=default_args,
         catchup=False) as dag:
    # Tasks
    get_access_token = PythonOperator(
        task_id='get_access_token',
        python_callable=_get_access_token
    )
    get_subscribers_list = SimpleHttpOperator(
        task_id = 'get_subscribers_list',
        http_conn_id = 'webex',
        endpoint = 'v1/broadworks/subscribers/',
        method = 'GET',
        authorization = "Bearer" + " " + access_token[0],
        headers = {
            "Authorization": "authorization"
        },
        response_filter = lambda response: json.loads(response.text),
        log_response = True
    )
    get_access_token >> get_subscribers_list
I get the following error:
authorization = "Bearer" + " " + access_token[0],
NameError: name 'access_token' is not defined
I hope you can give me a hand, thank you very much in advance.
Upvotes: 4
Views: 1006
Reputation: 16079
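You are probably expecting the Python function to return a value that you can then use later in your DAG file. That is not how Airflow works. access_token only exists inside _get_access_token, and that function only runs when the task executes, long after the DAG file has been parsed. Tasks do not share data with each other directly; they can share small pieces of metadata via XCom.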
The return value of a PythonOperator's callable is pushed to XCom (a table in the metadata database). A downstream task can then read that value and use it, as long as the field it is used in is templated. Also, SimpleHttpOperator has no authorization parameter.
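To make that flow concrete, here is a toy, in-memory model of it in plain Python. It is not Airflow's implementation: the hypothetical xcom_store dict stands in for the XCom table, and the hypothetical render_header function stands in for the rendering of a templated field. It shows that the value only exists after the upstream task has run, and that the downstream task gets it by pulling from the store, not through a Python variable in the DAG file.

```python
from typing import Any, Callable, Dict

# Hypothetical stand-in for the XCom table: task_id -> pushed return value.
xcom_store: Dict[str, Any] = {}


def run_task(task_id: str, python_callable: Callable[[], Any]) -> None:
    """Run a callable and store its return value, like PythonOperator pushing to XCom."""
    xcom_store[task_id] = python_callable()


def xcom_pull(task_ids: str) -> Any:
    """Read the value stored by an upstream task."""
    return xcom_store[task_ids]


def render_header(upstream_task_id: str) -> Dict[str, str]:
    """Build the header only when the downstream task runs, like a templated field."""
    return {"Authorization": f"Bearer {xcom_pull(upstream_task_id)}"}


# At "parse time" no task has run yet, so there is nothing to pull.
assert "get_access_token" not in xcom_store

# The upstream task runs first (get_access_token >> get_subscribers_list).
run_task("get_access_token", lambda: "abc123")

# The downstream task renders its header when it executes.
print(render_header("get_access_token"))  # {'Authorization': 'Bearer abc123'}
```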
So your code can be something like:
# Inside the `with DAG(...) as dag:` block from the question
get_subscribers_list = SimpleHttpOperator(
    task_id='get_subscribers_list',
    http_conn_id='webex',
    endpoint='v1/broadworks/subscribers/',
    method='GET',
    # headers is a templated field, so the Jinja expression is rendered at run time
    headers={
        "Authorization": "Bearer {{ task_instance.xcom_pull(task_ids='get_access_token') }}"
    },
    response_filter=lambda response: json.loads(response.text),
    log_response=True
)
Since headers is a templated field of SimpleHttpOperator, you can use it to pull the XCom value pushed by the upstream task.
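One more detail: the callable in the question returns a list ([i[0] for i in jobs]), so the pushed XCom value is a list and the rendered header would contain its string form, for example Bearer ['abc123']. A minimal sketch of a fix, assuming the table holds a single token row, is to return one string. The helper name first_token is hypothetical; rows has the shape that cursor.fetchall() returns, a sequence of tuples.

```python
from typing import Sequence, Tuple


def first_token(rows: Sequence[Tuple[str, ...]]) -> str:
    """Return the first column of the first row as a plain string.

    `rows` is shaped like the result of cursor.fetchall(): a sequence of tuples.
    """
    if not rows:
        raise ValueError("access_token table is empty")
    return str(rows[0][0])


rows = [("abc123",)]

# Returning the whole list leaks Python list syntax into the header value.
print("Bearer " + str([r[0] for r in rows]))  # Bearer ['abc123']

# Returning a single string gives a plain bearer token.
print("Bearer " + first_token(rows))  # Bearer abc123
```

In _get_access_token you would then return first_token(cursor.fetchall()) instead of the list. Alternatively, PostgresHook inherits get_first(sql) from DbApiHook, which returns only the first row.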
Note: I don't recommend passing tokens this way. Consider storing the token securely in an Airflow Variable instead. That also saves you from querying it from the database in a separate task. If you store it in a Variable, all you need to change is:
headers={
    "Authorization": "Bearer {{ var.value.get('my_var_name') }}"
}
Note that Airflow automatically masks a Variable's value if its key contains any of 'password', 'secret', 'passwd', 'authorization', 'api_key', 'apikey', 'access_token'. If you choose a key that doesn't contain any of those, you can still have it masked by adding the string to sensitive_var_conn_names in airflow.cfg. For more information, see the docs.
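The matching rule described above can be illustrated with a small plain-Python check. This is a hypothetical helper, not Airflow's own code: it assumes a case-insensitive substring match against the names listed in this answer plus any extra names you configure in sensitive_var_conn_names. The exact default list depends on your Airflow version.

```python
from typing import Iterable

# Substrings listed in the answer; newer Airflow versions may include more.
DEFAULT_SENSITIVE = (
    "password", "secret", "passwd", "authorization",
    "api_key", "apikey", "access_token",
)


def would_be_masked(key: str, extra: Iterable[str] = ()) -> bool:
    """Return True if `key` contains any sensitive substring (assumed case-insensitive).

    `extra` plays the role of the names added via sensitive_var_conn_names.
    """
    name = key.strip().lower()
    names = set(DEFAULT_SENSITIVE) | {e.strip().lower() for e in extra if e.strip()}
    return any(s in name for s in names)


print(would_be_masked("webex_access_token"))            # True
print(would_be_masked("my_var_name"))                   # False
print(would_be_masked("my_var_name", ["my_var_name"]))  # True
```

Under that rule, a key such as webex_access_token would be masked without any change to airflow.cfg.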
Upvotes: 2