noamanfaisal
noamanfaisal

Reputation: 91

Python operator is not being called in dynamic subdag in Airflow

I have created a subdag dynamically. Everything is working properly: main_dag is running fine and its PythonOperator function is being called. But the Python callables in the subdag are not being called. Kindly help me — as I am new to Airflow, I got and merged this code from different sources.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from copy import deepcopy
import airflow

# Default task arguments shared by the parent DAG and every subdag;
# each use takes a deepcopy so the DAGs do not share a mutable dict.
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 12, 16),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def sub_dag_method_a():
    """
    Python callable for the subdag's first worker task.

    Returns:
        str: the literal 'a'.
    """
    # FIX: removed a live `import pdb;pdb.set_trace()` left here — an active
    # debugger breakpoint hangs the task waiting for interactive input.
    print('Subdag method a')
    return 'a'

def sub_dag_method_b():
    """
    Python callable for the subdag's second worker task.

    Returns:
        str: the literal 'b'.
    """
    message = 'Subdag method b'
    print(message)
    return 'b'

# sub dag arguments
def create_subdag(dag_parent, dag_id_child_prefix, db_name, dag_child_id, start_date, schedule_interval):
    """
    Build a child DAG '<dag_parent>.<dag_child_id>_<dag_id_child_prefix>'
    with the linear chain: start dummy -> method_a -> method_b -> end dummy.

    Args:
        dag_parent: dag_id of the parent DAG (subdag ids follow 'parent.child').
        dag_id_child_prefix: suffix appended to the dag id and every task id.
        db_name: currently unused; kept so existing callers keep working.
        dag_child_id: middle component of the child dag id.
        start_date: start date for the child DAG.
        schedule_interval: schedule interval for the child DAG.

    Returns:
        The constructed child DAG object.
    """
    dag_id_child = '%s.%s_%s' % (dag_parent, dag_child_id, dag_id_child_prefix)
    # Copy the shared defaults so this subdag cannot mutate global state.
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_id=dag_id_child, schedule_interval=schedule_interval,
                 start_date=start_date, default_args=default_args_copy)

    tid_check = 'dummy_task_start_%s' % dag_id_child_prefix
    print(tid_check)
    method_start = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)

    tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_id_child_prefix
    print(tid_check)
    method_a = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                              python_callable=sub_dag_method_a)

    tid_check = 'save_to_es_fetch_from_db_%s' % dag_id_child_prefix
    print(tid_check)
    # FIX: dropped provide_context=True — sub_dag_method_b() takes no keyword
    # arguments, so passing the Airflow task context would raise a TypeError
    # the moment the task actually executed.
    method_b = PythonOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy,
                              python_callable=sub_dag_method_b)

    tid_check = 'dummy_task_end_%s' % dag_id_child_prefix
    print(tid_check)
    method_end = DummyOperator(task_id=tid_check, dag=subdag, default_args=default_args_copy)

    method_start >> method_a >> method_b >> method_end

    return subdag

# main default arguments
# main dag
# Parent DAG, constructed at module parse time; hourly schedule from 2019-12-16.
main_dag = DAG('main_dag', default_args=deepcopy(main_default_args), schedule_interval=timedelta(hours=1),
start_date=datetime(2019, 12, 16))

# hello_world
def hello_world():
    """
    PythonOperator callable for main_task.

    NOTE(review): this builds a subdag and a SubDagOperator at task
    *execution* time. Airflow constructs DAG structure when the module is
    parsed, so operators created inside a running callable are never
    registered with the scheduler — the subdag's own tasks will not run.
    The subdag must instead be created at module top level.
    """
    i=0
    subdag = create_subdag('main_dag', str(i), 'db_name'+str(i), 'task_dag',
    main_dag.start_date, main_dag.schedule_interval)
    # import pdb;pdb.set_trace()
    sd_op = SubDagOperator(task_id='task_dag_'+str(i), subdag=subdag, dag=main_dag)
    return subdag


# main task: the only task in this module the scheduler actually sees.
main_task = PythonOperator(task_id='main_task', python_callable=hello_world, dag=main_dag)
# hello_world()

the output by running the command

airflow test 'main_dag' 'main_task' 2019/12/16

is

