Reputation: 125
Below is a simplified replication of a DAG I have created. The DAG uses a branch operator to select an execution flow, and the branches merge into a common task. That task is supposed to generate a file list, which is then used to create one task per entry in the list file. The problem is that I am not able to get the dynamic tasks to execute.
"""
Required packages to execute DAG
"""
from __future__ import print_function
from builtins import range
import airflow
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
import os
import sys
# DAG parameters
args = {
    'owner': 'AD',
    'depends_on_past': False,
    'start_date': datetime(2018, 5, 30),
    'end_date': datetime(9999, 12, 31),
    'dagrun_timeout': None,
    'timeout': None,
    'execution_timeout': None,
    'provide_context': True,
}
# create DAG object with Name and default_args (args can set in DAG definition or while execution/runtime)
dag = DAG('sodag', schedule_interval=None, default_args=args)
# Define tasks - below are examples of tasks instantiated via operators such as PythonOperator, calling methods written in other py classes
start = DummyOperator(task_id='start', dag=dag)
dummyjoin = DummyOperator(task_id='dummyjoin', dag=dag, trigger_rule=TriggerRule.ONE_SUCCESS)
multidummy = DummyOperator(task_id='multidummy', dag=dag)
def identify_pre_process(**context):
    return 'task1'

def xcl_preq(filename, **kwargs):
    return BashOperator(
        task_id="so_dag{}".format(filename),
        trigger_rule=TriggerRule.ONE_SUCCESS,
        provide_context=True,
        bash_command='echo "executing branch tasks"',
        dag=dag)
with dag:
    router = BranchPythonOperator(task_id='trigger_pre_process',
                                  python_callable=identify_pre_process,
                                  dag=dag)
    task1 = BashOperator(
        task_id="task1",
        bash_command='echo "executing task1"',
        execution_timeout=None,
        dag=dag)
    task2 = BashOperator(
        task_id="task2",
        bash_command='echo "executing task2"',
        execution_timeout=None,
        dag=dag)
    with open('/root/filelist.txt', 'r') as fp:
        for file in fp:
            filename = os.path.basename(file)
            dummyjoin >> xcl_preq(filename) >> multidummy
    start >> router
    router >> task1 >> dummyjoin
    router >> task2 >> dummyjoin
Upvotes: 2
Views: 8016
Reputation: 2364
The issue here is not that the tasks are generated dynamically; it is something subtler. Your DAG works well except for one detail: in the line
filename = os.path.basename(file)
the variable filename contains the trailing newline character \n, because iterating over a file yields each line with its line ending attached. In your example, filename would take the values file\n, file1\n, file2\n. This causes those tasks to not run, since special characters are apparently not allowed in a task_id (I agree it's weird that no error is raised when the DAG is parsed). You don't see this in the Graph View of the DAG in the UI, because the newline characters are not rendered there, but if you open the Details of the DAG, the issue becomes visible.
A simple fix is to strip out the newline characters from the lines after reading from the file, that is,
filename = os.path.basename(file.rstrip())
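To see the difference without running Airflow, here is a minimal sketch that builds the task ids both ways. The io.StringIO object is a stand-in for /root/filelist.txt, its contents are assumed from the example values above, and is_safe_task_id is a hypothetical helper written for this illustration (it is not Airflow's own validation).

```python
import io
import os
import re

# Stand-in for /root/filelist.txt; the contents are assumed for illustration.
fake_file = io.StringIO("file\nfile1\nfile2\n")

def is_safe_task_id(task_id):
    """Hypothetical check: only letters, digits, underscores, dots and dashes.

    fullmatch is used on purpose; a pattern ending in '$' would still
    accept a string with a trailing newline.
    """
    return re.fullmatch(r"[\w.-]+", task_id) is not None

raw_ids = []    # ids built as in the question
clean_ids = []  # ids built with the rstrip() fix

for line in fake_file:
    # Iterating over a file keeps the line ending on every line.
    raw_ids.append("so_dag{}".format(os.path.basename(line)))
    clean_ids.append("so_dag{}".format(os.path.basename(line.rstrip())))

for task_id in raw_ids + clean_ids:
    # repr() makes the otherwise invisible newline show up as \n.
    print(repr(task_id), is_safe_task_id(task_id))
```

Printing with repr() is what exposes the problem here, much like the Details page does in the UI; a plain print would hide the trailing newline.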
Success!
Upvotes: 4