Rajalakshmi
Rajalakshmi

Reputation: 771

crontab expression to schedule a self triggering DAG in Apache Airflow

I have a DAG that uses a TriggerDagRunOperator to re-trigger itself. The DAG code is below.

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator


default_args = {
    'owner': 'ownername',
    'depends_on_past': False,
    'start_date': datetime(2021, 3, 2, 10, 1),
    # Redacted placeholder from the post. Email addresses must be strings;
    # the unquoted `***@mail.com` form is a SyntaxError.
    'email': ['***@mail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The DAG has no schedule (schedule_interval=None). Each run ends by
# triggering a new run of itself. max_active_runs=1 allows at most one
# active run at a time.
with DAG('self_trigger_dag', schedule_interval=None, max_active_runs=1,
         catchup=False, default_args=default_args) as dag:
    # Pause between iterations of the loop.
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 180',
        dag=dag,
    )

    # The actual work. `use_legacy_sql` is not a BashOperator argument (it
    # belongs to the BigQuery operators). Airflow rejects unknown operator
    # kwargs (deprecation warning in 1.10, error in 2.x), so it is dropped.
    run_command = BashOperator(
        task_id='run_command',
        bash_command="my bash_command",
        dag=dag,
    )

    # Start the next iteration by triggering this same DAG.
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='self_trigger_dag',
        dag=dag,
    )

    sleep_task >> run_command >> dag_trigger

The requirement is that the DAG should run only between 8 AM and 9 PM. I cannot use a cron expression such as '* 8-21 * * *', because this DAG re-triggers itself instead of running on a schedule. Could someone suggest the correct crontab expression, or an alternative approach?

Thanks in advance.

Upvotes: 1

Views: 280

Answers (1)

Rajalakshmi
Rajalakshmi

Reputation: 771

I solved this with a separate time_check DAG that controls my main DAG. The time_check DAG starts the main DAG at 12 PM and switches it off at 1 AM (both times in UTC).

The time_check DAG code is shared below:

from datetime import timedelta, datetime,timezone,date
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable


# Defaults applied to every task in the ``time_check`` DAG.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Name of the Airflow Variable holding the stream status ('START' / 'STOP').
airflow_variable = 'stream_dag_status'


def check_current_time(**context):
    """Choose which branch to follow, based on the current UTC time of day.

    The streaming window opens at 12:00 UTC and closes at 01:00 UTC on the
    next day, so it wraps past midnight. Inside the window, the Airflow
    Variable named by ``airflow_variable`` is set to ``'START'`` and the
    ``'start_stream'`` task id is returned. Outside the window,
    ``'update_variable'`` is returned, and that task writes ``'STOP'``.

    :param context: Airflow task context (unused).
    :return: task id of the branch to follow.
    """
    start_hour, end_hour = 12, 1
    now_utc = datetime.now(timezone.utc)

    # Compare hours of the day rather than concrete datetimes. When the
    # window wraps midnight (start > end), 00:00-00:59 UTC must still count
    # as inside it.
    if start_hour <= end_hour:
        in_window = start_hour <= now_utc.hour < end_hour
    else:
        in_window = now_utc.hour >= start_hour or now_utc.hour < end_hour

    if in_window:
        Variable.set(airflow_variable, 'START')
        return 'start_stream'
    return 'update_variable'

def set_airflow_variable(**context):
    """Mark the stream as stopped by writing 'STOP' to the status Variable.

    :param context: Airflow task context (unused).
    """
    stop_value = 'STOP'
    Variable.set(airflow_variable, stop_value)

# Runs at minute 0 of hours 12 and 1 every day. Each run either starts the
# STREAMING_TEST DAG or flags it to stop.
with DAG('time_check', schedule_interval='0 12,1 * * *', max_active_runs=1, catchup=False,
         default_args=default_args) as dag:
    # Branch on the current time. The operator is bound to a distinct name so
    # that the ``check_current_time`` function is not shadowed by the operator
    # that wraps it.
    check_time_task = BranchPythonOperator(task_id='check_current_time',
                                           python_callable=check_current_time,
                                           provide_context=True,
                                           dag=dag)

    # Branch taken inside the window: kick off the self-triggering DAG.
    start_stream = TriggerDagRunOperator(
        task_id='start_stream',
        trigger_dag_id='STREAMING_TEST',
        dag=dag)

    # Branch taken outside the window: write 'STOP' so the streaming DAG's
    # next check ends its loop.
    update_variable = PythonOperator(task_id='update_variable', python_callable=set_airflow_variable,
                                     provide_context=True,
                                     dag=dag)

    stop_stream_email = EmailOperator(task_id='stop_stream_email', to='[email protected]',
                                      subject='Streaming DAG is OFF now',
                                      html_content="<p>Hi,<br><br>Turning streaming DAG to OFF state<br>", dag=dag)

    check_time_task >> start_stream
    check_time_task >> update_variable >> stop_stream_email

The self triggering DAG code is shared below:

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.models import Variable

# Defaults applied to every task in the ``STREAMING_TEST`` DAG.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# Name of the Airflow Variable whose value ('START' or otherwise) gates each
# iteration of the streaming loop.
airflow_variable = 'stream_dag_status'

def check_airflow_variable(**context):
    """Decide whether the streaming loop should run another iteration.

    Reads the Airflow Variable named by ``airflow_variable``. The
    ``time_check`` DAG sets it to 'START' or 'STOP'.

    :param context: Airflow task context (unused).
    :return: ``'sleep_task'`` when the status is ``'START'``, otherwise
        ``'email_notify'``.
    """
    # default_var makes a missing Variable behave like 'STOP' instead of
    # raising. This covers, e.g., a manual run before time_check has ever
    # executed.
    status = Variable.get(airflow_variable, default_var='STOP')
    if status == 'START':
        return 'sleep_task'
    return 'email_notify'
        


# Self-triggering loop. It has no schedule and is started by time_check. Each
# run checks the status Variable, and either sleeps and re-triggers itself or
# sends a notification and stops.
with DAG('STREAMING_TEST', schedule_interval=None, max_active_runs=1, catchup=False, default_args=default_args) as dag:

    # Bound to a distinct name so the ``check_airflow_variable`` function is
    # not shadowed by the operator that wraps it.
    check_status_task = BranchPythonOperator(task_id='check_airflow_variable',
                                             python_callable=check_airflow_variable,
                                             provide_context=True,
                                             dag=dag)
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 60',
        dag=dag,
    )
    # Variable named after its task id, for readability.
    start_split = DummyOperator(task_id='start_split', depends_on_past=False)

    # Start the next iteration by triggering this same DAG.
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='STREAMING_TEST',
        dag=dag)

    email_notify = EmailOperator(task_id='email_notify', to='xxx@[email protected]',
                                 subject='Variable value is STOP',
                                 html_content="<p>Hi,<br><br>Streaming is stopped<br>", dag=dag)

    check_status_task >> sleep_task >> start_split >> dag_trigger
    check_status_task >> email_notify

Upvotes: 1

Related Questions