Reputation: 1602
I'm trying to write some PySpark jobs that depend on a module that I want to ship with the job rather than install globally on the clusters.
I decided to try doing this with zip files, but I can't get it to work, and I can't find an example of doing this in the wild either.
I'm building the zip by running:
mkdir -p ./build
cd ./build && python ../src/setup.py sdist --formats=zip
This creates a file called ./build/dist/mysparklib-0.1.zip. So far, so good.
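For context, src/setup.py is just the usual setuptools boilerplate; I haven't reproduced the real file here, but it's roughly this shape (illustrative sketch only):
from setuptools import setup, find_packages
# Illustrative sketch of src/setup.py; name and version match the sdist output above.
setup(
    name='mysparklib',
    version='0.1',
    packages=find_packages(),  # picks up the mysparklib/ package
)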
My job looks like this:
from pyspark import SparkContext
# See: http://spark.apache.org/docs/latest/quick-start.html
readme_filename = './README.md'
sc = SparkContext('local', 'helloworld app')
readme_data = sc.textFile(readme_filename).cache()
def test_a_filter(s):
    # Importing inside the filter so the import happens on the executors.
    import mysparklib
    return 'a' in s
a_s = readme_data.filter(test_a_filter).count()
b_s = readme_data.filter(lambda s: 'b' in s).count()
print("""
**************************************
* Lines with a: {}; Lines with b: {} *
**************************************
""".format(a_s, b_s))
sc.stop()
(This is mostly adapted from the quickstart, except that I'm trying to import my module inside one of the filters.)
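Side note: the zip can also be shipped from inside the job rather than on the command line; a sketch, assuming the same zip path and the sc created above:
# Sketch: same effect as passing --py-files on the command line.
sc.addPyFile('./build/dist/mysparklib-0.1.zip')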
I'm kicking off the job by running:
spark-submit --master local[4] --py-files './build/dist/mysparklib-0.1.zip' ./jobs/helloworld.py
and while I see that the zip file is included:
17/05/17 17:15:31 INFO SparkContext: Added file file:/Users/myuser/dev/mycompany/myproject/./build/dist/mysparklib-0.1.zip at file:/Users/myuser/dev/mycompany/myproject/./build/dist/mysparklib-0.1.zip with timestamp 1495055731604
it won't import:
17/05/17 17:15:34 INFO DAGScheduler: ResultStage 0 (count at /Users/myuser/dev/mycompany/myproject/./jobs/helloworld.py:15) failed in 1.162 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2408, in pipeline_func
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 345, in func
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1040, in <lambda>
File "/Users/myuser/dev/mycompany/myproject/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1040, in <genexpr>
File "/Users/myuser/dev/mycompany/myproject/./jobs/helloworld.py", line 12, in test_a_filter
import mysparklib
ModuleNotFoundError: No module named 'mysparklib'
at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
For a sanity check, I ran python setup.py develop inside mysparklib and tried importing it from the CLI, and that works swimmingly.
Any ideas?
Upvotes: 0
Views: 3445
Reputation: 1602
So I got this working! The core problem is that the directory structure of an sdist is not the structure Python expects when a zip is added to the module path (which is how --py-files works; you can confirm this by printing sys.path). In particular, the sdist zip contains ./mysparklib-0.1/mysparklib/__init__.py, but we need a zip that contains ./mysparklib/__init__.py instead.
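Here's the kind of check I mean, as a sketch (assuming sc is the SparkContext from the job above; worker_sys_path is just a throwaway helper):
import sys
# Driver side: the zip shipped via --py-files should show up on sys.path here.
print(sys.path)
# Executor side: pull sys.path back from a worker to see what the tasks see.
def worker_sys_path(_):
    import sys
    return sys.path
print(sc.parallelize([0], 1).map(worker_sys_path).first())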
So instead of running
cd ./build && python ../src/setup.py sdist --formats=zip
I'm now running
cd ./src && zip ../dist/mysparklib.zip -r ./mysparklib
and that works.
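If you'd rather build the zip from Python than shell out to zip, the standard library can produce the same layout; a sketch, assuming it runs from the project root:
import os
import shutil
# Sketch: build dist/mysparklib.zip with mysparklib/ at the top level of the
# archive (the layout --py-files expects), rather than under a mysparklib-0.1/ prefix.
os.makedirs('dist', exist_ok=True)
shutil.make_archive('dist/mysparklib', 'zip', root_dir='src', base_dir='mysparklib')
Either way, the spark-submit invocation just needs --py-files to point at ./dist/mysparklib.zip instead of the sdist zip.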
Upvotes: 2