Bhavya Nag

Reputation: 25

Memory Management in PySpark

1.) I understand that "Spark's operators spills data to disk if it does not fit memory allowing it to run well on any sized data". If this is true, why do we ever get OOM (Out of Memory) errors?

2.) Increasing the number of executor cores increases parallelism. Would that also increase the chance of OOM, since the same executor memory is now divided into smaller parts for each core?

3.) Spark is much more susceptible to OOM than Hive because it performs operations in memory, whereas Hive repeatedly reads from and writes to disk. Is that correct?

Upvotes: 1

Views: 1104

Answers (2)

Oscar Lopez M.

Reputation: 605

There is one angle that you need to consider here: you may run into memory problems if the data is not properly distributed. That means you need to distribute your data as evenly as possible across the Tasks, so that you reduce shuffling as much as possible and each Task manages its own data. So if you need to perform a join and the data is distributed randomly, every Task (and therefore every executor) will have to:

  1. See what data it has
  2. Send data to the other executors (and tasks) that need those keys
  3. Request from the other executors the data that it needs

All that data exchange may cause network bottlenecks if you have a large dataset, and it also makes every Task hold its own data in memory, plus whatever has been sent to it, plus temporary objects. All of that can blow up memory.

So to prevent that situation you can:

  • Load the data already repartitioned. By that I mean, if you are loading from a DB, use Spark's stride-based JDBC partitioning; see the partitionColumn, lowerBound and upperBound options. That way you create a number of partitions in the dataframe that spread the data across different tasks based on the criteria you need. If you are going to join two dataframes, try a similar approach on both of them so that the partitions are similar (if not the same), which prevents shuffling over the network (see the sketch after this list).
  • When you define the partitions, try to make the values as evenly distributed among the tasks as possible
  • Each partition should fit in memory. Although data can spill to disk, that slows down performance
  • If you don't have a column that makes the data evenly distributed, try to create one with n distinct values, where n depends on the number of tasks you have
  • If you are reading from a CSV, it is harder to create partitions, but still possible. You can either split the CSV into multiple files and create multiple dataframes (performing a union after they are loaded), or you can read the big CSV and repartition it on the column you need. That will cause a shuffle as well, but it happens only once if you cache the repartitioned dataframe
  • When reading from Parquet you may have multiple files, but if they are not evenly distributed (because the previous process that generated them didn't do it well) you may end up with OOM errors. To prevent that, you can load the data and repartition the dataframe too
  • Another trick, valid for CSV, Parquet, ORC, etc., is to create a Hive table on top of the data and run a query from Spark with a DISTRIBUTE BY clause, so that Hive redistributes the data instead of Spark
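
To make the bullets above more concrete, here is a minimal PySpark sketch of a partitioned JDBC read, a repartition-and-cache of a big CSV, and a key-based repartition before a join. All connection details, paths and column names (orders, customers.csv, order_id, customer_id) are placeholders I made up for illustration, and the partition count of 200 is arbitrary; size it against your own cluster and data volume.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioning-sketch").getOrCreate()

# 1) Partitioned JDBC read: Spark issues one query per partition over the
#    [lowerBound, upperBound] range of a numeric column, so no single task
#    has to pull the whole table. Connection details are placeholders.
orders = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/sales")
    .option("dbtable", "public.orders")
    .option("user", "etl_user")
    .option("password", "secret")
    .option("partitionColumn", "order_id")
    .option("lowerBound", "1")
    .option("upperBound", "10000000")
    .option("numPartitions", "200")
    .load()
)

# 2) Large CSV: read it once, repartition on the join key, and cache so the
#    shuffle caused by repartition() is paid only once.
customers = (
    spark.read.option("header", "true").csv("/data/customers.csv")
    .repartition(200, "customer_id")
    .cache()
)

# 3) Repartitioning both sides on the same key (and into a similar number of
#    partitions) before the join reduces how much data each task must exchange.
joined = (
    orders.repartition(200, "customer_id")
    .join(customers, on="customer_id", how="inner")
)
```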

To your question about Hive and Spark, I think you are right up to a point. Depending on the execution engine that Hive uses in your case (MapReduce, Tez, Hive on Spark, LLAP) you can get different behaviours. With MapReduce, since the operations are mostly on disk, the chance of hitting an OOM is much lower than with Spark. In fact, from a memory point of view, MapReduce is not that affected by a skewed data distribution. But (IMHO) your goal should always be to find the best data distribution for the Spark job you are running, and that will prevent the problem

Another consideration is whether you are testing in a dev environment that doesn't have the same data as the prod environment. The data distribution should be similar, although volumes may differ a lot (I am talking from experience ;)). In that case, the Spark tuning parameters you assign on the spark-submit command may need to be different in prod, so you need to invest some time in finding the best approach in dev and then fine-tune it in prod (see the example below)
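
As a rough illustration of the kind of tuning parameters that typically differ between dev and prod, here is a hypothetical spark-submit invocation; the flag values are placeholders, not recommendations.

```
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 4g \
  --executor-memory 8g \
  --executor-cores 4 \
  --num-executors 20 \
  --conf spark.sql.shuffle.partitions=400 \
  my_job.py
```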

Upvotes: 1

Daniel

Reputation: 1242

  1. The vast majority of OOMs in Spark happen on the driver, not on the executors. This is usually the result of running .collect or a similar action on a dataset that won't fit in driver memory (see the sketch after these points).

  2. Spark does a lot of work under the hood to parallelize the workload. When you use the structured APIs (as opposed to RDDs), the chances of causing an OOM on an executor are really slim. Some combinations of cluster configuration and jobs can cause memory pressure that hurts performance and triggers a lot of garbage collection, which you need to address, but Spark should be able to handle low memory without throwing an explicit exception.

  3. Not really - as above, Spark should be able to recover from memory issues when using the structured APIs, although it may need intervention if you see heavy garbage collection and a performance impact.
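
As a small illustration of point 1, here is a minimal PySpark sketch contrasting a driver-unfriendly .collect with two safer alternatives; the dataset paths and names are hypothetical.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("driver-oom-sketch").getOrCreate()
big_df = spark.read.parquet("/data/events")  # hypothetical large dataset

# Risky: .collect() materialises every row in the driver's memory and is the
# classic cause of driver-side OOM on large datasets.
# all_rows = big_df.collect()

# Safer: bring back only a bounded sample for inspection on the driver...
sample_rows = big_df.limit(20).collect()

# ...or keep the result distributed and write it out instead of collecting it.
big_df.write.mode("overwrite").parquet("/output/events_processed")
```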

Upvotes: 1
