Le_Coeur
Le_Coeur

Reputation: 1541

PySpark OOM for multiple data files

I want to process several idependent csv files of similar sizes (100 MB) in parallel with PySpark. I'm running PySpark on a single machine: spark.driver.memory 20g spark.executor.memory 2g local[1]

File content: type (has the same value within each csv), timestamp, price

First I tested it on one csv (note I used 35 different window functions):

    logData = spark.read.csv("TypeA.csv", header=False,schema=schema)
    // Compute moving avg. I used 35 different moving averages.
    w = (Window.partitionBy("type").orderBy(f.col("timestamp").cast("long")).rangeBetween(-24*7*3600 * i, 0))
    logData = logData.withColumn("moving_avg", f.avg("price").over(w))
    // Some other simple operations... No Agg, no sort
    logData.write.parquet("res.pr")

This works great. However, i had two issues with scaling this job:

  1. I tried to increase number of window functions to 50 the job OOMs. Not sure why PySpark doesn't spill to disk in this case, since window functions are independent of each other
  2. I tried to run the job for 2 CSV files, it also OOMs. It is also not clear why it is not spilled to disk, since the window functions are basically partitioned by CSV files, so they are independent.

The question is why PySpark doesn't spill to disk in these two cases to prevent OOM, or how can I hint the Spark to do it?

Upvotes: 0

Views: 528

Answers (2)

Warren Zhu
Warren Zhu

Reputation: 1495

In theory, you could process all these 600 files in one single machine. Spark should spill to disk when meemory is not enough. But there're some points to consider:

  1. As the logic involves window agg, which results in heavy shuffle operation. You need to check whether OOM happened on map or reduce phase. Map phase process each partition of file, then write shuffle output into some file. Then reduce phase need to fetch all these shuffle output from all map tasks. It's obvious that in your case you can't hold all map tasks running.
  2. So it's highly likely that OOM happened on map phase. If this is the case, it means the memory per core can't process one signle partition of file. Please be aware that spark will do rough estimation of memory usage, then do spill if it thinks it should be. As the estatimation is not accurate, so it's still possible OOM. You can tune partition size by below configs:
spark.sql.files.maxPartitionBytes (default 128MB)

Usaually, 128M input needs 2GB heap with total 4G executor memory as

executor JVM heap execution memory (0.5 of total executor memory) =
(total executor memory - executor.memoryOverhead (default 0.1)) * spark.memory.storageFraction (0.6) 

You can post all your configs in Spark UI for further investigation.

Upvotes: 0

ste1213
ste1213

Reputation: 11

If your machine cannot run all of these you can do that in sequence and write the data of each bulk of files before loading the next bulk.

I'm not sure if this is what you mean but you can try hint spark to write some of the data to your disk instead of keep it on RAM with:

df.persist(StorageLevel.MEMORY_AND_DISK)

Update if it helps

Upvotes: 0

Related Questions