Reputation: 1011
I'm having a problem using Python on Spark. My application has several dependencies, such as numpy, pandas, astropy, etc. I cannot use virtualenv to create an environment with all the dependencies, since the nodes on the cluster have no common mountpoint or filesystem besides HDFS. Therefore I am stuck with spark-submit --py-files. I package the contents of site-packages in a ZIP file and submit the job with the --py-files=dependencies.zip option (as suggested in Easiest way to install Python dependencies on Spark executor nodes?). However, the nodes on the cluster still do not seem to see the modules inside, and they throw an ImportError such as this when importing numpy:
File "/path/anonymized/module.py", line 6, in <module>
import numpy
File "/tmp/pip-build-4fjFLQ/numpy/numpy/__init__.py", line 180, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/add_newdocs.py", line 13, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/__init__.py", line 8, in <module>
#
File "/tmp/pip-build-4fjFLQ/numpy/numpy/lib/type_check.py", line 11, in <module>
File "/tmp/pip-build-4fjFLQ/numpy/numpy/core/__init__.py", line 14, in <module>
ImportError: cannot import name multiarray
When I switch to the virtualenv and use the local pyspark shell, everything works fine, so the dependencies are all there. Does anyone know what might cause this problem and how to fix it?
Thanks!
Upvotes: 48
Views: 83705
Reputation: 1570
As Andrej Palicka explained in the comments,
"the problem lies in the fact, that Python cannot import .so modules from .zip files (docs.python.org/2/library/zipimport.html)".
A solution I found is to add the non-.py files one by one to --py-files, separated by commas:
spark-submit --py-files modules/toolbox.cpython-38-x86_64-linux-gnu.so,modules/product.cpython-38-x86_64-linux-gnu.so spark_from_cython.py
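If you have many such files, a small helper can build that comma-separated list for you. This is just a sketch, assuming the compiled .so modules sit under a local modules/ directory as in the command above and that spark-submit is on your PATH:
import glob
import subprocess

# collect the compiled modules and join them for --py-files
so_files = ",".join(sorted(glob.glob("modules/*.so")))
subprocess.run(
    ["spark-submit", "--py-files", so_files, "spark_from_cython.py"],
    check=True,
)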
Upvotes: 0
Reputation: 2909
To get this dependency-distribution approach to work with compiled extensions, the dependencies need to be built in an isolated environment and then shipped as an archive that is unzipped on the nodes and added to the PYTHONPATH (compiled .so modules cannot be imported straight from a zip).
Using the following script to create your dependencies zip will ensure that you are isolated from any packages already installed on your system. It assumes virtualenv is installed and that requirements.txt is present in your current directory, and it outputs a dependencies.zip with all your dependencies at the root level.
env_name=temp_env
# create the virtual env
virtualenv --python=$(which python3) --clear /tmp/${env_name}
# activate the virtual env
source /tmp/${env_name}/bin/activate
# download and install dependencies
pip install -r requirements.txt
# package the dependencies in dependencies.zip. the cd magic works around the fact that you can't specify a base dir to zip
(cd /tmp/${env_name}/lib/python*/site-packages/ && zip -r - *) > dependencies.zip
The dependencies can now be deployed, unzipped, and included in the PYTHONPATH as follows:
spark-submit \
--master yarn \
--deploy-mode cluster \
--conf 'spark.yarn.dist.archives=dependencies.zip#deps' \
--conf 'spark.yarn.appMasterEnv.PYTHONPATH=deps' \
--conf 'spark.executorEnv.PYTHONPATH=deps' \
.
.
.
spark.yarn.dist.archives=dependencies.zip#deps
distributes your zip file and unzips it to a directory called deps
spark.yarn.appMasterEnv.PYTHONPATH=deps
spark.executorEnv.PYTHONPATH=deps
include the deps directory in the PYTHONPATH for the master and all workers
--deploy-mode cluster
runs the driver on the cluster so it picks up the dependencies
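As a quick sanity check inside the job (a sketch, assuming a SparkContext named sc in the submitted driver, with numpy standing in for one of the packaged dependencies), you can ask an executor where it imports a package from; the path should point into the unpacked deps directory rather than raising ImportError:
def where_is_numpy(_):
    import numpy
    return [numpy.__file__]

# prints the location numpy was imported from on an executor
print(sc.parallelize([0], 1).flatMap(where_is_numpy).collect())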
Upvotes: 12
Reputation: 56
Try using --archives to ship your anaconda directory to each node, and --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON= to tell Spark where the Python executable lives inside that anaconda directory.
Our full config is this:
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./ANACONDA/anaconda-dependencies/bin/python
--archives <S3-path>/anaconda-dependencies.zip#ANACONDA
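To check which interpreter the job actually runs on (a sketch, assuming a SparkContext sc in the submitted application), you can print the Python executable used by the driver and by the executors:
import sys

print("driver python:  ", sys.executable)
print("executor python:", sc.parallelize([0], 1).map(lambda _: sys.executable).collect())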
Upvotes: 1
Reputation: 1445
Spark will also silently fail to load a zip archive that is created with the Python zipfile module. Zip archives must be created using a zip utility.
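If you build the archive from Python code, one workaround is to shell out to the zip utility instead of using zipfile. A minimal sketch, assuming zip is installed and a local dependencies/ directory holds the installed packages:
import subprocess

# call the external zip utility rather than Python's zipfile module;
# the archive lands next to the "dependencies" directory
subprocess.run(["zip", "-r", "../dependencies.zip", "."], cwd="dependencies", check=True)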
Upvotes: 0
Reputation: 805
First, you need to pass your files to Spark through --py-files or --files.
Now, in your code, add those zip/files by using the following command:
sc.addPyFile("your zip/file")
Now import the module shipped in your zip/file, with an alias if you like, to start referencing it:
import your_module as your_alias
Note: you do not need to include the file extension (e.g. .py) when importing.
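For example (a sketch, assuming sc is your SparkContext and a hypothetical helpers.zip containing a pure-Python module helpers with a function double):
sc.addPyFile("helpers.zip")

def apply_helper(x):
    import helpers              # resolved from helpers.zip on the executor
    return helpers.double(x)

print(sc.parallelize([1, 2, 3]).map(apply_helper).collect())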
Hope this is useful.
Upvotes: 21
Reputation: 3540
First off, I'll assume that your dependencies are listed in requirements.txt
. To package and zip the dependencies, run the following at the command line:
pip install -t dependencies -r requirements.txt
cd dependencies
zip -r ../dependencies.zip .
Above, the cd dependencies command is crucial to ensure that the modules are at the top level of the zip file. Thanks to Dan Corin's post for the heads-up.
Next, submit the job via:
spark-submit --py-files dependencies.zip spark_job.py
The --py-files directive sends the zip file to the Spark workers but does not add it to the PYTHONPATH (a source of confusion for me). To add the dependencies to the PYTHONPATH and fix the ImportError, add the following line to the Spark job, spark_job.py:
sc.addPyFile("dependencies.zip")
A caveat from this Cloudera post:
An assumption that anyone doing distributed computing with commodity hardware must assume is that the underlying hardware is potentially heterogeneous. A Python egg built on a client machine will be specific to the client’s CPU architecture because of the required C compilation. Distributing an egg for a complex, compiled package like NumPy, SciPy, or pandas is a brittle solution that is likely to fail on most clusters, at least eventually.
Although the solution above does not build an egg, the same guideline applies.
Upvotes: 96
Reputation: 11100
You can locate all the .py files you need and add their locations to sys.path relative to your script. See here for an explanation:
import os, sys, inspect
# realpath() will make your script run, even if you symlink it :)
cmd_folder = os.path.realpath(os.path.abspath(os.path.split(inspect.getfile( inspect.currentframe() ))[0]))
if cmd_folder not in sys.path:
sys.path.insert(0, cmd_folder)
# use this if you want to include modules from a subfolder
cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],"subfolder")))
if cmd_subfolder not in sys.path:
sys.path.insert(0, cmd_subfolder)
# Info:
# cmd_folder = os.path.dirname(os.path.abspath(__file__)) # DO NOT USE __file__ !!!
# __file__ fails if script is called in different ways on Windows
# __file__ fails if someone does os.chdir() before
# sys.argv[0] also fails because it doesn't always contain the path
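With those directories on sys.path, a module that sits next to the script (or inside "subfolder") can then be imported directly; for example (the module name is a placeholder):
import my_local_module   # hypothetical .py file living in cmd_folder or cmd_subfolder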
Upvotes: 0