gaurav agnihotri

Reputation: 387

EMR cluster creation using an Airflow DAG run; once the tasks are done, the EMR cluster should be terminated

I have Airflow jobs which run fine on an EMR cluster. What I need is this: say I have 4 Airflow jobs that need an EMR cluster for about 20 minutes to complete their work. Why can't we create an EMR cluster at DAG run time and, once the jobs are finished, terminate the cluster that was created?

Upvotes: 6

Views: 11812

Answers (4)

Rishav Jain

Reputation: 31

The best way to do this is as below:

create EMR cluster >> run spark application >> wait to complete spark application >> terminate EMR cluster

from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago


# Spark-submit command for application

SPARK_APP = [
    {
        'Name': 'spark_app1',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--deploy-mode', 'cluster',
                '--master', 'yarn',
                '--class', 'package_path_to_main',
                'location_of_jar',
                'application_args',
            ],
        },
    }
]


# EMR cluster configurations

JOB_FLOW_OVERRIDES = {
    'Name': 'emr_cluster_name',
    'ReleaseLabel': 'emr-6.4.0',
    'Applications': [{'Name': 'Spark'}],
    'LogUri': 's3_path_for_log',
    'Instances': {
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 1
            },
            {
                'Name': 'Slave nodes',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'r5.8xlarge',
                'InstanceCount': 32
            }
        ],
        'Ec2SubnetId': 'subnet-id',
        'Ec2KeyName': 'KeyPair',
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
        'AdditionalMasterSecurityGroups': ['security-group']
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'SecurityConfiguration': 'SecurityConfig_name',
    'ServiceRole': 'EMR_DefaultRole',
    'StepConcurrencyLevel': 10,
}

# Airflow DAG definition

with DAG(    
    dag_id='dag_name',    
    default_args={    
        'owner': 'airflow',    
        'depends_on_past': False,    
        'email': ['email-address'],    
        'email_on_failure': True,    
        'email_on_retry': False,    
    },    
    dagrun_timeout=timedelta(hours=4),    
    start_date=days_ago(1),
    schedule_interval='0 * * * *',
    catchup=False,
    tags=['example'],
) as dag:

# EMR cluster creator 

    cluster_creator = EmrCreateJobFlowOperator(
        task_id='cluster_creator',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

# Add a step to run the Spark application

    step_adder_1 = EmrAddStepsOperator(
        task_id='step_adder_1',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='cluster_creator', key='return_value')}}",
        aws_conn_id='aws_default',
        steps=SPARK_APP,
        trigger_rule='all_done',
    )

# Add a step sensor to track completion of the step added above

    step_checker_1 = EmrStepSensor(
        task_id='step_checker_1',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='step_adder_1', key='return_value')[0] }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

# Terminate the EMR cluster once all tasks running on it have completed
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='cluster_remover',
        job_flow_id="{{ task_instance.xcom_pull('cluster_creator', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',
    )

# Define the task ordering

cluster_creator >> step_adder_1 >> step_checker_1 >> cluster_remover
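
Note that cluster_remover uses trigger_rule='all_done', so the cluster is torn down even if the Spark step fails, and JOB_FLOW_OVERRIDES sets KeepJobFlowAliveWhenNoSteps to True so the cluster stays alive between step submissions instead of auto-terminating as soon as it has no steps left.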

Upvotes: 2

vijay

Reputation: 59

Check my implementation: the DAG will create an EMR cluster, run the Spark job against the data in S3, and terminate the cluster automatically once it is done.

https://beyondexperiment.com/vijayravichandran06/aws-emr-orchestrate-with-airflow/

Upvotes: 1

y2k-shubham

Reputation: 11607

Absolutely, that would be the most efficient use of resources. A word of warning: there are a lot of details involved; I'll try to list enough of them to get you going. I encourage you to add your own comprehensive answer listing any problems you encountered and the workarounds (once you are through this).


Regarding cluster creation / termination


Regarding job submission
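
The amazon provider covers both of these concerns with four operators: EmrCreateJobFlowOperator and EmrTerminateJobFlowOperator for the cluster lifecycle, and EmrAddStepsOperator plus EmrStepSensor for submitting a Spark step and waiting on it (submitting over SSH or Apache Livy is also possible). Below is a minimal end-to-end sketch, not a drop-in implementation: the DAG id, S3 application path, and the empty cluster spec are placeholders you would replace, and on newer provider versions the same classes are importable from airflow.providers.amazon.aws.operators.emr and airflow.providers.amazon.aws.sensors.emr.

from datetime import timedelta

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr_add_steps import EmrAddStepsOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.providers.amazon.aws.sensors.emr_step import EmrStepSensor
from airflow.utils.dates import days_ago

# A single Spark step run through command-runner.jar (hypothetical S3 path).
SPARK_STEPS = [
    {
        'Name': 'example_step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ['spark-submit', '--deploy-mode', 'cluster', 's3://your-bucket/your_job.py'],
        },
    }
]

with DAG(
    dag_id='transient_emr_example',          # hypothetical DAG id
    start_date=days_ago(1),
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=2),
) as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster',
        job_flow_overrides={},               # fill in with your cluster spec (instances, roles, subnet, ...)
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # The create operator pushes the job flow (cluster) id to XCom,
    # so downstream tasks can reference the same cluster.
    add_steps = EmrAddStepsOperator(
        task_id='add_steps',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        steps=SPARK_STEPS,
        aws_conn_id='aws_default',
    )

    # EmrAddStepsOperator returns the list of step ids it added;
    # the sensor polls the first one until it succeeds or fails.
    watch_step = EmrStepSensor(
        task_id='watch_step',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
        aws_conn_id='aws_default',
    )

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',             # tear the cluster down even if a step failed
    )

    create_cluster >> add_steps >> watch_step >> terminate_cluster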


Upvotes: 11

Max Gasner

Reputation: 1256

The best way to do this is probably to have a node at the root of your Airflow DAG that creates the EMR cluster, and then another node at the very end of the DAG that spins the cluster down after all of the other nodes have completed.
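
A sketch of that shape: the create node at the root, the four jobs from the question fanned out in the middle (stand-in EmptyOperator tasks here, so this assumes Airflow 2.3+ and an empty placeholder cluster spec), and a single terminate node with trigger_rule='all_done' so the cluster is spun down even if some jobs fail.

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.amazon.aws.operators.emr_create_job_flow import EmrCreateJobFlowOperator
from airflow.providers.amazon.aws.operators.emr_terminate_job_flow import EmrTerminateJobFlowOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='emr_bracket_example', start_date=days_ago(1), schedule_interval=None) as dag:

    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster',
        job_flow_overrides={},   # fill in with your cluster spec
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )

    # Stand-ins for the four Spark jobs mentioned in the question.
    jobs = [EmptyOperator(task_id=f'job_{i}') for i in range(1, 5)]

    terminate_cluster = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
        aws_conn_id='aws_default',
        trigger_rule='all_done',  # spin the cluster down even if some jobs failed
    )

    create_cluster >> jobs >> terminate_cluster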

Upvotes: 1
