Reputation: 1725
I am trying to package up python dependencies to send along to a hadoop cluster with spark-submit
and I would like to do this in the DRYest way possible.
I want my_spark_app.py to look like this:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setAppName('MyApp').setMaster('yarn-client')
sc = SparkContext(conf=conf)
sc.addPyFile('/path/to/dependencies.py')
from dependencies import DependencyManager
dm = DependencyManager(sc)
dm.register_lib('dateutil')
import dateutil
# do stuff with dateutil
And dependencies.py looks like this:
import zipfile, os

LIBPATH = '/path/to/my/python/env/lib/python2.7/site-packages/'

class DependencyManager(object):
    """
    Collects dependencies to be zipped and sent to the spark context
    """
    def __init__(self, spark_context):
        self.sc = spark_context

    def register_lib(self, p):
        libpath = os.path.join(LIBPATH, p)
        zippath = libpath + '.zip'
        zf = zipfile.PyZipFile(zippath, mode='w')
        try:
            zf.debug = 3
            zf.writepy(libpath)
            self.sc.addPyFile(zippath)
        finally:
            zf.close()
This produces the following output (because of zf.debug = 3):
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil as dateutil
Adding dateutil/__init__.pyc
Adding dateutil/rrule.pyc
Adding dateutil/relativedelta.pyc
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/zoneinfo as dateutil/zoneinfo
Adding dateutil/zoneinfo/__init__.pyc
Adding dateutil/zoneinfo/rebuild.pyc
Adding dateutil/parser.pyc
Adding dateutil/tzwin.pyc
Adding dateutil/easter.pyc
Adding package in /path/to/env/lib/python2.7/site-packages/dateutil/tz as dateutil/tz
Adding dateutil/tz/__init__.pyc
Adding dateutil/tz/tz.pyc
Adding dateutil/tz/win.pyc
Adding dateutil/tz/_common.pyc
Traceback (most recent call last):
File "/path/to/my_spark_app.py", line 25
import dateutil
ImportError: No module named dateutil
Somehow, calling self.sc.addPyFile() from within the DependencyManager class does not affect the SparkContext, even though the same call works fine when made directly in my_spark_app.py.
What is going on here?
Upvotes: 1
Views: 332
Reputation: 1725
The issue is simple and has little to do with Spark. In here:
def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    zf = zipfile.PyZipFile(zippath, mode='w')
    try:
        zf.debug = 3
        zf.writepy(libpath)
        self.sc.addPyFile(zippath)
    finally:
        zf.close()
When self.sc.addPyFile(zippath) is called, the zf handle is still open, so the zip archive on disk is not yet complete. We just need to close it before calling addPyFile:
def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    zf = zipfile.PyZipFile(zippath, mode='w')
    try:
        zf.debug = 3
        zf.writepy(libpath)
        zf.close()  # file is now ready to add to the spark context
        self.sc.addPyFile(zippath)
    finally:
        zf.close()
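For what it's worth, the same fix can be written with a with statement, which guarantees the archive is flushed and closed before it is handed to Spark. A minimal sketch of the same register_lib (assuming the context-manager support that PyZipFile inherits from ZipFile in Python 2.7+):

def register_lib(self, p):
    libpath = os.path.join(LIBPATH, p)
    zippath = libpath + '.zip'
    # the with block closes the zip as soon as we leave it
    with zipfile.PyZipFile(zippath, mode='w') as zf:
        zf.debug = 3
        zf.writepy(libpath)
    # the archive is now complete on disk, so it is safe to ship
    self.sc.addPyFile(zippath)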
Upvotes: 1