Ace Haidrey
Ace Haidrey

Reputation: 1228

Airflow unpause dag programmatically?

I have a dag that we'll deploy to multiple different airflow instances and in our airflow.cfg we have dags_are_paused_at_creation = True but for this specific dag we want it to be turned on without having to do so manually by clicking on the UI. Is there a way to do it programmatically?

Upvotes: 17

Views: 23916

Answers (9)

chr1st14n_ru4
chr1st14n_ru4

Reputation: 1

a bash approach of srinivasreddyramaram comment could be this one, please notice that you need predefined variables like $DEFAULT_PASSWORD, $ENDPOINT_URL, and Dag id.

The default password belongs to the password used in the Airflow UI, the endpoint URL, is the URL to access your UI, and finally, the dag id is self-explanatory.

A possible approach to implement these rest API calls could be a bootstrap dag, that updates the airflow-providers-amazon library from version 6.0.0 to 6.1.0, because you need to assume some role arn.

restApiResponse=$(curl \
        -s \
        --header "Content-Type: application/json" \
        --request PATCH \
        --user "admin:$DEFAULT_PASSWORD" \
        -w "%{stdout}" \
        --data '{"is_paused": false}' \
        "${ENDPOINT_URL}/api/v1/dags/$DagId?update_mask=is_paused")

echo $restApiResponse

Upvotes: 0

Shubham Asabe
Shubham Asabe

Reputation: 99

You can do this using in the python operator of any dag to pause and unpause the dags programatically . This is the best approch i found instead of using cli just pass the list of dags and rest is take care

from airflow.models import DagModel
dag_id = "dag_name"
dag = DagModel.get_dagmodel(dag_id)
dag.set_is_paused(is_paused=False)

And just if you want to check if it is paused or not it will return boolean

dag.is_paused()

Upvotes: 2

formulaRoot
formulaRoot

Reputation: 1

airflow pause dag_id.

has been discontinued.

You will have to use:

airflow dags pause dag_id

Upvotes: 0

srinivasreddyramaram
srinivasreddyramaram

Reputation: 47

Airflow's REST API provides a way using the DAG patch API: we need to update the dag with query parameter ?update_mask=is_paused and send boolean as request body.

Ref: https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/patch_dag

Upvotes: 1

Vivek
Vivek

Reputation: 344

The following cli command should work per the recent docs.

airflow dags unpause dag_id

https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#unpause

Upvotes: 2

A.B
A.B

Reputation: 20445

I think you are looking for unpause ( not pause)

airflow unpause DAG_ID

Upvotes: 3

x97Core
x97Core

Reputation: 1514

airflow-rest-api-plugin plugin can also be used to programmatically pause tasks.

Pauses a DAG

Available in Airflow Version: 1.7.0 or greater

GET - http://{HOST}:{PORT}/admin/rest_api/api?api=pause

Query Arguments:

dag_id - string - The id of the dag

subdir (optional) - string - File location or directory from which to look for the dag

Examples:

http://{HOST}:{PORT}/admin/rest_api/api?api=pause&dag_id=test_id

See for more details: https://github.com/teamclairvoyant/airflow-rest-api-plugin

Upvotes: 10

Ace Haidrey
Ace Haidrey

Reputation: 1228

I created the following function to do so if anyone else runs into this issue:

import airflow.settings
from airflow.models import DagModel
def unpause_dag(dag):
    """
    A way to programatically unpause a DAG.
    :param dag: DAG object
    :return: dag.is_paused is now False
    """
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel).filter(DagModel.dag_id == dag.dag_id)
        d = qry.first()
        d.is_paused = False
        session.commit()
    except:
        session.rollback()
    finally:
        session.close()

Upvotes: 29

Anmol Karki
Anmol Karki

Reputation: 361

supply your dag_id and run this command on your command line.

airflow pause dag_id.

For more information on the airflow command line interface: https://airflow.incubator.apache.org/cli.html

Upvotes: 9

Related Questions