Reputation: 23
My settings are: Spark 2.1 on a 3 node YARN cluster with 160 GB, 48 vcores.
Dynamic allocation turned on.
spark.executor.memory=6G
, spark.executor.cores=6
First, I am reading hive tables: orders (329MB) and lineitems (1.43GB) and
doing a left outer join.
Next, I apply 7 different filter conditions based on the joined
dataset (something like var line1 = joinedDf.filter("linenumber=1")
, var line2 = joinedDf.filter("l_linenumber=2")
, etc).
Because I'm filtering on the joined dataset multiple times, I thought doing a persist (MEMORY_ONLY
) would help here as the joined dataset will fits fully in memory.
I noticed that with persist, the Spark application takes longer to run than without persist (3.5 mins vs 3.3 mins). With persist, the DAG shows that a single stage was created for persist and other downstream jobs are waiting for the persist to complete. Does that mean persist is a blocking call? Or do stages in other jobs start processing when persisted blocks become available?
In the non-persist case, different jobs are creating different stages to read the same data. Data is read multiple times in different stages, but this is still is turning out to be faster than the persist case.
With larger data sets, persist actually causes executors to run out of
memory (Java heap space). Without persist, the Spark jobs complete just fine. I looked at some other suggestions here: Spark java.lang.OutOfMemoryError: Java heap space
.
I tried increasing/decreasing executor cores, persisting
with disk only, increasing partitions, modifying the storage ratio, but nothing seems to help with executor memory issues.
I would appreciate it if someone could mention how persist works, in what cases it is faster than not-persisting and more importantly, how to go about troubleshooting out of memory issues.
Upvotes: 1
Views: 4072
Reputation: 2495
I'd recommend reading up on the difference between transformations and actions in spark. I must admit that I've been bitten by this myself on multiple occasions.
Data in spark is evaluated lazily, which essentially means nothing happens until an "action" is performed. The .filter()
function is a transformation, so nothing actually happens when your code reaches that point, except to add a section to the transformation pipeline. A call to .persist()
behaves in the same way.
If your code downstream of the .persist()
call has multiple actions that can be triggered simultaneously, then it's quite likely that you are actually "persisting" the data for each action separately, and eating up memory (The "Storage' tab in the Spark UI will tell you the % cached of the dataset, if it's more than 100% cached, then you are seeing what I describe here). Worse, you may never actually be using cached data.
Generally, if you have a point in code where the data set forks into two separate transformation pipelines (each of the separate .filter()
s in your example), a .persist()
is a good idea to prevent multiple readings of your data source, and/or to save the result of an expensive transformation pipeline before the fork.
Many times it's a good idea to trigger a single action right after the .persist()
call (before the data forks) to ensure that later actions (which may run simultaneously) read from the persisted cache, rather than evaluate (and uselessly cache) the data independently.
TL;DR:
Do a joinedDF.count()
after your .persist()
, but before your .filter()
s.
Upvotes: 4