Reputation: 91
I have created a SubDAG dynamically. Everything seems to work properly and main_dag runs fine; its PythonOperator callable is called. However, the Python callables inside the SubDAG are never called. I am new to Airflow and merged this code together from different sources, so kindly help me.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from copy import deepcopy
import airflow
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 16),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
def sub_dag_method_a():
    """
    sub dag method a
    """
    import pdb; pdb.set_trace()
    print('Subdag method a')
    return 'a'

def sub_dag_method_b():
    """
    sub dag method b
    """
    print('Subdag method b')
    return 'b'
# sub dag arguments
def create_subdag(dag_parent, dag_id_child_prefix, db_name, dag_child_id, start_date, schedule_interval):
    # dag params
    # import pdb; pdb.set_trace()
    dag_id_child = '%s.%s_%s' % (dag_parent, dag_child_id, dag_id_child_prefix)
    # main default
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_id=dag_id_child, schedule_interval=schedule_interval,
                 start_date=start_date, default_args=default_args_copy)
    # operators
    tid_check = 'dummy_task_start_%s' % dag_id_child_prefix
    print(tid_check)
    method_start = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)
    tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_id_child_prefix
    print(tid_check)
    method_a = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                              python_callable=sub_dag_method_a)
    tid_check = 'save_to_es_fetch_from_db_%s' % dag_id_child_prefix
    print(tid_check)
    method_b = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                              provide_context=True,
                              python_callable=sub_dag_method_b)
    tid_check = 'dummy_task_end_%s' % dag_id_child_prefix
    print(tid_check)
    method_end = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)
    method_start >> method_a
    method_a >> method_b
    method_b >> method_end
    return subdag
# main default arguments
# main dag
main_dag = DAG('main_dag', default_args=deepcopy(main_default_args), schedule_interval=timedelta(hours=1),
               start_date=datetime(2019, 12, 16))
# hello_world
def hello_world():
    """
    Hello world
    """
    i = 0
    subdag = create_subdag('main_dag', str(i), 'db_name' + str(i), 'task_dag',
                           main_dag.start_date, main_dag.schedule_interval)
    # import pdb; pdb.set_trace()
    sd_op = SubDagOperator(task_id='task_dag_' + str(i), subdag=subdag, dag=main_dag)
    return subdag
# main task
main_task = PythonOperator(task_id='main_task', python_callable=hello_world, dag=main_dag)
# hello_world()
The output of running the command
airflow test 'main_dag' 'main_task' 2019/12/16
is:
(alphavu3711_1) Noamans-MacBook-Pro-2:python3 noamanfaisalbinbadar$ airflow test 'main_dag' 'main_task' 2019/12/16
[2019-12-16 21:56:10,312] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=4100
[2019-12-16 21:56:11,119] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-12-16 21:56:11,119] {dagbag.py:92} INFO - Filling up the DagBag from /Users/noamanfaisalbinbadar/code/alphavu/production/python3/fb_messenger_airflow/dags
[2019-12-16 21:56:11,415] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:841} INFO -
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:842} INFO - Starting attempt 2 of 1
[2019-12-16 21:56:11,433] {taskinstance.py:843} INFO -
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:862} INFO - Executing <Task(PythonOperator): main_task> on 2019-12-16T00:00:00+00:00
[2019-12-16 21:56:11,455] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=main_dag
AIRFLOW_CTX_TASK_ID=main_task
AIRFLOW_CTX_EXECUTION_DATE=2019-12-16T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-12-16T00:00:00+00:00
dummy_task_start_0
get_from_facebook_and_save_to_db_0
save_to_es_fetch_from_db_0
dummy_task_end_0
[2019-12-16 21:56:11,459] {python_operator.py:114} INFO - Done. Returned value was: <DAG: main_dag.task_dag_0>
The new approach, following your answer, is this:
from fb_messenger.airflow_helpers.get_conversation_ids_page_wise import GetConversationIdsPageWise
from fb_messenger.airflow_helpers.get_conversation_messages_info import GetConversationMessagesInfo
from fb_messenger.airflow_helpers.save_to_es import SaveToES
from copy import deepcopy
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import airflow
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
def create_subdag(dag_name, dag_name_prefix, start_date, schedule_interval, conversation_info):
    # dag params
    # import pdb; pdb.set_trace()
    dag_name_processed = '%s_%s' % (dag_name, dag_name_prefix)
    # main default
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_name_processed, schedule_interval=schedule_interval, start_date=start_date,
                 default_args=deepcopy(main_default_args))

    def sub_dag_method_a(**kwargs):
        """
        sub dag method a
        """
        print('Subdag method a')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'a'

    def sub_dag_method_b(**kwargs):
        """
        sub dag method b
        """
        print('Subdag method b')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'b'

    with subdag:
        # operators
        tid_check = 'dummy_task_start_%s' % dag_name_prefix
        # print(tid_check)
        method_start = DummyOperator(task_id=tid_check, dag=subdag)
        # new tid
        tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_name_prefix
        # print(tid_check)
        method_a = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_a,
                                  op_kwargs={'conversation_id': conversation_info['id'],
                                             'updated_time': conversation_info['updated_time']})
        # new tid
        tid_check = 'save_to_es_fetch_from_db_%s' % dag_name_prefix
        # print(tid_check)
        method_b = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_b,
                                  op_kwargs={'conversation_id': conversation_info['id'],
                                             'updated_time': conversation_info['updated_time']})
        # new tid
        tid_check = 'dummy_task_end_%s' % dag_name_prefix
        # print(tid_check)
        method_end = DummyOperator(task_id=tid_check, dag=subdag)
        # dependencies
        method_start >> method_a
        method_a >> method_b
        method_b >> method_end
    # return subdag
    return subdag
start_date_ = datetime.now() + timedelta(minutes=-1)
# getting list of dictionaries
conversation_infos = GetConversationIdsPageWise().get_all()
print(conversation_infos)
print(len(conversation_infos))
for conversation_info in conversation_infos:
    print(conversation_info)
    i = conversation_info['id']
    subdag_name = 'main_dag'
    sub_dag = create_subdag(subdag_name, str(i), start_date_, timedelta(minutes=2), conversation_info)
    print(sub_dag)
But with this approach I am still unable to create multiple DAGs.
Upvotes: 2
Views: 1453
Reputation: 20077
It is impossible to create a SubDAG dynamically in the execute method of another operator, which is effectively what you are trying to do.
DAGs and their dependencies (including SubDAGs) are created while the Python code is parsed and the objects available at the top level of the file are constructed. In your case that means creating the DAG and assigning it to the main_dag variable, then creating the PythonOperator and assigning it to main_task. That is all that happens during scheduling; the PythonOperator's callable is not called at that point.
By the time the task is executed and the callable is called, it is already too late to create a DAG: the whole DAG structure and its dependencies have been built and scheduling is done.
Basically, you can only create new DAGs (including SubDAGs) in the scheduler, which parses all the Python code and builds the DAGs and their tasks. Individual tasks (for example the PythonOperator you mentioned) are executed later, when their time and dependencies are due, on one of the workers (not on the scheduler), and even if they create DAGs there, this does not affect the scheduler, so the created DAGs are never scheduled.
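For illustration only, here is a minimal sketch of the pattern that does work: build the SubDAG and wrap it in a SubDagOperator at the top level of the file, so that everything already exists when the file is parsed. The factory name make_subdag and the task ids below are placeholders of mine, not taken from your code.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2019, 12, 16)}

def make_subdag(parent_dag_id, child_task_id, schedule_interval):
    # A SubDAG's dag_id must be '<parent_dag_id>.<child_task_id>'.
    subdag = DAG('%s.%s' % (parent_dag_id, child_task_id),
                 default_args=default_args,
                 schedule_interval=schedule_interval)
    DummyOperator(task_id='inner_task', dag=subdag)
    return subdag

# Everything below runs at parse time, which is why the scheduler can see it.
main_dag = DAG('main_dag', default_args=default_args, schedule_interval=timedelta(hours=1))
child = SubDagOperator(task_id='child',
                       subdag=make_subdag('main_dag', 'child', main_dag.schedule_interval),
                       dag=main_dag)

If you move this kind of construction into a PythonOperator callable, the operators are only created on a worker after parsing has finished, which is exactly the situation described above.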
Upvotes: 4
Reputation: 166
I think you are trying to create the SubDAGs dynamically based on the conversation info. I found a few issues in your updated code: the SubDAG id has to be of the form parent_dag_id.child_task_id, and each SubDAG has to be attached to the parent DAG through a SubDagOperator created at the top level of the file.
The code below is working for me.
from fb_messenger.airflow_helpers.get_conversation_ids_page_wise import GetConversationIdsPageWise
from fb_messenger.airflow_helpers.get_conversation_messages_info import GetConversationMessagesInfo
from fb_messenger.airflow_helpers.save_to_es import SaveToES
from copy import deepcopy
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import airflow
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
def create_subdag(dag_name, dag_name_prefix, start_date, schedule_interval, conversation_info):
    # dag params
    # import pdb; pdb.set_trace()
    dag_name_processed = '%s.%s' % (dag_name, dag_name_prefix)
    # main default
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_name_processed, schedule_interval=schedule_interval, start_date=start_date,
                 default_args=deepcopy(main_default_args))

    def sub_dag_method_a(**kwargs):
        """
        sub dag method a
        """
        print('Subdag method a')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'a'

    def sub_dag_method_b(**kwargs):
        """
        sub dag method b
        """
        print('Subdag method b')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'b'

    with subdag:
        # operators
        tid_check = 'dummy_task_start_%s' % dag_name_prefix
        # print(tid_check)
        method_start = DummyOperator(task_id=tid_check, dag=subdag)
        # new tid
        tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_name_prefix
        # print(tid_check)
        method_a = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_a,
                                  op_kwargs={'conversation_id': conversation_info['id'],
                                             'updated_time': conversation_info['updated_time']})
        # new tid
        tid_check = 'save_to_es_fetch_from_db_%s' % dag_name_prefix
        # print(tid_check)
        method_b = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_b,
                                  op_kwargs={'conversation_id': conversation_info['id'],
                                             'updated_time': conversation_info['updated_time']})
        # new tid
        tid_check = 'dummy_task_end_%s' % dag_name_prefix
        # print(tid_check)
        method_end = DummyOperator(task_id=tid_check, dag=subdag)
        # dependencies
        method_start >> method_a
        method_a >> method_b
        method_b >> method_end
    # return subdag
    return subdag
sd = datetime.now()
main_dag = DAG('main_dag', default_args=deepcopy(main_default_args), schedule_interval=timedelta(hours=1),
               start_date=sd)
# getting list of dictionaries
conversation_infos = GetConversationIdsPageWise().get_all()
print(conversation_infos)
print(len(conversation_infos))
for conversation_info in conversation_infos:
    print(conversation_info)
    i = conversation_info['id']
    subdag_name = 'main_dag'
    t_sub_dag = SubDagOperator(
        subdag=create_subdag(subdag_name, str(i), sd, timedelta(minutes=2), conversation_info),
        task_id=str(i),
        dag=main_dag
    )
Upvotes: 0