Anup

Reputation: 955

Store and access password using Apache Airflow

We are using Airflow as a scheduler. I want to invoke a simple bash operator in a DAG. The bash script needs a password as an argument to do further processing.

How can I store a password securely in Airflow (config/variables/connection) and access it in the DAG definition file?

I am new to Airflow and Python, so a code snippet would be appreciated.

Upvotes: 44

Views: 90125

Answers (8)

Code Monkey

Reputation: 150

This answer may be a little late, but I think it is important and still up to date:

When using

BaseHook.get_hook(conn_id=conn_id)

the credentials are logged in plain text to a log file by Airflow (we observed this in version 2.2.3) under the path /var/log/airflow/scheduler/<date>/<dag>/

You surely don't want your login and password there.

To avoid this, use get_connection_from_secrets, as in:

from airflow.models import Connection
Connection.get_connection_from_secrets("<connection>")

This does not log any credentials into a file.
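
The returned Connection object then gives you the credentials without writing them to the log, for example (the connection id here is just a placeholder):

from airflow.models import Connection

conn = Connection.get_connection_from_secrets("my_connection_id")
password = conn.password  # decrypted value, not written to the scheduler log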

Upvotes: 15

Daniel Lee

Reputation: 8001

You can store the password in an Airflow Connection (accessed through a Hook) - it will be encrypted as long as you have set up your Fernet key.

Here is how you can create a connection via the UI:

[Screenshots: Main Menu, then Create Connection]

To access this password:

from airflow.hooks.base_hook import BaseHook  # Deprecated in Airflow 2

connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.

Update since the Airflow 2 launch

The module airflow.hooks.base_hook has been deprecated; you must use airflow.hooks.base instead.
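
Under Airflow 2 the same lookup becomes (a minimal sketch using the new import path):

from airflow.hooks.base import BaseHook  # Airflow 2.x location

connection = BaseHook.get_connection("username_connection")
password = connection.password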

Upvotes: 85

Nick Falco

Reputation: 207

I wrote the following utility method for creating a SQLAlchemy session to an external database, from a connection configuration saved in Airflow:

from airflow.hooks.base_hook import BaseHook
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker


def get_session(conn_id):
    # Build the right hook for the connection saved in Airflow, then open a session on it.
    dbhook = BaseHook.get_hook(conn_id=conn_id)
    engine = create_engine(dbhook.get_uri())
    Session = sessionmaker()
    session = Session(bind=engine)
    return session
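
Hypothetical usage, assuming an Airflow connection with conn_id "my_db" already exists:

from sqlalchemy import text

session = get_session("my_db")
rows = session.execute(text("SELECT 1")).fetchall()
session.close()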

Upvotes: 1

Nikhil Redij

Reputation: 1051

from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())

conn.get_extra() will give you the JSON of the settings stored in the connection, as a string.
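
If you need individual values out of the extras, a small sketch (the key name is only an example of something you might have stored):

import json

settings = json.loads(conn.get_extra() or "{}")  # extras are stored as a JSON string
# Connection also exposes the parsed form directly:
settings = conn.extra_dejson
key_path = settings.get("key_path")  # example key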

Upvotes: 11

Havnar

Reputation: 2628

Use the GUI in the admin/connections tab.

The approach that truly works, persisting the connection in Airflow programmatically, is shown in the snippet below.

In the example below, myservice represents some external credential cache.

With this approach you can store the connections that you manage externally inside Airflow, without having to poll the service from within every DAG/task. Instead you rely on Airflow's connection mechanism, and you don't lose out on the Operators that Airflow exposes either (should your organisation allow this).

The trick is using airflow.utils.db.merge_conn to handle the setting of your created connection object.

from airflow.models import Connection
from airflow.utils.db import merge_conn

# myservice is the external credential cache mentioned above
creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd()}

c = Connection(conn_id='your_airflow_connection_id_here',
               login=creds["user"],
               host=None)
c.set_password(creds["pwd"])
merge_conn(c)

merge_conn is built-in and is used by Airflow itself to initialise empty connections. However, it will not auto-update an existing connection; for that you will have to use your own helper function.

import logging

from airflow.utils.db import provide_session


@provide_session
def store_conn(conn, session=None):
    from airflow.models import Connection
    # Remove any existing connection with the same conn_id before adding the new one.
    if session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
        logging.info("Connection object already exists, attempting to remove it...")
        session.delete(session.query(Connection).filter(Connection.conn_id == conn.conn_id).first())

    session.add(conn)
    session.commit()
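
Putting the two snippets together (myservice is still the stand-in for your external credential store):

creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd()}

c = Connection(conn_id='your_airflow_connection_id_here',
               login=creds["user"],
               host=None)
c.set_password(creds["pwd"])
store_conn(c)  # replaces an existing connection with the same conn_id, or creates it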

Upvotes: 5

Don Bar

Reputation: 211

This is what I've used.

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator


def add_slack_token(ds, **kwargs):
    """Add a slack token."""
    session = settings.Session()

    new_conn = Connection(conn_id='slack_token')
    new_conn.set_password(SLACK_LEGACY_TOKEN)

    if not (session.query(Connection).filter(Connection.conn_id ==
            new_conn.conn_id).first()):
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        msg = msg.format(conn_id=new_conn.conn_id)
        print(msg)


dag = DAG(
    'add_connections',
    default_args=default_args,
    schedule_interval="@once")


t2 = PythonOperator(
    dag=dag,
    task_id='add_slack_token',
    python_callable=add_slack_token,
    provide_context=True,
)

Upvotes: 1

Chengzhi

Reputation: 2591

You can store the password in Airflow Variables: https://airflow.incubator.apache.org/ui.html#variable-view

  1. Create a variable with a key and value in the UI, for example, mypass:XXX
  2. Import Variable: from airflow.models import Variable
  3. MyPass = Variable.get("mypass")
  4. Pass MyPass to your bash script:

command = """
          echo "{{ params.my_param }}"
          """

task = BashOperator(
        task_id='templated',
        bash_command=command,
        params={'my_param': MyPass},
        dag=dag)
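
Pulling the steps above into one self-contained sketch (the DAG id is a placeholder and default_args is assumed to be defined elsewhere in the file):

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash_operator import BashOperator

MyPass = Variable.get("mypass")

command = """
          echo "{{ params.my_param }}"
          """

dag = DAG('use_variable_example', default_args=default_args, schedule_interval="@once")

task = BashOperator(
        task_id='templated',
        bash_command=command,
        params={'my_param': MyPass},
        dag=dag)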

Upvotes: 6

Matthijs Brouns

Reputation: 2329

In this case I would use a PythonOperator from which you are able to get a Hook on your database connection using hook = PostgresHook(postgres_conn_id=postgres_conn_id). You can then call get_connection on this hook which will give you a Connection object from which you can get the host, login and password for your database connection.

Finally, use, for example, subprocess.call(['your_script.sh', connection_string]) to pass the connection details as a parameter.

This method is a bit convoluted but it does allow you to keep the encryption for database connections in Airflow. Also, you should be able to pull this strategy into a separate Operator class inheriting the base behaviour from PythonOperator but adding the logic for getting the hook and calling the bash script.
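
A minimal sketch of that approach (the connection id, script path, and the surrounding dag object are assumptions):

import subprocess

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator


def run_script(**kwargs):
    hook = PostgresHook(postgres_conn_id="my_postgres_conn")
    conn = hook.get_connection("my_postgres_conn")
    # Build a connection string from the decrypted connection details.
    connection_string = "postgresql://{}:{}@{}:{}/{}".format(
        conn.login, conn.password, conn.host, conn.port, conn.schema)
    # Hand the details to the bash script as an argument.
    subprocess.call(["/path/to/your_script.sh", connection_string])


run_task = PythonOperator(
    task_id="run_script_with_credentials",
    python_callable=run_script,
    dag=dag,  # dag defined elsewhere in the file
)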

Upvotes: 2
