Reputation: 123
I have a problem with downloading all Airflow variables from code.
It is possible to export them from the UI, but I haven't found any way to do it programmatically.
I only discovered the Variable.get('variable_name')
method, which returns a single Airflow variable.
There seems to be no way to get the list of all Airflow variables.
Searching the source code didn't help either. Do you know an easy way?
Thank you in advance.
Upvotes: 4
Views: 12607
Reputation: 21
Taking into account all the suggestions listed above, here is a code snippet that can be used to export all Airflow variables and store them in your GCS bucket:
import datetime
import os

import pendulum

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator

local_tz = pendulum.timezone("Europe/Paris")

default_dag_args = {
    'start_date': datetime.datetime(2020, 6, 18, tzinfo=local_tz),
    'email_on_failure': False,
    'email_on_retry': False
}

with DAG(dag_id='your_dag_id',
         schedule_interval='00 3 * * *',
         default_args=default_dag_args,
         catchup=False,
         user_defined_macros={
             'env': os.environ
         }) as dag:

    start = DummyOperator(
        task_id='start',
    )

    export_task = BashOperator(
        task_id='export_var_task',
        bash_command='airflow variables --export variables.json; gsutil cp variables.json your_cloud_storage_path',
    )

    start >> export_task
Upvotes: 1
Reputation: 187
I like the answer above about using the Airflow CLI, but it is also possible to extract all variables from pure Python, with no need for tricks around the CLI.
Use this code snippet:
from airflow.utils.db import create_session
from airflow.models import Variable

# A db.Session object is used to run queries against;
# the create_session() method will create (yield) a session.
with create_session() as session:
    # Calling .query() with Variable asks the Airflow db session to
    # return all variables (SELECT * FROM variable). The result is an
    # iterable of objects similar to a dict but with a slightly
    # different signature (object.key, object.val).
    airflow_vars = {var.key: var.val for var in session.query(Variable)}
The above method queries the Airflow SQL database and returns all variables; the simple dictionary comprehension remaps the results into a 'normal' dictionary.
Note that session.query will raise a sqlalchemy.exc.OperationalError if it is unable to connect to a running Airflow db instance.
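Since the Variable table stores values as strings (JSON-encoded for structured data), a sketch of the same query that also decodes those strings might look like the following. The helper names (parse_variables, fetch_all_variables) are my own, and the Airflow imports are deferred into the function so the parsing helper can be reused on its own:

```python
import json


def parse_variables(rows):
    """Turn (key, val) rows from the Variable table into a plain dict,
    JSON-decoding values where possible (Airflow stores them as strings)."""
    out = {}
    for row in rows:
        try:
            out[row.key] = json.loads(row.val)
        except (TypeError, ValueError):
            # Value was a plain string, keep it as-is.
            out[row.key] = row.val
    return out


def fetch_all_variables():
    # Raises sqlalchemy.exc.OperationalError if the metadata DB is unreachable.
    from airflow.utils.db import create_session
    from airflow.models import Variable

    with create_session() as session:
        return parse_variables(session.query(Variable))
```

This keeps the database access and the value parsing separate, so the parsing can be tested without a running Airflow instance.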
If you (for whatever reason) wish to mock create_session as part of a unittest, this snippet can be used:
from unittest import TestCase
from unittest.mock import patch, MagicMock
import contextlib
import json

mock_data = {
    "foo": {
        "bar": "baz"
    }
}

airflow_vars = ...  # reference to an output (dict) of aforementioned method


class TestAirflowVariables(TestCase):

    @contextlib.contextmanager
    def create_session(self):
        """Helper that mocks airflow.settings.Session().query() result signature.

        This is achieved by yielding a mocked airflow.settings.Session() object.
        """
        session = MagicMock()
        session.query.return_value = [
            # For the purpose of this test, mock_data is converted to json
            # where dicts are encountered. You will have to modify the above
            # method to parse data from airflow correctly (it will send json
            # strings, not dicts).
            MagicMock(key=k, val=json.dumps(v) if isinstance(v, dict) else v)
            for k, v in mock_data.items()
        ]
        yield session

    @patch("airflow.utils.db")
    def test_data_is_correctly_parsed(self, db):
        db.create_session = self.create_session
        self.assertDictEqual(airflow_vars, mock_data)
Note: you will have to change the patch target to match however you import the create_session method in the file you are testing. I only got it to work by importing up to airflow.utils.db and calling db.create_session in the aforementioned method.
Hope this helps! Good luck :)
Upvotes: 9
Reputation: 36
I had an issue using the BashOperator for this use case, so I copied the output of the CLI command into a variable and used it inside my program.
import subprocess

# Capture the CLI output, then strip the log prefix/suffix surrounding the variable list.
output = (subprocess.check_output("airflow variables", shell=True)
          .decode('utf-8').split('pid=')[1].split()[1:-1])
print(output)
Upvotes: 0
Reputation: 18824
You can use Airflow CLI to export variables to a file and then read it from your Python code.
airflow variables --export FILEPATH
Programmatically, you can use the BashOperator to achieve this.
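Outside of a DAG, the same export-then-read approach can be sketched in plain Python. The file path and helper names below are my own; this assumes the airflow CLI is on PATH and configured:

```python
import json
import subprocess


def load_exported_variables(path):
    """Read the JSON file written by `airflow variables --export`."""
    with open(path) as f:
        return json.load(f)


def export_and_load(path="variables.json"):
    # Shell out to the Airflow CLI to dump all variables to a JSON file,
    # then load that file back into a Python dict.
    subprocess.run(["airflow", "variables", "--export", path], check=True)
    return load_exported_variables(path)
```

Keeping the file-loading step in its own function means it can be exercised against any exported file, even where the CLI itself is not available.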
Upvotes: 16