Reputation: 304
I'm running Spark via PyCharm and the pyspark shell, and I'm stuck on this error:
: java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.api.python.PythonRDD$.readRDDFromFile(PythonRDD.scala:416)
at org.apache.spark.api.python.PythonRDD.readRDDFromFile(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:748)
My code is:
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
import time
if __name__ == '__main__':
    print("Started at " + time.strftime("%H:%M:%S"))
    conf = (SparkConf()
            .setAppName("TestRdd")
            .set('spark.driver.cores', '1')
            .set('spark.executor.cores', '1')
            .set('spark.driver.memory', '16G')
            .set('spark.executor.memory', '9G'))
    sc = SparkContext(conf=conf)
    rdd = sc.parallelize(range(1000000000), 100)
    print(rdd.take(10))
    print("Finished at " + time.strftime("%H:%M:%S"))
These are the maximum memory settings I can set on the cluster. I tried to allocate all the memory to one core for creating the RDD, but it seems to me that the application fails before distributing the dataset; I assume it fails at the creation step. I also tried various numbers of partitions, from 100 to 10000. I calculated how much memory it would take: 1 billion ints is approximately 4.5-4.7 GB in memory, less than I have, but no luck.
How can I optimize my code so that it runs?
Upvotes: 4
Views: 2188
Reputation: 35249
TL;DR Don't use parallelize outside tests and simple experiments. Because you use Python 2.7, range is not lazy, so you'll materialize the full range of values more than once and in several forms, including a Python list right after the call. Using xrange would help, but you shouldn't use parallelize in the first place (or Python 2 in 2018).
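To get a feel for how expensive an eagerly built list is, here is a rough sketch. It assumes CPython 3 on a 64-bit machine (the exact figures differ on Python 2.7), and estimate_list_bytes is a hypothetical helper, not part of Spark or the standard library. It extrapolates from a small sample instead of allocating a billion elements.

```python
import sys


def estimate_list_bytes(n, sample=10000):
    """Roughly estimate the memory needed by list(range(n)).

    Hypothetical helper: extrapolates from a small sample instead of
    building the full list.
    """
    values = list(range(sample))
    # Storage used by the list itself (mostly one pointer per element).
    per_slot = sys.getsizeof(values) / sample
    # Storage used by the int objects the list points to.
    per_int = sum(sys.getsizeof(v) for v in values) / sample
    return int(n * (per_slot + per_int))


n = 1000000000
print("estimated list size: %.1f GB" % (estimate_list_bytes(n) / 1e9))
# A lazy range object stays tiny no matter how many values it describes.
print("lazy range size: %d bytes" % sys.getsizeof(range(n)))
```

Every element costs a pointer plus a full int object, so an estimate based on raw 4-byte integers is likely to understate the real footprint by a wide margin.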
If you want to create a series of values, just use SparkContext.range:
range(start, end=None, step=1, numSlices=None)
Create a new RDD of int containing elements from start to end (exclusive), increased by step every element. Can be called the same way as python’s built-in range() function. If called with a single argument, the argument is interpreted as end, and start is set to 0.
So in your case:
rdd = sc.range(1000000000, numSlices=100)
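The idea of describing each partition by its bounds, instead of shipping the values, can be sketched in plain Python. This is only an illustration under that assumption, not Spark's actual code, and slice_range is a hypothetical name.

```python
def slice_range(start, end, step=1, num_slices=4):
    """Split range(start, end, step) into num_slices lazy sub-ranges.

    Hypothetical helper: only the bounds are computed, so no element is
    materialized until a sub-range is iterated.
    """
    size = len(range(start, end, step))

    def bound(i):
        # First value of slice i (bound(num_slices) is one past the end).
        return start + (i * size // num_slices) * step

    return [range(bound(i), bound(i + 1), step) for i in range(num_slices)]


parts = slice_range(0, 1000000000, num_slices=100)
print(len(parts))                    # number of sub-ranges
print(parts[0][:3], parts[-1][-1])   # cheap peeks, nothing is expanded
```

Each sub-range can then be expanded independently by whichever worker owns it, which is why the driver never needs to hold the whole series in memory.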
With DataFrame:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.range(1000000000, numPartitions=100)
Upvotes: 2