(alphavu3711_1) Noamans-MacBook-Pro-2:python3 noamanfaisalbinbadar$ airflow test 'main_dag' 'main_task' 2019/12/16
[2019-12-16 21:56:10,312] {settings.py:252} INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=4100
[2019-12-16 21:56:11,119] {__init__.py:51} INFO - Using executor SequentialExecutor
[2019-12-16 21:56:11,119] {dagbag.py:92} INFO - Filling up the DagBag from /Users/noamanfaisalbinbadar/code/alphavu/production/python3/fb_messenger_airflow/dags
[2019-12-16 21:56:11,415] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:630} INFO - Dependencies all met for <TaskInstance: main_dag.main_task 2019-12-16T00:00:00+00:00 [success]>
[2019-12-16 21:56:11,433] {taskinstance.py:841} INFO - 
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:842} INFO - Starting attempt 2 of 1
[2019-12-16 21:56:11,433] {taskinstance.py:843} INFO - 
--------------------------------------------------------------------------------
[2019-12-16 21:56:11,433] {taskinstance.py:862} INFO - Executing <Task(PythonOperator): main_task> on 2019-12-16T00:00:00+00:00
[2019-12-16 21:56:11,455] {python_operator.py:105} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_ID=main_dag
AIRFLOW_CTX_TASK_ID=main_task
AIRFLOW_CTX_EXECUTION_DATE=2019-12-16T00:00:00+00:00
AIRFLOW_CTX_DAG_RUN_ID=scheduled__2019-12-16T00:00:00+00:00
dummy_task_start_0
get_from_facebook_and_save_to_db_0
save_to_es_fetch_from_db_0
dummy_task_end_0
[2019-12-16 21:56:11,459] {python_operator.py:114} INFO - Done. Returned value was: <DAG: main_dag.task_dag_0>

the new approach after your answer is this

from fb_messenger.airflow_helpers.get_conversation_ids_page_wise import GetConversationIdsPageWise
from fb_messenger.airflow_helpers.get_conversation_messages_info import GetConversationMessagesInfo
from fb_messenger.airflow_helpers.save_to_es import SaveToES
from copy import deepcopy
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import airflow


# Default task arguments; deep-copied for each DAG built by create_subdag.
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def create_subdag(dag_name, dag_name_prefix, start_date, schedule_interval, conversation_info):
    """
    Build a standalone DAG '<dag_name>_<dag_name_prefix>' with the chain
    start dummy -> method_a -> method_b -> end dummy, where both Python
    callables receive the conversation id/updated_time via op_kwargs.

    NOTE(review): the id uses '_' as separator; if this DAG is to be used as
    a subdag via SubDagOperator, the id must be '<parent>.<child>' — confirm
    the intended usage.
    """
    # NOTE(review): '_' separator here, not the 'parent.child' form.
    dag_name_processed = '%s_%s' % (dag_name, dag_name_prefix)
    # deep copy so the shared defaults dict is never mutated
    default_args_copy = deepcopy(main_default_args)
    subdag = DAG(dag_name_processed, schedule_interval=schedule_interval, start_date=start_date,
                 default_args=deepcopy(main_default_args))
    def sub_dag_method_a(**kwargs):
        """
        First worker callable: prints the conversation info, returns 'a'.
        """
        print('Subdag method a')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'a'

    def sub_dag_method_b(**kwargs):
        """
        Second worker callable: prints the conversation info, returns 'b'.
        """
        print('Subdag method b')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'b'

    with subdag:
    # operators
        tid_check = 'dummy_task_start_%s' % dag_name_prefix
        # print(tid_check)
        method_start = DummyOperator(task_id=tid_check, dag=subdag)
        # new tid
        tid_check = 'get_from_facebook_and_save_to_db_%s' % dag_name_prefix
        # print(tid_check)
        method_a = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_a,
                                op_kwargs={'conversation_id':conversation_info['id'], 
                                'updated_time':conversation_info['updated_time']})
        # new tid
        tid_check = 'save_to_es_fetch_from_db_%s' % dag_name_prefix
        # print(tid_check)
        method_b = PythonOperator(task_id=tid_check, dag=subdag, python_callable=sub_dag_method_b,
                                op_kwargs={'conversation_id':conversation_info['id'], 
                                'updated_time':conversation_info['updated_time']})
        # new tid
        tid_check = 'dummy_task_end_%s' % dag_name_prefix
        # print(tid_check)
        method_end = DummyOperator(task_id=tid_check, dag=subdag)
        # dependencies
        method_start >> method_a
        method_a >> method_b
        method_b >> method_end
    return subdag

start_date_ = datetime.now() + timedelta(minutes=-1)
# getting list of dictionaries
conversation_infos = GetConversationIdsPageWise().get_all()
print(conversation_infos)
print(len(conversation_infos))
for conversation_info in conversation_infos:
    print(conversation_info)
    i = conversation_info['id']
    subdag_name = 'main_dag'
    sub_dag = create_subdag(subdag_name, str(i), start_date_, timedelta(minutes=2), conversation_info)
    # FIX: Airflow's DagBag discovers only DAG objects reachable from module
    # globals. The loop-local `sub_dag` is overwritten each iteration, so
    # none (or at best one) of the generated DAGs was ever visible to the
    # scheduler. Register each DAG under a unique global name.
    globals()[sub_dag.dag_id] = sub_dag
    print(sub_dag)


But I am still unable to create multiple DAGs.

Upvotes: 2

Views: 1453

Answers (2)

Jarek Potiuk
Jarek Potiuk

Reputation: 20077

