Reputation: 51
We typically start Airflow DAGs with the trigger_dag CLI command. For example:
airflow trigger_dag my_dag --conf '{"field1": 1, "field2": 2}'
We access this conf in our operators using context['dag_run'].conf
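As a minimal sketch of that access pattern, the callable below reads the two fields from the example conf. The function name `read_fields` and the `SimpleNamespace` stand-in for the DagRun are illustrative assumptions, used so the snippet runs without an Airflow installation.

```python
from types import SimpleNamespace


def read_fields(**context):
    """Return field1 and field2 from the triggering DagRun's conf."""
    # conf can be None (scheduled run) or {} (manual trigger without --conf),
    # so fall back to an empty dict before looking up keys.
    conf = context["dag_run"].conf or {}
    return conf.get("field1"), conf.get("field2")


# Stand-in for the context Airflow passes to a task; not a real DagRun.
fake_context = {"dag_run": SimpleNamespace(conf={"field1": 1, "field2": 2})}
print(read_fields(**fake_context))  # (1, 2)
```

In Airflow 1.10 a callable like this would typically be wired into a PythonOperator with provide_context=True so that the context arrives as keyword arguments.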
Sometimes when the DAG breaks at some task, we'd like to "update" the conf and restart the broken task (and downstream dependencies) with this new conf. For example:
new conf --> {"field1": 3, "field2": 4}
Is it possible to “update” the dag_run conf with a new JSON string like this?
Would be interested in hearing thoughts on this, other solutions, or potentially ways to avoid this situation to begin with.
Working with Apache Airflow v1.10.3
Thank you very much in advance.
Upvotes: 5
Views: 7231
Reputation: 5155
Updating conf after a dag run has been created isn't as straightforward as reading from conf, because conf is read from the dag_run metadata table whenever it's used after a dag run has been created. While Variables have methods to both write to and read from a metadata table, dag runs only let you read.
I agree that Variables are a useful tool, but when you have k=v pairs that you only want to use for a single run, it gets complicated and messy.
Below is an operator that will let you update a dag_run's conf after instantiation (tested in v1.10.10):
#!/usr/bin/env python3
"""Operator to overwrite a dag run's conf after creation."""
import os
from typing import Dict

from airflow.models import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.operator_helpers import context_to_airflow_vars


class UpdateConfOperator(BaseOperator):
    """Updates an existing DagRun's conf with `given_conf`.

    Args:
        given_conf: A dictionary of k:v values to update a DagRun's conf with. Templated.
        replace: Whether or not `given_conf` should replace conf (True)
            or be used to update the existing conf (False).
            Defaults to True.
    """

    template_fields = ("given_conf",)
    ui_color = "#ffefeb"

    @apply_defaults
    def __init__(self, given_conf: Dict, replace: bool = True, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.given_conf = given_conf
        self.replace = replace

    @staticmethod
    def update_conf(given_conf: Dict, replace: bool = True, **context) -> None:
        @provide_session
        def save_to_db(dag_run, session=None):
            # Persist the modified DagRun, then reload it from the metadata DB.
            session.add(dag_run)
            session.commit()
            dag_run.refresh_from_db()

        dag_run = context["dag_run"]
        # When there's no conf provided,
        # conf will be None if scheduled or {} if manually triggered
        if replace or not dag_run.conf:
            dag_run.conf = given_conf
        elif dag_run.conf:
            # Note: dag_run.conf.update(given_conf) doesn't work
            dag_run.conf = {**dag_run.conf, **given_conf}
        save_to_db(dag_run)

    def execute(self, context):
        # Export context to make it available for callables to use.
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.debug(
            "Exporting the following env vars:\n%s",
            "\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
        )
        os.environ.update(airflow_context_vars)
        self.update_conf(given_conf=self.given_conf, replace=self.replace, **context)
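The replace-versus-update branch in update_conf can be sketched on plain dictionaries. The helper name `merge_conf` is hypothetical and is not part of the operator. It always returns a new dict rather than mutating the old one; a plausible reason the in-place dag_run.conf.update(...) fails (an assumption, not something stated above) is that the ORM only notices a change when the attribute is reassigned.

```python
from typing import Dict, Optional


def merge_conf(existing: Optional[Dict], given: Dict, replace: bool = True) -> Dict:
    """Return the conf that would be stored; never mutates `existing`."""
    if replace or not existing:
        # Either the caller asked for a full replacement,
        # or there is nothing to merge into (None or {}).
        return dict(given)
    # Keys in `given` win over keys already present in `existing`.
    return {**existing, **given}


old = {"field1": 1, "field2": 2}
print(merge_conf(old, {"field1": 3}, replace=False))   # {'field1': 3, 'field2': 2}
print(merge_conf(old, {"field1": 3}, replace=True))    # {'field1': 3}
print(merge_conf(None, {"field1": 3}, replace=False))  # {'field1': 3}
```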
Example usage:
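from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Assumes UpdateConfOperator (defined above) is importable or in the same file.

CONF = {"field1": 3, "field2": 4}

with DAG(
    "some_dag",
    # schedule_interval="*/1 * * * *",
    schedule_interval=None,
    start_date=datetime(2020, 1, 1),  # placeholder; any past date works
    max_active_runs=1,
    catchup=False,
) as dag:
    t_update_conf = UpdateConfOperator(
        task_id="update_conf", given_conf=CONF,
    )
    t_print_conf = BashOperator(
        task_id="print_conf",
        bash_command="echo {{ dag_run.conf }}",
    )
    t_update_conf >> t_print_conf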
Upvotes: 5
Reputation: 11607
This seems like a good use case for Airflow Variables. If you were to read your configs from Variables, you could easily see and modify the configuration inputs from the Airflow UI itself.
You can even get creative and automate updating the config (which is now stored in a Variable) from another Airflow task before re-running a task or DAG. See "With code, how do you update and airflow variable".
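A rough sketch of updating such a Variable from code follows. It relies on Variable.set(..., serialize_json=True) and Variable.get(..., deserialize_json=True) from airflow.models; the function name, the key "my_dag_conf", and the in-memory stand-in class are hypothetical and exist only so the snippet can be tried without a running Airflow.

```python
import json


def update_config_variable(key, new_conf, variable_cls=None):
    """Overwrite the JSON config stored under `key` and return what was stored."""
    if variable_cls is None:
        # Imported lazily so the sketch can be exercised without Airflow installed.
        from airflow.models import Variable as variable_cls
    variable_cls.set(key, new_conf, serialize_json=True)
    return variable_cls.get(key, deserialize_json=True)


class InMemoryVariable:
    """Hypothetical stand-in mimicking Variable.set / Variable.get for a dry run."""

    _store = {}

    @classmethod
    def set(cls, key, value, serialize_json=False):
        cls._store[key] = json.dumps(value) if serialize_json else value

    @classmethod
    def get(cls, key, deserialize_json=False):
        raw = cls._store[key]
        return json.loads(raw) if deserialize_json else raw


print(update_config_variable("my_dag_conf", {"field1": 3, "field2": 4}, InMemoryVariable))
```

Inside a real DAG, the function would be called without the third argument from a PythonOperator placed upstream of the tasks that read the Variable.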
Upvotes: 0