Bitswazsky

Reputation: 4698

Missing Python dependency when submitting pyspark job to EMR using Airflow

We're using a bootstrap script for installing python libraries on the EMR cluster nodes for our Spark jobs. The script looks something like this:

sudo python3 -m pip install pandas==0.22.0 scikit-learn==0.21.0

Once the cluster is up, we use Airflow's SparkSubmitHook to submit jobs to EMR. We use this configuration to bind pyspark to python3. The problem is that, once in a while, when the job starts running, we get a ModuleNotFoundError: No module named 'sklearn' error. One such stack trace is shown below:

return self.loads(obj)
 File "/mnt1/yarn/usercache/root/appcache/application_1565624418111_0001/container_1565624418111_0001_01_000033/pyspark.zip/pyspark/serializers.py", line 577, in loads
   return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'sklearn'

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:452)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:172)
    at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)

This issue is sporadic: out of 10 job submissions it might happen 2-3 times. We're using EMR 5.23.0. I've tried upgrading to 5.26.0 as well, but the same issue persists.

If I go to the cluster nodes and check for that 'missing' package, I can see it's already installed, so clearly it's not an issue with the bootstrap script. That leaves me quite confused, because I have no clue what's going on here. My guess is that the job binds to a different Python version when it gets triggered from Airflow, but that's just a shot in the dark. Any help is appreciated.
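For reference, the submission path described above might look roughly like the hedged sketch below, assuming Airflow 1.10's SparkSubmitHook; the connection id, application path, and conf values are illustrative, not the poster's actual configuration.

# Hedged sketch (illustrative, not the poster's actual DAG code): submit a
# PySpark application through SparkSubmitHook and pin the interpreter so the
# YARN containers use the same python3 the bootstrap script installed into.
from airflow.contrib.hooks.spark_submit_hook import SparkSubmitHook

hook = SparkSubmitHook(
    conn_id="spark_default",  # assumed Airflow connection pointing at YARN
    conf={
        "spark.pyspark.python": "/usr/bin/python3",
        "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "/usr/bin/python3",
    },
    name="example_job",
)
hook.submit(application="s3://my-bucket/jobs/example_job.py")  # hypothetical path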

Upvotes: 0

Views: 3174

Answers (4)

Yong Wang

Reputation: 1313

A similar case may serve as a reference; I'm not sure if it works for EMR. In a Hadoop setup, the Python environment and packages should be installed under the hadoop or spark user.

If you install Python packages under root or some other user's environment, a situation like yours can happen.

So, try to install your packages as the same user that Hadoop or Spark runs under.

Update ===============================================

I have previously installed Cloudera Data Science Workbench, which is a similar Spark cloud environment. In that case, the dependencies also needed to be distributed.

Here is the hyperlink https://www.cloudera.com/documentation/data-science-workbench/latest/topics/cdsw_pyspark.html#distributing_dependencies

The keys are:

    1. Install the dependency packages on all cluster nodes.
    2. Set up the conda virtual environment.
    3. Set up the pyspark or pyspark3 path environment (see the sketch below).
    4. Deploy the YARN & Spark configuration to the gateway (the spark-submit host, or the Airflow host).
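A minimal sketch of step 3, assuming a conda environment that already exists at the same path on every node (the env path below is hypothetical); PYSPARK_PYTHON has to be set before the SparkContext is created so that the Python workers are launched with that interpreter.

# Hedged sketch, not from the original answer: point PySpark at a conda env's
# interpreter, assuming the env exists at the same path on every node.
import os

CONDA_PYTHON = "/home/hadoop/miniconda3/envs/jobenv/bin/python"  # hypothetical path
os.environ["PYSPARK_PYTHON"] = CONDA_PYTHON  # set before the SparkContext starts

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("conda-env-example").getOrCreate()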

Good luck.

If you find the answer helpful, please vote it up.

Upvotes: 2

Erik563

Reputation: 1

Another solution, if you use LaunchClusterOperator in your DAG file, is to use the "cluster_overrides" property. Then you can just copy the configuration from this Amazon page. So the result would look like this (mentioning "Configurations" twice is done intentionally):

LaunchClusterOperator(
    dag=yourdag,
    param2="something",
    cluster_overrides={
        "Configurations": [
            {
                "Classification": "spark-env",
                "Configurations": [
                    {
                        "Classification": "export",
                        "Properties": {"PYSPARK_PYTHON": "/usr/bin/python3"},
                    }
                ],
            }
        ]
    },
)

Upvotes: 0

Bitswazsky

Reputation: 4698

After a lot of trial and error, the following snippet worked out fine as a bootstrap script. The commented-out part was also previously included in our script, and it caused issues. After removing that portion everything seems to be working fine.

sudo python3 -m pip install --upgrade pip==19.1.1 >> /tmp/out.log

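# Install the libspatialindex native library, which the rtree package installed below depends on.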
wget https://download-ib01.fedoraproject.org/pub/epel/7/x86_64/Packages/s/spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log
sudo yum -y localinstall spatialindex-1.8.5-1.el7.x86_64.rpm >> /tmp/out.log

sudo python3 -m pip install python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 scikit-learn==0.21.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183 rtree==0.8.3 geopandas==0.5.0 >> /tmp/out.log

# python3 -m pip install --user python-dateutil==2.8.0 pandas==0.22.0 pyarrow==0.13.0 geopy==1.19.0 Shapely==1.6.4.post2 geohash2==1.1 boto3==1.9.183
# python3 -m pip install --user scikit-learn==0.21.0

One note here: when a job gets submitted through Airflow it runs as the root user, whereas this script gets executed as the hadoop user on each EMR node. That's probably why the --user installation didn't work.
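To confirm which interpreter and OS user the executors actually pick up, a small probe job like the hedged sketch below (not part of the original setup) can be submitted through the same Airflow path.

# Hedged debugging sketch: report the hostname, OS user, and Python executable
# seen inside each executor, to verify which interpreter the job really binds to.
import getpass
import socket
import sys

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("env-probe").getOrCreate()

def probe(_):
    # Runs on the executors; yield the identity of the worker interpreter.
    yield (socket.gethostname(), getpass.getuser(), sys.executable)

results = spark.sparkContext.parallelize(range(8), 8).mapPartitions(probe).collect()
for host, user, exe in sorted(set(results)):
    print(host, user, exe)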

Upvotes: 1

FraDel

Reputation: 184

One way to resolve your problem could be to change the way you submit your job to the cluster:

  • Package the code of the step to run (with its dependencies) on an S3 bucket (using pipenv and Pipfiles, for example). The package would look like this:
<script_to_execute_package>.zip
|- <script_to_execute_main>.py
|- other step files.py
|- ...
|- scikit-learn
|    |- scikit-learn files
|    |- ...
|- pandas
|    |- pandas files
|    |- ...
|- other packages
|    |- other packages files
|    |- ...


  • Instead of using the SparkSubmitHook, use an EmrAddStepsOperator (plus an EmrStepSensor and an EmrCreateJobFlowOperator; see the sketch after the snippet below). Run the step with your packaged Python code. It would be something like this:
step_to_run = [
    {
        'Name': 'your_step_name',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': ["spark-submit", "--master", "yarn", "--deploy-mode", "client", "--py-files", "s3://<script_to_execute_package>.zip", "/tmp/driver.py", "<script_to_execute_main>.py", "", "--arg_to_pass_1", "arg1", "--arg_to_pass_2", "arg2", ...]
        }
    }
]

some_task = EmrAddStepsOperator(
    task_id='some_task',
    job_flow_id='the_previously_created_job_flow_id',
    aws_conn_id=aws_conn_id,
    steps=step_to_run,
    dag=dag
)

some_task_check = EmrStepSensor(
    task_id='some_task_check',
    job_flow_id='the_previously_created_job_flow_id',
    step_id="{{ task_instance.xcom_pull('some_task', key='return_value')[0] }}",
    aws_conn_id=aws_conn_id,
    poke_interval=10,
    dag=dag
)
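For completeness, a hedged sketch of the cluster-creation side that the 'the_previously_created_job_flow_id' placeholder above assumes; the operator arguments and JOB_FLOW_OVERRIDES values are illustrative (Instances, roles, and other required fields are omitted), and the job flow id can be pulled from XCom rather than hard-coded.

# Hedged sketch (illustrative): create the EMR cluster and pass its job flow id
# to the step operator / sensor via XCom.
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator

JOB_FLOW_OVERRIDES = {
    "Name": "my-transient-cluster",   # hypothetical values
    "ReleaseLabel": "emr-5.26.0",
    "BootstrapActions": [
        {
            "Name": "install-python-deps",
            "ScriptBootstrapAction": {"Path": "s3://<your-bucket>/bootstrap.sh"},
        }
    ],
    # ... plus Instances, roles, and other required fields, omitted here.
}

create_cluster = EmrCreateJobFlowOperator(
    task_id="create_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id=aws_conn_id,
    emr_conn_id="emr_default",
    dag=dag,
)

# The operator pushes the new job flow id to XCom, so downstream tasks can use:
# job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster', key='return_value') }}"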

Upvotes: 2
