Mani

Reputation: 85

I have an existing EMR cluster in AWS, and I want to run an Airflow DAG against that existing cluster.

I have an Airflow machine with apache-airflow==1.10.5. I know how to run a DAG that automatically creates a cluster, runs the steps, and terminates the cluster; I am able to do this using connections in the Airflow UI. But to run a DAG on an existing AWS EMR cluster, I can't figure out which parameters I need to pass in the connection.

Airflow UI --> Admin --> Connections --> created Conn ID (EMR Default1), Conn Type: Elastic MapReduce.

When the DAG runs, the task fails with:

[2019-10-14 12:12:40,919] {taskinstance.py:1051} ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
    response = emr.create_job_flow(self.job_flow_overrides)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
    response = self.get_conn().run_job_flow(**config)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
    api_params, operation_model, context=request_context)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
    api_params, operation_model)
  File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
    raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] {taskinstance.py:1082} INFO - Marking task as FAILED.

Upvotes: 1

Views: 1609

Answers (1)

Bitswazsky

Reputation: 4698

In the first case, instead of dynamically creating/terminating clusters via the UI, you can also achieve it by extending the SparkSubmitOperator. After launching the EMR cluster, you can copy the *.xml files (e.g. core-site.xml) from the EMR master into some location on the Airflow node and then point to those files in your spark-submit task in Airflow. At least that's how we're doing it in our product. Extending that logic, if you're planning to reuse an existing cluster, all you need to know is where those *.xml files are already stored; the rest stays the same. You only need to refer to those files when triggering the task.
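For the reuse case, a minimal sketch of what that spark-submit task could look like, assuming the *-site.xml files have already been copied to a hypothetical /home/airflow/emr-conf directory and that your Spark connection (spark_default here) points at a yarn master:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

    # Hypothetical directory holding the *-site.xml files copied from the EMR master.
    EMR_CONF_DIR = '/home/airflow/emr-conf'

    with DAG('emr_spark_submit',
             start_date=datetime(2019, 10, 1),
             schedule_interval=None) as dag:

        submit_job = SparkSubmitOperator(
            task_id='submit_job',
            application='/path/to/your_spark_job.py',  # your Spark application
            conn_id='spark_default',                   # connection whose master is yarn
            # Point spark-submit at the EMR cluster's Hadoop/YARN configuration.
            env_vars={'HADOOP_CONF_DIR': EMR_CONF_DIR,
                      'YARN_CONF_DIR': EMR_CONF_DIR},
        )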

More details

I don't know of any such doc, so I can only suggest that you explore the following, based on the knowledge I have gathered:

  1. We need to write a custom plugin for spark-submit. As part of this custom-plugin module, let's define a CustomSparkSubmitOperator class. It needs to extend BaseOperator. You can find plenty of articles on writing custom plugins in Airflow; that might be a good place to start, and the BaseOperator docs have more details. (A rough sketch of such an operator follows after this list.)

  2. In BaseOperator, you'll find a method called pre_execute. It's a viable option to perform the following actions inside this method:

    a. Wait until your cluster is up. You can easily do that with boto3 if you pass the cluster id.

    b. Once the cluster is up, get the IP of the EMR master node and copy the files matching /etc/hadoop/conf/*-site.xml to your Airflow node. That's doable via a subprocess call in Python.

  3. Once you've got the xml files, in the execute method just use the SparkSubmitHook to submit your spark job. You need to make sure the spark binaries on your Airflow node use this path for spark-submit.

  4. You can clean up the cluster in the post_execute method in case that's required.
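Putting steps 1-4 together, here's a minimal, untested sketch of such a CustomSparkSubmitOperator. The cluster_id, ssh_key and conf_dir parameters, the hadoop SSH user, and all the paths are assumptions you'd adapt to your own setup:

    import os
    import subprocess

    import boto3

    from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook
    from airflow.models import BaseOperator
    from airflow.utils.decorators import apply_defaults


    class CustomSparkSubmitOperator(BaseOperator):

        @apply_defaults
        def __init__(self,
                     application,
                     cluster_id,                                 # id of the existing EMR cluster
                     ssh_key='/home/airflow/.ssh/emr.pem',       # hypothetical key path
                     conf_dir='/home/airflow/emr-conf',          # where the *-site.xml files will land
                     spark_conn_id='spark_default',
                     terminate_cluster=False,
                     *args, **kwargs):
            super(CustomSparkSubmitOperator, self).__init__(*args, **kwargs)
            self.application = application
            self.cluster_id = cluster_id
            self.ssh_key = ssh_key
            self.conf_dir = conf_dir
            self.spark_conn_id = spark_conn_id
            self.terminate_cluster = terminate_cluster

        def pre_execute(self, context):
            emr = boto3.client('emr')
            # 2a. Wait until the cluster is up and running.
            emr.get_waiter('cluster_running').wait(ClusterId=self.cluster_id)
            # 2b. Get the master node's address and copy the Hadoop/YARN config files over.
            master = emr.describe_cluster(
                ClusterId=self.cluster_id)['Cluster']['MasterPublicDnsName']
            subprocess.check_call([
                'scp', '-i', self.ssh_key, '-o', 'StrictHostKeyChecking=no',
                'hadoop@{}:/etc/hadoop/conf/*-site.xml'.format(master),
                self.conf_dir,
            ])

        def execute(self, context):
            # 3. Point spark-submit at the config copied from the EMR master;
            #    the spawned spark-submit process inherits these variables.
            os.environ['HADOOP_CONF_DIR'] = self.conf_dir
            os.environ['YARN_CONF_DIR'] = self.conf_dir
            hook = SparkSubmitHook(conn_id=self.spark_conn_id)
            hook.submit(self.application)

        def post_execute(self, context, result=None):
            # 4. Optionally clean up the cluster afterwards.
            if self.terminate_cluster:
                boto3.client('emr').terminate_job_flows(JobFlowIds=[self.cluster_id])

For an existing cluster you'd just pass its id and leave terminate_cluster=False; in the dynamic case you could create the cluster in pre_execute instead (e.g. with boto3's run_job_flow) and terminate it in post_execute.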

Upvotes: 1
