mattc-7

Reputation: 442

How to set a Dynamic name for the job_flow_overrides in Airflow EmrCreateJobFlowOperator?

I am trying to set up an AWS EMR process in Airflow and I need the job_flow_overrides in the EmrCreateJobFlowOperator and the steps in the EmrAddStepsOperator to be set by separate JSON files located elsewhere.

I have tried numerous ways, both of linking the JSON files directly and of setting and getting Airflow Variables for the JSON. If I were to use Airflow Variables, they would also need to be dynamically named, which I am having trouble with. I am able to easily Variable.set a dynamic name using a PythonOperator, but I cannot Variable.get a dynamic name in the job_flow_overrides or steps because of Airflow's limitations on running Python code outside of a PythonOperator.

The Airflow Variables have already been set earlier in the code; the following is my code trying to use the JSON data and set up the cluster:

import json
import requests

def get_global_json_contents():
    return json.dumps(requests.get("PATH/TO/JSON/FILE").json())

# Use the 'Name' Key in this JSON as a specific identifier for the Variables created by this job
def get_global_json_name():
    return json.loads(get_global_json_contents())['Name']

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    job_flow_overrides=json.loads(Variable.get("CLUSTER_SETUP-"+get_global_json_name())),
    dag=dag
)

add_steps = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=json.loads(Variable.get("RUN_STEPS-"+get_global_json_name())),
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

Does anyone know how I can get this process to work?

Does anyone have an idea how to get around the limitations of not being able to use Python functions in Airflow outside of PythonOperator and python_callable?

Might it be possible to solve this by defining functions in a separate Python file located elsewhere and importing it to Airflow? And if so, how would I go about doing that?

Upvotes: 2

Views: 2333

Answers (1)

y2k-shubham

Reputation: 11607

I haven't completely understood the problem here, but from the bits that I've gathered, I can give some ideas


A. If the dynamism refers to frequently (once every few days) changing file-names

Store the name of the file in an Airflow Variable (you seem to have already figured this out)

  • refer to the name of the file via the Airflow Variable (rather than hard-coding it in the dag-definition file)
  • and when the need arises, you can update the file-name (Variable) via the WebUI itself
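
A minimal sketch of this idea (the Variable key "emr_config_url" is hypothetical, not something from your code): the lookup happens inside a running task rather than at DAG-parse time, so the value can be changed from the WebUI without touching the DAG file

from airflow.models import Variable


def resolve_config_location():
    # Call this from inside a task (e.g. a python_callable); the Variable is
    # read at run time, so a value edited via the WebUI is picked up on the next run
    return Variable.get("emr_config_url")

# Templated operator fields can also reference the Variable directly with Jinja,
# e.g. "{{ var.value.emr_config_url }}"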

B. If the dynamism means each task / dag-run must process a different file, then here are the options

  1. The external system that is generating these JSON files can also update the above-mentioned Airflow Variable, so that your tasks can pick up the correct file-name. I personally don't like this approach, since the external system has to become Airflow-aware, and it also appears error-prone

  2. If possible, the external system and the Airflow tasks should adhere to some file-naming convention, so that we don't even need to communicate the exact filename to the task (the task already knows how to determine the filename). For instance, you can have your file named as {execution_date}-{dag_id}-{env}-{task_id}.json. This approach is relatively good, but only as long as a templatized file-name fits your system (which might not be the case)

  3. The upstream task that gets hold of the exact filename can convey this information to the downstream task via XCom. This approach is the most robust of all; a sketch of the pushing side follows this list
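
A rough sketch of the pushing side of option 3 (push_emr_config is a made-up callable, "PATH/TO/JSON/FILE" is the placeholder from your question, and the task_id is chosen to match the my_xcom_pusher_task pulled in the EDIT below)

import json
import requests

# import path may differ depending on your Airflow version
from airflow.operators.python_operator import PythonOperator


def push_emr_config(**context):
    # Fetch the JSON at run time and return it as a string; a PythonOperator's
    # return value is automatically pushed to XCom under the key 'return_value'
    return json.dumps(requests.get("PATH/TO/JSON/FILE").json())


my_xcom_pusher_task = PythonOperator(
    task_id='my_xcom_pusher_task',
    python_callable=push_emr_config,
    provide_context=True,
    dag=dag,
)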


As for your query

Does anyone have an idea how to get around the limitations of not being able to use Python functions in Airflow outside of PythonOperator and python_callable?

I think solving this takes just some getting-used-to (and a bit of creativity) with the Airflow framework. You can always subclass any Airflow Operator (including EmrCreateJobFlowOperator), and then override the pre_execute(), execute(), & post_execute() methods and add your custom logic there
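
For example, a rough, untested sketch of that idea applied to your case (DynamicEmrCreateJobFlowOperator, variable_prefix and get_name_callable are made up for illustration, and the import path varies between Airflow versions): the dynamically named Variable is resolved inside execute(), i.e. at run time on the worker, where arbitrary Python is allowed

import json

from airflow.models import Variable
from airflow.utils.decorators import apply_defaults
# import path may differ depending on your Airflow version
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator


class DynamicEmrCreateJobFlowOperator(EmrCreateJobFlowOperator):
    """Resolves job_flow_overrides from a dynamically named Variable at run time."""

    @apply_defaults
    def __init__(self, variable_prefix, get_name_callable, *args, **kwargs):
        super(DynamicEmrCreateJobFlowOperator, self).__init__(*args, **kwargs)
        self.variable_prefix = variable_prefix        # e.g. "CLUSTER_SETUP-"
        self.get_name_callable = get_name_callable    # e.g. get_global_json_name from the question

    def execute(self, context):
        # Runs on the worker at task-execution time, so Variable.get() (and any
        # other Python code) is fine here, unlike at DAG-parse time
        variable_key = self.variable_prefix + self.get_name_callable()
        self.job_flow_overrides = json.loads(Variable.get(variable_key))
        return super(DynamicEmrCreateJobFlowOperator, self).execute(context)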


EDIT-1

Responding to queries raised in the comments:

I could not get the job_flow_overrides to take an XCOM as input. Do you know how to remedy this?

Knowing that job_flow_overrides is a templated field of EmrCreateJobFlowOperator (the developers of Airflow had your use-case in mind), you can employ Jinja templating to pull it via XCom

import json

from airflow import DAG
# import path may differ depending on your Airflow version
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

my_dag = DAG(dag_id="my_dag",
             # .. other DAG arguments ..
             user_defined_macros={
                 'json': json
             })

create_emr_task = EmrCreateJobFlowOperator(task_id="create_emr_task",
                                           dag=my_dag,
                                           # .. other operator arguments ..
                                           job_flow_overrides="{{ json.loads(ti.xcom_pull(task_ids='my_xcom_pusher_task')) }}")

do you have any examples for using/ overwriting the execute() methods?

Well, this now is a pure Python question. If you have ever done object-oriented programming, particularly inheritance, you will feel right at home with this.

  • This involves just subclassing the BaseOperator (or any subclass of it) and defining your custom functionality in the execute() method
  • For instance, the PythonOperator also extends BaseOperator
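
A bare-bones illustration of the general pattern (HelloOperator and its name parameter are made up; the apply_defaults decorator is the Airflow 1.x convention): every operator is just a class whose execute() is called with the task's context when the task runs

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(self, name, *args, **kwargs):
        super(HelloOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        # context carries run-time details such as the execution date ('ds')
        self.log.info("Hello %s, running for %s", self.name, context['ds'])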


Upvotes: 4
