Reputation: 85
I have an Airflow machine running apache-airflow==1.10.5. I know how to run a DAG that automatically creates a cluster, runs the step, and terminates the cluster; using connections in the Airflow UI I am able to achieve this. But to run a DAG on an existing AWS EMR cluster, I cannot figure out which parameters I need to pass in the connection.
Airflow UI --> Admin --> Connections --> created Conn Id (EMR Default1), Conn Type: Elastic MapReduce.
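For reference, the create-and-terminate case I have working looks roughly like this (heavily simplified; the cluster config and step definition below are placeholders, not my real ones):

```python
from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator

# simplified placeholder for the real spark-submit step
SPARK_STEPS = [{
    "Name": "spark_step",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--deploy-mode", "cluster", "s3://my-bucket/job.py"],
    },
}]

dag = DAG("emr_create_run_terminate",
          start_date=datetime(2019, 10, 1),
          schedule_interval=None)

create_cluster = EmrCreateJobFlowOperator(
    task_id="create_cluster",
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",          # the Elastic MapReduce connection set up in the UI
    job_flow_overrides={"Name": "airflow-cluster"},
    dag=dag)

add_step = EmrAddStepsOperator(
    task_id="add_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    steps=SPARK_STEPS,
    dag=dag)

terminate_cluster = EmrTerminateJobFlowOperator(
    task_id="terminate_cluster",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}",
    aws_conn_id="aws_default",
    dag=dag)

create_cluster >> add_step >> terminate_cluster
```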
[2019-10-14 12:12:40,919] {taskinstance.py:1051} ERROR - Parameter validation failed:
Missing required parameter in input: "Instances"
Traceback (most recent call last):
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
result = task_copy.execute(context=context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/operators/emr_create_job_flow_operator.py", line 68, in execute
response = emr.create_job_flow(self.job_flow_overrides)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/airflow/contrib/hooks/emr_hook.py", line 55, in create_job_flow
response = self.get_conn().run_job_flow(**config)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 314, in _api_call
return self._make_api_call(operation_name, kwargs)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 586, in _make_api_call
api_params, operation_model, context=request_context)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/client.py", line 621, in _convert_to_request_dict
api_params, operation_model)
File "/root/anaconda3/envs/airflow/lib/python3.6/site-packages/botocore/validate.py", line 291, in serialize_to_request
raise ParamValidationError(report=report.generate_report())
botocore.exceptions.ParamValidationError: Parameter validation failed:
Missing required parameter in input: "Instances"
[2019-10-14 12:12:40,920] {taskinstance.py:1082} INFO - Marking task as FAILED.
Upvotes: 1
Views: 1609
Reputation: 4698
In the first case, instead of dynamically creating/terminating clusters through the UI, you can also achieve it by extending the SparkSubmitOperator. After launching the EMR cluster you can copy the *.xml files (e.g. core-site.xml) from the EMR master into some location on the Airflow node and then point to those files in your spark-submit task in Airflow. At least that's how we do it in our product. To extend that logic, if you're planning to reuse an existing cluster, all you need to know is where those *.xml files are already stored; the rest stays the same, you only refer to those files when triggering the task.
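A rough sketch of that idea, assuming a spark_default connection whose master is yarn and SSH access to the EMR master (host names, paths and task ids below are placeholders):

```python
import subprocess
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

EMR_MASTER = "ec2-xx-xx-xx-xx.compute-1.amazonaws.com"   # master of the existing cluster
CONF_DIR = "/opt/emr-conf"                               # local dir on the Airflow node

dag = DAG("spark_on_existing_emr",
          start_date=datetime(2019, 10, 1),
          schedule_interval=None)


def fetch_emr_conf(**_):
    # pull the *-site.xml files from the EMR master onto the Airflow node
    subprocess.check_call([
        "scp", "-i", "/path/to/key.pem",
        "hadoop@{}:/etc/hadoop/conf/*-site.xml".format(EMR_MASTER),
        CONF_DIR])


fetch_conf = PythonOperator(
    task_id="fetch_emr_conf",
    python_callable=fetch_emr_conf,
    dag=dag)

spark_task = SparkSubmitOperator(
    task_id="spark_on_emr",
    application="/path/to/job.py",
    conn_id="spark_default",            # spark connection pointing at yarn
    env_vars={
        "HADOOP_CONF_DIR": CONF_DIR,    # tell spark-submit where the copied configs live
        "YARN_CONF_DIR": CONF_DIR},
    dag=dag)

fetch_conf >> spark_task
```

Whether env_vars reaches the spark-submit process this way depends on the Airflow version, so treat that part as an assumption; exporting HADOOP_CONF_DIR/YARN_CONF_DIR in the Airflow node's environment works as well.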
More details
I don't know of any such doc, so I can only suggest that you explore the following, based on the knowledge I have gathered:
We need to write a custom plugin for spark-submit. As part of this custom-plugin module, let's define a CustomSparkSubmitOperator class. It needs to extend the BaseOperator. You can find plenty of articles on writing custom plugins in Airflow; those are a good place to start, and the Airflow docs cover the BaseOperator in more detail.
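A bare skeleton of what that could look like (class and argument names here are hypothetical; the individual methods are sketched further below):

```python
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class CustomSparkSubmitOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 emr_cluster_id,                  # id of the existing EMR cluster (j-XXXXXXXX)
                 application,                     # path to the Spark application to submit
                 local_conf_dir="/opt/emr-conf",  # where the *-site.xml files get copied
                 terminate_cluster=False,
                 *args, **kwargs):
        super(CustomSparkSubmitOperator, self).__init__(*args, **kwargs)
        self.emr_cluster_id = emr_cluster_id
        self.application = application
        self.local_conf_dir = local_conf_dir
        self.terminate_cluster = terminate_cluster

    def pre_execute(self, context):
        # wait for the cluster and pull the *-site.xml files (see sketch below)
        pass

    def execute(self, context):
        # submit the Spark job through SparkSubmitHook (see sketch below)
        pass

    def post_execute(self, context, result=None):
        # optionally terminate the cluster (see sketch below)
        pass
```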
In BaseOperator, you'll find a method called pre_execute. It's a viable option to perform the following actions inside this method (a sketch follows the list):
a. Wait until your cluster is up. You can easily do that using boto3, if you pass the cluster-id.
b. Once the cluster is up, get the IP of the EMR master node and copy the files matching /etc/hadoop/conf/*-site.xml to your Airflow node. It's doable via a subprocess call in Python.
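A sketch of those two pre_execute steps, assuming boto3 credentials on the Airflow node and SSH access to the EMR master (the key path is a placeholder):

```python
import subprocess

import boto3


# method of the CustomSparkSubmitOperator sketched above
def pre_execute(self, context):
    emr = boto3.client("emr")

    # a. block until the cluster reaches a running/waiting state
    emr.get_waiter("cluster_running").wait(ClusterId=self.emr_cluster_id)

    # b. find the master node and copy the Hadoop/YARN config files locally
    master_dns = emr.describe_cluster(
        ClusterId=self.emr_cluster_id)["Cluster"]["MasterPublicDnsName"]
    subprocess.check_call([
        "scp", "-i", "/path/to/key.pem",
        "hadoop@{}:/etc/hadoop/conf/*-site.xml".format(master_dns),
        self.local_conf_dir])
```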
Once you've got the xml files, just use the SparkSubmitHook in the execute method to submit your Spark job. You need to make sure the spark binaries on your Airflow node use this path for spark-submit.
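A sketch of the execute part using SparkSubmitHook from airflow.contrib (the spark_default connection with master set to yarn is an assumption):

```python
import os

from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook


# method of the CustomSparkSubmitOperator sketched above
def execute(self, context):
    # make sure spark-submit on the Airflow node picks up the copied configs
    os.environ["HADOOP_CONF_DIR"] = self.local_conf_dir
    os.environ["YARN_CONF_DIR"] = self.local_conf_dir

    hook = SparkSubmitHook(
        conn_id="spark_default",   # spark connection whose master is 'yarn'
        name=self.task_id)
    hook.submit(application=self.application)
```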
You can clean up the cluster in the post_execute method, in case that's required.
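And the optional cleanup, using the terminate_cluster flag from the skeleton above (only relevant if this operator also owns the cluster's lifecycle):

```python
import boto3


# method of the CustomSparkSubmitOperator sketched above
def post_execute(self, context, result=None):
    if self.terminate_cluster:
        boto3.client("emr").terminate_job_flows(JobFlowIds=[self.emr_cluster_id])
```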
Upvotes: 1