Reputation: 771
I have a DAG that uses a TriggerDagRunOperator to re-trigger the same DAG (self-triggering). The DAG code is shared below.
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'ownername',
    'depends_on_past': False,
    'start_date': datetime(2021, 3, 2, 10, 1),
    # BUG FIX: the address must be a quoted string; the original unquoted
    # [***@mail.com] is a Python syntax error and the module would not parse.
    'email': ['***@mail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,                           # retry each task once on failure
    'retry_delay': timedelta(minutes=5),    # wait 5 minutes before the retry
}
# Self-triggering DAG: sleep, run the command, then re-trigger this same DAG.
# schedule_interval=None because each run is started by the previous run's
# TriggerDagRunOperator, not by the scheduler.
with DAG('self_trigger_dag', schedule_interval=None, max_active_runs=1,
         catchup=False, default_args=default_args) as dag:
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 180',
    )
    bash_command = BashOperator(
        task_id='run_command',
        bash_command="my bash_command",
        # BUG FIX: removed use_legacy_sql=False -- that keyword belongs to the
        # BigQuery operators; BashOperator rejects it with a TypeError at
        # DAG-parse time.
    )
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='self_trigger_dag',  # re-trigger this very DAG
    )
    # NOTE: dag=dag on each operator was redundant inside the `with DAG` block
    # and has been dropped; the context manager assigns the DAG automatically.
    sleep_task >> bash_command >> dag_trigger
The requirement is that the DAG should run only between 8 AM and 9 PM. I cannot give an expression like '* 8-21 * * *' because this is a self-triggering DAG. Kindly help me with the correct crontab expression or any other alternative.
Thanks in advance.
Upvotes: 1
Views: 280
Reputation: 771
I was able to achieve the requirement using a time_check DAG that controls the main DAG. Here I am triggering the main DAG at 12 PM and switching it off at 1 AM using the time_check DAG.
The time_check DAG code is shared below:
from datetime import timedelta, datetime,timezone,date
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.models import Variable
# Shared task defaults for the time_check DAG.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Name of the Airflow Variable used as the on/off switch ('START'/'STOP')
# for the streaming DAG.
airflow_variable = 'stream_dag_status'
def check_current_time(**context):
    """Branch on the current UTC time.

    Returns the task_id to follow next: 'start_stream' when now falls inside
    the 12:00 UTC -> next-day 01:00 UTC window (also flipping the control
    Variable to 'START'), otherwise 'update_variable'.
    """
    now = datetime.now(timezone.utc)
    window_open = now.replace(hour=12, minute=0, second=0, microsecond=0)
    window_close = window_open.replace(hour=1) + timedelta(days=1)
    if window_open <= now < window_close:
        Variable.set(airflow_variable, 'START')
        return 'start_stream'
    return 'update_variable'
def set_airflow_variable(**context):
    # Flip the control Variable to 'STOP' so the streaming DAG stops
    # re-triggering itself on its next branch check.
    Variable.set(airflow_variable, 'STOP')
# time_check DAG: fires at 12:00 UTC (open the streaming window and trigger
# the stream) and at 01:00 UTC (flip the control Variable to STOP and notify).
with DAG('time_check', schedule_interval='0 12,1 * * *', max_active_runs=1,
         catchup=False, default_args=default_args) as dag:
    # FIX: task variables renamed so they no longer shadow the module-level
    # callables check_current_time / set_airflow_variable (task_ids unchanged,
    # so the DAG's external shape is identical).
    check_time = BranchPythonOperator(
        task_id='check_current_time',
        python_callable=check_current_time,
        provide_context=True,
    )
    trigger_stream = TriggerDagRunOperator(
        task_id='start_stream',
        trigger_dag_id='STREAMING_TEST',
    )
    stop_variable = PythonOperator(
        task_id='update_variable',
        python_callable=set_airflow_variable,
        provide_context=True,
    )
    stop_email = EmailOperator(
        task_id='stop_stream_email',
        to='[email protected]',
        subject='Streaming DAG is OFF now',
        html_content="<p>Hi,<br><br>Turning streaming DAG to OFF state<br>",
    )
    # Branch: 12:00 run follows the trigger leg; 01:00 run follows the stop leg.
    check_time >> trigger_stream
    check_time >> stop_variable >> stop_email
The self triggering DAG code is shared below:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.models import Variable
# Shared task defaults for the self-triggering streaming DAG.
default_args = dict(
    owner='owner',
    depends_on_past=False,
    start_date=datetime(2021, 3, 2, 10, 1),
    email=['[email protected]'],
    email_on_failure=True,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)

# Control switch written by the time_check DAG; read here before each cycle.
airflow_variable = 'stream_dag_status'
def check_airflow_variable(**context):
    """Branch on the stream control Variable.

    Returns 'sleep_task' (continue the streaming cycle) when the Variable is
    'START', otherwise 'email_notify' (stop and notify).
    """
    if Variable.get(airflow_variable) == 'START':
        return 'sleep_task'
    return 'email_notify'
# Self-triggering streaming DAG, gated by the 'stream_dag_status' Variable:
# each run branches on the Variable, does its work, then re-triggers itself.
with DAG('STREAMING_TEST', schedule_interval=None, max_active_runs=1,
         catchup=False, default_args=default_args) as dag:
    # FIX: task variable renamed so it no longer shadows the
    # check_airflow_variable callable defined above (task_id unchanged).
    check_variable = BranchPythonOperator(
        task_id='check_airflow_variable',
        python_callable=check_airflow_variable,
        provide_context=True,
    )
    sleep_task = BashOperator(
        task_id='sleep_task',
        bash_command='sleep 60',
    )
    start_group = DummyOperator(task_id='start_split', depends_on_past=False)
    dag_trigger = TriggerDagRunOperator(
        task_id='trigger_self',
        trigger_dag_id='STREAMING_TEST',  # re-trigger this very DAG
    )
    # BUG FIX: the recipient had two '@' signs ('xxx@[email protected]'),
    # which is not a deliverable address.
    email_notify = EmailOperator(
        task_id='email_notify',
        to='[email protected]',
        subject='Variable value is STOP',
        html_content="<p>Hi,<br><br>Streaming is stopped<br>",
    )
    check_variable >> sleep_task >> start_group >> dag_trigger
    check_variable >> email_notify
Upvotes: 1