Reputation: 406
My Spark application is currently causing executors to die because YARN kills them for exceeding the memory limit. I can't find in the documentation, or in the O'Reilly book I bought, an explanation of how creating RDDs allocates memory on executors. Could someone walk me through what happens in the following code snippet?
import numpy

N = 10
array = numpy.random.random(N)
# Is the array actually partitioned and serialized out when this is executed?
# Or when an action using this rdd is called? At this point,
# I would expect 1 float, or 4 bytes on each executor.
rdd1 = sc.parallelize(array, 10)
# Transformations return new rdd's, so now I would expect each executor
# to have 2 floats on it, one from rdd, and one from rdd2, so 8 bytes.
rdd2 = rdd1.map(lambda x: x + 2)
# Here is where things get murky. Would this use another 8 bytes of memory
# to account for the intermediate product of rdd1.map(lambda x: x - 2)?
# So in Spark's accounting, would we now require space for 4 floats:
# one each for rdd1, rdd2, and rdd3, and one for the intermediate?
rdd3 = rdd1.map(lambda x: x - 2).map(lambda x: x * 2)
# Is this the point where each executor actually has the memory allocated?
# And since I only call collect() on the first rdd,
# would only 4 bytes be sent out to each executor?
rdd1.collect()
# How about now?
rdd2.collect()
Upvotes: 0
Views: 215
Reputation: 15879
I created a unit test that calls sc.parallelize()
and put a breakpoint after that line executes. I do not see any memory allocation being logged until the subsequent collect()
is called.
Java code...
import com.google.common.collect.Lists;    // Guava, for newArrayList()
import org.apache.spark.api.java.JavaRDD;

JavaRDD<String> fooBars = sparkCtx.parallelize(Lists.newArrayList("foo", "bar"));
JavaRDD<String> abcs = fooBars.map(f -> "abc");
abcs.collect(); // break point here
From the log when collect() is executed...
2016-02-03 15:49:14 INFO DAGScheduler:59 - Got job 0 (collect at MyTest.java:40) with 1 output partitions (allowLocal=false)
2016-02-03 15:49:14 INFO DAGScheduler:59 - Final stage: Stage 0(collect at MyTest.java:40)
2016-02-03 15:49:14 INFO DAGScheduler:59 - Parents of final stage: List()
2016-02-03 15:49:14 INFO DAGScheduler:59 - Missing parents: List()
2016-02-03 15:49:14 INFO DAGScheduler:59 - Submitting Stage 0 (ParallelCollectionRDD[0] at parallelize at RddFactory.java:42), which has no missing parents
2016-02-03 15:49:14 INFO MemoryStore:59 - ensureFreeSpace(1416) called with curMem=0, maxMem=991753666
2016-02-03 15:49:14 INFO MemoryStore:59 - Block broadcast_0 stored as values in memory (estimated size 1416.0 B, free 945.8 MB)
This is consistent with the Spark documentation, which explains that non-terminating functions (e.g. map()) are lazily evaluated and not executed until a terminating function (e.g. collect()) is called. This example also shows that parallelize() is considered a non-terminating function as well, because ensureFreeSpace... was not logged until the collect() statement.
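The same behaviour can be observed directly from PySpark; a minimal sketch, assuming a SparkContext sc as in the question...
rdd = sc.parallelize(range(10))      # lazy: only records the data and its partitioning
doubled = rdd.map(lambda x: x * 2)   # lazy: only extends the lineage
result = doubled.collect()           # action: a job is submitted and memory is allocated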
To address your other question about the two collect() statements: Spark's RDD objects are immutable, so when you call rdd1.collect() it materializes that RDD. If you then call rdd2.collect(), Spark has to compute that one as well.
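If recomputing rdd1 for the second collect() matters to you, the standard remedy is to persist it. A minimal sketch reusing the question's variables (an option, not something your code needs to be correct)...
rdd1 = sc.parallelize(array, 10)
rdd2 = rdd1.map(lambda x: x + 2)
rdd1.cache()     # mark rdd1 to be kept in executor memory after its first computation
rdd1.collect()   # computes rdd1 and caches its partitions
rdd2.collect()   # builds rdd2 from the cached rdd1 instead of recomputing it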
Upvotes: 1
Reputation: 330063
Well, this is a rather tricky question.
First of all, you have to include memory that is completely independent of the work you perform: the worker process itself (200 MB or so) and the memory required to run each Python executor (~30-40 MB per interpreter, without any additional imports).
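Since YARN kills containers that exceed their allocation, this fixed overhead has to fit into the off-heap headroom requested on top of the executor heap. A sketch for Spark 1.x on YARN; the values are illustrative, not tuned recommendations...
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .set("spark.executor.memory", "2g")                 # JVM heap per executor
        .set("spark.yarn.executor.memoryOverhead", "512"))  # off-heap headroom, in MB
sc = SparkContext(conf=conf)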
Moreover, every piece of data passed between stages is effectively duplicated: first it has to be passed to the workers (JVM), and then piped through sockets to the Python interpreters.
Finally, there are details of the Spark implementation. By default, Spark reuses Python interpreters between tasks. This means that temporary objects created in each task have to be garbage collected before the memory is freed. Usually this shouldn't be a problem, but it is definitely something to keep in mind. Moreover, data can be spilled to disk if necessary, which can make the picture even more complicated.
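Interpreter reuse is controlled by a configuration flag; a sketch of disabling it, trading per-task startup cost for a fresh interpreter (and freshly released memory) on every task...
from pyspark import SparkConf

conf = SparkConf().set("spark.python.worker.reuse", "false")  # default is "true"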
So while your calculations are more or less correct (assuming you've executed an action and used float32), they cover only a small part of the picture.
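To illustrate the float32 caveat: NumPy's default dtype is float64, so the 4-bytes-per-float figure from the question only holds after an explicit cast...
import numpy

array = numpy.random.random(10)              # default dtype is float64
print(array.itemsize)                        # 8 bytes per element
print(array.astype(numpy.float32).itemsize)  # 4 bytes per element
# With 10 partitions that is one element, i.e. 8 (or 4) bytes of raw data
# per partition, before serialization and the JVM-to-Python piping overhead.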
Upvotes: 0