Reputation: 1541
I want to process several independent CSV files of similar sizes (100 MB each) in parallel with PySpark. I'm running PySpark on a single machine: spark.driver.memory 20g, spark.executor.memory 2g, local[1].
File content: type (has the same value within each CSV), timestamp, price
First I tested it on one CSV (note I used 35 different window functions):
from pyspark.sql import functions as f
from pyspark.sql.window import Window

logData = spark.read.csv("TypeA.csv", header=False, schema=schema)
# Compute moving averages. I used 35 different moving averages.
for i in range(1, 36):
    w = (Window.partitionBy("type").orderBy(f.col("timestamp").cast("long"))
         .rangeBetween(-24 * 7 * 3600 * i, 0))
    logData = logData.withColumn(f"moving_avg_{i}", f.avg("price").over(w))
# Some other simple operations... No agg, no sort
logData.write.parquet("res.pr")
This works great. However, I had two issues with scaling this job:
The question is why PySpark doesn't spill to disk in these two cases to prevent OOM, or how can I hint Spark to do it?
Upvotes: 0
Views: 528
Reputation: 1495
In theory, you could process all these 600 files on one single machine. Spark should spill to disk when memory is not enough. But there are some points to consider:
spark.sql.files.maxPartitionBytes (default 128MB)
Usually, 128MB of input needs about 2GB of heap, i.e. roughly 4GB of total executor memory, because only part of the heap is available for execution:
execution + storage memory = (executor JVM heap - 300MB reserved) * spark.memory.fraction (default 0.6)
and of that unified region, spark.memory.storageFraction (default 0.5) is set aside for cached data, leaving about half for execution. The container also needs executor.memoryOverhead on top (default 0.1 of executor memory, at least 384MB).
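For illustration, a minimal sketch of how these knobs could be set (the values are assumptions for ~100MB files, not tuned recommendations):

from pyspark.sql import SparkSession

# Hypothetical session config illustrating the points above.
spark = (SparkSession.builder
         .master("local[4]")
         # Smaller input splits mean a smaller per-task memory footprint
         # (default is 128MB per partition).
         .config("spark.sql.files.maxPartitionBytes", "64m")
         .getOrCreate())

# The heap itself is best set at launch, e.g.:
#   spark-submit --driver-memory 8g my_job.py
# Note that in local mode the executor runs inside the driver JVM,
# so spark.executor.memory has no effect there.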
You can post all your configs (as shown in the Spark UI's Environment tab) for further investigation.
Upvotes: 0
Reputation: 11
If your machine cannot process all of the files at once, you can handle them in batches: write out the results of each batch of files before loading the next one, as sketched below.
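A minimal sketch of that batching idea (the file pattern, batch size, and output path are hypothetical):

import glob

all_files = sorted(glob.glob("data/Type*.csv"))  # hypothetical location
batch_size = 10
for start in range(0, len(all_files), batch_size):
    batch = all_files[start:start + batch_size]
    df = spark.read.csv(batch, header=False, schema=schema)
    # ... add the moving-average columns here, as in the question ...
    # Writing each batch before reading the next keeps the working set small.
    df.write.mode("append").parquet("res.pr")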
I'm not sure if this is what you mean, but you can try hinting Spark to write some of the data to disk instead of keeping it all in RAM with:
from pyspark import StorageLevel
df.persist(StorageLevel.MEMORY_AND_DISK)
Update if it helps
Upvotes: 0