Reputation: 955
We are using Airflow as a scheduler. I want to invoke a simple bash operator in a DAG. The bash script needs a password as an argument to do further processing.
How can I store a password securely in Airflow (config/variables/connections) and access it in the DAG definition file?
I am new to Airflow and Python, so a code snippet would be appreciated.
Upvotes: 44
Views: 90125
Reputation: 150
This answer may be a little late, but I think it is important and still up to date:
When using
BaseHook.get_hook(conn_id=conn_id)
the credentials are logged in plain text by Airflow (we observed this in version 2.2.3) to a log file under the path
/var/log/airflow/scheduler/<date>/<dag>/
You surely don't want your login and password there.
To avoid this, use get_connection_from_secrets
like in:
from airflow.models import Connection
Connection.get_connection_from_secrets("<connection>")
This does not log any credentials into a file.
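A minimal sketch (the connection id my_db is just an example; create it beforehand under Admin -> Connections):

from airflow.models import Connection

# Fetched via the configured secrets backends / metastore; nothing is written to the logs.
conn = Connection.get_connection_from_secrets("my_db")
password = conn.password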
Upvotes: 15
Reputation: 8001
You can store the password in an Airflow Connection - it will be encrypted at rest as long as you have set up your Fernet key.
You can create the connection via the UI under Admin -> Connections.
To access this password:
from airflow.hooks.base_hook import BaseHook # Deprecated in Airflow 2
connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.
Update since Airflow 2 launch
The module airflow.hooks.base_hook
has been deprecated and you must use airflow.hooks.base
instead.
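For reference, the same lookup under Airflow 2 is:

from airflow.hooks.base import BaseHook

connection = BaseHook.get_connection("username_connection")
password = connection.password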
Upvotes: 85
Reputation: 207
I wrote the following utility method for creating a Session to an external database whose configuration is saved in Airflow:
from airflow.hooks.base_hook import BaseHook
from sqlalchemy import create_engine
from sqlalchemy.orm.session import sessionmaker

def get_session(conn_id):
    # The hook's get_uri() exposes the connection stored (encrypted) in Airflow.
    dbhook = BaseHook.get_hook(conn_id=conn_id)
    engine = create_engine(dbhook.get_uri())
    Session = sessionmaker()
    session = Session(bind=engine)
    return session
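A usage sketch, assuming a connection with id my_external_db has been created in Airflow:

from sqlalchemy import text

session = get_session("my_external_db")
try:
    result = session.execute(text("SELECT 1")).scalar()
finally:
    session.close()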
Upvotes: 1
Reputation: 1051
from airflow.hooks.base_hook import BaseHook
conn = BaseHook.get_connection('bigquery_connection')
print(conn.get_extra())
Calling conn.get_extra()
will return a JSON string of the settings stored in the connection.
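Since the extras are stored as a JSON string, you can parse them yourself, or use the parsed dict the Connection model already exposes:

import json

extra = json.loads(conn.get_extra() or "{}")
# Equivalent shortcut provided by Airflow:
extra = conn.extra_dejson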
Upvotes: 11
Reputation: 2628
Use the GUI in the admin/connections tab.
The answer that truly works, persisting the connection in Airflow programmatically, is the snippet below.
In the below example myservice
represents some external credential cache.
With the approach below you can store the connections that you manage externally inside of Airflow, without having to poll the service from within every DAG/task. Instead you rely on Airflow's connection mechanism, and you don't lose out on the Operators that Airflow exposes either (should your organisation allow this).
The trick is using airflow.utils.db.merge_conn
to handle the setting of your created connection object.
from airflow.models import Connection
from airflow.utils.db import merge_conn

creds = {"user": myservice.get_user(), "pwd": myservice.get_pwd()}

c = Connection(conn_id='your_airflow_connection_id_here',
               login=creds["user"],
               host=None)
c.set_password(creds["pwd"])
merge_conn(c)
merge_conn is built-in and used by Airflow itself to initialise empty connections. However, it will not update an existing connection; for that you will have to use your own helper function:
import logging

from airflow.models import Connection
from airflow.utils.db import provide_session

@provide_session
def store_conn(conn, session=None):
    # Remove any existing connection with the same conn_id, then insert the new one.
    existing = session.query(Connection).filter(Connection.conn_id == conn.conn_id).first()
    if existing:
        logging.info("Connection object already exists, attempting to remove it...")
        session.delete(existing)
    session.add(conn)
    session.commit()
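With that helper in place, replacing the connection built above is just:

store_conn(c)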
Upvotes: 5
Reputation: 211
This is what I've used.
from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator

def add_slack_token(ds, **kwargs):
    """Add a slack token"""
    session = settings.Session()
    new_conn = Connection(conn_id='slack_token')
    new_conn.set_password(SLACK_LEGACY_TOKEN)  # SLACK_LEGACY_TOKEN defined elsewhere
    if not (session.query(Connection)
                   .filter(Connection.conn_id == new_conn.conn_id)
                   .first()):
        session.add(new_conn)
        session.commit()
    else:
        msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
        msg = msg.format(conn_id=new_conn.conn_id)
        print(msg)

dag = DAG(
    'add_connections',
    default_args=default_args,  # default_args defined elsewhere
    schedule_interval="@once")

t2 = PythonOperator(
    dag=dag,
    task_id='add_slack_token',
    python_callable=add_slack_token,
    provide_context=True,
)
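Downstream tasks can then read the token back; a minimal sketch:

from airflow.hooks.base_hook import BaseHook

slack_token = BaseHook.get_connection('slack_token').password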
Upvotes: 1
Reputation: 2591
You can store the password in an Airflow Variable, https://airflow.incubator.apache.org/ui.html#variable-view, and fetch it with Variable.get:
from airflow.models import Variable

# "my_pass" is an example key; create the variable in the UI first.
my_pass = Variable.get("my_pass")

command = """
    echo "{{ params.my_param }}"
"""

task = BashOperator(
    task_id='templated',
    bash_command=command,
    params={'my_param': my_pass},
    dag=dag)
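If you prefer not to click through the UI, the variable can also be created from Python (key name is just an example):

from airflow.models import Variable

Variable.set("my_pass", "super-secret")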
Upvotes: 6
Reputation: 2329
In this case I would use a PythonOperator from which you are able to get a Hook
on your database connection using
hook = PostgresHook(postgres_conn_id=postgres_conn_id)
. You can then call get_connection
on this hook which will give you a Connection object from which you can get the host, login and password for your database connection.
Finally, use for example subprocess.call(['your_script.sh', connection_string])
, passing the connection details as a parameter.
This method is a bit convoluted but it does allow you to keep the encryption for database connections in Airflow. Also, you should be able to pull this strategy into a separate Operator class inheriting the base behaviour from PythonOperator but adding the logic for getting the hook and calling the bash script.
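A sketch of the whole callable, assuming a connection id my_postgres exists and your_script.sh is on the PATH (both names are examples):

import subprocess

from airflow.hooks.postgres_hook import PostgresHook  # airflow.providers.postgres.hooks.postgres in Airflow 2
from airflow.operators.python_operator import PythonOperator

def run_script(**kwargs):
    hook = PostgresHook(postgres_conn_id='my_postgres')
    # get_uri() builds a connection string from the decrypted Connection fields.
    connection_string = hook.get_uri()
    subprocess.call(['your_script.sh', connection_string])

t = PythonOperator(
    task_id='call_script',
    python_callable=run_script,
    dag=dag)  # dag is assumed to be defined elsewhere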
Upvotes: 2