fieldnotes

Is it possible to update/overwrite the Airflow ['dag_run'].conf?

We typically start Airflow DAGs with the trigger_dag CLI command. For example:

airflow trigger_dag my_dag --conf '{"field1": 1, "field2": 2}'
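
If these triggers are scripted, building the --conf argument with json.dumps and shlex.quote avoids hand-quoting mistakes. The following is a minimal standard-library sketch. The names build_trigger_args and build_trigger_command are hypothetical helpers. They only build the command from the example above and do not run it.

```python
import json
import shlex
from typing import Any, Dict, List


def build_trigger_args(dag_id: str, conf: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: argv list for the Airflow 1.10 `trigger_dag` CLI."""
    # json.dumps guarantees the conf is valid JSON (double-quoted keys, etc.)
    return ["airflow", "trigger_dag", dag_id, "--conf", json.dumps(conf)]


def build_trigger_command(dag_id: str, conf: Dict[str, Any]) -> str:
    """Hypothetical helper: the same command as a single shell-safe string."""
    return " ".join(shlex.quote(arg) for arg in build_trigger_args(dag_id, conf))


if __name__ == "__main__":
    print(build_trigger_command("my_dag", {"field1": 1, "field2": 2}))
    # prints: airflow trigger_dag my_dag --conf '{"field1": 1, "field2": 2}'
```

The argv list can also be passed directly to subprocess.run without going through a shell.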

We access this conf in our operators using context['dag_run'].conf.
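
According to the comment in the first answer's code, dag_run.conf can be None for scheduled runs. It can be {} for manual triggers that pass no --conf. Reading it defensively therefore helps. Below is a minimal sketch of a callable in the style of a 1.10 PythonOperator(provide_context=True) callable, which receives the context as keyword arguments. The names get_conf_value and my_callable are hypothetical. SimpleNamespace stands in for the real DagRun so the snippet runs without Airflow.

```python
from types import SimpleNamespace
from typing import Any, Dict


def get_conf_value(context: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Hypothetical helper: read one key from dag_run.conf with a fallback."""
    dag_run = context.get("dag_run")
    # conf may be None (scheduled run) or {} (manual trigger without --conf)
    conf = (dag_run.conf if dag_run is not None else None) or {}
    return conf.get(key, default)


def my_callable(**context):
    # Same access as context['dag_run'].conf, but safe when conf is None
    return get_conf_value(context, "field1", default=0)


if __name__ == "__main__":
    # SimpleNamespace is a stand-in for airflow.models.DagRun here
    triggered = {"dag_run": SimpleNamespace(conf={"field1": 1, "field2": 2})}
    scheduled = {"dag_run": SimpleNamespace(conf=None)}
    print(my_callable(**triggered))  # 1
    print(my_callable(**scheduled))  # 0
```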

Sometimes when the DAG breaks at some task, we'd like to "update" the conf and restart the broken task (and downstream dependencies) with this new conf. For example:

new conf --> {"field1": 3, "field2": 4}

Is it possible to “update” the dag_run conf with a new JSON string like this?

Would be interested in hearing thoughts on this, other solutions, or potentially ways to avoid this situation to begin with.

Working with Apache Airflow v1.10.3

Thank you very much in advance.

Answers (2)

ZaxR

Updating conf after a dag run has been created isn't as straightforward as reading from it. Whenever conf is used after a dag run has been created, it is read from the dag_run metadata table. Variables have methods to both write to and read from a metadata table, but dag runs only let you read.

I agree that Variables are a useful tool, but when you have k=v pairs that you only want to use for a single run, it gets complicated and messy.

Below is an operator that will let you update a dag_run's conf after instantiation (tested in v1.10.10):

#! /usr/bin/env python3
"""Operator to overwrite a dag run's conf after creation."""


import os
from typing import Dict

from airflow.models import BaseOperator
from airflow.utils.db import provide_session
from airflow.utils.decorators import apply_defaults
from airflow.utils.operator_helpers import context_to_airflow_vars


class UpdateConfOperator(BaseOperator):
    """Updates an existing DagRun's conf with `given_conf`.

    Args:
        given_conf: A dictionary of k:v values to update a DagRun's conf with. Templated.
        replace: Whether or not `given_conf` should replace conf (True)
                 or be used to update the existing conf (False).
                 Defaults to True.

    """

    template_fields = ("given_conf",)
    ui_color = "#ffefeb"

    @apply_defaults
    def __init__(self, given_conf: Dict, replace: bool = True, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.given_conf = given_conf
        self.replace = replace

    @staticmethod
    def update_conf(given_conf: Dict, replace: bool = True, **context) -> None:
        @provide_session
        def save_to_db(dag_run, session):
            session.add(dag_run)
            session.commit()
            dag_run.refresh_from_db()

        dag_run = context["dag_run"]
        # When there's no conf provided,
        # conf will be None if scheduled or {} if manually triggered
        if replace or not dag_run.conf:
            dag_run.conf = given_conf
        else:
            # Note: dag_run.conf.update(given_conf) doesn't work. conf is a
            # PickleType column, so in-place edits aren't detected by the ORM.
            # Assigning a new dict marks the attribute as changed.
            dag_run.conf = {**dag_run.conf, **given_conf}

        save_to_db(dag_run)

    def execute(self, context):
        # Export context to make it available for callables to use.
        airflow_context_vars = context_to_airflow_vars(context, in_env_var_format=True)
        self.log.debug(
            "Exporting the following env vars:\n%s",
            "\n".join(["{}={}".format(k, v) for k, v in airflow_context_vars.items()]),
        )
        os.environ.update(airflow_context_vars)

        self.update_conf(given_conf=self.given_conf, replace=self.replace, **context)
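
Why doesn't dag_run.conf.update(given_conf) work? In Airflow 1.10, DagRun.conf is a SQLAlchemy PickleType column. Without the sqlalchemy.ext.mutable extension, the ORM notices a change only when the attribute is reassigned, not when the dict is mutated in place. The toy model below imitates that behavior in plain Python. TrackedRow and flush are hypothetical and do not reflect how SQLAlchemy is implemented internally. They only illustrate why the operator builds a new dict.

```python
import copy
from typing import Any, Dict


class TrackedRow:
    """Toy ORM-like object: only attribute *assignment* marks a field dirty."""

    def __init__(self, **values: Any) -> None:
        object.__setattr__(self, "dirty", set())
        for name, value in values.items():
            object.__setattr__(self, name, value)  # loading is not a change

    def __setattr__(self, name: str, value: Any) -> None:
        self.dirty.add(name)  # like an ORM attribute "set" event
        object.__setattr__(self, name, value)


def flush(row: TrackedRow, db: Dict[str, Any]) -> None:
    """Write only dirty fields to the fake table, then clear the dirty set."""
    for name in row.dirty:
        db[name] = copy.deepcopy(getattr(row, name))
    row.dirty.clear()


if __name__ == "__main__":
    db = {"conf": {"field1": 1, "field2": 2}}
    run = TrackedRow(conf=copy.deepcopy(db["conf"]))

    run.conf.update({"field1": 3})  # in-place mutation: no set event
    flush(run, db)
    print(db["conf"])  # {'field1': 1, 'field2': 2}  (change not persisted)

    run.conf = {**run.conf, "field2": 4}  # reassignment: marked dirty
    flush(run, db)
    print(db["conf"])  # {'field1': 3, 'field2': 4}
```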

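Example usage:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# UpdateConfOperator as defined above (same file, or imported from your module)

CONF = {"field1": 3, "field2": 4}
with DAG(
    "some_dag",
    # schedule_interval="*/1 * * * *",
    schedule_interval=None,
    start_date=days_ago(1),  # the DAG's tasks need a start_date
    max_active_runs=1,
    catchup=False,
) as dag:
    t_update_conf = UpdateConfOperator(
        task_id="update_conf", given_conf=CONF,
    )
    # Templated when this task runs, so it sees the conf written upstream
    t_print_conf = BashOperator(
        task_id="print_conf",
        bash_command='echo "{{ dag_run.conf }}"',
    )
    t_update_conf >> t_print_conf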
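
The replace flag in the operator above chooses between two outcomes. The sketch below pulls the same branch logic out as a pure function so it can be checked without a database. merge_conf is a hypothetical name, since the operator does this inline.

```python
from typing import Any, Dict, Optional


def merge_conf(
    existing: Optional[Dict[str, Any]],
    given: Dict[str, Any],
    replace: bool = True,
) -> Dict[str, Any]:
    """Return the conf UpdateConfOperator would write back (hypothetical helper)."""
    # existing is None for scheduled runs, {} for manual triggers without --conf
    if replace or not existing:
        return dict(given)
    # Build a new dict (never mutate in place); keys in `given` win
    return {**existing, **given}


if __name__ == "__main__":
    old = {"field1": 1, "field2": 2, "extra": "keep"}
    new = {"field1": 3, "field2": 4}
    print(merge_conf(old, new, replace=True))   # {'field1': 3, 'field2': 4}
    print(merge_conf(old, new, replace=False))  # {'field1': 3, 'field2': 4, 'extra': 'keep'}
```

With replace=False, keys missing from given_conf survive. This fits the question's case of fixing a couple of fields and re-running.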

y2k-shubham

This seems like a good use case for Airflow Variables. If you read your configs from Variables, you can easily view and modify the configuration inputs from the Airflow UI itself.


You can even get creative and have another Airflow task automatically update the config (which is now stored in a Variable) before a Task / DAG is re-run. See: With code, how do you update and airflow variable
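
Here is a minimal sketch of that pattern. It assumes the config is stored as a JSON string. Airflow Variables hold strings, and in 1.10, Variable.get(key, deserialize_json=True) and Variable.set(key, value, serialize_json=True) handle the JSON round trip. To keep the example runnable without Airflow, a plain dict VARIABLE_STORE stands in for the Variable table. The helpers get_json_variable and set_json_variable stand in for those methods. These names and the key "my_dag_conf" are hypothetical, not Airflow APIs.

```python
import json
from typing import Any, Dict

# Stand-in for Airflow's Variable table: key -> JSON string (hypothetical)
VARIABLE_STORE: Dict[str, str] = {}


def set_json_variable(key: str, value: Dict[str, Any]) -> None:
    """Stand-in for Variable.set(key, value, serialize_json=True)."""
    VARIABLE_STORE[key] = json.dumps(value)


def get_json_variable(key: str, default: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for Variable.get(key, default_var=..., deserialize_json=True)."""
    raw = VARIABLE_STORE.get(key)
    return default if raw is None else json.loads(raw)


def process(**context) -> int:
    """The task reads its inputs from the Variable at run time, not from conf."""
    conf = get_json_variable("my_dag_conf", default={"field1": 0, "field2": 0})
    return conf["field1"] + conf["field2"]


if __name__ == "__main__":
    set_json_variable("my_dag_conf", {"field1": 1, "field2": 2})
    print(process())  # 3
    # Before clearing/re-running the failed task, update the Variable
    # (from the UI, or from an upstream task):
    set_json_variable("my_dag_conf", {"field1": 3, "field2": 4})
    print(process())  # 7
```

A Variable is global rather than per run, so concurrent runs of the DAG all see the same values. This is the trade-off the first answer raises for single-run settings.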