LinebakeR

Reputation: 222

Airflow run tasks in parallel

I'm confused about how Airflow runs two tasks in parallel.

This is my DAG:

import datetime as dt
import os

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.dagrun_operator import TriggerDagRunOperator

scriptAirflow = '/home/alexw/scriptAirflow/'
uploadPath = '/apps/man-data/data/to_load/'
receiptPath = '/apps/man-data/data/to_receipt/'

def result():
    if os.listdir(receiptPath):
        for files in os.listdir(receiptPath):
            if files.startswith('MEM') and files.endswith('.csv'):
                print('Launching script for: ' + files)
                return 'mem_script'
            elif files.startswith('FMS') and files.endswith('.csv'):
                return 'fms_script'
    # reached when the directory is empty or contains no matching file
    print('No script to launch')
    return 'no_script'

def onlyCsvFiles():
    if os.listdir(uploadPath):
        for files in os.listdir(uploadPath):
            # parentheses are needed here: without them, 'and' binds tighter than 'or'
            if (files.startswith('MEM') or files.startswith('FMS')) and files.endswith('.csv'):
                return 'move_good_file'
            else:
                return 'move_bad_file'

default_args = {
    'owner': 'testingA',
    'start_date': dt.datetime(2020, 2, 17),
    'retries': 1,
}

dag = DAG('tryingAirflow', default_args=default_args, description='airflow20',
          schedule_interval=None, catchup=False)

file_sensor = FileSensor(
    task_id="file_sensor",
    filepath=uploadPath,
    fs_conn_id='airflow_db',
    poke_interval=10,
    dag=dag,
)

only_csv_files = BranchPythonOperator(
    task_id='only_csv_files',
    python_callable=onlyCsvFiles,
    trigger_rule='none_failed',
    dag=dag,
)

move_good_file = BashOperator(
    task_id="move_good_file",
    bash_command='python3 '+scriptAirflow+'movingGoodFiles.py "{{ execution_date }}"',
    dag=dag,
)
move_bad_file = BashOperator(
    task_id="move_bad_file",
    bash_command='python3 '+scriptAirflow+'movingBadFiles.py "{{ execution_date }}"',
    dag=dag,
)
result_mv = BranchPythonOperator(
    task_id='result_mv',
    python_callable=result,
    trigger_rule='none_failed',
    dag=dag,
)
run_Mem_Script = BashOperator(
    task_id="mem_script",
    bash_command='python3 '+scriptAirflow+'memShScript.py "{{ execution_date }}"',
    dag=dag,
)
run_Fms_Script = BashOperator(
    task_id="fms_script",
    bash_command='python3 '+scriptAirflow+'fmsScript.py "{{ execution_date }}"',
    dag=dag,
)
skip_script = BashOperator(
    task_id="no_script",
    bash_command="echo No script to launch",
    dag=dag,
)

rerun_dag = TriggerDagRunOperator(
    task_id='rerun_dag',
    trigger_dag_id='tryingAirflow',
    trigger_rule='none_failed',
    dag=dag,
)

only_csv_files.set_upstream(file_sensor)
move_good_file.set_upstream(only_csv_files)
move_bad_file.set_upstream(only_csv_files)
result_mv.set_upstream(move_good_file)
result_mv.set_upstream(move_bad_file)
run_Fms_Script.set_upstream(result_mv)
run_Mem_Script.set_upstream(result_mv)
skip_script.set_upstream(result_mv)
rerun_dag.set_upstream(run_Fms_Script)
rerun_dag.set_upstream(run_Mem_Script)
rerun_dag.set_upstream(skip_script)
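
For reference, the same dependencies can also be written with Airflow's bitshift syntax (equivalent to the set_upstream calls above):

# equivalent wiring using bitshift operators
file_sensor >> only_csv_files >> [move_good_file, move_bad_file] >> result_mv
result_mv >> [run_Mem_Script, run_Fms_Script, skip_script] >> rerun_dag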

When it comes to choosing the task in result_mv, if both scripts should run, it only executes one task and skips the other.

I'd like to execute both tasks at the same time when it's necessary (my airflow.cfg settings should allow parallelism). The question is: how do I run tasks in parallel (or not, when it isn't necessary) using the BranchPythonOperator?
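
For context, whether tasks can actually run in parallel is gated by the executor and concurrency settings in airflow.cfg (the SequentialExecutor can only run one task at a time). A minimal sketch, assuming the airflow.configuration API, to print the relevant values:

# print the airflow.cfg settings that gate parallel execution
from airflow.configuration import conf

print(conf.get('core', 'executor'))         # SequentialExecutor cannot run tasks in parallel
print(conf.get('core', 'parallelism'))      # max task instances across the whole installation
print(conf.get('core', 'dag_concurrency'))  # max tasks running at once within a single DAG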

Thanks for your help!

Upvotes: 6

Views: 6579

Answers (1)

UJIN

Reputation: 1758

If you want to be sure to run either both scripts or none, I would add a dummy task before the two tasks that need to run in parallel. Airflow always follows exactly one branch out of a BranchPythonOperator.

I would make these changes:

# import the DummyOperator
from airflow.operators.dummy_operator import DummyOperator

# modify the returns of the function result()
def result():
    if os.listdir(receiptPath):
        for files in os.listdir(receiptPath):
            if (files.startswith('MEM') and files.endswith('.csv') or
                    files.startswith('FMS') and files.endswith('.csv')):
                return 'run_scripts'
    # fall through both when the directory is empty and when no file matched
    print('No script to launch')
    return "no_script"

# add the dummy task
run_scripts = DummyOperator(
    task_id="run_scripts",
    dag=dag
)

# add dependency
run_scripts.set_upstream(result_mv)

# CHANGE two of the dependencies to
run_Fms_Script.set_upstream(run_scripts)
run_Mem_Script.set_upstream(run_scripts)

I have to admit I've never run parallel tasks with the LocalExecutor, but this should make sure both tasks run whenever the scripts are supposed to run.

EDIT:

If you want to run either none, one of the two, or both, I think the easiest way is to create another task that runs both scripts in parallel in bash (or at least runs them together with &). I would do something like this:

# import the DummyOperator
from airflow.operators.dummy_operator import DummyOperator

# modify the returns of the function result() so that it chooses between 4 different outcomes
def result():
    if(os.listdir(receiptPath)):
        mem_flag = False
        fms_flag = False
        for files in os.listdir(receiptPath):
            if (files.startswith('MEM') and files.endswith('.csv')):
                mem_flag = True
            if (files.startswith('FMS') and files.endswith('.csv')):
                fms_flag = True
        if mem_flag and fms_flag:
            return "both_scripts"
        elif mem_flag:
            return "mem_script"
        elif fms_flag:
            return "fms_script"
        else:
            return "no_script"
    else:
        print('No script to launch')
        return "no_script"

# add the 'run both scripts' task; the task_id must match the string
# returned by result(), i.e. "both_scripts"
run_both_scripts = BashOperator(
    task_id="both_scripts",
    # 'wait' keeps the task running until both backgrounded scripts finish
    bash_command='python3 '+scriptAirflow+'memShScript.py "{{ execution_date }}" & python3 '+scriptAirflow+'fmsScript.py "{{ execution_date }}" & wait',
    dag=dag,
)

# add dependency
run_both_scripts.set_upstream(result_mv)   
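
As a side note beyond the original answer: depending on your Airflow version, the branch callable can also return a list of task IDs (this is supported in Airflow 2.x; on 1.10.x, verify it for your release). That would let result() follow both script tasks directly, without the combined bash task. A rough sketch, reusing receiptPath and the task IDs from the question:

# hedged sketch: assumes a BranchPythonOperator that accepts a list of task IDs
def result():
    branches = set()
    for files in os.listdir(receiptPath):
        if files.startswith('MEM') and files.endswith('.csv'):
            branches.add('mem_script')
        if files.startswith('FMS') and files.endswith('.csv'):
            branches.add('fms_script')
    # follow every matching branch, or the no-op task when nothing matched
    return list(branches) or 'no_script'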

Upvotes: 2
