Karan Alang

Reputation: 1061

Airflow Composer - unable to access config file stored in gcp storage, when using PythonOperator

I'm using the Airflow PythonOperator to call an API that gets data from an external system, parses the data, and writes it to MongoDB (I guess I could use the SimpleHttpOperator as well).

The method being called needs a config file (director-api.cfg), which holds the credentials for the external system and for the Mongo instance. The config file is stored in a GCP storage bucket.

Here is the dag code:

from airflow import models
from airflow.operators.python import PythonOperator

# UpdateDirectorDataInMongo.py defines main(), the callable run by the
# PythonOperator task. It is stored in the same bucket as this DAG file.
from UpdateDirectorDataInMongo import main

# default_dag_args is defined elsewhere in the DAG file (not shown in the question).

with models.DAG(
    'Versa-directorinfo',
    default_args=default_dag_args,
    # schedule_interval=None: the DAG runs only when triggered manually
    schedule_interval=None,
    catchup=False,
) as dag:

    update_director_info = PythonOperator(
        task_id="update_director_info",
        python_callable=main,
    )

    update_director_info


## UpdateDirectorDataInMongo.py - code where I try to access the director-api.cfg file

import configparser

self.api_username = parser.get("director-api", "user")
self.api_passwd = parser.get("director-api", "passwd")
self.mongoConnUri = parser.get('mongo', 'mongoConnUri') + "?retryWrites=true&w=majority"
 
# director-api.cfg (location is the same storage bucket as the dag file)

[director-api]
user=<user>
passwd=<passwd>

[mongo]
mongoConnUri=mongodb+srv://<user>:<passwd>@cluster0.w9yss.mongodb.net/<project>

With the PythonOperator, I'm unable to access the config file. The error is shown below:

-04, 04:19:13 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 174, in execute
    return_value = self.execute_callable()
  File "/opt/python3.8/lib/python3.8/site-packages/airflow/operators/python.py", line 188, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/gcs/dags/UpdateDirectorDataInMongo.py", line 86, in main
    customers = getCustomers()
  File "/home/airflow/gcs/dags/UpdateDirectorDataInMongo.py", line 72, in getCustomers
    mongoConnUri = parser.get('mongo', 'mongoConnUri') + "?retryWrites=true&w=majority"
  File "/opt/python3.8/lib/python3.8/configparser.py", line 781, in get
    d = self._unify_values(section, vars)
  File "/opt/python3.8/lib/python3.8/configparser.py", line 1149, in _unify_values
    raise NoSectionError(section) from None
configparser.NoSectionError: No section: 'mongo'

What needs to be done to fix this? Thanks in advance!

Upvotes: 0

Views: 819

Answers (2)

Mazlum Tosun

Reputation: 6572

If your director-api.cfg was added to the dags folder of the Cloud Composer bucket, you can read it as follows in UpdateDirectorDataInMongo.py:

import os

# Local path of the dags folder of the Cloud Composer bucket
DAGS_FOLDER = os.getenv("DAGS_FOLDER")

parser.read([f'{DAGS_FOLDER}/director-api.cfg'])
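
As a side note on why the question shows NoSectionError rather than a file error: ConfigParser.read() silently skips files it cannot open and returns the list of files it actually parsed, so a wrong path only surfaces later at parser.get(). Here is a minimal sketch that builds the path from DAGS_FOLDER and fails early instead. The helper names config_path and load_config are made up for illustration, and the fallback path is just the dags location that appears in the traceback.

```python
import configparser
import os


def config_path(filename):
    """Build the path of a file that sits next to the DAGs (hypothetical helper)."""
    # Fallback value is the dags path seen in the question's traceback.
    dags_folder = os.getenv("DAGS_FOLDER", "/home/airflow/gcs/dags")
    return os.path.join(dags_folder, filename)


def load_config(path):
    """Read an INI file and fail loudly if it could not be read (hypothetical helper)."""
    parser = configparser.ConfigParser()
    # read() returns the list of files it successfully parsed;
    # a missing file is skipped without raising.
    loaded = parser.read([path])
    if not loaded:
        raise FileNotFoundError(f"Config file not found or unreadable: {path}")
    return parser
```

In UpdateDirectorDataInMongo.py this would be used as parser = load_config(config_path("director-api.cfg")), so a missing file shows up as FileNotFoundError with the full path in the message.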

You can also consider another, more secure solution:

  • Add your secret values to Secret Manager
  • In your PythonOperator callable, use the Google Cloud Python client to read the secrets from Secret Manager:
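from google.cloud import secretmanager

def _get_secret(project, secret_name, version='1'):
    """Return the payload of the given secret version as a string."""
    client = secretmanager.SecretManagerServiceClient()
    secret_path = client.secret_version_path(project, secret_name, version)
    # Pass the resource name by keyword so the call works with both
    # older and newer versions of the client library
    secret = client.access_secret_version(name=secret_path)
    return secret.payload.data.decode('UTF-8')

# your_project and your_key are placeholders for your project ID and secret name
your_secret_value = _get_secret(your_project, your_key)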
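
If you would rather keep the existing INI layout, one option (an assumption on my side, not something the question does) is to store the whole content of director-api.cfg as a single secret and parse the returned text in memory. A minimal sketch, where parse_config_text is a hypothetical helper and the text argument stands for whatever _get_secret returns:

```python
import configparser


def parse_config_text(text):
    """Parse INI-formatted text into a ConfigParser (hypothetical helper)."""
    # interpolation=None keeps '%' characters literal, which matters for
    # passwords or URL-encoded connection strings.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return parser
```

The rest of the code can then keep calling parser.get("director-api", "user") and parser.get("mongo", "mongoConnUri") unchanged.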

Upvotes: 1

Dev Yns

Reputation: 229

To make it work, the director-api.cfg file needs to be in the same bucket folder as the UpdateDirectorDataInMongo.py file. The location of your DAG file is not important here, because the PythonOperator calls main from UpdateDirectorDataInMongo.py.

Your parser call should look like this: parser.read(['director-api.cfg'])
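
One caveat: a bare file name is resolved against the current working directory of the process, which may not be the folder that holds the module. A minimal sketch that resolves the file relative to the module itself, assuming the .cfg really does sit next to UpdateDirectorDataInMongo.py (sibling_path is a hypothetical helper name):

```python
from pathlib import Path


def sibling_path(module_file, filename):
    """Return the absolute path of a file located next to the given module file."""
    # resolve() makes the result independent of the current working directory.
    return Path(module_file).resolve().parent / filename
```

Inside UpdateDirectorDataInMongo.py the call would then be parser.read([sibling_path(__file__, 'director-api.cfg')]).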

Upvotes: 1