It's impossible to create SubDAG dynamically in "execute" method of another operator. This is effectively what you try to achieve.

DAG and their dependencies (including SubDags) are created while parsing the python code and constructing the objects available in the top level of the python file. In this case it is creating the DAG and assigning it to main_dag variable and then creating PythonOperator and assigning it to the main_task. This is all that happens during scheduling. The PythonOperator callable is not called then.

When the task is executed and the callable is called - it is already too late to create DAG. By that time all the DAG structure and dependencies are already created and scheduling is done.

Basically you can only create new DAGs (including SubDAGs) in the scheduler - the scheduler parses all the python code and creates the DAGs and their tasks. Then particular tasks (for example the PythonOperator you mentioned) are executed when their time and dependencies are due in one of the Workers (not in the Scheduler), and even if they create DAGs, it does not impact the scheduler and the created DAGs are never scheduled.

Upvotes: 4

Kapil
Kapil

Reputation: 166

I think you are trying to create subdags dynamically based on conversation info. I have found a few issues in your updated code:

  1. It should have a main dag object which needs to be passed to subdag function.
  2. The subdag function needs to be called via a SubDagOperator, which is missing in your code.
  3. Subdag name needs to match "parent_dag_name"."child_dag_name" pattern, not "parent_dag_name"_"child_dag_name"

Below code is working for me

from fb_messenger.airflow_helpers.get_conversation_ids_page_wise import GetConversationIdsPageWise
from fb_messenger.airflow_helpers.get_conversation_messages_info import GetConversationMessagesInfo
from fb_messenger.airflow_helpers.save_to_es import SaveToES
from copy import deepcopy
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
import airflow


# Default task arguments; deep-copied for the main DAG and each subdag.
main_default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

def create_subdag(dag_name, dag_name_prefix, start_date, schedule_interval, conversation_info):
    """
    Build the child DAG '<dag_name>.<dag_name_prefix>' (the 'parent.child'
    id form SubDagOperator requires) with the linear chain:
    start dummy -> method_a -> method_b -> end dummy.

    Both PythonOperator callables receive the conversation's id and
    updated_time through op_kwargs and return 'a' / 'b' respectively.
    """
    child_dag_id = '%s.%s' % (dag_name, dag_name_prefix)
    subdag = DAG(child_dag_id, schedule_interval=schedule_interval, start_date=start_date,
                 default_args=deepcopy(main_default_args))

    def sub_dag_method_a(**kwargs):
        """First worker callable: prints the conversation info, returns 'a'."""
        print('Subdag method a')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'a'

    def sub_dag_method_b(**kwargs):
        """Second worker callable: prints the conversation info, returns 'b'."""
        print('Subdag method b')
        print(kwargs['conversation_id'])
        print(kwargs['updated_time'])
        return 'b'

    # kwargs forwarded to each Python callable; copied per operator so the
    # two tasks never share one mutable dict
    conversation_kwargs = {
        'conversation_id': conversation_info['id'],
        'updated_time': conversation_info['updated_time'],
    }

    with subdag:
        method_start = DummyOperator(
            task_id='dummy_task_start_%s' % dag_name_prefix, dag=subdag)
        method_a = PythonOperator(
            task_id='get_from_facebook_and_save_to_db_%s' % dag_name_prefix,
            dag=subdag,
            python_callable=sub_dag_method_a,
            op_kwargs=dict(conversation_kwargs))
        method_b = PythonOperator(
            task_id='save_to_es_fetch_from_db_%s' % dag_name_prefix,
            dag=subdag,
            python_callable=sub_dag_method_b,
            op_kwargs=dict(conversation_kwargs))
        method_end = DummyOperator(
            task_id='dummy_task_end_%s' % dag_name_prefix, dag=subdag)

        method_start >> method_a >> method_b >> method_end

    return subdag

# NOTE(review): a dynamic start_date (datetime.now()) is discouraged in
# Airflow — a fixed past date is the usual recommendation; confirm the
# scheduler actually triggers runs with this setting.
sd = datetime.now()
main_dag = DAG('main_dag', default_args=deepcopy(main_default_args), schedule_interval=timedelta(hours=1),
start_date = sd)

# getting list of dictionaries
conversation_infos = GetConversationIdsPageWise().get_all()
print(conversation_infos)
print(len(conversation_infos))
# One SubDagOperator per conversation, created at module parse time so the
# scheduler can see them; the child dag id 'main_dag.<i>' built inside
# create_subdag matches SubDagOperator's '<parent_dag_id>.<task_id>' rule.
for conversation_info in conversation_infos:
    print(conversation_info)
    i = conversation_info['id']
    subdag_name = 'main_dag'

    t_sub_dag = SubDagOperator(
            subdag=create_subdag(subdag_name, str(i), sd, timedelta(minutes=2), conversation_info),
            task_id=str(i),
            dag=main_dag
            )


Upvotes: 0

Related Questions