mshahabi

Reputation: 43

Pyspark Memory Issue

I am running a program that uses Spark parallelization multiple times. The program runs fine for the first few iterations but then crashes with a memory error. I am using Spark 2.2.0 with Python 2.7, and I am running my tests on an AWS EC2 instance with 30 GB of memory.

Below is my Spark configuration:

conf = pyspark.SparkConf()
conf.set("spark.executor.memory", '4g')
conf.set('spark.executor.cores', '16')
conf.set('spark.cores.max', '16')
conf.set("spark.driver.memory",'4g')
conf.setMaster("local[*]") 
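
For context, the line that eventually fails (it appears in the traceback below) is this parallelize/collect step, where ssc is the SparkContext built from the conf above and flex_func is applied to each record:

    # Distribute the JSON records, apply flex_func to each one in parallel,
    # and pull every result back to the driver in a single list.
    count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()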

and here is my error log:

    Traceback (most recent call last):
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1982, in wsgi_app
        response = self.full_dispatch_request()
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1614, in full_dispatch_request
        rv = self.handle_user_exception(e)
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1517, in handle_user_exception
        reraise(exc_type, exc_value, tb)
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1612, in full_dispatch_request
        rv = self.dispatch_request()
      File "C:\ProgramData\Anaconda2\lib\site-packages\flask\app.py", line 1598, in dispatch_request
        return self.view_functions[rule.endpoint](**req.view_args)
      File "C:/Users/Administrator/Desktop/Flex_Api_Post/flex_api_post_func_spark_setup.py", line 152, in travel_time_est
        count = ssc.parallelize(input_json).map(lambda j: flex_func(j)).collect()
      File "C:\ProgramData\Anaconda2\lib\site-packages\pyspark\rdd.py", line 809, in collect
        port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
      File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\java_gateway.py", line 1160, in __call__
        answer, self.gateway_client, self.target_id, self.name)
      File "C:\ProgramData\Anaconda2\lib\site-packages\py4j\protocol.py", line 320, in get_return_value
        format(target_id, ".", name), value)
    Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 13.0 failed 1 times, most recent failure: Lost task 7.0 in stage 13.0 (TID 215, localhost, executor driver): org.apache.spark.api.python.PythonException:
    Traceback (most recent call last):
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
        return pickle.loads(obj)
    MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

    Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:458)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Unknown Source)
    Caused by: org.apache.spark.api.python.PythonException:
    Traceback (most recent call last):
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 166, in main
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 57, in read_command
      File "C:\opt\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 454, in loads
        return pickle.loads(obj)
    MemoryError

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    ... 1 more

Upvotes: 2

Views: 3088

Answers (1)

Thiago Baldim

Reputation: 7742

Let's explain a little bit how PySpark works.

With 16 cores per worker, you are asking Spark to start 16 Python instances in parallel for each JVM worker, as the diagram below shows:

[diagram: one JVM worker feeding tasks through pipes to multiple parallel Python worker processes]

So, judging from the configuration shown here, you are requesting workers with 4 GB each, and each one will run 16 cores. This creates a structure with one JVM that opens 16 pipes to 16 Python instances running in parallel. The error you are facing means there is not enough memory for those Python processes to run.
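
If you want to see this behavior yourself, here is a small sketch (my own illustration, not code from your post) that surfaces the separate Python worker processes behind one local JVM:

    import os
    import pyspark

    # Illustration only: record the PID of the Python worker that handles
    # each partition. With several cores you will typically see several
    # distinct PIDs, one per concurrent Python instance.
    conf = pyspark.SparkConf().setMaster('local[4]')
    sc = pyspark.SparkContext(conf=conf)

    pids = (sc.parallelize(range(8), 8)
              .map(lambda _: os.getpid())
              .distinct()
              .collect())
    print(pids)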

You may need to reduce the number of cores per worker so that each Python process has enough memory to handle the work, or you can add more memory.
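
As a starting point, a rebalanced configuration might look like the sketch below; the exact numbers are my guesses and need tuning for your workload:

    import pyspark

    # Hypothetical rebalance: fewer concurrent Python workers, so each
    # process gets a larger share of the 30 GB machine.
    conf = pyspark.SparkConf()
    conf.set('spark.executor.memory', '8g')
    conf.set('spark.executor.cores', '4')
    conf.set('spark.cores.max', '4')
    conf.set('spark.driver.memory', '8g')  # collect() gathers all results here
    conf.setMaster('local[4]')  # in local mode, local[N] caps concurrent tasks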

For more details, check here.

Upvotes: 2
