Reputation: 43
I am running a program involving Spark parallelization multiple times. The program runs fine for the first few iterations but then crashes due to a memory issue. I am using Spark 2.2.0 with Python 2.7, and I am running my tests on an AWS EC2 instance with 30 GB of memory.
Below is my Spark configuration:
import pyspark

conf = pyspark.SparkConf()
conf.set("spark.executor.memory", '4g')   # heap per executor JVM
conf.set('spark.executor.cores', '16')    # concurrent tasks per executor
conf.set('spark.cores.max', '16')         # total cores for the application
conf.set("spark.driver.memory", '4g')     # heap for the driver JVM
conf.setMaster("local[*]")                # run locally, one JVM, one worker thread per core
And here is my error log:
Traceback (most recent call last):
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "C:/Users/Administrator/Desktop/Flex_Api_Post/flex_api_post_func_spark_setup.py", line 152, in travel_time_est
    count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()
  File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 809, in collect
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
    format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 1 times, most recent failure: Lost task 7.0 in stage 13.0 (TID 215, localhost, executor driver): org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
    return pickle.loads(obj)
MemoryError
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.api.python.PythonException:
Traceback (most recent call last):
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
  File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
    return pickle.loads(obj)
MemoryError
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more
Upvotes: 2
Views: 3088
Reputation: 7742
Let me explain a little about how PySpark works.
When you use PySpark with 16 cores per worker, you are asking Spark to start 16 Python instances in parallel for each JVM worker, as sketched below:
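JVM worker (executor)
 ├── pipe ──> Python instance 1
 ├── pipe ──> Python instance 2
 │       ...
 └── pipe ──> Python instance 16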
So, from what I can see in your configuration, you are requesting workers with 4 GB each, and each one will run with 16 cores. This builds a structure with 1 JVM that creates 16 pipes, and 16 Python instances running in parallel. The error you are facing is the Python instances not having enough memory to run.
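As a rough back-of-the-envelope check (the overhead figure below is a guess, not an exact Spark memory model), you can see why each Python instance ends up starved:

total_ram_gb = 30        # EC2 instance memory
jvm_heap_gb = 4          # the 4g requested for the JVM
python_instances = 16    # one per core, as configured
overhead_gb = 2          # assumed OS/filesystem overhead (a guess)

per_python_gb = (total_ram_gb - jvm_heap_gb - overhead_gb) / float(python_instances)
print("~%.1f GB per Python instance" % per_python_gb)  # ~1.5 GB each

Each unpickled task payload has to fit inside that slice, which is where the MemoryError in pickle.loads comes from.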
You may need to reduce the number of cores per worker so the machine can handle the processes, or you can add more memory.
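For example, a configuration along these lines (the numbers are purely illustrative; tune them for your 30 GB instance) trades parallelism for per-process headroom:

import pyspark

conf = pyspark.SparkConf()
conf.set('spark.executor.cores', '4')    # fewer parallel Python instances
conf.set('spark.cores.max', '4')
conf.set('spark.executor.memory', '8g')  # more heap per JVM
conf.set('spark.driver.memory', '8g')    # in local mode, the driver JVM hosts the tasks
conf.setMaster('local[4]')               # cap local parallelism to match

sc = pyspark.SparkContext(conf=conf)

With 4 Python instances instead of 16, each one gets roughly four times the share of the remaining RAM.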
For more details, check here.
Upvotes: 2